Skip to content
1 change: 1 addition & 0 deletions airflow-core/newsfragments/63692.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed DAG-level on_failure_callback not firing when DagRunContext creation failed due to ORM session errors.
16 changes: 8 additions & 8 deletions airflow-core/src/airflow/callbacks/callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
from sqlalchemy import inspect as sa_inspect
from sqlalchemy.exc import NoInspectionAvailable
from sqlalchemy.orm.attributes import set_committed_value
from sqlalchemy.orm.exc import DetachedInstanceError

from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel # noqa: TC001
from airflow.utils.state import TaskInstanceState
Expand Down Expand Up @@ -116,19 +115,20 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s
except NoInspectionAvailable:
return values

# Relationship access may raise DetachedInstanceError; on that path, reload DagRun
# from the DB to avoid crashing the scheduler.
# Relationship access may raise DetachedInstanceError or other SQLAlchemy
# exceptions (e.g. InvalidRequestError when the session is closed); on that
# path, reload the DagRun from the DB to avoid crashing the scheduler.
try:
events = dag_run.consumed_asset_events
set_committed_value(
dag_run,
"consumed_asset_events",
list(events) if events is not None else [],
)
except DetachedInstanceError:
except Exception:
log.warning(
"DagRunContext encountered DetachedInstanceError while accessing "
"consumed_asset_events; reloading DagRun from DB."
"DagRunContext failed to access consumed_asset_events; reloading DagRun from DB.",
exc_info=True,
)
from sqlalchemy import select
from sqlalchemy.orm import selectinload
Expand All @@ -137,8 +137,8 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s
from airflow.models.dagrun import DagRun
from airflow.utils.session import create_session

# Defensive guardrail: reload DagRun with eager-loaded relationships on
# DetachedInstanceError to recover state without adding DB I/O to the hot path.
# Reload the DagRun with its relationships eagerly loaded to recover state.
# This reload only runs on the rare failure path, so the hot path incurs no extra DB I/O.
with create_session() as session:
dag_run_reloaded = session.scalar(
select(DagRun)
Expand Down
9 changes: 9 additions & 0 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,15 @@ def _fetch_callbacks(
]
for callback in callbacks:
req = callback.get_callback_request()
if req.bundle_name not in bundle_names:
self.log.warning(
"Callback for dag_id=%s has bundle_name=%r which is not served by "
"this DAG processor (serving bundles: %s). Skipping.",
getattr(req, "dag_id", "unknown"),
req.bundle_name,
bundle_names,
)
continue
try:
callback_queue.append(req)
session.delete(callback)
Expand Down
6 changes: 6 additions & 0 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2439,6 +2439,12 @@ def _send_dag_callbacks_to_processor(
callback: DagCallbackRequest | None = None,
) -> None:
if callback:
self.log.info(
"Sending %s callback request for dag_id=%s, run_id=%s to DAG Processor",
"failure" if callback.is_failure_callback else "success",
callback.dag_id,
callback.run_id,
)
self.executor.send_callback(callback)
else:
self.log.debug("callback is empty")
Expand Down
18 changes: 14 additions & 4 deletions airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1331,16 +1331,26 @@ def produce_dag_callback(
) -> DagCallbackRequest | None:
"""Create a callback request for the DAG, or execute the callbacks directly if instructed, and return None."""
if not execute:
try:
context_from_server = DagRunContext(
dag_run=self,
last_ti=relevant_ti,
)
except Exception:
self.log.exception(
"Failed to build DagRunContext for dag_id=%s run_id=%s; "
"sending callback with minimal context",
self.dag_id,
self.run_id,
)
context_from_server = None
return DagCallbackRequest(
filepath=self.dag_model.relative_fileloc,
dag_id=self.dag_id,
run_id=self.run_id,
bundle_name=self.dag_model.bundle_name,
bundle_version=self.bundle_version,
context_from_server=DagRunContext(
dag_run=self,
last_ti=relevant_ti,
),
context_from_server=context_from_server,
is_failure_callback=(not success),
msg=reason,
)
Expand Down
7 changes: 3 additions & 4 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,8 +1202,8 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te
assert len(session.scalars(select(DbCallbackRequest)).all()) == 1

@conf_vars({("core", "load_examples"): "False"})
def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle):
"""Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted."""
def test_fetch_callbacks_skips_other_bundles_with_warning(self, configure_testing_dag_bundle):
"""Callbacks for bundles not served by this processor are skipped with a warning log."""

dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"

Expand Down Expand Up @@ -1239,10 +1239,9 @@ def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundl
# Only the matching callback should be returned
assert [c.run_id for c in callbacks] == ["match"]

# The non-matching callback should remain in the DB
# The non-matching callback should remain in the DB so the processor serving its bundle can pick it up
remaining = session.scalars(select(DbCallbackRequest)).all()
assert len(remaining) == 1
# Decode remaining request and verify it's for the other bundle
remaining_req = remaining[0].get_callback_request()
assert remaining_req.bundle_name == "other-bundle"

Expand Down
40 changes: 40 additions & 0 deletions airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,46 @@ def on_failure_callable(context):
),
)

def test_produce_dag_callback_resilient_to_context_failure(self, testing_dag_bundle, dag_maker, session):
"""produce_dag_callback should still return a callback even when DagRunContext creation fails."""

def on_failure_callable(context):
pass

relative_fileloc = "test_produce_dag_callback_resilient.py"
with dag_maker(
dag_id="test_produce_dag_callback_resilient",
schedule=datetime.timedelta(days=1),
start_date=datetime.datetime(2017, 1, 1),
on_failure_callback=on_failure_callable,
) as dag:
EmptyOperator(task_id="task1")
dm = DagModel.get_dagmodel(dag.dag_id, session=session)
dm.relative_fileloc = relative_fileloc
session.merge(dm)
session.commit()

initial_task_states = {"task1": TaskInstanceState.FAILED}
dag.relative_fileloc = relative_fileloc
SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker")
session.commit()

dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session)
dag_run.dag_model = dm

# Patch DagRunContext to raise an exception during construction
with mock.patch(
"airflow.models.dagrun.DagRunContext", side_effect=RuntimeError("Simulated context failure")
):
_, callback = dag_run.update_state(execute_callbacks=False)

assert dag_run.state == DagRunState.FAILED
# Callback should still be produced with context_from_server=None
assert callback is not None
assert callback.dag_id == "test_produce_dag_callback_resilient"
assert callback.is_failure_callback is True
assert callback.context_from_server is None

def test_dagrun_set_state_end_date(self, dag_maker, session):
with dag_maker(schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE):
pass
Expand Down
Loading