sales_data = Asset(name="sales_data")
inventory_data = Asset(name="inventory_data")
pricing_data = Asset(name="pricing_data")
schedule = PartitionedAssetTimetable(
    assets=sales_data & inventory_data & pricing_data,  # AND condition
    partition_mapper=IdentityMapper(),  # pass partition key through unchanged
)
DagScheduleAssetReference (dag_schedule_asset_reference table)
asset_id (PK, FK), dag_id (PK, FK), created_at, updated_at
airflow-core/src/airflow/models/asset.py:578

When someone produces an asset event with a partition key (via REST API or task execution):
POST /api/v2/assets/events
{
  "asset_id": 20,
  "partition_key": "2024-01-15"
}
AssetManager.register_asset_change() (airflow-core/src/airflow/assets/manager.py:173) creates an AssetEvent with the partition key, then calls _queue_dagruns() (:316).
_queue_dagruns() identifies partition-driven DAGs by checking timetable_summary == "Partitioned Asset" (:332), then delegates to _queue_partitioned_dags() (:360).
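The routing step described above can be sketched in isolation. This is a simplified illustration, not Airflow's code: `DagStub` and `split_by_timetable` are hypothetical names, and the only detail taken from the text is the comparison against the string "Partitioned Asset".

```python
from dataclasses import dataclass

# The summary string the text says _queue_dagruns() compares against.
PARTITIONED_SUMMARY = "Partitioned Asset"


@dataclass
class DagStub:
    """Hypothetical stand-in for a DAG record; not an Airflow class."""
    dag_id: str
    timetable_summary: str


def split_by_timetable(dags):
    """Separate partition-driven DAGs from all other asset-triggered DAGs."""
    partitioned = [d for d in dags if d.timetable_summary == PARTITIONED_SUMMARY]
    regular = [d for d in dags if d.timetable_summary != PARTITIONED_SUMMARY]
    return partitioned, regular


dags = [
    DagStub("daily_report", "Partitioned Asset"),
    DagStub("legacy_etl", "Asset"),
]
partitioned, regular = split_by_timetable(dags)
print([d.dag_id for d in partitioned])  # DAGs that would take the partitioned path
```

In this sketch only the first list would be handed to the partition-aware path; everything else follows the regular asset-triggered path.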
AssetEvent (asset_id=20, partition_key="2024-01-15")
    |
    v
AssetManager._queue_partitioned_dags()
    |  partition_mapper.to_downstream("2024-01-15") -> "2024-01-15"
    |
    v
_get_or_create_apdr()
    |  Finds or creates AssetPartitionDagRun
    |  (target_dag_id, partition_key="2024-01-15", created_dag_run_id=NULL)
    |
    v
PartitionedAssetKeyLog
    asset_id = 20
    source_partition_key = "2024-01-15"
    target_partition_key = "2024-01-15"   <- from IdentityMapper
    asset_partition_dag_run_id = <apdr.id>
_get_or_create_apdr() (:411) uses a mutex lock on AssetModel (PostgreSQL/MySQL) or AssetPartitionDagRunMutexLock (SQLite) to prevent duplicate APDR creation when multiple processes handle events concurrently.
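To make the get-or-create pattern concrete, here is a minimal in-memory sketch. It assumes a `threading.Lock` as a stand-in for the database-level lock, and the classes `Apdr`, `KeyLog`, and `InMemoryStore` are hypothetical; only the field names come from the flow above. It shows why serializing the lookup and the insert keeps concurrent handlers from creating duplicate rows for the same (DAG, partition key) pair.

```python
import threading
from dataclasses import dataclass
from itertools import count
from typing import Optional


@dataclass
class Apdr:
    """Hypothetical stand-in for an AssetPartitionDagRun row."""
    id: int
    target_dag_id: str
    partition_key: str
    created_dag_run_id: Optional[int] = None


@dataclass
class KeyLog:
    """Hypothetical stand-in for a PartitionedAssetKeyLog row."""
    asset_id: int
    source_partition_key: str
    target_partition_key: str
    asset_partition_dag_run_id: int


class InMemoryStore:
    def __init__(self):
        self._lock = threading.Lock()  # assumption: replaces the database lock
        self._ids = count(1)
        self.apdrs = {}
        self.key_logs = []

    def get_or_create_apdr(self, target_dag_id, partition_key):
        # Lookup and insert happen under one lock, so two threads
        # cannot both decide that the row is missing.
        with self._lock:
            key = (target_dag_id, partition_key)
            if key not in self.apdrs:
                self.apdrs[key] = Apdr(next(self._ids), target_dag_id, partition_key)
            return self.apdrs[key]

    def record_event(self, asset_id, source_key, target_dag_id, mapper=lambda k: k):
        # The default mapper passes the key through unchanged (identity mapping).
        target_key = mapper(source_key)
        apdr = self.get_or_create_apdr(target_dag_id, target_key)
        with self._lock:
            self.key_logs.append(KeyLog(asset_id, source_key, target_key, apdr.id))
        return apdr


store = InMemoryStore()
threads = [
    threading.Thread(target=store.record_event, args=(20, "2024-01-15", "daily_report"))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(store.apdrs), len(store.key_logs))
```

Eight concurrent events for the same partition key leave a single `Apdr` entry and eight log entries that all point at it. A real implementation would rely on a row-level or table-level lock in the database rather than a process-local one, since the competing handlers may live in different processes.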