Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
588 changes: 588 additions & 0 deletions examples/python/RAG_QA_chatbot/backend/agent_loop.py

Large diffs are not rendered by default.

162 changes: 162 additions & 0 deletions examples/python/RAG_QA_chatbot/backend/contract_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Agent chat slice endpoints — the real LLM agent loop over the v6 UI Message Stream.

Two endpoints serve the streaming agent chat the frontend `useChat` hook consumes, one per
request-contract track (the team is still comparing them — see
`docs/design/agent-workflows/frontend-agent-chat-ui.md`):

* **Track A** — `POST /api/agent/chat`. `messages` is the AI SDK `UIMessage[]` (parts);
the approval decision rides inside the assistant message's tool part.
* **Track B** — `POST /api/agent/chat-agenta`. `messages` is the Agenta `{role, content}`
shape; the approval decision rides in a top-level `tool_approvals` side field.

Request envelope (FE as of 2026-06-19): `session_id` + `references` (+ Track B
`tool_approvals`) at the top level, with `data: {messages, parameters}` nested.
`_normalize_envelope` lifts `data.*` back to flat keys so the parsing below stays simple,
and it still accepts the older flat `{messages, ...}` shape.

The response stream is identical across tracks. Both delegate to the real agent loop in
`agent_loop.py` (real LLM function-calling, real `search_docs` retrieval, an approval-gated
`send_summary_email`, a real Agenta trace). **Credentials are required** — set up
`.env` (OPENAI_API_KEY + QDRANT_URL/KEY + AGENTA_*) and ingest the docs; there is no
credential-free mock. The framing is SSE (`data: <json>\\n\\n`, terminated by `[DONE]`,
header `x-vercel-ai-ui-message-stream: v1`); `session_id` is echoed on the `start` part's
`messageMetadata.sessionId`.
"""

from typing import Any, Dict, List

from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse

from . import agent_loop

router = APIRouter()


# ---- Track A: approvals read from UIMessage tool parts ---------------------------------


def _pending_approvals_uimessage(
messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Tool parts the user has just approved/denied but that have no output yet.

Track A: the FE encodes the decision on the assistant message's tool part as
`state == "approval-responded"` with `approval: {id, approved}`.
"""
pending: List[Dict[str, Any]] = []
for msg in messages:
if msg.get("role") != "assistant":
continue
for part in msg.get("parts") or []:
ptype = part.get("type", "")
if not ptype.startswith("tool-"):
continue
if part.get("state") != "approval-responded":
continue
approval = part.get("approval") or {}
pending.append(
{
"toolCallId": part.get("toolCallId"),
"toolName": ptype[len("tool-") :],
"input": part.get("input"),
"approved": bool(approval.get("approved")),
}
)
return pending


# ---- Track B: approvals read from the `tool_approvals` side channel --------------------


