diff --git a/airflow-core/newsfragments/63692.bugfix.rst b/airflow-core/newsfragments/63692.bugfix.rst new file mode 100644 index 0000000000000..2df4cc7ddd36d --- /dev/null +++ b/airflow-core/newsfragments/63692.bugfix.rst @@ -0,0 +1 @@ +Fixed DAG-level on_failure_callback not firing when DagRunContext creation failed due to ORM session errors. diff --git a/airflow-core/src/airflow/callbacks/callback_requests.py b/airflow-core/src/airflow/callbacks/callback_requests.py index ce48438f0e70a..25cb639534901 100644 --- a/airflow-core/src/airflow/callbacks/callback_requests.py +++ b/airflow-core/src/airflow/callbacks/callback_requests.py @@ -24,7 +24,6 @@ from sqlalchemy import inspect as sa_inspect from sqlalchemy.exc import NoInspectionAvailable from sqlalchemy.orm.attributes import set_committed_value -from sqlalchemy.orm.exc import DetachedInstanceError from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel # noqa: TC001 from airflow.utils.state import TaskInstanceState @@ -116,8 +115,9 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s except NoInspectionAvailable: return values - # Relationship access may raise DetachedInstanceError; on that path, reload DagRun - # from the DB to avoid crashing the scheduler. + # Relationship access may raise DetachedInstanceError or other SQLAlchemy + # exceptions (e.g. InvalidRequestError when the session is closed); on that + # path, reload the DagRun from the DB to avoid crashing the scheduler. try: events = dag_run.consumed_asset_events set_committed_value( @@ -125,10 +125,10 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s "consumed_asset_events", list(events) if events is not None else [], ) - except DetachedInstanceError: + except Exception: log.warning( - "DagRunContext encountered DetachedInstanceError while accessing " - "consumed_asset_events; reloading DagRun from DB." + "DagRunContext failed to access consumed_asset_events; reloading DagRun from DB.", + exc_info=True, ) from sqlalchemy import select from sqlalchemy.orm import selectinload @@ -137,8 +137,8 @@ def _sanitize_consumed_asset_events(cls, values: Mapping[str, Any]) -> Mapping[s from airflow.models.dagrun import DagRun from airflow.utils.session import create_session - # Defensive guardrail: reload DagRun with eager-loaded relationships on - # DetachedInstanceError to recover state without adding DB I/O to the hot path. + # Reload DagRun with eager-loaded relationships to recover state + # without adding DB I/O to the hot path. with create_session() as session: dag_run_reloaded = session.scalar( select(DagRun) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 76f641da0f32a..69f2c356d2852 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -561,6 +561,15 @@ def _fetch_callbacks( ] for callback in callbacks: req = callback.get_callback_request() + if req.bundle_name not in bundle_names: + self.log.warning( + "Callback for dag_id=%s has bundle_name=%r which is not served by " + "this DAG processor (serving bundles: %s). Skipping.", + getattr(req, "dag_id", "unknown"), + req.bundle_name, + bundle_names, + ) + continue try: callback_queue.append(req) session.delete(callback) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index bda95bbf4b06a..9772229bd9598 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2439,6 +2439,12 @@ def _send_dag_callbacks_to_processor( callback: DagCallbackRequest | None = None, ) -> None: if callback: + self.log.info( + "Sending %s callback request for dag_id=%s, run_id=%s to DAG Processor", + "failure" if callback.is_failure_callback else "success", + callback.dag_id, + callback.run_id, + ) self.executor.send_callback(callback) else: self.log.debug("callback is empty") diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index c93f0ed8e1e13..02657eb0990a3 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1331,16 +1331,26 @@ def produce_dag_callback( ) -> DagCallbackRequest | None: """Create a callback request for the DAG, or execute the callbacks directly if instructed, and return None.""" if not execute: + try: + context_from_server = DagRunContext( + dag_run=self, + last_ti=relevant_ti, + ) + except Exception: + self.log.exception( + "Failed to build DagRunContext for dag_id=%s run_id=%s; " + "sending callback with minimal context", + self.dag_id, + self.run_id, + ) + context_from_server = None return DagCallbackRequest( filepath=self.dag_model.relative_fileloc, dag_id=self.dag_id, run_id=self.run_id, bundle_name=self.dag_model.bundle_name, bundle_version=self.bundle_version, - context_from_server=DagRunContext( - dag_run=self, - last_ti=relevant_ti, - ), + context_from_server=context_from_server, is_failure_callback=(not success), msg=reason, ) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 46f23e6e3b512..8ef7585401e4a 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -1202,8 +1202,8 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_te assert len(session.scalars(select(DbCallbackRequest)).all()) == 1 @conf_vars({("core", "load_examples"): "False"}) - def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle): - """Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted.""" + def test_fetch_callbacks_skips_other_bundles_with_warning(self, configure_testing_dag_bundle): + """Callbacks for bundles not served by this processor are skipped with a warning log.""" dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" @@ -1239,10 +1239,9 @@ def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundl # Only the matching callback should be returned assert [c.run_id for c in callbacks] == ["match"] - # The non-matching callback should remain in the DB + # The non-matching callback should remain in the DB for the correct processor remaining = session.scalars(select(DbCallbackRequest)).all() assert len(remaining) == 1 - # Decode remaining request and verify it's for the other bundle remaining_req = remaining[0].get_callback_request() assert remaining_req.bundle_name == "other-bundle" diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index dd34c2d10e7ea..118fbb8a5b230 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -665,6 +665,46 @@ def on_failure_callable(context): ), ) + def test_produce_dag_callback_resilient_to_context_failure(self, testing_dag_bundle, dag_maker, session): + """produce_dag_callback should still return a callback even when DagRunContext creation fails.""" + + def on_failure_callable(context): + pass + + relative_fileloc = "test_produce_dag_callback_resilient.py" + with dag_maker( + dag_id="test_produce_dag_callback_resilient", + schedule=datetime.timedelta(days=1), + start_date=datetime.datetime(2017, 1, 1), + on_failure_callback=on_failure_callable, + ) as dag: + EmptyOperator(task_id="task1") + dm = DagModel.get_dagmodel(dag.dag_id, session=session) + dm.relative_fileloc = relative_fileloc + session.merge(dm) + session.commit() + + initial_task_states = {"task1": TaskInstanceState.FAILED} + dag.relative_fileloc = relative_fileloc + SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") + session.commit() + + dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session) + dag_run.dag_model = dm + + # Patch DagRunContext to raise an exception during construction + with mock.patch( + "airflow.models.dagrun.DagRunContext", side_effect=RuntimeError("Simulated context failure") + ): + _, callback = dag_run.update_state(execute_callbacks=False) + + assert dag_run.state == DagRunState.FAILED + # Callback should still be produced with context_from_server=None + assert callback is not None + assert callback.dag_id == "test_produce_dag_callback_resilient" + assert callback.is_failure_callback is True + assert callback.context_from_server is None + def test_dagrun_set_state_end_date(self, dag_maker, session): with dag_maker(schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE): pass