From d8a64b6e4a91ab8da6d76103d6fc3190d7419577 Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Fri, 13 Mar 2026 12:04:59 -0300
Subject: [PATCH 01/11] feat(scheduler): add INFO logging for dataset-triggered
 DagRun creation

Log which DAGs are selected as dataset-triggered (with DDRQ timestamp
ranges) and log successful DagRun creation with dag_id, exec_date,
prev_exec, event count, and event URIs.

This provides visibility into the scheduler's dataset trigger decisions
for debugging premature trigger incidents.

Made-with: Cursor
---
 airflow/jobs/scheduler_job_runner.py | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 2725dd71d9f3b..28b467b001b30 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1326,6 +1326,13 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio
         non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags)
         self._create_dag_runs(non_dataset_dags, session)
         if dataset_triggered_dags:
+            self.log.info(
+                "Dataset-triggered DAGs ready: %s",
+                {
+                    dag_id: (str(first), str(last))
+                    for dag_id, (first, last) in dataset_triggered_dag_info.items()
+                },
+            )
             self._create_dag_runs_dataset_triggered(
                 dataset_triggered_dags, dataset_triggered_dag_info, session
             )
@@ -1502,6 +1509,15 @@ def _create_dag_runs_dataset_triggered(
             )
             Stats.incr("dataset.triggered_dagruns")
             dag_run.consumed_dataset_events.extend(dataset_events)
+            self.log.info(
+                "Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, "
+                "prev_exec=%s, events_consumed=%d, event_uris=%s",
+                dag.dag_id,
+                exec_date,
+                previous_dag_run.execution_date if previous_dag_run else None,
+                len(dataset_events),
+                sorted({e.dataset.uri for e in dataset_events}),
+            )
             session.execute(
                 delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id)
             )

From 3b47b1d447553debf0a1155b06573e5cb1c80bd0 Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Mon, 16 Mar 2026 12:16:07 -0300
Subject: [PATCH 02/11] feat(scheduler): add deep diagnostic logging for
 dataset trigger decisions

Log the full context of dataset-triggered scheduling to debug premature
trigger incidents:

- P0: Log condition, DDRQ URIs, and count when dataset_condition is
  satisfied (INFO in dags_needing_dagruns)
- P1: Warn on DDRQ/event mismatch when queued URIs have no matching
  DatasetEvent in the timestamp range (WARNING in
  _create_dag_runs_dataset_triggered)
- P2: Include data_interval start/end in the DagRun creation log
- P3: Log consumed event timestamps and source DAG/run_id (DEBUG)

Made-with: Cursor
---
 airflow/jobs/scheduler_job_runner.py | 39 +++++++++++++++++++++++++++-
 airflow/models/dag.py                | 12 ++++++++-
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 28b467b001b30..72300a0972c6c 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1487,6 +1487,26 @@ def _create_dag_runs_dataset_triggered(
                 .where(*dataset_event_filters)
             ).all()
 
+            ddrq_records = session.scalars(
+                select(DatasetDagRunQueue).where(
+                    DatasetDagRunQueue.target_dag_id == dag.dag_id
+                )
+            ).all()
+            ddrq_uris = {r.dataset.uri for r in ddrq_records}
+            consumed_uris = {e.dataset.uri for e in dataset_events}
+            missing_uris = ddrq_uris - consumed_uris
+            if missing_uris:
+                self.log.warning(
+                    "DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching "
+ "DatasetEvent in range (prev_exec=%s, exec_date=%s]. " + "Consumed URIs: %s. Possible stale DDRQ records.", + dag.dag_id, + sorted(missing_uris), + previous_dag_run.execution_date if previous_dag_run else None, + exec_date, + sorted(consumed_uris), + ) + data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events) run_id = dag.timetable.generate_run_id( run_type=DagRunType.DATASET_TRIGGERED, @@ -1511,13 +1531,30 @@ def _create_dag_runs_dataset_triggered( dag_run.consumed_dataset_events.extend(dataset_events) self.log.info( "Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, " - "prev_exec=%s, events_consumed=%d, event_uris=%s", + "prev_exec=%s, data_interval=(%s, %s), " + "events_consumed=%d, event_uris=%s", dag.dag_id, exec_date, previous_dag_run.execution_date if previous_dag_run else None, + data_interval.start, + data_interval.end, len(dataset_events), sorted({e.dataset.uri for e in dataset_events}), ) + if dataset_events: + event_timestamps = [e.timestamp for e in dataset_events] + self.log.debug( + "Consumed event details: dag_id=%s, " + "event_ts_range=(%s, %s), " + "events=[%s]", + dag.dag_id, + min(event_timestamps), + max(event_timestamps), + ", ".join( + f"{e.dataset.uri}|ts={e.timestamp}|src={e.source_dag_id}/{e.source_run_id}" + for e in dataset_events + ), + ) session.execute( delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id) ) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 6b3744db13894..b2984560bc5d6 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4097,10 +4097,20 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] + dataset_condition = ser_dag.dag.timetable.dataset_condition - if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses): + if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses): del by_dag[dag_id] del dag_statuses[dag_id] + else: + log.info( + "Dataset condition satisfied: dag_id=%s, condition=%s, " + "ddrq_uris=%s, ddrq_count=%d", + dag_id, + dataset_condition, + sorted(statuses.keys()), + len(statuses), + ) del dag_statuses dataset_triggered_dag_info = {} for dag_id, records in by_dag.items(): From 2b88a0a0180b98a0b0a884bd5a16d766bc3931d2 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Mon, 16 Mar 2026 15:28:19 -0300 Subject: [PATCH 03/11] chore(scheduler): Add [DEBUG DATASETS] tag to logs Made-with: Cursor --- airflow/jobs/scheduler_job_runner.py | 8 ++++---- airflow/models/dag.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 72300a0972c6c..1a8aeb3dde72c 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -1327,7 +1327,7 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio self._create_dag_runs(non_dataset_dags, session) if dataset_triggered_dags: self.log.info( - "Dataset-triggered DAGs ready: %s", + "[DEBUG DATASETS] Dataset-triggered DAGs ready: %s", { dag_id: (str(first), str(last)) for dag_id, (first, last) in dataset_triggered_dag_info.items() @@ -1497,7 +1497,7 @@ def _create_dag_runs_dataset_triggered( missing_uris = ddrq_uris - consumed_uris if missing_uris: self.log.warning( - "DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching " + "[DEBUG DATASETS] DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s 
                     "DatasetEvent in range (prev_exec=%s, exec_date=%s]. "
                     "Consumed URIs: %s. Possible stale DDRQ records.",
                     dag.dag_id,
@@ -1530,7 +1530,7 @@ def _create_dag_runs_dataset_triggered(
             Stats.incr("dataset.triggered_dagruns")
             dag_run.consumed_dataset_events.extend(dataset_events)
             self.log.info(
-                "Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, "
+                "[DEBUG DATASETS] Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, "
                 "prev_exec=%s, data_interval=(%s, %s), "
                 "events_consumed=%d, event_uris=%s",
                 dag.dag_id,
@@ -1544,7 +1544,7 @@ def _create_dag_runs_dataset_triggered(
             if dataset_events:
                 event_timestamps = [e.timestamp for e in dataset_events]
                 self.log.debug(
-                    "Consumed event details: dag_id=%s, "
+                    "[DEBUG DATASETS] Consumed event details: dag_id=%s, "
                     "event_ts_range=(%s, %s), "
                     "events=[%s]",
                     dag.dag_id,
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b2984560bc5d6..64cdd0497d286 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4104,7 +4104,7 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
                 del dag_statuses[dag_id]
             else:
                 log.info(
-                    "Dataset condition satisfied: dag_id=%s, condition=%s, "
+                    "[DEBUG DATASETS] Dataset condition satisfied: dag_id=%s, condition=%s, "
                     "ddrq_uris=%s, ddrq_count=%d",
                     dag_id,
                     dataset_condition,

From a0dd6c40e41a6a36edc3549baa2a4d9970d611cd Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Fri, 20 Mar 2026 10:55:49 -0300
Subject: [PATCH 04/11] fix(scheduler): guard against condition bypass when
 SerializedDagModel is missing

DAGs with DDRQ entries but no corresponding SerializedDagModel were
bypassing dataset condition evaluation in dags_needing_dagruns() and
entering dataset_triggered_dag_info unchecked. This caused premature
triggers with partial events when the DAG processor was mid-parse
cycle.

Now explicitly detects the mismatch and excludes those DAGs from the
current scheduler loop. DDRQ entries are preserved so the DAG is
re-evaluated on the next heartbeat (~5s) once serialization completes.
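
For review, a minimal sketch of the guard with plain data structures
standing in for the ORM state (names match the new code; the dict
contents and URIs are invented):

    by_dag = {"dag_a": ["q1"], "dag_b": ["q2"]}  # dag_id -> queued DDRQ rows
    dag_statuses = {"dag_a": {"s3://x": True}, "dag_b": {"s3://y": True}}
    ser_dag_ids = {"dag_a"}  # dag_b is still mid-parse, not yet serialized

    missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
    for dag_id in missing_from_serialized:
        del by_dag[dag_id]  # drop from this cycle's candidates only
        del dag_statuses[dag_id]  # keep both dicts consistent

    assert "dag_b" not in by_dag  # skipped now; its DDRQ rows still exist

The sketch only shows the in-memory effect: nothing here touches the
dataset_dag_run_queue table, which is what allows the retry on the
next heartbeat.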
Made-with: Cursor
---
 airflow/models/dag.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 64cdd0497d286..f0dd1e906f0c3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4094,6 +4094,17 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dags = session.scalars(
             select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
         ).all()
+        ser_dag_ids = {s.dag_id for s in ser_dags}
+        missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
+        if missing_from_serialized:
+            log.warning(
+                "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel "
+                "(skipping — condition cannot be evaluated): %s",
+                sorted(missing_from_serialized),
+            )
+            for dag_id in missing_from_serialized:
+                del by_dag[dag_id]
+                del dag_statuses[dag_id]
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]

From 515441feab8a4f290a07a393ab74b26b4df78ee0 Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Fri, 20 Mar 2026 15:02:24 -0300
Subject: [PATCH 05/11] chore(dag): delete missing_from_serialized once its
 entries are handled

Delete the local missing_from_serialized set after its entries have
been removed from by_dag and dag_statuses, mirroring the existing
"del dag_statuses" cleanup in dags_needing_dagruns so the stale set
cannot be accidentally reused later in the function.
---
 airflow/models/dag.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index f0dd1e906f0c3..80b48cb5cf25e 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4105,6 +4105,7 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
             for dag_id in missing_from_serialized:
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
+        del missing_from_serialized
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]

From d11bc746158e635e5dce018c5699123e581b1989 Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Fri, 20 Mar 2026 17:25:51 -0300
Subject: [PATCH 06/11] test(models): add tests for DDRQ entries without
 SerializedDagModel

---
 tests/models/test_dag.py | 81 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 81 insertions(+)

diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 5f721b61d2691..9b6d623cc878b 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3064,6 +3064,87 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
         dag_models = query.all()
         assert dag_models == [dag_model]
 
+    def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
+        """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info)."""
+        orphan_dag_id = "ddr_q_no_serialized_dag"
+        session.add(DatasetModel(uri="dataset_for_orphan_ddrq"))
+        session.flush()
+        dataset_id = session.query(DatasetModel.id).filter_by(uri="dataset_for_orphan_ddrq").scalar()
+        session.add(
+            DagModel(
+                dag_id=orphan_dag_id,
+                max_active_tasks=1,
+                has_task_concurrency_limits=False,
+                next_dagrun=timezone.datetime(2038, 1, 1),
+                next_dagrun_create_after=timezone.datetime(2038, 1, 2),
+                is_active=True,
+                has_import_errors=False,
+                is_paused=False,
+            )
+        )
+        session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id))
+        session.flush()
+
+        with caplog.at_level(logging.WARNING, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert orphan_dag_id not in dataset_triggered_dag_info
+        assert "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in caplog.text
+        assert orphan_dag_id in caplog.text
+
+    def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog):
+        """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted."""
+        session.add_all(
+            [
+                DatasetModel(uri="ds_ghost_z"),
+                DatasetModel(uri="ds_ghost_a"),
+            ]
+        )
+        session.flush()
+        ds_z_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_z").scalar()
+        ds_a_id = session.query(DatasetModel.id).filter_by(uri="ds_ghost_a").scalar()
+        far = timezone.datetime(2038, 1, 1)
+        far_after = timezone.datetime(2038, 1, 2)
+        session.add_all(
+            [
+                DagModel(
+                    dag_id="ghost_z",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+                DagModel(
+                    dag_id="ghost_a",
+                    max_active_tasks=1,
+                    has_task_concurrency_limits=False,
+                    next_dagrun=far,
+                    next_dagrun_create_after=far_after,
+                    is_active=True,
+                    has_import_errors=False,
+                    is_paused=False,
+                ),
+                DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"),
+                DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"),
+            ]
+        )
+        session.flush()
+
+        with caplog.at_level(logging.WARNING, logger="airflow.models.dag"):
+            _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session)
+
+        assert "ghost_a" not in dataset_triggered_dag_info
+        assert "ghost_z" not in dataset_triggered_dag_info
+        msg = next(
+            r.message
+            for r in caplog.records
+            if "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in r.message
+        )
+        assert msg.index("ghost_a") < msg.index("ghost_z")
+
     def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session):
         # link dataset_alias hello_alias to dataset hello
         dataset_model = DatasetModel(uri="hello")

From 508da4cea6346af3df916e3ea4ba64a3601b1f5e Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Thu, 26 Mar 2026 16:42:22 -0300
Subject: [PATCH 07/11] chore(scheduler): remove dataset debug logging

Drop [DEBUG DATASETS] instrumentation from SchedulerJobRunner and
DagModel dataset-readiness loop; inline timetable dataset_condition
where it is only used once.
---
 airflow/jobs/scheduler_job_runner.py | 53 ----------------------------
 airflow/models/dag.py                | 12 +------
 2 files changed, 1 insertion(+), 64 deletions(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 1a8aeb3dde72c..2725dd71d9f3b 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1326,13 +1326,6 @@ def _create_dagruns_for_dags(self, guard: CommitProhibitorGuard, session: Sessio
         non_dataset_dags = all_dags_needing_dag_runs.difference(dataset_triggered_dags)
         self._create_dag_runs(non_dataset_dags, session)
         if dataset_triggered_dags:
-            self.log.info(
-                "[DEBUG DATASETS] Dataset-triggered DAGs ready: %s",
-                {
-                    dag_id: (str(first), str(last))
-                    for dag_id, (first, last) in dataset_triggered_dag_info.items()
-                },
-            )
             self._create_dag_runs_dataset_triggered(
                 dataset_triggered_dags, dataset_triggered_dag_info, session
             )
@@ -1487,26 +1480,6 @@ def _create_dag_runs_dataset_triggered(
                 .where(*dataset_event_filters)
             ).all()
 
-            ddrq_records = session.scalars(
-                select(DatasetDagRunQueue).where(
-                    DatasetDagRunQueue.target_dag_id == dag.dag_id
-                )
-            ).all()
-            ddrq_uris = {r.dataset.uri for r in ddrq_records}
-            consumed_uris = {e.dataset.uri for e in dataset_events}
-            missing_uris = ddrq_uris - consumed_uris
-            if missing_uris:
-                self.log.warning(
-                    "[DEBUG DATASETS] DDRQ/event mismatch: dag_id=%s has DDRQ URIs %s with no matching "
-                    "DatasetEvent in range (prev_exec=%s, exec_date=%s]. "
-                    "Consumed URIs: %s. Possible stale DDRQ records.",
-                    dag.dag_id,
-                    sorted(missing_uris),
-                    previous_dag_run.execution_date if previous_dag_run else None,
-                    exec_date,
-                    sorted(consumed_uris),
-                )
-
             data_interval = dag.timetable.data_interval_for_events(exec_date, dataset_events)
             run_id = dag.timetable.generate_run_id(
                 run_type=DagRunType.DATASET_TRIGGERED,
@@ -1529,32 +1502,6 @@ def _create_dag_runs_dataset_triggered(
             )
             Stats.incr("dataset.triggered_dagruns")
             dag_run.consumed_dataset_events.extend(dataset_events)
-            self.log.info(
-                "[DEBUG DATASETS] Dataset-triggered DagRun created: dag_id=%s, exec_date=%s, "
-                "prev_exec=%s, data_interval=(%s, %s), "
-                "events_consumed=%d, event_uris=%s",
-                dag.dag_id,
-                exec_date,
-                previous_dag_run.execution_date if previous_dag_run else None,
-                data_interval.start,
-                data_interval.end,
-                len(dataset_events),
-                sorted({e.dataset.uri for e in dataset_events}),
-            )
-            if dataset_events:
-                event_timestamps = [e.timestamp for e in dataset_events]
-                self.log.debug(
-                    "[DEBUG DATASETS] Consumed event details: dag_id=%s, "
-                    "event_ts_range=(%s, %s), "
-                    "events=[%s]",
-                    dag.dag_id,
-                    min(event_timestamps),
-                    max(event_timestamps),
-                    ", ".join(
-                        f"{e.dataset.uri}|ts={e.timestamp}|src={e.source_dag_id}/{e.source_run_id}"
-                        for e in dataset_events
-                    ),
-                )
             session.execute(
                 delete(DatasetDagRunQueue).where(DatasetDagRunQueue.target_dag_id == dag_run.dag_id)
             )
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 80b48cb5cf25e..fdcab7bdaf64d 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4109,20 +4109,10 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
-            dataset_condition = ser_dag.dag.timetable.dataset_condition
 
-            if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses):
+            if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
-            else:
-                log.info(
-                    "[DEBUG DATASETS] Dataset condition satisfied: dag_id=%s, condition=%s, "
-                    "ddrq_uris=%s, ddrq_count=%d",
-                    dag_id,
-                    dataset_condition,
-                    sorted(statuses.keys()),
-                    len(statuses),
-                )
         del dag_statuses
         dataset_triggered_dag_info = {}
         for dag_id, records in by_dag.items():

From 872c56c8f058607a111bafb2f4ef6754a2683e9a Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Thu, 26 Mar 2026 17:30:02 -0300
Subject: [PATCH 08/11] fix(datasets): demote missing SerializedDagModel DDRQ
 log to debug

Log the DDRQ-without-serialization case at debug and remove the
[DEBUG DATASETS] prefix; drop redundant del of missing_from_serialized.
Tests capture DEBUG, match the new message, and assert
dataset_dag_run_queue rows remain after dags_needing_dagruns.
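
Roughly, the invariant the new assertions pin down, sketched with plain
structures in place of the ORM state (row values invented):

    queue_rows = [("s3://x", "ghost_a")]  # dataset_dag_run_queue, persistent
    candidates = {"ghost_a": list(queue_rows)}  # rebuilt every heartbeat

    candidates.pop("ghost_a")  # this cycle: no SerializedDagModel, skip
    assert queue_rows  # table rows untouched, so the next cycle retries

Only the per-heartbeat candidate map is mutated; deletion of queue rows
still happens later, in _create_dag_runs_dataset_triggered, once a run
is actually created.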
---
 airflow/models/dag.py    |  5 ++---
 tests/models/test_dag.py | 29 ++++++++++++++++++++++-------
 2 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index fdcab7bdaf64d..92d729a34e5dd 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4097,15 +4097,14 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
         ser_dag_ids = {s.dag_id for s in ser_dags}
         missing_from_serialized = set(by_dag.keys()) - ser_dag_ids
         if missing_from_serialized:
-            log.warning(
-                "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel "
+            log.debug(
+                "DAGs in DDRQ but missing SerializedDagModel "
                 "(skipping — condition cannot be evaluated): %s",
                 sorted(missing_from_serialized),
             )
             for dag_id in missing_from_serialized:
                 del by_dag[dag_id]
                 del dag_statuses[dag_id]
-        del missing_from_serialized
         for ser_dag in ser_dags:
             dag_id = ser_dag.dag_id
             statuses = dag_statuses[dag_id]
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9b6d623cc878b..acb18cd08a577 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3065,7 +3065,12 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
         assert dag_models == [dag_model]
 
     def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, session, caplog):
-        """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info)."""
+        """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info).
+
+        Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later
+        heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from
+        the in-memory candidate set, it does not delete ORM rows).
+ """ orphan_dag_id = "ddr_q_no_serialized_dag" session.add(DatasetModel(uri="dataset_for_orphan_ddrq")) session.flush() @@ -3085,12 +3090,18 @@ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, sessi session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id)) session.flush() - with caplog.at_level(logging.WARNING, logger="airflow.models.dag"): + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) assert orphan_dag_id not in dataset_triggered_dag_info - assert "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in caplog.text + assert "DAGs in DDRQ but missing SerializedDagModel" in caplog.text assert orphan_dag_id in caplog.text + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id == orphan_dag_id) + .count() + == 1 + ) def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(self, session, caplog): """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted.""" @@ -3133,17 +3144,21 @@ def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(se ) session.flush() - with caplog.at_level(logging.WARNING, logger="airflow.models.dag"): + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): _query, dataset_triggered_dag_info = DagModel.dags_needing_dagruns(session) assert "ghost_a" not in dataset_triggered_dag_info assert "ghost_z" not in dataset_triggered_dag_info msg = next( - r.message - for r in caplog.records - if "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in r.message + r.message for r in caplog.records if "DAGs in DDRQ but missing SerializedDagModel" in r.message ) assert msg.index("ghost_a") < msg.index("ghost_z") + assert ( + session.query(DatasetDagRunQueue) + .filter(DatasetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z"))) + .count() + == 2 + ) def test_dags_needing_dagruns_dataset_aliases(self, dag_maker, session): # link dataset_alias hello_alias to dataset hello From 5065888d433dde1448cea3dbb50936182698f490 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Thu, 26 Mar 2026 17:40:13 -0300 Subject: [PATCH 09/11] chore(datasets): debug-log when dataset condition passes in dags_needing_dagruns --- airflow/models/dag.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 92d729a34e5dd..310c791a45c7a 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -4108,10 +4108,19 @@ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None: for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] + dataset_condition = ser_dag.dag.timetable.dataset_condition - if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses): + if not dag_ready(dag_id, cond=dataset_condition, statuses=statuses): del by_dag[dag_id] del dag_statuses[dag_id] + else: + log.debug( + "Dataset condition satisfied: dag_id=%s, condition=%s, ddrq_uris=%s, ddrq_count=%d", + dag_id, + dataset_condition, + sorted(statuses.keys()), + len(statuses), + ) del dag_statuses dataset_triggered_dag_info = {} for dag_id, records in by_dag.items(): From 6156c5779a0dbd3d944312a9d9af1e8a97da9124 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Thu, 26 Mar 2026 18:12:14 -0300 Subject: [PATCH 10/11] docs: add bugfix newsfragment and dags_needing_dagruns dataset docstring --- airflow/models/dag.py | 
 newsfragments/63546.bugfix.rst | 1 +
 2 files changed, 6 insertions(+)
 create mode 100644 newsfragments/63546.bugfix.rst

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 310c791a45c7a..35ea26cb54be8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -4069,6 +4069,11 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[
         This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE"
         query, you should ensure that any scheduling decisions are made in a single transaction -- as
         soon as the transaction is committed it will be unlocked.
+
+        For dataset-triggered scheduling, DAGs that have ``DatasetDagRunQueue`` rows but no matching
+        ``SerializedDagModel`` row are omitted from the returned ``dataset_triggered_dag_info`` until
+        serialization exists; queue rows are **not** deleted here so the scheduler can re-evaluate on a
+        later run.
         """
         from airflow.models.serialized_dag import SerializedDagModel

diff --git a/newsfragments/63546.bugfix.rst b/newsfragments/63546.bugfix.rst
new file mode 100644
index 0000000000000..666d5db207a58
--- /dev/null
+++ b/newsfragments/63546.bugfix.rst
@@ -0,0 +1 @@
+Fix premature dataset-triggered DagRuns when ``SerializedDagModel`` was missing while ``DatasetDagRunQueue`` still had rows for that DAG; queue entries are kept for the next evaluation.

From e4d014e9025d05ab04ca5f2db12412b0e95324d6 Mon Sep 17 00:00:00 2001
From: Leonardo Soares
Date: Fri, 27 Mar 2026 19:16:13 -0300
Subject: [PATCH 11/11] test(dag): persist DagModel before DatasetDagRunQueue
 in unit tests

Split DagModel and DatasetDagRunQueue inserts and flush after DagModel
so foreign-key order matches production DB constraints in TestDagModel.
---
 tests/models/test_dag.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index acb18cd08a577..87212b13943d2 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3087,6 +3087,7 @@ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, sessi
                 is_paused=False,
             )
         )
+        session.flush()
         session.add(DatasetDagRunQueue(dataset_id=dataset_id, target_dag_id=orphan_dag_id))
         session.flush()
 
@@ -3138,6 +3139,12 @@ def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(se
                     has_import_errors=False,
                     is_paused=False,
                 ),
+            ]
+        )
+        session.flush()
+
+        session.add_all(
+            [
                 DatasetDagRunQueue(dataset_id=ds_z_id, target_dag_id="ghost_z"),
                 DatasetDagRunQueue(dataset_id=ds_a_id, target_dag_id="ghost_a"),
             ]