Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ Excluded paths include:
- `.sandbox-bin/`
- `.sandbox-secrets/`
- `.tmp/`
- `.venv/`
- `tmp/`
- live SQLite WAL/SHM files such as `*.sqlite-wal` and `*.sqlite-shm`
- Unix sockets, FIFOs, device nodes, symlinks, and other non-regular files

The tool does not change ACLs. Windows sandbox ACL issues are diagnostic only.

Expand Down Expand Up @@ -583,6 +585,7 @@ Fetch and follow instructions from https://raw.githubusercontent.com/gaoguobin/c
- 不默认删除旧备份。
- 不自动修改 ACL;Windows sandbox ACL 问题只诊断。
- 不备份 `.sandbox-secrets`、`.sandbox`、`.sandbox-bin`、`.tmp`、`tmp`。
- 不备份 `.venv` 目录,也不备份 Unix socket、FIFO、device、symlink 等非普通文件。
- 不直接备份 live SQLite WAL/SHM 文件。
- `*.sqlite` 使用 Python `sqlite3` online backup API 复制,并执行 `PRAGMA integrity_check`。
- 第三方 provider 配置是一等公民:`config.toml`、`hooks.json`、`model_provider`、`model_providers`、
Expand Down
2 changes: 1 addition & 1 deletion skills/claude-code-environment-backup/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ These flows are still natural-language initiated. The user should paste or ask C
- Do not delete old backups by default.
- Before deleting old backups, verify that a newer backup reported `ok=true`, has an archive and SHA256 file, appears in `list-backups`, and passes restore dry-run.
- Do not edit ACLs. Diagnose Windows sandbox ACL problems only.
- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `tmp`, or live SQLite WAL/SHM files.
- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `.venv`, `tmp`, live SQLite WAL/SHM files, symlinks, Unix sockets, FIFOs, or device nodes.
- Do not ask the user to manually run CLI commands in normal backup/doctor/list workflows.

## Periodic backup requests
Expand Down
2 changes: 1 addition & 1 deletion skills/codex-environment-backup/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ These flows are still natural-language initiated. The user should paste or ask C
- Do not delete old backups by default.
- Before deleting old backups, verify that a newer backup reported `ok=true`, has an archive and SHA256 file, appears in `list-backups`, and passes restore dry-run.
- Do not edit ACLs. Diagnose Windows sandbox ACL problems only.
- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `tmp`, or live SQLite WAL/SHM files.
- Do not include `.sandbox-secrets`, `.sandbox`, `.sandbox-bin`, `.tmp`, `.venv`, `tmp`, live SQLite WAL/SHM files, symlinks, Unix sockets, FIFOs, or device nodes.
- Do not ask the user to manually run CLI commands in normal backup/doctor/list workflows.

## Periodic backup requests
Expand Down
147 changes: 142 additions & 5 deletions src/agent_environment_backup/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
".sandbox-bin",
".sandbox-secrets",
".tmp",
".venv",
"tmp",
}

Expand Down Expand Up @@ -136,6 +137,15 @@ def is_sqlite_database(path: Path) -> bool:
return path.suffix.lower() == ".sqlite"


def regular_file_skip_reason(path: Path) -> str | None:
mode = path.lstat().st_mode
if stat.S_ISLNK(mode):
return "symlink"
if not stat.S_ISREG(mode):
return "not a regular file"
return None


