27 changes: 26 additions & 1 deletion airflow/models/dag.py
@@ -4069,6 +4069,11 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[
         This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query;
         you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
         transaction is committed it will be unlocked.
+
+        For dataset-triggered scheduling, DAGs that have ``DatasetDagRunQueue`` rows but no matching
+        ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until
+        serialization exists; queue rows are **not** deleted here so the scheduler can re-evaluate on a
+        later run.
         """
         from airflow.models.serialized_dag import SerializedDagModel
 
@@ -4094,13 +4099,33 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.debug(
+                "DAGs in DDRQ but missing SerializedDagModel "
+                "(skipping — condition cannot be evaluated): %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
+            dataset_condition = ser_dag.dag.timetable.dataset_condition
 
-            if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
+            if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
+            else:
+                log.debug(
+                    "Dataset condition satisfied: dag_id=%s, condition=%s, ddrq_uris=%s, ddrq_count=%d",
+                    dag_id,
+                    dataset_condition,
+                    sorted(statuses.keys()),
+                    len(statuses),
+                )
         del dag_statuses
         dataset_triggered_dag_info = {}
         for dag_id, records in by_dag.items():
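The guard added above is, at heart, a set difference between the DAG ids queued in ``DatasetDagRunQueue`` and the ids that actually have a ``SerializedDagModel`` row. Here is a minimal self-contained sketch of that pruning step, with plain dicts standing in for the ORM rows (``prune_unserialized`` and the sample data are illustrative stand-ins, not Airflow APIs):

```python
import logging

log = logging.getLogger("sketch")


def prune_unserialized(by_dag: dict, dag_statuses: dict, serialized_ids: set[str]) -> None:
    """Drop queued DAGs whose dataset condition cannot be evaluated yet."""
    missing = set(by_dag) - serialized_ids
    if missing:
        log.debug("DAGs in DDRQ but missing SerializedDagModel (skipping): %s", sorted(missing))
        for dag_id in missing:
            # Only the in-memory candidates are dropped; the DDRQ table rows
            # survive, so a later scheduler pass can re-evaluate once
            # serialization exists.
            del by_dag[dag_id]
            del dag_statuses[dag_id]


by_dag = {"serialized_dag": ["ddrq-row"], "ghost_dag": ["ddrq-row"]}
dag_statuses = {"serialized_dag": {"s3://bucket/a": True}, "ghost_dag": {"s3://bucket/b": True}}
prune_unserialized(by_dag, dag_statuses, serialized_ids={"serialized_dag"})
assert "ghost_dag" not in by_dag  # skipped here, but its queue rows would remain in the DB
```

Computing ``missing`` before mutating keeps the deletes safe, and leaving the queue rows untouched is what lets a later scheduler pass retry once serialization lands.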
1 change: 1 addition & 0 deletions newsfragments/63546.bugfix.rst
@@ -0,0 +1 @@
+Fix premature dataset-triggered DagRuns when ``SerializedDagModel`` was missing while ``DatasetDagRunQueue`` still had rows for that DAG; queue entries are kept for the next evaluation.
103 changes: 103 additions & 0 deletions tests/models/test_dag.py
@@ -3064,6 +3064,109 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
+        """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info).
+
+        Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later
+        heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from
+        the in-memory candidate set; it does not delete ORM rows).
+        """
+        orphan_dag_id = "ddr_q_no_serialized_dag"
+        session.add(DatasetModel(uri="dataset_for_orphan_ddrq"))
+        session.flush()
+        dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar()
+        session.add(
+            DagModel(
+                dag_id=orphan_dag_id,
+                max_active_tasks=1,
+                has_task_concurrency_limits=False,
+                next_dagrun=timezone.datetime(2038, 1, 1),
+                next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+                is_active=True,
+                has_import_errors=False,
+                is_paused=False,
+            )
+        )
+        session.flush()
+        session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id))
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert orphan_dag_id not in dataset_triggered_dag_info
+        assert "DAGs in DDRQ but missing SerializedDagModel" in caplog.text
+        assert orphan_dag_id in caplog.text
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id)
+            .count()
+            == 1
+        )
+
+    def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog):
+        """When multiple dags lack SerializedDagModel, the debug log message lists dag_ids sorted."""
+        session.add_all(
+            [
+                DatasetModel(uri="ds_ghost_z"),
+                DatasetModel(uri="ds_ghost_a"),
+            ]
+        )
+        session.flush()
+        ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar()
+        ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar()
+        far = timezone.datetime(2038, 1, 1)
+        far_after = timezone.datetime(2038, 1, 2)
+        session.add_all(
+            [
+                DagModel(
+                    dag_id="ghost_z",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+                DagModel(
+                    dag_id="ghost_a",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+            ]
+        )
+        session.flush()
+
+        session.add_all(
+            [
+                DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"),
+                DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"),
+            ]
+        )
+        session.flush()
+
+        with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert "ghost_a" not in dataset_triggered_dag_info
+        assert "ghost_z" not in dataset_triggered_dag_info
+        msg = next(
+            r.message for r in caplog.records if "DAGs in DDRQ but missing SerializedDagModel" in r.message
+        )
+        assert msg.index("ghost_a") < msg.index("ghost_z")
+        assert (
+            session.query(DatasetDagRunQueue)
+            .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z")))
+            .count()
+            == 2
+        )
+
     def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session):
         # link dataset_alias hello_alias to dataset hello
         dataset_model = DatasetModel(uri="hello")
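The retention behaviour the first test pins down is easiest to see across two scheduler passes. Below is a self-contained sketch of that lifecycle, with a plain dict standing in for the ``dataset_dag_run_queue`` table (``evaluate_pass`` and the surrounding names are illustrative, not Airflow APIs):

```python
# Stand-in for DatasetDagRunQueue rows: dag_id -> set of dataset URIs queued.
queue: dict[str, set[str]] = {"my_dag": {"s3://bucket/a"}}
serialized: set[str] = set()  # no SerializedDagModel rows exist yet


def evaluate_pass(queue: dict[str, set[str]], serialized: set[str]) -> dict[str, set[str]]:
    """One dags_needing_dagruns-style pass: return triggerable DAGs, keep unevaluable rows."""
    triggered = {}
    for dag_id, uris in queue.items():
        if dag_id not in serialized:
            continue  # skipped: condition cannot be evaluated, but queue rows are kept
        triggered[dag_id] = uris  # condition evaluation elided; assume satisfied
    return triggered


# Pass 1: DAG not serialized yet -> nothing triggers, the queue survives intact.
assert evaluate_pass(queue, serialized) == {}
assert "my_dag" in queue

# Pass 2: serialization happened on a later heartbeat -> the same rows now trigger.
serialized.add("my_dag")
assert evaluate_pass(queue, serialized) == {"my_dag": {"s3://bucket/a"}}
```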