def _pending_approvals_agenta(body: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Track B: the Agenta `{role, content}` message contract has no slot for an approval
decision, so the FE adapter surfaces it in a top-level `tool_approvals` field:

"tool_approvals": [ { "tool_call_id": "call_x", "approved": true } ]

An entry is "pending" only while the matching tool call has no `tool` result message
yet — the same window Track A detects via `state == "approval-responded"`.
"""
approvals = body.get("tool_approvals") or []
if not approvals:
return []

# tool_call_ids that already have a result (so they are no longer pending)
resolved: set = set()
for msg in body.get("messages") or []:
if msg.get("role") == "tool" and msg.get("tool_call_id"):
resolved.add(msg["tool_call_id"])

pending: List[Dict[str, Any]] = []
for entry in approvals:
tool_call_id = entry.get("tool_call_id")
if not tool_call_id or tool_call_id in resolved:
continue
pending.append(
{
"toolCallId": tool_call_id,
"toolName": entry.get("tool_name", "tool"),
"input": entry.get("input"),
"approved": bool(entry.get("approved")),
}
)
return pending


def _normalize_envelope(body: Dict[str, Any]) -> Dict[str, Any]:
"""Accept the agent-protocol envelope `{session_id, references, data: {messages,
parameters}}` (what the FE sends) while staying backward-compatible with the older flat
`{messages, ag_config, ...}` shape.

Lifts `data.messages` / `data.parameters` to the top level so the per-track parsing
below and `agent_loop.run_turn` can keep reading flat keys unchanged. `session_id` and
`tool_approvals` already travel at the top level, so they need no remapping.
"""
data = body.get("data")
if not isinstance(data, dict):
return body
merged = dict(body)
if "messages" not in merged and "messages" in data:
merged["messages"] = data.get("messages")
if "parameters" in data:
merged.setdefault("parameters", data.get("parameters"))
merged.setdefault("ag_config", data.get("parameters")) # legacy alias
return merged


def _build_response(body: Dict[str, Any], track: str) -> StreamingResponse:
"""Parse the request per track, then stream the real agent loop as a v6 SSE response."""
body = _normalize_envelope(body)
messages: List[Dict[str, Any]] = body.get("messages") or []
pending = (
_pending_approvals_agenta(body)
if track == "agenta"
else _pending_approvals_uimessage(messages)
)
return StreamingResponse(
agent_loop.run_turn(body, track, pending),
media_type="text/event-stream",
headers={
"x-vercel-ai-ui-message-stream": "v1",
"cache-control": "no-cache",
},
)


@router.post("/api/agent/chat")
async def agent_chat(request: Request) -> StreamingResponse:
"""Track A — request `messages` is the AI SDK `UIMessage[]` shape (`{role, parts}`)."""
return _build_response(await request.json(), track="uimessage")


@router.post("/api/agent/chat-agenta")
async def agent_chat_agenta(request: Request) -> StreamingResponse:
"""Track B — request `messages` is the Agenta `{role, content}` shape; the approval
decision rides in the `tool_approvals` side field."""
return _build_response(await request.json(), track="agenta")


@router.get("/api/agent/health")
async def agent_health() -> Dict[str, str]:
return {"status": "healthy", "endpoint": "agent chat slice (real agent loop)"}
6 changes: 6 additions & 0 deletions examples/python/RAG_QA_chatbot/backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from fastapi.responses import StreamingResponse

from .config import settings
from .contract_stream import router as contract_router
from .rag import format_context, generate, retrieve

# Initialize Agenta for observability
Expand Down Expand Up @@ -64,6 +65,11 @@ class ChatRequest(BaseModel):
model_config = {"extra": "ignore"} # tolerate id, trigger, etc. from AI SDK v4+


# Agent chat slice endpoints (POST /api/agent/chat[-agenta]) — the real agent loop over
# the v6 UI Message Stream. Requires credentials. See backend/contract_stream.py.
app.include_router(contract_router)


@app.get("/health")
async def health():
"""Health check endpoint."""
Expand Down
25 changes: 24 additions & 1 deletion examples/python/RAG_QA_chatbot/backend/rag.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""RAG logic: retrieve and generate."""

import re
from dataclasses import dataclass
from typing import AsyncGenerator, List, Optional, Tuple
from urllib.parse import urlsplit, urlunsplit

import agenta as ag
from agenta.sdk.managers.shared import SharedManager
Expand All @@ -11,6 +13,27 @@

from .config import settings

_DOCUSAURUS_ORDER_PREFIX = re.compile(r"^\d+-")


def normalize_doc_url(url: str) -> str:
"""Strip Docusaurus numeric ordering prefixes (`NN-`) from each path segment.

The `.mdx` filenames carry sidebar-ordering prefixes (`01-architecture.mdx`) that the
public docs site drops from the URL (`/architecture`). Older ingests stored the URL with
the prefix, which 404s — this repairs them at read time so source links resolve.
"""
if not url:
return url
try:
parts = urlsplit(url)
except ValueError:
return url
new_path = "/".join(
_DOCUSAURUS_ORDER_PREFIX.sub("", seg) for seg in parts.path.split("/")
)
return urlunsplit(parts._replace(path=new_path))


@dataclass
class RetrievedDoc:
Expand Down Expand Up @@ -85,7 +108,7 @@ def retrieve(
RetrievedDoc(
content=point.payload["content"],
title=point.payload["title"],
url=point.payload["url"],
url=normalize_doc_url(point.payload["url"]),
score=point.score,
)
)
Expand Down
11 changes: 11 additions & 0 deletions examples/python/RAG_QA_chatbot/env.example
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,14 @@ TOP_K=10
# ===========================================
AGENTA_API_KEY=your-agenta-api-key
AGENTA_HOST=https://cloud.agenta.ai

# ===========================================
# Agent chat slice (POST /api/agent/chat[-agenta])
# ===========================================
# Optional: make the approval-gated `send_summary_email` tool send for real. Without
# these it records the message to sent_emails.jsonl (still a real, inspectable effect).
# SMTP_HOST=smtp.example.com
# SMTP_PORT=587
# SMTP_USER=apikey
# SMTP_PASSWORD=your-smtp-password
# SMTP_FROM=agent@example.com
64 changes: 64 additions & 0 deletions examples/python/RAG_QA_chatbot/ingest/fix_urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Backfill corrected public docs URLs into the vector-store payloads, in place.

The public URL is derived from each doc's file path + frontmatter `slug` (see `loaders.py`:
Docusaurus strips numeric ordering prefixes, and an absolute frontmatter `slug` overrides
the path). Older ingests stored stale URLs (kept the `NN-` prefix, ignored frontmatter
slugs) that 404. This rewrites ONLY the `url` payload field — no re-embedding, no model
cost — by re-deriving URLs with the current loader and matching points by `file_path`.

