From 9f7626ff008b13e723236d6d55c083d686fcdaf0 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:25:33 -0400 Subject: [PATCH 1/7] Broadened exception handling in DagRunContext validator Previously only DetachedInstanceError was caught when accessing consumed_asset_events on ORM DagRun objects. Other SQLAlchemy exceptions (e.g. InvalidRequestError) crashed the scheduler. closes: #63374 --- .../src/airflow/callbacks/callback_requests.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py b/airflow-core/src/airflow/callbacks/callback_requests.py index ce48438f0e70a..25cb639534901 100644 --- a/airflow-core/src/airflow/callbacks/callback_requests.py +++ b/airflow-core/src/airflow/callbacks/callback_requests.py @@ -24,7 +24,6 @@ from sqlalchemy import inspect as sa_inspect from sqlalchemy.exc import NoInspectionAvailable from sqlalchemy.orm.attributes import set_committed_value -from sqlalchemy.orm.exc import DetachedInstanceError from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel # noqa: TC001 from airflow.utils.state import TaskInstanceState @@ -116,8 +115,9 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s except NoInspectionAvailable: return values - # Relationship access may raise DetachedInstanceError; on that path, reload DagRun - # from the DB to avoid crashing the scheduler. + # Relationship access may raise DetachedInstanceError or other SQLAlchemy + # exceptions (e.g. InvalidRequestError when the session is closed); on that + # path, reload the DagRun from the DB to avoid crashing the scheduler. try: events = dag_run.consumed_asset_events set_committed_value( @@ -125,10 +125,10 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s "consumed_asset_events", list(events) if events is not None else [], ) - except DetachedInstanceError: + except Exception: log.warning( - "DagRunContext encountered DetachedInstanceError while accessing " - "consumed_asset_events; reloading DagRun from DB." + "DagRunContext failed to access consumed_asset_events; reloading DagRun from DB.", + exc_info=True, ) from sqlalchemy import select from sqlalchemy.orm import selectinload @@ -137,8 +137,8 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s from airflow.models.dagrun import DagRun from airflow.utils.session import create_session - # Defensive guardrail: reload DagRun with eager-loaded relationships on - # DetachedInstanceError to recover state without adding DB I/O to the hot path. + # Reload DagRun with eager-loaded relationships to recover state + # without adding DB I/O to the hot path. with create_session() as session: dag_run_reloaded = session.scalar( select(DagRun) From 2eaa1f9625ece554cc62a101ef6b73828a714eac Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:25:49 -0400 Subject: [PATCH 2/7] Made produce_dag_callback resilient to DagRunContext failures DagRunContext creation could crash when ORM relationship access failed, preventing the callback from being produced entirely. The callback is now sent with minimal context on failure. --- airflow-core/src/airflow/models/dagrun.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index c93f0ed8e1e13..02657eb0990a3 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1331,16 +1331,26 @@ def produce_dag_callback( ) -> DagCallbackRequest | None: """Create a callback request for the DAG, or execute the callbacks directly if instructed, and return None.""" if not execute: + try: + context_from_server = DagRunContext( + dag_run=self, + last_ti=relevant_ti, + ) + except Exception: + self.log.exception( + "Failed to build DagRunContext for dag_id=%s run_id=%s; " + "sending callback with minimal context", + self.dag_id, + self.run_id, + ) + context_from_server = None return DagCallbackRequest( filepath=self.dag_model.relative_fileloc, dag_id=self.dag_id, run_id=self.run_id, bundle_name=self.dag_model.bundle_name, bundle_version=self.bundle_version, - context_from_server=DagRunContext( - dag_run=self, - last_ti=relevant_ti, - ), + context_from_server=context_from_server, is_failure_callback=(not success), msg=reason, ) From 057b1266879931c54a9afe9bac8b67f322575dff Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:26:03 -0400 Subject: [PATCH 3/7] Added warning log for callbacks with mismatched bundle_name --- airflow-core/src/airflow/dag_processing/manager.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 76f641da0f32a..69f2c356d2852 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -561,6 +561,15 @@ def _fetch_callbacks( ] for callback in callbacks: req = callback.get_callback_request() + if req.bundle_name not in bundle_names: + self.log.warning( + "Callback for dag_id=%s has bundle_name=%r which is not served by " + "this DAG processor (serving bundles: %s). Skipping.", + getattr(req, "dag_id", "unknown"), + req.bundle_name, + bundle_names, + ) + continue try: callback_queue.append(req) session.delete(callback) From 1ece90c35aceaafbee9616a07428ed2ac4ea1261 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:26:18 -0400 Subject: [PATCH 4/7] Added info log when DAG callback is sent to processor --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index bda95bbf4b06a..9772229bd9598 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2439,6 +2439,12 @@ def _send_dag_callbacks_to_processor( callback: DagCallbackRequest | None = None, ) -> None: if callback: + self.log.info( + "Sending %s callback request for dag_id=%s, run_id=%s to DAG Processor", + "failure" if callback.is_failure_callback else "success", + callback.dag_id, + callback.run_id, + ) self.executor.send_callback(callback) else: self.log.debug("callback is empty") From a2d4cf097c1f42d201625b63c5c5c3116ac54f9a Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:26:33 -0400 Subject: [PATCH 5/7] Added tests for DAG callback resilience and bundle mismatch warning --- .../tests/unit/dag_processing/test_manager.py | 7 ++-- airflow-core/tests/unit/models/test_dagrun.py | 40 +++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 46f23e6e3b512..8ef7585401e4a 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1202,8 +1202,8 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te assert len(session.scalars(select(DbCallbackRequest)).all()) == 1 @conf_vars({("core", "load_examples"): "False"}) - def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle): - """Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted.""" + def test_fetch_callbacks_skips_other_bundles_with_warning(self, configure_testing_dag_bundle): + """Callbacks for bundles not served by this processor are skipped with a warning log.""" dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" @@ -1239,10 +1239,9 @@ def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundl # Only the matching callback should be returned assert [c.run_id for c in callbacks] == ["match"] - # The non-matching callback should remain in the DB + # The non-matching callback should remain in the DB for the correct processor remaining = session.scalars(select(DbCallbackRequest)).all() assert len(remaining) == 1 - # Decode remaining request and verify it's for the other bundle remaining_req = remaining[0].get_callback_request() assert remaining_req.bundle_name == "other-bundle" diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index dd34c2d10e7ea..118fbb8a5b230 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -665,6 +665,46 @@ def on_failure_callable(context): ), ) + def test_produce_dag_callback_resilient_to_context_failure(self, testing_dag_bundle, dag_maker, session): + """produce_dag_callback should still return a callback even when DagRunContext creation fails.""" + + def on_failure_callable(context): + pass + + relative_fileloc = "test_produce_dag_callback_resilient.py" + with dag_maker( + dag_id="test_produce_dag_callback_resilient", + schedule=datetime.timedelta(days=1), + start_date=datetime.datetime(2017, 1, 1), + on_failure_callback=on_failure_callable, + ) as dag: + EmptyOperator(task_id="task1") + dm = DagModel.get_dagmodel(dag.dag_id, session=session) + dm.relative_fileloc = relative_fileloc + session.merge(dm) + session.commit() + + initial_task_states = {"task1": TaskInstanceState.FAILED} + dag.relative_fileloc = relative_fileloc + SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") + session.commit() + + dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session) + dag_run.dag_model = dm + + # Patch DagRunContext to raise an exception during construction + with mock.patch( + "airflow.models.dagrun.DagRunContext", side_effect=RuntimeError("Simulated context failure") + ): + _, callback = dag_run.update_state(execute_callbacks=False) + + assert dag_run.state == DagRunState.FAILED + # Callback should still be produced with context_from_server=None + assert callback is not None + assert callback.dag_id == "test_produce_dag_callback_resilient" + assert callback.is_failure_callback is True + assert callback.context_from_server is None + def test_dagrun_set_state_end_date(self, dag_maker, session): with dag_maker(schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE): pass From bbcec762996f749bc346e575ed4b9c07e1feb6fd Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:30:36 -0400 Subject: [PATCH 6/7] Added newsfragment for #63374 --- airflow-core/newsfragments/63374.bugfix.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 airflow-core/newsfragments/63374.bugfix.rst diff --git a/airflow-core/newsfragments/63374.bugfix.rst b/airflow-core/newsfragments/63374.bugfix.rst new file mode 100644 index 0000000000000..2df4cc7ddd36d --- /dev/null +++ b/airflow-core/newsfragments/63374.bugfix.rst @@ -0,0 +1 @@ +Fixed DAG-level on_failure_callback not firing when DagRunContext creation failed due to ORM session errors. From dfd11a3323c28757e24c608f7aeef15ef2eeac49 Mon Sep 17 00:00:00 2001 From: Sathvik Chowdary Veerapaneni Date: Mon, 16 Mar 2026 00:43:14 -0400 Subject: [PATCH 7/7] Renamed newsfragment to match PR number --- airflow-core/newsfragments/{63374.bugfix.rst => 63692.bugfix.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow-core/newsfragments/{63374.bugfix.rst => 63692.bugfix.rst} (100%) diff --git a/airflow-core/newsfragments/63374.bugfix.rst b/airflow-core/newsfragments/63692.bugfix.rst similarity index 100% rename from airflow-core/newsfragments/63374.bugfix.rst rename to airflow-core/newsfragments/63692.bugfix.rst