diff --git a/nemo_retriever/src/nemo_retriever/agentic/README.md b/nemo_retriever/src/nemo_retriever/agentic/README.md new file mode 100644 index 0000000000..6fad0fe2b0 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/agentic/README.md @@ -0,0 +1,140 @@ +# Agentic Retrieval Mode + +Agentic retrieval mode is a retrieval strategy for the main NeMo Retriever +pipeline. It is not a separate evaluation benchmark. The evaluation mode still +answers "how do we score results?", while retrieval mode answers "how do we +produce ranked results?". + +The first integration supports: + +```bash +--evaluation-mode recall --retrieval-mode agentic +``` + +In this mode, the pipeline ingests documents and uploads them to the configured +vector database exactly as it does today. The difference starts at evaluation +time: instead of one standard dense retrieval pass, an LLM-driven graph +retrieval pipeline searches the same vector database and produces ranked +results that are scored with recall-style metrics. + +## Graph Pipeline + +The agentic retriever composes the existing graph operators: + +```mermaid +flowchart LR + QueryCsv[Query CSV] --> Normalize[Normalize Queries] + Normalize --> ReactAgent[ReActAgentOperator] + ReactAgent --> RetrieverTool[Retriever Tool] + RetrieverTool --> VDB[Vector DB] + ReactAgent --> RRFAggregator[RRFAggregatorOperator] + RRFAggregator --> SelectionAgent[SelectionAgentOperator] + SelectionAgent --> RankedResults[Ranked Results] + RankedResults --> Metrics[Recall Metrics] +``` + +`ReActAgentOperator` runs an LLM-driven ReAct loop per query. The agent can +think, issue retrieval subqueries, inspect retrieved candidates, and decide +when it has enough evidence. + +`RRFAggregatorOperator` combines candidates from multiple retrieval steps using +reciprocal rank fusion. This gives more weight to documents that appear near +the top across multiple search attempts. + +`SelectionAgentOperator` runs a final LLM-based selection pass over the fused +candidate set and emits the ranked document IDs used for scoring. + +## CLI Integration + +The main CLI adds a retrieval strategy option: + +```bash +--retrieval-mode standard|agentic +``` + +`--evaluation-mode` remains evaluation-focused: + +```bash +--evaluation-mode recall|beir|qa +``` + +Supported combinations in the first integration: + +- `--evaluation-mode=recall --retrieval-mode=standard` +- `--evaluation-mode=recall --retrieval-mode=agentic` +- `--evaluation-mode=qa --retrieval-mode=standard` + +Unsupported initially: + +- `--evaluation-mode=qa --retrieval-mode=agentic` +- BEIR through the generic pipeline path remains unchanged and unavailable, as + it is in the existing pipeline. + +## Agentic Options + +`--agentic-llm-model` sets the chat model used by both `ReActAgentOperator` and +`SelectionAgentOperator`. It is required when `--retrieval-mode=agentic`. + +`--agentic-invoke-url` optionally sets the OpenAI-compatible chat completions +endpoint used by the agent operators. If omitted, the operators use their +default endpoint. + +`--agentic-react-max-steps` controls the maximum ReAct loop iterations per +query. The default is `10`. + +## Wrapped Standard Retrieval + +Every agent `retrieve` tool call delegates to the existing +`nemo_retriever.retriever.Retriever`. That means agentic mode searches the same +vector database populated by ingestion and reuses the same retrieval settings +where possible. + +Existing options reused by the wrapped retriever: + +- `--api-key`: authentication for agentic LLM calls and remote services unless + a more specific key is provided. +- `--embed-model-name`, `--embed-invoke-url`, `--local-query-embed-backend`, + `--local-hf-batch-size`: query embedding configuration. +- `--reranker`, `--reranker-model-name`, `--reranker-invoke-url`, + `--reranker-api-key`, `--local-reranker-backend`: optional reranking inside + the wrapped retriever. + +The first integration intentionally keeps the lower-level agentic retrieval +parameters fixed: + +- retriever top-k: `10` +- target top-k: `10` +- selection top-k: `10` +- query concurrency: `1` +- text truncation: `500` +- max tokens: provider default +- parallel tool calls: disabled + +## Examples + +Local in-process run: + +```bash +retriever pipeline run ./data/bo767 \ + --run-mode inprocess \ + --evaluation-mode recall \ + --retrieval-mode agentic \ + --query-csv ./data/bo767_query_gt.csv \ + --agentic-llm-model meta/llama-3.3-70b-instruct \ + --api-key os.environ/NVIDIA_API_KEY +``` + +Batch run with remote embedding and agent endpoints: + +```bash +retriever pipeline run ./data/bo767 \ + --run-mode batch \ + --evaluation-mode recall \ + --retrieval-mode agentic \ + --query-csv ./data/bo767_query_gt.csv \ + --embed-invoke-url http://localhost:8000/v1 \ + --agentic-invoke-url http://localhost:9000/v1/chat/completions \ + --agentic-llm-model meta/llama-3.3-70b-instruct \ + --agentic-react-max-steps 5 \ + --api-key os.environ/NVIDIA_API_KEY +``` diff --git a/nemo_retriever/src/nemo_retriever/agentic/__init__.py b/nemo_retriever/src/nemo_retriever/agentic/__init__.py new file mode 100644 index 0000000000..f0536322bc --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/agentic/__init__.py @@ -0,0 +1,21 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Agentic retrieval utilities.""" + +from nemo_retriever.agentic.retrieval import ( + AgenticRetrievalConfig, + AgenticRetriever, + build_beir_run_from_agentic_result, + build_qrels, + run_agentic_recall_evaluation, +) + +__all__ = [ + "AgenticRetrievalConfig", + "AgenticRetriever", + "build_beir_run_from_agentic_result", + "build_qrels", + "run_agentic_recall_evaluation", +] diff --git a/nemo_retriever/src/nemo_retriever/agentic/retrieval.py b/nemo_retriever/src/nemo_retriever/agentic/retrieval.py new file mode 100644 index 0000000000..c4c2329949 --- /dev/null +++ b/nemo_retriever/src/nemo_retriever/agentic/retrieval.py @@ -0,0 +1,370 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Graph-backed agentic retrieval mode. + +The implementation is intentionally additive: it composes the existing graph +operators and wraps :class:`nemo_retriever.retriever.Retriever` without changing +the standard retrieval path. +""" + +from __future__ import annotations + +import logging +import threading +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Optional, Sequence + +import pandas as pd + +from nemo_retriever.graph.abstract_operator import AbstractOperator +from nemo_retriever.model import VL_EMBED_MODEL, VL_RERANK_MODEL +from nemo_retriever.recall.beir import compute_beir_metrics +from nemo_retriever.recall.core import ( + _hit_to_audio_segment_key, + _normalize_pdf_name, + _normalize_query_df, +) +from nemo_retriever.retriever import Retriever + +logger = logging.getLogger(__name__) + +AGENTIC_RETRIEVER_TOP_K = 20 +AGENTIC_TARGET_TOP_K = 10 +AGENTIC_SELECTION_TOP_K = 10 +AGENTIC_NUM_CONCURRENT = 1 +AGENTIC_TEXT_TRUNCATION = 2000 +AGENTIC_PARALLEL_TOOL_CALLS = False +AGENTIC_RRF_K = 60 +AGENTIC_REACT_MAX_STEPS = 10 + + +class AgenticQueryInputOperator(AbstractOperator): + """Adapt ``Retriever(graph=...)`` input DataFrames to agentic query schema.""" + + def preprocess(self, data: Any, **kwargs: Any) -> pd.DataFrame: + _ = kwargs + if not isinstance(data, pd.DataFrame): + raise TypeError(f"AgenticQueryInputOperator expects a pd.DataFrame, got {type(data).__name__}.") + return data.copy() + + def process(self, data: pd.DataFrame, **kwargs: Any) -> pd.DataFrame: + _ = kwargs + out = data.copy() + if "query_text" not in out.columns: + if "query" in out.columns: + out["query_text"] = out["query"].astype(str) + elif "text" in out.columns: + out["query_text"] = out["text"].astype(str) + else: + raise ValueError("Agentic query graph input requires 'query_text', 'query', or 'text'.") + if "query_id" not in out.columns: + out["query_id"] = [str(idx) for idx in range(len(out.index))] + return out[["query_id", "query_text"]] + + def postprocess(self, data: pd.DataFrame, **kwargs: Any) -> pd.DataFrame: + _ = kwargs + return data + + +class AgenticSelectionOutputOperator(AbstractOperator): + """Convert final agentic selection DataFrame to ``Retriever`` hit-list output.""" + + def preprocess(self, data: Any, **kwargs: Any) -> pd.DataFrame: + _ = kwargs + if not isinstance(data, pd.DataFrame): + raise TypeError(f"AgenticSelectionOutputOperator expects a pd.DataFrame, got {type(data).__name__}.") + return data.copy() + + def process(self, data: pd.DataFrame, **kwargs: Any) -> list[list[dict[str, Any]]]: + _ = kwargs + if data.empty: + return [] + required = {"query_id", "doc_id", "rank"} + missing = required - set(data.columns) + if missing: + raise ValueError(f"Agentic selection output missing required columns: {sorted(missing)}") + + hits: list[list[dict[str, Any]]] = [] + for _query_id, group in data.groupby("query_id", sort=False): + query_hits: list[dict[str, Any]] = [] + for _, row in group.sort_values("rank").iterrows(): + hit = row.to_dict() + doc_id = str(hit.get("doc_id", "")) + if doc_id and not hit.get("pdf_page"): + hit["pdf_page"] = doc_id + query_hits.append(hit) + hits.append(query_hits) + return hits + + def postprocess(self, data: list[list[dict[str, Any]]], **kwargs: Any) -> list[list[dict[str, Any]]]: + _ = kwargs + return data + + +@dataclass(frozen=True) +class AgenticRetrievalConfig: + """Configuration for graph-backed agentic retrieval.""" + + vdb_op: str = "lancedb" + vdb_kwargs: dict[str, Any] = field(default_factory=dict) + query_embedder: str = VL_EMBED_MODEL + embedding_endpoint: Optional[str] = None + embedding_api_key: str = "" + embedding_use_grpc: Optional[bool] = None + local_hf_batch_size: int = 32 + local_query_embed_backend: str = "hf" + reranker: Optional[str] = None + reranker_endpoint: Optional[str] = None + reranker_api_key: str = "" + local_reranker_backend: str = "vllm" + embed_modality: str = "text" + llm_model: str = "" + invoke_url: Optional[str] = None + api_key: Optional[str] = None + react_max_steps: int = AGENTIC_REACT_MAX_STEPS + + def __post_init__(self) -> None: + if not str(self.llm_model).strip(): + raise ValueError("Agentic retrieval requires a non-empty llm_model.") + if int(self.react_max_steps) < 1: + raise ValueError("react_max_steps must be >= 1.") + + +class AgenticRetriever: + """Run graph-backed agentic retrieval over query IDs and query texts.""" + + def __init__(self, cfg: AgenticRetrievalConfig, *, match_mode: str = "pdf_page") -> None: + self._cfg = cfg + self._match_mode = str(match_mode) + self._retriever = Retriever( + vdb_kwargs={ + "vdb_op": str(cfg.vdb_op), + "vdb_kwargs": dict(cfg.vdb_kwargs or {}), + }, + embed_kwargs={ + "model_name": str(cfg.query_embedder or VL_EMBED_MODEL), + "embed_model_name": str(cfg.query_embedder or VL_EMBED_MODEL), + "embedding_endpoint": cfg.embedding_endpoint, + "api_key": cfg.embedding_api_key, + "input_type": "query", + "local_ingest_embed_backend": str(cfg.local_query_embed_backend), + "inference_batch_size": int(cfg.local_hf_batch_size), + "embed_inference_batch_size": int(cfg.local_hf_batch_size), + }, + top_k=AGENTIC_TOP_K, + rerank=bool(cfg.reranker), + rerank_kwargs={ + "model_name": cfg.reranker or VL_RERANK_MODEL, + "invoke_url": cfg.reranker_endpoint, + "api_key": cfg.reranker_api_key, + "local_reranker_backend": str(cfg.local_reranker_backend), + "modality": str(cfg.embed_modality), + }, + ) + self._lock = threading.Lock() + + def retrieve(self, query_ids: Sequence[str], query_texts: Sequence[str]) -> pd.DataFrame: + """Return selected ranked documents for each query. + + The output schema matches ``SelectionAgentOperator``: ``query_id``, + ``doc_id``, ``rank``, and ``message``. + """ + + if len(query_ids) != len(query_texts): + raise ValueError("query_ids and query_texts must have the same length.") + + from nemo_retriever.graph.react_agent_operator import ReActAgentOperator + from nemo_retriever.graph.rrf_aggregator_operator import RRFAggregatorOperator + from nemo_retriever.graph.selection_agent_operator import SelectionAgentOperator + + pipeline = ( + AgenticQueryInputOperator() + >> ReActAgentOperator( + invoke_url=_none_if_empty(self._cfg.invoke_url), + llm_model=str(self._cfg.llm_model), + retriever_fn=self._retrieve_for_agent, + retriever_top_k=AGENTIC_TOP_K, + target_top_k=AGENTIC_TOP_K, + user_msg_type="with_results", + max_steps=int(self._cfg.react_max_steps), + api_key=_none_if_empty(self._cfg.api_key), + parallel_tool_calls=AGENTIC_PARALLEL_TOOL_CALLS, + num_concurrent=AGENTIC_NUM_CONCURRENT, + ) + >> RRFAggregatorOperator(k=AGENTIC_RRF_K) + >> SelectionAgentOperator( + invoke_url=_none_if_empty(self._cfg.invoke_url), + llm_model=str(self._cfg.llm_model), + top_k=AGENTIC_TOP_K, + api_key=_none_if_empty(self._cfg.api_key), + parallel_tool_calls=AGENTIC_PARALLEL_TOOL_CALLS, + ) + >> AgenticSelectionOutputOperator() + ) + graph_retriever = Retriever( + graph=pipeline, + top_k=AGENTIC_TOP_K, + embed_kwargs={"text_column": "query_text"}, + ) + raw_hits = graph_retriever.queries([str(query_text) for query_text in query_texts], top_k=AGENTIC_TOP_K) + return _raw_hits_to_agentic_result([str(query_id) for query_id in query_ids], raw_hits) + + def _retrieve_for_agent(self, query_text: str, top_k: int) -> list[dict[str, Any]]: + """Retriever callback used by ``ReActAgentOperator``.""" + + with self._lock: + hits = self._retriever.query(str(query_text), top_k=int(top_k)) + + docs: list[dict[str, Any]] = [] + for hit in hits: + doc_id = _doc_id_for_match_mode(dict(hit), match_mode=self._match_mode) + if not doc_id: + continue + docs.append( + { + "doc_id": doc_id, + "text": str(hit.get("text", ""))[:AGENTIC_TEXT_TRUNCATION], + "score": _hit_score(hit), + } + ) + if len(docs) >= int(top_k): + break + return docs + + +def run_agentic_recall_evaluation( + *, + query_csv: Path, + cfg: AgenticRetrievalConfig, + match_mode: str, + ks: Sequence[int] = (1, 5, 10), +) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, dict[str, int]], dict[str, dict[str, float]], dict[str, float]]: + """Run agentic retrieval for a recall query CSV and compute metrics.""" + + df_query = _normalize_query_df(pd.read_csv(Path(query_csv)), match_mode=str(match_mode)) + query_ids = [str(idx) for idx in df_query.index] + query_texts = df_query["query"].astype(str).tolist() + qrels = build_qrels(query_ids, df_query["golden_answer"].astype(str).tolist()) + + start = time.time() + result = AgenticRetriever(cfg, match_mode=str(match_mode)).retrieve(query_ids, query_texts) + elapsed = time.time() - start + if elapsed > 0: + logger.info( + "Agentic retrieval time for %d queries: %.2f seconds (average %.2f queries/second)", + len(query_ids), + elapsed, + len(query_ids) / elapsed, + ) + + run = build_beir_run_from_agentic_result(query_ids, result) + metrics = compute_beir_metrics(qrels, run, ks=ks) + return df_query, result, qrels, run, metrics + + +def build_qrels(query_ids: Sequence[str], gold_keys: Sequence[str]) -> dict[str, dict[str, int]]: + """Build BEIR-style qrels from normalized recall gold keys.""" + + if len(query_ids) != len(gold_keys): + raise ValueError("query_ids and gold_keys must have the same length.") + return {str(query_id): {str(gold_key): 1} for query_id, gold_key in zip(query_ids, gold_keys)} + + +def build_beir_run_from_agentic_result( + query_ids: Sequence[str], + result: pd.DataFrame, +) -> dict[str, dict[str, float]]: + """Convert ``SelectionAgentOperator`` output to BEIR run format.""" + + run: dict[str, dict[str, float]] = {str(query_id): {} for query_id in query_ids} + if result.empty: + return run + + required = {"query_id", "doc_id", "rank"} + missing = required - set(result.columns) + if missing: + raise ValueError(f"Agentic result missing required columns: {sorted(missing)}") + + for query_id, group in result.groupby("query_id", sort=False): + ordered = group.sort_values("rank") + n = len(ordered.index) + scores: dict[str, float] = {} + for rank, (_, row) in enumerate(ordered.iterrows(), start=1): + doc_id = str(row["doc_id"]) + if doc_id and doc_id not in scores: + scores[doc_id] = float(n - rank + 1) + run[str(query_id)] = scores + return run + + +def _raw_hits_to_agentic_result(query_ids: Sequence[str], raw_hits: Sequence[Sequence[dict[str, Any]]]) -> pd.DataFrame: + rows: list[dict[str, Any]] = [] + for query_id, hits in zip(query_ids, raw_hits): + for rank, hit in enumerate(hits, start=1): + rows.append( + { + "query_id": str(query_id), + "doc_id": str(hit.get("doc_id") or hit.get("pdf_page") or ""), + "rank": int(hit.get("rank", rank)), + "message": str(hit.get("message", "")), + } + ) + if not rows: + return pd.DataFrame(columns=["query_id", "doc_id", "rank", "message"]) + return pd.DataFrame(rows) + + +def _doc_id_for_match_mode(hit: dict[str, Any], *, match_mode: str) -> str: + if match_mode == "audio_segment": + return _hit_to_audio_segment_key(hit) or "" + if match_mode == "pdf_only": + return _doc_id_from_hit(hit) + return _pdf_page_from_hit(hit) + + +def _pdf_page_from_hit(hit: dict[str, Any]) -> str: + pdf_page = hit.get("pdf_page") + if isinstance(pdf_page, str) and pdf_page.strip(): + return pdf_page.strip() + + source = hit.get("source") or hit.get("source_id") or hit.get("path") + page_number = hit.get("page_number") + if source and page_number is not None: + return f"{Path(str(source)).stem}_{page_number}" + return _doc_id_from_hit(hit) + + +def _doc_id_from_hit(hit: dict[str, Any]) -> str: + for key in ("pdf_basename", "source_id", "path", "source", "doc_id"): + value = hit.get(key) + if isinstance(value, str) and value.strip(): + return _normalize_pdf_name(Path(value).stem) + return "" + + +def _hit_score(hit: dict[str, Any]) -> float: + for key in ("_rerank_score", "_score", "score"): + if key in hit: + try: + return float(hit[key]) + except (TypeError, ValueError): + return 0.0 + if "_distance" in hit: + try: + return -float(hit["_distance"]) + except (TypeError, ValueError): + return 0.0 + return 0.0 + + +def _none_if_empty(value: Optional[str]) -> Optional[str]: + if value is None: + return None + stripped = str(value).strip() + if not stripped or stripped.lower() in {"none", "null"}: + return None + return stripped diff --git a/nemo_retriever/src/nemo_retriever/graph/react_agent_operator.py b/nemo_retriever/src/nemo_retriever/graph/react_agent_operator.py index 94777db5d9..bcb976475d 100644 --- a/nemo_retriever/src/nemo_retriever/graph/react_agent_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/react_agent_operator.py @@ -22,6 +22,20 @@ logger = logging.getLogger(__name__) +_LOG_PREVIEW_CHARS = 300 +_LOG_DOC_ID_LIMIT = 10 + + +def _preview_text(value: Any, *, limit: int = _LOG_PREVIEW_CHARS) -> str: + text = " ".join(str(value or "").split()) + if len(text) <= limit: + return text + return text[:limit].rstrip() + "..." + + +def _preview_doc_ids(docs: List[Dict[str, Any]], *, limit: int = _LOG_DOC_ID_LIMIT) -> List[str]: + return [str(doc.get("doc_id", "")) for doc in docs[:limit]] + # --------------------------------------------------------------------------- # Prompt rendering (verbatim content of 02_v1.j2, rendered via Python) @@ -502,6 +516,15 @@ def _run_single_query( seen_doc_ids: set[str] = set() step_counter = 0 + logger.info( + "ReActAgentOperator: query=%s start max_steps=%d retriever_top_k=%d target_top_k=%d query=%r", + query_id, + self._max_steps, + self._retriever_top_k, + self._target_top_k, + _preview_text(query_text), + ) + # ------ optional initial retrieval (with_results mode) ------ if with_init_docs: init_docs = self._call_retriever(query_text, seen_doc_ids, api_key) @@ -509,6 +532,13 @@ def _run_single_query( step_counter += 1 for d in init_docs: seen_doc_ids.add(d["doc_id"]) + logger.info( + "ReActAgentOperator: query=%s initial_retrieve docs=%d seen=%d doc_ids=%s", + query_id, + len(init_docs), + len(seen_doc_ids), + _preview_doc_ids(init_docs), + ) doc_content = _docs_to_message_content(init_docs) user_msg_content: List[Dict[str, Any]] = [ @@ -522,7 +552,7 @@ def _run_single_query( # ------ main ReAct loop ------ for _step in range(self._max_steps): - logger.debug("query=%r loop_step=%d seen_docs=%d", query_id, _step, len(seen_doc_ids)) + logger.info("ReActAgentOperator: query=%s step=%d begin seen_docs=%d", query_id, _step, len(seen_doc_ids)) try: response = invoke_chat_completion_step( invoke_url=self._invoke_url, @@ -580,6 +610,20 @@ def _run_single_query( msg = choice["message"] finish_reason = choice.get("finish_reason") tool_calls = msg.get("tool_calls") or [] + if msg.get("content"): + logger.info( + "ReActAgentOperator: query=%s step=%d assistant content=%r", + query_id, + _step, + _preview_text(msg.get("content")), + ) + logger.info( + "ReActAgentOperator: query=%s step=%d finish_reason=%s tool_calls=%s", + query_id, + _step, + finish_reason, + [((tc.get("function") or {}).get("name") or "") for tc in tool_calls], + ) # Append assistant turn assistant_turn: Dict[str, Any] = {"role": "assistant"} @@ -590,6 +634,11 @@ def _run_single_query( messages.append(assistant_turn) if finish_reason == "stop" or not tool_calls: + logger.info( + "ReActAgentOperator: query=%s step=%d no tool call; requesting continuation", + query_id, + _step, + ) messages.append({"role": "user", "content": _AUTO_USER_MSG}) continue @@ -609,20 +658,38 @@ def _run_single_query( continue if fn_name == "think": - logger.debug("query=%r step=%d [think] %s", query_id, _step, str(fn_args.get("thought", ""))[:120]) + logger.info( + "ReActAgentOperator: query=%s step=%d think=%r", + query_id, + _step, + _preview_text(fn_args.get("thought")), + ) tool_messages.append( {"role": "tool", "tool_call_id": tc_id, "content": "Your thought has been logged."} ) elif fn_name == "retrieve": subquery = str(fn_args.get("query", query_text)) - logger.debug("query=%r step=%d [retrieve] subquery=%r", query_id, _step, subquery) + logger.info( + "ReActAgentOperator: query=%s step=%d retrieve subquery=%r seen_before=%d", + query_id, + _step, + _preview_text(subquery), + len(seen_doc_ids), + ) retrieved = self._call_retriever(subquery, seen_doc_ids, api_key) - logger.debug("query=%r step=%d [retrieve] got %d new docs", query_id, _step, len(retrieved)) retrieval_log.append(retrieved) step_counter += 1 for d in retrieved: seen_doc_ids.add(d["doc_id"]) + logger.info( + "ReActAgentOperator: query=%s step=%d retrieve docs=%d seen_after=%d doc_ids=%s", + query_id, + _step, + len(retrieved), + len(seen_doc_ids), + _preview_doc_ids(retrieved), + ) doc_content = _docs_to_message_content(retrieved) tool_content: List[Dict[str, Any]] = [ {"type": "text", "text": f"Retrieved {len(retrieved)} documents:"} @@ -631,7 +698,13 @@ def _run_single_query( elif fn_name == "final_results": raw_ids: List[str] = fn_args.get("doc_ids", []) - logger.debug("query=%r step=%d [final_results] doc_ids=%s", query_id, _step, raw_ids) + logger.info( + "ReActAgentOperator: query=%s step=%d final_results doc_ids=%s message=%r", + query_id, + _step, + raw_ids[:_LOG_DOC_ID_LIMIT] if isinstance(raw_ids, list) else raw_ids, + _preview_text(fn_args.get("message")), + ) if isinstance(raw_ids, list) and raw_ids: final_doc_ids = [str(d) for d in raw_ids] tool_messages.append( @@ -652,6 +725,13 @@ def _run_single_query( if loop_done: break + logger.info( + "ReActAgentOperator: query=%s done retrieval_steps=%d seen_docs=%d final_doc_ids=%s", + query_id, + len(retrieval_log), + len(seen_doc_ids), + final_doc_ids[:_LOG_DOC_ID_LIMIT] if final_doc_ids else [], + ) return _build_output_rows(query_id, query_text, retrieval_log, final_doc_ids) def _call_retriever( @@ -678,14 +758,17 @@ def _call_retriever( logger.warning("ReActAgentOperator: retriever_fn failed for query %r: %s", query_text, exc, exc_info=True) return [] - # Filter already-seen and normalise keys + # Filter already-seen and normalise keys. Track this batch separately + # so duplicate rows from the vector DB do not reduce the effective top-k. results: List[Dict[str, Any]] = [] + batch_doc_ids: set[str] = set() for item in raw: doc_id = str(item.get("doc_id", item.get("id", ""))) text = str(item.get("text", "")) score = float(item.get("score", 0.0)) - if doc_id and doc_id not in seen_doc_ids: + if doc_id and doc_id not in seen_doc_ids and doc_id not in batch_doc_ids: results.append({"doc_id": doc_id, "text": text, "score": score}) + batch_doc_ids.add(doc_id) if len(results) >= self._retriever_top_k: break @@ -747,8 +830,32 @@ def _build_output_rows( ) # If final_results was called, also emit those as a synthetic final step - # (step_idx = len(retrieval_log)) so RRF can optionally weight it. - # These are already covered by the existing steps, so we skip deduplication - # here — RRF will naturally up-weight docs that appeared in final_results - # because they were retrieved in earlier steps. + # (step_idx = len(retrieval_log)) so RRF can weight the agent's final + # judgment in addition to the raw retrieval history. + if final_doc_ids: + first_doc_by_id: Dict[str, Dict[str, Any]] = {} + for step_docs in retrieval_log: + for doc in step_docs: + doc_id = str(doc.get("doc_id", "")) + if doc_id and doc_id not in first_doc_by_id: + first_doc_by_id[doc_id] = doc + + emitted: set[str] = set() + final_step_idx = len(retrieval_log) + for rank, doc_id in enumerate(final_doc_ids, 1): + doc_id = str(doc_id) + if not doc_id or doc_id in emitted: + continue + emitted.add(doc_id) + doc = first_doc_by_id.get(doc_id, {}) + rows.append( + { + "query_id": query_id, + "query_text": query_text, + "step_idx": final_step_idx, + "doc_id": doc_id, + "text": doc.get("text", ""), + "rank": rank, + } + ) return rows diff --git a/nemo_retriever/src/nemo_retriever/graph/selection_agent_operator.py b/nemo_retriever/src/nemo_retriever/graph/selection_agent_operator.py index 14371969a8..bddb2bbb37 100644 --- a/nemo_retriever/src/nemo_retriever/graph/selection_agent_operator.py +++ b/nemo_retriever/src/nemo_retriever/graph/selection_agent_operator.py @@ -21,6 +21,16 @@ logger = logging.getLogger(__name__) +_LOG_PREVIEW_CHARS = 300 +_LOG_DOC_ID_LIMIT = 10 + + +def _preview_text(value: Any, *, limit: int = _LOG_PREVIEW_CHARS) -> str: + text = " ".join(str(value or "").split()) + if len(text) <= limit: + return text + return text[:limit].rstrip() + "..." + # --------------------------------------------------------------------------- # Prompt rendering (verbatim content of 01_v0.j2, rendered via Python) # --------------------------------------------------------------------------- @@ -237,8 +247,21 @@ def process(self, data: pd.DataFrame, **kwargs: Any) -> pd.DataFrame: for query_id, group in data.groupby("query_id", sort=False): query_text = str(group["query_text"].iloc[0]) docs = [{"id": str(row["doc_id"]), "text": str(row["text"])} for _, row in group.iterrows()] + logger.info( + "SelectionAgentOperator: query=%s start candidates=%d unique_candidates=%d query=%r", + query_id, + len(docs), + len({doc["id"] for doc in docs}), + _preview_text(query_text), + ) result = self._select_documents(query_text, docs) message = result.get("message", "") + logger.info( + "SelectionAgentOperator: query=%s selected=%s message=%r", + query_id, + result.get("doc_ids", [])[:_LOG_DOC_ID_LIMIT], + _preview_text(message), + ) for rank, doc_id in enumerate(result.get("doc_ids", []), 1): rows.append( { @@ -363,6 +386,12 @@ def _select_documents( """Run the agentic selection loop for a single query.""" valid_ids = list(dict.fromkeys(d["id"] for d in docs)) feasible_k = min(self._top_k, len(valid_ids)) + logger.info( + "SelectionAgentOperator: selecting top_k=%d feasible_k=%d valid_doc_ids=%s", + self._top_k, + feasible_k, + valid_ids[:_LOG_DOC_ID_LIMIT], + ) system_prompt = self._build_system_prompt(feasible_k) tools = self._build_tools(feasible_k, valid_ids) @@ -379,6 +408,12 @@ def _select_documents( extra_body["parallel_tool_calls"] = False for _step in range(self._max_steps): + logger.info( + "SelectionAgentOperator: step=%d begin candidates=%d feasible_k=%d", + _step, + len(valid_ids), + feasible_k, + ) try: response = invoke_chat_completion_step( invoke_url=self._invoke_url, @@ -438,12 +473,24 @@ def _select_documents( assistant_turn: Dict[str, Any] = {"role": "assistant"} if msg.get("content"): assistant_turn["content"] = msg["content"] + logger.info( + "SelectionAgentOperator: step=%d assistant content=%r", + _step, + _preview_text(msg.get("content")), + ) tool_calls = msg.get("tool_calls") or [] + logger.info( + "SelectionAgentOperator: step=%d finish_reason=%s tool_calls=%s", + _step, + finish_reason, + [((tc.get("function") or {}).get("name") or "") for tc in tool_calls], + ) if tool_calls: assistant_turn["tool_calls"] = tool_calls messages.append(assistant_turn) if finish_reason == "stop" or not tool_calls: + logger.info("SelectionAgentOperator: step=%d no tool call; asking for final selection", _step) messages.append( { "role": "user", @@ -468,6 +515,11 @@ def _select_documents( continue if fn.get("name") == "think": + logger.info( + "SelectionAgentOperator: step=%d think=%r", + _step, + _preview_text(fn_args.get("thought")), + ) tool_messages.append( {"role": "tool", "tool_call_id": tc_id, "content": "Your thought has been logged."} ) @@ -480,6 +532,13 @@ def _select_documents( except json.JSONDecodeError: raw_doc_ids = [] doc_ids = [d for d in raw_doc_ids if d in valid_id_set][:feasible_k] + logger.info( + "SelectionAgentOperator: step=%d log_selected_documents raw=%s accepted=%s message=%r", + _step, + raw_doc_ids[:_LOG_DOC_ID_LIMIT] if isinstance(raw_doc_ids, list) else raw_doc_ids, + doc_ids[:_LOG_DOC_ID_LIMIT], + _preview_text(fn_args.get("message")), + ) if not doc_ids and raw_doc_ids: logger.warning( "SelectionAgentOperator: LLM returned %d doc_id(s) for query %r " diff --git a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py index c2b31e5547..975f6587f8 100644 --- a/nemo_retriever/src/nemo_retriever/pipeline/__main__.py +++ b/nemo_retriever/src/nemo_retriever/pipeline/__main__.py @@ -81,6 +81,8 @@ app = typer.Typer(help="End-to-end graph-based ingestion pipeline (extract -> embed -> VDB).") DEFAULT_VDB_OP = "lancedb" +AGENTIC_LLM_MODEL_ENV = "NEMO_RETRIEVER_AGENTIC_LLM_MODEL" +AGENTIC_INVOKE_URL_ENV = "NEMO_RETRIEVER_AGENTIC_INVOKE_URL" # Help panel labels (keep stable so --help groupings read consistently). _PANEL_IO = "I/O and Execution" @@ -726,6 +728,76 @@ def _run_evaluation( return "Recall", time.perf_counter() - evaluation_start, metrics, len(df_query.index), True +def _run_agentic_recall_evaluation( + *, + vdb_op: str, + vdb_kwargs: dict[str, Any], + embed_model_name: str, + embed_invoke_url: Optional[str], + embed_remote_api_key: Optional[str], + embed_modality: str, + query_csv: Path, + recall_match_mode: str, + reranker: Optional[bool], + reranker_model_name: str, + reranker_invoke_url: Optional[str], + reranker_api_key: str, + local_reranker_backend: str, + local_hf_batch_size: int, + local_query_embed_backend: str, + agentic_llm_model: str, + agentic_invoke_url: Optional[str], + agentic_api_key: Optional[str], + agentic_react_max_steps: int, +) -> tuple[str, float, dict[str, float], Optional[int], bool]: + """Run recall evaluation using graph-backed agentic retrieval.""" + + query_csv_path = Path(query_csv) + if not query_csv_path.exists(): + logger.warning("Query CSV not found at %s; skipping agentic recall evaluation.", query_csv_path) + return "Agentic Recall", 0.0, {}, None, False + + from nemo_retriever.agentic.retrieval import AgenticRetrievalConfig, run_agentic_recall_evaluation + from nemo_retriever.model import resolve_embed_model + + embed_model = resolve_embed_model(str(embed_model_name)) + cfg = AgenticRetrievalConfig( + vdb_op=str(vdb_op), + vdb_kwargs=dict(vdb_kwargs or {}), + query_embedder=embed_model, + embedding_endpoint=embed_invoke_url, + embedding_api_key=embed_remote_api_key or "", + embedding_use_grpc=False if embed_invoke_url else None, + local_hf_batch_size=int(local_hf_batch_size), + local_query_embed_backend=local_query_embed_backend, + reranker=reranker_model_name if reranker else None, + reranker_endpoint=reranker_invoke_url, + reranker_api_key=reranker_api_key, + local_reranker_backend=local_reranker_backend, + embed_modality=embed_modality, + llm_model=agentic_llm_model, + invoke_url=agentic_invoke_url, + api_key=agentic_api_key, + react_max_steps=int(agentic_react_max_steps), + ) + evaluation_start = time.perf_counter() + df_query, _result, _qrels, _run, metrics = run_agentic_recall_evaluation( + query_csv=query_csv_path, + cfg=cfg, + match_mode=recall_match_mode, + ks=(1, 5, 10), + ) + logger.info("Agentic recall gold ids: %s", {qid: list(docs.keys()) for qid, docs in _qrels.items()}) + logger.info("Agentic recall retrieved ids: %s", {qid: list(docs.keys())[:10] for qid, docs in _run.items()}) + logger.info("Agentic recall result columns: %s", list(_result.columns)) + if {"query_id", "doc_id", "rank"}.issubset(_result.columns): + logger.info( + "Agentic recall top result rows:\n%s", + _result[["query_id", "doc_id", "rank"]].head(20).to_string(index=False), + ) + return "Agentic Recall", time.perf_counter() - evaluation_start, metrics, len(df_query.index), True + + # --------------------------------------------------------------------------- # Typer command: `retriever pipeline run` # --------------------------------------------------------------------------- @@ -1084,6 +1156,12 @@ def run( query CSV exists (after VDB upload unless --no-vdb).", rich_help_panel=_PANEL_EVAL, ), + retrieval_mode: str = typer.Option( + "standard", + "--retrieval-mode", + help="Retrieval strategy for evaluation: 'standard' (default) or 'agentic' for recall evaluation.", + rich_help_panel=_PANEL_EVAL, + ), query_csv: Path = typer.Option( "./data/bo767_query_gt.csv", "--query-csv", @@ -1125,6 +1203,28 @@ def run( help="Batch size for local HF query embedding during retrieval/reranking.", rich_help_panel=_PANEL_EVAL, ), + agentic_llm_model: Optional[str] = typer.Option( + None, + "--agentic-llm-model", + help=f"Chat model for --retrieval-mode=agentic; may also be set with {AGENTIC_LLM_MODEL_ENV}.", + rich_help_panel=_PANEL_EVAL, + ), + agentic_invoke_url: Optional[str] = typer.Option( + None, + "--agentic-invoke-url", + help=( + "OpenAI-compatible chat completions endpoint for --retrieval-mode=agentic; " + f"may also be set with {AGENTIC_INVOKE_URL_ENV}." + ), + rich_help_panel=_PANEL_EVAL, + ), + agentic_react_max_steps: int = typer.Option( + 10, + "--agentic-react-max-steps", + min=1, + help="Maximum ReAct loop iterations per query for --retrieval-mode=agentic.", + rich_help_panel=_PANEL_EVAL, + ), beir_loader: Optional[str] = typer.Option(None, "--beir-loader", rich_help_panel=_PANEL_EVAL), beir_dataset_name: Optional[str] = typer.Option(None, "--beir-dataset-name", rich_help_panel=_PANEL_EVAL), beir_split: str = typer.Option("test", "--beir-split", rich_help_panel=_PANEL_EVAL), @@ -1168,7 +1268,11 @@ def run( raise ValueError(f"Unsupported --audio-split-type: {audio_split_type!r}") if evaluation_mode not in {"none", "recall", "beir", "qa"}: raise ValueError(f"Unsupported --evaluation-mode: {evaluation_mode!r}") - if evaluation_mode == "recall": + if retrieval_mode not in {"standard", "agentic"}: + raise ValueError(f"Unsupported --retrieval-mode: {retrieval_mode!r}") + if retrieval_mode == "agentic" and evaluation_mode != "recall": + raise typer.BadParameter("--retrieval-mode=agentic is currently supported only with --evaluation-mode=recall.") + if evaluation_mode == "recall" and retrieval_mode == "standard": if input_type != "audio": raise ValueError("--evaluation-mode=recall is only supported with --input-type=audio") if recall_match_mode != "audio_segment": @@ -1178,6 +1282,12 @@ def run( "--evaluation-mode=qa requires --eval-config (QA sweep YAML/JSON). " "Use the same file format as `retriever eval run --config` (dataset, retrieval, models, ...)." ) + resolved_agentic_llm_model = (agentic_llm_model or os.environ.get(AGENTIC_LLM_MODEL_ENV) or "").strip() + resolved_agentic_invoke_url = (agentic_invoke_url or os.environ.get(AGENTIC_INVOKE_URL_ENV) or "").strip() or None + if retrieval_mode == "agentic" and not resolved_agentic_llm_model: + raise typer.BadParameter( + f"--retrieval-mode=agentic requires --agentic-llm-model or {AGENTIC_LLM_MODEL_ENV}." + ) if run_mode == "batch": os.environ["RAY_LOG_TO_DRIVER"] = "1" if ray_log_to_driver else "0" @@ -1445,6 +1555,7 @@ def run( "evaluation_secs": float(evaluation_total_time), "total_secs": float(total_time), "evaluation_mode": "qa", + "retrieval_mode": retrieval_mode, "evaluation_metrics": {}, "evaluation_count": None, "recall_details": bool(recall_details), @@ -1477,31 +1588,56 @@ def run( raise typer.Exit(code=qa_code) return - evaluation_label, evaluation_total_time, evaluation_metrics, evaluation_query_count, ran = _run_evaluation( - evaluation_mode=evaluation_mode, - vdb_op=resolved_vdb_op, - vdb_kwargs=resolved_vdb_kwargs, - embed_model_name=embed_model_name, - embed_invoke_url=embed_invoke_url, - embed_remote_api_key=embed_remote_api_key, - embed_modality=embed_modality, - query_csv=query_csv, - recall_match_mode=recall_match_mode, - audio_match_tolerance_secs=audio_match_tolerance_secs, - reranker=reranker, - reranker_model_name=reranker_model_name, - reranker_invoke_url=reranker_invoke_url, - reranker_api_key=reranker_bearer, - local_reranker_backend=local_reranker_backend, - local_hf_batch_size=local_hf_batch_size, - beir_loader=beir_loader, - beir_dataset_name=beir_dataset_name, - beir_split=beir_split, - beir_query_language=beir_query_language, - beir_doc_id_field=beir_doc_id_field, - beir_k=beir_k, - local_query_embed_backend=local_query_embed_backend, - ) + if retrieval_mode == "agentic": + evaluation_label, evaluation_total_time, evaluation_metrics, evaluation_query_count, ran = ( + _run_agentic_recall_evaluation( + vdb_op=resolved_vdb_op, + vdb_kwargs=resolved_vdb_kwargs, + embed_model_name=embed_model_name, + embed_invoke_url=embed_invoke_url, + embed_remote_api_key=embed_remote_api_key, + embed_modality=embed_modality, + query_csv=query_csv, + recall_match_mode=recall_match_mode, + reranker=reranker, + reranker_model_name=reranker_model_name, + reranker_invoke_url=reranker_invoke_url, + reranker_api_key=reranker_bearer, + local_reranker_backend=local_reranker_backend, + local_hf_batch_size=local_hf_batch_size, + local_query_embed_backend=local_query_embed_backend, + agentic_llm_model=resolved_agentic_llm_model, + agentic_invoke_url=resolved_agentic_invoke_url, + agentic_api_key=remote_api_key, + agentic_react_max_steps=agentic_react_max_steps, + ) + ) + else: + evaluation_label, evaluation_total_time, evaluation_metrics, evaluation_query_count, ran = _run_evaluation( + evaluation_mode=evaluation_mode, + vdb_op=resolved_vdb_op, + vdb_kwargs=resolved_vdb_kwargs, + embed_model_name=embed_model_name, + embed_invoke_url=embed_invoke_url, + embed_remote_api_key=embed_remote_api_key, + embed_modality=embed_modality, + query_csv=query_csv, + recall_match_mode=recall_match_mode, + audio_match_tolerance_secs=audio_match_tolerance_secs, + reranker=reranker, + reranker_model_name=reranker_model_name, + reranker_invoke_url=reranker_invoke_url, + reranker_api_key=reranker_bearer, + local_reranker_backend=local_reranker_backend, + local_hf_batch_size=local_hf_batch_size, + beir_loader=beir_loader, + beir_dataset_name=beir_dataset_name, + beir_split=beir_split, + beir_query_language=beir_query_language, + beir_doc_id_field=beir_doc_id_field, + beir_k=beir_k, + local_query_embed_backend=local_query_embed_backend, + ) if not ran: _write_runtime_summary( @@ -1519,6 +1655,7 @@ def run( "evaluation_secs": 0.0, "total_secs": float(time.perf_counter() - ingest_start), "evaluation_mode": evaluation_mode, + "retrieval_mode": retrieval_mode, "evaluation_metrics": {}, "recall_details": bool(recall_details), "vdb_op": str(resolved_vdb_op), @@ -1547,6 +1684,7 @@ def run( "evaluation_secs": float(evaluation_total_time), "total_secs": float(total_time), "evaluation_mode": evaluation_mode, + "retrieval_mode": retrieval_mode, "evaluation_metrics": dict(evaluation_metrics), "evaluation_count": evaluation_query_count, "recall_details": bool(recall_details), diff --git a/nemo_retriever/tests/test_agentic_eval.py b/nemo_retriever/tests/test_agentic_eval.py new file mode 100644 index 0000000000..05f050ca9a --- /dev/null +++ b/nemo_retriever/tests/test_agentic_eval.py @@ -0,0 +1,201 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES. +# All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import json +from unittest.mock import patch + +import pandas as pd +import pytest +from typer.testing import CliRunner + + +def _make_tool_call_response(fn_name: str, fn_args: dict, tc_id: str = "call_1") -> dict: + return { + "choices": [ + { + "message": { + "content": None, + "tool_calls": [ + { + "id": tc_id, + "type": "function", + "function": {"name": fn_name, "arguments": json.dumps(fn_args)}, + } + ], + }, + "finish_reason": "tool_calls", + } + ] + } + + +class FakeRetriever: + def __init__(self, **kwargs): + self.kwargs = kwargs + self.graph = kwargs.get("graph") + self.top_k = int(kwargs.get("top_k", 10)) + + def query(self, query: str, *, top_k: int | None = None): + if self.graph is not None: + return self.queries([query], top_k=top_k)[0] + _ = query + hits = [ + { + "source": "/tmp/doc.pdf", + "source_id": "/tmp/doc.pdf", + "page_number": 1, + "pdf_page": "doc_1", + "text": "matching document", + "_score": 0.9, + }, + { + "source": "/tmp/other.pdf", + "source_id": "/tmp/other.pdf", + "page_number": 2, + "pdf_page": "other_2", + "text": "other document", + "_score": 0.1, + }, + ] + return hits[:top_k] + + def queries(self, queries, *, top_k: int | None = None): + if self.graph is None: + return [self.query(query, top_k=top_k) for query in queries] + limit = int(top_k) if top_k is not None else self.top_k + df = pd.DataFrame({"query_text": [str(query) for query in queries]}) + graph = self.graph.resolve_for_local_execution() + raw_hits = graph.execute(df)[0] + return [list(hits)[:limit] for hits in raw_hits] + + +def test_build_qrels_requires_aligned_lengths(): + from nemo_retriever.agentic.retrieval import build_qrels + + with pytest.raises(ValueError, match="same length"): + build_qrels(["q1"], ["doc_1", "doc_2"]) + + +def test_build_beir_run_from_agentic_result_orders_by_rank(): + from nemo_retriever.agentic.retrieval import build_beir_run_from_agentic_result + + result = pd.DataFrame( + { + "query_id": ["q1", "q1", "q1"], + "doc_id": ["d2", "d1", "d3"], + "rank": [2, 1, 3], + "message": ["ok", "ok", "ok"], + } + ) + run = build_beir_run_from_agentic_result(["q1", "q2"], result) + + assert list(run["q1"]) == ["d1", "d2", "d3"] + assert run["q1"]["d1"] > run["q1"]["d2"] > run["q1"]["d3"] + assert run["q2"] == {} + + +@patch("nemo_retriever.graph.selection_agent_operator.invoke_chat_completion_step") +@patch("nemo_retriever.graph.react_agent_operator.invoke_chat_completion_step") +@patch("nemo_retriever.agentic.retrieval.Retriever", FakeRetriever) +def test_agentic_retriever_runs_graph_with_wrapped_retriever(mock_react_step, mock_selection_step): + from nemo_retriever.agentic.retrieval import AgenticRetrievalConfig, AgenticRetriever + + mock_react_step.return_value = _make_tool_call_response( + "final_results", + {"doc_ids": ["doc_1"], "message": "done", "search_successful": "true"}, + ) + mock_selection_step.return_value = _make_tool_call_response( + "log_selected_documents", + {"doc_ids": ["doc_1"], "message": "doc_1 is best"}, + ) + + cfg = AgenticRetrievalConfig(llm_model="test-model", invoke_url="http://localhost/v1/chat/completions") + result = AgenticRetriever(cfg, match_mode="pdf_page").retrieve(["0"], ["find doc"]) + + assert list(result.columns) == ["query_id", "doc_id", "rank", "message"] + assert result["query_id"].tolist() == ["0"] + assert result["doc_id"].tolist() == ["doc_1"] + assert result["rank"].tolist() == [1] + + +@patch("nemo_retriever.graph.selection_agent_operator.invoke_chat_completion_step") +@patch("nemo_retriever.graph.react_agent_operator.invoke_chat_completion_step") +@patch("nemo_retriever.agentic.retrieval.Retriever", FakeRetriever) +def test_run_agentic_recall_evaluation_computes_metrics(mock_react_step, mock_selection_step, tmp_path): + from nemo_retriever.agentic.retrieval import AgenticRetrievalConfig, run_agentic_recall_evaluation + + query_csv = tmp_path / "queries.csv" + pd.DataFrame({"query": ["find doc"], "pdf_page": ["doc_1"]}).to_csv(query_csv, index=False) + + mock_react_step.return_value = _make_tool_call_response( + "final_results", + {"doc_ids": ["doc_1"], "message": "done", "search_successful": "true"}, + ) + mock_selection_step.return_value = _make_tool_call_response( + "log_selected_documents", + {"doc_ids": ["doc_1"], "message": "doc_1 is best"}, + ) + + cfg = AgenticRetrievalConfig(llm_model="test-model", invoke_url="http://localhost/v1/chat/completions") + df_query, result, qrels, run, metrics = run_agentic_recall_evaluation( + query_csv=query_csv, + cfg=cfg, + match_mode="pdf_page", + ks=(1, 5, 10), + ) + + assert df_query["golden_answer"].tolist() == ["doc_1"] + assert result["doc_id"].tolist() == ["doc_1"] + assert qrels == {"0": {"doc_1": 1}} + assert run["0"]["doc_1"] == 1.0 + assert metrics["recall@1"] == 1.0 + assert metrics["ndcg@1"] == 1.0 + + +def test_agentic_config_requires_llm_model(): + from nemo_retriever.agentic.retrieval import AgenticRetrievalConfig + + with pytest.raises(ValueError, match="llm_model"): + AgenticRetrievalConfig(llm_model="") + + +def test_pipeline_rejects_agentic_qa_mode(): + from nemo_retriever.pipeline.__main__ import app + + result = CliRunner().invoke( + app, + [ + ".", + "--evaluation-mode", + "qa", + "--retrieval-mode", + "agentic", + "--agentic-llm-model", + "test-model", + ], + ) + + assert result.exit_code != 0 + assert "--retrieval-mode=agentic is currently supported only with" in result.output + assert "--evaluation-mode=recall" in result.output + + +def test_pipeline_requires_agentic_llm_model(): + from nemo_retriever.pipeline.__main__ import app + + result = CliRunner().invoke( + app, + [ + ".", + "--evaluation-mode", + "recall", + "--retrieval-mode", + "agentic", + ], + ) + + assert result.exit_code != 0 + assert "--retrieval-mode=agentic requires --agentic-llm-model" in result.output