Skip to content
26 changes: 26 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/asset-scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,29 @@ AssetTimetable Integration
You can schedule Dags based on both asset events and time-based schedules using ``AssetOrTimeSchedule``. This lets you build workflows in which a Dag must both be triggered by data updates and run periodically according to a fixed timetable.

For more detailed information on ``AssetOrTimeSchedule``, refer to the corresponding section in :ref:`AssetOrTimeSchedule <asset-timetable-section>`.


Controlling DagRun creation per asset event
---------------------------------------------

.. versionadded:: 3.3.0

By default, when multiple asset events arrive for the same Dag between
scheduler ticks, they are batched into a single DagRun. Set
``batch_asset_events=False`` on the timetable to create one DagRun per
individual event instead.

.. code-block:: python

    from airflow.sdk import DAG, Asset
    from airflow.timetables.simple import AssetTriggeredTimetable

    # Each update to the asset produces its own DagRun
    with DAG(
        dag_id="per-event-consumer",
        schedule=AssetTriggeredTimetable(
            assets=Asset("s3://bucket/data-file"),
            batch_asset_events=False,
        ),
    ):
        ...
10 changes: 9 additions & 1 deletion airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ def _queue_partitioned_dags(
rollup_fingerprint=fingerprint,
asset_id=asset_id,
session=session,
allow_reuse=timetable.batch_asset_events,
)
log_record = PartitionedAssetKeyLog(
asset_id=asset_id,
Expand All @@ -666,6 +667,7 @@ def _get_or_create_apdr(
rollup_fingerprint: dict,
asset_id: int,
session: Session,
allow_reuse: bool = True,
) -> AssetPartitionDagRun:
"""
Get or create an APDR.
Expand All @@ -679,6 +681,12 @@ def _get_or_create_apdr(
``rollup_fingerprint`` is the serialized mapper / window definition for all partitioned
assets in the timetable at creation time; the scheduler discards APDRs whose stamp no
longer matches the current timetable's fingerprint (mapper / window may have changed).

When ``allow_reuse=True`` (default), an existing pending APDR for the same
``(target_dag, partition_key)`` is reused — multiple events accumulate on one
APDR. When ``allow_reuse=False`` (set when the timetable's ``batch_asset_events``
is ``False``), a new APDR is always created so each event gets its own APDR
and the scheduler produces one DagRun per event.
"""
with _lock_asset_model(session=session, asset_id=asset_id):
latest_apdr: AssetPartitionDagRun | None = session.scalar(
Expand All @@ -690,7 +698,7 @@ def _get_or_create_apdr(
.order_by(AssetPartitionDagRun.id.desc())
.limit(1)
)
if latest_apdr and latest_apdr.created_dag_run_id is None:
if latest_apdr and latest_apdr.created_dag_run_id is None and allow_reuse:
cls.logger().debug(
"Existing APDR found for key %s dag_id %s",
target_key,
Expand Down
49 changes: 28 additions & 21 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2572,27 +2572,34 @@ def _create_dag_runs_asset_triggered(
)
)

dag_run = dag.create_dagrun(
run_id=DagRun.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=triggered_date
),
logical_date=None,
data_interval=None,
run_after=triggered_date,
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
state=DagRunState.QUEUED,
creating_job_id=self.job.id,
session=session,
)
stats.incr("asset.triggered_dagruns")
dag_run.consumed_asset_events.extend(asset_events)
self.log.info(
"Created asset-triggered DagRun for '%s': run_id=%s, consumed %d asset events",
dag.dag_id,
dag_run.run_id,
len(asset_events),
)
# Build the list of (run_after, events) to process: one entry per DagRun to create.
if dag.timetable.batch_asset_events:
event_runs = [(triggered_date, asset_events)]
else:
event_runs = [(timezone.coerce_datetime(ev.timestamp), [ev]) for ev in asset_events]

for run_after, events in event_runs:
dag_run = dag.create_dagrun(
run_id=DagRun.generate_run_id(
run_type=DagRunType.ASSET_TRIGGERED, logical_date=None, run_after=run_after
),
logical_date=None,
data_interval=None,
run_after=run_after,
run_type=DagRunType.ASSET_TRIGGERED,
triggered_by=DagRunTriggeredByType.ASSET,
state=DagRunState.QUEUED,
creating_job_id=self.job.id,
session=session,
)
stats.incr("asset.triggered_dagruns")
dag_run.consumed_asset_events.extend(events)
self.log.info(
"Created asset-triggered DagRun for '%s': run_id=%s, consumed %d asset events",
dag.dag_id,
dag_run.run_id,
len(events),
)

# Delete only consumed ADRQ rows to avoid dropping newly queued events
# (e.g. DagRun triggered by asset A while a new event for asset B arrives).
Expand Down
6 changes: 5 additions & 1 deletion airflow-core/src/airflow/serialization/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,10 @@ def _(

@serialize_timetable.register
def _(self, timetable: AssetTriggeredTimetable) -> dict[str, Any]:
    """
    Serialize an ``AssetTriggeredTimetable``.

    :param timetable: The timetable to encode.
    :return: A dict holding the encoded ``asset_condition`` and the
        ``batch_asset_events`` flag.
    """
    # Only one return: a leftover early return of just ``asset_condition``
    # would silently drop ``batch_asset_events`` from the payload.
    return {
        "asset_condition": encode_asset_like(timetable.asset_condition),
        "batch_asset_events": timetable.batch_asset_events,
    }

@serialize_timetable.register
def _(self, timetable: EventsTimetable) -> dict[str, Any]:
Expand Down Expand Up @@ -434,6 +437,7 @@ def _(self, timetable: CoreTimetable) -> dict[str, Any]:
def _(self, timetable: PartitionedAssetTimetable) -> dict[str, Any]:
return {
"asset_condition": encode_asset_like(timetable.asset_condition),
"batch_asset_events": timetable.batch_asset_events,
"default_partition_mapper": encode_partition_mapper(timetable.default_partition_mapper),
"partition_mapper_config": [
(encode_asset_like(asset), encode_partition_mapper(partition_mapper))
Expand Down
20 changes: 16 additions & 4 deletions airflow-core/src/airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,11 @@ class AssetTriggeredTimetable(_TrivialTimetable):

description: str = "Triggered by assets"

def __init__(self, assets: Collection[SerializedAsset] | SerializedAssetBase) -> None:
def __init__(
self, assets: Collection[SerializedAsset] | SerializedAssetBase, *, batch_asset_events: bool = True
) -> None:
super().__init__()
self.batch_asset_events = batch_asset_events
# Compatibility: Handle SDK assets if needed so this class works in dag files.
if isinstance(assets, SerializedAssetBase | BaseAsset):
self.asset_condition = ensure_serialized_asset(assets)
Expand All @@ -228,14 +231,20 @@ def __init__(self, assets: Collection[SerializedAsset] | SerializedAssetBase) ->
def deserialize(cls, data: dict[str, Any]) -> Timetable:
    """
    Rebuild the timetable from its serialized form.

    :param data: Serialized payload. ``asset_condition`` is required;
        ``batch_asset_events`` is optional and falls back to ``True`` when the
        key is absent (presumably payloads written before the option existed).
    :return: A new instance of ``cls``.
    """
    # Imported locally, as in the original block.
    from airflow.serialization.decoders import decode_asset_like

    # Only one return: a leftover early return passing just the asset condition
    # would ignore the serialized ``batch_asset_events`` value.
    return cls(
        decode_asset_like(data["asset_condition"]),
        batch_asset_events=data.get("batch_asset_events", True),
    )

@property
def summary(self) -> str:
    """Short label for this timetable; always the string ``"Asset"``."""
    label = "Asset"
    return label

def serialize(self) -> dict[str, Any]:
    """
    Serialize this timetable.

    :return: A dict holding the encoded ``asset_condition`` and the
        ``batch_asset_events`` flag.
    """
    # Only one return: a leftover early return of just ``asset_condition``
    # would silently drop ``batch_asset_events`` from the payload.
    return {
        "asset_condition": encode_asset_like(self.asset_condition),
        "batch_asset_events": self.batch_asset_events,
    }

def generate_run_id(
self,
Expand Down Expand Up @@ -283,10 +292,11 @@ def __init__(
self,
*,
assets: SerializedAssetBase,
batch_asset_events: bool = True,
partition_mapper_config: dict[SerializedAssetBase, PartitionMapper] | None = None,
default_partition_mapper: PartitionMapper = DEFAULT_PARTITION_MAPPER,
) -> None:
super().__init__(assets=assets)
super().__init__(assets=assets, batch_asset_events=batch_asset_events)
self.partition_mapper_config = partition_mapper_config or {}
self.default_partition_mapper = default_partition_mapper

Expand Down Expand Up @@ -360,6 +370,7 @@ def serialize(self) -> dict[str, Any]:

return {
"asset_condition": encode_asset_like(self.asset_condition),
"batch_asset_events": self.batch_asset_events,
"partition_mapper_config": [
(encode_asset_like(asset), encode_partition_mapper(partition_mapper))
for asset, partition_mapper in self.partition_mapper_config.items()
Expand All @@ -377,6 +388,7 @@ def deserialize(cls, data: dict[str, Any]) -> PartitionedAssetTimetable:

timetable = cls(
assets=decode_asset_like(data["asset_condition"]),
batch_asset_events=data.get("batch_asset_events", True),
default_partition_mapper=decode_partition_mapper(default_partition_mapper_data),
partition_mapper_config={
decode_asset_like(ser_asest): decode_partition_mapper(ser_partition_mapper)
Expand Down
80 changes: 80 additions & 0 deletions airflow-core/tests/unit/assets/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,86 @@ def _get_or_create_apdr():
assert len(set(ids)) == 1
assert session.scalar(select(func.count()).select_from(AssetPartitionDagRun)) == 1

@pytest.mark.usefixtures("testing_dag_bundle")
def test_get_or_create_apdr_allow_reuse_true_reuses_pending(self, session):
    """
    With ``allow_reuse=True``, a second request for the same key returns the pending APDR.

    Two back-to-back calls for the same ``(target_dag, partition_key)`` must hand
    back one and the same row, leaving exactly one APDR with no DagRun recorded.
    """

    def count_apdrs():
        # Number of APDR rows currently visible to the session.
        return session.scalar(select(func.count()).select_from(AssetPartitionDagRun))

    clear_db_apdr()
    clear_db_pakl()
    asset_model = AssetModel(uri="test://reuse-true/", name="reuse_asset_true", group="asset")
    dag_model = DagModel(dag_id="reuse_test_dag_true", is_stale=False, bundle_name="testing")
    session.add_all([asset_model, dag_model])
    session.commit()
    session.flush()
    assert count_apdrs() == 0

    fingerprint = {}

    def request_apdr():
        # Same arguments on every call; only the DB state differs between calls.
        return AssetManager._get_or_create_apdr(
            target_key="key-1",
            target_dag=dag_model,
            rollup_fingerprint=fingerprint,
            asset_id=asset_model.id,
            session=session,
            allow_reuse=True,
        )

    first = request_apdr()
    second = request_apdr()

    assert first.id == second.id
    assert count_apdrs() == 1
    assert first.created_dag_run_id is None  # still pending

@pytest.mark.usefixtures("testing_dag_bundle")
def test_get_or_create_apdr_allow_reuse_false_creates_new(self, session):
    """
    With ``allow_reuse=False``, every request creates a fresh APDR.

    Two back-to-back calls for the same key must yield two distinct rows, even
    though the first one has no DagRun recorded when the second call is made.
    """

    def count_apdrs():
        # Number of APDR rows currently visible to the session.
        return session.scalar(select(func.count()).select_from(AssetPartitionDagRun))

    clear_db_apdr()
    clear_db_pakl()
    asset_model = AssetModel(uri="test://reuse-false/", name="reuse_asset_false", group="asset")
    dag_model = DagModel(dag_id="reuse_test_dag_false", is_stale=False, bundle_name="testing")
    session.add_all([asset_model, dag_model])
    session.commit()
    session.flush()
    assert count_apdrs() == 0

    fingerprint = {}

    def request_apdr():
        # Same arguments on every call; reuse is explicitly disabled.
        return AssetManager._get_or_create_apdr(
            target_key="key-1",
            target_dag=dag_model,
            rollup_fingerprint=fingerprint,
            asset_id=asset_model.id,
            session=session,
            allow_reuse=False,
        )

    first = request_apdr()
    second = request_apdr()

    assert first.id != second.id
    assert count_apdrs() == 2
    assert first.created_dag_run_id is None
    assert second.created_dag_run_id is None

@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("testing_dag_bundle")
def test_queue_partitioned_dags_stamps_rollup_fingerprint(self, session, dag_maker):
Expand Down
Loading
Loading