
Commit ba501b0

fix: skip scheduling instead of bulk-failing when serialized DAG is transiently missing
When the scheduler cannot find a DAG in the serialized_dag table while checking task concurrency limits, it previously set all SCHEDULED task instances for that DAG to FAILED via a bulk UPDATE. This caused intermittent mass task failures on multi-scheduler setups (e.g. MWAA), where the serialized DAG may be transiently absent during a DAG file parse cycle.

Instead, treat the missing serialized DAG as a transient condition: log a warning, add the dag_id to starved_dags so subsequent task instances for the same DAG are also skipped in this iteration, and let the scheduler retry on the next iteration.

The existing test 'test_queued_task_instances_fails_with_missing_dag' has been renamed to 'test_queued_task_instances_skips_with_missing_dag' and updated to reflect the new expected behavior (tasks remain SCHEDULED). A new regression test 'test_missing_serialized_dag_does_not_bulk_fail_tasks' explicitly verifies that no bulk failure occurs.

Closes #62050
1 parent 0f68191 commit ba501b0

2 files changed

Lines changed: 59 additions & 12 deletions


airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 7 additions & 9 deletions
@@ -769,19 +769,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                 serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
                     dag_run=task_instance.dag_run, session=session
                 )
-                # If the dag is missing, fail the task and continue to the next task.
+                # If the dag is transiently missing, skip scheduling it this iteration
+                # and try again next time instead of bulk-failing all scheduled tasks.
+                # See: https://github.com/apache/airflow/issues/62050
                 if not serialized_dag:
-                    self.log.error(
-                        "DAG '%s' for task instance %s not found in serialized_dag table",
+                    self.log.warning(
+                        "DAG '%s' for task instance %s not found in serialized_dag table, "
+                        "skipping scheduling for this iteration and will retry next time",
                         dag_id,
                         task_instance,
                     )
-                    session.execute(
-                        update(TI)
-                        .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
-                        .values(state=TaskInstanceState.FAILED)
-                        .execution_options(synchronize_session="fetch")
-                    )
+                    starved_dags.add(dag_id)
                     continue

                 task_concurrency_limit: int | None = None
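The fix leans on the scheduler's per-iteration starved_dags set: once a dag_id is added, later task instances for the same DAG in that iteration are skipped rather than failed, and because their SCHEDULED state is left untouched, the next scheduler loop simply retries them. Below is a minimal, self-contained Python sketch of that skip-and-retry pattern; the function, FakeTI dataclass, and data shapes are simplified illustrations, not the actual _executable_task_instances_to_queued implementation.

# Illustrative sketch only -- simplified stand-ins for Airflow's real types and scheduling loop.
from dataclasses import dataclass


@dataclass
class FakeTI:
    dag_id: str
    task_id: str
    state: str = "scheduled"


def select_queueable(task_instances: list[FakeTI], serialized_dags: set[str]) -> list[FakeTI]:
    """Queue TIs whose DAG is present; skip (do not fail) TIs whose DAG is transiently missing."""
    starved_dags: set[str] = set()
    queueable: list[FakeTI] = []
    for ti in task_instances:
        if ti.dag_id in starved_dags:
            continue  # an earlier TI already found this DAG missing in this iteration
        if ti.dag_id not in serialized_dags:
            print(f"DAG {ti.dag_id!r} not found in serialized_dag table, retrying next iteration")
            starved_dags.add(ti.dag_id)
            continue  # the TI keeps its SCHEDULED state for the next scheduler loop
        queueable.append(ti)
    return queueable


tis = [FakeTI("missing_dag", "a"), FakeTI("missing_dag", "b"), FakeTI("ok_dag", "c")]
assert [ti.task_id for ti in select_queueable(tis, serialized_dags={"ok_dag"})] == ["c"]
assert all(ti.state == "scheduled" for ti in tis)  # nothing was bulk-failed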

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 52 additions & 3 deletions
@@ -1806,8 +1806,14 @@ def test_find_executable_task_instances_in_default_pool(self, dag_maker, mock_ex
         session.rollback()
         session.close()

-    def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
-        """Check that task instances of missing DAGs are failed"""
+    def test_queued_task_instances_skips_with_missing_dag(self, dag_maker, session):
+        """Check that task instances of transiently missing DAGs are skipped, not bulk-failed.
+
+        When the serialized DAG is transiently missing (e.g. during a DAG file parse cycle),
+        the scheduler should skip scheduling for that DAG in the current iteration and retry
+        next time, rather than bulk-failing all SCHEDULED task instances.
+        See: https://github.com/apache/airflow/issues/62050
+        """
         dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag"
         task_id_1 = "dummy"
         task_id_2 = "dummydummy"
@@ -1831,10 +1837,53 @@ def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
         session.flush()
         res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
         session.flush()
+        # No tasks should be queued
         assert len(res) == 0
+        # Tasks should remain SCHEDULED (not be bulk-failed)
         tis = dr.get_task_instances(session=session)
         assert len(tis) == 2
-        assert all(ti.state == State.FAILED for ti in tis)
+        assert all(ti.state == State.SCHEDULED for ti in tis)
+
+    def test_missing_serialized_dag_does_not_bulk_fail_tasks(self, dag_maker, session):
+        """Regression test for https://github.com/apache/airflow/issues/62050
+
+        When the serialized DAG is transiently missing, all SCHEDULED task instances for the DAG
+        should remain SCHEDULED so the scheduler can pick them up in the next iteration.
+        Previously, the scheduler would bulk-fail all SCHEDULED tasks when it couldn't find the
+        serialized DAG in the DagBag.
+        """
+        dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails"
+
+        with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 2}):
+            EmptyOperator(task_id="task_a")
+            EmptyOperator(task_id="task_b")
+
+        scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        # Simulate serialized DAG being transiently missing
+        self.job_runner.scheduler_dag_bag = mock.MagicMock()
+        self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None
+
+        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+        for ti in dr.task_instances:
+            ti.state = State.SCHEDULED
+            session.merge(ti)
+        session.flush()
+
+        res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
+        session.flush()
+
+        # No tasks should be queued since serialized DAG is missing
+        assert len(res) == 0
+        # All tasks should remain SCHEDULED — not FAILED
+        tis = dr.get_task_instances(session=session)
+        assert len(tis) == 2
+        for ti in tis:
+            assert ti.state == State.SCHEDULED, (
+                f"Task {ti.task_id} was {ti.state} but expected SCHEDULED. "
+                "The scheduler should not bulk-fail tasks when serialized DAG is transiently missing."
+            )

     def test_nonexistent_pool(self, dag_maker):
         dag_id = "SchedulerJobTest.test_nonexistent_pool"
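The regression test isolates the transient-miss condition by replacing the runner's scheduler_dag_bag with a MagicMock whose get_dag_for_run returns None, so no DAG file parsing or serialization is involved. Here is a tiny standalone illustration of that mocking pattern using only unittest.mock; the variable name is a stand-in, not an Airflow fixture.

# Standalone illustration of the mocking pattern used in the test above (unittest.mock only).
from unittest import mock

scheduler_dag_bag = mock.MagicMock()
scheduler_dag_bag.get_dag_for_run.return_value = None  # simulate a transiently missing serialized DAG

# Any caller now sees "DAG not found", regardless of the arguments it passes.
assert scheduler_dag_bag.get_dag_for_run(dag_run=object(), session=object()) is None
scheduler_dag_bag.get_dag_for_run.assert_called_once()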
