Build pipelines with Kubernetes
This article focuses on using an out-of-the-box Kubernetes resource. For further customization, use the open_pipes_session approach instead.
This article covers how to use Dagster Pipes with Dagster's Kubernetes integration to launch Kubernetes pods and execute external code.
Pipes allows your code to interact with Dagster outside of a full Dagster environment. Instead, the environment only needs to contain dagster-pipes, a single-file Python package with no dependencies that can be installed from PyPI or easily vendored. dagster-pipes handles streaming stdout/stderr and Dagster events back to the orchestration process.
Prerequisites
- In the Dagster environment, you'll need to install the following packages:
pip install dagster dagster-webserver dagster-k8s
Refer to the Dagster installation guide for more info.
- A Kubernetes cluster. This can be an existing cluster, or, if you're working locally, you can use kind or Docker Desktop.
Step 1: Define the external Kubernetes code container
In this step, you'll create a Kubernetes container image that runs some code that uses dagster-pipes.
Step 1.1: Write a Python script
First, you'll write a Python script that uses dagster-pipes and is executed in a container via Kubernetes:
# my_python_script.py
from dagster_pipes import open_dagster_pipes
with open_dagster_pipes() as pipes:
    # Stream log message back to Dagster
    pipes.log.info(f"Using some_parameter value: {pipes.get_extra('some_parameter')}")
    # ... your code that computes and persists the asset
    pipes.report_asset_materialization(
        metadata={
            "some_metric": {"raw_value": 2, "type": "int"}
        },
        data_version="alpha",
    )
Let's review what this code does:
- 
Imports open_dagster_pipesfromdagster_pipes
- 
Initializes the Dagster Pipes context ( open_dagster_pipes), which yields an instance ofPipesContextcalledpipes.We're using the default context loader ( PipesDefaultContextLoader) and message writer (PipesDefaultMessageWriter) in this example. These objects establish communication between the orchestration and external process. On the orchestration end, these match a correspondingPipesContextInjectorandPipesMessageReader, which are instantiated inside thePipesK8sClient.
- 
Inside the body of the context manager ( open_dagster_pipes), retrieve a log and report an asset materialization. These calls use the temporary communications channels established byPipesDefaultContextLoaderandPipesDefaultMessageWriter. To see the full range of what you can do with thePipesContext, see the API docs or the general Pipes documentation.
At this point you can execute the rest of your Kubernetes code as normal, invoking various PipesContext APIs as needed.
Step 1.2: Define and build the container image
Next, you'll package the script into a container image using a Dockerfile. For example:
FROM python:3.10-slim
RUN pip install dagster-pipes
COPY my_python_script.py .
ENTRYPOINT [ "python","my_python_script.py" ]
Then, build the image:
docker build -t pipes-example:v1 .
Note: Depending on the Kubernetes setup you're using, you may need to upload the container image to a registry or otherwise make it available to the cluster. For example: kind load docker-image pipes-example:v1
Step 2: Create the Dagster objects
In this step, you'll create a Dagster asset that, when materialized, opens a Dagster pipes session and spins up a Kubernetes pod to execute the container created in the previous step.
Step 2.1: Define the Dagster asset
In your Dagster project, create a file named dagster_k8s_pipes.py and paste in the following code:
# dagster_k8s_pipes.py
from dagster import AssetExecutionContext, Definitions, asset
from dagster_k8s import PipesK8sClient
@asset
def k8s_pipes_asset(context: AssetExecutionContext, k8s_pipes_client: PipesK8sClient):
  return k8s_pipes_client.run(
      context=context,
      image="pipes-example:v1",
      extras={
            "some_parameter": 1
      }
  ).get_materialize_result()
Here's what we did in this example:
- 
Created an asset named k8s_pipes_asset
- 
Provided AssetExecutionContextas thecontextargument to the asset. This object provides access to system APIs such as resources, config, and logging.
- 
Specified a resource for the asset to use, PipesK8sClient, which is a pre-built Dagster resource that allows you to quickly get Pipes working with Kubernetes.We also specified the following for the resource: - context- The asset's- context(- AssetExecutionContext) data
- image- The Kubernetes image we created in Step 1
 These arguments are passed to the runmethod ofPipesK8sClient, which submits the provided cluster information to the Kubernetes API and then runs the specifiedimage.
- 
Returned a MaterializeResultobject representing the result of execution. This is obtained by callingget_materialize_resulton thePipesClientCompletedInvocationobject returned byrunafter the execution in Kubernetes has completed.
Depending on your Kubernetes setup, there may be a few additional things you need to do:
- If the default behavior doesn't target the correct cluster, supply the load_incluster_config,kubeconfig_file, andkube_contextarguments onPipesK8sClient
- If you need to alter default spec behaviors, use arguments on PipesK8sClient.runsuch asbase_pod_spec.
Step 2.2: Create Dagster Definitions
Next, you'll add the asset and Kubernetes resource to your project's code location via the Definitions object. This makes the resource available to other Dagster definitions in the project.
Copy and paste the following to the bottom of dagster_k8s_pipes.py:
# dagster_k8s_pipes.py
defs = Definitions(
  assets=[k8s_pipes_asset],
  resources={
    "k8s_pipes_client": PipesK8sClient(),
  },
)
At this point, dagster_k8s_pipes.py should look like the following:
# dagster_k8s_pipes.py
from dagster import AssetExecutionContext, Definitions, asset
from dagster_k8s import PipesK8sClient
@asset
def k8s_pipes_asset(context: AssetExecutionContext, k8s_pipes_client: PipesK8sClient):
  return k8s_pipes_client.run(
      context=context,
      image="pipes-example:v1",
      extras={
            "some_parameter": 1
      }
  ).get_materialize_result()
defs = Definitions(
  assets=[k8s_pipes_asset],
  resources={
    "k8s_pipes_client": PipesK8sClient(),
  },
)
Step 3: Launch the Kubernetes container from the Dagster UI
In this step, you'll run the Kubernetes container you defined in Step 1 from the Dagster UI.
- 
In a new command line session, run the following to start the UI: dagster dev -f dagster_k8s_pipes.py
- 
Navigate to localhost:3000, where you should see the UI. 
- 
Click Materialize near the top right corner of the page, then click View on the Launched Run popup. Wait for the run to complete, and the event log should look like this: 