def sha256_file(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
Expand Down Expand Up @@ -603,6 +613,7 @@ def iter_source_files(
home: Path,
errors: list[dict[str, str]] | None = None,
extra_excluded_dirs: frozenset[str] = frozenset(),
skipped: list[dict[str, str]] | None = None,
) -> Iterator[tuple[Path, Path]]:
def onerror(exc: OSError) -> None:
entry = walk_error_entry(home, exc, method="walk")
Expand All @@ -623,6 +634,24 @@ def onerror(exc: OSError) -> None:
relative = source.relative_to(home)
if is_excluded(relative, extra_excluded_dirs):
continue
try:
skip_reason = regular_file_skip_reason(source)
except OSError as exc:
entry = walk_error_entry(home, exc, method="stat")
entry["relative_path"] = normalize_relative(relative)
if errors is not None:
errors.append(entry)
continue
raise BackupError(entry["error"]) from exc
if skip_reason is not None:
if skipped is not None:
skipped.append(
{
"relative_path": normalize_relative(relative),
"reason": skip_reason,
}
)
continue
yield source, relative


Expand Down Expand Up @@ -772,14 +801,15 @@ def restore_kit_markdown(display_name: str = "Codex") -> str:
import os
import shutil
import sqlite3
import stat
import sys
import tarfile
import zipfile
from datetime import datetime
from pathlib import Path
from textwrap import dedent

EXCLUDED_DIR_NAMES = {".sandbox", ".sandbox-bin", ".sandbox-secrets", ".tmp", "tmp"}
EXCLUDED_DIR_NAMES = {".sandbox", ".sandbox-bin", ".sandbox-secrets", ".tmp", ".venv", "tmp"}
LIVE_SQLITE_SUFFIXES = (".sqlite-wal", ".sqlite-shm", "-wal", "-shm")
PROFILE_HOME_DEFAULTS = {
"codex": ".codex",
Expand Down Expand Up @@ -814,6 +844,14 @@ def is_excluded(relative_path: Path, profile: str = "codex") -> bool:
return True
return relative_path.name.lower().endswith(LIVE_SQLITE_SUFFIXES)

def regular_file_skip_reason(path: Path) -> str | None:
mode = path.lstat().st_mode
if stat.S_ISLNK(mode):
return "symlink"
if not stat.S_ISREG(mode):
return "not a regular file"
return None

def sha256_file(path: Path) -> str:
digest = hashlib.sha256()
with path.open("rb") as handle:
Expand Down Expand Up @@ -841,7 +879,7 @@ def walk_error_entry(base: Path, exc: OSError, method: str) -> dict:
"error": str(exc),
}

def iter_source_files(source_root: Path, errors: list, profile: str = "codex"):
def iter_source_files(source_root: Path, errors: list, skipped: list, profile: str = "codex"):
def onerror(exc: OSError) -> None:
errors.append(walk_error_entry(source_root, exc, "walk"))

Expand All @@ -854,6 +892,18 @@ def onerror(exc: OSError) -> None:
relative = source.relative_to(source_root)
if is_excluded(relative, profile):
continue
try:
skip_reason = regular_file_skip_reason(source)
except OSError as exc:
errors.append({
"relative_path": relative.as_posix(),
"method": "stat",
"error": str(exc),
})
continue
if skip_reason is not None:
skipped.append({"relative_path": relative.as_posix(), "reason": skip_reason})
continue
yield source, relative

def backup_sqlite_database(source: Path, destination: Path) -> None:
Expand Down Expand Up @@ -881,7 +931,8 @@ def create_backup(source_home: Path, backup_root: Path, prefix: str, profile: st
entries = []
sqlite_checks = []
errors = []
for source, relative in iter_source_files(source_home, errors, profile):
skipped = []
for source, relative in iter_source_files(source_home, errors, skipped, profile):
destination = files_dir / relative
if source.suffix.lower() == ".sqlite":
backup_sqlite_database(source, destination)
Expand All @@ -907,6 +958,7 @@ def create_backup(source_home: Path, backup_root: Path, prefix: str, profile: st
"errors": len(errors),
},
"errors": errors,
"skipped": skipped,
"entries": entries,
})
write_json(backup_dir / "sqlite-integrity-check.json", sqlite_checks)
Expand Down Expand Up @@ -1326,9 +1378,10 @@ def create_backup(
entries: list[dict[str, Any]] = []
sqlite_checks: list[dict[str, Any]] = []
errors: list[dict[str, str]] = []
skipped: list[dict[str, str]] = []
extra_excluded = frozenset(profile.extra_excluded_dirs)

for source, relative in iter_source_files(home, errors, extra_excluded):
for source, relative in iter_source_files(home, errors, extra_excluded, skipped):
destination = files_dir / relative
method = "copy2"
try:
Expand Down Expand Up @@ -1375,10 +1428,12 @@ def create_backup(
},
"entries": entries,
"errors": errors,
"skipped": skipped,
"counts": {
"files": len(entries),
"sqlite_databases": sum(1 for entry in entries if entry["method"] == "sqlite_backup"),
"errors": len(errors),
"skipped": len(skipped),
},
}
write_json(backup_dir / "manifest.json", manifest)
Expand Down Expand Up @@ -1426,6 +1481,7 @@ def create_backup(
"restore_kit": restore_kit,
"counts": manifest["counts"],
"errors": errors,
"skipped": skipped,
"sensitive_note": sensitive_note,
}

