Skip to content

Commit 872e812

Browse files
committed
Fix zipped DAG template loading with safe path handling
Switch DAG templating to a zip-aware loader that preserves searchpath order and applies the same path traversal rules as FileSystemLoader. Add tests for mixed paths, ordering, and zip traversal blocking.
1 parent cf8acef commit 872e812

3 files changed

Lines changed: 605 additions & 3 deletions

File tree

task-sdk/src/airflow/sdk/definitions/_internal/templater.py

Lines changed: 266 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
import datetime
2121
import logging
22-
from collections.abc import Collection, Iterable, Sequence
22+
import os
23+
import re
24+
import zipfile
25+
from collections.abc import Callable, Collection, Iterable, Sequence
2326
from dataclasses import dataclass
2427
from typing import TYPE_CHECKING, Any
2528

@@ -37,6 +40,10 @@
3740
from airflow.sdk.types import Operator
3841

3942

43+
# Regex pattern to detect zip file paths: matches "path/to/archive.zip/inner/path"
44+
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")
45+
46+
4047
@dataclass(frozen=True)
4148
class LiteralValue(ResolveMixin):
4249
"""
@@ -57,6 +64,264 @@ def resolve(self, context: Context) -> Any:
5764
log = logging.getLogger(__name__)
5865

5966

67+
class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
68+
"""
69+
A Jinja2 template loader that extends FileSystemLoader to support loading templates from within zip archives.
70+
71+
This loader handles the case where DAGs are packaged as zip files.
72+
73+
This loader handles the case where DAGs are packaged as zip files. When a
74+
searchpath contains a zip file path (e.g., "/path/to/dags.zip" or
75+
"/path/to/dags.zip/subdir"), templates inside the zip can be loaded transparently.
76+
77+
For regular filesystem paths, it delegates to the parent FileSystemLoader
78+
for optimal performance.
79+
80+
This addresses Issue #59310 where template files in zipped DAG packages
81+
could not be resolved by the standard FileSystemLoader.
82+
83+
:param searchpath: A list of paths to search for templates. Paths can be:
84+
- Regular filesystem directories
85+
- Paths to zip files (will search inside the zip root)
86+
- Paths inside zip files (e.g., "archive.zip/subdir")
87+
:param encoding: The encoding to use when reading template files.
88+
:param followlinks: Whether to follow symbolic links (for regular directories).
89+
90+
Example usage::
91+
92+
loader = ZipAwareFileSystemLoader(["/path/to/dags.zip", "/path/to/templates"])
93+
env = jinja2.Environment(loader=loader)
94+
template = env.get_template("query.sql")
95+
96+
See: https://github.com/apache/airflow/issues/59310
97+
"""
98+
99+
def __init__(
100+
self,
101+
searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
102+
encoding: str = "utf-8",
103+
followlinks: bool = False,
104+
) -> None:
105+
# Convert to list first to process
106+
if isinstance(searchpath, (str, os.PathLike)):
107+
searchpath = [searchpath]
108+
all_paths = [os.fspath(p) for p in searchpath]
109+
110+
# Separate zip paths from regular paths at initialization time (once)
111+
# Store zip info by index to preserve searchpath order
112+
self._zip_path_map: dict[int, tuple[str, str]] = {} # {index: (archive_path, internal_base_path)}
113+
regular_paths: list[str] = []
114+
115+
for idx, path in enumerate(all_paths):
116+
zip_info = self._parse_zip_path(path)
117+
if zip_info:
118+
self._zip_path_map[idx] = zip_info
119+
else:
120+
regular_paths.append(path)
121+
122+
# Store regular paths for filesystem lookups
123+
self._regular_searchpaths = regular_paths
124+
125+
# Initialize parent with regular paths only (empty list is OK for our use case)
126+
# We override get_source anyway, so parent's searchpath is only used for list_templates
127+
super().__init__(regular_paths if regular_paths else [], encoding, followlinks)
128+
129+
# Store all paths for reference and error messages
130+
self._all_searchpaths = all_paths
131+
self.searchpath = all_paths
132+
133+
@staticmethod
134+
def _parse_zip_path(path: str) -> tuple[str, str] | None:
135+
"""
136+
Parse a path to extract zip archive and internal path components.
137+
138+
:param path: The path to parse
139+
:return: Tuple of (archive_path, internal_base_path) if path is a zip path,
140+
None otherwise
141+
"""
142+
# Check if the path itself is a zip file (no internal path)
143+
if path.endswith(".zip") and os.path.isfile(path) and zipfile.is_zipfile(path):
144+
return (path, "")
145+
146+
# Check for paths inside a zip (e.g., "archive.zip/subdir")
147+
match = ZIP_REGEX.search(path)
148+
if match:
149+
_, archive, internal = match.groups()
150+
if archive and os.path.isfile(archive) and zipfile.is_zipfile(archive):
151+
return (archive, internal or "")
152+
153+
return None
154+
155+
def _read_from_zip(self, archive_path: str, internal_path: str) -> str:
156+
"""
157+
Read a file from inside a zip archive.
158+
159+
:param archive_path: Path to the zip file
160+
:param internal_path: Path to the file inside the zip
161+
:return: The file contents as a string
162+
:raises TemplateNotFound: If the file doesn't exist in the zip
163+
"""
164+
try:
165+
with zipfile.ZipFile(archive_path, "r") as zf:
166+
# Normalize path separators for zip (always forward slashes)
167+
normalized_path = internal_path.replace(os.sep, "/")
168+
with zf.open(normalized_path) as f:
169+
return f.read().decode(self.encoding)
170+
except KeyError as exc:
171+
raise jinja2.TemplateNotFound(internal_path) from exc
172+
except (OSError, zipfile.BadZipFile) as exc:
173+
raise jinja2.TemplateNotFound(
174+
f"{internal_path} (error reading from {archive_path}: {exc})"
175+
) from exc
176+
177+
def _get_source_from_single_zip(
178+
self, archive_path: str, base_internal_path: str, template: str
179+
) -> tuple[str, str, Callable[[], bool]] | None:
180+
"""
181+
Try to get template source from a single zip archive.
182+
183+
:param archive_path: Path to the zip file
184+
:param base_internal_path: Base path inside the zip (may be empty)
185+
:param template: The name of the template to load
186+
:return: A tuple of (source, filename, uptodate_func) if found, None otherwise
187+
"""
188+
import posixpath
189+
190+
from jinja2.loaders import split_template_path
191+
192+
pieces = split_template_path(template)
193+
if base_internal_path:
194+
internal_path = posixpath.join(base_internal_path, *pieces)
195+
else:
196+
internal_path = posixpath.join(*pieces)
197+
198+
try:
199+
source = self._read_from_zip(archive_path, internal_path)
200+
filename = os.path.join(archive_path, internal_path)
201+
202+
archive_mtime = os.path.getmtime(archive_path)
203+
204+
def uptodate(archive: str = archive_path, mtime: float = archive_mtime) -> bool:
205+
try:
206+
return os.path.getmtime(archive) == mtime
207+
except OSError:
208+
return False
209+
210+
return source, filename, uptodate
211+
except jinja2.TemplateNotFound:
212+
return None
213+
214+
def _get_source_from_filesystem(
215+
self, searchpath: str, template: str
216+
) -> tuple[str, str, Callable[[], bool]] | None:
217+
"""
218+
Try to get template source from a single filesystem path.
219+
220+
:param searchpath: The directory to search in
221+
:param template: The name of the template to load
222+
:return: A tuple of (source, filename, uptodate_func) if found, None otherwise
223+
"""
224+
from jinja2.loaders import split_template_path
225+
226+
pieces = split_template_path(template)
227+
filename = os.path.join(searchpath, *pieces)
228+
229+
if not os.path.isfile(filename):
230+
return None
231+
232+
try:
233+
with open(filename, encoding=self.encoding) as f:
234+
contents = f.read()
235+
236+
mtime = os.path.getmtime(filename)
237+
238+
def uptodate(filepath: str = filename, file_mtime: float = mtime) -> bool:
239+
try:
240+
return os.path.getmtime(filepath) == file_mtime
241+
except OSError:
242+
return False
243+
244+
return contents, os.path.normpath(filename), uptodate
245+
except OSError:
246+
return None
247+
248+
def get_source(
249+
self, environment: jinja2.Environment, template: str
250+
) -> tuple[str, str, Callable[[], bool]]:
251+
"""
252+
Get the template source, filename, and reload helper for a template.
253+
254+
Searches through searchpaths in order, handling both zip archives and
255+
regular filesystem paths according to their original order.
256+
257+
:param environment: The Jinja2 environment
258+
:param template: The name of the template to load
259+
:return: A tuple of (source, filename, uptodate_func)
260+
:raises TemplateNotFound: If the template cannot be found
261+
"""
262+
regular_path_idx = 0
263+
264+
for idx, _path in enumerate(self._all_searchpaths):
265+
if idx in self._zip_path_map:
266+
archive_path, base_internal_path = self._zip_path_map[idx]
267+
result = self._get_source_from_single_zip(archive_path, base_internal_path, template)
268+
if result:
269+
return result
270+
else:
271+
if regular_path_idx < len(self._regular_searchpaths):
272+
result = self._get_source_from_filesystem(
273+
self._regular_searchpaths[regular_path_idx], template
274+
)
275+
regular_path_idx += 1
276+
if result:
277+
return result
278+
279+
# Template not found in any searchpath
280+
raise jinja2.TemplateNotFound(
281+
f"'{template}' not found in search path: {', '.join(repr(p) for p in self._all_searchpaths)}"
282+
)
283+
284+
def list_templates(self) -> list[str]:
285+
"""
286+
Return a list of available templates.
287+
288+
Combines templates from both zip archives and regular filesystem paths.
289+
290+
:return: A sorted list of template names
291+
"""
292+
found: set[str] = set()
293+
294+
# Get templates from zip paths
295+
for archive_path, base_internal_path in self._zip_path_map.values():
296+
try:
297+
with zipfile.ZipFile(archive_path, "r") as zf:
298+
for name in zf.namelist():
299+
# Skip directories
300+
if name.endswith("/"):
301+
continue
302+
if base_internal_path:
303+
prefix = base_internal_path.replace(os.sep, "/") + "/"
304+
if name.startswith(prefix):
305+
relative = name[len(prefix) :]
306+
found.add(relative)
307+
else:
308+
found.add(name)
309+
except (OSError, zipfile.BadZipFile):
310+
continue
311+
312+
# Get templates from regular paths
313+
for searchpath in self._regular_searchpaths:
314+
if not os.path.isdir(searchpath):
315+
continue
316+
for dirpath, _, filenames in os.walk(searchpath, followlinks=self.followlinks):
317+
for filename in filenames:
318+
filepath = os.path.join(dirpath, filename)
319+
relative = os.path.relpath(filepath, searchpath)
320+
found.add(relative.replace(os.sep, "/"))
321+
322+
return sorted(found)
323+
324+
60325
class Templater:
61326
"""
62327
This renders the template fields of object.

task-sdk/src/airflow/sdk/definitions/dag.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -775,16 +775,22 @@ def resolve_template_files(self):
775775

776776
def get_template_env(self, *, force_sandboxed: bool = False) -> jinja2.Environment:
777777
"""Build a Jinja2 environment."""
778-
from airflow.sdk.definitions._internal.templater import NativeEnvironment, SandboxedEnvironment
778+
from airflow.sdk.definitions._internal.templater import (
779+
NativeEnvironment,
780+
SandboxedEnvironment,
781+
ZipAwareFileSystemLoader,
782+
)
779783

780784
# Collect directories to search for template files
781785
searchpath = [self.folder]
782786
if self.template_searchpath:
783787
searchpath += self.template_searchpath
784788

785789
# Default values (for backward compatibility)
790+
# Use ZipAwareFileSystemLoader to support loading templates from zipped DAGs
791+
# See: https://github.com/apache/airflow/issues/59310
786792
jinja_env_options = {
787-
"loader": jinja2.FileSystemLoader(searchpath),
793+
"loader": ZipAwareFileSystemLoader(searchpath),
788794
"undefined": self.template_undefined,
789795
"extensions": ["jinja2.ext.do"],
790796
"cache_size": 0,

0 commit comments

Comments
 (0)