Skip to content

Commit dcaf372

Browse files
committed
Fix backfill marked complete before DagRuns are created (#61375)
The scheduler's _mark_backfills_complete() could mark a backfill as completed during the window between the Backfill row commit and DagRun creation. Add an EXISTS guard on BackfillDagRun so backfills still being initialized are skipped.
1 parent 04b921b commit dcaf372

2 files changed

Lines changed: 60 additions & 2 deletions

File tree

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
TaskInletAssetReference,
8585
TaskOutletAssetReference,
8686
)
87-
from airflow.models.backfill import Backfill
87+
from airflow.models.backfill import Backfill, BackfillDagRun
8888
from airflow.models.callback import Callback
8989
from airflow.models.dag import DagModel
9090
from airflow.models.dag_version import DagVersion
@@ -1895,6 +1895,9 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
18951895
# todo: AIP-78 simplify this function to an update statement
18961896
query = select(Backfill).where(
18971897
Backfill.completed_at.is_(None),
1898+
# Guard: backfill must have at least one BackfillDagRun association,
1899+
# otherwise it is still being set up (see #61375).
1900+
exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
18981901
~exists(
18991902
select(DagRun.id).where(
19001903
and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
@@ -1903,9 +1906,13 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
19031906
)
19041907
backfills = list(session.scalars(query))
19051908
if not backfills:
1909+
self.log.debug("No backfills to mark as complete.")
19061910
return
19071911
self.log.info("marking %s backfills as complete", len(backfills))
19081912
for b in backfills:
1913+
self.log.info(
1914+
"Marking backfill %s (dag_id=%s) as complete at %s", b.id, b.dag_id, now
1915+
)
19091916
b.completed_at = now
19101917

19111918
@add_debug_span

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
AssetPartitionDagRun,
6565
PartitionedAssetKeyLog,
6666
)
67-
from airflow.models.backfill import Backfill, _create_backfill
67+
from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
6868
from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
6969
from airflow.models.dag_version import DagVersion
7070
from airflow.models.dagbundle import DagBundleModel
@@ -8688,6 +8688,57 @@ def test_mark_backfills_completed(dag_maker, session):
86888688
assert b.completed_at.timestamp() > 0
86898689

86908690

8691+
def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session):
    """
    Check that a backfill without BackfillDagRun rows is not marked complete.

    Phase 1: only the Backfill row exists (no DagRun, no BackfillDagRun), so
    ``_mark_backfills_complete`` must leave ``completed_at`` unset.
    Phase 2: once a finished DagRun and its BackfillDagRun association are
    committed, the same call must set ``completed_at``.
    """
    clear_db_backfills()
    dag_id = "test_backfill_race_lifecycle"
    first_day = pendulum.parse("2021-01-01")
    second_day = pendulum.parse("2021-01-02")
    last_day = pendulum.parse("2021-01-03")

    with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
        BashOperator(task_id="hi", bash_command="echo hi")

    # Phase 1: persist the Backfill row alone, with no runs attached to it yet.
    pending_backfill = Backfill(
        dag_id=dag_id,
        from_date=first_day,
        to_date=last_day,
        max_active_runs=10,
        dag_run_conf={},
        reprocess_behavior=ReprocessBehavior.NONE,
    )
    session.add(pending_backfill)
    session.commit()
    backfill_id = pending_backfill.id
    # Detach everything so the later session.get() reloads state from the database.
    session.expunge_all()

    scheduler_job = Job(job_type=SchedulerJobRunner.job_type)
    scheduler = SchedulerJobRunner(job=scheduler_job, executors=[MockExecutor(do_update=False)])

    scheduler._mark_backfills_complete()
    reloaded = session.get(Backfill, backfill_id)
    assert reloaded.completed_at is None
    session.expunge_all()

    # Phase 2: attach one already-successful run plus its BackfillDagRun link.
    finished_run = DagRun(
        dag_id=dag_id,
        run_id="backfill__2021-01-01T00:00:00+00:00",
        run_type=DagRunType.BACKFILL_JOB,
        logical_date=first_day,
        data_interval=(first_day, second_day),
        run_after=second_day,
        state=DagRunState.SUCCESS,
        backfill_id=backfill_id,
    )
    session.add(finished_run)
    # Flush so finished_run.id is populated before it is referenced below.
    session.flush()
    association = BackfillDagRun(
        backfill_id=backfill_id,
        dag_run_id=finished_run.id,
        logical_date=first_day,
        sort_ordinal=1,
    )
    session.add(association)
    session.commit()
    session.expunge_all()

    scheduler._mark_backfills_complete()
    reloaded = session.get(Backfill, backfill_id)
    assert reloaded.completed_at is not None
8740+
8741+
86918742
class Key1Mapper(CorePartitionMapper):
86928743
"""Partition Mapper that returns only key-1 as downstream key"""
86938744

0 commit comments

Comments
 (0)