Expand Down Expand Up @@ -1455,6 +1511,26 @@ def safe_extract_tar(archive_path: Path, destination: Path) -> None:
shutil.copyfileobj(extracted, handle)


def validate_tar_members(archive_path: Path) -> dict[str, Any]:
    """Inspect a tar archive's member list without extracting anything.

    Every member name is joined onto a placeholder root and resolved; a
    member whose resolved target falls outside that root (``..`` segments,
    absolute names) fails validation. Members that are neither directories
    nor regular files (links, devices, FIFOs, ...) fail as well.

    Args:
        archive_path: Path of the tar archive; opened with mode ``"r:*"``.

    Returns:
        ``{"ok": True, "archive": ...}`` when every member passes, otherwise
        ``{"ok": False, "archive": ..., "error": ...}`` describing the first
        offending member.

    Raises:
        Errors from opening or reading the archive are not caught here and
        propagate to the caller.
    """
    # Resolve the placeholder root as well: each member target below is
    # resolved, and on platforms where resolve() adds a drive or anchor an
    # unresolved root would never compare as the parent of any target.
    root = Path("/__archive_validation__").resolve()
    archive_name = str(archive_path)
    with tarfile.open(archive_path, "r:*") as archive:
        for member in archive.getmembers():
            target = (root / member.name).resolve()
            if not is_relative_to(target, root):
                return {
                    "ok": False,
                    "archive": archive_name,
                    "error": f"Archive member escapes extraction root: {member.name}",
                }
            # Only plain directories and regular files are acceptable.
            if not member.isdir() and not member.isfile():
                return {
                    "ok": False,
                    "archive": archive_name,
                    "error": f"Unsupported archive member type: {member.name}",
                }
    return {"ok": True, "archive": archive_name}


def safe_extract_zip(archive_path: Path, destination: Path) -> None:
destination_resolved = destination.resolve()
with zipfile.ZipFile(archive_path) as archive:
Expand All @@ -1479,6 +1555,39 @@ def safe_extract_zip(archive_path: Path, destination: Path) -> None:
shutil.copyfileobj(source, handle)


