Skip to content

Commit e8e3fba

Browse files
committed
Fix zipped DAG template loading with safe path handling
Switch DAG templating to a zip-aware loader that preserves searchpath order and applies the same path traversal rules as FileSystemLoader. Add tests for mixed paths, ordering, and zip traversal blocking.
1 parent 91159ed commit e8e3fba

3 files changed

Lines changed: 609 additions & 29 deletions

File tree

task-sdk/src/airflow/sdk/definitions/_internal/templater.py

Lines changed: 268 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
import datetime
2121
import logging
22-
from collections.abc import Collection, Iterable, Sequence
22+
import os
23+
import re
24+
import zipfile
25+
from collections.abc import Callable, Collection, Iterable, Sequence
2326
from dataclasses import dataclass
2427
from typing import TYPE_CHECKING, Any
2528

@@ -37,6 +40,10 @@
3740
from airflow.sdk.types import Operator
3841

3942

43+
# Regex pattern to detect zip file paths: matches "path/to/archive.zip/inner/path"
44+
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")
45+
46+
4047
@dataclass(frozen=True)
4148
class LiteralValue(ResolveMixin):
4249
"""
@@ -57,6 +64,264 @@ def resolve(self, context: Context) -> Any:
5764
log = logging.getLogger(__name__)
5865

5966

67+
class ZipAwareFileSystemLoader(jinja2.FileSystemLoader):
68+
"""
69+
A Jinja2 template loader that extends FileSystemLoader to support loading templates from within zip archives.
70+
71+
This loader handles the case where DAGs are packaged as zip files.
72+
73+
This loader handles the case where DAGs are packaged as zip files. When a
74+
searchpath contains a zip file path (e.g., "/path/to/dags.zip" or
75+
"/path/to/dags.zip/subdir"), templates inside the zip can be loaded transparently.
76+
77+
For regular filesystem paths, it delegates to the parent FileSystemLoader
78+
for optimal performance.
79+
80+
This addresses Issue #59310 where template files in zipped DAG packages
81+
could not be resolved by the standard FileSystemLoader.
82+
83+
:param searchpath: A list of paths to search for templates. Paths can be:
84+
- Regular filesystem directories
85+
- Paths to zip files (will search inside the zip root)
86+
- Paths inside zip files (e.g., "archive.zip/subdir")
87+
:param encoding: The encoding to use when reading template files.
88+
:param followlinks: Whether to follow symbolic links (for regular directories).
89+
90+
Example usage::
91+
92+
loader = ZipAwareFileSystemLoader(["/path/to/dags.zip", "/path/to/templates"])
93+
env = jinja2.Environment(loader=loader)
94+
template = env.get_template("query.sql")
95+
96+
See: https://github.com/apache/airflow/issues/59310
97+
"""
98+
99+
def __init__(
100+
self,
101+
searchpath: str | os.PathLike[str] | Sequence[str | os.PathLike[str]],
102+
encoding: str = "utf-8",
103+
followlinks: bool = False,
104+
) -> None:
105+
# Convert to list first to process
106+
if isinstance(searchpath, (str, os.PathLike)):
107+
searchpath = [searchpath]
108+
all_paths = [os.fspath(p) for p in searchpath]
109+
110+
# Separate zip paths from regular paths at initialization time (once)
111+
# Store zip info by index to preserve searchpath order
112+
self._zip_path_map: dict[int, tuple[str, str]] = {} # {index: (archive_path, internal_base_path)}
113+
regular_paths: list[str] = []
114+
115+
for idx, path in enumerate(all_paths):
116+
zip_info = self._parse_zip_path(path)
117+
if zip_info:
118+
self._zip_path_map[idx] = zip_info
119+
else:
120+
regular_paths.append(path)
121+
122+
# Store regular paths for filesystem lookups
123+
self._regular_searchpaths = regular_paths
124+
125+
# Initialize parent with regular paths only (empty list is OK for our use case)
126+
# We override get_source anyway, so parent's searchpath is only used for list_templates
127+
super().__init__(regular_paths if regular_paths else [], encoding, followlinks)
128+
129+
# Store all paths for reference and error messages
130+
self._all_searchpaths = all_paths
131+
self.searchpath = all_paths
132+
133+
@staticmethod
134+
def _parse_zip_path(path: str) -> tuple[str, str] | None:
135+
"""
136+
Parse a path to extract zip archive and internal path components.
137+
138+
:param path: The path to parse
139+
:return: Tuple of (archive_path, internal_base_path) if path is a zip path,
140+
None otherwise
141+
"""
142+
# Check if the path itself is a zip file (no internal path)
143+
if path.endswith(".zip") and os.path.isfile(path) and zipfile.is_zipfile(path):
144+
return (path, "")
145+
146+
# Check for paths inside a zip (e.g., "archive.zip/subdir")
147+
match = ZIP_REGEX.search(path)
148+
if match:
149+
_, archive, internal = match.groups()
150+
if archive and os.path.isfile(archive) and zipfile.is_zipfile(archive):
151+
return (archive, internal or "")
152+
153+
return None
154+
155+
def _read_from_zip(self, archive_path: str, internal_path: str) -> str:
156+
"""
157+
Read a file from inside a zip archive.
158+
159+
:param archive_path: Path to the zip file
160+
:param internal_path: Path to the file inside the zip
161+
:return: The file contents as a string
162+
:raises TemplateNotFound: If the file doesn't exist in the zip
163+
"""
164+
try:
165+
with zipfile.ZipFile(archive_path, "r") as zf:
166+
# Normalize path separators for zip (always forward slashes)
167+
normalized_path = internal_path.replace(os.sep, "/")
168+
with zf.open(normalized_path) as f:
169+
return f.read().decode(self.encoding)
170+
except KeyError as exc:
171+
raise jinja2.TemplateNotFound(internal_path) from exc
172+
except (OSError, zipfile.BadZipFile) as exc:
173+
raise jinja2.TemplateNotFound(
174+
f"{internal_path} (error reading from {archive_path}: {exc})"
175+
) from exc
176+
177+
def _get_source_from_single_zip(
178+
self, archive_path: str, base_internal_path: str, template: str
179+
) -> tuple[str, str, Callable[[], bool]] | None:
180+
"""
181+
Try to get template source from a single zip archive.
182+
183+
:param archive_path: Path to the zip file
184+
:param base_internal_path: Base path inside the zip (may be empty)
185+
:param template: The name of the template to load
186+
:return: A tuple of (source, filename, uptodate_func) if found, None otherwise
187+
"""
188+
import posixpath
189+
190+
from jinja2.loaders import split_template_path
191+
192+
pieces = split_template_path(template)
193+
if base_internal_path:
194+
internal_path = posixpath.join(base_internal_path, *pieces)
195+
else:
196+
internal_path = posixpath.join(*pieces)
197+
198+
try:
199+
source = self._read_from_zip(archive_path, internal_path)
200+
filename = os.path.join(archive_path, internal_path)
201+
202+
archive_mtime = os.path.getmtime(archive_path)
203+
204+
def uptodate(archive: str = archive_path, mtime: float = archive_mtime) -> bool:
205+
try:
206+
return os.path.getmtime(archive) == mtime
207+
except OSError:
208+
return False
209+
210+
return source, filename, uptodate
211+
except jinja2.TemplateNotFound:
212+
return None
213+
214+
def _get_source_from_filesystem(
215+
self, searchpath: str, template: str
216+
) -> tuple[str, str, Callable[[], bool]] | None:
217+
"""
218+
Try to get template source from a single filesystem path.
219+
220+
:param searchpath: The directory to search in
221+
:param template: The name of the template to load
222+
:return: A tuple of (source, filename, uptodate_func) if found, None otherwise
223+
"""
224+
from jinja2.loaders import split_template_path
225+
226+
pieces = split_template_path(template)
227+
filename = os.path.join(searchpath, *pieces)
228+
229+
if not os.path.isfile(filename):
230+
return None
231+
232+
try:
233+
with open(filename, encoding=self.encoding) as f:
234+
contents = f.read()
235+
236+
mtime = os.path.getmtime(filename)
237+
238+
def uptodate(filepath: str = filename, file_mtime: float = mtime) -> bool:
239+
try:
240+
return os.path.getmtime(filepath) == file_mtime
241+
except OSError:
242+
return False
243+
244+
return contents, os.path.normpath(filename), uptodate
245+
except OSError:
246+
return None
247+
248+
def get_source(
249+
self, environment: jinja2.Environment, template: str
250+
) -> tuple[str, str, Callable[[], bool]]:
251+
"""
252+
Get the template source, filename, and reload helper for a template.
253+
254+
Searches through searchpaths in order, handling both zip archives and
255+
regular filesystem paths according to their original order.
256+
257+
:param environment: The Jinja2 environment
258+
:param template: The name of the template to load
259+
:return: A tuple of (source, filename, uptodate_func)
260+
:raises TemplateNotFound: If the template cannot be found
261+
"""
262+
regular_path_idx = 0
263+
264+
for idx, _path in enumerate(self._all_searchpaths):
265+
if idx in self._zip_path_map:
266+
archive_path, base_internal_path = self._zip_path_map[idx]
267+
result = self._get_source_from_single_zip(archive_path, base_internal_path, template)
268+
if result:
269+
return result
270+
else:
271+
if regular_path_idx < len(self._regular_searchpaths):
272+
result = self._get_source_from_filesystem(
273+
self._regular_searchpaths[regular_path_idx], template
274+
)
275+
regular_path_idx += 1
276+
if result:
277+
return result
278+
279+
# Template not found in any searchpath
280+
raise jinja2.TemplateNotFound(
281+
f"'{template}' not found in search path: {', '.join(repr(p) for p in self._all_searchpaths)}"
282+
)
283+
284+
def list_templates(self) -> list[str]:
285+
"""
286+
Return a list of available templates.
287+
288+
Combines templates from both zip archives and regular filesystem paths.
289+
290+
:return: A sorted list of template names
291+
"""
292+
found: set[str] = set()
293+
294+
# Get templates from zip paths
295+
for archive_path, base_internal_path in self._zip_path_map.values():
296+
try:
297+
with zipfile.ZipFile(archive_path, "r") as zf:
298+
for name in zf.namelist():
299+
# Skip directories
300+
if name.endswith("/"):
301+
continue
302+
if base_internal_path:
303+
prefix = base_internal_path.replace(os.sep, "/") + "/"
304+
if name.startswith(prefix):
305+
relative = name[len(prefix) :]
306+
found.add(relative)
307+
else:
308+
found.add(name)
309+
except (OSError, zipfile.BadZipFile):
310+
continue
311+
312+
# Get templates from regular paths
313+
for searchpath in self._regular_searchpaths:
314+
if not os.path.isdir(searchpath):
315+
continue
316+
for dirpath, _, filenames in os.walk(searchpath, followlinks=self.followlinks):
317+
for filename in filenames:
318+
filepath = os.path.join(dirpath, filename)
319+
relative = os.path.relpath(filepath, searchpath)
320+
found.add(relative.replace(os.sep, "/"))
321+
322+
return sorted(found)
323+
324+
60325
class Templater:
61326
"""
62327
This renders the template fields of object.
@@ -108,14 +373,6 @@ def resolve_template_files(self) -> None:
108373
log.exception("Failed to get source %s", item)
109374
self.prepare_template()
110375

