DuckDB + Polars (dagster-duckdb-polars)
This library provides an integration with the DuckDB database and the Polars data processing library.
- dagster_duckdb_polars.DuckDBPolarsIOManager IOManagerDefinition
- An I/O manager definition that reads inputs from and writes Polars DataFrames to DuckDB. When using the DuckDBPolarsIOManager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.
Returns: IOManagerDefinition
Examples:
 import polars as pl
 from dagster import Definitions, asset
 from dagster_duckdb_polars import DuckDBPolarsIOManager

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in DuckDB
 )
 def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": DuckDBPolarsIOManager(database="my_db.duckdb")}
 )
You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": DuckDBPolarsIOManager(database="my_db.duckdb", schema="my_schema")}
 )
On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in DuckDB
 )
 def my_table() -> pl.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"}  # will be used as the schema in DuckDB
 )
 def my_other_table() -> pl.DataFrame:
     ...
For ops, the schema can be specified by including a “schema” entry in output metadata.
 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pl.DataFrame:
     ...
If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
     # my_table will just contain the data from column "a"
     ...
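The schema rules described above amount to a precedence order. The sketch below is a minimal, standalone illustration of that order and uses only the Python standard library. `resolve_schema` is a hypothetical helper, not part of dagster-duckdb-polars, and it assumes that the last component of a key_prefix names the schema. How the library treats an asset that has a key_prefix while the resource also sets schema is not covered by the text above; the sketch simply treats the resource value as a fallback.

```python
from typing import Mapping, Optional, Sequence


def resolve_schema(
    metadata: Optional[Mapping[str, str]] = None,
    key_prefix: Optional[Sequence[str]] = None,
    default_schema: Optional[str] = None,
) -> str:
    """Hypothetical helper mirroring the documented precedence; not a library API."""
    # 1. A "schema" entry in asset or op output metadata takes precedence.
    if metadata and metadata.get("schema"):
        return metadata["schema"]
    # 2. Otherwise the key_prefix is used (assumption: its last component).
    if key_prefix:
        return key_prefix[-1]
    # 3. Otherwise the schema configured on the I/O manager is used.
    if default_schema:
        return default_schema
    # 4. With nothing specified, the schema defaults to "public".
    return "public"


print(resolve_schema(metadata={"schema": "from_metadata"}, key_prefix=["from_prefix"]))
print(resolve_schema(key_prefix=["from_prefix"]))
print(resolve_schema(default_schema="my_schema"))
print(resolve_schema())
```

Writing the order down this way makes it easy to check which schema an asset should land in when several of the options are set at once.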
- class dagster_duckdb_polars.DuckDBPolarsTypeHandler
- Stores and loads Polars DataFrames in DuckDB.
To use this type handler, return it from the type_handlers method of an I/O manager that inherits from DuckDBIOManager.
Example:
 from typing import Sequence

 import polars as pl
 from dagster import Definitions, asset
 # internal module path; it may change between Dagster versions
 from dagster._core.storage.db_io_manager import DbTypeHandler
 from dagster_duckdb import DuckDBIOManager
 from dagster_duckdb_polars import DuckDBPolarsTypeHandler

 class MyDuckDBIOManager(DuckDBIOManager):
     @staticmethod
     def type_handlers() -> Sequence[DbTypeHandler]:
         return [DuckDBPolarsTypeHandler()]

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in DuckDB
 )
 def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")}
 )
Legacy
- dagster_duckdb_polars.duckdb_polars_io_manager IOManagerDefinition
- An I/O manager definition that reads inputs from and writes Polars DataFrames to DuckDB. When using the duckdb_polars_io_manager, any inputs and outputs without type annotations will be loaded as Polars DataFrames.
Returns: IOManagerDefinition
Examples:
 import polars as pl
 from dagster import Definitions, asset
 from dagster_duckdb_polars import duckdb_polars_io_manager

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in DuckDB
 )
 def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": duckdb_polars_io_manager.configured({"database": "my_db.duckdb"})}
 )
You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": duckdb_polars_io_manager.configured({"database": "my_db.duckdb", "schema": "my_schema"})}
 )
On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in DuckDB
 )
 def my_table() -> pl.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"}  # will be used as the schema in DuckDB
 )
 def my_other_table() -> pl.DataFrame:
     ...
For ops, the schema can be specified by including a “schema” entry in output metadata.
 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pl.DataFrame:
     ...
If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pl.DataFrame) -> pl.DataFrame:
     # my_table will just contain the data from column "a"
     ...
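Conceptually, the “columns” metadata restricts what is read from the stored table. The sketch below illustrates the kind of column-restricted query such a restriction corresponds to. `build_select` is a hypothetical helper, not part of dagster-duckdb-polars, and the SQL the library actually issues may differ. To stay runnable without extra dependencies, the example uses Python's built-in sqlite3 as a stand-in for DuckDB, with an attached in-memory database playing the role of the schema.

```python
import sqlite3
from typing import Optional, Sequence


def build_select(schema: str, table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Hypothetical helper: build a SELECT limited to the requested columns."""
    # Identifiers are interpolated directly, so they must come from trusted code.
    col_str = ", ".join(columns) if columns else "*"
    return f"SELECT {col_str} FROM {schema}.{table}"


# Stand-in database: an attached in-memory SQLite database acts as "my_schema".
conn = sqlite3.connect(":memory:")
conn.execute("ATTACH DATABASE ':memory:' AS my_schema")
conn.execute("CREATE TABLE my_schema.my_table (a INTEGER, b INTEGER)")
conn.executemany("INSERT INTO my_schema.my_table VALUES (?, ?)", [(1, 10), (2, 20)])

# Only column "a" is requested, matching metadata={"columns": ["a"]} above.
query = build_select("my_schema", "my_table", columns=["a"])
rows = conn.execute(query).fetchall()
print(query)
print(rows)
```

Selecting columns at load time keeps unneeded data out of the downstream asset, which matters most for wide tables.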