diff --git a/README.md b/README.md index 513c7d8..e5f30cb 100644 --- a/README.md +++ b/README.md @@ -331,8 +331,10 @@ Excluded paths include: - `.sandbox-bin/` - `.sandbox-secrets/` - `.tmp/` +- `.venv/` - `tmp/` - live SQLite WAL/SHM files such as `*.sqlite-wal` and `*.sqlite-shm` +- Unix sockets, FIFOs, device nodes, symlinks, and other non-regular files The tool does not change ACLs. Windows sandbox ACL issues are diagnostic only. @@ -583,6 +585,7 @@ Fetch and follow instructions from https://raw.githubusercontent.com/gaoguobin/c - 不默认删除旧备份。 - 不自动修改 ACL;Windows sandbox ACL 问题只诊断。 - 不备份 `.sandbox-secrets`、`.sandbox`、`.sandbox-bin`、`.tmp`、`tmp`。 +- 不备份 `.venv`、Unix socket、FIFO、device、symlink 等非普通文件。 - 不直接备份 live SQLite WAL/SHM 文件。 - `*.sqlite` 使用 Python `sqlite3` online backup API 复制,并执行 `PRAGMA integrity_check`。 - 第三方 provider 配置是一等公民:`config.toml`、`hooks.json`、`model_provider`、`model_providers`、 diff --git a/skills/claude-code-environment-backup/SKILL.md b/skills/claude-code-environment-backup/SKILL.md index c43e52d..c31e2c5 100644 --- a/skills/claude-code-environment-backup/SKILL.md +++ b/skills/claude-code-environment-backup/SKILL.md @@ -126,7 +126,7 @@ These flows are still natural-language initiated. The user should paste or ask C - Do not delete old backups by default. - Before deleting old backups, verify that a newer backup reported `ok=true`, has an archive and SHA256 file, appears in `list-backups`, and passes restore dry-run. - Do not edit ACLs. Diagnose Windows sandbox ACL problems only. -- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `tmp`, or live SQLite WAL/SHM files. +- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `.venv`, `tmp`, live SQLite WAL/SHM files, symlinks, Unix sockets, FIFOs, or device nodes. - Do not ask the user to manually run CLI commands in normal backup/doctor/list workflows. ## Periodic backup requests diff --git a/skills/codex-environment-backup/SKILL.md b/skills/codex-environment-backup/SKILL.md index 7acea03..13896e9 100644 --- a/skills/codex-environment-backup/SKILL.md +++ b/skills/codex-environment-backup/SKILL.md @@ -127,7 +127,7 @@ These flows are still natural-language initiated. The user should paste or ask C - Do not delete old backups by default. - Before deleting old backups, verify that a newer backup reported `ok=true`, has an archive and SHA256 file, appears in `list-backups`, and passes restore dry-run. - Do not edit ACLs. Diagnose Windows sandbox ACL problems only. -- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `tmp`, or live SQLite WAL/SHM files. +- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `.venv`, `tmp`, live SQLite WAL/SHM files, symlinks, Unix sockets, FIFOs, or device nodes. - Do not ask the user to manually run CLI commands in normal backup/doctor/list workflows. ## Periodic backup requests diff --git a/src/agent_environment_backup/core.py b/src/agent_environment_backup/core.py index 9638c34..7f4326b 100644 --- a/src/agent_environment_backup/core.py +++ b/src/agent_environment_backup/core.py @@ -32,6 +32,7 @@ ".sandbox-bin", ".sandbox-secrets", ".tmp", + ".venv", "tmp", } @@ -136,6 +137,15 @@ def is_sqlite_database(path: Path) -> bool: return path.suffix.lower() == ".sqlite" +def regular_file_skip_reason(path: Path) -> str | None: + mode = path.lstat().st_mode + if stat.S_ISLNK(mode): + return "symlink" + if not stat.S_ISREG(mode): + return "not a regular file" + return None + + def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: @@ -603,6 +613,7 @@ def iter_source_files( home: Path, errors: list[dict[str, str]] | None = None, extra_excluded_dirs: frozenset[str] = frozenset(), + skipped: list[dict[str, str]] | None = None, ) -> Iterator[tuple[Path, Path]]: def onerror(exc: OSError) -> None: entry = walk_error_entry(home, exc, method="walk") @@ -623,6 +634,24 @@ def onerror(exc: OSError) -> None: relative = source.relative_to(home) if is_excluded(relative, extra_excluded_dirs): continue + try: + skip_reason = regular_file_skip_reason(source) + except OSError as exc: + entry = walk_error_entry(home, exc, method="stat") + entry["relative_path"] = normalize_relative(relative) + if errors is not None: + errors.append(entry) + continue + raise BackupError(entry["error"]) from exc + if skip_reason is not None: + if skipped is not None: + skipped.append( + { + "relative_path": normalize_relative(relative), + "reason": skip_reason, + } + ) + continue yield source, relative @@ -772,6 +801,7 @@ def restore_kit_markdown(display_name: str = "Codex") -> str: import os import shutil import sqlite3 + import stat import sys import tarfile import zipfile @@ -779,7 +809,7 @@ def restore_kit_markdown(display_name: str = "Codex") -> str: from pathlib import Path from textwrap import dedent - EXCLUDED_DIR_NAMES = {".sandbox", ".sandbox-bin", ".sandbox-secrets", ".tmp", "tmp"} + EXCLUDED_DIR_NAMES = {".sandbox", ".sandbox-bin", ".sandbox-secrets", ".tmp", ".venv", "tmp"} LIVE_SQLITE_SUFFIXES = (".sqlite-wal", ".sqlite-shm", "-wal", "-shm") PROFILE_HOME_DEFAULTS = { "codex": ".codex", @@ -814,6 +844,14 @@ def is_excluded(relative_path: Path, profile: str = "codex") -> bool: return True return relative_path.name.lower().endswith(LIVE_SQLITE_SUFFIXES) + def regular_file_skip_reason(path: Path) -> str | None: + mode = path.lstat().st_mode + if stat.S_ISLNK(mode): + return "symlink" + if not stat.S_ISREG(mode): + return "not a regular file" + return None + def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: @@ -841,7 +879,7 @@ def walk_error_entry(base: Path, exc: OSError, method: str) -> dict: "error": str(exc), } - def iter_source_files(source_root: Path, errors: list, profile: str = "codex"): + def iter_source_files(source_root: Path, errors: list, skipped: list, profile: str = "codex"): def onerror(exc: OSError) -> None: errors.append(walk_error_entry(source_root, exc, "walk")) @@ -854,6 +892,18 @@ def onerror(exc: OSError) -> None: relative = source.relative_to(source_root) if is_excluded(relative, profile): continue + try: + skip_reason = regular_file_skip_reason(source) + except OSError as exc: + errors.append({ + "relative_path": relative.as_posix(), + "method": "stat", + "error": str(exc), + }) + continue + if skip_reason is not None: + skipped.append({"relative_path": relative.as_posix(), "reason": skip_reason}) + continue yield source, relative def backup_sqlite_database(source: Path, destination: Path) -> None: @@ -881,7 +931,8 @@ def create_backup(source_home: Path, backup_root: Path, prefix: str, profile: st entries = [] sqlite_checks = [] errors = [] - for source, relative in iter_source_files(source_home, errors, profile): + skipped = [] + for source, relative in iter_source_files(source_home, errors, skipped, profile): destination = files_dir / relative if source.suffix.lower() == ".sqlite": backup_sqlite_database(source, destination) @@ -907,6 +958,7 @@ def create_backup(source_home: Path, backup_root: Path, prefix: str, profile: st "errors": len(errors), }, "errors": errors, + "skipped": skipped, "entries": entries, }) write_json(backup_dir / "sqlite-integrity-check.json", sqlite_checks) @@ -1326,9 +1378,10 @@ def create_backup( entries: list[dict[str, Any]] = [] sqlite_checks: list[dict[str, Any]] = [] errors: list[dict[str, str]] = [] + skipped: list[dict[str, str]] = [] extra_excluded = frozenset(profile.extra_excluded_dirs) - for source, relative in iter_source_files(home, errors, extra_excluded): + for source, relative in iter_source_files(home, errors, extra_excluded, skipped): destination = files_dir / relative method = "copy2" try: @@ -1375,10 +1428,12 @@ def create_backup( }, "entries": entries, "errors": errors, + "skipped": skipped, "counts": { "files": len(entries), "sqlite_databases": sum(1 for entry in entries if entry["method"] == "sqlite_backup"), "errors": len(errors), + "skipped": len(skipped), }, } write_json(backup_dir / "manifest.json", manifest) @@ -1426,6 +1481,7 @@ def create_backup( "restore_kit": restore_kit, "counts": manifest["counts"], "errors": errors, + "skipped": skipped, "sensitive_note": sensitive_note, } @@ -1455,6 +1511,26 @@ def safe_extract_tar(archive_path: Path, destination: Path) -> None: shutil.copyfileobj(extracted, handle) +def validate_tar_members(archive_path: Path) -> dict[str, Any]: + with tarfile.open(archive_path, "r:*") as archive: + root = Path("/__archive_validation__") + for member in archive.getmembers(): + target = (root / member.name).resolve() + if not is_relative_to(target, root): + return { + "ok": False, + "archive": str(archive_path), + "error": f"Archive member escapes extraction root: {member.name}", + } + if not member.isdir() and not member.isfile(): + return { + "ok": False, + "archive": str(archive_path), + "error": f"Unsupported archive member type: {member.name}", + } + return {"ok": True, "archive": str(archive_path)} + + def safe_extract_zip(archive_path: Path, destination: Path) -> None: destination_resolved = destination.resolve() with zipfile.ZipFile(archive_path) as archive: @@ -1479,6 +1555,39 @@ def safe_extract_zip(archive_path: Path, destination: Path) -> None: shutil.copyfileobj(source, handle) +def validate_zip_members(archive_path: Path) -> dict[str, Any]: + with zipfile.ZipFile(archive_path) as archive: + root = Path("/__archive_validation__") + for member in archive.infolist(): + target = (root / member.filename).resolve() + if not is_relative_to(target, root): + return { + "ok": False, + "archive": str(archive_path), + "error": f"Archive member escapes extraction root: {member.filename}", + } + mode = member.external_attr >> 16 + file_type = stat.S_IFMT(mode) + if member.is_dir(): + continue + if file_type and file_type != stat.S_IFREG: + return { + "ok": False, + "archive": str(archive_path), + "error": f"Unsupported archive member type: {member.filename}", + } + return {"ok": True, "archive": str(archive_path)} + + +def validate_archive_members(archive_path: Path) -> dict[str, Any]: + try: + if archive_path.suffix == ".zip": + return validate_zip_members(archive_path) + return validate_tar_members(archive_path) + except Exception as exc: + return {"ok": False, "archive": str(archive_path), "error": str(exc)} + + def locate_backup_dir(path: Path) -> Path: if (path / "manifest.json").exists() and (path / "files").is_dir(): return path @@ -1729,6 +1838,13 @@ def count_files_under(path: Path) -> int: return sum(1 for candidate in path.rglob("*") if candidate.is_file()) +def read_json_file(path: Path) -> dict[str, Any] | list[Any] | None: + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return None + + def backup_list_item(manifest: Path, data: dict[str, Any]) -> dict[str, Any]: counts = data.get("counts") if isinstance(data.get("counts"), dict) else {} entries = data.get("entries") if isinstance(data.get("entries"), list) else [] @@ -1756,11 +1872,28 @@ def backup_list_item(manifest: Path, data: dict[str, Any]) -> dict[str, Any]: if errors is None: errors = 0 - status = "ok" if schema_version == 1 else "legacy_manifest" archive_candidates = [ manifest.parent.with_name(f"{manifest.parent.name}.tar.gz"), manifest.parent.with_name(f"{manifest.parent.name}.zip"), ] + archive_validation = [ + validate_archive_members(path) + for path in archive_candidates + if path.exists() + ] + sqlite_integrity = read_json_file(manifest.parent / "sqlite-integrity-check.json") + sqlite_failed = ( + isinstance(sqlite_integrity, list) + and any(not check.get("ok") for check in sqlite_integrity if isinstance(check, dict)) + ) + + status = "ok" if schema_version == 1 else "legacy_manifest" + if schema_version == 1 and ( + errors > 0 + or sqlite_failed + or any(not item.get("ok") for item in archive_validation) + ): + status = "failed" item: dict[str, Any] = { "backup_dir": str(manifest.parent), "status": status, @@ -1772,6 +1905,10 @@ def backup_list_item(manifest: Path, data: dict[str, Any]) -> dict[str, Any]: "errors": errors, "archives": [str(path) for path in archive_candidates if path.exists()], } + if archive_validation: + item["archive_validation"] = archive_validation + if sqlite_failed: + item["sqlite_integrity_ok"] = False if status == "legacy_manifest": item["legacy_summary"] = { "generated_at": data.get("generated_at"), diff --git a/tests/test_core.py b/tests/test_core.py index a57f6cd..bc28281 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -3,10 +3,12 @@ import io import json import os +import socket import sqlite3 import subprocess import sys import tarfile +import tempfile import unittest import uuid from contextlib import contextmanager @@ -102,6 +104,7 @@ def test_backup_creates_manifest_and_excludes_live_sqlite_sidecars(self) -> None self.assertTrue(result["ok"], result) self.assertTrue(Path(result["archive"]).exists()) self.assertTrue(Path(result["sha256_file"]).exists()) + self.assertIn(result["archive_sha256"], Path(result["sha256_file"]).read_text(encoding="utf-8")) for helper_path in result["restore_kit"].values(): self.assertTrue(Path(helper_path).exists(), helper_path) restore_ps1 = Path(result["restore_kit"]["restore_ps1"]).read_text(encoding="utf-8") @@ -221,6 +224,9 @@ def fake_walk(top, *args, **kwargs): manifest = json.loads(Path(result["manifest"]).read_text(encoding="utf-8")) self.assertEqual(manifest["counts"]["errors"], 1) self.assertTrue(Path(result["archive"]).exists()) + listing = list_backups(backup_root) + self.assertEqual(listing["backups"][0]["status"], "failed") + self.assertEqual(listing["backups"][0]["errors"], 1) def test_restore_dry_run_and_apply_overlay(self) -> None: with self.temp_root() as temp_dir: @@ -265,6 +271,71 @@ def test_restore_dry_run_and_apply_overlay(self) -> None: self.assertTrue((target_home / "old.txt").exists()) self.assertTrue(restore_result["restore"]["restored_files"] >= 1) + def test_backup_skips_venv_symlinks_and_unix_sockets(self) -> None: + temp_parent = "/tmp" if os.name != "nt" and Path("/tmp").is_dir() else None + with tempfile.TemporaryDirectory(prefix="ceb-", dir=temp_parent) as temp_dir: + root = Path(temp_dir) + home = self.make_home(root) + venv_bin = home / "codex-environment-backup" / ".venv" / "bin" + venv_bin.mkdir(parents=True) + python_target = venv_bin / "python-real" + python_target.write_text("fake python", encoding="utf-8") + try: + (venv_bin / "python").symlink_to(python_target) + except OSError: + (venv_bin / "python").write_text("fake python", encoding="utf-8") + + socket_path = home / "app-server-control" / "app-server-control.sock" + socket_created = False + if hasattr(socket, "AF_UNIX"): + socket_path.parent.mkdir(parents=True) + server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + try: + try: + server.bind(str(socket_path)) + socket_created = True + except OSError: + if os.name != "nt": + raise + socket_created = False + result = create_backup( + home, + backup_root=root / "backups", + timestamp="codex-backup-special-files", + run_doctor_commands=False, + ) + finally: + server.close() + else: + result = create_backup( + home, + backup_root=root / "backups", + timestamp="codex-backup-special-files", + run_doctor_commands=False, + ) + + self.assertTrue(result["ok"], result) + self.assertTrue(Path(result["archive"]).exists()) + self.assertIn(result["archive_sha256"], Path(result["sha256_file"]).read_text(encoding="utf-8")) + + manifest = json.loads(Path(result["manifest"]).read_text(encoding="utf-8")) + paths = {entry["relative_path"] for entry in manifest["entries"]} + self.assertFalse(any(".venv" in path for path in paths), paths) + self.assertNotIn("app-server-control/app-server-control.sock", paths) + self.assertEqual(manifest["counts"]["errors"], 0) + if socket_created: + skipped = {entry["relative_path"]: entry["reason"] for entry in manifest["skipped"]} + self.assertEqual(skipped["app-server-control/app-server-control.sock"], "not a regular file") + + dry_run = restore_backup(Path(result["archive"]), root / "restore-target") + self.assertTrue(dry_run["ok"], dry_run) + self.assertTrue(dry_run["dry_run"]) + + listing = list_backups(root / "backups") + self.assertEqual(listing["backups"][0]["status"], "ok") + self.assertEqual(listing["backups"][0]["errors"], 0) + self.assertTrue(all(item["ok"] for item in listing["backups"][0]["archive_validation"])) + def test_restore_aborts_when_pre_restore_backup_is_incomplete(self) -> None: with self.temp_root() as temp_dir: root = Path(temp_dir) @@ -410,6 +481,40 @@ def test_restore_rejects_tar_symlink_members(self) -> None: with self.assertRaises(BackupError): restore_backup(archive_path, root / "target") + def test_list_backups_marks_invalid_archive_failed(self) -> None: + with self.temp_root() as temp_dir: + root = Path(temp_dir) + backup_root = root / "backups" + backup_dir = backup_root / "codex-backup-invalid-archive" + (backup_dir / "files").mkdir(parents=True) + (backup_dir / "manifest.json").write_text( + json.dumps( + { + "schema_version": 1, + "created_at": "2026-05-20T00:00:00+00:00", + "profile": "codex", + "counts": {"files": 0, "sqlite_databases": 0, "errors": 0}, + "entries": [], + "errors": [], + } + ) + + "\n", + encoding="utf-8", + ) + archive_path = backup_root / "codex-backup-invalid-archive.tar.gz" + with tarfile.open(archive_path, "w:gz") as archive: + info = tarfile.TarInfo("codex-backup-invalid-archive/files/link") + info.type = tarfile.SYMTYPE + info.linkname = "../outside" + archive.addfile(info) + + listing = list_backups(backup_root) + item = listing["backups"][0] + self.assertEqual(item["status"], "failed") + self.assertEqual(item["errors"], 0) + self.assertFalse(item["archive_validation"][0]["ok"]) + self.assertIn("Unsupported archive member type", item["archive_validation"][0]["error"]) + def test_doctor_commands_scope_to_requested_codex_home(self) -> None: with self.temp_root() as temp_dir: root = Path(temp_dir)