def validate_zip_members(archive_path: Path) -> dict[str, Any]:
    """Inspect a zip archive's member list without extracting anything.

    Every member filename is joined onto a placeholder root and resolved; a
    member whose resolved target falls outside that root fails validation.
    Directory entries are accepted. For other entries the file-type bits
    stored in ``external_attr`` are checked: a recorded type other than
    "regular file" fails, while an entry with no recorded type is accepted.

    Args:
        archive_path: Path of the zip archive.

    Returns:
        ``{"ok": True, "archive": ...}`` when every member passes, otherwise
        ``{"ok": False, "archive": ..., "error": ...}`` describing the first
        offending member.

    Raises:
        Errors from opening or reading the archive are not caught here and
        propagate to the caller.
    """
    # Resolve the placeholder root as well: each member target below is
    # resolved, and on platforms where resolve() adds a drive or anchor an
    # unresolved root would never compare as the parent of any target.
    root = Path("/__archive_validation__").resolve()
    archive_name = str(archive_path)
    with zipfile.ZipFile(archive_path) as archive:
        for member in archive.infolist():
            target = (root / member.filename).resolve()
            if not is_relative_to(target, root):
                return {
                    "ok": False,
                    "archive": archive_name,
                    "error": f"Archive member escapes extraction root: {member.filename}",
                }
            if member.is_dir():
                continue
            # The upper 16 bits of external_attr conventionally carry the
            # Unix st_mode; zero type bits mean no file type was recorded.
            file_type = stat.S_IFMT(member.external_attr >> 16)
            if file_type and file_type != stat.S_IFREG:
                return {
                    "ok": False,
                    "archive": archive_name,
                    "error": f"Unsupported archive member type: {member.filename}",
                }
    return {"ok": True, "archive": archive_name}


def validate_archive_members(archive_path: Path) -> dict[str, Any]:
    """Validate an archive's members, reporting any failure as a result dict.

    A path whose suffix is exactly ``.zip`` goes to the zip validator; every
    other path goes to the tar validator. Any exception raised along the way
    is converted into ``{"ok": False, "archive": ..., "error": str(exc)}``
    instead of propagating.
    """
    try:
        validator = (
            validate_zip_members
            if archive_path.suffix == ".zip"
            else validate_tar_members
        )
        return validator(archive_path)
    except Exception as exc:
        failure: dict[str, Any] = {"ok": False}
        failure["archive"] = str(archive_path)
        failure["error"] = str(exc)
        return failure


def locate_backup_dir(path: Path) -> Path:
if (path / "manifest.json").exists() and (path / "files").is_dir():
return path
Expand Down Expand Up @@ -1729,6 +1838,13 @@ def count_files_under(path: Path) -> int:
return sum(1 for candidate in path.rglob("*") if candidate.is_file())


def read_json_file(path: Path) -> dict[str, Any] | list[Any] | None:
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None


def backup_list_item(manifest: Path, data: dict[str, Any]) -> dict[str, Any]:
counts = data.get("counts") if isinstance(data.get("counts"), dict) else {}
entries = data.get("entries") if isinstance(data.get("entries"), list) else []
Expand Down Expand Up @@ -1756,11 +1872,28 @@ def backup_list_item(manifest: Path, data: dict[str, Any]) -> dict[str, Any]:
if errors is None:
errors = 0

status = "ok" if schema_version == 1 else "legacy_manifest"
archive_candidates = [
manifest.parent.with_name(f"{manifest.parent.name}.tar.gz"),
manifest.parent.with_name(f"{manifest.parent.name}.zip"),
]
archive_validation = [
validate_archive_members(path)
for path in archive_candidates
if path.exists()
]
sqlite_integrity = read_json_file(manifest.parent / "sqlite-integrity-check.json")
sqlite_failed = (
isinstance(sqlite_integrity, list)
and any(not check.get("ok") for check in sqlite_integrity if isinstance(check, dict))
)

status = "ok" if schema_version == 1 else "legacy_manifest"
if schema_version == 1 and (
errors > 0
or sqlite_failed
or any(not item.get("ok") for item in archive_validation)
):
status = "failed"
item: dict[str, Any] = {
"backup_dir": str(manifest.parent),
"status": status,
Expand All @@ -1772,6 +1905,10 @@ def backup_list_item(manifest: Path, data: dict[str, Any]) -> dict[str, Any]:
"errors": errors,
"archives": [str(path) for path in archive_candidates if path.exists()],
}
if archive_validation:
item["archive_validation"] = archive_validation
if sqlite_failed:
item["sqlite_integrity_ok"] = False
if status == "legacy_manifest":
item["legacy_summary"] = {
"generated_at": data.get("generated_at"),
Expand Down
Loading
Loading