class DefaultGroup(click.Group):
    """Click group that falls back to the ``ingest`` subcommand.

    When the first CLI token is neither a help flag nor the name of a
    registered subcommand (or when no tokens were given at all), the argument
    list is prefixed with ``"ingest"`` before normal group parsing happens.
    """

    def parse_args(self, ctx, args):
        """Parse ``args``, routing to ``ingest`` unless a subcommand or help is requested."""
        first_token = args[0] if args else None

        asks_for_help = first_token in ("--help", "-h")
        names_subcommand = first_token in self.commands

        if not asks_for_help and not names_subcommand:
            # No explicit subcommand: treat everything as arguments to "ingest".
            args = ["ingest", *args]

        return super().parse_args(ctx, args)
"""Gitingest CLI tool. + + The default command is 'ingest', which analyzes a directory or repository. + Use 'gitingest ingest --help' to see options for the default command. + + To extract files from a digest, use 'gitingest extract'. + """ + pass + + +@main.command(name="ingest") @click.argument("source", type=str, default=".") @click.option( "--max-size", @@ -76,7 +104,7 @@ class _CLIArgs(TypedDict): default=None, help="Output file path (default: digest.txt in current directory). Use '-' for stdout.", ) -def main(**cli_kwargs: Unpack[_CLIArgs]) -> None: +def ingest_command(**cli_kwargs: Unpack[_CLIArgs]) -> None: """Run the CLI entry point to analyze a repo / directory and dump its contents. Parameters @@ -114,6 +142,42 @@ def main(**cli_kwargs: Unpack[_CLIArgs]) -> None: asyncio.run(_async_main(**cli_kwargs)) +@main.command(name="extract") +@click.argument("digest_file", type=click.Path(exists=True, dir_okay=False)) +@click.option( + "--output", + "-o", + default=".", + type=click.Path(file_okay=False, dir_okay=True), + help="Output directory where files will be extracted.", +) +def extract_command(digest_file: str, output: str) -> None: + """Extract files from a gitingest digest file. + + Parameters + ---------- + digest_file : str + Path to the digest file. + output : str + Directory where extracted files will be saved. 
def extract(digest_path: str | Path, output_dir: str | Path = ".") -> None:
    """Extract files from a gitingest digest file.

    The digest is scanned for ``FILE:`` / ``SYMLINK:`` headers framed by
    ``SEPARATOR`` lines; the text between one header and the next is written
    back to disk below ``output_dir``. Extraction is best-effort: an entry that
    cannot be restored is logged and skipped, and the remaining entries are
    still processed.

    Entries whose path would land outside ``output_dir`` (absolute paths,
    ``..`` components, or paths routed through a symlink that leaves the
    directory) are refused, because a digest is untrusted input.

    Parameters
    ----------
    digest_path : str | Path
        Path to the digest file.
    output_dir : str | Path
        Directory where extracted files will be saved. Created if missing.

    Raises
    ------
    FileNotFoundError
        If ``digest_path`` does not exist.

    """
    digest_path = Path(digest_path)
    output_dir = Path(output_dir)

    if not digest_path.exists():
        raise FileNotFoundError(f"Digest file not found: {digest_path}")

    logger.info("Reading digest file", extra={"digest_path": str(digest_path)})
    content = digest_path.read_text(encoding="utf-8")

    # Create the output directory if it doesn't exist, then canonicalise it once
    # so every containment check compares fully resolved paths.
    output_dir.mkdir(parents=True, exist_ok=True)
    output_root = output_dir.resolve()

    # Header format:
    # ================================================
    # FILE: path/to/file
    # ================================================
    # content...
    separator_pattern = re.escape(SEPARATOR)
    pattern = re.compile(
        rf"^{separator_pattern}\n(FILE|SYMLINK): (.+)\n{separator_pattern}\n",
        re.MULTILINE,
    )

    matches = list(pattern.finditer(content))

    if not matches:
        logger.warning("No files found in the digest.")
        return

    logger.info(f"Found {len(matches)} files to extract.")

    for i, match in enumerate(matches):
        node_type = match.group(1)
        path_info = match.group(2).strip()

        # A block's content runs from the end of its header to the next header
        # (or to the end of the digest for the last block).
        start_idx = match.end()
        end_idx = matches[i + 1].start() if i + 1 < len(matches) else len(content)
        file_content = content[start_idx:end_idx]

        # The ingestion process adds "\n\n" to content_string and joins with "\n",
        # so we expect 3 newlines.
        # NOTE(review): a block at the very end of a digest may carry only two
        # trailing newlines (nothing is joined after it) — confirm against the
        # writer before changing this rule.
        if file_content.endswith("\n\n\n"):
            file_content = file_content[:-3]

        if node_type == "SYMLINK":
            _restore_symlink(output_root, path_info)
        else:
            _restore_file(output_root, path_info, file_content)

    logger.info("Extraction complete.")


def _safe_destination(output_root: Path, relative_path: str, *, follow_final: bool) -> Path | None:
    """Return the resolved destination for ``relative_path``, or ``None`` if it escapes ``output_root``."""
    candidate = output_root / relative_path
    if candidate.name in ("", ".."):
        return None

    try:
        if follow_final:
            # Writing follows an existing symlink at the destination, so resolve it too.
            resolved = candidate.resolve()
        else:
            # The final component is about to be (re)created as a link: resolve
            # only its parent so an existing link there is not followed.
            resolved = candidate.parent.resolve() / candidate.name
    except (OSError, RuntimeError):
        # Symlink loops and similar resolution failures are treated as unsafe.
        return None

    if output_root not in resolved.parents:
        return None
    return resolved


