Skip to content
Merged
10 changes: 4 additions & 6 deletions apps/api/app/api/v1/routes/retrieval.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Retrieval API routes for lexical + graph-routing baseline."""
"""Retrieval API routes for evidence-only agentic retrieval."""

from __future__ import annotations

Expand Down Expand Up @@ -59,7 +59,7 @@ class RetrievalQueryRequest(BaseModel):
)
use_agentic: bool | None = Field(
None,
description="Per-request agentic mode toggle. true=force agentic, false=force legacy, null=use server default.",
description="Deprecated mode hint retained for cache/request compatibility; retrieval always uses the agentic workflow.",
)

@field_validator("channels")
Expand Down Expand Up @@ -99,10 +99,8 @@ class RetrievalQueryResponse(BaseModel):
decision_trace: list[dict] | None = Field(
default=None,
description=(
"Per-step navigation decisions from agentic retrieval. "
"Each entry has phase, document, action, reason, collected_paths, "
"and drill_into. Use this to understand "
"why KNOWHERE stopped or made specific navigation choices."
"Per-step agentic retrieval trace. Each entry follows the "
"observation/decision/result schema."
),
)

Expand Down
139 changes: 61 additions & 78 deletions apps/api/tests/contract/test_agentic_discovery_selection_contract.py
Original file line number Diff line number Diff line change
@@ -1,96 +1,79 @@
from shared.services.retrieval.agentic.core.types import DocTreeNode
from shared.services.retrieval.agentic.discovery.selection import (
_build_discovery_path_selections,
_project_discovery_hints,
)
from shared.services.retrieval.agentic.navigation.actions import build_legal_actions


def test_root_discovery_hint_is_projected_for_llm_selection() -> None:
hint_lines, hint_by_path, excluded_hints = _project_discovery_hints(
[
def test_discovery_hint_is_projected_as_collect_action() -> None:
action_set = build_legal_actions(
items=[],
current_scope=None,
collected_paths=[],
expanded_scopes=set(),
discovery_hints=[
{
"section_path": "Root",
"chunk_id": "chunk_root_relevant",
"summary": "document-level market chart",
"section_path": "2 阶段性调整还是牛熊切换? / 2.1 牛熊切换缘何开启?",
"discovery_score": 0.82,
"chunk_type": "text",
}
],
exclude_paths=None,
rejected_paths=set(),
rejected_collect_paths=set(),
total_images=0,
total_tables=0,
budget_snapshot=None,
)

assert hint_lines == [
'▸ path="Root"',
" document-level market chart",
]
assert hint_by_path["Root"]["chunk_id"] == "chunk_root_relevant"
assert len(action_set.collect) == 1
action = action_set.collect[0]
assert action.id == "D1"
assert action.action == "COLLECT"
assert action.source == "discovery"
assert action.path == "2 阶段性调整还是牛熊切换? / 2.1 牛熊切换缘何开启?"
assert action.score == 0.82


def test_root_discovery_hint_without_llm_selection_does_not_hydrate() -> None:
node = DocTreeNode()

path_selections, chunk_refs = _build_discovery_path_selections(
selections=[],
hint_by_path={
"Root": {
"section_path": "Root",
"chunk_id": "chunk_root_relevant",
def test_discovery_hint_under_collected_path_is_not_repeated() -> None:
action_set = build_legal_actions(
items=[],
current_scope=None,
collected_paths=[
{
"path": "2 阶段性调整还是牛熊切换?",
"hydrate_mode": "chunks",
}
},
document_id="doc_root",
node=node,
)

assert path_selections == []
assert chunk_refs == []
assert node.confidence == {}


def test_explicit_root_discovery_selection_with_chunk_id_uses_exact_chunk_ref() -> None:
node = DocTreeNode()

path_selections, chunk_refs = _build_discovery_path_selections(
selections=[{"path": "Root", "confidence": 0.91}],
hint_by_path={
"Root": {
"section_path": "Root",
"chunk_id": "chunk_root_relevant",
],
expanded_scopes=set(),
discovery_hints=[
{
"section_path": "2 阶段性调整还是牛熊切换? / 2.1 牛熊切换缘何开启?",
"discovery_score": 0.82,
}
},
document_id="doc_root",
node=node,
],
rejected_paths=set(),
rejected_collect_paths=set(),
total_images=0,
total_tables=0,
budget_snapshot=None,
)

assert path_selections == []
assert chunk_refs == [
{
"document_id": "doc_root",
"chunk_id": "chunk_root_relevant",
"section_path": "Root",
}
]
assert node.confidence["Root"] == 0.91
assert action_set.collect == []


def test_explicit_root_discovery_selection_without_chunk_id_keeps_path_fallback() -> None:
node = DocTreeNode()

path_selections, chunk_refs = _build_discovery_path_selections(
selections=[{"path": "Root", "confidence": 0.7}],
hint_by_path={
"Root": {
"section_path": "Root",
"chunk_id": "",
def test_discovery_hint_under_rejected_collect_path_is_not_repeated() -> None:
action_set = build_legal_actions(
items=[],
current_scope=None,
collected_paths=[],
expanded_scopes=set(),
discovery_hints=[
{
"section_path": "1、2016:机构行为助推行情演绎 / 二是英国“脱欧”影响下",
"discovery_score": 0.7,
}
},
document_id="doc_root",
node=node,
],
rejected_paths=set(),
rejected_collect_paths={"1、2016:机构行为助推行情演绎"},
total_images=0,
total_tables=0,
budget_snapshot=None,
)

assert path_selections == [
{
"path": "Root",
"confidence": 0.7,
"hydrate_mode": "self_only",
}
]
assert chunk_refs == []
assert node.confidence["Root"] == 0.7
assert action_set.collect == []
146 changes: 35 additions & 111 deletions apps/api/tests/contract/test_retrieval_contract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from collections.abc import Callable, Coroutine, Sequence
from contextlib import AbstractAsyncContextManager
from datetime import datetime, timezone
from typing import Any, cast
from uuid import uuid4

Expand Down Expand Up @@ -357,139 +356,64 @@ async def test_should_return_empty_results_for_an_empty_query(


@pytest.mark.asyncio
async def test_legacy_retrieval_should_rank_hot_chunk_before_cold_chunk_when_discovery_scores_tie(
async def test_retrieval_should_ignore_false_agentic_hint_and_use_workflow(
developer_api_client_factory: Callable[
[], AbstractAsyncContextManager[AsyncClient]
],
monkeypatch: MonkeyPatch,
) -> None:
async with developer_api_client_factory() as api_client:
cold_document = await _seed_retrieval_document(
user_id="local-dev-user",
namespace="contract-hot-ranking",
source_file_name="cold.pdf",
section_path="ranking/cold",
content="same ranking marker cold",
async def fake_run_request(
self: object,
db: AsyncSession,
*,
request: WorkflowRunRequest,
llm_fn: object | None = None,
) -> WorkflowResult:
return WorkflowResult(
namespace=request.namespace,
query=request.query,
router_used="workflow_single_step",
answer_text="",
plan=QueryPlan.single_step(request.query),
referenced_chunks=[],
results=[],
)
hot_document = await _seed_retrieval_document(

monkeypatch.setattr(
"shared.services.retrieval.workflow.orchestrator.WorkflowOrchestrator.run_request",
fake_run_request,
)

async with developer_api_client_factory() as api_client:
await _seed_retrieval_document(
user_id="local-dev-user",
namespace="contract-hot-ranking",
source_file_name="hot.pdf",
section_path="ranking/hot",
content="same ranking marker hot",
namespace="contract-agentic-only",
source_file_name="a.pdf",
section_path="agentic/a",
content="same ranking marker a",
)
await _seed_retrieval_document(
user_id="local-dev-user",
namespace="contract-hot-ranking",
source_file_name="filler.pdf",
section_path="ranking/filler",
content="same ranking marker filler",
)

now = datetime.now(timezone.utc).replace(tzinfo=None)
await ContractDatabase.execute(
"""
INSERT INTO retrieval_hit_stats (
id,
user_id,
namespace,
hit_kind,
document_id,
chunk_id,
hit_count,
last_hit_at,
created_at,
updated_at
) VALUES (
:id,
:user_id,
:namespace,
'chunk',
:document_id,
:chunk_id,
:hit_count,
:now,
:now,
:now
)
""",
{
"id": f"rhs_{uuid4().hex[:12]}",
"user_id": "local-dev-user",
"namespace": "contract-hot-ranking",
"document_id": hot_document["document_id"],
"chunk_id": hot_document["chunk_id"],
"hit_count": 100,
"now": now,
},
namespace="contract-agentic-only",
source_file_name="b.pdf",
section_path="agentic/b",
content="same ranking marker b",
)

def to_channel_row(document: dict[str, str]) -> dict[str, object]:
return {
"document_id": document["document_id"],
"chunk_id": document["chunk_id"],
"section_id": document["section_id"],
"section_path": document["section_path"],
"source_file_name": "cold.pdf"
if document["document_id"] == cold_document["document_id"]
else "hot.pdf",
"chunk_type": "text",
"content": "same ranking marker",
"score": 1.0,
"file_path": None,
"chunk_metadata": {},
"job_result_id": document["job_result_id"],
"job_id": document["job_id"],
"sort_order": 0,
}

async def fake_content_channel(*_args: object, **_kwargs: object) -> list[dict[str, object]]:
return [
to_channel_row(hot_document),
to_channel_row(cold_document),
]

async def fake_path_channel(*_args: object, **_kwargs: object) -> list[dict[str, object]]:
return [
to_channel_row(cold_document),
to_channel_row(hot_document),
]

async def fake_graph_routing(*_args: object, **_kwargs: object) -> list[dict[str, object]]:
return []

monkeypatch.setattr(
"shared.services.retrieval.execution.legacy_route.path_channel",
fake_path_channel,
)
monkeypatch.setattr(
"shared.services.retrieval.execution.legacy_route.content_channel",
fake_content_channel,
)
monkeypatch.setattr(
"shared.services.retrieval.execution.legacy_route.list_graph_routed_chunks",
fake_graph_routing,
)

response = await api_client.post(
"/api/v1/retrieval/query",
json={
"namespace": "contract-hot-ranking",
"namespace": "contract-agentic-only",
"query": "same ranking marker",
"top_k": 1,
"channels": ["path", "content"],
"channel_weights": {"path": 1.0, "content": 1.0},
"use_agentic": False,
},
)

assert response.status_code == 200

response_json = cast(dict[str, object], response.json())
results = cast(list[dict[str, object]], response_json["results"])

assert len(results) == 1
assert _result_source(results[0])["document_id"] == hot_document["document_id"]
assert response_json["router_used"] == "workflow_single_step"
assert response_json["results"] == []


@pytest.mark.asyncio
Expand Down
4 changes: 3 additions & 1 deletion apps/worker/app/services/document_agent/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class H1Candidate:
page: int
confidence: float
matched_line: str
source: Literal["toc_exact_top", "toc_fuzzy_top", "heading_grep", "none"]
source: Literal["toc_exact_top", "toc_fuzzy_top", "heading_grep", "toc_grep", "h2_refine", "none"]
evidence: dict[str, Any] = field(default_factory=dict)

def to_dict(self) -> dict[str, Any]:
Expand Down Expand Up @@ -163,6 +163,8 @@ class Shard:
anchor_type: Literal["h1_boundary", "blank_separator", "forced_max_size"]
anchor_evidence: str
confidence: float
split_depth: int = 1 # 1=H1 cut, 2=H2 cut, etc.
is_continuation: bool = False # True for continuation shards that don't contain parent heading

def to_dict(self) -> dict[str, Any]:
return asdict(self)
Expand Down
Loading
Loading