GCP + PySpark (dagster-gcp-pyspark)
Google BigQuery
This library provides an integration with the BigQuery database and PySpark data processing library.
Related Guides:
- dagster_gcp_pyspark.BigQueryPySparkIOManager IOManagerDefinition
- An I/O manager definition that reads inputs from and writes PySpark DataFrames to BigQuery. - Returns: IOManagerDefinition Examples: - from dagster_gcp_pyspark import BigQueryPySparkIOManager
 from dagster import Definitions, EnvVar
 @asset(
 key_prefix=["my_dataset"] # will be used as the dataset in BigQuery
 )
 def my_table() -> pyspark.sql.DataFrame: # the name of the asset will be the table name
 ...
 defs = Definitions(
 assets=[my_table],
 resources=\{
 "io_manager": BigQueryPySparkIOManager(project=EnvVar("GCP_PROJECT"))
 }
 )- You can set a default dataset to store the assets using the - datasetconfiguration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.- defs = Definitions(
 assets=[my_table],
 resources=\{
 "io_manager": BigQueryPySparkIOManager(project=EnvVar("GCP_PROJECT", dataset="my_dataset")
 }
 )- On individual assets, you an also specify the dataset where they should be stored using metadata or by adding a - key_prefixto the asset key. If both- key_prefixand metadata are defined, the metadata will take precedence.- @asset(
 key_prefix=["my_dataset"] # will be used as the dataset in BigQuery
 )
 def my_table() -> pyspark.sql.DataFrame:
 ...
 @asset(
 # note that the key needs to be "schema"
 metadata=\{"schema": "my_dataset"} # will be used as the dataset in BigQuery
 )
 def my_other_table() -> pyspark.sql.DataFrame:
 ...- For ops, the dataset can be specified by including a “schema” entry in output metadata. - @op(
 out=\{"my_table": Out(metadata=\{"schema": "my_schema"})}
 )
 def make_my_table() -> pyspark.sql.DataFrame:
 ...- If none of these is provided, the dataset will default to “public”. - To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn. - @asset(
 ins=\{"my_table": AssetIn("my_table", metadata=\{"columns": ["a"]})}
 )
 def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
 # my_table will just contain the data from column "a"
 ...- If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the “gcp_credentials” configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64 
- class dagster_gcp_pyspark.BigQueryPySparkTypeHandler
- Plugin for the BigQuery I/O Manager that can store and load PySpark DataFrames as BigQuery tables. - Examples: - from dagster_gcp import BigQueryIOManager
 from dagster_bigquery_pandas import BigQueryPySparkTypeHandler
 from dagster import Definitions, EnvVar
 class MyBigQueryIOManager(BigQueryIOManager):
 @staticmethod
 def type_handlers() -> Sequence[DbTypeHandler]:
 return [BigQueryPySparkTypeHandler()]
 @asset(
 key_prefix=["my_dataset"] # my_dataset will be used as the dataset in BigQuery
 )
 def my_table() -> pyspark.sql.DataFrame: # the name of the asset will be the table name
 ...
 defs = Definitions(
 assets=[my_table],
 resources=\{
 "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
 }
 )
Legacy
- dagster_gcp_pyspark.bigquery_pyspark_io_manager IOManagerDefinition
- An I/O manager definition that reads inputs from and writes PySpark DataFrames to BigQuery. - Returns: IOManagerDefinition Examples: - from dagster_gcp_pyspark import bigquery_pyspark_io_manager
 from dagster import Definitions
 @asset(
 key_prefix=["my_dataset"] # will be used as the dataset in BigQuery
 )
 def my_table() -> pd.DataFrame: # the name of the asset will be the table name
 ...
 defs = Definitions(
 assets=[my_table],
 resources=\{
 "io_manager": bigquery_pyspark_io_manager.configured(\{
 "project" : \{"env": "GCP_PROJECT"}
 })
 }
 )- You can set a default dataset to store the assets using the - datasetconfiguration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.- defs = Definitions(
 assets=[my_table],
 resources=\{
 "io_manager": bigquery_pandas_io_manager.configured(\{
 "project" : \{"env": "GCP_PROJECT"}
 "dataset": "my_dataset"
 })
 }
 )- On individual assets, you an also specify the dataset where they should be stored using metadata or by adding a - key_prefixto the asset key. If both- key_prefixand metadata are defined, the metadata will take precedence.- @asset(
 key_prefix=["my_dataset"] # will be used as the dataset in BigQuery
 )
 def my_table() -> pyspark.sql.DataFrame:
 ...
 @asset(
 # note that the key needs to be "schema"
 metadata=\{"schema": "my_dataset"} # will be used as the dataset in BigQuery
 )
 def my_other_table() -> pyspark.sql.DataFrame:
 ...- For ops, the dataset can be specified by including a “schema” entry in output metadata. - @op(
 out=\{"my_table": Out(metadata=\{"schema": "my_schema"})}
 )
 def make_my_table() -> pyspark.sql.DataFrame:
 ...- If none of these is provided, the dataset will default to “public”. - To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn. - @asset(
 ins=\{"my_table": AssetIn("my_table", metadata=\{"columns": ["a"]})}
 )
 def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
 # my_table will just contain the data from column "a"
 ...- If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the “gcp_credentials” configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64