111-
def _should_render_native(self, dag: DAG | None = None) -> bool:
112-
# Operator explicitly set? Use that value, otherwise inherit from DAG
113-
render_op_template_as_native_obj = getattr(self, "render_template_as_native_obj", None)
114-
if render_op_template_as_native_obj is not None:
115-
return render_op_template_as_native_obj
116-
117-
return dag.render_template_as_native_obj if dag else False
118-
119376
def _do_render_template_fields(
120377
self,
121378
parent: Any,
@@ -136,7 +393,7 @@ def _do_render_template_fields(
136393
setattr(parent, attr_name, rendered_content)
137394

138395
def _render(self, template, context, dag=None) -> Any:
139-
if self._should_render_native(dag):
396+
if dag and dag.render_template_as_native_obj:
140397
return render_template_as_native(template, context)
141398
return render_template_to_string(template, context)
142399

@@ -316,7 +573,7 @@ def create_template_env(
316573
"cache_size": 0,
317574
}
318575
if searchpath:
319-
jinja_env_options["loader"] = jinja2.FileSystemLoader(searchpath)
576+
jinja_env_options["loader"] = ZipAwareFileSystemLoader(searchpath)
320577
if jinja_environment_kwargs:
321578
jinja_env_options.update(jinja_environment_kwargs)
322579

0 commit comments

Comments
 (0)