From ed5290a448d25d3c500389900f46ae9441e8736f Mon Sep 17 00:00:00 2001 From: ze-mu-zhou <996029583@qq.com> Date: Sat, 7 Feb 2026 13:34:41 +0800 Subject: [PATCH] fix(parse): prevent Zip Slip path traversal in _extract_zip (CWE-22) Replace unsafe zipfile.extractall() with per-member extraction and 5-layer defense-in-depth validation. --- openviking/parse/parsers/code/code.py | 50 +++++++- tests/README.md | 4 + tests/misc/test_extract_zip.py | 170 ++++++++++++++++++++++++++ 3 files changed, 222 insertions(+), 2 deletions(-) create mode 100644 tests/misc/test_extract_zip.py diff --git a/openviking/parse/parsers/code/code.py b/openviking/parse/parsers/code/code.py index 8e774b4..df8c338 100644 --- a/openviking/parse/parsers/code/code.py +++ b/openviking/parse/parsers/code/code.py @@ -12,9 +12,10 @@ import asyncio import os import shutil +import stat import tempfile import time -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import Any, List, Optional, Union from openviking.parse.base import ( @@ -246,7 +247,52 @@ async def _extract_zip(self, zip_path: str, target_dir: str) -> str: name = path.stem with zipfile.ZipFile(zip_path, "r") as zip_ref: - zip_ref.extractall(target_dir) + target = Path(target_dir).resolve() + for info in zip_ref.infolist(): + mode = info.external_attr >> 16 + # Skip directory entries (check both name convention and external attrs) + if info.is_dir() or stat.S_ISDIR(mode): + continue + # Skip symlink entries to prevent symlink-based escapes + if stat.S_ISLNK(mode): + logger.warning(f"Skipping symlink entry in zip: {info.filename}") + continue + # Reject entries with suspicious raw path components before extraction + raw = info.filename.replace("\\", "/") + raw_parts = [p for p in raw.split("/") if p] + if ".." in raw_parts: + raise ValueError(f"Zip Slip detected: entry {info.filename!r} contains '..'") + if PurePosixPath(raw).is_absolute() or (len(raw) >= 2 and raw[1] == ":"): + raise ValueError( + f"Zip Slip detected: entry {info.filename!r} is an absolute path" + ) + # Normalize the member name the same way zipfile does + # (strip drive/UNC, remove empty/"."/ ".." components) then verify + arcname = info.filename.replace("/", os.sep) + if os.path.altsep: + arcname = arcname.replace(os.path.altsep, os.sep) + arcname = os.path.splitdrive(arcname)[1] + arcname = os.sep.join(p for p in arcname.split(os.sep) if p not in ("", ".", "..")) + if not arcname: + continue # entry normalizes to empty path, skip + member_path = (Path(target_dir) / arcname).resolve() + if not member_path.is_relative_to(target): + raise ValueError( + f"Zip Slip detected: entry {info.filename!r} escapes target directory" + ) + # Extract single member and verify the actual path on disk + extracted = Path(zip_ref.extract(info, target_dir)).resolve() + if not extracted.is_relative_to(target): + # Best-effort cleanup of the escaped file + try: + extracted.unlink(missing_ok=True) + except OSError as cleanup_err: + logger.warning( + f"Failed to clean up escaped file {extracted}: {cleanup_err}" + ) + raise ValueError( + f"Zip Slip detected: entry {info.filename!r} escapes target directory" + ) return name diff --git a/tests/README.md b/tests/README.md index 2d0263a..4c67883 100644 --- a/tests/README.md +++ b/tests/README.md @@ -153,6 +153,10 @@ Miscellaneous tests. | File | Description | Key Test Cases | |------|-------------|----------------| | `test_vikingdb_observer.py` | Database observer | State change notifications, observer registration/unregistration, event filtering | +| `test_code_parser.py` | Code repository parser | `ignore_dirs` compliance, `ignore_extensions` compliance, file type detection, symbolic link handling | +| `test_config_validation.py` | Configuration validation | Config schema validation, required fields, type checking | +| `test_debug_service.py` | Debug service | Debug endpoint tests, service diagnostics | +| `test_extract_zip.py` | Zip extraction security (Zip Slip) | Path traversal prevention (`../`), absolute path rejection, symlink entry filtering, backslash traversal, UNC path rejection, directory entry skipping, normal extraction | ### engine/ diff --git a/tests/misc/test_extract_zip.py b/tests/misc/test_extract_zip.py new file mode 100644 index 0000000..7be5ff8 --- /dev/null +++ b/tests/misc/test_extract_zip.py @@ -0,0 +1,170 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for CodeRepositoryParser._extract_zip Zip Slip protection.""" + +import io +import os +import stat +import zipfile +from pathlib import Path + +import pytest + +from openviking.parse.parsers.code.code import CodeRepositoryParser + + +def _make_zip(entries: dict[str, str], target_path: str) -> None: + """Create a zip file with the given filename->content mapping.""" + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + for name, content in entries.items(): + zf.writestr(name, content) + Path(target_path).write_bytes(buf.getvalue()) + + +def _make_zip_with_symlink(target_path: str) -> None: + """Create a zip containing a symlink entry via raw external_attr.""" + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + info = zipfile.ZipInfo("evil_link") + info.external_attr = (stat.S_IFLNK | 0o777) << 16 + zf.writestr(info, "/etc/passwd") + Path(target_path).write_bytes(buf.getvalue()) + + +def _assert_no_escape(tmp_path: Path, target_dir: str) -> None: + """Assert no files were written outside target_dir within tmp_path.""" + target = Path(target_dir).resolve() + for f in tmp_path.rglob("*"): + resolved = f.resolve() + if resolved == target or resolved.is_relative_to(target): + continue + if f.suffix == ".zip": + continue + raise AssertionError(f"File escaped target_dir: {resolved}") + + +@pytest.fixture +def parser(): + return CodeRepositoryParser() + + +@pytest.fixture +def workspace(tmp_path): + """Provide a temp workspace with zip_path, target_dir, and tmp_path.""" + zip_path = str(tmp_path / "test.zip") + target_dir = str(tmp_path / "extracted") + os.makedirs(target_dir) + return tmp_path, zip_path, target_dir + + +class TestExtractZipNormal: + """Verify normal zip extraction still works.""" + + async def test_extracts_files_correctly(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip( + {"src/main.py": "print('hello')", "README.md": "# Test"}, + zip_path, + ) + name = await parser._extract_zip(zip_path, target_dir) + assert name == "test" + assert (Path(target_dir) / "src" / "main.py").read_text() == "print('hello')" + assert (Path(target_dir) / "README.md").read_text() == "# Test" + + async def test_returns_stem_as_name(self, parser, tmp_path): + zip_path = str(tmp_path / "my-repo.zip") + target_dir = str(tmp_path / "out") + os.makedirs(target_dir) + _make_zip({"a.txt": "content"}, zip_path) + name = await parser._extract_zip(zip_path, target_dir) + assert name == "my-repo" + + +class TestExtractZipPathTraversal: + """Verify Zip Slip path traversal raises ValueError.""" + + async def test_rejects_dot_dot_traversal(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip({"../../evil.txt": "pwned"}, zip_path) + with pytest.raises(ValueError, match="Zip Slip detected"): + await parser._extract_zip(zip_path, target_dir) + _assert_no_escape(tmp_path, target_dir) + + async def test_rejects_absolute_path(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip({"/etc/passwd": "root:x:0:0"}, zip_path) + with pytest.raises(ValueError, match="Zip Slip detected"): + await parser._extract_zip(zip_path, target_dir) + _assert_no_escape(tmp_path, target_dir) + + async def test_rejects_nested_traversal(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip({"foo/../../evil.txt": "pwned"}, zip_path) + with pytest.raises(ValueError, match="Zip Slip detected"): + await parser._extract_zip(zip_path, target_dir) + _assert_no_escape(tmp_path, target_dir) + + @pytest.mark.skipif(os.name != "nt", reason="Windows-specific test") + async def test_rejects_windows_drive_path(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip({"C:\\evil.txt": "pwned"}, zip_path) + with pytest.raises(ValueError, match="Zip Slip detected"): + await parser._extract_zip(zip_path, target_dir) + _assert_no_escape(tmp_path, target_dir) + + async def test_rejects_backslash_traversal(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip({"..\\..\\evil.txt": "pwned"}, zip_path) + with pytest.raises(ValueError, match="Zip Slip detected"): + await parser._extract_zip(zip_path, target_dir) + _assert_no_escape(tmp_path, target_dir) + + async def test_rejects_unc_path(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip({"\\\\server\\share\\evil.txt": "pwned"}, zip_path) + with pytest.raises(ValueError, match="Zip Slip detected"): + await parser._extract_zip(zip_path, target_dir) + _assert_no_escape(tmp_path, target_dir) + + +class TestExtractZipSymlink: + """Verify symlink entries are skipped.""" + + async def test_skips_symlink_entry(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + _make_zip_with_symlink(zip_path) + await parser._extract_zip(zip_path, target_dir) + extracted_files = list(Path(target_dir).rglob("*")) + assert len(extracted_files) == 0 + + +class TestExtractZipEmptyNormalization: + """Verify entries containing '..' are rejected even if they normalize safely.""" + + async def test_rejects_dot_dot_entry(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + # "./.." contains ".." and must be rejected + info = zipfile.ZipInfo("./..") + info.external_attr = 0 + zf.writestr(info, "should be rejected") + zf.writestr("src/main.py", "print('ok')") + Path(zip_path).write_bytes(buf.getvalue()) + with pytest.raises(ValueError, match="Zip Slip detected"): + await parser._extract_zip(zip_path, target_dir) + + +class TestExtractZipDirectoryEntry: + """Verify explicit directory entries are skipped without error.""" + + async def test_skips_directory_entries(self, parser, workspace): + tmp_path, zip_path, target_dir = workspace + buf = io.BytesIO() + with zipfile.ZipFile(buf, "w") as zf: + zf.writestr("mydir/", "") + zf.writestr("mydir/file.txt", "content") + Path(zip_path).write_bytes(buf.getvalue()) + await parser._extract_zip(zip_path, target_dir) + assert (Path(target_dir) / "mydir" / "file.txt").read_text() == "content"