diff --git a/apps/worker/app/services/document_parser/formats/image/parser.py b/apps/worker/app/services/document_parser/formats/image/parser.py index 9dd3808a..251e2e28 100755 --- a/apps/worker/app/services/document_parser/formats/image/parser.py +++ b/apps/worker/app/services/document_parser/formats/image/parser.py @@ -142,7 +142,7 @@ def ask_image( if task in ("summary-images", "atlas-page-info"): image_model = settings.IMAGE_MODEL or "gpt-4-vision-preview" - else: # Image Q&A and OCR use better models + else: # OCR and image type classification use higher-capability models image_model = settings.IMAGE_MODEL_MAX or "gpt-4-vision-preview" if len(urls_) > 0: diff --git a/packages/shared-python/shared/core/config/ai.py b/packages/shared-python/shared/core/config/ai.py index 315511f6..9d035051 100644 --- a/packages/shared-python/shared/core/config/ai.py +++ b/packages/shared-python/shared/core/config/ai.py @@ -35,7 +35,7 @@ class AIConfig(BaseModel): IMAGE_MODEL_MAX: str = Field( default="qwen3.5-flash", - description="Higher-capability image model for OCR and ask-image Q&A", + description="Higher-capability image model for OCR and image type classification", ) RETRIEVAL_DECOMPOSITION_ENABLED: bool = Field( default=False, diff --git a/packages/shared-python/shared/services/ai/llm_mock.py b/packages/shared-python/shared/services/ai/llm_mock.py index c751e325..ccb84143 100644 --- a/packages/shared-python/shared/services/ai/llm_mock.py +++ b/packages/shared-python/shared/services/ai/llm_mock.py @@ -139,15 +139,11 @@ def _detect_mock_task(prompt_text: str) -> str: if "perform ocr operation" in normalized_prompt: return "ocr-image" if ( - "you will receive an image" in normalized_prompt - and "line 1: output a short title" in normalized_prompt + "you will receive an image from a document" in normalized_prompt + and "identify the image type" in normalized_prompt ): return "summary-images" - if ( - "you will receive one or more images and the user's current question" - in normalized_prompt - ): - return "ask-image" + if "summaries of sub-sections from a document section" in normalized_prompt: return "file-summary" if ( @@ -302,7 +298,7 @@ def _build_mock_response(task_name: str) -> str: "atlas-page-info": "Mock atlas page info", "ocr-image": "Mock OCR text", "summary-images": "Mock Image Title\nMock image summary", - "ask-image": "Mock image answer", + "file-summary": "Mock section summary", "summary-titled": "Mock Title\nMock summary", "summary": "Mock summary", diff --git a/packages/shared-python/shared/services/ai/prompt_service.py b/packages/shared-python/shared/services/ai/prompt_service.py index 20bc6e3f..451f9b50 100755 --- a/packages/shared-python/shared/services/ai/prompt_service.py +++ b/packages/shared-python/shared/services/ai/prompt_service.py @@ -566,16 +566,65 @@ def build_prompt(task, texts, query, **kwargs): temperature = 0.1 max_tokens = int(kwargs["paras"]["max_tokens"] * 1.2) if texts.strip(): - img_context = f"- Image context is [{texts}], you may reference the title for summarization" + img_context = f"- Image context is [{texts}], you may reference the context for summarization" else: img_context = "" prompt = f""" - You will receive an image, which may be a photo, chart, or an image requiring OCR. - Your task is to extract the main content described in the image. Note: - - Line 1: Output a short title (no more than 15 characters) summarizing the image's core topic - - Line 2 onward: Provide a precise and concise summary, using text descriptions only, avoid extracting specific data from the image - - Your response **MUST BE in the SAME LANGUAGE** as any text visible in the image (if there is no text, English is preferred) + You will receive an image from a document. Your task is to extract the most + USEFUL information from this image based on its type. + + **STEP 1: Identify the image type** (do NOT output this step, use it internally): + - Credential/ID: identity cards, passports, driver licenses, business licenses, certificates, permits + - Data Chart: bar charts, line charts, pie charts, scatter plots, heatmaps, gauge charts + - Table Screenshot: tabular data rendered as an image + - Diagram: flowcharts, org charts, architecture diagrams, mind maps, UML diagrams + - Engineering Drawing: architectural plans, circuit diagrams, CAD drawings, mechanical drawings + - Photo: real-world photographs of people, objects, scenes, products + - Other: anything not fitting the above categories + + **STEP 2: Extract information according to image type**: + + For Credential/ID images: + - Extract ALL visible fields: name, ID number, date of birth, expiry date, + issuing authority, company name, registration number, legal representative, + business scope, qualification level, etc. + - Preserve exact values as shown (numbers, dates, codes) + + For Data Charts: + - Chart title, axis labels and units + - Key data points, trends, and notable patterns + - Time range or categories covered + - Data source if visible + + For Table Screenshots: + - Table title and column headers + - Key data entries and notable values + - Number of rows/columns and what the table represents + + For Diagrams (flow/architecture/org): + - All node names and their relationships + - Flow direction and process steps + - Hierarchy levels and key connections + + For Engineering/Technical Drawings: + - Drawing title, drawing number, scale + - Key dimensions and annotations + - Component/part names, material specifications + + For Photos: + - Primary subject and scene description + - Notable features, text, or signage visible + - Context clues about location or purpose + + For Other: + - Describe the most important visual information + + **Output format**: + - Line 1: A concise title (no more than 20 characters) capturing the core topic + - Line 2 onward: The extracted information following the type-specific guidelines above + - Your response **MUST BE in the SAME LANGUAGE** as any text visible in the image + (if no text, use English) - If the image is blank, unreadable, or contains no meaningful content, return exactly: null {img_context} @@ -595,23 +644,6 @@ def build_prompt(task, texts, query, **kwargs): - Do not add any format wrappers, prefixes, or explanations beyond the text content """ - elif task == "ask-image": - temperature = 0.1 - max_tokens = int(kwargs["paras"]["max_tokens"] * 1.2) - - prompt = f""" - You will receive one or more images and the user's current question: [{query}] - You may also receive context related to the image(s). - - {texts} - - Your task is to answer the user's question based on the image(s) and context (if any). Note: - - Your answer must be in the SAME LANGUAGE as the user's question - - Provide a complete and accurate answer with some explanation, but not exceeding {max_tokens} characters - - If the image content is unrelated to the user's question, return exactly: null - - Do not return any additional explanations or descriptions beyond the answer - """ - elif task == "judge-image-type": temperature = 0.1 prompt = """ diff --git a/packages/shared-python/shared/services/retrieval/agentic/discovery/selection.py b/packages/shared-python/shared/services/retrieval/agentic/discovery/selection.py index 30269429..c89208cd 100644 --- a/packages/shared-python/shared/services/retrieval/agentic/discovery/selection.py +++ b/packages/shared-python/shared/services/retrieval/agentic/discovery/selection.py @@ -11,6 +11,7 @@ from shared.services.retrieval.agentic.core.budget import BudgetExceeded from shared.services.retrieval.agentic.prompts import ( DISCOVERY_SELECT_PROMPT, + adjust_budget_snapshot, format_budget_block, parse_action_response, ) @@ -208,10 +209,17 @@ def _build_discovery_selection_prompt( hint_lines: list[str], budget_snapshot: dict | None, ) -> str: + # Estimate this call's prompt token cost and adjust snapshot so + # the LLM sees post-call budget (consistent with navigate_step). + items_text = "\n".join(hint_lines) + prompt_tokens_est = (len(items_text) + 400) // 2 # rough chars-to-tokens + adjusted_snapshot = adjust_budget_snapshot( + budget_snapshot, prompt_tokens_est, + ) return DISCOVERY_SELECT_PROMPT.format( doc_name=doc_name or document_id, - budget_block=format_budget_block(budget_snapshot), - items="\n".join(hint_lines), + budget_block=format_budget_block(adjusted_snapshot), + items=items_text, query=query, ) diff --git a/packages/shared-python/shared/services/retrieval/agentic/navigation/tools.py b/packages/shared-python/shared/services/retrieval/agentic/navigation/tools.py index e863bb30..345d9e3f 100644 --- a/packages/shared-python/shared/services/retrieval/agentic/navigation/tools.py +++ b/packages/shared-python/shared/services/retrieval/agentic/navigation/tools.py @@ -27,6 +27,7 @@ from shared.services.retrieval.agentic.core.budget import BudgetExceeded from shared.services.retrieval.agentic.prompts import ( COLLECTOR_PROMPT, + adjust_budget_snapshot, format_budget_block, parse_collector_response, ) @@ -42,38 +43,6 @@ from shared.services.retrieval.llm_adapter import LLMFn -def _adjust_budget_snapshot( - snapshot: dict | None, - additional_tokens: int, -) -> dict | None: - """Adjust a budget snapshot by adding estimated tokens for the current call. - - This ensures the LLM sees the budget state *after* this call's cost, - not before, preventing misleadingly low percentages. - """ - if not snapshot: - return snapshot - import copy - adjusted = copy.deepcopy(snapshot) - planning = adjusted.get("planning") - if not planning: - return adjusted - capacity = planning.get("capacity", 1) - used = planning.get("used", 0) + additional_tokens - used_pct = min(int(used * 100 / capacity), 100) if capacity > 0 else 100 - planning["used"] = used - planning["used_pct"] = used_pct - planning["remaining"] = max(0, capacity - used) - if used_pct >= 90: - planning["status"] = "EXHAUSTED" - elif used_pct >= 75: - planning["status"] = "CRITICAL" - elif used_pct >= 50: - planning["status"] = "TIGHT" - else: - planning["status"] = "HEALTHY" - return adjusted - async def navigate_step( db: AsyncSession, @@ -162,7 +131,7 @@ async def navigate_step( prompt_tokens_est = ( len(items_text) + len(trace_block) + len(tools_block) + 800 ) // 2 # rough chars-to-tokens ratio - adjusted_snapshot = _adjust_budget_snapshot( + adjusted_snapshot = adjust_budget_snapshot( budget_snapshot, prompt_tokens_est, ) diff --git a/packages/shared-python/shared/services/retrieval/agentic/orchestrator.py b/packages/shared-python/shared/services/retrieval/agentic/orchestrator.py index a9e77aca..1e47d7f0 100644 --- a/packages/shared-python/shared/services/retrieval/agentic/orchestrator.py +++ b/packages/shared-python/shared/services/retrieval/agentic/orchestrator.py @@ -21,6 +21,10 @@ from loguru import logger from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select as sa_select + +from shared.models.database.document import Document +from shared.services.retrieval.agentic import tools as agentic_tools from shared.services.retrieval.agentic.core.budget import BudgetLedger from shared.services.retrieval.agentic.discovery.phase import ( run_initial_discovery, @@ -39,6 +43,7 @@ AgentRunConfig, AgentState, AgenticResult, + ToolResult, ) from shared.services.retrieval.llm_adapter import LLMFn from shared.services.retrieval.settings import DEFAULT_TOP_K @@ -252,6 +257,29 @@ async def run( ) await navigation_runner.navigate_selected_documents() decision_trace.extend(navigation_runner.decision_steps) + + # ── Phase 2.5: Discovery fallback for unnavigated documents ── + # If ALL navigated documents produced zero evidence, check + # whether bottom discovery found rows in documents that were + # never navigated (i.e. KG Select missed them). Run a + # budget-exempt discovery merge on those docs as a last resort. + has_evidence = any( + t.has_content() for t in state.doc_trees.values() + ) + if not has_evidence and llm_fn is not None: + fallback_results = await _run_discovery_fallback( + db, + state=state, + trace=trace, + trace_enabled=trace_enabled, + user_id=user_id, + namespace=namespace, + query=query, + discovery_by_doc=discovery_by_doc, + llm_fn=llm_fn, + ) + decision_trace.extend(fallback_results) + context_remaining = state.ledger.remaining('context') if state.ledger else config.token_budget_total evidence_text = await _trim_evidence_to_budget( db, @@ -312,3 +340,132 @@ async def run( ) return result + + +async def _run_discovery_fallback( + db: AsyncSession, + *, + state: AgentState, + trace: TraceRecorder, + trace_enabled: bool, + user_id: str, + namespace: str, + query: str, + discovery_by_doc: dict[str, list[dict[str, Any]]], + llm_fn: LLMFn, +) -> list[dict[str, Any]]: + """Budget-exempt discovery merge for documents that KG Select missed. + + Called only when navigation produced zero evidence. For each document + that has bottom-discovery rows but was never navigated, run + ``discovery_select_step`` with the raw ``llm_fn`` (no budget gate) so + the LLM can select relevant paths from the keyword-discovered hints. + """ + unnavigated_doc_ids = [ + doc_id + for doc_id in discovery_by_doc + if doc_id not in state.ever_explored_doc_ids + ] + if not unnavigated_doc_ids: + return [] + + logger.info( + f' agentic: Phase 2.5 — discovery fallback for ' + f'{len(unnavigated_doc_ids)} unnavigated doc(s)' + ) + + # Look up source_file_name for unnavigated docs + doc_stmt = ( + sa_select(Document.document_id, Document.source_file_name) + .where(Document.document_id.in_(unnavigated_doc_ids)) + ) + doc_result = await db.execute(doc_stmt) + doc_names: dict[str, str] = { + str(doc_id): str(name or doc_id) + for doc_id, name in doc_result.all() + } + + decision_entries: list[dict[str, Any]] = [] + + for doc_id in unnavigated_doc_ids: + doc_hints = discovery_by_doc.get(doc_id, []) + if not doc_hints: + continue + + doc_name = doc_names.get(doc_id, doc_id) + state.doc_id_to_name[doc_id] = doc_name + + try: + result = await agentic_tools.discovery_select_step( + db, + document_id=doc_id, + query=query, + llm_fn=llm_fn, # raw — no budget gate + user_id=user_id, + namespace=namespace, + doc_name=doc_name, + discovery_hints=doc_hints, + exclude_paths=None, # no navigation happened, nothing to exclude + budget_snapshot=None, # budget-exempt + ) + except Exception as exc: + logger.warning( + f' agentic: discovery fallback failed for ' + f'doc={doc_id}: {exc}' + ) + decision_entries.append({ + 'phase': 'discovery_fallback', + 'document': doc_name, + 'document_id': doc_id, + 'action': 'error', + 'reason': f'discovery_select_step failed: {exc}', + 'candidate_count': len(doc_hints), + 'hydrated_count': 0, + }) + continue + + state.step_count += 1 + discovery_node = result.node + + if trace_enabled: + trace.record_step( + 'discovery_fallback', + ToolResult( + status='selected' if discovery_node.has_content() else 'empty', + payload={ + 'document_id': doc_id, + 'hints_count': len(doc_hints), + 'hydrated_count': len(discovery_node.leaf_content), + }, + ), + decision_reason=f'fallback_discovery_{doc_name}', + ) + + action = 'select' if discovery_node.has_content() else 'skip' + decision_entries.append({ + 'phase': 'discovery_fallback', + 'document': doc_name, + 'document_id': doc_id, + 'action': action, + 'reason': ( + f'Navigation produced 0 evidence. ' + f'Discovery fallback on unnavigated doc: ' + f'{len(doc_hints)} hints, ' + f'{len(discovery_node.leaf_content)} selected.' + ), + 'candidate_count': result.candidate_count, + 'hydrated_count': len(discovery_node.leaf_content), + 'selected_paths': list(discovery_node.leaf_content.keys()), + }) + + if discovery_node.has_content(): + if doc_id in state.doc_trees: + state.doc_trees[doc_id].merge(discovery_node) + else: + state.doc_trees[doc_id] = discovery_node + logger.info( + f' agentic: discovery fallback doc={doc_name} ' + f'hydrated {len(discovery_node.leaf_content)} paths' + ) + + return decision_entries diff --git a/packages/shared-python/shared/services/retrieval/agentic/prompts.py b/packages/shared-python/shared/services/retrieval/agentic/prompts.py index 18ebafac..cd174f93 100644 --- a/packages/shared-python/shared/services/retrieval/agentic/prompts.py +++ b/packages/shared-python/shared/services/retrieval/agentic/prompts.py @@ -256,6 +256,39 @@ def extract(data: dict) -> dict: return default +def adjust_budget_snapshot( + snapshot: dict | None, + additional_tokens: int, +) -> dict | None: + """Adjust a budget snapshot by adding estimated tokens for the current call. + + This ensures the LLM sees the budget state *after* this call's cost, + not before, preventing misleadingly low percentages. + """ + if not snapshot: + return snapshot + import copy + adjusted = copy.deepcopy(snapshot) + planning = adjusted.get("planning") + if not planning: + return adjusted + capacity = planning.get("capacity", 1) + used = planning.get("used", 0) + additional_tokens + used_pct = min(int(used * 100 / capacity), 100) if capacity > 0 else 100 + planning["used"] = used + planning["used_pct"] = used_pct + planning["remaining"] = max(0, capacity - used) + if used_pct >= 90: + planning["status"] = "EXHAUSTED" + elif used_pct >= 75: + planning["status"] = "CRITICAL" + elif used_pct >= 50: + planning["status"] = "TIGHT" + else: + planning["status"] = "HEALTHY" + return adjusted + + def format_budget_block(snapshot: dict | None) -> str: if not snapshot: return ""