Execution
Materializing Assets
- dagster.materialize
- Executes a single-threaded, in-process run which materializes provided assets. - By default, will materialize assets to the local filesystem. - Parameters: - 
assets (Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset]]) – The assets to materialize. Unless you’re using deps or non_argument_deps, you must also include all assets that are upstream of the assets that you want to materialize. This is because those upstream asset definitions have information that is needed to load their contents while materializing the downstream assets. 
- 
resources (Optional[Mapping[str, object]]) – The resources needed for execution. Can provide resource instances 
- 
run_config (Optional[Any]) – The run config to use for the run that materializes the assets. 
- 
partition_key – (Optional[str]) 
- 
tags (Optional[Mapping[str, str]]) – Tags for the run. 
- 
selection (Optional[Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]]) – A sub-selection of assets to materialize. If not provided, then all assets will be materialized. 
 - Returns: The result of the execution. Return type: ExecuteInProcessResult
 Examples:
 from dagster import asset, materialize

 @asset
 def asset1():
     ...

 @asset
 def asset2(asset1):
     ...

 # executes a run that materializes asset1 and then asset2
 materialize([asset1, asset2])
 # executes a run that materializes just asset2, loading its input from asset1
 materialize([asset1, asset2], selection=[asset2])
- 
- dagster.materialize_to_memory
- Executes a single-threaded, in-process run which materializes provided assets in memory. - Will explicitly use mem_io_manager() for all required io manager keys. If any io managers are directly provided using the resources argument, a DagsterInvariantViolationError will be thrown. - Parameters: - 
assets (Sequence[Union[AssetsDefinition, AssetSpec, SourceAsset]]) – The assets to materialize. Can also provide SourceAsset objects to fill dependencies for asset defs.
- 
run_config (Optional[Any]) – The run config to use for the run that materializes the assets. 
- 
resources (Optional[Mapping[str, object]]) – The resources needed for execution. Can provide resource instances 
- 
partition_key – (Optional[str]) 
- 
tags (Optional[Mapping[str, str]]) – Tags for the run. 
- 
selection (Optional[Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]]) – A sub-selection of assets to materialize. If not provided, then all assets will be materialized. 
 - Returns: The result of the execution. Return type: ExecuteInProcessResult
 Examples:
 from dagster import asset, materialize_to_memory

 @asset
 def asset1():
     ...

 @asset
 def asset2(asset1):
     ...

 # executes a run that materializes asset1 and then asset2
 materialize_to_memory([asset1, asset2])
 # executes a run that materializes just asset1
 materialize_to_memory([asset1, asset2], selection=[asset1])
- 
Executing Jobs
- class dagster.JobDefinition
- Defines a Dagster job. - execute_in_process
- Execute the Job in-process, gathering results in-memory. - The executor_def on the Job will be ignored, and replaced with the in-process executor. If using the default io_manager, it will switch from filesystem to in-memory. - Parameters: - run_config (Optional[Mapping[str, Any]]) – The configuration for the run
- instance (Optional[DagsterInstance]) – The instance to execute against, an ephemeral one will be used if none provided.
- partition_key – (Optional[str])
- raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur.
- op_selection (Optional[Sequence[str]]) – A list of op selection queries (including single op
- input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of the job. Input
- resources (Optional[Mapping[str, Any]]) – The resources needed if any are required. Can provide resource instances directly,
 - Returns: - ExecuteInProcessResult
 - run_request_for_partition
- Deprecated: This API will be removed in version 2.0.0. Directly instantiate RunRequest(partition_key=...) instead. Creates a RunRequest object for a run that processes the given partition. Parameters: - partition_key – The key of the partition to request a run for.
- run_key (Optional[str]) – A string key to identify this launched run. For sensors, ensures that
- tags (Optional[Dict[str, str]]) – A dictionary of tags (string key-value pairs) to attach
- run_config (Optional[Mapping[str, Any]]) – Configuration for the run. If the job has
- current_time (Optional[datetime]) – Used to determine which time-partitions exist.
- dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore
 Returns: an object that requests a run to process the given partition. Return type: RunRequest 
 - with_hooks
- Apply a set of hooks to all op instances within the job. 
 - with_top_level_resources
- Apply a set of resources to all op instances within the job. 
 - property config_mapping
- The config mapping for the job, if it has one. - A config mapping defines a way to map a top-level config schema to run config for the job. 
 - property executor_def
- Returns the default ExecutorDefinition for the job. - If the user has not specified an executor definition, then this will default to the multi_or_in_process_executor(). If a default is specified on the Definitions object the job was provided to, then that will be used instead.
 - property has_specified_executor
- Returns True if this job has explicitly specified an executor, and False if the executor was inherited through defaults or the Definitions object the job was provided to.
 - property has_specified_loggers
- Returns True if the job explicitly set loggers, and False if loggers were inherited through defaults or the Definitions object the job was provided to.
 - property loggers
- Returns the set of LoggerDefinition objects specified on the job. - If the user has not specified a mapping of LoggerDefinition objects, then this will default to the colored_console_logger() under the key console. If a default is specified on the Definitions object the job was provided to, then that will be used instead.
 - property partitioned_config
- The partitioned config for the job, if it has one. - A partitioned config defines a way to map partition keys to run config for the job. 
 - property partitions_def
- Returns the PartitionsDefinition for the job, if it has one. - A partitions definition defines the set of partition keys the job operates on. 
 - property resource_defs
- Returns the set of ResourceDefinition objects specified on the job. - This may not be the complete set of resources required by the job, since those can also be provided on the Definitions object the job may be provided to.
 
- dagster.execute_job
- Execute a job synchronously. - This API represents dagster’s python entrypoint for out-of-process execution. For most testing purposes, execute_in_process() will be more suitable, but when wanting to run execution using an out-of-process executor (such as dagster.multiprocess_executor), then execute_job is suitable. - execute_job expects a persistent DagsterInstance for execution, meaning the $DAGSTER_HOME environment variable must be set. It also expects a reconstructable pointer to a JobDefinition so that it can be reconstructed in separate processes. This can be done by wrapping the JobDefinition in a call to dagster.reconstructable().
 from dagster import DagsterInstance, execute_job, job, reconstructable

 @job
 def the_job():
     ...

 instance = DagsterInstance.get()
 result = execute_job(reconstructable(the_job), instance=instance)
 assert result.success
- If using the to_job() method to construct the JobDefinition, then the invocation must be wrapped in a module-scope function, which can be passed to reconstructable.
 from dagster import execute_job, graph, reconstructable

 @graph
 def the_graph():
     ...

 def define_job():
     return the_graph.to_job(...)

 result = execute_job(reconstructable(define_job), ...)
- Since execute_job is potentially executing outside of the current process, output objects need to be retrieved by use of the provided job’s io managers. Output objects can be retrieved by opening the result of execute_job as a context manager.
 from dagster import execute_job

 with execute_job(...) as result:
     output_obj = result.output_for_node("some_op")
- execute_job can also be used to reexecute a run, by providing a ReexecutionOptions object.
 from dagster import DagsterInstance, ReexecutionOptions, execute_job, reconstructable

 instance = DagsterInstance.get()
 # failed_run_id is the run ID of a previously failed run of the job
 options = ReexecutionOptions.from_failure(run_id=failed_run_id, instance=instance)
 execute_job(reconstructable(job), instance=instance, reexecution_options=options)
- Parameters: - 
job (ReconstructableJob) – A reconstructable pointer to a JobDefinition.
- 
instance (DagsterInstance) – The instance to execute against. 
- 
run_config (Optional[dict]) – The configuration that parametrizes this run, as a dict. 
- 
tags (Optional[Dict[str, Any]]) – Arbitrary key-value pairs that will be added to run logs. 
- 
raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur. 
- 
op_selection (Optional[List[str]]) – A list of op selection queries (including single op names) to execute. For example: - ['some_op']: selects some_op itself.
- ['*some_op']: select some_op and all its ancestors (upstream dependencies).
- ['*some_op+++']: select some_op, all its ancestors, and its descendants
- ['*some_op', 'other_op_a', 'other_op_b+']: select some_op and all its
 
- 
reexecution_options (Optional[ReexecutionOptions]) – Reexecution options to provide to the run, if this run is 
 - Returns: The result of job execution. Return type: JobExecutionResult
- 
- class dagster.ReexecutionOptions
- Reexecution options for python-based execution in Dagster. - Parameters: - 
parent_run_id (str) – The run_id of the run to reexecute. 
- 
step_selection (Sequence[str]) – The list of step selections to reexecute. Must be a subset or match of the set of steps executed in the original run. For example: - ['some_op']: selects some_op itself.
- ['*some_op']: select some_op and all its ancestors (upstream dependencies).
- ['*some_op+++']: select some_op, all its ancestors, and its descendants
- ['*some_op', 'other_op_a', 'other_op_b+']: select some_op and all its
 
 
- 
- dagster.instance_for_test
- Creates a persistent DagsterInstance available within a context manager. - When a context manager is opened, if no temp_dir parameter is set, a new temporary directory will be created for the duration of the context manager’s opening. If the set_dagster_home parameter is set to True (True by default), the $DAGSTER_HOME environment variable will be overridden to be this directory (or the directory passed in by temp_dir) for the duration of the context manager being open. - Parameters: - overrides (Optional[Mapping[str, Any]]) – Config to provide to instance (config format follows that typically found in an instance.yaml file).
- set_dagster_home (Optional[bool]) – If set to True, the $DAGSTER_HOME environment variable will be
- temp_dir (Optional[str]) – The directory to use for storing local artifacts produced by the
 
Executing Graphs
- class dagster.GraphDefinition
- Defines a Dagster op graph. - An op graph is made up of - Nodes, which can either be an op (the functional unit of computation), or another graph.
- Dependencies, which determine how the values produced by nodes as outputs flow from
 - End users should prefer the @graph decorator. GraphDefinition is generally intended to be used by framework authors or for programmatically generated graphs. - Parameters: - name (str) – The name of the graph. Must be unique within any GraphDefinition
- description (Optional[str]) – A human-readable description of the job.
- node_defs (Optional[Sequence[NodeDefinition]]) – The set of ops / graphs used in this graph.
- dependencies (Optional[Dict[Union[str, NodeInvocation], Dict[str, DependencyDefinition]]]) – A structure that declares the dependencies of each op’s inputs on the outputs of other
- input_mappings (Optional[Sequence[InputMapping]]) – Defines the inputs to the nested graph, and
- output_mappings (Optional[Sequence[OutputMapping]]) – Defines the outputs of the nested graph,
- config (Optional[ConfigMapping]) – Defines the config of the graph, and how its schema maps
- tags (Optional[Dict[str, Any]]) – Arbitrary metadata for any execution of the graph.
- composition_fn (Optional[Callable]) – The function that defines this graph. Used to generate
 - Examples:
 from dagster import DependencyDefinition, GraphDefinition, op

 @op
 def return_one():
     return 1

 @op
 def add_one(num):
     return num + 1

 graph_def = GraphDefinition(
     name='basic',
     node_defs=[return_one, add_one],
     dependencies={'add_one': {'num': DependencyDefinition('return_one')}},
 )
 - alias
- Aliases the graph with a new name. - Can only be used in the context of a @graph, @job, or @asset_graph decorated function. Examples:
 @job
 def do_it_all():
     my_graph.alias("my_graph_alias")
 - execute_in_process
- Execute this graph in-process, collecting results in-memory. - Parameters: - run_config (Optional[Mapping[str, Any]]) – Run config to provide to execution. The configuration for the underlying graph
- instance (Optional[DagsterInstance]) – The instance to execute against, an ephemeral one will be used if none provided.
- resources (Optional[Mapping[str, Any]]) – The resources needed if any are required. Can provide resource instances directly,
- raise_on_error (Optional[bool]) – Whether or not to raise exceptions when they occur.
- op_selection (Optional[List[str]]) – A list of op selection queries (including single op
- input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of the graph.
 - Returns: - ExecuteInProcessResult
 - tag
- Attaches the provided tags to the graph immutably. - Can only be used in the context of a @graph, @job, or @asset_graph decorated function. Examples:
 @job
 def do_it_all():
     my_graph.tag({"my_tag": "my_value"})
 - to_job
- Make this graph into an executable Job by providing remaining components required for execution. - Parameters: - 
name (Optional[str]) – The name for the Job. Defaults to the name of this graph. 
- 
resource_defs (Optional[Mapping [str, object]]) – Resources that are required by this graph for execution. 
- 
config – Describes how the job is parameterized at runtime. If no value is provided, then the schema for the job’s run config is a standard format based on its ops and resources. If a dictionary is provided, then it must conform to the standard config schema, and it will be used as the job’s run config whenever the job is executed. The values provided will be viewable and editable in the Dagster UI, so be careful with secrets. If a ConfigMapping object is provided, then the schema for the job’s run config is determined by the config mapping, which should return configuration in the standard format to configure the job.
- 
tags (Optional[Mapping[str, object]]) – A set of key-value tags that annotate the job and can 
- 
run_tags (Optional[Mapping[str, object]]) – A set of key-value tags that will be automatically attached to runs launched by this 
- 
metadata (Optional[Mapping[str, RawMetadataValue]]) – Arbitrary information that will be attached to the JobDefinition and be viewable in the Dagster UI. 
- 
logger_defs (Optional[Mapping[str, LoggerDefinition]]) – A dictionary of string logger identifiers to their implementations. 
- 
executor_def (Optional[ExecutorDefinition]) – How this Job will be executed. Defaults to multi_or_in_process_executor,
- 
op_retry_policy (Optional[RetryPolicy]) – The default retry policy for all ops in this job. 
- 
partitions_def (Optional[PartitionsDefinition]) – Defines a discrete set of partition 
- 
asset_layer (Optional[AssetLayer]) – Top level information about the assets this job 
- 
input_values (Optional[Mapping[str, Any]]) – A dictionary that maps python objects to the top-level inputs of a job. 
 - Returns: JobDefinition 
- 
 - with_hooks
- Attaches the provided hooks to the graph immutably. - Can only be used in the context of a @graph, @job, or @asset_graph decorated function. Examples:
 @job
 def do_it_all():
     my_graph.with_hooks({my_hook})
 - with_retry_policy
- Attaches the provided retry policy to the graph immutably. - Can only be used in the context of a @graph, @job, or @asset_graph decorated function. Examples:
 @job
 def do_it_all():
     my_graph.with_retry_policy(RetryPolicy(max_retries=5))
 - property config_mapping
- The config mapping for the graph, if present. - By specifying a config mapping function, you can override the configuration for the child nodes contained within a graph. 
 - property input_mappings
- Input mappings for the graph. - An input mapping is a mapping from an input of the graph to an input of a child node. 
 - property name
- The name of the graph. 
 - property output_mappings
- Output mappings for the graph. - An output mapping is a mapping from an output of the graph to an output of a child node. 
 - property tags
- The tags associated with the graph. 
 
Execution results
- class dagster.ExecuteInProcessResult
- Result object returned by in-process testing APIs. - Users should not instantiate this object directly. Used for retrieving run success, events, and outputs from execution methods that return this object. - This object is returned by: - dagster.GraphDefinition.execute_in_process()
- dagster.JobDefinition.execute_in_process()
- dagster.materialize_to_memory()
- dagster.materialize()
 - asset_value
- Retrieves the value of an asset that was materialized during the execution of the job. - Parameters: asset_key (CoercibleToAssetKey) – The key of the asset to retrieve. Returns: The value of the retrieved asset. Return type: Any 
 - output_for_node
- Retrieves output value with a particular name from the in-process run of the job. - Parameters: - node_str (str) – Name of the op/graph whose output should be retrieved. If the intended
- output_name (Optional[str]) – Name of the output on the op/graph to retrieve. Defaults to
 - Returns: The value of the retrieved output. Return type: Any 
 - output_value
- Retrieves output of top-level job, if an output is returned. - Parameters: output_name (Optional[str]) – The name of the output to retrieve. Defaults to result, the default output name in dagster. Returns: The value of the retrieved output. Return type: Any 
 - property all_events
- All dagster events emitted during execution. - Type: List[DagsterEvent] 
 - property dagster_run
- The Dagster run that was executed. - Type: DagsterRun 
 - property job_def
- The job definition that was executed. - Type: JobDefinition 
 - property run_id
- The run ID of the executed DagsterRun. - Type: str 
 
- class dagster.JobExecutionResult
- Result object returned by dagster.execute_job(). - Used for retrieving run success, events, and outputs from execute_job. Users should not directly instantiate this class. - Events and run information can be retrieved off of the object directly. In order to access outputs, the JobExecutionResult object needs to be opened as a context manager, which will re-initialize the resources from execution. - output_for_node
- Retrieves output value with a particular name from the run of the job. - In order to use this method, the JobExecutionResult object must be opened as a context manager. If this method is used without opening the context manager, it will result in a DagsterInvariantViolationError. - Parameters: - node_str (str) – Name of the op/graph whose output should be retrieved. If the intended
- output_name (Optional[str]) – Name of the output on the op/graph to retrieve. Defaults to
 - Returns: The value of the retrieved output. Return type: Any 
 - output_value
- Retrieves output of top-level job, if an output is returned. - In order to use this method, the JobExecutionResult object must be opened as a context manager. If this method is used without opening the context manager, it will result in a DagsterInvariantViolationError. If the top-level job has no output, calling this method will also result in a DagsterInvariantViolationError. - Parameters: output_name (Optional[str]) – The name of the output to retrieve. Defaults to result, the default output name in dagster. Returns: The value of the retrieved output. Return type: Any 
 - property all_events
- List of all events yielded by the job execution. - Type: Sequence[DagsterEvent] 
 - property dagster_run
- The Dagster run that was executed. - Type: DagsterRun 
 - property job_def
- The job definition that was executed. - Type: JobDefinition 
 - property run_id
- The id of the Dagster run that was executed. - Type: str 
 
- class dagster.DagsterEvent
- Events yielded by op and job execution. - Users should not instantiate this class. - event_type_value
- Value for a DagsterEventType. - Type: str 
 - job_name
- Type: str 
 - node_handle
- Type: NodeHandle 
 - step_kind_value
- Value for a StepKind. - Type: str 
 - logging_tags
- Type: Dict[str, str] 
 - event_specific_data
- Type must correspond to event_type_value. - Type: Any 
 - message
- Type: str 
 - pid
- Type: int 
 - step_key
- DEPRECATED - Type: Optional[str] 
 - property asset_key
- For events that correspond to a specific asset_key / partition (ASSET_MATERIALIZATION, ASSET_OBSERVATION, ASSET_MATERIALIZATION_PLANNED), returns that asset key. Otherwise, returns None. - Type: Optional[AssetKey] 
 - property event_type
- The type of this event. - Type: DagsterEventType 
 - property is_asset_materialization_planned
- If this event is of type ASSET_MATERIALIZATION_PLANNED. - Type: bool 
 - property is_asset_observation
- If this event is of type ASSET_OBSERVATION. - Type: bool 
 - property is_engine_event
- If this event is of type ENGINE_EVENT. - Type: bool 
 - property is_expectation_result
- If this event is of type STEP_EXPECTATION_RESULT. - Type: bool 
 - property is_failure
- If this event represents the failure of a run or step. - Type: bool 
 - property is_handled_output
- If this event is of type HANDLED_OUTPUT. - Type: bool 
 - property is_hook_event
- If this event relates to the execution of a hook. - Type: bool 
 - property is_loaded_input
- If this event is of type LOADED_INPUT. - Type: bool 
 - property is_resource_init_failure
- If this event is of type RESOURCE_INIT_FAILURE. - Type: bool 
 - property is_step_event
- If this event relates to a specific step. - Type: bool 
 - property is_step_failure
- If this event is of type STEP_FAILURE. - Type: bool 
 - property is_step_materialization
- If this event is of type ASSET_MATERIALIZATION. - Type: bool 
 - property is_step_restarted
- If this event is of type STEP_RESTARTED. - Type: bool 
 - property is_step_skipped
- If this event is of type STEP_SKIPPED. - Type: bool 
 - property is_step_start
- If this event is of type STEP_START. - Type: bool 
 - property is_step_success
- If this event is of type STEP_SUCCESS. - Type: bool 
 - property is_step_up_for_retry
- If this event is of type STEP_UP_FOR_RETRY. - Type: bool 
 - property is_successful_output
- If this event is of type STEP_OUTPUT. - Type: bool 
 - property partition
- For events that correspond to a specific asset_key / partition (ASSET_MATERIALIZATION, ASSET_OBSERVATION, ASSET_MATERIALIZATION_PLANNED), returns that partition. Otherwise, returns None. - Type: Optional[str] 
 
- class dagster.DagsterEventType
- The types of events that may be yielded by op and job execution. 
Reconstructable jobs
- class dagster.reconstructable
- Create a ReconstructableJob from a function that returns a JobDefinition, or a function decorated with @job. - When your job must cross process boundaries, e.g., for execution on multiple nodes or in different systems (like dagstermill), Dagster must know how to reconstruct the job on the other side of the process boundary. - Passing a job created with GraphDefinition.to_job to reconstructable() requires you to wrap that job’s definition in a module-scoped function, and pass that function instead:
 from dagster import graph, reconstructable

 @graph
 def my_graph():
     ...

 def define_my_job():
     return my_graph.to_job()

 reconstructable(define_my_job)
- This function implements a very conservative strategy for reconstruction, so that its behavior is easy to predict, but as a consequence it is not able to reconstruct certain kinds of jobs, such as those defined by lambdas, in nested scopes (e.g., dynamically within a method call), or in interactive environments such as the Python REPL or Jupyter notebooks. - If you need to reconstruct objects constructed in these ways, you should use build_reconstructable_job() instead, which allows you to specify your own reconstruction strategy.
- Examples:
 from dagster import graph, job, reconstructable

 @job
 def foo_job():
     ...

 reconstructable_foo_job = reconstructable(foo_job)

 @graph
 def foo():
     ...

 def make_bar_job():
     return foo.to_job()

 reconstructable_bar_job = reconstructable(make_bar_job)
Executors
- dagster.multi_or_in_process_executor ExecutorDefinition
- The default executor for a job. - This is the executor available by default on a JobDefinition that does not provide custom executors. This executor has a multiprocessing-enabled mode, and a single-process mode. By default, multiprocessing mode is enabled. Switching between multiprocess mode and in-process mode can be achieved via config.
 execution:
   config:
     multiprocess:

 execution:
   config:
     in_process:
- When using the multiprocess mode, max_concurrent and retries can also be configured.
 execution:
   config:
     multiprocess:
       max_concurrent: 4
       retries:
         enabled:
- The max_concurrent arg is optional and tells the execution engine how many processes may run concurrently. By default, or if you set max_concurrent to be 0, this is the return value of multiprocessing.cpu_count(). - When using the in_process mode, then only retries can be configured. - Execution priority can be configured using the dagster/priority tag via op metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.
- dagster.in_process_executor ExecutorDefinition
- The in-process executor executes all steps in a single process. - To select it, include the following top-level fragment in config:
 execution:
   in_process:
- Execution priority can be configured using the dagster/priority tag via op metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.
- dagster.multiprocess_executor ExecutorDefinition
- The multiprocess executor executes each step in an individual process. - Any job that does not specify custom executors will use the multiprocess_executor by default. To configure the multiprocess executor, include a fragment such as the following in your run config:
 execution:
   config:
     multiprocess:
       max_concurrent: 4
- The max_concurrent arg is optional and tells the execution engine how many processes may run concurrently. By default, or if you set max_concurrent to be None or 0, this is the return value of multiprocessing.cpu_count(). - Execution priority can be configured using the dagster/priority tag via op metadata, where the higher the number the higher the priority. 0 is the default and both positive and negative numbers can be used.
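The default described for max_concurrent can be sketched in plain Python. The helper below is hypothetical and is not part of Dagster; it only mirrors the documented rule that None or 0 falls back to `multiprocessing.cpu_count()`. The dictionary shows how the YAML fragment above might look when passed as run config from Python.

```python
import multiprocessing
from typing import Optional


def resolve_max_concurrent(max_concurrent: Optional[int]) -> int:
    """Hypothetical helper mirroring the documented default; not a Dagster API."""
    if not max_concurrent:
        # None or 0: fall back to the number of CPUs on this machine.
        return multiprocessing.cpu_count()
    return max_concurrent


# Python-dict form of the YAML run config fragment shown above.
run_config = {
    "execution": {
        "config": {
            "multiprocess": {
                "max_concurrent": 4,
            }
        }
    }
}

configured = run_config["execution"]["config"]["multiprocess"]["max_concurrent"]
effective_limit = resolve_max_concurrent(configured)
```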
Contexts
- class dagster.AssetExecutionContext
- add_asset_metadata
- Add metadata to an asset materialization event. This metadata will be available in the Dagster UI. - Parameters: - metadata (Mapping[str, Any]) – The metadata to add to the asset
- asset_key (Optional[CoercibleToAssetKey]) – The asset key to add metadata to.
- partition_key (Optional[str]) – The partition key to add metadata to, if
 - Examples: - Adding metadata to the asset materialization event for a single asset:
 import dagster as dg

 @dg.asset
 def my_asset(context):
     # Add metadata
     context.add_asset_metadata({"key": "value"})
- Adding metadata to the asset materialization event for a particular partition of a partitioned asset:
 import dagster as dg

 @dg.asset(partitions_def=dg.StaticPartitionsDefinition(["a", "b"]))
 def my_asset(context):
     # Adds metadata to all partitions currently being materialized, since no
     # partition is specified.
     context.add_asset_metadata({"key": "value"})
     for partition_key in context.partition_keys:
         # Add metadata only to the event for partition "a"
         if partition_key == "a":
             context.add_asset_metadata({"key": "value"}, partition_key=partition_key)
- Adding metadata to the asset materialization event for a particular asset in a multi-asset:
 import dagster as dg

 @dg.multi_asset(specs=[dg.AssetSpec("asset1"), dg.AssetSpec("asset2")])
 def my_multi_asset(context):
     # Add metadata to the materialization event for "asset1"
     context.add_asset_metadata({"key": "value"}, asset_key="asset1")
     # This line will fail since asset key is not specified:
     context.add_asset_metadata({"key": "value"})
 - add_output_metadata
- Add metadata to one of the outputs of an op. - This can be invoked multiple times per output in the body of an op. If the same key is passed multiple times, the value associated with the last call will be used. - Parameters: - metadata (Mapping[str, Any]) – The metadata to attach to the output
- output_name (Optional[str]) – The name of the output to attach metadata to. If there is only one output on the op, then this argument does not need to be provided. The metadata will automatically be attached to the only output.
- mapping_key (Optional[str]) – The mapping key of the output to attach metadata to. If the
 from dagster import Out, op
 from typing import Tuple

 @op
 def add_metadata(context):
     context.add_output_metadata({"foo": "bar"})
     return 5 # Since the default output is called "result", metadata will be attached to the output "result".

 @op(out={"a": Out(), "b": Out()})
 def add_metadata_two_outputs(context) -> Tuple[str, int]:
     context.add_output_metadata({"foo": "bar"}, output_name="b")
     context.add_output_metadata({"baz": "bat"}, output_name="a")
     return ("dog", 5)
 - asset_key_for_input
- Return the AssetKey for the corresponding input. 
 - asset_key_for_output
- Return the AssetKey for the corresponding output. 
 - asset_partition_key_for_input
- Returns the partition key of the upstream asset corresponding to the given input. - Parameters: input_name (str) – The name of the input to get the partition key for. Examples:
 from dagster import AssetExecutionContext, AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset

 partitions_def = DailyPartitionsDefinition("2023-08-20")

 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...

 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_key_for_input("upstream_asset"))

 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"

 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_for_input("self_dependent_asset"))

 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-20"
 - asset_partition_key_for_output
- Deprecated: This API will be removed in a future release. You have called the deprecated method asset_partition_key_for_output on AssetExecutionContext. Use context.partition_key instead. Returns the asset partition key for the given output. Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the partition key for. Examples:
 from dagster import AssetExecutionContext, AssetIn, AssetOut, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset, multi_asset

 partitions_def = DailyPartitionsDefinition("2023-08-20")

 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_for_output())

 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"

 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"])
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_for_output("first_asset"))
     context.log.info(context.asset_partition_key_for_output("second_asset"))

 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 # "2023-08-21"

 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_for_output())

 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 - asset_partition_key_range_for_input
- Return the PartitionKeyRange for the corresponding input. Errors if the asset depends on a non-contiguous chunk of the input. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_key_range_for_input to get the range of partition keys of the input that are relevant to that backfill. - Parameters: input_name (str) – The name of the input to get the partition key range for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_key_range_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 @asset(
     ins={
         "upstream_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
     partitions_def=partitions_def,
 )
 def another_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_key_range_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-20", end="2023-08-24")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_range_for_input("self_dependent_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-20", end="2023-08-24")
 - asset_partition_key_range_for_output
- Deprecated: This API will be removed in a future release. Use context.partition_key_range instead. - Return the PartitionKeyRange for the corresponding output. Errors if the run is not partitioned. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_key_range_for_output to get all of the partitions being materialized by the backfill. - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the partition key range for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_range_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"])
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_range_for_output("first_asset"))
     context.log.info(context.asset_partition_key_range_for_output("second_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_range_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 - asset_partition_keys_for_input
- Returns a list of the partition keys of the upstream asset corresponding to the given input. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_keys_for_input to get all of the partition keys of the input that are relevant to that backfill. - Parameters: input_name (str) – The name of the input to get the partition keys for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_keys_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 @asset(
     ins={
         "upstream_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
     partitions_def=partitions_def,
 )
 def another_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_keys_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-20", "2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24"]
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_keys_for_input("self_dependent_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-20", "2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24"]
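The logged lists above are the daily keys between the start and end of the backfill, optionally shifted by the mapping's offset. A rough plain-Python sketch of that expansion follows. daily_keys_in_range is a hypothetical helper for illustration only; it assumes inclusive YYYY-MM-DD bounds and the same offset on both ends, and it does not reproduce Dagster's own implementation.

```python
from datetime import date, timedelta
from typing import List


def daily_keys_in_range(start: str, end: str, offset_days: int = 0) -> List[str]:
    """List every daily key from start to end (inclusive), shifted by offset_days."""
    first = date.fromisoformat(start) + timedelta(days=offset_days)
    last = date.fromisoformat(end) + timedelta(days=offset_days)
    day_count = (last - first).days + 1
    return [(first + timedelta(days=i)).isoformat() for i in range(day_count)]


# Backfill of 2023-08-21 through 2023-08-25 with a one-day lookback.
print(daily_keys_in_range("2023-08-21", "2023-08-25", offset_days=-1))
```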
 - asset_partition_keys_for_output
- Deprecated: This API will be removed in a future release. Use context.partition_keys instead. - Returns a list of the partition keys for the given output. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_keys_for_output to get all of the partitions being materialized by the backfill. - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the partition keys for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_keys_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"])
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_keys_for_output("first_asset"))
     context.log.info(context.asset_partition_keys_for_output("second_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_keys_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 - asset_partitions_def_for_input
- The PartitionsDefinition on the upstream asset corresponding to this input. - Parameters: input_name (str) – The name of the input to get the PartitionsDefinition for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partitions_def_for_input("upstream_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # DailyPartitionsDefinition("2023-08-20")
 - asset_partitions_def_for_output
- Deprecated: This API will be removed in a future release. Use context.assets_def.partitions_def instead. - The PartitionsDefinition on the asset corresponding to this output. - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the PartitionsDefinition for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_def_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # DailyPartitionsDefinition("2023-08-20")
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"])
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_def_for_output("first_asset"))
     context.log.info(context.asset_partitions_def_for_output("second_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # DailyPartitionsDefinition("2023-08-20")
 # DailyPartitionsDefinition("2023-08-20")
 - asset_partitions_time_window_for_input
- The time window for the partitions of the input asset. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partitions_time_window_for_input to get the time window of the input that is relevant to that backfill. - Raises an error if either of the following is true: - The input asset has no partitioning.
- The input asset is not partitioned with a TimeWindowPartitionsDefinition or a MultiPartitionsDefinition with one time-partitioned dimension.
 - Parameters: input_name (str) – The name of the input to get the time window for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partitions_time_window_for_input("upstream_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
 @asset(
     ins={
         "upstream_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
     partitions_def=partitions_def,
 )
 def another_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partitions_time_window_for_input("upstream_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-21")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-25")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partitions_time_window_for_input("self_dependent_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-21")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-25")
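The time windows in these examples end one day after the last partition key, which suggests a half-open interval: the start is included and the end is excluded. A small plain-Python sketch of that arithmetic for daily partitions follows. daily_time_window is a hypothetical helper, not a Dagster API; it returns naive datetimes and ignores time zones, which a real TimeWindow would account for.

```python
from datetime import datetime, timedelta
from typing import Tuple


def daily_time_window(start_key: str, end_key: str, offset_days: int = 0) -> Tuple[datetime, datetime]:
    """Return (start, exclusive end) covering the daily keys start_key..end_key."""
    start = datetime.fromisoformat(start_key) + timedelta(days=offset_days)
    # The window closes at midnight after the last (shifted) partition day.
    end = datetime.fromisoformat(end_key) + timedelta(days=offset_days + 1)
    return start, end


# Single partition, then a five-day backfill with a one-day lookback.
print(daily_time_window("2023-08-21", "2023-08-21"))
print(daily_time_window("2023-08-21", "2023-08-25", offset_days=-1))
```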
 - asset_partitions_time_window_for_output
- Deprecated: This API will be removed in a future release. Use context.partition_time_window instead. - The time window for the partitions of the output asset. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partitions_time_window_for_output to get the TimeWindow of all of the partitions being materialized by the backfill. - Raises an error if either of the following is true: - The output asset has no partitioning.
- The output asset is not partitioned with a TimeWindowPartitionsDefinition or a MultiPartitionsDefinition with one time-partitioned dimension.
 - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the time window for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_time_window_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"])
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_time_window_for_output("first_asset"))
     context.log.info(context.asset_partitions_time_window_for_output("second_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
 # TimeWindow("2023-08-21", "2023-08-26")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partitions_time_window_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
 - get_asset_provenance
- Experimental: This API may break in future versions, even between dot releases. - Return the provenance information for the most recent materialization of an asset. - Parameters: asset_key (AssetKey) – Key of the asset for which to retrieve provenance. Returns: Provenance information for the most recent materialization of the asset. Returns None if the asset was never materialized or the materialization record is too old to contain provenance information. Return type: Optional[DataProvenance]
 - get_mapping_key
- Deprecated: This API will be removed in a future release. Use context.op_execution_context.get_mapping_key instead. - The mapping_key this execution is for if it is downstream of a DynamicOutput, otherwise None.
 - get_tag
- Deprecated: This API will be removed in a future release. Use context.run.tags.get(key) instead. - Get a logging tag. Parameters: key (str) – The tag to get. Returns: The value of the tag, if present. Return type: Optional[str]
 - has_tag
- Deprecated: This API will be removed in a future release. Use key in context.run.tags instead. - Check if a logging tag is set. Parameters: key (str) – The tag to check. Returns: Whether the tag is set. Return type: bool
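The replacements suggested for get_tag and has_tag are ordinary mapping operations. The sketch below uses a plain dict as a stand-in for the run's tags, assuming they behave like a read-only mapping of strings to strings; the tag names and values are made up for illustration.

```python
from typing import Mapping, Optional

# Hypothetical run tags; in an asset these would come from context.run.tags.
run_tags: Mapping[str, str] = {"team": "data-eng", "priority": "high"}


def get_tag(tags: Mapping[str, str], key: str) -> Optional[str]:
    """Equivalent of tags.get(key): the value if present, otherwise None."""
    return tags.get(key)


def has_tag(tags: Mapping[str, str], key: str) -> bool:
    """Equivalent of `key in tags`."""
    return key in tags


print(get_tag(run_tags, "team"), has_tag(run_tags, "owner"))
```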
 - log_event
- Log an AssetMaterialization, AssetObservation, or ExpectationResult from within the body of an op. - Events logged with this method will appear in the list of DagsterEvents, as well as the event log. - Parameters: event (Union[AssetMaterialization, AssetObservation, ExpectationResult]) – The event to log. Examples:
 from dagster import op, AssetMaterialization
 @op
 def log_materialization(context):
     context.log_event(AssetMaterialization("foo"))
 - output_for_asset_key
- Return the output name for the corresponding asset key. 
 - property asset_key
- The AssetKey for the current asset. In a multi_asset, use asset_key_for_output instead. 
 - property asset_partition_key_range
- Deprecated: This API will be removed in version 2.0. Use partition_key_range instead. - The range of partition keys for the current run. If the run is for a single partition key, returns a PartitionKeyRange with the same start and end. Raises an error if the current run is not a partitioned run.
 - property assets_def
- The backing AssetsDefinition for what is currently executing, errors if not available. 
 - property has_assets_def
- If there is a backing AssetsDefinition for what is currently executing. 
 - property has_partition_key
- Whether the current run is a partitioned run. 
 - property has_partition_key_range
- Whether the current run is a partitioned run. 
 - property instance
- The current Dagster instance. - Type: DagsterInstance 
 - property job_def
- The definition for the currently executing job. Information such as the job name and job tags can be found on the JobDefinition. Returns: JobDefinition.
 - property job_name
- The name of the currently executing job. - Type: str
 - property log
- The log manager available in the execution context. Logs will be viewable in the Dagster UI. Returns: DagsterLogManager. - Example:
 @asset
 def logger(context):
     context.log.info("Info level message")
 - property op_config
- Deprecated: This API will be removed in a future release. Use context.op_execution_context.op_config instead. - The parsed config specific to this op. Type: Any
 - property op_def
- The current op definition. - Type: OpDefinition 
 - property partition_key
- The partition key for the current run. - Raises an error if the current run is not a partitioned run, or if the current run is operating over a range of partitions (i.e., a backfill of several partitions executed in a single run). - Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def my_asset(context: AssetExecutionContext):
     context.log.info(context.partition_key)
 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 - property partition_key_range
- The range of partition keys for the current run. - If the run is for a single partition key, returns a PartitionKeyRange with the same start and end. Raises an error if the current run is not a partitioned run. - Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def my_asset(context: AssetExecutionContext):
     context.log.info(context.partition_key_range)
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
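The relationship between partition_key and partition_key_range can be modeled in a few lines: a single-partition run is a range whose start and end coincide, and asking for one key fails when the range spans several. The sketch below is a plain-Python model under those assumptions; KeyRange and single_partition_key are hypothetical names, and ValueError stands in for whatever error Dagster actually raises.

```python
from typing import NamedTuple


class KeyRange(NamedTuple):
    """Hypothetical stand-in for a partition key range with inclusive bounds."""
    start: str
    end: str


def single_partition_key(key_range: KeyRange) -> str:
    """Return the lone key of a range, or raise if the range spans several keys."""
    if key_range.start != key_range.end:
        raise ValueError(
            f"run covers {key_range.start} through {key_range.end}; "
            "use the key range instead of a single key"
        )
    return key_range.start


# A single-partition run: start and end are the same key.
print(single_partition_key(KeyRange("2023-08-21", "2023-08-21")))
```

An asset meant to support multi-partition backfills in one run would read the range (or the list of keys) rather than the single key.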
 - property partition_keys
- Returns a list of the partition keys for the current run. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use partition_keys to get all of the partitions being materialized by the backfill. - Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(partitions_def=partitions_def)
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.partition_keys)
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 - property partition_time_window
- The partition time window for the current run. - Raises an error if the current run is not a partitioned run, or if the job’s partition definition is not a TimeWindowPartitionsDefinition. - Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def my_asset(context: AssetExecutionContext):
     context.log.info(context.partition_time_window)
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 - property pdb
- Gives access to pdb debugging from within the asset. Materializing the asset via the Dagster UI or CLI will enter the pdb debugging context in the process used to launch the UI or run the CLI. - Returns: dagster.utils.forked_pdb.ForkedPdb - Example:
 @asset
 def debug(context):
     context.pdb.set_trace()
 - property resources
- The currently available resources. - Type: Resources 
 - property selected_asset_check_keys
- Get the asset check keys that correspond to the current selection of assets this execution is expected to materialize. 
 - property selected_asset_keys
- Get the set of AssetKeys this execution is expected to materialize. 
 - property selected_output_names
- Deprecated: This API will be removed in a future release. Use context.op_execution_context.selected_output_names instead. - Get the output names that correspond to the current selection of assets this execution is expected to materialize.
 
- class dagster.OpExecutionContext
- The context object that can be made available as the first argument to the function used for computing an op or asset. - This context object provides system information such as resources, config, and logging. - To construct an execution context for testing purposes, use dagster.build_op_context(). - Example:
 from dagster import op, OpExecutionContext
 @op
 def hello_world(context: OpExecutionContext):
     context.log.info("Hello, world!")
 - add_output_metadata
- Add metadata to one of the outputs of an op. - This can be invoked multiple times per output in the body of an op. If the same key is passed multiple times, the value associated with the last call will be used. - Parameters: - metadata (Mapping[str, Any]) – The metadata to attach to the output
- output_name (Optional[str]) – The name of the output to attach metadata to. If there is only one output on the op, then this argument does not need to be provided. The metadata will automatically be attached to the only output.
- mapping_key (Optional[str]) – The mapping key of the output to attach metadata to. If the
 - from dagster import Out, op
 from typing import Tuple
 @op
 def add_metadata(context):
     context.add_output_metadata({"foo": "bar"})
     return 5  # Since the default output is called "result", metadata will be attached to the output "result".
 @op(out={"a": Out(), "b": Out()})
 def add_metadata_two_outputs(context) -> Tuple[str, int]:
     context.add_output_metadata({"foo": "bar"}, output_name="b")
     context.add_output_metadata({"baz": "bat"}, output_name="a")
     return ("dog", 5)
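The rule that repeated calls accumulate metadata per output, with the last value winning for a repeated key, can be modeled with a dictionary update. The sketch below is a plain-Python stand-in, not Dagster's implementation: OutputMetadataRecorder and metadata_for are hypothetical names, and the default output name "result" follows the comment in the example above.

```python
from collections import defaultdict
from typing import Any, Dict, Mapping


class OutputMetadataRecorder:
    """Hypothetical model of per-output metadata with last-write-wins keys."""

    def __init__(self) -> None:
        self._metadata: Dict[str, Dict[str, Any]] = defaultdict(dict)

    def add_output_metadata(self, metadata: Mapping[str, Any], output_name: str = "result") -> None:
        # dict.update keeps earlier keys and overwrites any key passed again.
        self._metadata[output_name].update(metadata)

    def metadata_for(self, output_name: str = "result") -> Dict[str, Any]:
        return dict(self._metadata[output_name])


recorder = OutputMetadataRecorder()
recorder.add_output_metadata({"foo": "bar", "rows": 1})
recorder.add_output_metadata({"rows": 2})  # same key again: the later value is kept
print(recorder.metadata_for())
```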
 - asset_key_for_input
- Return the AssetKey for the corresponding input. 
 - asset_key_for_output
- Return the AssetKey for the corresponding output. 
 - asset_partition_key_for_input
- Returns the partition key of the upstream asset corresponding to the given input. - Parameters: input_name (str) – The name of the input to get the partition key for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_key_for_input("upstream_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_for_input("self_dependent_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-20"
 - asset_partition_key_for_output
- Deprecated: This API will be removed in version 2.0. Use partition_key instead. - Returns the asset partition key for the given output. - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the partition key for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"])
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_for_output("first_asset"))
     context.log.info(context.asset_partition_key_for_output("second_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 # "2023-08-21"
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 - asset_partition_key_range_for_input
- Return the PartitionKeyRange for the corresponding input. Errors if the asset depends on a non-contiguous chunk of the input. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_key_range_for_input to get the range of partition keys of the input that are relevant to that backfill. - Parameters: input_name (str) – The name of the input to get the partition key range for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_key_range_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 @asset(
     ins={
         "upstream_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
     partitions_def=partitions_def,
 )
 def another_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_key_range_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-20", end="2023-08-24")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_range_for_input("self_dependent_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-20", end="2023-08-24")
 - asset_partition_key_range_for_output
- Deprecated: This API will be removed in version 2.0. Use partition_key_range instead. - Return the PartitionKeyRange for the corresponding output. Errors if the run is not partitioned. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_key_range_for_output to get all of the partitions being materialized by the backfill. - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the partition key range for. Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_range_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"])
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_key_range_for_output("first_asset"))
     context.log.info(context.asset_partition_key_range_for_output("second_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     }
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_key_range_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 - asset_partition_keys_for_input
- Returns a list of the partition keys of the upstream asset corresponding to the given input. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_keys_for_input to get all of the partition keys of the input that are relevant to that backfill. - Parameters: input_name (str) – The name of the input to get the partition keys for. Examples:
 from dagster import AssetExecutionContext, AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_keys_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 @asset(
     ins={
         "upstream_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
     partitions_def=partitions_def,
 )
 def another_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partition_keys_for_input("upstream_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-20", "2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24"]
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_keys_for_input("self_dependent_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-20", "2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24"]
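The effect of the -1 offsets in the examples above can be reproduced with plain date arithmetic. The following is only a minimal sketch using the standard library; `shift_daily_keys` is a hypothetical helper, not part of Dagster, and it does not model how `TimeWindowPartitionMapping` is actually implemented.

```python
from datetime import date, timedelta


def shift_daily_keys(keys, offset_days):
    """Shift each ISO-formatted daily key by offset_days (hypothetical helper)."""
    return [
        (date.fromisoformat(key) + timedelta(days=offset_days)).isoformat()
        for key in keys
    ]


# Partitions selected by the backfill of the downstream asset.
backfill_keys = ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]

# An offset of -1 on both ends maps each partition to the previous day's partition.
upstream_keys = shift_daily_keys(backfill_keys, -1)
print(upstream_keys)
```

Under these assumptions, the shifted list matches the keys logged by `another_asset` and `self_dependent_asset` above.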
 - asset_partition_keys_for_output
- deprecated: This API will be removed in version 2.0. Use partition_keys instead. - Returns a list of the partition keys for the given output. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partition_keys_for_output to get all of the partitions being materialized by the backfill. - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the partition keys for. Examples:
 from dagster import AssetExecutionContext, AssetIn, AssetOut, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset, multi_asset
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_keys_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"]),
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partition_keys_for_output("first_asset"))
     context.log.info(context.asset_partition_keys_for_output("second_asset"))
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partition_keys_for_output())
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
 - asset_partitions_def_for_input
- The PartitionsDefinition on the upstream asset corresponding to this input. - Parameters: input_name (str) – The name of the input to get the PartitionsDefinition for. Examples:
 from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partitions_def_for_input("upstream_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # DailyPartitionsDefinition("2023-08-20")
 - asset_partitions_def_for_output
- The PartitionsDefinition on the asset corresponding to this output. - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the PartitionsDefinition for. Examples:
 from dagster import AssetExecutionContext, AssetOut, DailyPartitionsDefinition, asset, multi_asset
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_def_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # DailyPartitionsDefinition("2023-08-20")
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"]),
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_def_for_output("first_asset"))
     context.log.info(context.asset_partitions_def_for_output("second_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # DailyPartitionsDefinition("2023-08-20")
 # DailyPartitionsDefinition("2023-08-20")
 - asset_partitions_time_window_for_input
- The time window for the partitions of the input asset. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partitions_time_window_for_input to get the time window of the input that is relevant to that backfill. - Raises an error if either of the following is true: - The input asset has no partitioning.
- The input asset is not partitioned with a TimeWindowPartitionsDefinition or a MultiPartitionsDefinition with one time-partitioned dimension.
 - Parameters: input_name (str) – The name of the input to get the time window for. Examples:
 from dagster import AssetExecutionContext, AssetIn, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def upstream_asset():
     ...
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partitions_time_window_for_input("upstream_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
 @asset(
     ins={
         "upstream_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
     partitions_def=partitions_def,
 )
 def another_asset(context: AssetExecutionContext, upstream_asset):
     context.log.info(context.asset_partitions_time_window_for_input("upstream_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-21")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-25")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partitions_time_window_for_input("self_dependent_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-21")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-20", "2023-08-25")
 - asset_partitions_time_window_for_output
- deprecated: This API will be removed in version 2.0. Use partition_time_window instead. - The time window for the partitions of the output asset. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use asset_partitions_time_window_for_output to get the TimeWindow of all of the partitions being materialized by the backfill. - Raises an error if either of the following is true: - The output asset has no partitioning.
- The output asset is not partitioned with a TimeWindowPartitionsDefinition or a MultiPartitionsDefinition with one time-partitioned dimension.
 - Parameters: output_name (str) – For assets defined with the @asset decorator, the name of the output will be automatically provided. For assets defined with @multi_asset, output_name should be the op output associated with the asset key (as determined by AssetOut) to get the time window for. Examples:
 from dagster import AssetExecutionContext, AssetIn, AssetOut, DailyPartitionsDefinition, TimeWindowPartitionMapping, asset, multi_asset
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_time_window_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
 @multi_asset(
     outs={
         "first_asset": AssetOut(key=["my_assets", "first_asset"]),
         "second_asset": AssetOut(key=["my_assets", "second_asset"]),
     },
     partitions_def=partitions_def,
 )
 def a_multi_asset(context: AssetExecutionContext):
     context.log.info(context.asset_partitions_time_window_for_output("first_asset"))
     context.log.info(context.asset_partitions_time_window_for_output("second_asset"))
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
 # TimeWindow("2023-08-21", "2023-08-26")
 @asset(
     partitions_def=partitions_def,
     ins={
         "self_dependent_asset": AssetIn(partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1))
     },
 )
 def self_dependent_asset(context: AssetExecutionContext, self_dependent_asset):
     context.log.info(context.asset_partitions_time_window_for_output())
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-26")
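The time windows logged above are half-open: they start at the first partition's day and end at the start of the day after the last partition. A minimal standard-library sketch of that arithmetic follows; `Window` and `window_for_daily_range` are hypothetical stand-ins for illustration, not Dagster's `TimeWindow` or its internals.

```python
from datetime import date, timedelta
from typing import NamedTuple


class Window(NamedTuple):
    """Hypothetical stand-in for a time window with ISO-formatted bounds."""

    start: str
    end: str


def window_for_daily_range(first_key: str, last_key: str) -> Window:
    """Return the half-open window covering the daily keys first_key..last_key."""
    # The window ends at the start of the day after the last partition.
    end = date.fromisoformat(last_key) + timedelta(days=1)
    return Window(first_key, end.isoformat())


single = window_for_daily_range("2023-08-21", "2023-08-21")
backfill = window_for_daily_range("2023-08-21", "2023-08-25")
print(single, backfill)
```

Under these assumptions, a single partition yields a one-day window, and a five-partition backfill yields a window ending on 2023-08-26, as in the logged examples.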
 - get_asset_provenance
- experimental: This API may break in future versions, even between dot releases. - Return the provenance information for the most recent materialization of an asset. - Parameters: asset_key (AssetKey) – Key of the asset for which to retrieve provenance. Returns: Provenance information for the most recent materialization of the asset. Returns None if the asset was never materialized or the materialization record is too old to contain provenance information. Return type: Optional[DataProvenance]
 - get_mapping_key
- Which mapping_key this execution is for if downstream of a DynamicOutput, otherwise None. 
 - get_tag
- Get a logging tag. - Parameters: key (str) – The tag to get. Returns: The value of the tag, if present. Return type: Optional[str]
 - has_tag
- Check if a logging tag is set. - Parameters: key (str) – The tag to check. Returns: Whether the tag is set. Return type: bool
 - log_event
- Log an AssetMaterialization, AssetObservation, or ExpectationResult from within the body of an op. - Events logged with this method will appear in the list of DagsterEvents, as well as the event log. - Parameters: event (Union[AssetMaterialization, AssetObservation, ExpectationResult]) – The event to log. Examples:
 from dagster import op, AssetMaterialization
 @op
 def log_materialization(context):
     context.log_event(AssetMaterialization("foo"))
 - output_for_asset_key
- Return the output name for the corresponding asset key. 
 - property asset_key
- The AssetKey for the current asset. In a multi_asset, use asset_key_for_output instead. 
 - property asset_partition_key_range
- deprecated: This API will be removed in version 2.0. Use partition_key_range instead. - The range of partition keys for the current run. If the run is for a single partition key, returns a PartitionKeyRange with the same start and end. Raises an error if the current run is not a partitioned run.
 - property assets_def
- The backing AssetsDefinition for what is currently executing, errors if not available. 
 - property has_assets_def
- If there is a backing AssetsDefinition for what is currently executing. 
 - property has_partition_key
- Whether the current run is a partitioned run. 
 - property has_partition_key_range
- Whether the current run is a partitioned run. 
 - property instance
- The current Dagster instance. - Type: DagsterInstance 
 - property job_def
- The currently executing job. - Type: JobDefinition 
 - property job_name
- The name of the currently executing job. - Type: str
 - property log
- The log manager available in the execution context. - Type: DagsterLogManager 
 - property op_config
- The parsed config specific to this op. - Type: Any 
 - property op_def
- The current op definition. - Type: OpDefinition 
 - property partition_key
- The partition key for the current run. - Raises an error if the current run is not a partitioned run, or if the current run is operating over a range of partitions (i.e. a backfill of several partitions executed in a single run). - Examples:
 from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def my_asset(context: AssetExecutionContext):
     context.log.info(context.partition_key)
 # materializing the 2023-08-21 partition of this asset will log:
 # "2023-08-21"
 - property partition_key_range
- The range of partition keys for the current run. - If the run is for a single partition key, returns a PartitionKeyRange with the same start and end. Raises an error if the current run is not a partitioned run. - Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def my_asset(context: AssetExecutionContext):
     context.log.info(context.partition_key_range)
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # PartitionKeyRange(start="2023-08-21", end="2023-08-25")
 - property partition_keys
- Returns a list of the partition keys for the current run. - If you want to write your asset to support running a backfill of several partitions in a single run, you can use partition_keys to get all of the partitions being materialized by the backfill. - Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(partitions_def=partitions_def)
 def an_asset(context: AssetExecutionContext):
     context.log.info(context.partition_keys)
 # running a backfill of the 2023-08-21 through 2023-08-25 partitions of this asset will log:
 # ["2023-08-21", "2023-08-22", "2023-08-23", "2023-08-24", "2023-08-25"]
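The relationship between partition_key, partition_key_range, and partition_keys described above can be illustrated for daily partitions with a short standard-library sketch. `KeyRange`, `keys_in_range`, and `single_key` are hypothetical names for illustration only; they mimic the documented behavior and are not Dagster code.

```python
from datetime import date, timedelta
from typing import List, NamedTuple


class KeyRange(NamedTuple):
    """Hypothetical stand-in for a partition key range (inclusive on both ends)."""

    start: str
    end: str


def keys_in_range(key_range: KeyRange) -> List[str]:
    """Expand an inclusive range of daily keys into the full list of keys."""
    first = date.fromisoformat(key_range.start)
    last = date.fromisoformat(key_range.end)
    return [(first + timedelta(days=i)).isoformat() for i in range((last - first).days + 1)]


def single_key(key_range: KeyRange) -> str:
    """Return the only key in the range, or raise if the range spans several keys."""
    if key_range.start != key_range.end:
        raise ValueError("run covers a range of partitions; use the range or key list")
    return key_range.start


backfill = KeyRange("2023-08-21", "2023-08-25")
print(keys_in_range(backfill))
print(single_key(KeyRange("2023-08-21", "2023-08-21")))
```

In this sketch a single-partition run is just a range whose start and end coincide, which is why asking for a single key only fails when the run spans several partitions.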
 - property partition_time_window
- The partition time window for the current run. - Raises an error if the current run is not a partitioned run, or if the job’s partition definition is not a TimeWindowPartitionsDefinition. - Examples:
 partitions_def = DailyPartitionsDefinition("2023-08-20")
 @asset(
     partitions_def=partitions_def
 )
 def my_asset(context: AssetExecutionContext):
     context.log.info(context.partition_time_window)
 # materializing the 2023-08-21 partition of this asset will log:
 # TimeWindow("2023-08-21", "2023-08-22")
 - property pdb
- Gives access to pdb debugging from within the op. - Example:
 @op
 def debug(context):
     context.pdb.set_trace()
 - Type: dagster.utils.forked_pdb.ForkedPdb
 - property resources
- The currently available resources. - Type: Resources 
 - property retry_number
- Which retry attempt is currently executing i.e. 0 for initial attempt, 1 for first retry, etc. 
 - property run
- The current run. - Type: DagsterRun 
 - property run_config
- The run config for the current execution. - Type: dict 
 - property run_id
- The id of the current execution’s run. - Type: str 
 - property selected_asset_check_keys
- Get the asset check keys that correspond to the current selection of assets this execution is expected to materialize. 
 - property selected_asset_keys
- Get the set of AssetKeys this execution is expected to materialize. 
 - property selected_output_names
- Get the output names that correspond to the current selection of assets this execution is expected to materialize. 
 
- dagster.build_op_context
- Builds op execution context from provided parameters. - build_op_context can be used as either a function or context manager. If there is a provided resource that is a context manager, then build_op_context must be used as a context manager. This function can be used to provide the context argument when directly invoking an op. - Parameters: - resources (Optional[Dict[str, Any]]) – The resources to provide to the context. These can be
- op_config (Optional[Mapping[str, Any]]) – The config to provide to the op.
- resources_config (Optional[Mapping[str, Any]]) – The config to provide to the resources.
- instance (Optional[DagsterInstance]) – The dagster instance configured for the context.
- mapping_key (Optional[str]) – A key representing the mapping key from an upstream dynamic
- partition_key (Optional[str]) – String value representing partition key to execute with.
- partition_key_range (Optional[PartitionKeyRange]) – Partition key range to execute with.
- run_tags (Optional[Mapping[str, str]]) – The tags for the executing run.
 - Examples:
 context = build_op_context()
 op_to_invoke(context)
 with build_op_context(resources={"foo": context_manager_resource}) as context:
     op_to_invoke(context)
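The "function or context manager" behavior described above can be pictured with a plain-Python sketch. `SketchContext` is a hypothetical class for illustration only; it is not how Dagster implements build_op_context, and the error type Dagster raises may differ. It only shows why a resource that is itself a context manager needs the `with` form: something has to enter it before use and exit it afterwards.

```python
from contextlib import ExitStack, contextmanager


class SketchContext:
    """Hypothetical context object usable directly or inside a `with` block."""

    def __init__(self, resources=None):
        self._raw = dict(resources or {})
        self._stack = ExitStack()
        self._entered = False
        self._live = {}

    def __enter__(self):
        self._entered = True
        for name, resource in self._raw.items():
            # Enter resources that are context managers; pass others through.
            if hasattr(resource, "__enter__"):
                self._live[name] = self._stack.enter_context(resource)
            else:
                self._live[name] = resource
        return self

    def __exit__(self, *exc_info):
        self._stack.close()  # tears down every entered resource
        self._entered = False
        return False

    @property
    def resources(self):
        needs_with = any(hasattr(r, "__enter__") for r in self._raw.values())
        if needs_with and not self._entered:
            raise RuntimeError("context-manager resources require the `with` form")
        return self._live if self._entered else self._raw


events = []


@contextmanager
def connection():
    events.append("open")
    yield "conn"
    events.append("close")


# Plain resources work without `with`.
plain_value = SketchContext(resources={"n": 1}).resources["n"]

# A context-manager resource is entered and exited by the `with` block.
with SketchContext(resources={"foo": connection()}) as context:
    managed_value = context.resources["foo"]
```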
- dagster.build_asset_context
- Builds asset execution context from provided parameters. - build_asset_context can be used as either a function or context manager. If there is a provided resource that is a context manager, then build_asset_context must be used as a context manager. This function can be used to provide the context argument when directly invoking an asset. - Parameters: - resources (Optional[Dict[str, Any]]) – The resources to provide to the context. These can be
- resources_config (Optional[Mapping[str, Any]]) – The config to provide to the resources.
- asset_config (Optional[Mapping[str, Any]]) – The config to provide to the asset.
- instance (Optional[DagsterInstance]) – The dagster instance configured for the context.
- partition_key (Optional[str]) – String value representing partition key to execute with.
- partition_key_range (Optional[PartitionKeyRange]) – Partition key range to execute with.
- run_tags (Optional[Mapping[str, str]]) – The tags for the executing run.
 - Examples:
 context = build_asset_context()
 asset_to_invoke(context)
 with build_asset_context(resources={"foo": context_manager_resource}) as context:
     asset_to_invoke(context)
- class dagster.TypeCheckContext
- The context object available to a type check function on a DagsterType.
 - property log
- Centralized log dispatch from user code. 
 - property resources
- An object whose attributes contain the resources available to this op. 
 - property run_id
- The id of this job run. 
 
Job configuration
- dagster.validate_run_config
- Function to validate a provided run config blob against a given job. - If validation is successful, this function will return a dictionary representation of the validated config actually used during execution. - Parameters: - job_def (JobDefinition) – The job definition to validate run
- run_config (Optional[Dict[str, Any]]) – The run config to validate
 - Returns: A dictionary representation of the validated config. Return type: Dict[str, Any]
Run Config Schema
The run_config used for jobs has the following schema:
{
  # configuration for execution, required if executors require config
  execution: {
    # the name of one, and only one available executor, typically 'in_process' or 'multiprocess'
    __executor_name__: {
      # executor-specific config, if required or permitted
      config: {
        ...
      }
    }
  },
  # configuration for loggers, required if loggers require config
  loggers: {
    # the name of an available logger
    __logger_name__: {
      # logger-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },
  # configuration for resources, required if resources require config
  resources: {
    # the name of a resource
    __resource_name__: {
      # resource-specific config, if required or permitted
      config: {
        ...
      }
    },
    ...
  },
  # configuration for underlying ops, required if ops require config
  ops: {
    # these keys align with the names of the ops, or their alias in this job
    __op_name__: {
      # pass any data that was defined via config_field
      config: ...,
      # configurably specify input values, keyed by input name
      inputs: {
        __input_name__: {
          # if a dagster_type_loader is specified, that schema must be satisfied here;
          # scalar, built-in types will generally allow their values to be specified directly:
          value: ...
        }
      },
    }
  },
}
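As an illustration, a run config that follows the shape above could be written as a Python dictionary. This is only a sketch: the op name `my_op`, its `threshold` config field, the input `x`, and the resource `my_resource` with its `path` field are hypothetical, and the `console` logger entry assumes a logger with that name and a `log_level` option is available to the job.

```python
# A concrete run config following the schema above; all op and resource
# names below are hypothetical placeholders.
run_config = {
    "loggers": {
        "console": {"config": {"log_level": "INFO"}},
    },
    "resources": {
        "my_resource": {"config": {"path": "/tmp/data"}},
    },
    "ops": {
        "my_op": {
            # data for the op's own config schema
            "config": {"threshold": 10},
            # input values keyed by input name
            "inputs": {"x": {"value": 5}},
        },
    },
}

# Quick structural check: only the four top-level sections from the schema are allowed.
ALLOWED_TOP_LEVEL_KEYS = {"execution", "loggers", "resources", "ops"}
unknown_keys = set(run_config) - ALLOWED_TOP_LEVEL_KEYS
print(unknown_keys)
```

A dictionary like this is the kind of value passed as run_config; whether it validates against a particular job depends on that job's ops, resources, and loggers.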