python -m ingest.fix_urls --source ../../../docs/docs --base-url https://docs.agenta.ai
"""

import argparse
import os
from collections import defaultdict

from dotenv import load_dotenv
from qdrant_client import QdrantClient

from .loaders import load_mdx


def main():
parser = argparse.ArgumentParser(description="Backfill corrected doc URLs in Qdrant")
parser.add_argument("--source", required=True, help="Path to docs directory")
parser.add_argument("--base-url", required=True, help="Base URL for doc links")
parser.add_argument("--collection", default=None, help="Collection (default: from env)")
args = parser.parse_args()

load_dotenv()
collection = args.collection or os.getenv("COLLECTION_NAME", "docs_collection")

url_by_path = {d.file_path: d.url for d in load_mdx(args.source, args.base_url)}
print(f"Re-derived {len(url_by_path)} URLs from {args.source}")

client = QdrantClient(url=os.getenv("QDRANT_URL"), api_key=os.getenv("QDRANT_API_KEY"))

pending: dict[str, list] = defaultdict(list) # correct_url -> [point ids needing it]
scanned = 0
offset = None
while True:
points, offset = client.scroll(
collection, limit=256, with_payload=True, offset=offset
)
for p in points:
scanned += 1
correct = url_by_path.get(p.payload.get("file_path"))
if correct and correct != p.payload.get("url"):
pending[correct].append(p.id)
if offset is None:
break

updated = 0
for url, ids in pending.items():
client.set_payload(collection, payload={"url": url}, points=ids)
updated += len(ids)

print(
f"Scanned {scanned} points; updated {updated} URLs across "
f"{len(pending)} docs in '{collection}'."
)


if __name__ == "__main__":
main()
16 changes: 13 additions & 3 deletions examples/python/RAG_QA_chatbot/ingest/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import glob
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import List
Expand Down Expand Up @@ -41,9 +42,18 @@ def load_mdx(docs_path: str, base_url: str) -> List[Document]:
# Get title from frontmatter or filename
title = post.get("title", Path(file_path).stem)

# Convert file path to URL
relative_path = os.path.relpath(file_path, docs_path)
url_path = os.path.splitext(relative_path)[0]
# Convert file path to the public docs URL. Docusaurus strips numeric
# ordering prefixes (`01-architecture.mdx` → `/architecture`), so strip
# `NN-` from each path segment. An absolute frontmatter `slug` wins.
slug = post.get("slug")
if isinstance(slug, str) and slug.startswith("/"):
url_path = slug.strip("/")
else:
relative_path = os.path.relpath(file_path, docs_path)
no_ext = os.path.splitext(relative_path)[0]
url_path = "/".join(
re.sub(r"^\d+-", "", seg) for seg in no_ext.split(os.sep)
)
url = f"{base_url.rstrip('/')}/{url_path}"

documents.append(
Expand Down
Loading
Loading