Apache Airflow 3.0 marks a significant evolution in data pipeline orchestration, introducing major improvements that simplify architecture, enhance scalability, and offer powerful new features.
In earlier versions, every Airflow component (scheduler, workers, web server) needed direct access to the metadata database, which was difficult to accommodate in secure and private environments. Airflow 3.0 addresses this with a new API server: workers and task code no longer connect to the database directly but communicate through a Task Execution API, enabling secure, proxy-friendly deployments.
Benefits:
- Workers and task code no longer need direct access to the metadata database.
- Database interactions for task execution are centralized behind a single API server.
- Traffic can be routed through a proxy, which simplifies running Airflow in secure and private networks.
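As a rough sketch of what this looks like operationally (the api-server, scheduler, and dag-processor commands are part of the 3.0 CLI; the execution_api_server_url setting and host name below are assumptions to verify against the configuration reference):

# The consolidated API server serves the UI, the REST API, and the
# Task Execution API that workers communicate with.
airflow api-server --port 8080

# The scheduler and DAG processor run as their own services.
airflow scheduler
airflow dag-processor

# Point task-running components at the API server instead of the database.
# (Setting name and URL are assumptions; check the configuration reference.)
export AIRFLOW__CORE__EXECUTION_API_SERVER_URL="http://airflow-api.internal:8080/execution/"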
Previously, event-driven workflows relied on inefficient polling mechanisms. Airflow 3.0 introduces asset watchers, enabling workflows to trigger tasks based on events from external systems (e.g., AWS S3, Kafka).
Example (a sensor-based wait, the polling pattern that asset watchers replace):
from airflow.decorators import dag
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from datetime import datetime

@dag(start_date=datetime(2024, 6, 4))
def s3_event_dag():
    # Wait for the object to appear in S3; reschedule mode frees the worker
    # slot between checks instead of holding it for the whole wait.
    S3KeySensor(
        task_id='watch_s3',
        bucket_name='my-bucket',
        bucket_key='path/to/file.csv',
        aws_conn_id='aws_default',
        mode='reschedule',
    )

s3_event_dag()
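With asset watchers, a trigger is attached to an Asset and the DAG is scheduled by the incoming event rather than by repeated checks. A minimal sketch, assuming the common messaging provider's MessageQueueTrigger and a placeholder SQS queue receiving S3 event notifications (the names and queue URL are illustrative, not taken from the release notes):

from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.sdk import Asset, AssetWatcher, dag, task

# A trigger that listens for messages on an external queue (placeholder URL).
trigger = MessageQueueTrigger(queue="https://sqs.us-east-1.amazonaws.com/123456789012/s3-events")

# The watcher ties the trigger to an asset; each message marks the asset as updated.
s3_events = Asset("s3_events", watchers=[AssetWatcher(name="s3_event_watcher", trigger=trigger)])

# Scheduling on the asset runs the DAG whenever an event arrives.
@dag(schedule=[s3_events])
def s3_event_driven_dag():
    @task
    def process_event():
        print("new S3 event received")

    process_event()

s3_event_driven_dag()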
Alongside the familiar task-oriented DAG definitions, Airflow 3.0 adds an asset-oriented authoring style that makes pipelines more declarative.
TaskFlow Example (Airflow 2.x):
from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2024, 6, 4))
def traditional_dag():
    @task
    def extract():
        return "data"

    @task
    def transform(data):
        return data + " transformed"

    @task
    def load(data):
        print(data)

    data = extract()
    transformed = transform(data)
    load(transformed)

traditional_dag()
Asset Example (Airflow 3.0):
from airflow.sdk import asset

# Each @asset-decorated function defines an asset plus the task (and DAG) that materializes it.
@asset(schedule="@daily")
def extracted_data():
    return "data"

# Scheduling on the upstream asset runs this whenever extracted_data is updated.
@asset(schedule=extracted_data)
def transformed_data(context):
    # The upstream task's return value is stored as an XCom and pulled here.
    data = context["ti"].xcom_pull(dag_id="extracted_data", task_ids="extracted_data", include_prior_dates=True)
    return data + " transformed"

# A final asset that consumes the transformed data.
@asset(schedule=transformed_data)
def loaded_data(context):
    print(context["ti"].xcom_pull(dag_id="transformed_data", task_ids="transformed_data", include_prior_dates=True))
DAG bundles provide structured, versioned management of DAGs and their supporting files. Bundles can be sourced from local folders, Git repositories, or cloud storage.
Example:
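A minimal sketch of a possible bundle configuration in airflow.cfg, assuming the DAG processor's dag_bundle_config_list option; the classpaths and kwargs shown are illustrative, so verify the exact names against the release documentation:

[dag_processor]
# Classpaths and kwargs below are illustrative; check the bundle reference for exact values.
dag_bundle_config_list = [
    {
      "name": "dags_folder",
      "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
      "kwargs": {"path": "/opt/airflow/dags"}
    },
    {
      "name": "team_repo",
      "classpath": "airflow.providers.git.bundles.git.GitDagBundle",
      "kwargs": {"git_conn_id": "team_repo", "tracking_ref": "main"}
    }
  ]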