diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e41c54a --- /dev/null +++ b/.dockerignore @@ -0,0 +1,12 @@ +.git +.github +.pytest_cache +.venv +venv +__pycache__ +*.pyc +.env +runtime +**/data/index +**/data/trees +**/data/output diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..a8a2f40 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,23 @@ +name: tests + +on: + pull_request: + push: + branches: + - main + +jobs: + pytest: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v5 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Install + run: uv sync --all-extras --group test + - name: Check whitespace + run: git diff --check + - name: Run tests + run: uv run --all-extras --group test pytest -q diff --git a/DocComparator/.env.example b/DocComparator/.env.example index beb177c..dfe2631 100644 --- a/DocComparator/.env.example +++ b/DocComparator/.env.example @@ -18,3 +18,6 @@ LLAMA_CLOUD_API_KEY=your_llama_cloud_api_key_here # Optional: bounded parallel LLM comparisons per selected Doc 1 section # DC_COMPARE_CONCURRENCY=3 + +# Optional: bounded parallel selected-section pipeline +# DC_SECTION_CONCURRENCY=2 diff --git a/DocComparator/README.md b/DocComparator/README.md index 4d32119..b6ad658 100644 --- a/DocComparator/README.md +++ b/DocComparator/README.md @@ -178,6 +178,7 @@ All configuration is centralized in `src/pprag_doc_comparator/config.py`. Overri | `DC_EMBEDDING_BATCH_SIZE` | `20` | Number of chunks embedded per Gemini request during indexing | | `DC_EMBEDDING_BATCH_DELAY` | `1` | Seconds to wait between embedding batches during indexing | | `DC_COMPARE_CONCURRENCY` | `3` | Maximum parallel LLM section comparisons per selected Doc 1 section | +| `DC_SECTION_CONCURRENCY` | `2` | Maximum selected Doc 1 sections cross-retrieved and compared in parallel | ### Indexing Throughput @@ -194,6 +195,8 @@ DocComparator compares each selected Doc 1 section against up to bounded concurrency controlled by `DC_COMPARE_CONCURRENCY`, while preserving input order in the final report. Lower the value if you hit LLM rate limits; increase it if your quota allows more parallel requests. +The selected-section pipeline can also run multiple Doc 1 sections concurrently +with `DC_SECTION_CONCURRENCY`; output ordering remains deterministic. --- diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..81deaca --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.11-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 + +WORKDIR /app + +RUN pip install --no-cache-dir uv + +COPY pyproject.toml uv.lock README.md ./ +COPY src ./src +COPY Text-Only ./Text-Only +COPY MultiModal ./MultiModal +COPY DocComparator ./DocComparator + +RUN uv sync --all-extras --group test + +EXPOSE 8501 + +CMD ["uv", "run", "pprag", "doctor"] diff --git a/README.md b/README.md index 14cd6c1..6b97b48 100644 --- a/README.md +++ b/README.md @@ -152,6 +152,32 @@ All include sample data so you can clone, build the index, and start exploring i --- +## Runtime Checks and Deployment + +Run a local readiness check without loading optional application stacks: + +```bash +pprag doctor +pprag doctor --json +``` + +Run local runtime evaluations for cache and concurrency behavior: + +```bash +pprag eval runtime +``` + +Container scaffolding is included for hosted pilots: + +```bash +docker compose up doc-comparator +``` + +Use `deploy/production.env.example` as a template for private runtime settings. +Do not commit real secrets. + +--- + ## Author **Partha Sarkar** diff --git a/deploy/production.env.example b/deploy/production.env.example new file mode 100644 index 0000000..13c761d --- /dev/null +++ b/deploy/production.env.example @@ -0,0 +1,22 @@ +# Copy to a private env file before use. Do not commit real secrets. +GOOGLE_API_KEY= +LLAMA_CLOUD_API_KEY= + +# Runtime roots +PPRAG_PROJECT_ROOT=/app + +# Explicit trust is required before loading locally generated FAISS metadata. +PPRAG_TRUST_LOCAL_FAISS=1 +DC_TRUST_FAISS_INDEX=1 +PP_TRUST_FAISS_INDEX=1 + +# Metadata-only audit log. Prompts and document text are not written by default. +PPRAG_AUDIT_LOG=/app/runtime/audit/pprag.jsonl + +# Throughput controls +PP_EMBEDDING_BATCH_SIZE=20 +PP_EMBEDDING_BATCH_DELAY=1 +DC_EMBEDDING_BATCH_SIZE=20 +DC_EMBEDDING_BATCH_DELAY=1 +DC_COMPARE_CONCURRENCY=3 +DC_SECTION_CONCURRENCY=2 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..22aff3f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,22 @@ +services: + doc-comparator: + build: . + command: uv run pprag compare serve --server.address 0.0.0.0 --server.port 8501 + env_file: + - ./deploy/production.env.example + ports: + - "8501:8501" + volumes: + - ./DocComparator/data:/app/DocComparator/data + - ./runtime/audit:/app/runtime/audit + + multimodal: + build: . + command: uv run pprag multimodal serve --server.address 0.0.0.0 --server.port 8502 + env_file: + - ./deploy/production.env.example + ports: + - "8502:8502" + volumes: + - ./MultiModal/data:/app/MultiModal/data + - ./runtime/audit:/app/runtime/audit diff --git a/src/pprag/audit.py b/src/pprag/audit.py new file mode 100644 index 0000000..9f38d37 --- /dev/null +++ b/src/pprag/audit.py @@ -0,0 +1,41 @@ +"""Optional JSONL audit-event logging for enterprise deployments.""" +from __future__ import annotations + +import json +import os +import time +import uuid +from pathlib import Path +from typing import Any + + +def audit_log_path() -> str | None: + """Return the configured audit log path, if audit logging is enabled.""" + return os.getenv("PPRAG_AUDIT_LOG") or os.getenv("DC_AUDIT_LOG") + + +def new_event_id() -> str: + """Return a short unique id for correlating runtime events.""" + return uuid.uuid4().hex[:16] + + +def write_audit_event(event_type: str, **fields: Any) -> None: + """Append a single metadata-only audit event when configured. + + This intentionally avoids prompts and document text. Callers should pass + ids, counts, timings, ratings, and status values only. + """ + path = audit_log_path() + if not path: + return + + payload = { + "event_id": new_event_id(), + "event_type": event_type, + "timestamp": time.time(), + **fields, + } + log_path = Path(path).expanduser() + log_path.parent.mkdir(parents=True, exist_ok=True) + with open(log_path, "a", encoding="utf-8") as fh: + fh.write(json.dumps(payload, sort_keys=True, default=str) + "\n") diff --git a/src/pprag/cli.py b/src/pprag/cli.py index da5a534..1c407e1 100644 --- a/src/pprag/cli.py +++ b/src/pprag/cli.py @@ -99,6 +99,12 @@ def build_parser() -> argparse.ArgumentParser: compare_serve = compare_sub.add_parser("serve", help="Start the DocComparator Streamlit UI") compare_serve.add_argument("args", nargs=argparse.REMAINDER) + doctor = subparsers.add_parser("doctor", help="Check local runtime readiness") + doctor.add_argument("args", nargs=argparse.REMAINDER) + + eval_parser = subparsers.add_parser("eval", help="Run local runtime evaluations") + eval_parser.add_argument("args", nargs=argparse.REMAINDER) + return parser @@ -159,6 +165,12 @@ def main(argv: Sequence[str] | None = None) -> int: if args.command in ("ui", "serve"): return _run_streamlit("compare", "pprag_doc_comparator", args.args) + if args.modality == "doctor": + return _run_module("full", "pprag", "doctor", args.args) + + if args.modality == "eval": + return _run_module("full", "pprag", "eval", args.args) + except MissingExtraError as exc: parser.exit(2, f"{exc}\n") diff --git a/src/pprag/config_validation.py b/src/pprag/config_validation.py new file mode 100644 index 0000000..f040ec3 --- /dev/null +++ b/src/pprag/config_validation.py @@ -0,0 +1,81 @@ +"""Runtime configuration checks for local and hosted deployments.""" +from __future__ import annotations + +import importlib.util +import os +from pathlib import Path +from typing import Any + + +def env_status(name: str, *, required: bool = False) -> dict[str, Any]: + """Return a redacted environment-variable status entry.""" + value = os.getenv(name) + ok = bool(value) or not required + return { + "name": name, + "required": required, + "present": bool(value), + "ok": ok, + } + + +def dependency_status(import_name: str, *, required: bool = False) -> dict[str, Any]: + """Return whether an importable dependency is installed.""" + present = importlib.util.find_spec(import_name) is not None + return { + "name": import_name, + "required": required, + "present": present, + "ok": present or not required, + } + + +def path_status(path: str | Path, *, create: bool = False) -> dict[str, Any]: + """Return whether a runtime path exists and is writable.""" + path = Path(path).expanduser() + if create: + try: + path.mkdir(parents=True, exist_ok=True) + except OSError: + pass + exists = path.exists() + writable = os.access(path, os.W_OK) if exists else os.access(path.parent, os.W_OK) + return { + "path": str(path), + "exists": exists, + "writable": writable, + "ok": exists and writable, + } + + +def validate_runtime(root: str | Path = ".") -> dict[str, Any]: + """Run lightweight deployment checks without importing optional stacks.""" + root = Path(root).resolve() + paths = [ + path_status(root / "Text-Only" / "data", create=False), + path_status(root / "MultiModal" / "data", create=False), + path_status(root / "DocComparator" / "data", create=False), + ] + env = [ + env_status("GOOGLE_API_KEY", required=False), + env_status("LLAMA_CLOUD_API_KEY", required=False), + env_status("PPRAG_TRUST_LOCAL_FAISS", required=False), + env_status("PPRAG_AUDIT_LOG", required=False), + env_status("DC_AUDIT_LOG", required=False), + ] + dependencies = [ + dependency_status("google.generativeai", required=False), + dependency_status("langchain_community", required=False), + dependency_status("faiss", required=False), + dependency_status("streamlit", required=False), + dependency_status("llama_cloud", required=False), + ] + + checks = [*paths, *env, *dependencies] + return { + "root": str(root), + "ok": all(item.get("ok", False) for item in checks if "ok" in item), + "paths": paths, + "environment": env, + "dependencies": dependencies, + } diff --git a/src/pprag/doctor.py b/src/pprag/doctor.py new file mode 100644 index 0000000..eef7099 --- /dev/null +++ b/src/pprag/doctor.py @@ -0,0 +1,34 @@ +"""CLI runtime checks for Proxy-Pointer deployments.""" +from __future__ import annotations + +import argparse +import json +from typing import Sequence + +from pprag.config_validation import validate_runtime + + +def main(argv: Sequence[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Check local Proxy-Pointer runtime readiness.") + parser.add_argument("--root", default=".", help="Repository or deployment root to inspect.") + parser.add_argument("--json", action="store_true", help="Print machine-readable JSON.") + args = parser.parse_args(argv) + + result = validate_runtime(args.root) + if args.json: + print(json.dumps(result, indent=2, sort_keys=True)) + else: + status = "OK" if result["ok"] else "WARN" + print(f"Runtime check: {status}") + print(f"Root: {result['root']}") + for group in ("paths", "environment", "dependencies"): + print(f"\n{group}:") + for item in result[group]: + name = item.get("name") or item.get("path") + marker = "ok" if item.get("ok") else "check" + print(f" [{marker}] {name}") + return 0 if result["ok"] else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/pprag/document_manifest.py b/src/pprag/document_manifest.py new file mode 100644 index 0000000..55a51c9 --- /dev/null +++ b/src/pprag/document_manifest.py @@ -0,0 +1,136 @@ +"""Document manifest and text-cache helpers for indexed Markdown files.""" +from __future__ import annotations + +import hashlib +import json +import logging +import os +from functools import lru_cache +from pathlib import Path +from typing import Any + + +MANIFEST_FILENAME = "document_manifest.json" + + +def document_content_hash(path: str | Path) -> str: + """Return the short content hash used in generated document ids.""" + digest = hashlib.sha256() + with open(path, "rb") as fh: + for chunk in iter(lambda: fh.read(1024 * 1024), b""): + digest.update(chunk) + return digest.hexdigest()[:12] + + +def document_id_for_path(path: str | Path) -> str: + """Return the stable document id for a Markdown file.""" + path = Path(path) + return f"{path.stem}_{document_content_hash(path)}" + + +def manifest_path_for_index(index_dir: str | Path) -> Path: + """Return the default manifest path for an index directory.""" + return Path(index_dir) / MANIFEST_FILENAME + + +def build_manifest_entries( + md_paths, + *, + trees_dir: str | Path | None = None, +) -> dict[str, dict[str, Any]]: + """Build manifest entries keyed by document id.""" + entries: dict[str, dict[str, Any]] = {} + tree_root = Path(trees_dir) if trees_dir is not None else None + + for raw_path in md_paths: + path = Path(raw_path) + if not path.exists(): + logging.warning("Skipping missing Markdown file in manifest: %s", path) + continue + stat = path.stat() + doc_id = document_id_for_path(path) + tree_path = tree_root / f"{path.stem}_structure.json" if tree_root else None + entries[doc_id] = { + "doc_id": doc_id, + "stem": path.stem, + "md_path": str(path), + "tree_path": str(tree_path) if tree_path else None, + "size": stat.st_size, + "mtime_ns": stat.st_mtime_ns, + } + + return entries + + +def write_document_manifest( + index_dir: str | Path, + entries: dict[str, dict[str, Any]], +) -> Path: + """Persist a document manifest next to the vector index.""" + manifest_path = manifest_path_for_index(index_dir) + manifest_path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "version": 1, + "documents": entries, + } + manifest_path.write_text(json.dumps(payload, indent=2, sort_keys=True), encoding="utf-8") + return manifest_path + + +@lru_cache(maxsize=16) +def _load_manifest_cached(manifest_path: str, mtime_ns: int, size: int): + with open(manifest_path, "r", encoding="utf-8") as fh: + payload = json.load(fh) + documents = payload.get("documents", {}) + return documents if isinstance(documents, dict) else {} + + +def load_document_manifest(index_dir: str | Path) -> dict[str, dict[str, Any]]: + """Load a manifest, invalidating the cache when the file changes.""" + manifest_path = manifest_path_for_index(index_dir) + try: + stat = manifest_path.stat() + except OSError: + return {} + try: + return dict(_load_manifest_cached(str(manifest_path), stat.st_mtime_ns, stat.st_size)) + except (OSError, json.JSONDecodeError, TypeError) as exc: + logging.warning("Unable to load document manifest %s: %s", manifest_path, exc) + return {} + + +def resolve_markdown_path_from_manifest( + doc_id: str, + *, + index_dir: str | Path, +) -> str | None: + """Resolve a document id to a Markdown path through the persisted manifest.""" + entry = load_document_manifest(index_dir).get(doc_id) + if not isinstance(entry, dict): + return None + md_path = entry.get("md_path") + if isinstance(md_path, str) and os.path.exists(md_path): + return md_path + return None + + +@lru_cache(maxsize=256) +def _read_text_lines_cached(path: str, mtime_ns: int, size: int): + with open(path, "r", encoding="utf-8") as fh: + return tuple(fh.readlines()) + + +def read_text_lines_cached(path: str | Path) -> list[str] | None: + """Read text lines with an mtime/size-aware cache.""" + try: + stat = os.stat(path) + return list(_read_text_lines_cached(str(path), stat.st_mtime_ns, stat.st_size)) + except OSError as exc: + logging.warning("Unable to read text file %s: %s", path, exc) + return None + + +def clear_document_caches() -> None: + """Clear manifest and text-line caches, mainly for tests and long-running apps.""" + _load_manifest_cached.cache_clear() + _read_text_lines_cached.cache_clear() diff --git a/src/pprag/eval.py b/src/pprag/eval.py new file mode 100644 index 0000000..19cb552 --- /dev/null +++ b/src/pprag/eval.py @@ -0,0 +1,67 @@ +"""Local evaluation helpers for runtime regressions.""" +from __future__ import annotations + +import argparse +import json +import tempfile +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Sequence + +from pprag.document_manifest import clear_document_caches, read_text_lines_cached + + +def _run_parallel_units(work_items: int, workers: int, delay: float) -> float: + def unit(_): + time.sleep(delay) + return True + + started = time.perf_counter() + with ThreadPoolExecutor(max_workers=workers) as executor: + list(executor.map(unit, range(work_items))) + return time.perf_counter() - started + + +def runtime_eval() -> dict[str, float | int]: + """Run deterministic local runtime checks with no external services.""" + with tempfile.TemporaryDirectory() as tmp: + path = Path(tmp) / "fixture.md" + path.write_text("# Heading\n" + "line\n" * 100, encoding="utf-8") + clear_document_caches() + + started = time.perf_counter() + read_text_lines_cached(path) + cold_read = time.perf_counter() - started + + started = time.perf_counter() + read_text_lines_cached(path) + cached_read = time.perf_counter() - started + + serial = _run_parallel_units(work_items=4, workers=1, delay=0.03) + parallel = _run_parallel_units(work_items=4, workers=4, delay=0.03) + + return { + "markdown_cold_read_seconds": round(cold_read, 6), + "markdown_cached_read_seconds": round(cached_read, 6), + "section_serial_seconds": round(serial, 6), + "section_parallel_seconds": round(parallel, 6), + "section_speedup": round(serial / parallel, 2) if parallel else 0, + } + + +def main(argv: Sequence[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Run local Proxy-Pointer runtime evaluations.") + subparsers = parser.add_subparsers(dest="command", metavar="COMMAND") + subparsers.add_parser("runtime", help="Run local cache and concurrency checks.") + args = parser.parse_args(argv) + + if args.command in (None, "runtime"): + print(json.dumps(runtime_eval(), indent=2, sort_keys=True)) + return 0 + parser.error("unknown command") + return 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/pprag/gemini_embeddings.py b/src/pprag/gemini_embeddings.py index e30bf2e..36c0912 100644 --- a/src/pprag/gemini_embeddings.py +++ b/src/pprag/gemini_embeddings.py @@ -5,6 +5,8 @@ import time from collections.abc import Callable, Sequence +from pprag.model_runtime import is_rate_limit_error + def normalize_embedding_response(result, expected_count: int): """Return a list of embedding vectors from Gemini single or batch shapes.""" @@ -26,28 +28,6 @@ def normalize_embedding_response(result, expected_count: int): return vectors -def is_rate_limit_error(exc: Exception) -> bool: - """Return True when an SDK exception looks like quota or rate limiting.""" - status_code = ( - getattr(getattr(exc, "response", None), "status_code", None) - or getattr(exc, "status_code", None) - ) - if status_code == 429: - return True - - error_str = str(exc).lower() - error_code = str(getattr(exc, "code", "")).lower() - return ( - "429" in error_str - or "resourceexhausted" in error_str - or "resource exhausted" in error_str - or "quota exceeded" in error_str - or "quota_exceeded" in error_code - or "too_many_requests" in error_code - or "rate limit" in error_str - ) - - def embed_content_with_retry( genai_module, *, diff --git a/src/pprag/model_runtime.py b/src/pprag/model_runtime.py new file mode 100644 index 0000000..3631761 --- /dev/null +++ b/src/pprag/model_runtime.py @@ -0,0 +1,117 @@ +"""Shared model-call runtime helpers.""" +from __future__ import annotations + +import json +import logging +import re +import time +from collections.abc import Callable +from typing import Any + + +def is_rate_limit_error(exc: Exception) -> bool: + """Return True when an SDK exception looks like quota or throttling.""" + status_code = ( + getattr(getattr(exc, "response", None), "status_code", None) + or getattr(exc, "status_code", None) + ) + if status_code == 429: + return True + + error_str = str(exc).lower() + error_code = str(getattr(exc, "code", "")).lower() + return ( + "429" in error_str + or "resourceexhausted" in error_str + or "resource exhausted" in error_str + or "quota exceeded" in error_str + or "quota_exceeded" in error_code + or "too_many_requests" in error_code + or "rate limit" in error_str + ) + + +def call_with_retry( + operation: Callable[[], Any], + *, + operation_name: str, + max_retries: int = 5, + base_delay: float = 2.0, + sleep: Callable[[float], None] = time.sleep, +): + """Run an operation with exponential backoff for retryable throttling.""" + for attempt in range(max_retries): + try: + return operation() + except Exception as exc: + if not is_rate_limit_error(exc) or attempt == max_retries - 1: + raise + delay = base_delay * (2 ** attempt) + logging.warning("%s rate limited. Retrying in %ss...", operation_name, delay) + sleep(delay) + + raise RuntimeError(f"{operation_name} failed without returning or raising") + + +def generate_content_with_retry( + model, + prompt, + *, + generation_config=None, + timeout: float | None = None, + max_retries: int = 5, + base_delay: float = 2.0, + sleep: Callable[[float], None] = time.sleep, +): + """Call a generation model with consistent retry and optional timeout.""" + def _call(): + kwargs = {} + if generation_config is not None: + kwargs["generation_config"] = generation_config + if timeout is not None: + kwargs["request_options"] = {"timeout": timeout} + try: + return model.generate_content(prompt, **kwargs) + except TypeError: + kwargs.pop("request_options", None) + return model.generate_content(prompt, **kwargs) + + return call_with_retry( + _call, + operation_name="Generation request", + max_retries=max_retries, + base_delay=base_delay, + sleep=sleep, + ) + + +def response_text(response) -> str: + """Extract text from an SDK response object.""" + text = getattr(response, "text", "") + return "" if text is None else str(text).strip() + + +def strip_json_fence(text: str) -> str: + """Remove common markdown fences around JSON payloads.""" + stripped = text.strip() + if not stripped.startswith("```"): + return stripped + lines = stripped.splitlines() + if lines and lines[0].strip().startswith("```") and lines[-1].strip() == "```": + return "\n".join(lines[1:-1]).strip() + return stripped + + +def parse_json_object(text: str) -> dict[str, Any]: + """Parse a JSON object, including common fenced output shapes.""" + cleaned = strip_json_fence(text) + try: + parsed = json.loads(cleaned) + except json.JSONDecodeError: + match = re.search(r"\{.*\}", cleaned, re.DOTALL) + if not match: + raise + parsed = json.loads(match.group(0)) + if not isinstance(parsed, dict): + raise ValueError(f"Expected JSON object, received {type(parsed).__name__}") + return parsed diff --git a/src/pprag_doc_comparator/app.py b/src/pprag_doc_comparator/app.py index 34175f4..482dab8 100644 --- a/src/pprag_doc_comparator/app.py +++ b/src/pprag_doc_comparator/app.py @@ -30,16 +30,13 @@ from pprag_doc_comparator.validation.criteria_validator import ( validate_criteria, build_section_selection_query, - build_cross_retrieval_query, build_comparison_prompt, ) from pprag_doc_comparator.comparison.section_selector import ( select_relevant_sections, load_full_section_text, resolve_md_path_for_doc_id ) -from pprag_doc_comparator.comparison.cross_retriever import retrieve_matching_sections -from pprag_doc_comparator.comparison.section_comparator import ( - compare_section_matches, extract_rating -) +from pprag_doc_comparator.comparison.section_comparator import extract_rating +from pprag_doc_comparator.comparison.pipeline import compare_doc1_sections from pprag_doc_comparator.report.report_builder import ( build_executive_summary, build_section_block, assemble_report @@ -478,55 +475,39 @@ def update_progress(pct, msg): comparison_base_pct = 35 comparison_range_pct = 55 # 35% to 90% - report_so_far = "" - - for sec_idx, doc1_sec in enumerate(doc1_sections): - sec_num = sec_idx + 1 - sec_pct = comparison_base_pct + int( - (sec_idx / num_sections) * comparison_range_pct - ) + def on_section_complete(completed, total, result): + pct = comparison_base_pct + int((completed / total) * comparison_range_pct) update_progress( - sec_pct, - f"📋 Comparing section {sec_num}/{num_sections}: {doc1_sec.get('title', '')}" + pct, + f"📋 Compared section {completed}/{total}: {result['doc1_section'].get('title', '')}" ) - compared_node_ids.add(doc1_sec.get("node_id")) + update_progress(35, f"📋 Comparing {num_sections} selected sections...") + pipeline_results = compare_doc1_sections( + vector_db=vector_db, + doc2_id=doc2_id, + doc1_sections=doc1_sections, + comparison_prompt=comparison_prompt, + criteria=criteria, + doc_type=doc_type, + doc1_name=doc1_name, + doc2_name=doc2_name, + max_doc2_matches=MAX_DOC2_MATCHES, + progress_callback=on_section_complete, + ) - # Cross-retrieve Doc 2 matches - cross_query = build_cross_retrieval_query( - doc1_sec.get("breadcrumb", ""), - doc1_sec.get("full_text", ""), - criteria - ) + report_so_far = "" - doc2_matches = retrieve_matching_sections( - vector_db, doc2_id, cross_query, - doc1_breadcrumb=doc1_sec.get("breadcrumb", ""), - criteria=criteria, doc_type=doc_type, - k_final=MAX_DOC2_MATCHES - ) + for section_result in pipeline_results: + doc1_sec = section_result["doc1_section"] + compared_node_ids.add(doc1_sec.get("node_id")) + doc2_matches = section_result["doc2_matches"] if not doc2_matches: - logging.info(f" [SKIP] No Doc 2 equivalents found for: {doc1_sec.get('title')}") continue - # Compare Doc 2 matches concurrently with bounded workers. The helper - # preserves input order so report rendering stays deterministic. - update_progress( - sec_pct + int(comparison_range_pct / num_sections), - f"📋 Comparing {len(doc2_matches)} match(es) for {doc1_sec.get('title', '')}..." - ) - raw_results = compare_section_matches( - comparison_prompt, - doc1_sec, - doc2_matches, - doc1_name=doc1_name, - doc2_name=doc2_name, - doc_type=doc_type, - ) - comparison_results = [] - for result, doc2_match in zip(raw_results, doc2_matches): + for result, doc2_match in zip(section_result["comparison_results"], doc2_matches): rating = extract_rating(result) comparison_results.append((result, doc2_match)) all_ratings[rating] = all_ratings.get(rating, 0) + 1 diff --git a/src/pprag_doc_comparator/comparison/cross_retriever.py b/src/pprag_doc_comparator/comparison/cross_retriever.py index 2f6143b..76fd92a 100644 --- a/src/pprag_doc_comparator/comparison/cross_retriever.py +++ b/src/pprag_doc_comparator/comparison/cross_retriever.py @@ -17,10 +17,8 @@ from pprag_doc_comparator.comparison.cross_retriever import retrieve_matching_sections """ import os -import sys import re import logging -import time from pprag_doc_comparator.config import DOCUMENTS_DIR, LLM_MODEL from pprag_doc_comparator.validation.criteria_validator import build_cross_reranker_prompt @@ -28,10 +26,10 @@ load_full_section_text, resolve_md_path_for_doc_id, ) +from pprag.model_runtime import generate_content_with_retry, parse_json_object import google.generativeai as genai import typing -import json class RankingResponse(typing.TypedDict): ranked_indices: list[int] @@ -148,39 +146,25 @@ def _rerank_candidates(candidates, doc1_breadcrumb, criteria, doc_type, k_final) try: model = genai.GenerativeModel(LLM_MODEL) - max_retries = 5 - base_delay = 2.0 - response = None - for attempt in range(max_retries): - try: - response = model.generate_content( - prompt, - generation_config=genai.GenerationConfig( - temperature=0.1, - max_output_tokens=2048, - response_mime_type="application/json", - response_schema=RankingResponse - ) - ) - break - except Exception as e: - if "429" in str(e) or "Resource exhausted" in str(e): - if attempt == max_retries - 1: - raise e - delay = base_delay * (2 ** attempt) - logging.warning(f"Rate limit hit during cross-retrieval. Retrying in {delay}s...") - time.sleep(delay) - else: - raise e + response = generate_content_with_retry( + model, + prompt, + generation_config=genai.GenerationConfig( + temperature=0.1, + max_output_tokens=2048, + response_mime_type="application/json", + response_schema=RankingResponse + ), + ) response_text = response.text.strip() logging.info(f" [CROSS-RERANKER] Raw response: {response_text[:200]}") ranked_ids = [] try: - ranking_data = json.loads(response_text) + ranking_data = parse_json_object(response_text) ranked_ids = [str(idx) for idx in ranking_data.get("ranked_indices", [])] - except (json.JSONDecodeError, TypeError): + except (ValueError, TypeError): # Powerful regex fallback for unformatted verbose responses potential_nums = re.findall(r'\d+', response_text) ranked_ids = [num for num in potential_nums if num in index_map] diff --git a/src/pprag_doc_comparator/comparison/pipeline.py b/src/pprag_doc_comparator/comparison/pipeline.py new file mode 100644 index 0000000..df2c9b0 --- /dev/null +++ b/src/pprag_doc_comparator/comparison/pipeline.py @@ -0,0 +1,180 @@ +"""Core DocComparator comparison pipeline helpers.""" +from __future__ import annotations + +import logging +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Callable, TypedDict + +from pprag.audit import write_audit_event +from pprag_doc_comparator.config import ( + MAX_DOC2_MATCHES, + SECTION_CONCURRENCY, +) +from pprag_doc_comparator.comparison.cross_retriever import retrieve_matching_sections +from pprag_doc_comparator.comparison.section_comparator import compare_section_matches +from pprag_doc_comparator.validation.criteria_validator import build_cross_retrieval_query + + +class SectionComparisonResult(TypedDict): + section_index: int + doc1_section: dict + doc2_matches: list[dict] + comparison_results: list[dict] + + +def compare_doc1_section( + *, + vector_db, + doc2_id: str, + doc1_section: dict, + section_index: int, + comparison_prompt: str, + criteria: str, + doc_type: str, + doc1_name: str, + doc2_name: str, + max_doc2_matches: int = MAX_DOC2_MATCHES, +) -> SectionComparisonResult: + """Retrieve Doc 2 matches for one Doc 1 section and compare them.""" + started = time.perf_counter() + write_audit_event( + "doc_comparator.section.started", + doc2_id=doc2_id, + section_index=section_index, + doc1_node_id=doc1_section.get("node_id"), + doc1_title=doc1_section.get("title"), + ) + cross_query = build_cross_retrieval_query( + doc1_section.get("breadcrumb", ""), + doc1_section.get("full_text", ""), + criteria, + ) + + doc2_matches = retrieve_matching_sections( + vector_db, + doc2_id, + cross_query, + doc1_breadcrumb=doc1_section.get("breadcrumb", ""), + criteria=criteria, + doc_type=doc_type, + k_final=max_doc2_matches, + ) + + if not doc2_matches: + logging.info(" [SKIP] No Doc 2 equivalents found for: %s", doc1_section.get("title")) + write_audit_event( + "doc_comparator.section.no_matches", + doc2_id=doc2_id, + section_index=section_index, + doc1_node_id=doc1_section.get("node_id"), + elapsed_seconds=round(time.perf_counter() - started, 6), + ) + return { + "section_index": section_index, + "doc1_section": doc1_section, + "doc2_matches": [], + "comparison_results": [], + } + + raw_results = compare_section_matches( + comparison_prompt, + doc1_section, + doc2_matches, + doc1_name=doc1_name, + doc2_name=doc2_name, + doc_type=doc_type, + ) + write_audit_event( + "doc_comparator.section.completed", + doc2_id=doc2_id, + section_index=section_index, + doc1_node_id=doc1_section.get("node_id"), + match_count=len(doc2_matches), + result_count=len(raw_results), + elapsed_seconds=round(time.perf_counter() - started, 6), + ) + + return { + "section_index": section_index, + "doc1_section": doc1_section, + "doc2_matches": doc2_matches, + "comparison_results": raw_results, + } + + +def compare_doc1_sections( + *, + vector_db, + doc2_id: str, + doc1_sections: list[dict], + comparison_prompt: str, + criteria: str, + doc_type: str, + doc1_name: str, + doc2_name: str, + max_doc2_matches: int = MAX_DOC2_MATCHES, + max_workers: int | None = None, + progress_callback: Callable[[int, int, SectionComparisonResult], None] | None = None, +) -> list[SectionComparisonResult]: + """Compare selected Doc 1 sections with bounded parallel section workers. + + The returned list preserves Doc 1 section order so report generation remains + stable even when worker completion order differs. + """ + if not doc1_sections: + return [] + + worker_count = max(1, int(max_workers or SECTION_CONCURRENCY)) + worker_count = min(worker_count, len(doc1_sections)) + + if worker_count == 1: + results = [] + total = len(doc1_sections) + for index, doc1_section in enumerate(doc1_sections): + result = compare_doc1_section( + vector_db=vector_db, + doc2_id=doc2_id, + doc1_section=doc1_section, + section_index=index, + comparison_prompt=comparison_prompt, + criteria=criteria, + doc_type=doc_type, + doc1_name=doc1_name, + doc2_name=doc2_name, + max_doc2_matches=max_doc2_matches, + ) + results.append(result) + if progress_callback: + progress_callback(len(results), total, result) + return results + + ordered: list[SectionComparisonResult | None] = [None] * len(doc1_sections) + with ThreadPoolExecutor(max_workers=worker_count) as executor: + futures = [ + executor.submit( + compare_doc1_section, + vector_db=vector_db, + doc2_id=doc2_id, + doc1_section=doc1_section, + section_index=index, + comparison_prompt=comparison_prompt, + criteria=criteria, + doc_type=doc_type, + doc1_name=doc1_name, + doc2_name=doc2_name, + max_doc2_matches=max_doc2_matches, + ) + for index, doc1_section in enumerate(doc1_sections) + ] + + total = len(futures) + completed = 0 + for future in as_completed(futures): + result = future.result() + ordered[result["section_index"]] = result + completed += 1 + if progress_callback: + progress_callback(completed, total, result) + + return [result for result in ordered if result is not None] diff --git a/src/pprag_doc_comparator/comparison/section_comparator.py b/src/pprag_doc_comparator/comparison/section_comparator.py index 9a06c5a..e60cc2c 100644 --- a/src/pprag_doc_comparator/comparison/section_comparator.py +++ b/src/pprag_doc_comparator/comparison/section_comparator.py @@ -9,13 +9,12 @@ from pprag_doc_comparator.comparison.section_comparator import compare_sections, generate_section_summary """ import os -import sys import re import logging -import time from concurrent.futures import ThreadPoolExecutor from pprag_doc_comparator.config import COMPARE_CONCURRENCY, LLM_MODEL +from pprag.model_runtime import generate_content_with_retry import google.generativeai as genai @@ -86,28 +85,14 @@ def compare_sections(comparison_prompt, doc1_section, doc2_section, doc1_name="D try: model = genai.GenerativeModel(LLM_MODEL) - max_retries = 5 - base_delay = 2.0 - response = None - for attempt in range(max_retries): - try: - response = model.generate_content( - prompt, - generation_config=genai.GenerationConfig( - temperature=0.0, - max_output_tokens=5000, - ) - ) - break - except Exception as e: - if "429" in str(e) or "Resource exhausted" in str(e): - if attempt == max_retries - 1: - raise e - delay = base_delay * (2 ** attempt) - logging.warning(f"Rate limit hit during generation. Retrying in {delay}s...") - time.sleep(delay) - else: - raise e + response = generate_content_with_retry( + model, + prompt, + generation_config=genai.GenerationConfig( + temperature=0.0, + max_output_tokens=5000, + ), + ) raw = response.text.strip() logging.info(f" [COMPARE] Raw LLM response ({len(raw)} chars): {raw[:300]}") diff --git a/src/pprag_doc_comparator/comparison/section_selector.py b/src/pprag_doc_comparator/comparison/section_selector.py index 8e785aa..a97691e 100644 --- a/src/pprag_doc_comparator/comparison/section_selector.py +++ b/src/pprag_doc_comparator/comparison/section_selector.py @@ -11,16 +11,18 @@ """ import os import re -import sys -import json import logging -import time -import hashlib from functools import lru_cache from pathlib import Path -from pprag_doc_comparator.config import LLM_MODEL, DOCUMENTS_DIR +from pprag_doc_comparator.config import LLM_MODEL, DOCUMENTS_DIR, INDEX_DIR from pprag_doc_comparator.validation.criteria_validator import build_selector_prompt +from pprag.document_manifest import ( + document_id_for_path, + read_text_lines_cached, + resolve_markdown_path_from_manifest, +) +from pprag.model_runtime import generate_content_with_retry, parse_json_object import google.generativeai as genai import typing @@ -92,30 +94,16 @@ def select_relevant_sections(vector_db, doc_id, criteria_query, try: model = genai.GenerativeModel(LLM_MODEL) - max_retries = 5 - base_delay = 2.0 - response = None - for attempt in range(max_retries): - try: - response = model.generate_content( - prompt, - generation_config=genai.GenerationConfig( - temperature=0.1, - max_output_tokens=2048, - response_mime_type="application/json", - response_schema=RankingResponse - ) - ) - break - except Exception as e: - if "429" in str(e) or "Resource exhausted" in str(e): - if attempt == max_retries - 1: - raise e - delay = base_delay * (2 ** attempt) - logging.warning(f"Rate limit hit during section selection. Retrying in {delay}s...") - time.sleep(delay) - else: - raise e + response = generate_content_with_retry( + model, + prompt, + generation_config=genai.GenerationConfig( + temperature=0.1, + max_output_tokens=2048, + response_mime_type="application/json", + response_schema=RankingResponse + ), + ) response_text = response.text.strip() @@ -130,9 +118,9 @@ def select_relevant_sections(vector_db, doc_id, criteria_query, ranked_ids = [] try: - ranking_data = json.loads(response_text) + ranking_data = parse_json_object(response_text) ranked_ids = [str(idx) for idx in ranking_data.get("ranked_indices", [])] - except (json.JSONDecodeError, TypeError): + except (ValueError, TypeError): # Powerful regex fallback: find any freestanding integer strings in the text potential_nums = re.findall(r'\d+', response_text) ranked_ids = [num for num in potential_nums if num in index_map] @@ -166,6 +154,10 @@ def resolve_md_path_for_doc_id(doc_id, data_dir=None): if data_dir is None: data_dir = str(DOCUMENTS_DIR) + manifest_path = resolve_markdown_path_from_manifest(doc_id, index_dir=INDEX_DIR) + if manifest_path is not None: + return manifest_path + try: filenames = os.listdir(data_dir) except OSError as exc: @@ -177,31 +169,18 @@ def resolve_md_path_for_doc_id(doc_id, data_dir=None): continue md_path = os.path.join(data_dir, filename) try: - with open(md_path, "rb") as fh: - content_hash = hashlib.sha256(fh.read()).hexdigest()[:12] + file_doc_id = document_id_for_path(md_path) except OSError as exc: logging.warning("Unable to hash markdown file %s: %s", md_path, exc) continue - file_doc_id = f"{Path(md_path).stem}_{content_hash}" if file_doc_id == doc_id: return md_path return None -@lru_cache(maxsize=128) -def _read_md_lines_cached(md_path, mtime_ns, size): - with open(md_path, "r", encoding="utf-8") as fh: - return tuple(fh.readlines()) - - def read_md_lines_cached(md_path): """Read Markdown lines with an mtime/size-aware cache.""" - try: - stat = os.stat(md_path) - return list(_read_md_lines_cached(md_path, stat.st_mtime_ns, stat.st_size)) - except OSError as exc: - logging.warning("Unable to read markdown file %s: %s", md_path, exc) - return None + return read_text_lines_cached(md_path) def load_full_section_text(doc_id, start_line, end_line, data_dir=None, md_path=None): diff --git a/src/pprag_doc_comparator/config.py b/src/pprag_doc_comparator/config.py index 5f46fe2..a23184c 100644 --- a/src/pprag_doc_comparator/config.py +++ b/src/pprag_doc_comparator/config.py @@ -43,8 +43,8 @@ def _default_project_root(subproject: str) -> Path: LLAMA_PARSE_TIER = os.getenv("LLAMA_PARSE_TIER", "cost_effective") # ── Models ────────────────────────────────────────────────────────────── -EMBEDDING_MODEL = "models/gemini-embedding-001" -EMBEDDING_DIMS = 1536 +EMBEDDING_MODEL = os.getenv("DC_EMBEDDING_MODEL", "models/gemini-embedding-001") +EMBEDDING_DIMS = int(os.getenv("DC_EMBEDDING_DIMS", "1536")) LLM_MODEL = os.getenv("DC_LLM_MODEL", "gemini-3.1-flash-lite") # ── Embedding Throughput ──────────────────────────────────────────────── @@ -59,3 +59,4 @@ def _default_project_root(subproject: str) -> Path: MAX_DOC1_SECTIONS = 10 MAX_DOC2_MATCHES = 3 COMPARE_CONCURRENCY = max(1, int(os.getenv("DC_COMPARE_CONCURRENCY", "3"))) +SECTION_CONCURRENCY = max(1, int(os.getenv("DC_SECTION_CONCURRENCY", "2"))) diff --git a/src/pprag_doc_comparator/indexing/build_doc_index.py b/src/pprag_doc_comparator/indexing/build_doc_index.py index 367a8d8..f80e0a3 100644 --- a/src/pprag_doc_comparator/indexing/build_doc_index.py +++ b/src/pprag_doc_comparator/indexing/build_doc_index.py @@ -14,9 +14,7 @@ """ import os import sys -import time import json -import hashlib import logging from pathlib import Path @@ -27,12 +25,18 @@ EMBEDDING_BATCH_SIZE, EMBEDDING_BATCH_DELAY, ) from pprag_doc_comparator.indexing.build_skeleton_trees import build_skeleton_trees +from pprag.document_manifest import ( + build_manifest_entries, + document_id_for_path, + write_document_manifest, +) from pprag.faiss_security import require_trusted_faiss_deserialization from pprag.gemini_embeddings import ( embed_content_with_retry, embed_texts_batched, normalize_embedding_response, ) +from pprag.model_runtime import generate_content_with_retry, parse_json_object import google.generativeai as genai from langchain_community.vectorstores import FAISS @@ -44,10 +48,7 @@ # ── Document Identity ────────────────────────────────────────────────── def get_doc_id(file_path: str) -> str: """Generate a stable ID from file content using content hash.""" - with open(file_path, "rb") as f: - content_hash = hashlib.sha256(f.read()).hexdigest()[:12] - base_name = Path(file_path).stem - return f"{base_name}_{content_hash}" + return document_id_for_path(file_path) # ── Custom Embedding Wrapper ────────────────────────────────────────── @@ -118,39 +119,18 @@ def get_noise_node_ids(doc_name, structure): model = genai.GenerativeModel(LLM_MODEL) - max_retries = 5 - base_delay = 2.0 - response = None - for attempt in range(max_retries): - try: - response = model.generate_content( - prompt, - generation_config=genai.GenerationConfig( - temperature=0.0, - max_output_tokens=2048, - ) - ) - break - except Exception as e: - if "429" in str(e) or "Resource exhausted" in str(e): - if attempt == max_retries - 1: - raise e - delay = base_delay * (2 ** attempt) - logging.warning(f"Rate limit hit during noise filtering. Retrying in {delay}s...") - time.sleep(delay) - else: - raise e + response = generate_content_with_retry( + model, + prompt, + generation_config=genai.GenerationConfig( + temperature=0.0, + max_output_tokens=2048, + ), + ) # Robustly extract JSON or fallback to regex try: - text = response.text.strip() - if text.startswith("```json"): - text = text[7:] - if text.endswith("```"): - text = text[:-3] - text = text.strip() - - result = json.loads(text) + result = parse_json_object(response.text) noise_ids = set() for entry in result.get("noise_nodes", []): nid = entry.get("node_id") @@ -338,6 +318,9 @@ def build_comparator_index(md_paths=None, incremental=True, progress_callback=No os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith(".md") ] + md_paths = list(md_paths) + manifest_entries = build_manifest_entries(md_paths, trees_dir=trees_dir) + write_document_manifest(save_path, manifest_entries) all_chunks = [] indexed_doc_ids = [] @@ -353,6 +336,8 @@ def build_comparator_index(md_paths=None, incremental=True, progress_callback=No doc_id = get_doc_id(md_path) if doc_id in existing_docs: logging.info(f" [CACHED] {doc_id}: Already indexed.") + if doc_id in manifest_entries: + manifest_entries[doc_id]["indexed"] = True indexed_doc_ids.append(doc_id) continue @@ -360,6 +345,9 @@ def build_comparator_index(md_paths=None, incremental=True, progress_callback=No progress_callback(f"Indexing: {base_name}...") chunks, doc_id = index_single_document(md_path, tree_path, progress_callback) + if doc_id in manifest_entries: + manifest_entries[doc_id]["indexed"] = True + manifest_entries[doc_id]["chunk_count"] = len(chunks) all_chunks.extend(chunks) indexed_doc_ids.append(doc_id) @@ -378,4 +366,6 @@ def build_comparator_index(md_paths=None, incremental=True, progress_callback=No elif vector_db is None: logging.warning("No chunks generated and no existing index.") + write_document_manifest(save_path, manifest_entries) + return vector_db, indexed_doc_ids diff --git a/src/pprag_doc_comparator/validation/criteria_validator.py b/src/pprag_doc_comparator/validation/criteria_validator.py index 71f0e8e..fa33943 100644 --- a/src/pprag_doc_comparator/validation/criteria_validator.py +++ b/src/pprag_doc_comparator/validation/criteria_validator.py @@ -8,9 +8,7 @@ from pprag_doc_comparator.validation.criteria_validator import validate_criteria, build_prompts """ import os -import sys import json -import time import logging import typing @@ -21,6 +19,7 @@ class ValidationResponse(typing.TypedDict): document_type_detected: str from pprag_doc_comparator.config import LLM_MODEL +from pprag.model_runtime import generate_content_with_retry, parse_json_object import google.generativeai as genai @@ -93,33 +92,20 @@ def validate_criteria(criteria, doc1_tree_path, doc1_md_path, model = genai.GenerativeModel(LLM_MODEL) - max_retries = 5 - base_delay = 2.0 - response = None - for attempt in range(max_retries): - try: - response = model.generate_content( - prompt, - generation_config=genai.GenerationConfig( - temperature=0.1, - max_output_tokens=2048, - response_mime_type="application/json", - response_schema=ValidationResponse - ) - ) - break - except Exception as e: - if "429" in str(e) or "Resource exhausted" in str(e): - if attempt == max_retries - 1: - logging.error("Criteria validation failed after retries: %s", e) - response = None - break - delay = base_delay * (2 ** attempt) - time.sleep(delay) - else: - logging.error("Criteria validation request failed: %s", e) - response = None - break + try: + response = generate_content_with_retry( + model, + prompt, + generation_config=genai.GenerationConfig( + temperature=0.1, + max_output_tokens=2048, + response_mime_type="application/json", + response_schema=ValidationResponse + ), + ) + except Exception as exc: + logging.error("Criteria validation request failed: %s", exc) + response = None if response is None: return { @@ -129,14 +115,10 @@ def validate_criteria(criteria, doc1_tree_path, doc1_md_path, "document_type_detected": "document", } - text = response.text.strip() - if text.startswith("```"): - lines = text.split("\n") - text = "\n".join(lines[1:-1]).strip() - try: - result = json.loads(text) - except json.JSONDecodeError: + result = parse_json_object(response.text) + except (ValueError, TypeError, json.JSONDecodeError): + text = getattr(response, "text", "") logging.error(f"Failed to parse criteria validation response: {text}") result = { diff --git a/src/pprag_multimodal/config.py b/src/pprag_multimodal/config.py index ca27e6b..e07499f 100644 --- a/src/pprag_multimodal/config.py +++ b/src/pprag_multimodal/config.py @@ -35,10 +35,10 @@ def _default_project_root(subproject: str) -> Path: RESULTS_DIR = os.getenv("PP_RESULTS_DIR", os.path.join(BASE_DIR, "results")) # ── Model Config ──────────────────────────────────────────────────────── -EMBEDDING_MODEL = "models/gemini-embedding-001" -EMBEDDING_DIMS = 1536 -NOISE_FILTER_MODEL = "gemini-3.1-flash-lite" -SYNTH_MODEL = "gemini-3.5-flash" +EMBEDDING_MODEL = os.getenv("PP_EMBEDDING_MODEL", "models/gemini-embedding-001") +EMBEDDING_DIMS = int(os.getenv("PP_EMBEDDING_DIMS", "1536")) +NOISE_FILTER_MODEL = os.getenv("PP_NOISE_FILTER_MODEL", "gemini-3.1-flash-lite") +SYNTH_MODEL = os.getenv("PP_SYNTH_MODEL", "gemini-3.1-flash-lite") VISION_FILTER = False # Set to True for high-fidelity image verification (adds ~30s latency) EMBEDDING_BATCH_SIZE = int(os.getenv("PP_EMBEDDING_BATCH_SIZE", "20")) EMBEDDING_BATCH_DELAY = float(os.getenv("PP_EMBEDDING_BATCH_DELAY", "5")) diff --git a/src/pprag_multimodal/indexing/build_md_index.py b/src/pprag_multimodal/indexing/build_md_index.py index fc2485e..67fb02a 100644 --- a/src/pprag_multimodal/indexing/build_md_index.py +++ b/src/pprag_multimodal/indexing/build_md_index.py @@ -12,8 +12,6 @@ import json import logging import argparse -import time -from collections.abc import Sequence as SequenceABC from pprag_multimodal.config import ( DATASET_DIR, TREES_DIR, INDEX_DIR, @@ -24,6 +22,12 @@ build_skeleton_trees, get_md_path_for_doc ) from pprag.faiss_security import require_trusted_faiss_deserialization +from pprag.gemini_embeddings import ( + embed_content_with_retry, + embed_texts_batched, + normalize_embedding_response, +) +from pprag.model_runtime import parse_json_object import google.generativeai as genai from langchain_community.vectorstores import FAISS @@ -34,40 +38,6 @@ logging.basicConfig(level=logging.INFO, format='%(message)s') -def _normalize_embeddings(result, expected_count): - if not isinstance(result, dict): - raise ValueError(f"Unexpected embedding response type: {type(result).__name__}") - embeddings = result.get("embeddings", result.get("embedding")) - if embeddings is None: - raise ValueError(f"Embedding response missing embedding data: {result!r}") - if expected_count == 1 and embeddings and isinstance(embeddings[0], (int, float)): - return [embeddings] - if not isinstance(embeddings, SequenceABC): - raise ValueError(f"Embedding response is not a sequence: {result!r}") - vectors = list(embeddings) - if len(vectors) != expected_count: - raise ValueError(f"Expected {expected_count} embedding(s), received {len(vectors)}") - return vectors - - -def _is_rate_limit_error(exc): - status_code = getattr(getattr(exc, "response", None), "status_code", None) - if status_code == 429: - return True - text = str(exc).lower() - return "429" in text or "quota" in text or "rate" in text or "resource exhausted" in text - - -def _strip_json_fence(text): - stripped = text.strip() - if not stripped.startswith("```"): - return stripped - lines = stripped.split("\n") - if lines and lines[0].strip().startswith("```") and lines[-1].strip() == "```": - return "\n".join(lines[1:-1]).strip() - return stripped - - def _node_line_index(node, md_lines, doc_id, node_id): try: line_num = int(node["line_num"]) @@ -102,44 +72,23 @@ def __init__(self, model=EMBEDDING_MODEL, dimensionality=EMBEDDING_DIMS): self.dimensionality = dimensionality def embed_documents(self, texts): - all_embeddings = [] - batch_size = max(1, EMBEDDING_BATCH_SIZE) - - for i in range(0, len(texts), batch_size): - batch = texts[i:i + batch_size] - batch_num = i // batch_size + 1 - total_batches = (len(texts) + batch_size - 1) // batch_size - logging.info(f" Embedding batch {batch_num}/{total_batches} ({len(batch)} chunks)...") - - for attempt in range(5): - try: - result = genai.embed_content( - model=self.model, - content=batch, - output_dimensionality=self.dimensionality - ) - all_embeddings.extend(_normalize_embeddings(result, len(batch))) - break - except Exception as e: - if _is_rate_limit_error(e) and attempt < 4: - wait = 2 ** attempt - logging.info(f" Rate limited, waiting {wait}s (attempt {attempt+1}/5): {e}") - time.sleep(wait) - else: - logging.exception("Embedding request failed") - raise - if i + batch_size < len(texts): - time.sleep(max(0.0, EMBEDDING_BATCH_DELAY)) - - return all_embeddings + return embed_texts_batched( + genai, + texts, + model=self.model, + output_dimensionality=self.dimensionality, + batch_size=EMBEDDING_BATCH_SIZE, + batch_delay=EMBEDDING_BATCH_DELAY, + ) def embed_query(self, text): - result = genai.embed_content( + result = embed_content_with_retry( + genai, model=self.model, content=text, output_dimensionality=self.dimensionality ) - return _normalize_embeddings(result, 1)[0] + return normalize_embedding_response(result, 1)[0] # ── Noise Filter ──────────────────────────────────────────────────────── @@ -194,8 +143,7 @@ def _slim_tree(nodes): max_output_tokens=2048, ) ) - text = _strip_json_fence(response.text) - result = json.loads(text) + result = parse_json_object(response.text) except Exception as e: raw = getattr(locals().get("response", None), "text", "") logging.exception(" [Error running noise filter for %s]; skipping noise filtering. Raw: %.500r", doc_name, raw) diff --git a/src/pprag_text_only/agent/pp_rag_bot.py b/src/pprag_text_only/agent/pp_rag_bot.py index c2058ef..73af584 100644 --- a/src/pprag_text_only/agent/pp_rag_bot.py +++ b/src/pprag_text_only/agent/pp_rag_bot.py @@ -13,12 +13,13 @@ """ import os import re -import json import logging -from collections.abc import Sequence as SequenceABC from pprag_text_only.config import DATA_DIR, INDEX_DIR, EMBEDDING_MODEL, EMBEDDING_DIMS, SYNTH_MODEL +from pprag.document_manifest import read_text_lines_cached from pprag.faiss_security import require_trusted_faiss_deserialization +from pprag.gemini_embeddings import embed_content_with_retry, normalize_embedding_response +from pprag.model_runtime import generate_content_with_retry logger = logging.getLogger(__name__) @@ -27,28 +28,6 @@ from langchain_core.embeddings import Embeddings -def _normalize_embeddings(result, expected_count): - """Return a list of embedding vectors from Gemini's single or batch shapes.""" - if not isinstance(result, dict): - raise ValueError(f"Unexpected embedding response type: {type(result).__name__}") - - if "embeddings" in result: - embeddings = result["embeddings"] - elif "embedding" in result: - embeddings = result["embedding"] - else: - raise ValueError(f"Embedding response missing embedding data: {result!r}") - - if expected_count == 1 and embeddings and isinstance(embeddings[0], (int, float)): - return [embeddings] - if not isinstance(embeddings, SequenceABC): - raise ValueError(f"Embedding response is not a sequence: {result!r}") - vectors = list(embeddings) - if len(vectors) != expected_count: - raise ValueError(f"Expected {expected_count} embedding(s), received {len(vectors)}") - return vectors - - def _parse_non_negative_int(value, default=0): try: return max(0, int(value)) @@ -80,20 +59,22 @@ def __init__(self, model=EMBEDDING_MODEL, dimensionality=EMBEDDING_DIMS): self.dimensionality = dimensionality def embed_documents(self, texts): - result = genai.embed_content( + result = embed_content_with_retry( + genai, model=self.model, content=texts, output_dimensionality=self.dimensionality ) - return _normalize_embeddings(result, len(texts)) + return normalize_embedding_response(result, len(texts)) def embed_query(self, text): - result = genai.embed_content( + result = embed_content_with_retry( + genai, model=self.model, content=text, output_dimensionality=self.dimensionality ) - return _normalize_embeddings(result, 1)[0] + return normalize_embedding_response(result, 1)[0] class ProxyPointerRAG: @@ -114,13 +95,12 @@ def __init__(self, index_path=None, data_dir=None): self.llm_timeout = float(os.getenv("PPRAG_LLM_TIMEOUT", "120")) def _generate_content(self, *args, **kwargs): - try: - return self.model.generate_content( - *args, request_options={"timeout": self.llm_timeout}, **kwargs - ) - except TypeError: - # Test doubles and older SDKs may not accept request_options. - return self.model.generate_content(*args, **kwargs) + return generate_content_with_retry( + self.model, + *args, + timeout=self.llm_timeout, + **kwargs, + ) def retrieve_unique_nodes(self, query, k_search=200, k_final=5): """Stage 1: Broad vector recall → Stage 2: LLM re-ranking.""" @@ -224,14 +204,18 @@ def chat(self, query): for p in pointers: md_path = os.path.join(self.data_dir, f"{p['doc_id']}.md") if os.path.exists(md_path): - with open(md_path, "r", encoding="utf-8") as f: - lines = f.readlines() + lines = read_text_lines_cached(md_path) + if lines is not None: safe_start = max(0, min(len(lines), p["start_line"])) safe_end = max(safe_start, min(len(lines), p["end_line"])) text = "".join(lines[safe_start:safe_end]).strip() or p["content"] context.append( f"### REFERENCE: {p['global_breadcrumb']}\n{text}" ) + else: + context.append( + f"### REFERENCE: {p['global_breadcrumb']}\n{p['content']}" + ) else: # Fallback to vector DB chunk content if .md file is missing context.append( diff --git a/src/pprag_text_only/config.py b/src/pprag_text_only/config.py index aea0f94..93680c6 100644 --- a/src/pprag_text_only/config.py +++ b/src/pprag_text_only/config.py @@ -48,10 +48,10 @@ def _default_project_root(subproject: str) -> Path: LLAMA_PARSE_TIER = os.getenv("LLAMA_PARSE_TIER", "cost_effective") # ── Models ────────────────────────────────────────────────────────────── -EMBEDDING_MODEL = "models/gemini-embedding-001" -EMBEDDING_DIMS = 1536 -NOISE_FILTER_MODEL = "gemini-3.1-flash-lite" -SYNTH_MODEL = "gemini-3.1-flash-lite" +EMBEDDING_MODEL = os.getenv("PP_EMBEDDING_MODEL", "models/gemini-embedding-001") +EMBEDDING_DIMS = int(os.getenv("PP_EMBEDDING_DIMS", "1536")) +NOISE_FILTER_MODEL = os.getenv("PP_NOISE_FILTER_MODEL", "gemini-3.1-flash-lite") +SYNTH_MODEL = os.getenv("PP_SYNTH_MODEL", "gemini-3.1-flash-lite") # ── Embedding Throughput ──────────────────────────────────────────────── EMBEDDING_BATCH_SIZE = int(os.getenv("PP_EMBEDDING_BATCH_SIZE", "20")) diff --git a/tests/test_cli_packaging.py b/tests/test_cli_packaging.py index f23e5d6..5cac06e 100644 --- a/tests/test_cli_packaging.py +++ b/tests/test_cli_packaging.py @@ -184,6 +184,13 @@ def test_streamlit_modalities_expose_serve_alias(): assert rc == 0 assert "serve" in compare_help.getvalue() + top_help = io.StringIO() + with contextlib.redirect_stdout(top_help): + rc = pprag.cli.main(["--help"]) + assert rc == 0 + assert "doctor" in top_help.getvalue() + assert "eval" in top_help.getvalue() + def test_serve_alias_starts_streamlit_app(monkeypatch): import pprag.cli @@ -224,6 +231,26 @@ def fake_run_module(extra, package, module, args): ] +def test_runtime_commands_dispatch_to_shared_modules(monkeypatch): + import pprag.cli + + calls = [] + + def fake_run_module(extra, package, module, args): + calls.append((extra, package, module, list(args))) + return 0 + + monkeypatch.setattr(pprag.cli, "_run_module", fake_run_module) + + assert pprag.cli.main(["doctor", "--json"]) == 0 + assert pprag.cli.main(["eval", "runtime"]) == 0 + + assert calls == [ + ("full", "pprag", "doctor", ["--json"]), + ("full", "pprag", "eval", ["runtime"]), + ] + + def test_missing_extra_message_names_install_target(): from pprag.cli import MissingExtraError, require_extra diff --git a/tests/test_enterprise_upgrades.py b/tests/test_enterprise_upgrades.py new file mode 100644 index 0000000..a67d1cc --- /dev/null +++ b/tests/test_enterprise_upgrades.py @@ -0,0 +1,149 @@ +import builtins +import time +from pathlib import Path + +from pprag.document_manifest import ( + build_manifest_entries, + clear_document_caches, + load_document_manifest, + read_text_lines_cached, + resolve_markdown_path_from_manifest, + write_document_manifest, +) +from pprag.eval import runtime_eval +from pprag.model_runtime import call_with_retry, parse_json_object +from pprag_doc_comparator.comparison import pipeline + + +def test_document_manifest_resolves_doc_ids_without_directory_scan(tmp_path): + docs_dir = tmp_path / "docs" + trees_dir = tmp_path / "trees" + index_dir = tmp_path / "index" + docs_dir.mkdir() + trees_dir.mkdir() + + md_path = docs_dir / "contract.md" + md_path.write_text("# Contract\n\nSection body\n", encoding="utf-8") + + entries = build_manifest_entries([md_path], trees_dir=trees_dir) + manifest_path = write_document_manifest(index_dir, entries) + + assert manifest_path.exists() + manifest = load_document_manifest(index_dir) + doc_id = next(iter(manifest)) + + assert manifest[doc_id]["md_path"] == str(md_path) + assert resolve_markdown_path_from_manifest(doc_id, index_dir=index_dir) == str(md_path) + + +def test_shared_line_cache_reuses_file_reads(monkeypatch, tmp_path): + md_path = tmp_path / "doc.md" + md_path.write_text("# Heading\nline 1\nline 2\n", encoding="utf-8") + + clear_document_caches() + open_count = 0 + real_open = builtins.open + + def counting_open(*args, **kwargs): + nonlocal open_count + if Path(args[0]) == md_path: + open_count += 1 + return real_open(*args, **kwargs) + + monkeypatch.setattr(builtins, "open", counting_open) + + assert read_text_lines_cached(md_path) == ["# Heading\n", "line 1\n", "line 2\n"] + assert read_text_lines_cached(md_path) == ["# Heading\n", "line 1\n", "line 2\n"] + assert open_count == 1 + + +def test_section_pipeline_preserves_order_with_parallel_speedup(monkeypatch): + def fake_retrieve_matching_sections(*args, **kwargs): + time.sleep(0.05) + section_title = kwargs["doc1_breadcrumb"] + return [{"title": f"match for {section_title}", "full_text": "doc2"}] + + def fake_compare_section_matches(*args, **kwargs): + doc1_section = args[1] + return [{"rating": "GREEN", "doc2_title": f"match for {doc1_section['title']}"}] + + monkeypatch.setattr(pipeline, "retrieve_matching_sections", fake_retrieve_matching_sections) + monkeypatch.setattr(pipeline, "compare_section_matches", fake_compare_section_matches) + + sections = [ + {"title": f"Section {i}", "breadcrumb": f"Section {i}", "full_text": "doc1"} + for i in range(4) + ] + + start = time.perf_counter() + serial = pipeline.compare_doc1_sections( + vector_db=object(), + doc2_id="doc2", + doc1_sections=sections, + comparison_prompt="prompt", + criteria="risk", + doc_type="contract", + doc1_name="doc1", + doc2_name="doc2", + max_workers=1, + ) + serial_seconds = time.perf_counter() - start + + start = time.perf_counter() + parallel = pipeline.compare_doc1_sections( + vector_db=object(), + doc2_id="doc2", + doc1_sections=sections, + comparison_prompt="prompt", + criteria="risk", + doc_type="contract", + doc1_name="doc1", + doc2_name="doc2", + max_workers=4, + ) + parallel_seconds = time.perf_counter() - start + + assert [item["doc1_section"]["title"] for item in parallel] == [ + "Section 0", + "Section 1", + "Section 2", + "Section 3", + ] + assert serial == parallel + assert parallel_seconds < serial_seconds * 0.75 + + +def test_runtime_retry_and_json_parsing_are_shared(): + attempts = 0 + sleeps = [] + + class RateLimited(Exception): + status_code = 429 + + def flaky_operation(): + nonlocal attempts + attempts += 1 + if attempts == 1: + raise RateLimited("quota exceeded") + return "ok" + + assert call_with_retry( + flaky_operation, + operation_name="test request", + sleep=sleeps.append, + ) == "ok" + assert attempts == 2 + assert sleeps == [2.0] + + assert parse_json_object('```json\n{"ranked_indices": [2, 1]}\n```') == { + "ranked_indices": [2, 1] + } + + +def test_runtime_eval_reports_cache_and_parallel_metrics(): + result = runtime_eval() + + assert result["markdown_cold_read_seconds"] >= 0 + assert result["markdown_cached_read_seconds"] >= 0 + assert result["section_serial_seconds"] > result["section_parallel_seconds"] + assert result["section_speedup"] > 1 diff --git a/tests/test_performance_upgrades.py b/tests/test_performance_upgrades.py index 463e42e..b14174f 100644 --- a/tests/test_performance_upgrades.py +++ b/tests/test_performance_upgrades.py @@ -5,9 +5,9 @@ from pprag_doc_comparator.comparison import section_comparator from pprag_doc_comparator.comparison.section_comparator import compare_section_matches from pprag_doc_comparator.comparison.section_selector import ( - _read_md_lines_cached, load_full_section_text, ) +from pprag.document_manifest import clear_document_caches from pprag_multimodal.agent.mm_rag_bot import MultimodalProxyPointerRAG @@ -15,7 +15,7 @@ def test_load_full_section_text_reuses_cached_markdown_lines(monkeypatch, tmp_pa md_path = tmp_path / "doc.md" md_path.write_text("# Heading\nline 1\nline 2\nline 3\n", encoding="utf-8") - _read_md_lines_cached.cache_clear() + clear_document_caches() open_count = 0 real_open = builtins.open