@@ -3065,7 +3065,12 @@ def test_dags_needing_dagruns_datasets(self, dag_maker, session):
30653065 assert dag_models == [dag_model ]
30663066
30673067 def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing (self , session , caplog ):
3068- """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info)."""
3068+ """DDRQ rows for a dag_id without SerializedDagModel must be skipped (no dataset_triggered info).
3069+
3070+ Rows must remain in ``dataset_dag_run_queue`` so the scheduler can re-evaluate on a later
3071+ heartbeat once ``SerializedDagModel`` exists (``dags_needing_dagruns`` only drops them from
3072+ the in-memory candidate set, it does not delete ORM rows).
3073+ """
30693074 orphan_dag_id = "ddr_q_no_serialized_dag"
30703075 session .add (DatasetModel (uri = "dataset_for_orphan_ddrq" ))
30713076 session .flush ()
@@ -3085,12 +3090,18 @@ def test_dags_needing_dagruns_skips_ddrq_when_serialized_dag_missing(self, sessi
30853090 session .add (DatasetDagRunQueue (dataset_id = dataset_id , target_dag_id = orphan_dag_id ))
30863091 session .flush ()
30873092
3088- with caplog .at_level (logging .WARNING , logger = "airflow.models.dag" ):
3093+ with caplog .at_level (logging .DEBUG , logger = "airflow.models.dag" ):
30893094 _query , dataset_triggered_dag_info = DagModel .dags_needing_dagruns (session )
30903095
30913096 assert orphan_dag_id not in dataset_triggered_dag_info
3092- assert "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in caplog .text
3097+ assert "DAGs in DDRQ but missing SerializedDagModel" in caplog .text
30933098 assert orphan_dag_id in caplog .text
3099+ assert (
3100+ session .query (DatasetDagRunQueue )
3101+ .filter (DatasetDagRunQueue .target_dag_id == orphan_dag_id )
3102+ .count ()
3103+ == 1
3104+ )
30943105
30953106 def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids (self , session , caplog ):
30963107 """When multiple dags lack SerializedDagModel, the warning lists dag_ids sorted."""
@@ -3133,17 +3144,21 @@ def test_dags_needing_dagruns_missing_serialized_warning_lists_sorted_dag_ids(se
31333144 )
31343145 session .flush ()
31353146
3136- with caplog .at_level (logging .WARNING , logger = "airflow.models.dag" ):
3147+ with caplog .at_level (logging .DEBUG , logger = "airflow.models.dag" ):
31373148 _query , dataset_triggered_dag_info = DagModel .dags_needing_dagruns (session )
31383149
31393150 assert "ghost_a" not in dataset_triggered_dag_info
31403151 assert "ghost_z" not in dataset_triggered_dag_info
31413152 msg = next (
3142- r .message
3143- for r in caplog .records
3144- if "[DEBUG DATASETS] DAGs in DDRQ but missing SerializedDagModel" in r .message
3153+ r .message for r in caplog .records if "DAGs in DDRQ but missing SerializedDagModel" in r .message
31453154 )
31463155 assert msg .index ("ghost_a" ) < msg .index ("ghost_z" )
3156+ assert (
3157+ session .query (DatasetDagRunQueue )
3158+ .filter (DatasetDagRunQueue .target_dag_id .in_ (("ghost_a" , "ghost_z" )))
3159+ .count ()
3160+ == 2
3161+ )
31473162
31483163 def test_dags_needing_dagruns_dataset_aliases (self , dag_maker , session ):
31493164 # link dataset_alias hello_alias to dataset hello
0 commit comments