Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ While both Dag constructors get called when the file is accessed, only ``dag_1``

When searching for Dags inside the Dag bundle, Airflow only considers Python files that contain the strings ``airflow`` and ``dag`` (case-insensitively) as an optimization.

To consider all Python files instead, disable the ``DAG_DISCOVERY_SAFE_MODE`` configuration flag.
To consider all Python files instead, set the ``[core] dag_discovery_safe_mode`` configuration flag to ``False``. This is the setting to use when your Dags are defined through a wrapper or abstraction whose source does not contain those strings. The flag is read by the Dag file processor, so set it on that component (and on anything that runs ``airflow dags reserialize``) and restart it for the change to take effect -- otherwise Dags may appear after a manual reserialize and then disappear again on the next processor scan.

You can also provide an ``.airflowignore`` file inside your Dag bundle, or any of its subfolders, which describes patterns of files for the loader to ignore. It covers the directory it's in plus all subfolders underneath it. See :ref:`.airflowignore <concepts:airflowignore>` below for details of the file syntax.

Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/68518.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Keep Dags from ZIP archives active when ``[core] dag_discovery_safe_mode`` is ``False``. Previously, Dags packaged in ZIP files whose source did not contain the ``airflow``/``dag`` keywords were parsed and activated by the Dag file processor but then immediately deactivated, because the scan that determines which files still exist always applied the keyword heuristic regardless of the configured ``dag_discovery_safe_mode``.
13 changes: 12 additions & 1 deletion airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,18 @@ core:
default: "True"
dag_discovery_safe_mode:
description: |
If enabled, Airflow will only scan files containing both ``DAG`` and ``airflow`` (case-insensitive).
If enabled, Airflow only scans files containing both ``DAG`` and ``airflow`` (case-insensitive)
when looking for Dags. Set this to ``False`` to scan every Python file instead -- required when
your Dags are defined through a wrapper or abstraction whose source does not contain those strings.

This setting is read by the Dag file processor, so it must be configured for that component. In a
separate-process deployment (for example the Helm chart) set it on the ``dag-processor`` as well as
on anything that runs ``airflow dags reserialize``, and restart the component for a change to take
effect. If only some components see ``False``, Dags can appear after a manual reserialize and then
be deactivated again on the next processor scan.

As an alternative to scanning every file, set ``[core] might_contain_dag_callable`` to a custom
heuristic that recognizes your wrapper.
version_added: 1.10.3
type: string
example: ~
Expand Down
30 changes: 27 additions & 3 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,16 @@ class DagFileProcessorManager(LoggingMixin):
factory=_config_get_factory("dag_processor", "file_parsing_sort_mode")
)

dag_discovery_safe_mode: bool = attrs.field(
factory=_config_bool_factory("core", "DAG_DISCOVERY_SAFE_MODE")
)
"""Resolved once per process so file discovery and the deactivation scan use the same value.

When ``False`` the keyword heuristic is bypassed and every Python file is scanned -- this must
apply consistently to discovery (:meth:`_find_files_in_bundle`) and to the set of observed
filelocs (:meth:`_get_observed_filelocs`); otherwise freshly-parsed keyword-less Dags are
deactivated right after being parsed (issue #66104)."""

_api_server: InProcessExecutionAPI = attrs.field(init=False, factory=InProcessExecutionAPI)
"""API server to interact with Metadata DB"""

Expand Down Expand Up @@ -883,8 +893,19 @@ def _find_files_in_bundle(self, bundle: BaseDagBundle) -> list[Path]:
"""Get relative paths for dag files from bundle dir."""
# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s at %s", bundle.name, bundle.path)
rel_paths = [Path(x).relative_to(bundle.path) for x in list_py_file_paths(bundle.path)]
self.log.info("Found %s files for bundle %s", len(rel_paths), bundle.name)
rel_paths = [
Path(x).relative_to(bundle.path)
for x in list_py_file_paths(bundle.path, safe_mode=self.dag_discovery_safe_mode)
]
# Logging the effective safe_mode makes a misconfigured dag-processor diagnosable: a
# component left at the default safe_mode=True silently skips keyword-less files even
# when the deployment set core.dag_discovery_safe_mode=False elsewhere (issue #66104).
self.log.info(
"Found %s files for bundle %s (dag_discovery_safe_mode=%s)",
len(rel_paths),
bundle.name,
self.dag_discovery_safe_mode,
)

