Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions openviking/parse/parsers/code/code.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import asyncio
import os
import shutil
import stat
import tempfile
import time
from pathlib import Path
from pathlib import Path, PurePosixPath
from typing import Any, List, Optional, Union

from openviking.parse.base import (
Expand Down Expand Up @@ -246,7 +247,52 @@ async def _extract_zip(self, zip_path: str, target_dir: str) -> str:
name = path.stem

with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(target_dir)
target = Path(target_dir).resolve()
for info in zip_ref.infolist():
mode = info.external_attr >> 16
# Skip directory entries (check both name convention and external attrs)
if info.is_dir() or stat.S_ISDIR(mode):
continue
# Skip symlink entries to prevent symlink-based escapes
if stat.S_ISLNK(mode):
logger.warning(f"Skipping symlink entry in zip: {info.filename}")
continue
# Reject entries with suspicious raw path components before extraction
raw = info.filename.replace("\\", "/")
raw_parts = [p for p in raw.split("/") if p]
if ".." in raw_parts:
raise ValueError(f"Zip Slip detected: entry {info.filename!r} contains '..'")
if PurePosixPath(raw).is_absolute() or (len(raw) >= 2 and raw[1] == ":"):
raise ValueError(
f"Zip Slip detected: entry {info.filename!r} is an absolute path"
)
# Normalize the member name the same way zipfile does
# (strip drive/UNC, remove empty/"."/ ".." components) then verify
arcname = info.filename.replace("/", os.sep)
if os.path.altsep:
arcname = arcname.replace(os.path.altsep, os.sep)
arcname = os.path.splitdrive(arcname)[1]
arcname = os.sep.join(p for p in arcname.split(os.sep) if p not in ("", ".", ".."))
if not arcname:
continue # entry normalizes to empty path, skip
member_path = (Path(target_dir) / arcname).resolve()
if not member_path.is_relative_to(target):
raise ValueError(
f"Zip Slip detected: entry {info.filename!r} escapes target directory"
)
# Extract single member and verify the actual path on disk
extracted = Path(zip_ref.extract(info, target_dir)).resolve()
if not extracted.is_relative_to(target):
# Best-effort cleanup of the escaped file
try:
extracted.unlink(missing_ok=True)
except OSError as cleanup_err:
logger.warning(
f"Failed to clean up escaped file {extracted}: {cleanup_err}"
)
raise ValueError(
f"Zip Slip detected: entry {info.filename!r} escapes target directory"
)

return name

Expand Down
4 changes: 4 additions & 0 deletions tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ Miscellaneous tests.
| File | Description | Key Test Cases |
|------|-------------|----------------|
| `test_vikingdb_observer.py` | Database observer | State change notifications, observer registration/unregistration, event filtering |
| `test_code_parser.py` | Code repository parser | `ignore_dirs` compliance, `ignore_extensions` compliance, file type detection, symbolic link handling |
| `test_config_validation.py` | Configuration validation | Config schema validation, required fields, type checking |
| `test_debug_service.py` | Debug service | Debug endpoint tests, service diagnostics |
| `test_extract_zip.py` | Zip extraction security (Zip Slip) | Path traversal prevention (`../`), absolute path rejection, symlink entry filtering, backslash traversal, UNC path rejection, directory entry skipping, normal extraction |

### engine/

Expand Down
170 changes: 170 additions & 0 deletions tests/misc/test_extract_zip.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Tests for CodeRepositoryParser._extract_zip Zip Slip protection."""

import io
import os
import stat
import zipfile
from pathlib import Path

import pytest

from openviking.parse.parsers.code.code import CodeRepositoryParser


def _make_zip(entries: dict[str, str], target_path: str) -> None:
"""Create a zip file with the given filename->content mapping."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
for name, content in entries.items():
zf.writestr(name, content)
Path(target_path).write_bytes(buf.getvalue())


def _make_zip_with_symlink(target_path: str) -> None:
"""Create a zip containing a symlink entry via raw external_attr."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
info = zipfile.ZipInfo("evil_link")
info.external_attr = (stat.S_IFLNK | 0o777) << 16
zf.writestr(info, "/etc/passwd")
Path(target_path).write_bytes(buf.getvalue())


def _assert_no_escape(tmp_path: Path, target_dir: str) -> None:
"""Assert no files were written outside target_dir within tmp_path."""
target = Path(target_dir).resolve()
for f in tmp_path.rglob("*"):
resolved = f.resolve()
if resolved == target or resolved.is_relative_to(target):
continue
if f.suffix == ".zip":
continue
raise AssertionError(f"File escaped target_dir: {resolved}")


@pytest.fixture
def parser():
return CodeRepositoryParser()


@pytest.fixture
def workspace(tmp_path):
"""Provide a temp workspace with zip_path, target_dir, and tmp_path."""
zip_path = str(tmp_path / "test.zip")
target_dir = str(tmp_path / "extracted")
os.makedirs(target_dir)
return tmp_path, zip_path, target_dir


class TestExtractZipNormal:
"""Verify normal zip extraction still works."""

async def test_extracts_files_correctly(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip(
{"src/main.py": "print('hello')", "README.md": "# Test"},
zip_path,
)
name = await parser._extract_zip(zip_path, target_dir)
assert name == "test"
assert (Path(target_dir) / "src" / "main.py").read_text() == "print('hello')"
assert (Path(target_dir) / "README.md").read_text() == "# Test"

async def test_returns_stem_as_name(self, parser, tmp_path):
zip_path = str(tmp_path / "my-repo.zip")
target_dir = str(tmp_path / "out")
os.makedirs(target_dir)
_make_zip({"a.txt": "content"}, zip_path)
name = await parser._extract_zip(zip_path, target_dir)
assert name == "my-repo"


class TestExtractZipPathTraversal:
"""Verify Zip Slip path traversal raises ValueError."""

async def test_rejects_dot_dot_traversal(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip({"../../evil.txt": "pwned"}, zip_path)
with pytest.raises(ValueError, match="Zip Slip detected"):
await parser._extract_zip(zip_path, target_dir)
_assert_no_escape(tmp_path, target_dir)

async def test_rejects_absolute_path(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip({"/etc/passwd": "root:x:0:0"}, zip_path)
with pytest.raises(ValueError, match="Zip Slip detected"):
await parser._extract_zip(zip_path, target_dir)
_assert_no_escape(tmp_path, target_dir)

async def test_rejects_nested_traversal(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip({"foo/../../evil.txt": "pwned"}, zip_path)
with pytest.raises(ValueError, match="Zip Slip detected"):
await parser._extract_zip(zip_path, target_dir)
_assert_no_escape(tmp_path, target_dir)

@pytest.mark.skipif(os.name != "nt", reason="Windows-specific test")
async def test_rejects_windows_drive_path(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip({"C:\\evil.txt": "pwned"}, zip_path)
with pytest.raises(ValueError, match="Zip Slip detected"):
await parser._extract_zip(zip_path, target_dir)
_assert_no_escape(tmp_path, target_dir)

async def test_rejects_backslash_traversal(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip({"..\\..\\evil.txt": "pwned"}, zip_path)
with pytest.raises(ValueError, match="Zip Slip detected"):
await parser._extract_zip(zip_path, target_dir)
_assert_no_escape(tmp_path, target_dir)

async def test_rejects_unc_path(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip({"\\\\server\\share\\evil.txt": "pwned"}, zip_path)
with pytest.raises(ValueError, match="Zip Slip detected"):
await parser._extract_zip(zip_path, target_dir)
_assert_no_escape(tmp_path, target_dir)


class TestExtractZipSymlink:
"""Verify symlink entries are skipped."""

async def test_skips_symlink_entry(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
_make_zip_with_symlink(zip_path)
await parser._extract_zip(zip_path, target_dir)
extracted_files = list(Path(target_dir).rglob("*"))
assert len(extracted_files) == 0


class TestExtractZipEmptyNormalization:
"""Verify entries containing '..' are rejected even if they normalize safely."""

async def test_rejects_dot_dot_entry(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
# "./.." contains ".." and must be rejected
info = zipfile.ZipInfo("./..")
info.external_attr = 0
zf.writestr(info, "should be rejected")
zf.writestr("src/main.py", "print('ok')")
Path(zip_path).write_bytes(buf.getvalue())
with pytest.raises(ValueError, match="Zip Slip detected"):
await parser._extract_zip(zip_path, target_dir)


class TestExtractZipDirectoryEntry:
"""Verify explicit directory entries are skipped without error."""

async def test_skips_directory_entries(self, parser, workspace):
tmp_path, zip_path, target_dir = workspace
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("mydir/", "")
zf.writestr("mydir/file.txt", "content")
Path(zip_path).write_bytes(buf.getvalue())
await parser._extract_zip(zip_path, target_dir)
assert (Path(target_dir) / "mydir" / "file.txt").read_text() == "content"
Loading