Observing multiple Airflow instances
At this point, you should have finished the setup step, and now the example code set up with a fresh virtual environment, and two Airflow instances running locally. Now, we can start writing Dagster code.
Observing the Airflow instances
We'll start by creating asset representations of our DAGs in Dagster.
Create a new shell and navigate to the root of the tutorial directory. You will need to set up the dagster-airlift package in your Dagster environment:
source .venv/bin/activate
uv pip install 'dagster-airlift[core]' dagster-webserver dagster
Observing the warehouse Airflow instance
Next, we'll declare a reference to our warehouse Airflow instance, which is running at http://localhost:8081.
from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance
warehouse_airflow_instance = AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
webserver_url="http://localhost:8081",
username="admin",
password="admin",
),
name="warehouse",
)
Now, we can use the load_airflow_dag_asset_specs function to create asset representations of the DAGs in the warehouse Airflow instance:
from dagster_airlift.core import load_airflow_dag_asset_specs
assets = load_airflow_dag_asset_specs(
airflow_instance=warehouse_airflow_instance,
)
Now, let's add these assets to a Definitions object:
from dagster import Definitions
defs = Definitions(assets=assets)
Let's set up some environment variables, and then point Dagster to see the asset created from our Airflow instance:
# Set up environment variables to point to the airlift-federation-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home"
dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py
If we navigate to the Dagster UI (running at http://localhost:3000), we should see the assets created from the warehouse Airflow instance.

There's a lot of DAGs in this instance, and we only want to focus on the load_customers DAG. Let's filter the assets to only include the load_customers DAG:
load_customers = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=warehouse_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
)
)
)
Let's instead add this asset to our Definitions object:
defs = Definitions(assets=[load_customers])
Now, our Dagster environment only includes the load_customers DAG from the warehouse Airflow instance.

Finally, we'll use a sensor to poll the warehouse Airflow instance for new runs. This way, whenever we get a successful run of the load_customers DAG, we'll see a materialization in the Dagster UI:
from dagster_airlift.core import build_airflow_polling_sensor
warehouse_sensor = build_airflow_polling_sensor(
mapped_assets=[load_customers],
airflow_instance=warehouse_airflow_instance,
)
Now, we can add this sensor to our Definitions object:
defs = Definitions(assets=[load_customers], sensors=[warehouse_sensor])
You can test this by navigating to the Airflow UI at localhost:8081, and triggering a run of the load_customers DAG. When the run completes, you should see a materialization in the Dagster UI.

Observing the metrics Airflow instance
We can repeat the same process for the customer_metrics DAG in the metrics Airflow instance, which runs at http://localhost:8082. We'll leave this as an exercise to test your understanding.
When complete, your code should look like this:
from dagster import Definitions
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
build_airflow_polling_sensor,
load_airflow_dag_asset_specs,
)
warehouse_airflow_instance = AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
webserver_url="http://localhost:8081",
username="admin",
password="admin",
),
name="warehouse",
)
metrics_airflow_instance = AirflowInstance(
auth_backend=AirflowBasicAuthBackend(
webserver_url="http://localhost:8082",
username="admin",
password="admin",
),
name="metrics",
)
load_customers_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=warehouse_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
)
)
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
)
)
warehouse_sensor = build_airflow_polling_sensor(
mapped_assets=[load_customers_dag_asset],
airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
mapped_assets=[customer_metrics_dag_asset],
airflow_instance=metrics_airflow_instance,
)
defs = Definitions(
assets=[load_customers_dag_asset, customer_metrics_dag_asset],
sensors=[warehouse_sensor, metrics_sensor],
)
Adding lineage between load_customers and customer_metrics
Now that we have both DAGs loaded into Dagster, we can observe the cross-dag lineage between them. To do this, we'll use the replace_attributes function to add a dependency from the load_customers asset to the customer_metrics asset:
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
deps=[load_customers],
)
Now, after adding the updated customer_metrics_dag_asset to our Definitions object, we should see the lineage between the two DAGs in the Dagster UI.

Next steps
Next, we'll federate the execution of our DAGs across both Airflow instances. Follow along here.