return rel_paths

Expand All @@ -902,7 +923,10 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
try:
with zipfile.ZipFile(abs_path) as z:
for info in z.infolist():
if might_contain_dag(info.filename, True, z):
# Must use the same safe_mode as discovery/parsing: with a hardcoded True a
# keyword-less zip member parsed under safe_mode=False would be absent here and
# then deactivated immediately after being parsed (issue #66104).
if might_contain_dag(info.filename, self.dag_discovery_safe_mode, z):
yield os.path.join(abs_path, info.filename)
except zipfile.BadZipFile:
self.log.exception("There was an error accessing ZIP file %s", abs_path)
Expand Down
62 changes: 62 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,41 @@ def _create_zip_bundle_with_valid_and_broken_dags(zip_path: Path) -> None:
)


def _create_zip_bundle_with_keywordless_dag(zip_path: Path) -> None:
"""Build a zip with one keyword-bearing member and one keyword-less ("wrapped") member.

``with_keywords.py`` contains the ``airflow``/``dag`` strings the safe-mode heuristic looks
for. ``no_keywords.py`` mimics a custom wrapper whose source contains neither ``airflow`` nor
``dag``/``asset`` -- exactly the case ``dag_discovery_safe_mode=False`` exists to support
(issue #66104).
"""
with zipfile.ZipFile(zip_path, "w") as zf:
zf.writestr(
"with_keywords.py",
textwrap.dedent(
"""
from airflow.sdk import DAG

with DAG(dag_id="zip_with_keywords"):
pass
"""
),
)
zf.writestr(
"no_keywords.py",
textwrap.dedent(
"""
from mycompany.pipelines import flow


@flow(name="nightly")
def nightly():
run_step("extract")
"""
),
)


class TestDagFileProcessorManager:
@pytest.fixture(autouse=True)
def _disable_examples(self):
Expand Down Expand Up @@ -295,6 +330,33 @@ def test_get_observed_filelocs_expands_zip_inner_paths(self, tmp_path):
"test_zip.zip/broken_dag.py",
}

@pytest.mark.parametrize(
"safe_mode",
[
pytest.param(False, id="safe-mode-off-includes-keywordless"),
pytest.param(True, id="safe-mode-on-filters-keywordless"),
],
)
def test_get_observed_filelocs_respects_dag_discovery_safe_mode(self, tmp_path, safe_mode):
"""ZIP-member discovery used for deactivation must honor the configured safe_mode.

Regression for #66104: with ``dag_discovery_safe_mode=False`` a keyword-less (wrapped)
zip member is parsed and activated, so it must also be reported as observed -- otherwise
it is deactivated right after being parsed. This path previously hardcoded safe_mode=True.
"""
zip_path = tmp_path / "test_zip.zip"
_create_zip_bundle_with_keywordless_dag(zip_path)

manager = DagFileProcessorManager(max_runs=1, dag_discovery_safe_mode=safe_mode)
observed_filelocs = manager._get_observed_filelocs(
{DagFileInfo(bundle_name="testing", rel_path=Path("test_zip.zip"), bundle_path=tmp_path)}
)

expected = {"test_zip.zip/with_keywords.py"}
if not safe_mode:
expected.add("test_zip.zip/no_keywords.py")
assert observed_filelocs == expected

@pytest.mark.usefixtures("clear_parse_import_errors")
def test_refresh_dag_bundles_keeps_zip_inner_file_errors(self, session, tmp_path, configure_dag_bundles):
bundle_path = tmp_path / "bundleone"
Expand Down
Loading