Partitioned Asset System

1. Asset Definition

Define the assets in a DAG file:

from airflow.sdk import Asset

sales_data = Asset(name="sales_data")
inventory_data = Asset(name="inventory_data")
pricing_data = Asset(name="pricing_data")

Consumer DAG: trigger a run only when all three assets have received events for the same partition key:

schedule = PartitionedAssetTimetable(
    assets=sales_data & inventory_data & pricing_data,  # AND condition
    partition_mapper=IdentityMapper(),  # pass partition key through unchanged
)
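The AND condition is evaluated per partition key, not globally: events for 2024-01-15 from two assets plus an event for 2024-01-16 from the third do not trigger anything. The following plain-Python sketch models that rule only; it is not Airflow's implementation, and `ready_partitions` and the `(asset_name, partition_key)` tuple format are hypothetical.

```python
from collections import defaultdict

# Hypothetical in-memory model of the AND condition, keyed by partition.
REQUIRED_ASSETS = frozenset({"sales_data", "inventory_data", "pricing_data"})


def ready_partitions(events, required=REQUIRED_ASSETS):
    """Return the partition keys for which every required asset has an event."""
    seen = defaultdict(set)  # partition_key -> asset names observed so far
    for asset_name, partition_key in events:
        seen[partition_key].add(asset_name)
    # A partition is ready only when the required set is a subset of what arrived.
    return sorted(key for key, assets in seen.items() if required <= assets)


events = [
    ("sales_data", "2024-01-15"),
    ("inventory_data", "2024-01-15"),
    ("pricing_data", "2024-01-15"),
    ("sales_data", "2024-01-16"),  # only one of the three assets for this partition
]
print(ready_partitions(events))  # ['2024-01-15']
```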

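After the DAG file is parsed, Airflow writes the corresponding records to the metadata database (for example, an AssetModel row for each asset, which is where an asset_id such as the one below comes from).
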
2. Asset Event Registration

When a producer emits an asset event with a partition key (through the REST API or from a task run), for example:

POST /api/v2/assets/events
{
  "asset_id": 20,
  "partition_key": "2024-01-15"
}
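For scripting the REST call above, a minimal standard-library sketch is shown below. It only builds the request; the base URL, the token value, and bearer-token authentication are assumptions about your deployment, and `build_asset_event_request` is a hypothetical helper name.

```python
import json
import urllib.request

# Assumed values: adjust the base URL and token for your deployment.
BASE_URL = "http://localhost:8080"
TOKEN = "<access-token>"


def build_asset_event_request(asset_id: int, partition_key: str) -> urllib.request.Request:
    """Build (but do not send) a POST that registers a partitioned asset event."""
    payload = {"asset_id": asset_id, "partition_key": partition_key}
    return urllib.request.Request(
        url=f"{BASE_URL}/api/v2/assets/events",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {TOKEN}",  # assumption: bearer-token auth
        },
        method="POST",
    )


req = build_asset_event_request(20, "2024-01-15")
# To actually send it: urllib.request.urlopen(req)
```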

DB write flow

AssetManager.register_asset_change() (airflow-core/src/airflow/assets/manager.py:173) creates an AssetEvent with the partition key, then calls _queue_dagruns() (:316).

_queue_dagruns() identifies partition-driven DAGs by checking timetable_summary == "Partitioned Asset" (:332), then delegates to _queue_partitioned_dags() (:360).
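The routing step in _queue_dagruns() amounts to a string comparison on timetable_summary. The sketch below illustrates that split under stated assumptions: `DagInfo` and `split_consumers` are hypothetical stand-ins, and the "Asset" summary used for a non-partitioned consumer is an illustrative placeholder, not a value taken from Airflow.

```python
from dataclasses import dataclass

PARTITIONED_SUMMARY = "Partitioned Asset"  # the value _queue_dagruns() checks


@dataclass
class DagInfo:
    # Hypothetical stand-in for the DAG metadata the scheduler inspects.
    dag_id: str
    timetable_summary: str


def split_consumers(dags):
    """Separate partition-driven DAGs from other asset-triggered DAGs."""
    partitioned = [d.dag_id for d in dags if d.timetable_summary == PARTITIONED_SUMMARY]
    regular = [d.dag_id for d in dags if d.timetable_summary != PARTITIONED_SUMMARY]
    return partitioned, regular


dags = [
    DagInfo("consumer_dag", "Partitioned Asset"),
    DagInfo("legacy_dag", "Asset"),  # placeholder summary for a non-partitioned DAG
]
print(split_consumers(dags))  # (['consumer_dag'], ['legacy_dag'])
```

Only the partitioned list would then be handed on to _queue_partitioned_dags(); the other DAGs follow the regular asset-triggered path.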

AssetEvent (asset_id=20, partition_key="2024-01-15")
    |
    v
AssetManager._queue_partitioned_dags()
    |   partition_mapper.to_downstream("2024-01-15") -> "2024-01-15"
    |
    v
_get_or_create_apdr()
    |   Finds or creates AssetPartitionDagRun
    |   (target_dag_id, partition_key="2024-01-15", created_dag_run_id=NULL)
    |
    v
PartitionedAssetKeyLog
    asset_id = 20
    source_partition_key = "2024-01-15"
    target_partition_key = "2024-01-15"   <- from IdentityMapper
    asset_partition_dag_run_id = <apdr.id>
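The diagram can be modelled in memory to show why three events for one partition produce one AssetPartitionDagRun but three key-log rows. This is a sketch, not the real ORM code: `IdentityMapperStub`, `Apdr`, `KeyLog`, `queue_partitioned`, and asset ids 21 and 22 are hypothetical; only the field names mirror the diagram.

```python
import itertools
from dataclasses import dataclass
from typing import Optional


class IdentityMapperStub:
    """Hypothetical stand-in for IdentityMapper: returns the key unchanged."""

    def to_downstream(self, key: str) -> str:
        return key


@dataclass
class Apdr:
    # Mirrors the fields in the diagram; not the real model.
    id: int
    target_dag_id: str
    partition_key: str
    created_dag_run_id: Optional[int] = None  # NULL until a DAG run is created


@dataclass
class KeyLog:
    asset_id: int
    source_partition_key: str
    target_partition_key: str
    asset_partition_dag_run_id: int


_ids = itertools.count(1)
apdrs: dict = {}  # (target_dag_id, partition_key) -> Apdr
key_logs: list = []


def get_or_create_apdr(target_dag_id: str, partition_key: str) -> Apdr:
    key = (target_dag_id, partition_key)
    if key not in apdrs:
        apdrs[key] = Apdr(next(_ids), target_dag_id, partition_key)
    return apdrs[key]


def queue_partitioned(asset_id: int, source_key: str, target_dag_id: str, mapper) -> Apdr:
    target_key = mapper.to_downstream(source_key)  # map upstream key to downstream key
    apdr = get_or_create_apdr(target_dag_id, target_key)  # one APDR per (DAG, key)
    key_logs.append(KeyLog(asset_id, source_key, target_key, apdr.id))  # one log per event
    return apdr


mapper = IdentityMapperStub()
for asset_id in (20, 21, 22):  # hypothetical ids for sales, inventory, pricing
    queue_partitioned(asset_id, "2024-01-15", "consumer_dag", mapper)
print(len(apdrs), len(key_logs))  # 1 3
```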

Race condition handling

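_get_or_create_apdr() (:411) serializes APDR creation with a lock: on PostgreSQL/MySQL it takes a lock on AssetModel, and on SQLite, which has no SELECT ... FOR UPDATE, it uses AssetPartitionDagRunMutexLock instead. This prevents duplicate AssetPartitionDagRun (APDR) rows when multiple processes handle events for the same partition concurrently.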
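Without the lock, two workers could both see "no APDR yet" and both insert one, the classic check-then-act race. The sketch below shows the idea with threads in a single process; it is only an analogy, since Airflow relies on database locks across processes. The dict, the `threading.Lock`, and `locked_get_or_create` are hypothetical stand-ins.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical model: a dict stands in for the APDR table, and a
# threading.Lock stands in for the database lock.
apdr_table: dict = {}
apdr_lock = threading.Lock()


def locked_get_or_create(target_dag_id: str, partition_key: str) -> int:
    key = (target_dag_id, partition_key)
    with apdr_lock:  # check and insert happen atomically, so only one caller creates the row
        if key not in apdr_table:
            apdr_table[key] = len(apdr_table) + 1  # new APDR id
        return apdr_table[key]


with ThreadPoolExecutor(max_workers=8) as pool:
    ids = list(pool.map(lambda _: locked_get_or_create("consumer_dag", "2024-01-15"), range(50)))

print(len(apdr_table), set(ids))  # 1 {1}
```

Every concurrent caller receives the same APDR id, which is the property the database lock guarantees across scheduler and API processes.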