diff --git a/apps/worker/app/services/document_parser/formats/pdf/parser.py b/apps/worker/app/services/document_parser/formats/pdf/parser.py index 7168cf32..ab1de83a 100755 --- a/apps/worker/app/services/document_parser/formats/pdf/parser.py +++ b/apps/worker/app/services/document_parser/formats/pdf/parser.py @@ -1,7 +1,5 @@ # pyright: reportArgumentType=false import os -import re -import shutil from app.services.document_parser.formats.markdown.parser import parse_md from app.services.document_parser.orchestration.oversized_pdf_policy import ( @@ -12,7 +10,6 @@ from loguru import logger from shared.core.config import settings -from shared.services.storage.job_file_storage import JobFileStorage def parse_pdfs( @@ -23,7 +20,6 @@ def parse_pdfs( profile=None, relative_root=None, s3_key=None, - job_id=None, ): route = profile.route if profile else "standard" base_llm_paras.update({"doc_name": filename}) @@ -47,7 +43,6 @@ def parse_pdfs( return _parse_oversized_pdf( pdf_path, filename, output_dir, base_llm_paras, profile=profile, relative_root=relative_root, s3_key=s3_key, - job_id=job_id, ) except Exception as exc: logger.exception( @@ -79,7 +74,7 @@ def parse_pdfs( def _parse_oversized_pdf( pdf_path, filename, output_dir, base_llm_paras, - profile=None, relative_root=None, s3_key=None, job_id=None, + profile=None, relative_root=None, s3_key=None, ): """Handle PDFs exceeding MinerU's page limit via shard-first hierarchy. @@ -106,257 +101,176 @@ def _parse_oversized_pdf( split_pdf, ) - doc_agent_job_id = job_id or base_llm_paras.get("doc_name", filename) - work_dir: str | None = None - temp_shard_s3_keys: list[str] = [] - - try: - # 1. Run doc_agent to get full anatomy map (shard plan + TOC info) - with stage_timer("pdf.doc_agent", filename=filename): - anatomy = run_doc_agent( - pdf_path, - job_id=doc_agent_job_id, - output_dir=output_dir, - ) + job_id = base_llm_paras.get("doc_name", filename) - agent_shards = anatomy.shard_plan.shards - - # 2. Extract TOC info from anatomy for page exclusion and heading constraint - toc_pages: set[int] = set() - toc_hierarchies = None - if anatomy.toc_result and anatomy.toc_result.toc_pages: - toc_pages = set(anatomy.toc_result.toc_pages) - toc_hierarchies = anatomy.toc_hierarchies - logger.info( - f"📌 DOC_AGENT TOC detected: {len(toc_pages)} pages to exclude " - f"({sorted(toc_pages)}), " - f"{len(toc_hierarchies) if toc_hierarchies else 0} hierarchy regions" - ) + # 1. Run doc_agent to get full anatomy map (shard plan + TOC info) + with stage_timer("pdf.doc_agent", filename=filename): + anatomy = run_doc_agent(pdf_path, job_id=job_id, output_dir=output_dir) - # 3. Bin-pack agent shards to maximize MinerU page limit - merged_shards = bin_pack_shards( - agent_shards, - max_pages=settings.MAX_PDF_PAGE_LIMIT, - ) + agent_shards = anatomy.shard_plan.shards + + # 2. Extract TOC info from anatomy for page exclusion and heading constraint + toc_pages: set[int] = set() + toc_hierarchies = None + if anatomy.toc_result and anatomy.toc_result.toc_pages: + toc_pages = set(anatomy.toc_result.toc_pages) + toc_hierarchies = anatomy.toc_hierarchies logger.info( - f"📦 Bin-packed {len(agent_shards)} agent shards → " - f"{len(merged_shards)} MinerU shards" + f"📌 DOC_AGENT TOC detected: {len(toc_pages)} pages to exclude " + f"({sorted(toc_pages)}), " + f"{len(toc_hierarchies) if toc_hierarchies else 0} hierarchy regions" ) - for ms in merged_shards: - logger.info( - f" shard_{ms.shard_index}: pages {ms.page_start}-{ms.page_end} " - f"({ms.page_count} pages)" - ) - # 4. Physically split PDF (exclude TOC pages if detected) - work_dir = os.path.join(output_dir, "_shards") - os.makedirs(work_dir, exist_ok=True) - with stage_timer("pdf.split", filename=filename): - shard_pdf_paths, _page_remap = split_pdf( - pdf_path, merged_shards, work_dir, - exclude_pages=toc_pages if toc_pages else None, - ) - - temp_shard_s3_keys = [ - _build_temp_shard_s3_key( - source_s3_key=s3_key, - job_id=job_id, - filename=filename, - shard_index=shard_index, - ) - for shard_index, _shard_pdf_path in enumerate(shard_pdf_paths) - ] - - # 5. Parse each shard via MinerU (parallel) - shard_output_dirs: list[str | None] = [None] * len(shard_pdf_paths) - concurrency = settings.MINERU_SHARD_CONCURRENCY - - def _parse_single_shard(shard_idx, shard_pdf): - assert work_dir is not None - shard_out = os.path.join(work_dir, f"shard_{shard_idx}_output") - os.makedirs(shard_out, exist_ok=True) - shard_filename = ( - f"{os.path.splitext(filename)[0]}_shard{shard_idx}.pdf" - ) - shard_s3_key = temp_shard_s3_keys[shard_idx] - logger.info( - f" 🔄 MinerU shard_{shard_idx}: parsing via S3 URL " - f"({shard_s3_key})" - ) - parse_via_full(shard_pdf, shard_filename, shard_out, s3_key=shard_s3_key) - return shard_out - - with stage_timer( - "pdf.mineru_parallel", filename=filename, shard_count=len(shard_pdf_paths) - ): - with ThreadPoolExecutor(max_workers=concurrency) as executor: - futures = { - executor.submit(_parse_single_shard, i, shard_pdf_path): i - for i, shard_pdf_path in enumerate(shard_pdf_paths) - } - for future in as_completed(futures): - idx = futures[future] - shard_output_dirs[idx] = future.result() - - # 6. Per-shard heading prediction (parallel) - @dataclass - class ShardHeadingResult: - shard_index: int - lines_with_heading: list[str] - heading_count: int - - smart_parse = base_llm_paras.get("smart_title_parse", True) - hierarchy_model_name = ( - base_llm_paras.get("hierarchy_model_name") - or base_llm_paras.get("model_name", settings.NORMOL_MODEL) + # 3. Bin-pack agent shards to maximize MinerU page limit + merged_shards = bin_pack_shards(agent_shards, max_pages=settings.MAX_PDF_PAGE_LIMIT) + logger.info( + f"📦 Bin-packed {len(agent_shards)} agent shards → " + f"{len(merged_shards)} MinerU shards" + ) + for ms in merged_shards: + logger.info( + f" shard_{ms.shard_index}: pages {ms.page_start}-{ms.page_end} " + f"({ms.page_count} pages)" ) - def _predict_shard_headings(shard_idx: int, shard_out_dir: str) -> ShardHeadingResult: - """Run full heading prediction pipeline on a single shard's full.md.""" - md_path = os.path.join(shard_out_dir, "full.md") - if not os.path.exists(md_path): - raise FileNotFoundError(f"shard_{shard_idx}: full.md not found") - - with open(md_path, "r", encoding="utf-8") as f: - md_lines = f.readlines() - md_lines = [line.strip() for line in md_lines if line.strip() != ""] - md_lines = merge_html_tables(md_lines) - - # TOC context: first TOC shared by all shards; subsequent TOCs assigned - # by page boundary. For simplicity, all TOCs are passed since pred_titles - # only matches headings actually present in this shard's content. - shard_toc = toc_hierarchies - - lines_with_heading = eval_md_headings( - md_lines, - source_type="md", - toc_hierarchies=shard_toc, - smart_parse=smart_parse, - model_name=hierarchy_model_name, - output_dir=shard_out_dir, - layout_json_path=( - os.path.join(shard_out_dir, "layout.json") - if os.path.exists(os.path.join(shard_out_dir, "layout.json")) - else None - ), - ) + # 4. Physically split PDF (exclude TOC pages if detected) + work_dir = os.path.join(output_dir, "_shards") + os.makedirs(work_dir, exist_ok=True) + with stage_timer("pdf.split", filename=filename): + shard_pdf_paths, _page_remap = split_pdf( + pdf_path, merged_shards, work_dir, + exclude_pages=toc_pages if toc_pages else None, + ) - heading_count = sum(1 for line in lines_with_heading if line.startswith("#")) - logger.info( - f" ✅ shard_{shard_idx}: {heading_count} headings identified " - f"from {len(lines_with_heading)} lines" - ) - return ShardHeadingResult( - shard_index=shard_idx, - lines_with_heading=lines_with_heading, - heading_count=heading_count, - ) + # 5. Parse each shard via MinerU (parallel) + shard_output_dirs: list[str | None] = [None] * len(shard_pdf_paths) + concurrency = settings.MINERU_SHARD_CONCURRENCY - shard_heading_results: list[ShardHeadingResult | None] = [None] * len(shard_output_dirs) - - with stage_timer( - "pdf.shard_headings", filename=filename, shard_count=len(shard_output_dirs) - ): - with ThreadPoolExecutor(max_workers=concurrency) as executor: - futures = { - executor.submit(_predict_shard_headings, i, shard_dir): i - for i, shard_dir in enumerate(shard_output_dirs) - if shard_dir is not None - } - for future in as_completed(futures): - idx = futures[future] - shard_heading_results[idx] = future.result() - - # 7. Merge: concatenate lines_with_heading (in shard order) + merge images - complete_heading_results: list[ShardHeadingResult] = [] - for index, result in enumerate(shard_heading_results): - if result is None: - raise RuntimeError(f"Missing heading result for shard_{index}") - complete_heading_results.append(result) - - all_lines_with_heading: list[str] = merge_shard_lines( - [result.lines_with_heading for result in complete_heading_results] - ) - total_headings = sum( - 1 for line in all_lines_with_heading if line.startswith("#") + def _parse_single_shard(shard_idx, shard_pdf): + shard_out = os.path.join(work_dir, f"shard_{shard_idx}_output") + os.makedirs(shard_out, exist_ok=True) + shard_filename = ( + f"{os.path.splitext(filename)[0]}_shard{shard_idx}.pdf" ) - logger.info( - f"📎 Merged {len(complete_heading_results)} shards: " - f"{len(all_lines_with_heading)} lines, {total_headings} headings" + f" 🔄 MinerU shard_{shard_idx}: parsing" ) + parse_via_full(shard_pdf, shard_filename, shard_out, s3_key=None) + return shard_out + + with stage_timer( + "pdf.mineru_parallel", filename=filename, shard_count=len(shard_pdf_paths) + ): + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = { + executor.submit(_parse_single_shard, i, shard_pdf_path): i + for i, shard_pdf_path in enumerate(shard_pdf_paths) + } + for future in as_completed(futures): + idx = futures[future] + shard_output_dirs[idx] = future.result() + + # 6. Per-shard heading prediction (parallel) + @dataclass + class ShardHeadingResult: + shard_index: int + lines_with_heading: list[str] + heading_count: int + + smart_parse = base_llm_paras.get("smart_title_parse", True) + hierarchy_model_name = ( + base_llm_paras.get("hierarchy_model_name") + or base_llm_paras.get("model_name", settings.NORMOL_MODEL) + ) - with stage_timer("pdf.merge_images", filename=filename): - merge_images(shard_output_dirs, output_dir) + def _predict_shard_headings(shard_idx: int, shard_out_dir: str) -> ShardHeadingResult: + """Run full heading prediction pipeline on a single shard's full.md.""" + md_path = os.path.join(shard_out_dir, "full.md") + if not os.path.exists(md_path): + raise FileNotFoundError(f"shard_{shard_idx}: full.md not found") - logger.info("✅ Shard-first hierarchy complete, entering parse_md Phase B") + with open(md_path, "r", encoding="utf-8") as f: + md_lines = f.readlines() + md_lines = [line.strip() for line in md_lines if line.strip() != ""] + md_lines = merge_html_tables(md_lines) - # 8. parse_md Phase B only (skip TOC detection + heading prediction) - with stage_timer("pdf.parse_md", filename=filename): - return parse_md( - output_dir, - source_type="md", - base_llm_paras=base_llm_paras, - relative_root=relative_root, - lines_with_heading=all_lines_with_heading, - ) - finally: - _cleanup_temp_shard_s3_assets(temp_shard_s3_keys) - _cleanup_local_shard_workspace(work_dir) - - -def _build_temp_shard_s3_key( - *, - source_s3_key: str | None, - job_id: str | None, - filename: str, - shard_index: int, -) -> str: - owner_segment = _sanitize_temp_storage_segment( - job_id or _source_key_stem(source_s3_key) or os.path.splitext(filename)[0] - ) - return f"tmp/mineru-shards/{owner_segment}/shard_{shard_index}.pdf" + # TOC context: first TOC shared by all shards; subsequent TOCs assigned + # by page boundary. For simplicity, all TOCs are passed since pred_titles + # only matches headings actually present in this shard's content. + shard_toc = toc_hierarchies + lines_with_heading = eval_md_headings( + md_lines, + source_type="md", + toc_hierarchies=shard_toc, + smart_parse=smart_parse, + model_name=hierarchy_model_name, + output_dir=shard_out_dir, + layout_json_path=( + os.path.join(shard_out_dir, "layout.json") + if os.path.exists(os.path.join(shard_out_dir, "layout.json")) + else None + ), + ) -def _source_key_stem(source_s3_key: str | None) -> str | None: - if not source_s3_key: - return None - key_name = os.path.basename(source_s3_key.rstrip("/")) - stem, _extension = os.path.splitext(key_name) - return stem or None + heading_count = sum(1 for line in lines_with_heading if line.startswith("#")) + logger.info( + f" ✅ shard_{shard_idx}: {heading_count} headings identified " + f"from {len(lines_with_heading)} lines" + ) + return ShardHeadingResult( + shard_index=shard_idx, + lines_with_heading=lines_with_heading, + heading_count=heading_count, + ) + shard_heading_results: list[ShardHeadingResult | None] = [None] * len(shard_output_dirs) + + with stage_timer( + "pdf.shard_headings", filename=filename, shard_count=len(shard_output_dirs) + ): + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = { + executor.submit(_predict_shard_headings, i, shard_dir): i + for i, shard_dir in enumerate(shard_output_dirs) + if shard_dir is not None + } + for future in as_completed(futures): + idx = futures[future] + shard_heading_results[idx] = future.result() + + # 7. Merge: concatenate lines_with_heading (in shard order) + merge images + complete_heading_results: list[ShardHeadingResult] = [] + for index, result in enumerate(shard_heading_results): + if result is None: + raise RuntimeError(f"Missing heading result for shard_{index}") + complete_heading_results.append(result) + + all_lines_with_heading: list[str] = merge_shard_lines( + [result.lines_with_heading for result in complete_heading_results] + ) + total_headings = sum( + 1 for line in all_lines_with_heading if line.startswith("#") + ) -def _sanitize_temp_storage_segment(value: object) -> str: - normalized = re.sub(r"[^A-Za-z0-9_.-]+", "-", str(value)).strip(".-") - return normalized or "document" + logger.info( + f"📎 Merged {len(complete_heading_results)} shards: " + f"{len(all_lines_with_heading)} lines, {total_headings} headings" + ) + with stage_timer("pdf.merge_images", filename=filename): + merge_images(shard_output_dirs, output_dir) -def _cleanup_temp_shard_s3_assets(s3_keys: list[str]) -> None: - if not s3_keys: - return + logger.info("✅ Shard-first hierarchy complete, entering parse_md Phase B") - storage = JobFileStorage() - for s3_key in s3_keys: - try: - deleted = storage.delete_upload_file(s3_key) - if deleted: - logger.info(f"Deleted temporary MinerU shard S3 object: {s3_key}") - else: - logger.debug(f"Temporary MinerU shard S3 object was absent: {s3_key}") - except Exception as exc: - logger.warning( - f"Failed to delete temporary MinerU shard S3 object " - f"{s3_key}: {exc}" - ) + # 8. parse_md Phase B only (skip TOC detection + heading prediction) + with stage_timer("pdf.parse_md", filename=filename): + return parse_md( + output_dir, + source_type="md", + base_llm_paras=base_llm_paras, + relative_root=relative_root, + lines_with_heading=all_lines_with_heading, + ) -def _cleanup_local_shard_workspace(work_dir: str | None) -> None: - if not work_dir or not os.path.exists(work_dir): - return - try: - shutil.rmtree(work_dir) - logger.info(f"Deleted temporary MinerU shard workspace: {work_dir}") - except Exception as exc: - logger.warning( - f"Failed to delete temporary MinerU shard workspace {work_dir}: {exc}" - ) diff --git a/apps/worker/app/services/document_parser/orchestration/format_adapters.py b/apps/worker/app/services/document_parser/orchestration/format_adapters.py index e6af2f8e..6dc3409c 100644 --- a/apps/worker/app/services/document_parser/orchestration/format_adapters.py +++ b/apps/worker/app/services/document_parser/orchestration/format_adapters.py @@ -85,7 +85,6 @@ def parse(self, session: ParseSession) -> ParseOutput: profile=session.profile, relative_root=session.relative_root, s3_key=session.s3_key, - job_id=session.job_id, ) return ParseOutput(output_dir=session.full_output_dir, parsed_df=parsed_df) diff --git a/apps/worker/tests/contract/test_parse_task_contract.py b/apps/worker/tests/contract/test_parse_task_contract.py index 355966cd..843825bc 100644 --- a/apps/worker/tests/contract/test_parse_task_contract.py +++ b/apps/worker/tests/contract/test_parse_task_contract.py @@ -558,13 +558,6 @@ class _Profile: page_count = 3 calls: dict[str, object] = {} - parse_s3_keys: list[str | None] = [] - deleted_s3_keys: list[str] = [] - - class _FakeJobFileStorage: - def delete_upload_file(self, storage_key: str) -> bool: - deleted_s3_keys.append(storage_key) - return True def _fake_run_doc_agent(pdf_path_arg: str, job_id: str, output_dir: str): calls["doc_agent"] = { @@ -617,7 +610,6 @@ def _fake_split_pdf(pdf_path_arg, shards, work_dir, exclude_pages=None): return paths, None def _fake_parse_via_full(shard_pdf, shard_filename, shard_out, s3_key=None): - parse_s3_keys.append(s3_key) shard_index = 0 if "shard0" in shard_filename else 1 lines_by_shard = { 0: ["# Chapter 1", "Shard one body."], @@ -656,7 +648,6 @@ def _identity_eval_md_headings( "app.services.document_parser.formats.markdown.parser.eval_md_headings", _identity_eval_md_headings, ) - monkeypatch.setattr(pdf_parser, "JobFileStorage", _FakeJobFileStorage) df = pdf_parser.parse_pdfs( str(pdf_path), @@ -673,20 +664,10 @@ def _identity_eval_md_headings( }, profile=_Profile(), relative_root="oversized.pdf", - s3_key="uploads/job-oversized.pdf", - job_id="job-oversized", ) assert calls["exclude_pages"] == {1} - assert calls["doc_agent"]["job_id"] == "job-oversized" assert len(calls["heading_dirs"]) == 2 - expected_s3_keys = [ - "tmp/mineru-shards/job-oversized/shard_0.pdf", - "tmp/mineru-shards/job-oversized/shard_1.pdf", - ] - assert parse_s3_keys == expected_s3_keys - assert deleted_s3_keys == expected_s3_keys - assert not (output_dir / "_shards").exists() assert list(df["type"]) == ["PTXT", "PTXT"] assert list(df["content"]) == ["Shard one body.", "Shard two body."] assert list(df["path"]) == [ diff --git a/packages/shared-python/shared/services/storage/job_file_storage.py b/packages/shared-python/shared/services/storage/job_file_storage.py index d0cc47b7..77d49e2b 100644 --- a/packages/shared-python/shared/services/storage/job_file_storage.py +++ b/packages/shared-python/shared/services/storage/job_file_storage.py @@ -157,24 +157,6 @@ def upload_source_file( bucket=self.uploads_bucket, ) - def delete_object( - self, - storage_key: str, - *, - bucket: str, - ) -> bool: - try: - return self.storage_adapter.delete_object(storage_key, bucket) - except Exception as exc: - raise StorageServiceException( - internal_message=f"Storage delete failed: {exc}", - operation="delete_object", - original_exception=exc, - ) from exc - - def delete_upload_file(self, storage_key: str) -> bool: - return self.delete_object(storage_key, bucket=self.uploads_bucket) - def upload_fileobj( self, file_obj: BinaryIO,