From 6dc525830ca30513de4cbd00df727b9da659e6e5 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 27 Mar 2026 10:25:51 -0300 Subject: [PATCH 1/3] fix(scheduler): skip asset-triggered dags without SerializedDagModel in dags_needing_dagruns Remove those dag_ids from the in-memory candidate set until serialization exists; retain AssetDagRunQueue rows and emit DEBUG logs. Add unit tests and a bugfix newsfragment. --- airflow-core/src/airflow/models/dag.py | 14 +++ airflow-core/tests/unit/models/test_dag.py | 116 ++++++++++++++++++++- 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index c98f15af9104d..05cda28e50e42 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -631,6 +631,10 @@ def dags_needing_dagruns(cls, session: Session) -> tuple[Any, dict[str, datetime you should ensure that any scheduling decisions are made in a single transaction -- as soon as the transaction is committed it will be unlocked. + For asset-triggered scheduling, DAGs that have ``AssetDagRunQueue`` rows but no matching + ``SerializedDagModel`` row are omitted from ``triggered_date_by_dag`` until serialization exists; + queue rows are **not** deleted here so the scheduler can re-evaluate on a later run. + :meta private: """ from airflow.models.serialized_dag import SerializedDagModel @@ -677,6 +681,16 @@ def dag_ready(dag_id: str, cond: SerializedAssetBase, statuses: dict[UKey, bool] for dag_id, adrqs in adrq_by_dag.items() } ser_dags = SerializedDagModel.get_latest_serialized_dags(dag_ids=list(dag_statuses), session=session) + ser_dag_ids = {ser_dag.dag_id for ser_dag in ser_dags} + missing_from_serialized = set(adrq_by_dag.keys()) - ser_dag_ids + if missing_from_serialized: + log.debug( + "DAGs in ADRQ but missing SerializedDagModel (skipping — condition cannot be evaluated): %s", + sorted(missing_from_serialized), + ) + for dag_id in missing_from_serialized: + del adrq_by_dag[dag_id] + del dag_statuses[dag_id] for ser_dag in ser_dags: dag_id = ser_dag.dag_id statuses = dag_statuses[dag_id] diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 046c85ea79901..da0500ed8deca 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -33,7 +33,7 @@ import pendulum import pytest import time_machine -from sqlalchemy import delete, inspect, select, update +from sqlalchemy import delete, func, inspect, select, update from airflow import settings from airflow._shared.module_loading import qualname @@ -2047,6 +2047,120 @@ def test_dags_needing_dagruns_assets(self, dag_maker, session): dag_models = query.all() assert dag_models == [dag_model] + def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing( + self, session, caplog, testing_dag_bundle + ): + """ADRQ rows for a dag_id without SerializedDagModel must be skipped (no triggered_date_by_dag). + + Rows must remain in ``asset_dag_run_queue`` so the scheduler can re-evaluate on a later run once + ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from the in-memory + candidate set, it does not delete ORM rows). + """ + orphan_dag_id = "adrq_no_serialized_dag" + orphan_uri = "test://asset_for_orphan_adrq" + session.add(AssetModel(uri=orphan_uri)) + session.flush() + asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == orphan_uri)) + session.add( + DagModel( + dag_id=orphan_dag_id, + bundle_name="testing", + max_active_tasks=1, + has_task_concurrency_limits=False, + max_consecutive_failed_dag_runs=0, + next_dagrun=timezone.datetime(2038, 1, 1), + next_dagrun_create_after=timezone.datetime(2038, 1, 2), + is_stale=False, + has_import_errors=False, + is_paused=False, + asset_expression={"any": [{"uri": orphan_uri}]}, + ) + ) + session.add(AssetDagRunQueue(asset_id=asset_id, target_dag_id=orphan_dag_id)) + session.flush() + + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): + _query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session) + + assert orphan_dag_id not in triggered_date_by_dag + assert "DAGs in ADRQ but missing SerializedDagModel" in caplog.text + assert orphan_dag_id in caplog.text + assert ( + session.scalar( + select(func.count()) + .select_from(AssetDagRunQueue) + .where(AssetDagRunQueue.target_dag_id == orphan_dag_id) + ) + == 1 + ) + + def test_dags_needing_dagruns_missing_serialized_debug_lists_sorted_dag_ids( + self, session, caplog, testing_dag_bundle + ): + """When multiple dags lack SerializedDagModel, the debug log lists dag_ids sorted.""" + session.add_all( + [ + AssetModel(uri="test://ds_ghost_z"), + AssetModel(uri="test://ds_ghost_a"), + ] + ) + session.flush() + id_z = session.scalar(select(AssetModel.id).where(AssetModel.uri == "test://ds_ghost_z")) + id_a = session.scalar(select(AssetModel.id).where(AssetModel.uri == "test://ds_ghost_a")) + far = timezone.datetime(2038, 1, 1) + far_after = timezone.datetime(2038, 1, 2) + session.add_all( + [ + DagModel( + dag_id="ghost_z", + bundle_name="testing", + max_active_tasks=1, + has_task_concurrency_limits=False, + max_consecutive_failed_dag_runs=0, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_stale=False, + has_import_errors=False, + is_paused=False, + asset_expression={"any": [{"uri": "test://ds_ghost_z"}]}, + ), + DagModel( + dag_id="ghost_a", + bundle_name="testing", + max_active_tasks=1, + has_task_concurrency_limits=False, + max_consecutive_failed_dag_runs=0, + next_dagrun=far, + next_dagrun_create_after=far_after, + is_stale=False, + has_import_errors=False, + is_paused=False, + asset_expression={"any": [{"uri": "test://ds_ghost_a"}]}, + ), + AssetDagRunQueue(asset_id=id_z, target_dag_id="ghost_z"), + AssetDagRunQueue(asset_id=id_a, target_dag_id="ghost_a"), + ] + ) + session.flush() + + with caplog.at_level(logging.DEBUG, logger="airflow.models.dag"): + _query, triggered_date_by_dag = DagModel.dags_needing_dagruns(session) + + assert "ghost_a" not in triggered_date_by_dag + assert "ghost_z" not in triggered_date_by_dag + msg = next( + r.message for r in caplog.records if "DAGs in ADRQ but missing SerializedDagModel" in r.message + ) + assert msg.index("ghost_a") < msg.index("ghost_z") + assert ( + session.scalar( + select(func.count()) + .select_from(AssetDagRunQueue) + .where(AssetDagRunQueue.target_dag_id.in_(("ghost_a", "ghost_z"))) + ) + == 2 + ) + def test_dags_needing_dagruns_query_count(self, dag_maker, session): """Test that dags_needing_dagruns avoids N+1 on adrq.asset access.""" num_assets = 10 From a4fe14813d801a7e7cbca0b10af7e4992d7b92f8 Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 27 Mar 2026 15:27:25 -0300 Subject: [PATCH 2/3] fix(scheduler): prevent premature asset-triggered DagRuns when SerializedDagModel is unavailable --- airflow-core/newsfragments/64322.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 airflow-core/newsfragments/64322.bugfix.rst diff --git a/airflow-core/newsfragments/64322.bugfix.rst b/airflow-core/newsfragments/64322.bugfix.rst new file mode 100644 index 0000000000000..c59a9fceed449 --- /dev/null +++ b/airflow-core/newsfragments/64322.bugfix.rst @@ -0,0 +1 @@ +Fix premature asset-triggered DagRuns when ``AssetDagRunQueue`` had rows but ``SerializedDagModel`` was not yet available; the scheduler now skips those DAGs until serialization exists. From 5f4bd8a39c8e0eb1b6fc46d29779d3b939b90f6c Mon Sep 17 00:00:00 2001 From: Leonardo Soares Date: Fri, 27 Mar 2026 19:08:04 -0300 Subject: [PATCH 3/3] test(dag): persist DagModel before AssetDagRunQueue in unit tests Split DagModel and AssetDagRunQueue inserts and flush after DagModel so foreign-key order matches production DB constraints in TestDagModel. --- airflow-core/tests/unit/models/test_dag.py | 36 +++++++++++++--------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index da0500ed8deca..528749457f971 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -2061,21 +2061,23 @@ def test_dags_needing_dagruns_skips_adrq_when_serialized_dag_missing( session.add(AssetModel(uri=orphan_uri)) session.flush() asset_id = session.scalar(select(AssetModel.id).where(AssetModel.uri == orphan_uri)) - session.add( - DagModel( - dag_id=orphan_dag_id, - bundle_name="testing", - max_active_tasks=1, - has_task_concurrency_limits=False, - max_consecutive_failed_dag_runs=0, - next_dagrun=timezone.datetime(2038, 1, 1), - next_dagrun_create_after=timezone.datetime(2038, 1, 2), - is_stale=False, - has_import_errors=False, - is_paused=False, - asset_expression={"any": [{"uri": orphan_uri}]}, - ) + + dag_model = DagModel( + dag_id=orphan_dag_id, + bundle_name="testing", + max_active_tasks=1, + has_task_concurrency_limits=False, + max_consecutive_failed_dag_runs=0, + next_dagrun=timezone.datetime(2038, 1, 1), + next_dagrun_create_after=timezone.datetime(2038, 1, 2), + is_stale=False, + has_import_errors=False, + is_paused=False, + asset_expression={"any": [{"uri": orphan_uri}]}, ) + session.add(dag_model) + session.flush() + session.add(AssetDagRunQueue(asset_id=asset_id, target_dag_id=orphan_dag_id)) session.flush() @@ -2137,6 +2139,12 @@ def test_dags_needing_dagruns_missing_serialized_debug_lists_sorted_dag_ids( is_paused=False, asset_expression={"any": [{"uri": "test://ds_ghost_a"}]}, ), + ] + ) + session.flush() + + session.add_all( + [ AssetDagRunQueue(asset_id=id_z, target_dag_id="ghost_z"), AssetDagRunQueue(asset_id=id_a, target_dag_id="ghost_a"), ]