def _restore_symlink(output_root: Path, path_info: str) -> None:
    """Recreate one ``SYMLINK: link -> target`` entry below ``output_root``."""
    link_path_str, arrow, target_path_str = path_info.partition(" -> ")
    if not arrow:
        logger.warning(f"Invalid symlink format: {path_info}")
        return

    link_full_path = _safe_destination(output_root, link_path_str, follow_final=False)
    if link_full_path is None:
        logger.error(f"Refusing to create symlink outside the output directory: {link_path_str}")
        return

    try:
        link_full_path.parent.mkdir(parents=True, exist_ok=True)

        # Replace whatever is already there; is_symlink() also catches dangling links.
        if link_full_path.exists() or link_full_path.is_symlink():
            link_full_path.unlink()

        # The target is stored verbatim. Later writes that would pass through a
        # link leaving the output directory are rejected by _safe_destination.
        os.symlink(target_path_str, link_full_path)
    except OSError as e:
        logger.error(f"Failed to create symlink {link_full_path}: {e}")
    else:
        logger.debug(f"Created symlink: {link_full_path} -> {target_path_str}")


def _restore_file(output_root: Path, path_info: str, file_content: str) -> None:
    """Write one ``FILE:`` entry below ``output_root``."""
    target_file_path = _safe_destination(output_root, path_info, follow_final=True)
    if target_file_path is None:
        logger.error(f"Refusing to write file outside the output directory: {path_info}")
        return

    # The placeholder emitted for empty files maps back to a zero-length file.
    if file_content == EMPTY_FILE:
        file_content = ""

    try:
        target_file_path.parent.mkdir(parents=True, exist_ok=True)
        target_file_path.write_text(file_content, encoding="utf-8")
    except OSError as e:
        logger.error(f"Failed to write file {target_file_path}: {e}")
    else:
        logger.debug(f"Extracted: {target_file_path}")
_invoke_isolated_cli_runner(args: list[str]) -> Result: """Return a ``CliRunner`` that keeps ``stderr`` separate on Click 8.0-8.1.""" kwargs = {} diff --git a/tests/test_extract.py b/tests/test_extract.py new file mode 100644 index 00000000..b0de15f0 --- /dev/null +++ b/tests/test_extract.py @@ -0,0 +1,137 @@ +"""Unit tests for the gitingest.extract module.""" + +from __future__ import annotations + +import os +import pytest +from pathlib import Path + +from gitingest.extract import extract +from gitingest.schemas.filesystem import SEPARATOR + + +def test_extract_basic_file(tmp_path: Path) -> None: + """Test basic extraction of a single text file.""" + digest_content = ( + "Directory structure:\n" + "└── file1.txt\n\n" + f"{SEPARATOR}\n" + "FILE: file1.txt\n" + f"{SEPARATOR}\n" + "Hello, World!\n\n\n" + ) + digest_file = tmp_path / "test_digest.txt" + digest_file.write_text(digest_content, encoding="utf-8") + + output_dir = tmp_path / "extracted_output" + extract(digest_file, output_dir) + + extracted_file = output_dir / "file1.txt" + assert extracted_file.exists() + assert extracted_file.read_text(encoding="utf-8") == "Hello, World!" + + +def test_extract_to_specified_directory(tmp_path: Path) -> None: + """Test extraction to a custom output directory.""" + digest_content = ( + "Directory structure:\n" + "└── sub/file.txt\n\n" + f"{SEPARATOR}\n" + "FILE: sub/file.txt\n" + f"{SEPARATOR}\n" + "Content in subfolder.\n\n\n" + ) + digest_file = tmp_path / "custom_digest.txt" + digest_file.write_text(digest_content, encoding="utf-8") + + output_dir = tmp_path / "my_custom_output" + extract(digest_file, output_dir) + + extracted_file = output_dir / "sub" / "file.txt" + assert extracted_file.exists() + assert extracted_file.read_text(encoding="utf-8") == "Content in subfolder." 
def test_extract_symlink(tmp_path: Path) -> None:
    """Test extraction of a symlink together with the file it points to."""
    digest_content = (
        "Directory structure:\n"
        "├── target.txt\n"
        "└── link.txt -> target.txt\n\n"
        f"{SEPARATOR}\n"
        "FILE: target.txt\n"
        f"{SEPARATOR}\n"
        "This is the target.\n\n\n"
        f"{SEPARATOR}\n"
        "SYMLINK: link.txt -> target.txt\n"
        f"{SEPARATOR}\n"
        "\n\n"  # Symlinks have empty content in the digest
    )
    digest_file = tmp_path / "symlink_digest.txt"
    digest_file.write_text(digest_content, encoding="utf-8")

    output_dir = tmp_path / "output_symlink"
    extract(digest_file, output_dir)

    extracted_symlink = output_dir / "link.txt"
    extracted_target = output_dir / "target.txt"

    assert extracted_target.exists()
    assert extracted_target.read_text(encoding="utf-8") == "This is the target."
    assert extracted_symlink.is_symlink()
    # The link target is stored verbatim, i.e. relative to the link's own directory.
    assert os.readlink(extracted_symlink) == str(Path("target.txt"))
    # Following the link must reach the target extracted from the same digest.
    assert extracted_symlink.read_text(encoding="utf-8") == "This is the target."