From 942843f541e0b9a9b09488d4c62c31a08690cdfb Mon Sep 17 00:00:00 2001 From: deepinsight coder Date: Sat, 13 Jun 2026 22:02:34 +0000 Subject: [PATCH] Keep ZIP-archived Dags active when dag_discovery_safe_mode is False The Dag file processor parsed and activated keyword-less Dags inside ZIP archives but then immediately deactivated them, because the scan that decides which files still exist ignored dag_discovery_safe_mode and always applied the airflow/dag keyword heuristic. Resolve the setting once and use it consistently for discovery and the deactivation scan, and log the effective value so a misconfigured processor is diagnosable. closes: #66104 --- airflow-core/docs/core-concepts/dags.rst | 2 +- airflow-core/newsfragments/68518.bugfix.rst | 1 + .../src/airflow/config_templates/config.yml | 13 +++- .../src/airflow/dag_processing/manager.py | 30 ++++++++- .../tests/unit/dag_processing/test_manager.py | 62 +++++++++++++++++++ 5 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 airflow-core/newsfragments/68518.bugfix.rst diff --git a/airflow-core/docs/core-concepts/dags.rst b/airflow-core/docs/core-concepts/dags.rst index cf7d6ee47167f..9b90be7671077 100644 --- a/airflow-core/docs/core-concepts/dags.rst +++ b/airflow-core/docs/core-concepts/dags.rst @@ -166,7 +166,7 @@ While both Dag constructors get called when the file is accessed, only ``dag_1`` When searching for Dags inside the Dag bundle, Airflow only considers Python files that contain the strings ``airflow`` and ``dag`` (case-insensitively) as an optimization. - To consider all Python files instead, disable the ``DAG_DISCOVERY_SAFE_MODE`` configuration flag. + To consider all Python files instead, set the ``[core] dag_discovery_safe_mode`` configuration flag to ``False``. This is the setting to use when your Dags are defined through a wrapper or abstraction whose source does not contain those strings. 
The flag is read by the Dag file processor, so set it on that component (and on anything that runs ``airflow dags reserialize``) and restart it for the change to take effect -- otherwise Dags may appear after a manual reserialize and then disappear again on the next processor scan. You can also provide an ``.airflowignore`` file inside your Dag bundle, or any of its subfolders, which describes patterns of files for the loader to ignore. It covers the directory it's in plus all subfolders underneath it. See :ref:`.airflowignore <concepts:airflowignore>` below for details of the file syntax. diff --git a/airflow-core/newsfragments/68518.bugfix.rst b/airflow-core/newsfragments/68518.bugfix.rst new file mode 100644 index 0000000000000..db2792de216e5 --- /dev/null +++ b/airflow-core/newsfragments/68518.bugfix.rst @@ -0,0 +1 @@ +Keep Dags from ZIP archives active when ``[core] dag_discovery_safe_mode`` is ``False``. Previously, Dags packaged in ZIP files whose source did not contain the ``airflow``/``dag`` keywords were parsed and activated by the Dag file processor but then immediately deactivated, because the scan that determines which files still exist always applied the keyword heuristic regardless of the configured ``dag_discovery_safe_mode``. diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index eebe2d8c951d5..24ce4da2385a3 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -289,7 +289,18 @@ core: default: "True" dag_discovery_safe_mode: description: | - If enabled, Airflow will only scan files containing both ``DAG`` and ``airflow`` (case-insensitive). + If enabled, Airflow only scans files containing both ``DAG`` and ``airflow`` (case-insensitive) + when looking for Dags. 
Set this to ``False`` to scan every Python file instead -- required when + your Dags are defined through a wrapper or abstraction whose source does not contain those strings. + + This setting is read by the Dag file processor, so it must be configured for that component. In a + separate-process deployment (for example the Helm chart) set it on the ``dag-processor`` as well as + on anything that runs ``airflow dags reserialize``, and restart the component for a change to take + effect. If only some components see ``False``, Dags can appear after a manual reserialize and then + be deactivated again on the next processor scan. + + As an alternative to scanning every file, set ``[core] might_contain_dag_callable`` to a custom + heuristic that recognizes your wrapper. version_added: 1.10.3 type: string example: ~ diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 783de1747585c..a53165b91c648 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -286,6 +286,16 @@ class DagFileProcessorManager(LoggingMixin): factory=_config_get_factory("dag_processor", "file_parsing_sort_mode") ) + dag_discovery_safe_mode: bool = attrs.field( + factory=_config_bool_factory("core", "DAG_DISCOVERY_SAFE_MODE") + ) + """Resolved once per process so file discovery and the deactivation scan use the same value. 
+ + When ``False`` the keyword heuristic is bypassed and every Python file is scanned -- this must + apply consistently to discovery (:meth:`_find_files_in_bundle`) and to the set of observed + filelocs (:meth:`_get_observed_filelocs`); otherwise freshly-parsed keyword-less Dags are + deactivated right after being parsed (issue #66104).""" + _api_server: InProcessExecutionAPI = attrs.field(init=False, factory=InProcessExecutionAPI) """API server to interact with Metadata DB""" @@ -883,8 +893,19 @@ def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]: """Get relative paths for dag files from bundle dir.""" # Build up a list of Python files that could contain DAGs self.log.info("Searching for files in %s at %s", bundle.name, bundle.path) - rel_paths = [Path(x).relative_to(bundle.path) for x in list_py_file_paths(bundle.path)] - self.log.info("Found %s files for bundle %s", len(rel_paths), bundle.name) + rel_paths = [ + Path(x).relative_to(bundle.path) + for x in list_py_file_paths(bundle.path, safe_mode=self.dag_discovery_safe_mode) + ] + # Logging the effective safe_mode makes a misconfigured dag-processor diagnosable: a + # component left at the default safe_mode=True silently skips keyword-less files even + # when the deployment set core.dag_discovery_safe_mode=False elsewhere (issue #66104). + self.log.info( + "Found %s files for bundle %s (dag_discovery_safe_mode=%s)", + len(rel_paths), + bundle.name, + self.dag_discovery_safe_mode, + ) return rel_paths @@ -902,7 +923,10 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]: try: with zipfile.ZipFile(abs_path) as z: for info in z.infolist(): - if might_contain_dag(info.filename, True, z): + # Must use the same safe_mode as discovery/parsing: with a hardcoded True a + # keyword-less zip member parsed under safe_mode=False would be absent here and + # then deactivated immediately after being parsed (issue #66104). 
+ if might_contain_dag(info.filename, self.dag_discovery_safe_mode, z): yield os.path.join(abs_path, info.filename) except zipfile.BadZipFile: self.log.exception("There was an error accessing ZIP file %s", abs_path) diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 17795bcc8870c..03c700915db3e 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -157,6 +157,41 @@ def _create_zip_bundle_with_valid_and_broken_dags(zip_path: Path) -> None: ) +def _create_zip_bundle_with_keywordless_dag(zip_path: Path) -> None: + """Build a zip with one keyword-bearing member and one keyword-less ("wrapped") member. + + ``with_keywords.py`` contains the ``airflow``/``dag`` strings the safe-mode heuristic looks + for. ``no_keywords.py`` mimics a custom wrapper whose source contains neither ``airflow`` nor + ``dag``/``asset`` -- exactly the case ``dag_discovery_safe_mode=False`` exists to support + (issue #66104). 
+ """ + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr( + "with_keywords.py", + textwrap.dedent( + """ + from airflow.sdk import DAG + + with DAG(dag_id="zip_with_keywords"): + pass + """ + ), + ) + zf.writestr( + "no_keywords.py", + textwrap.dedent( + """ + from mycompany.pipelines import flow + + + @flow(name="nightly") + def nightly(): + run_step("extract") + """ + ), + ) + + class TestDagFileProcessorManager: @pytest.fixture(autouse=True) def _disable_examples(self): @@ -295,6 +330,33 @@ def test_get_observed_filelocs_expands_zip_inner_paths(self, tmp_path): "test_zip.zip/broken_dag.py", } + @pytest.mark.parametrize( + "safe_mode", + [ + pytest.param(False, id="safe-mode-off-includes-keywordless"), + pytest.param(True, id="safe-mode-on-filters-keywordless"), + ], + ) + def test_get_observed_filelocs_respects_dag_discovery_safe_mode(self, tmp_path, safe_mode): + """ZIP-member discovery used for deactivation must honor the configured safe_mode. + + Regression for #66104: with ``dag_discovery_safe_mode=False`` a keyword-less (wrapped) + zip member is parsed and activated, so it must also be reported as observed -- otherwise + it is deactivated right after being parsed. This path previously hardcoded safe_mode=True. + """ + zip_path = tmp_path / "test_zip.zip" + _create_zip_bundle_with_keywordless_dag(zip_path) + + manager = DagFileProcessorManager(max_runs=1, dag_discovery_safe_mode=safe_mode) + observed_filelocs = manager._get_observed_filelocs( + {DagFileInfo(bundle_name="testing", rel_path=Path("test_zip.zip"), bundle_path=tmp_path)} + ) + + expected = {"test_zip.zip/with_keywords.py"} + if not safe_mode: + expected.add("test_zip.zip/no_keywords.py") + assert observed_filelocs == expected + @pytest.mark.usefixtures("clear_parse_import_errors") def test_refresh_dag_bundles_keeps_zip_inner_file_errors(self, session, tmp_path, configure_dag_bundles): bundle_path = tmp_path / "bundleone"