dagster-airlift integration reference
dagster-airlift is a toolkit for observing and migrating Airflow DAGs within Dagster. This reference page provides additional information for working with dagster-airlift that is not provided within the tutorial. You should start by reading the dagster-airlift tutorial before using this reference page.
- Migration best practices
- Supporting custom authorization
- Dagster Plus Authorization
- Dealing with changing Airflow
- Automating changes to code locations
- Peering to multiple Airflow instances
- Customizing DAG proxying operator
Migration best practices
When migrating Airflow DAGs to Dagster, we recommend a few best practices:
- Create separate packages for the Airflow and Dagster deployments. Airflow has complex dependencies and can be difficult to install in the same environment as Dagster.
- Create user acceptance tests in Dagster before migrating. This will help you catch issues easily during migration.
- Understand the rollback procedure for your migration. When proxying execution to Dagster from Airflow, you can always rollback with a single line-of-code change in the Airflow DAG.
Supporting custom authorization
If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to your backend. proxying_to_dagster can take a parameter dagster_operator_klass, which allows you to define a custom BaseProxyTasktoDagsterOperator class. This allows you to override how a session is created. Let's say for example, your Dagster installation requires an access key to be set whenever a request is made, and that access key is set in an Airflow Variable called my_api_key. We can create a custom BaseProxyTasktoDagsterOperator subclass which will retrieve that variable value and set it on the session, so that any requests to Dagster's graphql API will be made using that api key.
from pathlib import Path
import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
class CustomProxyToDagsterOperator(BaseProxyTaskToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session
    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"
dag = DAG(
    dag_id="custom_proxy_example",
)
# At the end of your dag file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_task_fn=CustomProxyToDagsterOperator.build_from_task,
)
Dagster+ authorization
You can use a custom proxy operator to establish a connection to a Dagster plus deployment. The below example proxies to Dagster Plus using organization name, deployment name, and user token set as Airflow Variables. To set a Dagster+ user token, see "Managing user tokens in Dagster+".
import requests
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyTaskToDagsterOperator
class DagsterCloudProxyOperator(BaseProxyTaskToDagsterOperator):
    def get_variable(self, context: Context, var_name: str) -> str:
        if "var" not in context:
            raise ValueError("No variables found in context")
        return context["var"]["value"][var_name]
    def get_dagster_session(self, context: Context) -> requests.Session:
        dagster_cloud_user_token = self.get_variable(context, "dagster_cloud_user_token")
        session = requests.Session()
        session.headers.update({"Dagster-Cloud-Api-Token": dagster_cloud_user_token})
        return session
    def get_dagster_url(self, context: Context) -> str:
        org_name = self.get_variable(context, "dagster_plus_organization_name")
        deployment_name = self.get_variable(context, "dagster_plus_deployment_name")
        return f"https://{org_name}.dagster.plus/{deployment_name}"
Dealing with changing Airflow
In order to make spin-up more efficient, dagster-airlift caches the state of the Airflow instance in the dagster database, so that repeat fetches of the code location don't require additional calls to Airflow's rest API. However, this means that the Dagster definitions can potentially fall out of sync with Airflow. Here are a few different ways this can manifest:
- A new Airflow DAG is added. The lineage information does not show up for this dag, and materializations are not recorded.
- A DAGis removed. The polling sensor begins failing, because there exist assets which expect that DAGto exist.
- The task dependency structure within a DAGchanges. This may result in unsyncedstatuses in Dagster, or missing materializations. This is not an exhaustive list of problems, but most of the time the tell is that materializations are missing, or assets are missing. When you find yourself in this state, you can forcedagster-airliftto reload Airflow state by reloading the code location. To do this, go to theDeploymenttab on the top nav, and clickRedeployon the code location relevant to your asset. After some time, the code location should be reloaded with refreshed state from Airflow.
Automating changes to code locations
If changes to your Airflow instance are controlled via a ci/cd process, you can add a step to automatically induce a redeploy of the relevant code location. To learn how to use the Dagster GraphQL client to do this, see the Dagster GraphQL client docs.
Peering to multiple Airflow instances
Airlift supports peering to multiple Airflow instances, as you can invoke build_defs_from_airflow_instance multiple times and combine them with Definitions.merge:
from dagster import Definitions
from dagster_airlift.core import AirflowInstance, build_defs_from_airflow_instance
defs = Definitions.merge(
    build_defs_from_airflow_instance(
        airflow_instance=AirflowInstance(
            auth_backend=BasicAuthBackend(
                webserver_url="http://yourcompany.com/instance_one",
                username="admin",
                password="admin",
            ),
            name="airflow_instance_one",
        )
    ),
    build_defs_from_airflow_instance(
        airflow_instance=AirflowInstance(
            auth_backend=BasicAuthBackend(
                webserver_url="http://yourcompany.com/instance_two",
                username="admin",
                password="admin",
            ),
            name="airflow_instance_two",
        )
    ),
)
Customizing DAG proxying operator
Similar to how we can customize the operator we construct on a per-DAGbasis, we can customize the operator we construct on a per-DAGbasis. We can use the build_from_dag_fn argument of proxying_to_dagster to provide a custom operator in place of the default.
For example, in the following example we can see that the operator is customized to provide an authorization header which authenticates Dagster.
from pathlib import Path
import requests
from airflow import DAG
from airflow.utils.context import Context
from dagster_airlift.in_airflow import BaseProxyDAGToDagsterOperator, proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
class CustomProxyToDagsterOperator(BaseProxyDAGToDagsterOperator):
    def get_dagster_session(self, context: Context) -> requests.Session:
        if "var" not in context:
            raise ValueError("No variables found in context")
        api_key = context["var"]["value"].get("my_api_key")
        session = requests.Session()
        session.headers.update({"Authorization": f"Bearer {api_key}"})
        return session
    def get_dagster_url(self, context: Context) -> str:
        return "https://dagster.example.com/"
    # This method controls how the operator is built from the dag.
    @classmethod
    def build_from_dag(cls, dag: DAG):
        return CustomProxyToDagsterOperator(dag=dag, task_id="OVERRIDDEN")
dag = DAG(
    dag_id="custom_dag_level_proxy_example",
)
# At the end of your dag file
proxying_to_dagster(
    global_vars=globals(),
    proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    build_from_dag_fn=CustomProxyToDagsterOperator.build_from_dag,
)
BaseProxyDAGToDagsterOperator has three abstract methods which must be implemented:
- get_dagster_session, which controls the creation of a valid session to access the Dagster graphql API.
- get_dagster_url, which retrieves the domain at which the dagster webserver lives.
- build_from_dag, which controls how the proxying task is constructed from the provided DAG.