diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 6b3744db13894..35ea26cb54be8 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4069,6 +4069,11 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[ This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query, you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. + + For dataset-triggered scheduling, DAGs that have ``DatasetDagRunQueue`` rows but no matching + ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until + serialization exists; queue rows are **not** deleted here so the scheduler can re-evaluate on a + later run. """ from airflow.models.serialized_dag import SerializedDagModel @@ -4094,13 +4099,33 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: ser_dags = session.scalars( select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys())) ).all() + ser_dag_ids = {s.dag_id for s in ser_dags} + missing_from_serialized = set(by_dag.keys()) - ser_dag_ids + if missing_from_serialized: + log.debug( + "DAGs in DDRQ but missing SerializedDagModel " + "(skipping — condition cannot be evaluated): %s", + sorted(missing_from_serialized), + ) + for dag_id in missing_from_serialized: + del by_dag[dag_id] + del dag_statuses[dag_id] for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] + dataset_condition = ser_dag.dag.timetable.dataset_condition - if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses): + if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses): del by_dag[dag_id] del dag_statuses[dag_id] + else: + log.debug( + "Dataset condition satisfied: dag_id=%s, condition=%s, ddrq_uris=%s, ddrq_count=%d", + dag_id, + dataset_condition, + sorted(statuses.keys()), + len(statuses), + ) del dag_statuses dataset_triggered_dag_info = {} for dag_id, records in by_dag.items(): diff --git a/newsfragments/63546.bugfix.rst b/newsfragments/63546.bugfix.rst new file mode 100644 index 0000000000000..666d5db207a58 --- /dev/null +++ b/newsfragments/63546.bugfix.rst @@ -0,0 +1 @@ +Fix premature dataset-triggered DagRuns when ``SerializedDagModel`` was missing while ``DatasetDagRunQueue`` still had rows for that DAG; queue entries are kept for the next evaluation. diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 5f721b61d2691..87212b13943d2 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3064,6 +3064,109 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session): dag_models = query.all() assert dag_models == [dag_model] + def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog): + """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info). + + Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later + heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from + the in-memory candidate set, it does not delete ORM rows). + """ + orphan_dag_id = "ddr_q_no_serialized_dag" + session.add(DatasetModel(uri="dataset_for_orphan_ddrq")) + session.flush() + dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar() + session.add( + DagModel( + dag_id=orphan_dag_id, + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=timezone.datetime(2038, 1, 1), + next_dagrun_create_after=timezone.datetime(2038, 1, 2), + is_active=True, + has_import_errors=False, + is_paused=False, + ) + ) + session.flush() + session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id)) + session.flush() + + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): + _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + + assert orphan_dag_id not in dataset_triggered_dag_info + assert "DAGs in DDRQ but missing SerializedDagModel" in caplog.text + assert orphan_dag_id in caplog.text + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id) + .count() + == 1 + ) + + def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog): + """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted.""" + session.add_all( + [ + DatasetModel(uri="ds_ghost_z"), + DatasetModel(uri="ds_ghost_a"), + ] + ) + session.flush() + ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar() + ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar() + far = timezone.datetime(2038, 1, 1) + far_after = timezone.datetime(2038, 1, 2) + session.add_all( + [ + DagModel( + dag_id="ghost_z", + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_active=True, + has_import_errors=False, + is_paused=False, + ), + DagModel( + dag_id="ghost_a", + max_active_tasks=1, + has_task_concurrency_limits=False, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_active=True, + has_import_errors=False, + is_paused=False, + ), + ] + ) + session.flush() + + session.add_all( + [ + DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"), + DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"), + ] + ) + session.flush() + + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): + _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) + + assert "ghost_a" not in dataset_triggered_dag_info + assert "ghost_z" not in dataset_triggered_dag_info + msg = next( + r.message for r in caplog.records if "DAGs in DDRQ but missing SerializedDagModel" in r.message + ) + assert msg.index("ghost_a") < msg.index("ghost_z") + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z"))) + .count() + == 2 + ) + def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session): # link dataset_alias hello_alias to dataset hello dataset_model = DatasetModel(uri="hello")