From b6916a5e72b46783fd9f9bede16d891eb24a4be8 Mon Sep 17 00:00:00 2001 From: suguanYang Date: Thu, 4 Jun 2026 12:19:30 +0000 Subject: [PATCH] fix: route oversized pdf shards through s3 --- .../document_parser/formats/pdf/parser.py | 409 +++++++++++------- .../orchestration/format_adapters.py | 1 + .../contract/test_parse_task_contract.py | 19 + .../services/storage/job_file_storage.py | 18 + 4 files changed, 285 insertions(+), 162 deletions(-) diff --git a/apps/worker/app/services/document_parser/formats/pdf/parser.py b/apps/worker/app/services/document_parser/formats/pdf/parser.py index 0d3bcbca..217c89f6 100755 --- a/apps/worker/app/services/document_parser/formats/pdf/parser.py +++ b/apps/worker/app/services/document_parser/formats/pdf/parser.py @@ -1,5 +1,7 @@ # pyright: reportArgumentType=false import os +import re +import shutil from app.services.document_parser.formats.markdown.parser import parse_md from app.services.document_parser.orchestration.oversized_pdf_policy import ( @@ -10,6 +12,7 @@ from loguru import logger from shared.core.config import settings +from shared.services.storage.job_file_storage import JobFileStorage def parse_pdfs( @@ -20,6 +23,7 @@ def parse_pdfs( profile=None, relative_root=None, s3_key=None, + job_id=None, ): route = profile.route if profile else "standard" base_llm_paras.update({"doc_name": filename}) @@ -43,6 +47,7 @@ def parse_pdfs( return _parse_oversized_pdf( pdf_path, filename, output_dir, base_llm_paras, profile=profile, relative_root=relative_root, s3_key=s3_key, + job_id=job_id, ) except Exception as exc: logger.exception( @@ -74,7 +79,7 @@ def parse_pdfs( def _parse_oversized_pdf( pdf_path, filename, output_dir, base_llm_paras, - profile=None, relative_root=None, s3_key=None, + profile=None, relative_root=None, s3_key=None, job_id=None, ): """Handle PDFs exceeding MinerU's page limit via shard-first hierarchy. @@ -101,187 +106,267 @@ def _parse_oversized_pdf( split_pdf, ) - job_id = base_llm_paras.get("doc_name", filename) - - # 1. Run doc_agent to get full anatomy map (shard plan + TOC info) - with stage_timer("pdf.doc_agent", filename=filename): - anatomy = run_doc_agent(pdf_path, job_id=job_id, output_dir=output_dir) + doc_agent_job_id = job_id or base_llm_paras.get("doc_name", filename) + work_dir: str | None = None + temp_shard_s3_keys: list[str] = [] + + try: + # 1. Run doc_agent to get full anatomy map (shard plan + TOC info) + with stage_timer("pdf.doc_agent", filename=filename): + anatomy = run_doc_agent( + pdf_path, + job_id=doc_agent_job_id, + output_dir=output_dir, + ) - agent_shards = anatomy.shard_plan.shards + agent_shards = anatomy.shard_plan.shards + + # 2. Extract TOC info from anatomy for page exclusion and heading constraint + toc_pages: set[int] = set() + toc_hierarchies = None + if anatomy.toc_result and anatomy.toc_result.toc_pages: + toc_pages = set(anatomy.toc_result.toc_pages) + toc_hierarchies = anatomy.toc_hierarchies + logger.info( + f"📌 DOC_AGENT TOC detected: {len(toc_pages)} pages to exclude " + f"({sorted(toc_pages)}), " + f"{len(toc_hierarchies) if toc_hierarchies else 0} hierarchy regions" + ) - # 2. Extract TOC info from anatomy for page exclusion and heading constraint - toc_pages: set[int] = set() - toc_hierarchies = None - if anatomy.toc_result and anatomy.toc_result.toc_pages: - toc_pages = set(anatomy.toc_result.toc_pages) - toc_hierarchies = anatomy.toc_hierarchies - logger.info( - f"📌 DOC_AGENT TOC detected: {len(toc_pages)} pages to exclude " - f"({sorted(toc_pages)}), " - f"{len(toc_hierarchies) if toc_hierarchies else 0} hierarchy regions" + # 3. Bin-pack agent shards to maximize MinerU page limit + merged_shards = bin_pack_shards( + agent_shards, + max_pages=settings.MAX_PDF_PAGE_LIMIT, ) - - # 3. Bin-pack agent shards to maximize MinerU page limit - merged_shards = bin_pack_shards(agent_shards, max_pages=settings.MAX_PDF_PAGE_LIMIT) - logger.info( - f"📦 Bin-packed {len(agent_shards)} agent shards → " - f"{len(merged_shards)} MinerU shards" - ) - for ms in merged_shards: logger.info( - f" shard_{ms.shard_index}: pages {ms.page_start}-{ms.page_end} " - f"({ms.page_count} pages)" - ) - - # 4. Physically split PDF (exclude TOC pages if detected) - work_dir = os.path.join(output_dir, "_shards") - os.makedirs(work_dir, exist_ok=True) - with stage_timer("pdf.split", filename=filename): - shard_pdf_paths, _page_remap = split_pdf( - pdf_path, merged_shards, work_dir, - exclude_pages=toc_pages if toc_pages else None, + f"📦 Bin-packed {len(agent_shards)} agent shards → " + f"{len(merged_shards)} MinerU shards" ) + for ms in merged_shards: + logger.info( + f" shard_{ms.shard_index}: pages {ms.page_start}-{ms.page_end} " + f"({ms.page_count} pages)" + ) - # 5. Parse each shard via MinerU (parallel) - shard_output_dirs: list[str | None] = [None] * len(shard_pdf_paths) - concurrency = settings.MINERU_SHARD_CONCURRENCY + # 4. Physically split PDF (exclude TOC pages if detected) + work_dir = os.path.join(output_dir, "_shards") + os.makedirs(work_dir, exist_ok=True) + with stage_timer("pdf.split", filename=filename): + shard_pdf_paths, _page_remap = split_pdf( + pdf_path, merged_shards, work_dir, + exclude_pages=toc_pages if toc_pages else None, + ) - def _parse_single_shard(shard_idx, shard_pdf): - shard_out = os.path.join(work_dir, f"shard_{shard_idx}_output") - os.makedirs(shard_out, exist_ok=True) - shard_filename = ( - f"{os.path.splitext(filename)[0]}_shard{shard_idx}.pdf" - ) - logger.info( - f" 🔄 MinerU shard_{shard_idx}: parsing" + temp_shard_s3_keys = [ + _build_temp_shard_s3_key( + source_s3_key=s3_key, + job_id=job_id, + filename=filename, + shard_index=shard_index, + ) + for shard_index, _shard_pdf_path in enumerate(shard_pdf_paths) + ] + + # 5. Parse each shard via MinerU (parallel) + shard_output_dirs: list[str | None] = [None] * len(shard_pdf_paths) + concurrency = settings.MINERU_SHARD_CONCURRENCY + + def _parse_single_shard(shard_idx, shard_pdf): + assert work_dir is not None + shard_out = os.path.join(work_dir, f"shard_{shard_idx}_output") + os.makedirs(shard_out, exist_ok=True) + shard_filename = ( + f"{os.path.splitext(filename)[0]}_shard{shard_idx}.pdf" + ) + shard_s3_key = temp_shard_s3_keys[shard_idx] + logger.info( + f" 🔄 MinerU shard_{shard_idx}: parsing via S3 URL " + f"({shard_s3_key})" + ) + parse_via_full(shard_pdf, shard_filename, shard_out, s3_key=shard_s3_key) + return shard_out + + with stage_timer( + "pdf.mineru_parallel", filename=filename, shard_count=len(shard_pdf_paths) + ): + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = { + executor.submit(_parse_single_shard, i, shard_pdf_path): i + for i, shard_pdf_path in enumerate(shard_pdf_paths) + } + for future in as_completed(futures): + idx = futures[future] + shard_output_dirs[idx] = future.result() + + # 6. Per-shard heading prediction (parallel) + @dataclass + class ShardHeadingResult: + shard_index: int + lines_with_heading: list[str] + heading_count: int + + smart_parse = base_llm_paras.get("smart_title_parse", True) + hierarchy_model_name = ( + base_llm_paras.get("hierarchy_model_name") + or base_llm_paras.get("model_name", settings.NORMOL_MODEL) ) - parse_via_full(shard_pdf, shard_filename, shard_out, s3_key=None) - return shard_out - - with stage_timer( - "pdf.mineru_parallel", filename=filename, shard_count=len(shard_pdf_paths) - ): - with ThreadPoolExecutor(max_workers=concurrency) as executor: - futures = { - executor.submit(_parse_single_shard, i, shard_pdf_path): i - for i, shard_pdf_path in enumerate(shard_pdf_paths) - } - for future in as_completed(futures): - idx = futures[future] - shard_output_dirs[idx] = future.result() - - # 6. Per-shard heading prediction (parallel) - @dataclass - class ShardHeadingResult: - shard_index: int - lines_with_heading: list[str] - heading_count: int - - smart_parse = base_llm_paras.get("smart_title_parse", True) - hierarchy_model_name = ( - base_llm_paras.get("hierarchy_model_name") - or base_llm_paras.get("model_name", settings.NORMOL_MODEL) - ) - def _predict_shard_headings(shard_idx: int, shard_out_dir: str) -> ShardHeadingResult: - """Run full heading prediction pipeline on a single shard's full.md.""" - md_path = os.path.join(shard_out_dir, "full.md") - if not os.path.exists(md_path): - raise FileNotFoundError(f"shard_{shard_idx}: full.md not found") - - with open(md_path, "r", encoding="utf-8") as f: - md_lines = f.readlines() - md_lines = [line.strip() for line in md_lines if line.strip() != ""] - md_lines = merge_html_tables(md_lines) + def _predict_shard_headings(shard_idx: int, shard_out_dir: str) -> ShardHeadingResult: + """Run full heading prediction pipeline on a single shard's full.md.""" + md_path = os.path.join(shard_out_dir, "full.md") + if not os.path.exists(md_path): + raise FileNotFoundError(f"shard_{shard_idx}: full.md not found") + + with open(md_path, "r", encoding="utf-8") as f: + md_lines = f.readlines() + md_lines = [line.strip() for line in md_lines if line.strip() != ""] + md_lines = merge_html_tables(md_lines) + + # TOC context: first TOC shared by all shards; subsequent TOCs assigned + # by page boundary. For simplicity, all TOCs are passed since pred_titles + # only matches headings actually present in this shard's content. + shard_toc = toc_hierarchies + + lines_with_heading = eval_md_headings( + md_lines, + source_type="md", + toc_hierarchies=shard_toc, + smart_parse=smart_parse, + model_name=hierarchy_model_name, + output_dir=shard_out_dir, + layout_json_path=( + os.path.join(shard_out_dir, "layout.json") + if os.path.exists(os.path.join(shard_out_dir, "layout.json")) + else None + ), + ) - # TOC context: first TOC shared by all shards; subsequent TOCs assigned - # by page boundary. For simplicity, all TOCs are passed since pred_titles - # only matches headings actually present in this shard's content. - shard_toc = toc_hierarchies + heading_count = sum(1 for line in lines_with_heading if line.startswith("#")) + logger.info( + f" ✅ shard_{shard_idx}: {heading_count} headings identified " + f"from {len(lines_with_heading)} lines" + ) + return ShardHeadingResult( + shard_index=shard_idx, + lines_with_heading=lines_with_heading, + heading_count=heading_count, + ) - lines_with_heading = eval_md_headings( - md_lines, - source_type="md", - toc_hierarchies=shard_toc, - smart_parse=smart_parse, - model_name=hierarchy_model_name, - output_dir=shard_out_dir, - layout_json_path=( - os.path.join(shard_out_dir, "layout.json") - if os.path.exists(os.path.join(shard_out_dir, "layout.json")) - else None - ), + shard_heading_results: list[ShardHeadingResult | None] = [None] * len(shard_output_dirs) + + with stage_timer( + "pdf.shard_headings", filename=filename, shard_count=len(shard_output_dirs) + ): + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = { + executor.submit(_predict_shard_headings, i, shard_dir): i + for i, shard_dir in enumerate(shard_output_dirs) + if shard_dir is not None + } + for future in as_completed(futures): + idx = futures[future] + shard_heading_results[idx] = future.result() + + # 7. Merge: concatenate lines_with_heading (in shard order) + merge images + complete_heading_results: list[ShardHeadingResult] = [] + for index, result in enumerate(shard_heading_results): + if result is None: + raise RuntimeError(f"Missing heading result for shard_{index}") + complete_heading_results.append(result) + + # Compute level offsets: continuation shards get shifted deeper. + shard_offsets: list[int] = [] + for shard in agent_shards: + if shard.is_continuation: + shard_offsets.append(max(shard.split_depth - 1, 0)) + else: + shard_offsets.append(0) + if any(offset > 0 for offset in shard_offsets): + logger.info(f"📐 Shard level offsets: {shard_offsets}") + + all_lines_with_heading: list[str] = merge_shard_lines( + [result.lines_with_heading for result in complete_heading_results], + shard_offsets=shard_offsets, + ) + total_headings = sum( + 1 for line in all_lines_with_heading if line.startswith("#") ) - heading_count = sum(1 for line in lines_with_heading if line.startswith("#")) logger.info( - f" ✅ shard_{shard_idx}: {heading_count} headings identified " - f"from {len(lines_with_heading)} lines" - ) - return ShardHeadingResult( - shard_index=shard_idx, - lines_with_heading=lines_with_heading, - heading_count=heading_count, + f"📎 Merged {len(complete_heading_results)} shards: " + f"{len(all_lines_with_heading)} lines, {total_headings} headings" ) - shard_heading_results: list[ShardHeadingResult | None] = [None] * len(shard_output_dirs) - - with stage_timer( - "pdf.shard_headings", filename=filename, shard_count=len(shard_output_dirs) - ): - with ThreadPoolExecutor(max_workers=concurrency) as executor: - futures = { - executor.submit(_predict_shard_headings, i, shard_dir): i - for i, shard_dir in enumerate(shard_output_dirs) - if shard_dir is not None - } - for future in as_completed(futures): - idx = futures[future] - shard_heading_results[idx] = future.result() - - # 7. Merge: concatenate lines_with_heading (in shard order) + merge images - complete_heading_results: list[ShardHeadingResult] = [] - for index, result in enumerate(shard_heading_results): - if result is None: - raise RuntimeError(f"Missing heading result for shard_{index}") - complete_heading_results.append(result) - - # Compute level offsets: continuation shards get shifted deeper - shard_offsets: list[int] = [] - for shard in agent_shards: - if shard.is_continuation: - shard_offsets.append(max(shard.split_depth - 1, 0)) - else: - shard_offsets.append(0) - if any(o > 0 for o in shard_offsets): - logger.info(f"📐 Shard level offsets: {shard_offsets}") - - all_lines_with_heading: list[str] = merge_shard_lines( - [result.lines_with_heading for result in complete_heading_results], - shard_offsets=shard_offsets, - ) - total_headings = sum( - 1 for line in all_lines_with_heading if line.startswith("#") - ) + with stage_timer("pdf.merge_images", filename=filename): + merge_images(shard_output_dirs, output_dir) + + logger.info("✅ Shard-first hierarchy complete, entering parse_md Phase B") - logger.info( - f"📎 Merged {len(complete_heading_results)} shards: " - f"{len(all_lines_with_heading)} lines, {total_headings} headings" + # 8. parse_md Phase B only (skip TOC detection + heading prediction) + with stage_timer("pdf.parse_md", filename=filename): + return parse_md( + output_dir, + source_type="md", + base_llm_paras=base_llm_paras, + relative_root=relative_root, + lines_with_heading=all_lines_with_heading, + ) + finally: + _cleanup_temp_shard_s3_assets(temp_shard_s3_keys) + _cleanup_local_shard_workspace(work_dir) + + +def _build_temp_shard_s3_key( + *, + source_s3_key: str | None, + job_id: str | None, + filename: str, + shard_index: int, +) -> str: + owner_segment = _sanitize_temp_storage_segment( + job_id or _source_key_stem(source_s3_key) or os.path.splitext(filename)[0] ) + return f"tmp/mineru-shards/{owner_segment}/shard_{shard_index}.pdf" - with stage_timer("pdf.merge_images", filename=filename): - merge_images(shard_output_dirs, output_dir) - logger.info("✅ Shard-first hierarchy complete, entering parse_md Phase B") +def _source_key_stem(source_s3_key: str | None) -> str | None: + if not source_s3_key: + return None + key_name = os.path.basename(source_s3_key.rstrip("/")) + stem, _extension = os.path.splitext(key_name) + return stem or None - # 8. parse_md Phase B only (skip TOC detection + heading prediction) - with stage_timer("pdf.parse_md", filename=filename): - return parse_md( - output_dir, - source_type="md", - base_llm_paras=base_llm_paras, - relative_root=relative_root, - lines_with_heading=all_lines_with_heading, - ) +def _sanitize_temp_storage_segment(value: object) -> str: + normalized = re.sub(r"[^A-Za-z0-9_.-]+", "-", str(value)).strip(".-") + return normalized or "document" + + +def _cleanup_temp_shard_s3_assets(s3_keys: list[str]) -> None: + if not s3_keys: + return + storage = JobFileStorage() + for s3_key in s3_keys: + try: + deleted = storage.delete_upload_file(s3_key) + if deleted: + logger.info(f"Deleted temporary MinerU shard S3 object: {s3_key}") + else: + logger.debug(f"Temporary MinerU shard S3 object was absent: {s3_key}") + except Exception as exc: + logger.warning( + f"Failed to delete temporary MinerU shard S3 object " + f"{s3_key}: {exc}" + ) + +def _cleanup_local_shard_workspace(work_dir: str | None) -> None: + if not work_dir or not os.path.exists(work_dir): + return + try: + shutil.rmtree(work_dir) + logger.info(f"Deleted temporary MinerU shard workspace: {work_dir}") + except Exception as exc: + logger.warning( + f"Failed to delete temporary MinerU shard workspace {work_dir}: {exc}" + ) diff --git a/apps/worker/app/services/document_parser/orchestration/format_adapters.py b/apps/worker/app/services/document_parser/orchestration/format_adapters.py index 6dc3409c..e6af2f8e 100644 --- a/apps/worker/app/services/document_parser/orchestration/format_adapters.py +++ b/apps/worker/app/services/document_parser/orchestration/format_adapters.py @@ -85,6 +85,7 @@ def parse(self, session: ParseSession) -> ParseOutput: profile=session.profile, relative_root=session.relative_root, s3_key=session.s3_key, + job_id=session.job_id, ) return ParseOutput(output_dir=session.full_output_dir, parsed_df=parsed_df) diff --git a/apps/worker/tests/contract/test_parse_task_contract.py b/apps/worker/tests/contract/test_parse_task_contract.py index 843825bc..355966cd 100644 --- a/apps/worker/tests/contract/test_parse_task_contract.py +++ b/apps/worker/tests/contract/test_parse_task_contract.py @@ -558,6 +558,13 @@ class _Profile: page_count = 3 calls: dict[str, object] = {} + parse_s3_keys: list[str | None] = [] + deleted_s3_keys: list[str] = [] + + class _FakeJobFileStorage: + def delete_upload_file(self, storage_key: str) -> bool: + deleted_s3_keys.append(storage_key) + return True def _fake_run_doc_agent(pdf_path_arg: str, job_id: str, output_dir: str): calls["doc_agent"] = { @@ -610,6 +617,7 @@ def _fake_split_pdf(pdf_path_arg, shards, work_dir, exclude_pages=None): return paths, None def _fake_parse_via_full(shard_pdf, shard_filename, shard_out, s3_key=None): + parse_s3_keys.append(s3_key) shard_index = 0 if "shard0" in shard_filename else 1 lines_by_shard = { 0: ["# Chapter 1", "Shard one body."], @@ -648,6 +656,7 @@ def _identity_eval_md_headings( "app.services.document_parser.formats.markdown.parser.eval_md_headings", _identity_eval_md_headings, ) + monkeypatch.setattr(pdf_parser, "JobFileStorage", _FakeJobFileStorage) df = pdf_parser.parse_pdfs( str(pdf_path), @@ -664,10 +673,20 @@ def _identity_eval_md_headings( }, profile=_Profile(), relative_root="oversized.pdf", + s3_key="uploads/job-oversized.pdf", + job_id="job-oversized", ) assert calls["exclude_pages"] == {1} + assert calls["doc_agent"]["job_id"] == "job-oversized" assert len(calls["heading_dirs"]) == 2 + expected_s3_keys = [ + "tmp/mineru-shards/job-oversized/shard_0.pdf", + "tmp/mineru-shards/job-oversized/shard_1.pdf", + ] + assert parse_s3_keys == expected_s3_keys + assert deleted_s3_keys == expected_s3_keys + assert not (output_dir / "_shards").exists() assert list(df["type"]) == ["PTXT", "PTXT"] assert list(df["content"]) == ["Shard one body.", "Shard two body."] assert list(df["path"]) == [ diff --git a/packages/shared-python/shared/services/storage/job_file_storage.py b/packages/shared-python/shared/services/storage/job_file_storage.py index 77d49e2b..d0cc47b7 100644 --- a/packages/shared-python/shared/services/storage/job_file_storage.py +++ b/packages/shared-python/shared/services/storage/job_file_storage.py @@ -157,6 +157,24 @@ def upload_source_file( bucket=self.uploads_bucket, ) + def delete_object( + self, + storage_key: str, + *, + bucket: str, + ) -> bool: + try: + return self.storage_adapter.delete_object(storage_key, bucket) + except Exception as exc: + raise StorageServiceException( + internal_message=f"Storage delete failed: {exc}", + operation="delete_object", + original_exception=exc, + ) from exc + + def delete_upload_file(self, storage_key: str) -> bool: + return self.delete_object(storage_key, bucket=self.uploads_bucket) + def upload_fileobj( self, file_obj: BinaryIO,