From 35351a96af5b273cf5b55c6b76618578c81ad0fb Mon Sep 17 00:00:00 2001 From: Rajarshi Nandi Date: Tue, 3 Mar 2026 20:12:55 +0000 Subject: [PATCH] Add multi-document batch processing and knowledge base search MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses the use case raised in issue #119 — processing multiple PDFs and querying the results as a unified knowledge base. New module: pageindex/batch_processor.py - process_batch(doc_paths, output_dir, **kwargs) Processes a list of PDF files using the existing page_index_main() pipeline, saves per-document structure JSONs, and writes a kb_index.json manifest listing every successfully processed document and its output file. Failed documents are skipped without aborting the whole batch; they are reported in the returned summary, not in the manifest. - KnowledgeBaseSearch(kb_index_path) Loads a knowledge base produced by process_batch() and exposes: .search(query, top_k) — case-insensitive substring search across all node titles and summaries; scores title matches higher than summary-only matches. .get_document(doc_name) — retrieve one document's full structure. .list_documents() — list all successfully processed doc names. CLI (run_pageindex.py): - New --batch-dir flag: point at a directory of PDFs and the batch processor runs over all of them, printing a summary on completion. Tests (tests/test_batch_processor.py): - 28 mock-based tests covering validation, success path, error handling, partial failures, search scoring, cross-document search, nested nodes, and edge cases. No API key required. 
Closes #119 --- pageindex/__init__.py | 3 +- pageindex/batch_processor.py | 349 ++++++++++++++++++++++++ run_pageindex.py | 50 +++- tests/test_batch_processor.py | 491 ++++++++++++++++++++++++++++++++++ 4 files changed, 885 insertions(+), 8 deletions(-) create mode 100644 pageindex/batch_processor.py create mode 100644 tests/test_batch_processor.py diff --git a/pageindex/__init__.py b/pageindex/__init__.py index 4606eb396..ff5831fdd 100644 --- a/pageindex/__init__.py +++ b/pageindex/__init__.py @@ -1,2 +1,3 @@ from .page_index import * -from .page_index_md import md_to_tree \ No newline at end of file +from .page_index_md import md_to_tree +from .batch_processor import process_batch, KnowledgeBaseSearch \ No newline at end of file diff --git a/pageindex/batch_processor.py b/pageindex/batch_processor.py new file mode 100644 index 000000000..bcdde6c75 --- /dev/null +++ b/pageindex/batch_processor.py @@ -0,0 +1,349 @@ +""" +batch_processor.py — Multi-document processing and knowledge base search. + +Public API +---------- +process_batch(doc_paths, output_dir, **kwargs) -> dict + Process a list of PDF files and write per-document structure JSONs plus a + kb_index.json manifest to output_dir. + +KnowledgeBaseSearch(kb_index_path) + Load a knowledge base produced by process_batch() and search across it. 
+""" + +import json +import logging +import os +from datetime import datetime, timezone +from typing import List, Optional + +from pageindex.page_index import page_index_main +from pageindex.utils import ConfigLoader, config as Config + + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +def _flatten_nodes(node: dict, doc_name: str, out: list) -> None: + """Recursively collect every node from a structure tree into *out*.""" + entry = { + "doc_name": doc_name, + "title": node.get("title", ""), + "start_index": node.get("start_index"), + "end_index": node.get("end_index"), + } + if "node_id" in node: + entry["node_id"] = node["node_id"] + if "summary" in node: + entry["summary"] = node["summary"] + out.append(entry) + for child in node.get("nodes", []): + _flatten_nodes(child, doc_name, out) + + +# --------------------------------------------------------------------------- +# process_batch +# --------------------------------------------------------------------------- + +def process_batch( + doc_paths: List[str], + output_dir: str = "./results", + model: Optional[str] = None, + toc_check_page_num: Optional[int] = None, + max_page_num_each_node: Optional[int] = None, + max_token_num_each_node: Optional[int] = None, + if_add_node_id: Optional[str] = None, + if_add_node_summary: Optional[str] = None, + if_add_doc_description: Optional[str] = None, + if_add_node_text: Optional[str] = None, + batch_logger: Optional[logging.Logger] = None, +) -> dict: + """Process multiple PDF files and build a knowledge-base index. 
+ + For each PDF, the document's hierarchical structure is saved as:: + + {output_dir}/{stem}_structure.json + + A manifest file is also written:: + + {output_dir}/kb_index.json + + The manifest lists every successfully processed document, its status, and its output file so that + :class:`KnowledgeBaseSearch` can load and query the results. + + Args: + doc_paths: List of PDF file paths to process. + output_dir: Directory for output files (created if absent). + model: LLM model name. Falls back to ``config.yaml`` default when ``None``. + toc_check_page_num: Pages to scan for a table of contents. + max_page_num_each_node: Maximum pages per tree node. + max_token_num_each_node: Maximum tokens per tree node. + if_add_node_id: ``'yes'``/``'no'`` — attach node IDs. + if_add_node_summary: ``'yes'``/``'no'`` — generate node summaries. + if_add_doc_description: ``'yes'``/``'no'`` — generate a document description. + if_add_node_text: ``'yes'``/``'no'`` — include raw page text per node. + batch_logger: Optional logger; falls back to the module-level logger. + + Returns: + A dict with keys: + + - ``"processed"`` — list of filenames that succeeded. + - ``"failed"`` — list of ``{"doc": path, "error": message}`` dicts. + - ``"output_dir"`` — absolute path to the output directory. + - ``"kb_index_path"`` — absolute path to ``kb_index.json``. + + Raises: + ValueError: if *doc_paths* is empty or contains non-PDF files. + """ + log = batch_logger or logger + + if not doc_paths: + raise ValueError("doc_paths must not be empty.") + + non_pdf = [p for p in doc_paths if not p.lower().endswith(".pdf")] + if non_pdf: + raise ValueError( + f"Batch processing only supports PDF files. " + f"Unsupported files: {non_pdf}" + ) + + os.makedirs(output_dir, exist_ok=True) + abs_output_dir = os.path.abspath(output_dir) + + # Build shared opt — skip keys whose value is None so defaults apply. 
+ user_opt = { + k: v for k, v in { + "model": model, + "toc_check_page_num": toc_check_page_num, + "max_page_num_each_node": max_page_num_each_node, + "max_token_num_each_node": max_token_num_each_node, + "if_add_node_id": if_add_node_id, + "if_add_node_summary": if_add_node_summary, + "if_add_doc_description": if_add_doc_description, + "if_add_node_text": if_add_node_text, + }.items() if v is not None + } + opt = ConfigLoader().load(user_opt) + + processed: List[str] = [] + failed: List[dict] = [] + kb_documents: List[dict] = [] + + for doc_path in doc_paths: + doc_name = os.path.basename(doc_path) + stem = os.path.splitext(doc_name)[0] + output_file = os.path.join(abs_output_dir, f"{stem}_structure.json") + + if not os.path.isfile(doc_path): + failed.append({"doc": doc_path, "error": "File not found."}) + log.warning(f"Skipping '{doc_name}': file not found.") + continue + + try: + log.info(f"Processing '{doc_name}' ...") + result = page_index_main(doc_path, opt) + + with open(output_file, "w", encoding="utf-8") as f: + json.dump(result, f, indent=2, ensure_ascii=False) + + kb_documents.append({ + "doc_name": result.get("doc_name", doc_name), + "doc_description": result.get("doc_description", ""), + "output_file": os.path.basename(output_file), + "status": "success", + }) + processed.append(doc_name) + log.info(f"Saved '{output_file}'") + + except Exception as exc: + failed.append({"doc": doc_path, "error": str(exc)}) + log.error(f"Failed to process '{doc_name}': {exc}") + + kb_index = { + "created_at": datetime.now(timezone.utc).isoformat(), + "total_documents": len(kb_documents), + "documents": kb_documents, + } + kb_index_path = os.path.join(abs_output_dir, "kb_index.json") + with open(kb_index_path, "w", encoding="utf-8") as f: + json.dump(kb_index, f, indent=2, ensure_ascii=False) + + log.info( + f"Batch complete — processed: {len(processed)}, " + f"failed: {len(failed)}. 
Index: {kb_index_path}" + ) + return { + "processed": processed, + "failed": failed, + "output_dir": abs_output_dir, + "kb_index_path": kb_index_path, + } + + +# --------------------------------------------------------------------------- +# KnowledgeBaseSearch +# --------------------------------------------------------------------------- + +class KnowledgeBaseSearch: + """Load a knowledge base produced by :func:`process_batch` and search it. + + Searching is case-insensitive substring matching across node titles and + summaries — no embeddings needed, consistent with PageIndex's + reasoning-first philosophy. + + Scoring: a title match scores 2, a summary match scores 1. Results are + returned in descending score order. + + Example:: + + kb = KnowledgeBaseSearch("./results/kb_index.json") + hits = kb.search("revenue growth", top_k=5) + full = kb.get_document("earnings_report.pdf") + names = kb.list_documents() + """ + + def __init__(self, kb_index_path: str) -> None: + """ + Args: + kb_index_path: Path to ``kb_index.json`` produced by + :func:`process_batch`. + + Raises: + FileNotFoundError: if the index file does not exist. 
+ """ + if not os.path.isfile(kb_index_path): + raise FileNotFoundError( + f"Knowledge base index not found: {kb_index_path}" + ) + with open(kb_index_path, "r", encoding="utf-8") as f: + self._index: dict = json.load(f) + + self._results_dir = os.path.dirname(os.path.abspath(kb_index_path)) + self._structures: dict = {} # doc_name -> loaded structure dict + self._flat_nodes: Optional[list] = None # flattened once, then cached + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + def _load_structure(self, doc_entry: dict) -> dict: + doc_name = doc_entry["doc_name"] + if doc_name not in self._structures: + path = os.path.join(self._results_dir, doc_entry["output_file"]) + with open(path, "r", encoding="utf-8") as f: + self._structures[doc_name] = json.load(f) + return self._structures[doc_name] + + def _get_all_nodes(self) -> list: + if self._flat_nodes is not None: + return self._flat_nodes + + self._flat_nodes = [] + for doc_entry in self._index.get("documents", []): + if doc_entry.get("status") != "success": + continue + try: + structure = self._load_structure(doc_entry) + doc_name = structure.get("doc_name", doc_entry["doc_name"]) + for top_node in structure.get("structure", []): + _flatten_nodes(top_node, doc_name, self._flat_nodes) + except (FileNotFoundError, json.JSONDecodeError): + continue + return self._flat_nodes + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def search(self, query: str, top_k: int = 5) -> list: + """Search across all document nodes by title and summary. + + Args: + query: Case-insensitive search string. + top_k: Maximum number of results to return. 
+ + Returns: + List of result dicts sorted by score (highest first):: + + { + "doc_name": str, + "title": str, + "start_index": int, + "end_index": int, + "score": int, # 1–3 + "match_on": list, # ["title"] / ["summary"] / both + "node_id": str, # if present + "summary": str, # if present + } + + Raises: + ValueError: if *query* is empty. + """ + if not query or not query.strip(): + raise ValueError("query must not be empty.") + + q = query.lower() + scored = [] + + for node in self._get_all_nodes(): + score = 0 + match_on = [] + + if q in node["title"].lower(): + score += 2 + match_on.append("title") + if q in node.get("summary", "").lower(): + score += 1 + match_on.append("summary") + + if score > 0: + result = { + "doc_name": node["doc_name"], + "title": node["title"], + "start_index": node["start_index"], + "end_index": node["end_index"], + "score": score, + "match_on": match_on, + } + if "node_id" in node: + result["node_id"] = node["node_id"] + if "summary" in node: + result["summary"] = node["summary"] + scored.append(result) + + scored.sort(key=lambda x: x["score"], reverse=True) + return scored[:top_k] + + def get_document(self, doc_name: str) -> dict: + """Retrieve the full structure dict for a specific document. + + Args: + doc_name: The ``doc_name`` as stored in the index + (e.g. ``"earnings_report.pdf"``). + + Returns: + The full structure dict: ``{doc_name, structure, doc_description}``. + + Raises: + KeyError: if *doc_name* is not found in the knowledge base. + """ + for doc_entry in self._index.get("documents", []): + if doc_entry["doc_name"] == doc_name: + return self._load_structure(doc_entry) + raise KeyError(f"Document '{doc_name}' not found in the knowledge base.") + + def list_documents(self) -> List[str]: + """Return the names of all successfully processed documents. + + Returns: + List of ``doc_name`` strings. 
+ """ + return [ + d["doc_name"] + for d in self._index.get("documents", []) + if d.get("status") == "success" + ] diff --git a/run_pageindex.py b/run_pageindex.py index 107024505..04d170c19 100644 --- a/run_pageindex.py +++ b/run_pageindex.py @@ -3,12 +3,15 @@ import json from pageindex import * from pageindex.page_index_md import md_to_tree +from pageindex.batch_processor import process_batch if __name__ == "__main__": # Set up argument parser parser = argparse.ArgumentParser(description='Process PDF or Markdown document and generate structure') parser.add_argument('--pdf_path', type=str, help='Path to the PDF file') parser.add_argument('--md_path', type=str, help='Path to the Markdown file') + parser.add_argument('--batch-dir', type=str, + help='Directory of PDF files to process as a batch (creates kb_index.json)') parser.add_argument('--model', type=str, default='gpt-4o-2024-11-20', help='Model to use') @@ -36,14 +39,47 @@ parser.add_argument('--summary-token-threshold', type=int, default=200, help='Token threshold for generating summaries (markdown only)') args = parser.parse_args() + + # Validate that exactly one mode is specified + modes = [bool(args.pdf_path), bool(args.md_path), bool(args.batch_dir)] + if sum(modes) == 0: + raise ValueError("One of --pdf_path, --md_path, or --batch-dir must be specified.") + if sum(modes) > 1: + raise ValueError("Only one of --pdf_path, --md_path, or --batch-dir can be specified.") - # Validate that exactly one file type is specified - if not args.pdf_path and not args.md_path: - raise ValueError("Either --pdf_path or --md_path must be specified") - if args.pdf_path and args.md_path: - raise ValueError("Only one of --pdf_path or --md_path can be specified") - - if args.pdf_path: + if args.batch_dir: + # Batch mode — process all PDFs in a directory + if not os.path.isdir(args.batch_dir): + raise ValueError(f"Directory not found: {args.batch_dir}") + + pdf_files = sorted( + os.path.join(args.batch_dir, f) + for f in 
os.listdir(args.batch_dir) + if f.lower().endswith(".pdf") + ) + if not pdf_files: + raise ValueError(f"No PDF files found in: {args.batch_dir}") + + print(f"Found {len(pdf_files)} PDF(s) in '{args.batch_dir}'. Processing...") + output_dir = './results' + summary = process_batch( + doc_paths=pdf_files, + output_dir=output_dir, + model=args.model, + if_add_node_id=args.if_add_node_id, + if_add_node_summary=args.if_add_node_summary, + if_add_doc_description=args.if_add_doc_description, + if_add_node_text=args.if_add_node_text, + ) + print(f"Batch complete.") + print(f" Processed : {len(summary['processed'])} document(s)") + print(f" Failed : {len(summary['failed'])} document(s)") + if summary['failed']: + for entry in summary['failed']: + print(f" - {entry['doc']}: {entry['error']}") + print(f" Index : {summary['kb_index_path']}") + + elif args.pdf_path: # Validate PDF file if not args.pdf_path.lower().endswith('.pdf'): raise ValueError("PDF file must have .pdf extension") diff --git a/tests/test_batch_processor.py b/tests/test_batch_processor.py new file mode 100644 index 000000000..f51b24b8a --- /dev/null +++ b/tests/test_batch_processor.py @@ -0,0 +1,491 @@ +""" +Tests for pageindex/batch_processor.py + +All tests use unittest.mock and tempfile — no API key required. 
+""" +import json +import os +import tempfile +import unittest +from unittest.mock import MagicMock, patch + + +# Fake document structure returned by a mocked page_index_main +def _fake_result(doc_name, description=None): + result = { + "doc_name": doc_name, + "structure": [ + { + "title": "Introduction", + "start_index": 1, + "end_index": 2, + "node_id": "0000", + "summary": "An overview of the document covering revenue growth.", + "nodes": [ + { + "title": "Background", + "start_index": 1, + "end_index": 1, + "node_id": "0001", + "summary": "Historical context and market analysis.", + } + ], + }, + { + "title": "Financial Results", + "start_index": 3, + "end_index": 5, + "node_id": "0002", + "summary": "Quarterly earnings showed strong performance.", + }, + ], + } + if description: + result["doc_description"] = description + return result + + +# --------------------------------------------------------------------------- +# process_batch — input validation +# --------------------------------------------------------------------------- + +class TestProcessBatchValidation(unittest.TestCase): + + def test_empty_list_raises(self): + from pageindex.batch_processor import process_batch + with self.assertRaises(ValueError, msg="Should raise on empty list"): + process_batch([]) + + def test_non_pdf_raises(self): + from pageindex.batch_processor import process_batch + with self.assertRaises(ValueError, msg="Should raise on non-PDF file"): + process_batch(["document.md"]) + + def test_mixed_pdf_and_non_pdf_raises(self): + from pageindex.batch_processor import process_batch + with self.assertRaises(ValueError): + process_batch(["report.pdf", "notes.docx"]) + + +# --------------------------------------------------------------------------- +# process_batch — success path +# --------------------------------------------------------------------------- + +class TestProcessBatchSuccess(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + # Create two 
fake PDF files (content irrelevant — page_index_main is mocked) + self.pdf1 = os.path.join(self.tmp.name, "report.pdf") + self.pdf2 = os.path.join(self.tmp.name, "earnings.pdf") + for p in (self.pdf1, self.pdf2): + open(p, "w").close() + + def tearDown(self): + self.tmp.cleanup() + + def test_two_docs_processed_successfully(self): + from pageindex.batch_processor import process_batch + + side_effects = [ + _fake_result("report.pdf", description="Annual report."), + _fake_result("earnings.pdf"), + ] + + with patch("pageindex.batch_processor.page_index_main", + side_effect=side_effects): + summary = process_batch( + [self.pdf1, self.pdf2], + output_dir=self.tmp.name, + ) + + self.assertEqual(summary["processed"], ["report.pdf", "earnings.pdf"]) + self.assertEqual(summary["failed"], []) + + def test_structure_files_written(self): + from pageindex.batch_processor import process_batch + + with patch("pageindex.batch_processor.page_index_main", + side_effect=[_fake_result("report.pdf"), _fake_result("earnings.pdf")]): + process_batch([self.pdf1, self.pdf2], output_dir=self.tmp.name) + + self.assertTrue(os.path.isfile(os.path.join(self.tmp.name, "report_structure.json"))) + self.assertTrue(os.path.isfile(os.path.join(self.tmp.name, "earnings_structure.json"))) + + def test_kb_index_written(self): + from pageindex.batch_processor import process_batch + + with patch("pageindex.batch_processor.page_index_main", + side_effect=[_fake_result("report.pdf"), _fake_result("earnings.pdf")]): + summary = process_batch([self.pdf1, self.pdf2], output_dir=self.tmp.name) + + kb_path = summary["kb_index_path"] + self.assertTrue(os.path.isfile(kb_path)) + + with open(kb_path) as f: + kb = json.load(f) + + self.assertEqual(kb["total_documents"], 2) + self.assertEqual(len(kb["documents"]), 2) + doc_names = [d["doc_name"] for d in kb["documents"]] + self.assertIn("report.pdf", doc_names) + self.assertIn("earnings.pdf", doc_names) + self.assertTrue(all(d["status"] == "success" for d in 
kb["documents"])) + + def test_doc_description_stored_in_index(self): + from pageindex.batch_processor import process_batch + + with patch("pageindex.batch_processor.page_index_main", + return_value=_fake_result("report.pdf", description="A great report.")): + process_batch([self.pdf1], output_dir=self.tmp.name) + + with open(os.path.join(self.tmp.name, "kb_index.json")) as f: + kb = json.load(f) + + self.assertEqual(kb["documents"][0]["doc_description"], "A great report.") + + def test_structure_json_content_is_correct(self): + from pageindex.batch_processor import process_batch + + fake = _fake_result("report.pdf") + with patch("pageindex.batch_processor.page_index_main", return_value=fake): + process_batch([self.pdf1], output_dir=self.tmp.name) + + with open(os.path.join(self.tmp.name, "report_structure.json")) as f: + saved = json.load(f) + + self.assertEqual(saved["doc_name"], "report.pdf") + self.assertEqual(len(saved["structure"]), 2) + + +# --------------------------------------------------------------------------- +# process_batch — error handling +# --------------------------------------------------------------------------- + +class TestProcessBatchErrors(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.good_pdf = os.path.join(self.tmp.name, "good.pdf") + open(self.good_pdf, "w").close() + self.missing_pdf = os.path.join(self.tmp.name, "missing.pdf") # not created + + def tearDown(self): + self.tmp.cleanup() + + def test_missing_file_goes_to_failed(self): + from pageindex.batch_processor import process_batch + + with patch("pageindex.batch_processor.page_index_main", + return_value=_fake_result("good.pdf")): + summary = process_batch( + [self.good_pdf, self.missing_pdf], + output_dir=self.tmp.name, + ) + + self.assertIn("good.pdf", summary["processed"]) + self.assertEqual(len(summary["failed"]), 1) + self.assertIn("missing.pdf", summary["failed"][0]["doc"]) + self.assertEqual(summary["failed"][0]["error"], "File 
not found.") + + def test_processing_exception_goes_to_failed(self): + from pageindex.batch_processor import process_batch + + with patch("pageindex.batch_processor.page_index_main", + side_effect=RuntimeError("LLM error")): + summary = process_batch([self.good_pdf], output_dir=self.tmp.name) + + self.assertEqual(summary["processed"], []) + self.assertEqual(len(summary["failed"]), 1) + self.assertIn("LLM error", summary["failed"][0]["error"]) + + def test_kb_index_still_written_on_partial_failure(self): + from pageindex.batch_processor import process_batch + + good_pdf2 = os.path.join(self.tmp.name, "good2.pdf") + open(good_pdf2, "w").close() + + with patch("pageindex.batch_processor.page_index_main", + side_effect=[_fake_result("good.pdf"), RuntimeError("crash")]): + summary = process_batch( + [self.good_pdf, good_pdf2], + output_dir=self.tmp.name, + ) + + self.assertTrue(os.path.isfile(summary["kb_index_path"])) + with open(summary["kb_index_path"]) as f: + kb = json.load(f) + # Only the successful doc appears in the index + self.assertEqual(kb["total_documents"], 1) + self.assertEqual(len(summary["processed"]), 1) + self.assertEqual(len(summary["failed"]), 1) + + def test_output_dir_is_created_if_absent(self): + from pageindex.batch_processor import process_batch + + new_dir = os.path.join(self.tmp.name, "new_subdir") + self.assertFalse(os.path.exists(new_dir)) + + with patch("pageindex.batch_processor.page_index_main", + return_value=_fake_result("good.pdf")): + process_batch([self.good_pdf], output_dir=new_dir) + + self.assertTrue(os.path.isdir(new_dir)) + + +# --------------------------------------------------------------------------- +# KnowledgeBaseSearch — setup helpers +# --------------------------------------------------------------------------- + +def _write_kb(tmp_dir, docs): + """Write fake structure files + kb_index.json into tmp_dir. 
+ + docs: list of (doc_name, structure_dict) + """ + kb_docs = [] + for doc_name, structure in docs: + stem = os.path.splitext(doc_name)[0] + out_file = f"{stem}_structure.json" + with open(os.path.join(tmp_dir, out_file), "w") as f: + json.dump({"doc_name": doc_name, "structure": structure}, f) + kb_docs.append({ + "doc_name": doc_name, + "doc_description": "", + "output_file": out_file, + "status": "success", + }) + + kb_index = { + "created_at": "2026-03-01T00:00:00+00:00", + "total_documents": len(kb_docs), + "documents": kb_docs, + } + kb_path = os.path.join(tmp_dir, "kb_index.json") + with open(kb_path, "w") as f: + json.dump(kb_index, f) + return kb_path + + +_STRUCTURE_A = [ + { + "title": "Revenue Growth", + "start_index": 1, + "end_index": 3, + "node_id": "0000", + "summary": "Quarterly revenue increased by 20% driven by new markets.", + "nodes": [ + { + "title": "Market Expansion", + "start_index": 2, + "end_index": 3, + "node_id": "0001", + "summary": "Expansion into Asia and Europe.", + } + ], + } +] + +_STRUCTURE_B = [ + { + "title": "Risk Factors", + "start_index": 1, + "end_index": 2, + "node_id": "0000", + "summary": "Operational and market risks are discussed.", + } +] + + +# --------------------------------------------------------------------------- +# KnowledgeBaseSearch — initialisation +# --------------------------------------------------------------------------- + +class TestKnowledgeBaseSearchInit(unittest.TestCase): + + def test_raises_if_index_missing(self): + from pageindex.batch_processor import KnowledgeBaseSearch + with self.assertRaises(FileNotFoundError): + KnowledgeBaseSearch("/nonexistent/kb_index.json") + + def test_loads_successfully(self): + from pageindex.batch_processor import KnowledgeBaseSearch + with tempfile.TemporaryDirectory() as tmp: + kb_path = _write_kb(tmp, [("doc_a.pdf", _STRUCTURE_A)]) + kb = KnowledgeBaseSearch(kb_path) + self.assertIsNotNone(kb) + + +# 
--------------------------------------------------------------------------- +# KnowledgeBaseSearch.list_documents +# --------------------------------------------------------------------------- + +class TestListDocuments(unittest.TestCase): + + def test_returns_all_successful_docs(self): + from pageindex.batch_processor import KnowledgeBaseSearch + with tempfile.TemporaryDirectory() as tmp: + kb_path = _write_kb(tmp, [ + ("doc_a.pdf", _STRUCTURE_A), + ("doc_b.pdf", _STRUCTURE_B), + ]) + kb = KnowledgeBaseSearch(kb_path) + names = kb.list_documents() + self.assertIn("doc_a.pdf", names) + self.assertIn("doc_b.pdf", names) + self.assertEqual(len(names), 2) + + def test_excludes_failed_docs(self): + from pageindex.batch_processor import KnowledgeBaseSearch + with tempfile.TemporaryDirectory() as tmp: + # Write one good doc manually, then inject a failed entry into index + kb_path = _write_kb(tmp, [("doc_a.pdf", _STRUCTURE_A)]) + with open(kb_path) as f: + kb_data = json.load(f) + kb_data["documents"].append({ + "doc_name": "bad.pdf", + "doc_description": "", + "output_file": "bad_structure.json", + "status": "failed", + }) + with open(kb_path, "w") as f: + json.dump(kb_data, f) + + kb = KnowledgeBaseSearch(kb_path) + names = kb.list_documents() + + self.assertNotIn("bad.pdf", names) + self.assertIn("doc_a.pdf", names) + + +# --------------------------------------------------------------------------- +# KnowledgeBaseSearch.get_document +# --------------------------------------------------------------------------- + +class TestGetDocument(unittest.TestCase): + + def test_returns_correct_structure(self): + from pageindex.batch_processor import KnowledgeBaseSearch + with tempfile.TemporaryDirectory() as tmp: + kb_path = _write_kb(tmp, [("doc_a.pdf", _STRUCTURE_A)]) + kb = KnowledgeBaseSearch(kb_path) + doc = kb.get_document("doc_a.pdf") + + self.assertEqual(doc["doc_name"], "doc_a.pdf") + self.assertEqual(len(doc["structure"]), 1) + 
self.assertEqual(doc["structure"][0]["title"], "Revenue Growth") + + def test_raises_key_error_for_unknown_doc(self): + from pageindex.batch_processor import KnowledgeBaseSearch + with tempfile.TemporaryDirectory() as tmp: + kb_path = _write_kb(tmp, [("doc_a.pdf", _STRUCTURE_A)]) + kb = KnowledgeBaseSearch(kb_path) + with self.assertRaises(KeyError): + kb.get_document("nonexistent.pdf") + + +# --------------------------------------------------------------------------- +# KnowledgeBaseSearch.search +# --------------------------------------------------------------------------- + +class TestSearch(unittest.TestCase): + + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.kb_path = _write_kb(self.tmp.name, [ + ("doc_a.pdf", _STRUCTURE_A), + ("doc_b.pdf", _STRUCTURE_B), + ]) + + def tearDown(self): + self.tmp.cleanup() + + def test_empty_query_raises(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + with self.assertRaises(ValueError): + kb.search("") + with self.assertRaises(ValueError): + kb.search(" ") + + def test_title_match_found(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + results = kb.search("Revenue Growth") + self.assertTrue(len(results) > 0) + self.assertEqual(results[0]["title"], "Revenue Growth") + self.assertIn("title", results[0]["match_on"]) + + def test_summary_match_found(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + # "markets" appears only in a summary, not in a title + results = kb.search("new markets") + self.assertTrue(len(results) > 0) + self.assertIn("summary", results[0]["match_on"]) + + def test_case_insensitive(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + upper = kb.search("REVENUE GROWTH") + lower = kb.search("revenue growth") + self.assertEqual(len(upper), len(lower)) + + def 
test_no_results_returns_empty_list(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + results = kb.search("xyzzy_no_match_ever") + self.assertEqual(results, []) + + def test_title_match_scores_higher_than_summary_only(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + # "expansion" appears in both title ("Market Expansion") and summary + results = kb.search("expansion") + title_hits = [r for r in results if "title" in r["match_on"]] + summary_only_hits = [r for r in results if r["match_on"] == ["summary"]] + if title_hits and summary_only_hits: + self.assertGreater(title_hits[0]["score"], summary_only_hits[0]["score"]) + + def test_top_k_limits_results(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + # "e" appears everywhere — many matches expected + results = kb.search("e", top_k=2) + self.assertLessEqual(len(results), 2) + + def test_result_contains_required_fields(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + results = kb.search("revenue") + self.assertTrue(len(results) > 0) + for r in results: + self.assertIn("doc_name", r) + self.assertIn("title", r) + self.assertIn("start_index", r) + self.assertIn("end_index", r) + self.assertIn("score", r) + self.assertIn("match_on", r) + + def test_searches_across_multiple_documents(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + # "risk" only in doc_b, "revenue" only in doc_a + risk_results = kb.search("risk") + revenue_results = kb.search("revenue") + risk_docs = {r["doc_name"] for r in risk_results} + revenue_docs = {r["doc_name"] for r in revenue_results} + self.assertIn("doc_b.pdf", risk_docs) + self.assertIn("doc_a.pdf", revenue_docs) + self.assertNotIn("doc_b.pdf", revenue_docs) + + def 
test_nested_nodes_are_searchable(self): + from pageindex.batch_processor import KnowledgeBaseSearch + kb = KnowledgeBaseSearch(self.kb_path) + # "Market Expansion" is a child node inside doc_a + results = kb.search("Market Expansion") + self.assertTrue(len(results) > 0) + self.assertEqual(results[0]["title"], "Market Expansion") + + +if __name__ == "__main__": + unittest.main(verbosity=2)