From 83c3f3df2cc6ba96898000f768339e0569fad639 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 1 Jun 2026 22:31:12 -0700 Subject: [PATCH 1/3] docs: add core public API design doc for inline review Consolidated proposal incorporating leseb's expanded 14-function vision with phased implementation plan. Intended for inline PR review per @franciscojavierarceo's request. Ref: #42 Signed-off-by: Ashwin Giridharan --- docs/design/core-public-api.md | 625 +++++++++++++++++++++++++++++++++ 1 file changed, 625 insertions(+) create mode 100644 docs/design/core-public-api.md diff --git a/docs/design/core-public-api.md b/docs/design/core-public-api.md new file mode 100644 index 0000000..4ae2924 --- /dev/null +++ b/docs/design/core-public-api.md @@ -0,0 +1,625 @@ +# Design: `agentic-core` Public API + +> Status: Draft — soliciting inline review +> References: [ADR-03](../adr/ADR-03_gateway_integration.md), [Issue #42](https://github.com/vllm-project/agentic-api/issues/42), [Praxis #354](https://github.com/praxis-proxy/praxis/issues/354) +> Owner: @ashwing + +--- + +## Overview + +`agentic-core` exposes the agentic loop as composable step functions. Each function is: +- Independently testable (no HTTP server needed) +- Wrappable in a Praxis `HttpFilter` (1:1 mapping per ADR-03) +- Callable directly in standalone mode via `execute()` + +This design incorporates @leseb's expanded proposal (14 functions, 8 traits) and organizes it into implementation phases. + +**Relationship to existing code:** PR #33 landed a concrete `ConversationStore` + `ResponseStore` with SQLx. This design defines the *orchestration-level* traits that wrap or delegate to those concrete stores. The implementation PRs will wire against PR #33's types directly. + +**Step numbering:** Steps are numbered sequentially within each phase (A1, A2, ... B1, B2, ... C1, C2, ...). The Praxis filter mapping table at the end shows how these map to @leseb's filter chain order. 
+ +--- + +## Shared State: `AgenticState` + +Mutable, request-scoped struct that flows through the entire loop. Each step reads some fields and writes others. + +```rust +pub struct AgenticState { + // Identity (set once by validate_request, read-only thereafter) + pub response_id: String, + pub conversation_id: Option, + pub tenant_id: Option, + pub model: String, + pub previous_response_id: Option, + pub request: Value, // original request preserved for forwarding + + // Routing flags (set once by validate_request) + pub store_enabled: bool, + pub stream_enabled: bool, + pub background: bool, + + // Conversation (written by rehydrate, read by call_inference) + pub messages: Vec, + + // Tools (written by dispatch_tools, read by call_inference on loop) + pub tools: Vec, + pub tool_choice: Value, + pub tool_calls: Vec, + + // Output (written by transform_stream, read by persist) + pub output_items: Vec, + pub response_object: Value, + pub usage: Value, + pub status: ResponseStatus, + + // Loop control + pub iteration: u32, + + // Runtime + pub config: AgenticConfig, +} + +pub struct AgenticConfig { + pub max_tool_iterations: u32, + pub compaction_enabled: bool, + pub compaction_threshold_tokens: u64, + pub reasoning_summary_enabled: bool, + pub default_model: Option, +} + +impl Default for AgenticConfig { + fn default() -> Self { + Self { + max_tool_iterations: 10, + compaction_enabled: false, + compaction_threshold_tokens: 100_000, + reasoning_summary_enabled: false, + default_model: None, + } + } +} + +pub enum ResponseStatus { + Queued, + InProgress, + Completed, + Incomplete(String), + Failed(String), + Cancelled, +} + +pub enum LoopDecision { + Continue, + Done, + Incomplete(String), +} + +pub enum BackendFormat { + Responses, + ChatCompletions, +} +``` + +--- + +## Step Functions + +### Phase A: Core Loop + +These are the minimum set needed for a working agentic loop. 
+ +#### A1: `validate_request` + +```rust +pub fn validate_request( + body: &Value, + config: &AgenticConfig, +) -> Result<AgenticState, AgenticError> +``` + +Parse the incoming request. Extract routing flags (`stream`, `store`, `background`), `model`, `previous_response_id`, `conversation_id`. Generate `response_id`. Validate constraints (`background=true && store=false` is invalid). Return fully initialized `AgenticState`. + +Does NOT validate inference-specific params (temperature, top_p, etc.) — those are the backend's responsibility. + +**Gateway filter:** `request_validate` + +--- + +#### A2: `rehydrate_conversation` + +```rust +pub async fn rehydrate_conversation( + state: &mut AgenticState, + store: &dyn ResponseStore, +) -> Result<(), AgenticError> +``` + +Load conversation history from `previous_response_id` or `conversation_id`. Reconstruct message list. Append current input. + +- If `previous_response_id`: fetch stored response + messages. Returns `AgenticError::Validation` if the previous response is incomplete/in-progress/cancelled (cannot continue from an unfinished response). +- If `conversation_id` only: fetch conversation messages +- If neither: pass current input through (messages extracted from `state.request`) + +**Gateway filter:** `rehydrate` + +--- + +#### A3: `call_inference` + +```rust +pub async fn call_inference( + state: &AgenticState, // intentionally &, not &mut — inference is read-only on state + client: &dyn InferenceClient, +) -> Result<(InferenceStream, BackendFormat), AgenticError> +``` + +Build the inference request from `state.messages` + `state.tools` + `state.tool_choice`. Delegate to `InferenceClient`. Return raw stream + format indicator. Takes `&AgenticState` (not `&mut`) because it only reads — state mutation happens in `transform_stream` downstream. + +The `InferenceClient` implementation handles request format conversion (Responses vs Chat Completions) internally. 
+ +**Gateway filter:** `responses_proxy` + +--- + +#### A4: `transform_stream` + +```rust +/// Non-streaming: drains the stream, accumulates into state, returns collected events. +pub async fn transform_stream( + state: &mut AgenticState, + raw_stream: InferenceStream, + format: BackendFormat, +) -> Result<Vec<ResponsesEvent>, AgenticError> + +/// Streaming variant: returns a stream that yields events AND accumulates into shared state. +/// Note: exact ownership model (Arc<Mutex<AgenticState>>, channel-based, or split-state) is TBD — see Open Question 5. +pub fn transform_stream_live( + state: /* shared state handle — see Open Question 5 */, + raw_stream: InferenceStream, + format: BackendFormat, +) -> ResponsesEventStream + +pub type ResponsesEventStream = Pin<Box<dyn Stream<Item = Result<ResponsesEvent, AgenticError>> + Send>>; +``` + +The SSE state machine. Parses raw bytes from the backend into typed `ResponsesEvent` values. Simultaneously accumulates into `state` (tool_calls, output_items, usage, status). + +**Two variants:** +- `transform_stream` (non-streaming / tool-loop path): consumes the stream fully, populates `state`, returns collected events. Used by `execute()` in the tool loop where we need all events before deciding whether to dispatch tools. +- `transform_stream_live` (streaming to client): returns a new stream that yields events as they arrive while accumulating into shared state (currently sketched as `Arc<Mutex<AgenticState>>`; the final ownership model is TBD — see Open Question 5). Used by `agentic-server` when forwarding SSE to clients in real-time. + +When `format` is `Responses`: minimal transformation (assign sequence numbers). +When `format` is `ChatCompletions`: full transformation into the Responses API streaming event types enumerated in `ResponsesEvent` below (see [OpenAI Responses API streaming docs](https://platform.openai.com/docs/api-reference/responses/streaming)). 
+ +```rust +pub enum ResponsesEvent { + // Response lifecycle + ResponseCreated(Value), + ResponseInProgress(Value), + ResponseCompleted(Value), + ResponseIncomplete(Value), + ResponseFailed(Value), + // Output items + OutputItemAdded(Value), + OutputItemDone(Value), + // Content parts + ContentPartAdded(Value), + ContentPartDone(Value), + // Text streaming + OutputTextDelta { delta: String, sequence_number: u64 }, + OutputTextDone { text: String, annotations: Vec, sequence_number: u64 }, + OutputTextAnnotationAdded(Value), + // Function calls + FunctionCallArgumentsDelta { delta: String, sequence_number: u64 }, + FunctionCallArgumentsDone { arguments: String, sequence_number: u64 }, + // Refusal + RefusalDelta { delta: String, sequence_number: u64 }, + RefusalDone { refusal: String, sequence_number: u64 }, + // Reasoning + ReasoningDelta { delta: String, sequence_number: u64 }, + ReasoningDone(Value), + ReasoningSummaryTextDelta { delta: String, sequence_number: u64 }, + ReasoningSummaryTextDone(Value), + ReasoningSummaryPartAdded(Value), + ReasoningSummaryPartDone(Value), + // Error + Error(Value), +} +``` + +**Gateway filter:** `stream_events` + +--- + +#### A5: `dispatch_tools` + +```rust +pub async fn dispatch_tools( + state: &mut AgenticState, + deps: &AgenticDeps, // see "Dependency Bundle" section below +) -> Result +``` + +Classify each tool call in `state.tool_calls` and dispatch to the appropriate executor from `deps`: +- **Function tool** (client-side): add to output, return `Done` +- **MCP tool**: execute via `deps.mcp_executor` +- **web_search**: execute via `deps.web_search` +- **file_search**: execute via `deps.vector_store` + +Tool executors that aren't configured (None in deps) skip silently — the tool call is treated as client-side. + +After server-side execution: append results to `state.messages`, increment `state.iteration`. 
+ +Returns `LoopDecision`: +- `Continue` — all server-side, results ready, loop back to inference +- `Done` — no tool calls, or client-side function calls present +- `Incomplete` — iteration limit or `finish_reason == "length"` + +**Gateway filter:** `tool_dispatch` (with branch chain for loop control) + +--- + +#### A6: `persist_response` + +```rust +pub async fn persist_response( + state: &AgenticState, // &, not &mut — persistence is read-only on state + store: &dyn ResponseStore, +) -> Result<(), AgenticError> +``` + +Save final response + messages to store. Skip when `state.store_enabled` is false. Takes `&AgenticState` (not `&mut`) because it only reads accumulated state — no mutations at this point. + +**Gateway filter:** `response_store` (response phase) + +--- + +#### A7 (standalone only): `execute` + +```rust +pub async fn execute( + body: Value, + config: AgenticConfig, + deps: &AgenticDeps, +) -> Result +``` + +Standalone entry point. Composes Phase A steps with default loop logic: + +```text +state = validate_request(body, config)? +rehydrate_conversation(&mut state, &*deps.store).await? + +loop { + let (stream, format) = call_inference(&state, &*deps.inference).await? + let _events = transform_stream(&mut state, stream, format).await? + // state.tool_calls, state.output_items, state.usage now populated + + match dispatch_tools(&mut state, &deps).await? { + LoopDecision::Continue => continue, + LoopDecision::Done => break, + LoopDecision::Incomplete(r) => { state.status = Incomplete(r); break; } + } +} + +persist_response(&state, &*deps.store).await? +Ok(state.response_object) +``` + +Note: `execute` uses the non-streaming `transform_stream` variant. For streaming to clients, `agentic-server` uses `transform_stream_live` and drives the event stream directly. + +--- + +### Phase B: Tool Executors + +Trait implementations for the tool dispatch layer. Phase A defines the trait signatures; Phase B provides concrete implementations. 
+ +Note: `HashMap` below refers to `std::collections::HashMap` (used for HTTP headers in MCP server configs). + +#### B1: `McpToolExecutor` + +```rust +#[async_trait] +pub trait McpToolExecutor: Send + Sync { + /// Execute an MCP tool call. Session management is internal to the implementation. + async fn execute( + &self, + tool_name: &str, + arguments: &Value, + server_config: &Value, + ) -> Result; +} +``` + +Implementations manage MCP sessions internally (create/reuse/close keyed on endpoint + headers). The session management traits below are implementation details, not part of the public API: + +```rust +// Internal to MCP executor implementations — not in public API +pub trait McpSessionManager: Send + Sync { + fn get_or_create_session( + &self, + endpoint_key: &str, + server_url: &str, + headers: &HashMap, + ) -> Result, AgenticError>; +} + +#[async_trait] +pub trait McpSession: Send + Sync { + async fn call_tool(&self, name: &str, arguments: &Value) -> Result; + async fn close(&self) -> Result<(), AgenticError>; +} +``` + +#### B2: `McpToolProvider` + +```rust +#[async_trait] +pub trait McpToolProvider: Send + Sync { + async fn list_tools( + &self, + server_url: &str, + headers: &HashMap, + ) -> Result, AgenticError>; +} +``` + +#### B3: `WebSearchProvider` + +```rust +#[async_trait] +pub trait WebSearchProvider: Send + Sync { + async fn search(&self, query: &str, context_size: ContextSize) -> Result; +} + +pub enum ContextSize { Low, Medium, High } +``` + +#### B4: `VectorStoreClient` + +```rust +#[async_trait] +pub trait VectorStoreClient: Send + Sync { + async fn search( + &self, + store_id: &str, + query: &str, + options: &FileSearchOptions, + ) -> Result, AgenticError>; +} + +pub struct FileSearchOptions { + pub max_num_results: u32, + pub filters: Option, + pub ranking_options: Option, +} +``` + +--- + +### Phase C: Advanced Features + +Optional steps that enhance the loop but aren't required for MVP. 
+ +**Note on ordering:** C1 (`init_store`) and C3 (`parse_tools`) logically run early in the pipeline (before inference). In Phase A without them, `dispatch_tools` works because it reads `state.tool_calls` populated by `transform_stream` from the LLM's output — tool *definitions* are forwarded as-is in `state.tools` from the original request. `parse_tools` adds MCP listing and normalization on top of that basic passthrough. + +#### C1: `init_store` + +```rust +pub async fn init_store( + state: &AgenticState, + store: &dyn ResponseStore, +) -> Result<(), AgenticError> +``` + +Create initial response record (status=queued for background, status=in_progress otherwise). Runs before rehydration so the response ID is persisted early. + +#### C2: `resolve_files` + +```rust +pub async fn resolve_files( + state: &mut AgenticState, + file_store: &dyn FileStore, +) -> Result<(), AgenticError> +``` + +Walk `state.messages`, resolve `file_id` references to inline content via `FileStore` trait. + +#### C3: `parse_tools` + +```rust +pub async fn parse_tools( + state: &mut AgenticState, + mcp_provider: &dyn McpToolProvider, +) -> Result<(), AgenticError> +``` + +Parse tool definitions. For MCP: call `tools/list`, build tool map. Normalize `tool_choice`. Writes to `state.tools` and `state.tool_choice`. + +#### C4: `compact_context` (opt-in via config) + +```rust +pub async fn compact_context( + state: &mut AgenticState, + inference: &dyn InferenceClient, + store: &dyn ResponseStore, +) -> Result +``` + +Token counting + summarization. Only runs when `config.compaction_enabled` is true and threshold is exceeded. Returns `true` if compaction occurred. + +#### C5: `summarize_reasoning` (opt-in via config) + +```rust +pub async fn summarize_reasoning( + state: &mut AgenticState, + inference: &dyn InferenceClient, +) -> Result>, AgenticError> +``` + +Post-streaming reasoning summary generation. Only runs when `config.reasoning_summary_enabled` is true. 
Runs after the tool loop completes. + +#### C6: `FileStore` trait + +```rust +#[async_trait] +pub trait FileStore: Send + Sync { + async fn get_file( + &self, + file_id: &str, + ) -> Result, AgenticError>; +} + +pub struct FileContent { + pub data: Vec, + pub mime_type: String, + pub filename: Option, +} +``` + +--- + +## Dependency Bundle + +```rust +pub struct AgenticDeps { + // Required (Phase A) + pub store: Arc, + pub inference: Arc, + + // Phase B: tool executors (None = tool calls treated as client-side) + pub mcp_executor: Option>, + pub web_search: Option>, + pub vector_store: Option>, + + // Phase C: advanced features + pub file_store: Option>, + pub mcp_provider: Option>, +} +``` + +Only `store` and `inference` are required. A minimal deployment (proxy + persistence, no server-side tools) only needs those two. All `Option` fields are populated as their respective phases are implemented — the struct definition is stable across all phases. + +**Note on `McpSessionManager`:** Session lifecycle is internal to the `McpToolExecutor` implementation. The executor manages its own session pool — callers don't need to provide sessions explicitly. This keeps the public API surface simple while allowing different session strategies per implementation. + +--- + +## Traits + +### `ResponseStore` + +The orchestration-level store trait. PR #33's concrete `ConversationStore` + `ResponseStore` types are the first implementation. This trait abstracts over them for the step functions. 
+ +```rust +#[async_trait] +pub trait ResponseStore: Send + Sync { + async fn get_response(&self, response_id: &str) -> Result, AgenticError>; + async fn insert_response(&self, response: &Value) -> Result<(), AgenticError>; + async fn update_response(&self, response_id: &str, update: &Value) -> Result<(), AgenticError>; + + async fn get_messages(&self, response_id: &str) -> Result>, AgenticError>; + async fn store_messages(&self, response_id: &str, messages: &[Value]) -> Result<(), AgenticError>; + + async fn list_input_items( + &self, response_id: &str, limit: u32, cursor: Option<&str>, + ) -> Result; +} +``` + +**Note:** PR #33 has `ConversationStore` (conversation-level ops) and `ResponseStore` (response-level ops) as separate concrete types. The orchestration trait above unifies them — the implementation delegates to both under the hood. Whether to keep this unified or split into two traits is an open question for review. + +Implementations: SQLx wrapper around PR #33 (default), OGX (PR #34), InMemory (testing). 
+ +### `InferenceClient` + +```rust +#[async_trait] +pub trait InferenceClient: Send + Sync { + async fn call( + &self, + request: &Value, + config: &AgenticConfig, + ) -> Result<(InferenceStream, BackendFormat), AgenticError>; +} + +pub type InferenceStream = Pin> + Send>>; +``` + +### `AgenticError` + +```rust +#[derive(Debug, thiserror::Error)] +pub enum AgenticError { + #[error("validation: {0}")] + Validation(String), + + #[error("store: {0}")] + Store(#[source] Box), + + #[error("inference: {0}")] + Inference(String), + + #[error("inference timeout after {timeout_s}s")] + InferenceTimeout { timeout_s: f64 }, + + #[error("tool dispatch: {tool_name}: {message}")] + ToolDispatch { tool_name: String, message: String }, + + #[error("max iterations ({max}) reached")] + MaxIterations { max: u32 }, + + #[error("response not found: {0}")] + NotFound(String), + + #[error("stream transform: {0}")] + StreamTransform(String), +} +``` + +--- + +## Gateway Integration (Praxis) + +Each step function maps to exactly one Praxis filter. 
The "Filter #" column shows the order in the Praxis filter chain (from @leseb's proposal): + +| Step | Core Function | Praxis Filter # | Praxis Filter Name | Phase | +|------|---------------|----------------|--------------------|-------| +| A1 | `validate_request()` | 0 | `request_validate` | A | +| A2 | `rehydrate_conversation()` | 2 | `rehydrate` | A | +| A3 | `call_inference()` | 5 | `responses_proxy` | A | +| A4 | `transform_stream()` | 6 | `stream_events` | A | +| A5 | `dispatch_tools()` | 7 | `tool_dispatch` | A | +| A6 | `persist_response()` | 13 | `response_store` | A | +| B1 | `McpToolExecutor::execute()` | 8 | `mcp_tool` | B | +| B3 | `WebSearchProvider::search()` | 9 | `web_search` | B | +| B4 | `VectorStoreClient::search()` | 10 | `file_search` | B | +| C1 | `init_store()` | 1 | `response_store` (init) | C | +| C2 | `resolve_files()` | 3 | `file_resolve` | C | +| C3 | `parse_tools()` | 4 | `tool_parse` | C | +| C4 | `compact_context()` | 11 | `compact` | C | +| C5 | `summarize_reasoning()` | 12 | `reasoning` | C | + +**Note:** Phase B entries (B1, B3, B4) are trait method calls invoked internally by A5 (`dispatch_tools`). In Praxis, @leseb's proposal exposes them as separate filters (8, 9, 10) for per-tool-type observability and independent configuration. B2 (`McpToolProvider`) has no corresponding filter — it's a setup-time operation used by C3 (`parse_tools`). + +Tool dispatch uses Praxis branch chains for loop control: +```yaml +- filter: tool_dispatch + branch_chains: + - name: tool-loop + on_result: { filter: tool_dispatch, key: action, result: loop } + rejoin: responses_proxy + max_iterations: 10 +``` + +--- + +## Open Questions + +1. **Compact/reasoning as opt-in or mandatory?** Currently proposed as opt-in via `AgenticConfig` flags. If mandatory, they add latency on every request. +2. **Per-tool-type executors: public functions or trait methods?** This proposal keeps them as trait methods called by `dispatch_tools`. 
Alternative: expose as standalone functions per leseb's proposal. +3. **Praxis #354 status:** Is the filter decomposition accepted? Affects how tightly we couple step numbering. +4. **ResponseStore: unified or split?** PR #33 has separate `ConversationStore` + `ResponseStore`. Should the orchestration trait unify them or keep them separate? +5. **`transform_stream` borrow strategy:** Need to validate in Rust whether `&mut state` + returned stream works, or if we need interior mutability (`RefCell`/`Mutex`) or a consume-and-return pattern. +6. **`AgenticState` field visibility:** All fields are `pub` for simplicity. Should we add accessor methods to enforce read/write contracts per step? From bc210e50ec31c77cfd979a0b7e67b91b9b3def3c Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 3 Jun 2026 16:54:04 -0700 Subject: [PATCH 2/3] docs: rewrite core API design to reference PR #46 as foundation Reframes the design doc as a hybrid reference: - Acknowledges PR #46 (maralbahari) as the base executor loop - Defines clear ownership boundaries (base loop vs tool dispatch) - Organizes into 4 implementation phases, each = one PR - Phase 1 (SSE events) independent of PR #46 - Phases 2-4 build on top of PR #46 Removes speculative API surface (AgenticState, AgenticConfig, full trait definitions) in favor of concrete code snippets matching actual implementation targets. Keeps just enough detail to execute follow-up PRs without over-specifying. 
Signed-off-by: Ashwin Giridharan --- docs/design/core-public-api.md | 669 +++++++-------------------------- 1 file changed, 138 insertions(+), 531 deletions(-) diff --git a/docs/design/core-public-api.md b/docs/design/core-public-api.md index 4ae2924..8d4c1e6 100644 --- a/docs/design/core-public-api.md +++ b/docs/design/core-public-api.md @@ -1,625 +1,232 @@ # Design: `agentic-core` Public API -> Status: Draft — soliciting inline review +> Status: Active — implementation in progress > References: [ADR-03](../adr/ADR-03_gateway_integration.md), [Issue #42](https://github.com/vllm-project/agentic-api/issues/42), [Praxis #354](https://github.com/praxis-proxy/praxis/issues/354) -> Owner: @ashwing +> Owner: @ashwing (tool dispatch, loop control, streaming tee) + @maralbahari (base loop, store integration) --- -## Overview +## Foundation: PR #46 -`agentic-core` exposes the agentic loop as composable step functions. Each function is: -- Independently testable (no HTTP server needed) -- Wrappable in a Praxis `HttpFilter` (1:1 mapping per ADR-03) -- Callable directly in standalone mode via `execute()` +[PR #46](https://github.com/vllm-project/agentic-api/pull/46) by @maralbahari implements the base executor loop for text-only stateful conversations: -This design incorporates @leseb's expanded proposal (14 functions, 8 traits) and organizes it into implementation phases. 
+| Function | File | What it does | +|----------|------|--------------| +| `execute()` | `executor/engine.rs` | Entry point — rehydrate → infer → persist | +| `rehydrate_conversation()` | `executor/engine.rs` | Load history from store, build enriched request | +| `call_inference()` | `executor/engine.rs` | Stream SSE from vLLM backend | +| `persist_response()` | `executor/engine.rs` | Save response + items to store | +| `ResponseAccumulator` | `executor/accumulator.rs` | SSE state machine — collects stream into ResponsePayload | +| `ExecutionContext` | `executor/request.rs` | Runtime deps: handlers, HTTP client, LLM URL | +| `RequestContext` | `executor/request.rs` | Per-turn state: original + enriched request, IDs | -**Relationship to existing code:** PR #33 landed a concrete `ConversationStore` + `ResponseStore` with SQLx. This design defines the *orchestration-level* traits that wrap or delegate to those concrete stores. The implementation PRs will wire against PR #33's types directly. - -**Step numbering:** Steps are numbered sequentially within each phase (A1, A2, ... B1, B2, ... C1, C2, ...). The Praxis filter mapping table at the end shows how these map to @leseb's filter chain order. - ---- - -## Shared State: `AgenticState` - -Mutable, request-scoped struct that flows through the entire loop. Each step reads some fields and writes others. 
- -```rust -pub struct AgenticState { - // Identity (set once by validate_request, read-only thereafter) - pub response_id: String, - pub conversation_id: Option, - pub tenant_id: Option, - pub model: String, - pub previous_response_id: Option, - pub request: Value, // original request preserved for forwarding - - // Routing flags (set once by validate_request) - pub store_enabled: bool, - pub stream_enabled: bool, - pub background: bool, - - // Conversation (written by rehydrate, read by call_inference) - pub messages: Vec, - - // Tools (written by dispatch_tools, read by call_inference on loop) - pub tools: Vec, - pub tool_choice: Value, - pub tool_calls: Vec, - - // Output (written by transform_stream, read by persist) - pub output_items: Vec, - pub response_object: Value, - pub usage: Value, - pub status: ResponseStatus, - - // Loop control - pub iteration: u32, - - // Runtime - pub config: AgenticConfig, -} - -pub struct AgenticConfig { - pub max_tool_iterations: u32, - pub compaction_enabled: bool, - pub compaction_threshold_tokens: u64, - pub reasoning_summary_enabled: bool, - pub default_model: Option, -} - -impl Default for AgenticConfig { - fn default() -> Self { - Self { - max_tool_iterations: 10, - compaction_enabled: false, - compaction_threshold_tokens: 100_000, - reasoning_summary_enabled: false, - default_model: None, - } - } -} - -pub enum ResponseStatus { - Queued, - InProgress, - Completed, - Incomplete(String), - Failed(String), - Cancelled, -} - -pub enum LoopDecision { - Continue, - Done, - Incomplete(String), -} - -pub enum BackendFormat { - Responses, - ChatCompletions, -} -``` +This design builds on top of PR #46 — it does not duplicate or replace that work. --- -## Step Functions - -### Phase A: Core Loop - -These are the minimum set needed for a working agentic loop. +## What This Design Adds -#### A1: `validate_request` +The base loop handles text messages. 
This design extends it with: -```rust -pub fn validate_request( - body: &Value, - config: &AgenticConfig, -) -> Result -``` - -Parse the incoming request. Extract routing flags (`stream`, `store`, `background`), `model`, `previous_response_id`, `conversation_id`. Generate `response_id`. Validate constraints (`background=true && store=false` is invalid). Return fully initialized `AgenticState`. - -Does NOT validate inference-specific params (temperature, top_p, etc.) — those are the backend's responsibility. - -**Gateway filter:** `request_validate` +1. **Tool dispatch** — detect function_call items in output, execute via traits, loop back +2. **Loop control** — `LoopDecision` enum driving re-entry with iteration limits +3. **Streaming tee** — forward SSE to client in real-time while accumulating for tool detection +4. **Extended SSE events** — function_call, reasoning, file_search, web_search event types +5. **Tool executor traits** — MCP, web_search, vector_store as pluggable implementations --- -#### A2: `rehydrate_conversation` +## Implementation Phases -```rust -pub async fn rehydrate_conversation( - state: &mut AgenticState, - store: &dyn ResponseStore, -) -> Result<(), AgenticError> -``` - -Load conversation history from `previous_response_id` or `conversation_id`. Reconstruct message list. Append current input. +Each phase = one PR with tests. Phases are ordered by dependency. -- If `previous_response_id`: fetch stored response + messages. Returns `AgenticError::Validation` if the previous response is incomplete/in-progress/cancelled (cannot continue from an unfinished response). -- If `conversation_id` only: fetch conversation messages -- If neither: pass current input through (messages extracted from `state.request`) +### Phase 1: SSE Event Extension (no dependency on PR #46) -**Gateway filter:** `rehydrate` +**PR scope:** Extend `types/event.rs` and `accumulator.rs` on current main. 
---- - -#### A3: `call_inference` +- Add the 8 new `SSEEventType` variants listed below, covering function_call, reasoning, text-completion, and output_item/content_part lifecycle events +- Extend `ResponseAccumulator::process_sse_line()` to detect and collect `OutputItem::FunctionCall` +- Unit tests verifying accumulator correctly builds FunctionCall items from SSE deltas ```rust -pub async fn call_inference( - state: &AgenticState, // intentionally &, not &mut — inference is read-only on state - client: &dyn InferenceClient, -) -> Result<(InferenceStream, BackendFormat), AgenticError> +// New variants in SSEEventType +pub enum SSEEventType { + // Existing (PR #46) + ResponseCreated, + ResponseOutputItemAdded, + ResponseOutputTextDelta, + ResponseDone, + // New — Phase 1 + ResponseOutputItemDone, // detect completed tool calls + FunctionCallArgumentsDelta, // streaming function args + FunctionCallArgumentsDone, // complete function call + ResponseOutputTextDone, // text completion signal + ContentPartAdded, + ContentPartDone, + ReasoningSummaryTextDelta, + ReasoningSummaryTextDone, + // Catch-all + Other, +} ``` -Build the inference request from `state.messages` + `state.tools` + `state.tool_choice`. Delegate to `InferenceClient`. Return raw stream + format indicator. Takes `&AgenticState` (not `&mut`) because it only reads — state mutation happens in `transform_stream` downstream. - -The `InferenceClient` implementation handles request format conversion (Responses vs Chat Completions) internally. - -**Gateway filter:** `responses_proxy` +**Size:** ~200 lines | **Blocked by:** nothing | **Target:** 3rd merged PR --- -#### A4: `transform_stream` - -```rust -/// Non-streaming: drains the stream, accumulates into state, returns collected events. -pub async fn transform_stream( - state: &mut AgenticState, - raw_stream: InferenceStream, - format: BackendFormat, -) -> Result, AgenticError> - -/// Streaming variant: returns a stream that yields events AND accumulates into shared state. 
-/// Note: exact ownership model (Arc>, channel-based, or split-state) is TBD — see Open Question 5. -pub fn transform_stream_live( - state: /* shared state handle — see Open Question 5 */, - raw_stream: InferenceStream, - format: BackendFormat, -) -> ResponsesEventStream - -pub type ResponsesEventStream = Pin> + Send>>; -``` - -The SSE state machine. Parses raw bytes from the backend into typed `ResponsesEvent` values. Simultaneously accumulates into `state` (tool_calls, output_items, usage, status). +### Phase 2: Loop Control + Tool Dispatch (depends on PR #46) -**Two variants:** -- `transform_stream` (non-streaming / tool-loop path): consumes the stream fully, populates `state`, returns collected events. Used by `execute()` in the tool loop where we need all events before deciding whether to dispatch tools. -- `transform_stream_live` (streaming to client): returns a new stream that yields events as they arrive while accumulating into shared state via `Arc>`. Used by `agentic-server` when forwarding SSE to clients in real-time. +**PR scope:** `executor/dispatch.rs`, `executor/tool_context.rs`, extend `engine.rs`. -When `format` is `Responses`: minimal transformation (assign sequence numbers). -When `format` is `ChatCompletions`: full transformation — all 24 Responses API event types (see [OpenAI Responses API streaming docs](https://platform.openai.com/docs/api-reference/responses/streaming)). 
+Core contribution — the agentic loop re-entry mechanism: ```rust -pub enum ResponsesEvent { - // Response lifecycle - ResponseCreated(Value), - ResponseInProgress(Value), - ResponseCompleted(Value), - ResponseIncomplete(Value), - ResponseFailed(Value), - // Output items - OutputItemAdded(Value), - OutputItemDone(Value), - // Content parts - ContentPartAdded(Value), - ContentPartDone(Value), - // Text streaming - OutputTextDelta { delta: String, sequence_number: u64 }, - OutputTextDone { text: String, annotations: Vec, sequence_number: u64 }, - OutputTextAnnotationAdded(Value), - // Function calls - FunctionCallArgumentsDelta { delta: String, sequence_number: u64 }, - FunctionCallArgumentsDone { arguments: String, sequence_number: u64 }, - // Refusal - RefusalDelta { delta: String, sequence_number: u64 }, - RefusalDone { refusal: String, sequence_number: u64 }, - // Reasoning - ReasoningDelta { delta: String, sequence_number: u64 }, - ReasoningDone(Value), - ReasoningSummaryTextDelta { delta: String, sequence_number: u64 }, - ReasoningSummaryTextDone(Value), - ReasoningSummaryPartAdded(Value), - ReasoningSummaryPartDone(Value), - // Error - Error(Value), +pub enum LoopDecision { + Continue(Vec), // tool results to append, re-enter inference + Done, // no tool calls, response is final + Incomplete(String), // max iterations or unrecoverable failure } -``` - -**Gateway filter:** `stream_events` - ---- -#### A5: `dispatch_tools` - -```rust pub async fn dispatch_tools( - state: &mut AgenticState, - deps: &AgenticDeps, // see "Dependency Bundle" section below -) -> Result -``` - -Classify each tool call in `state.tool_calls` and dispatch to the appropriate executor from `deps`: -- **Function tool** (client-side): add to output, return `Done` -- **MCP tool**: execute via `deps.mcp_executor` -- **web_search**: execute via `deps.web_search` -- **file_search**: execute via `deps.vector_store` - -Tool executors that aren't configured (None in deps) skip silently — the tool 
call is treated as client-side. - -After server-side execution: append results to `state.messages`, increment `state.iteration`. + output: &[OutputItem], + tool_ctx: &ToolContext, + iteration: usize, + max_iterations: usize, +) -> ExecutorResult -Returns `LoopDecision`: -- `Continue` — all server-side, results ready, loop back to inference -- `Done` — no tool calls, or client-side function calls present -- `Incomplete` — iteration limit or `finish_reason == "length"` - -**Gateway filter:** `tool_dispatch` (with branch chain for loop control) +pub async fn execute_loop( + request: RequestPayload, + exec_ctx: Arc, + tool_ctx: &ToolContext, +) -> ExecutorResult> +``` ---- +`execute_loop` wraps PR #46's `execute()`: +1. Rehydrate (from PR #46) +2. Call inference (from PR #46) +3. Accumulate response +4. Check output for `OutputItem::FunctionCall` → dispatch → loop or done +5. Persist final response -#### A6: `persist_response` +`ToolContext` holds optional executor references: ```rust -pub async fn persist_response( - state: &AgenticState, // &, not &mut — persistence is read-only on state - store: &dyn ResponseStore, -) -> Result<(), AgenticError> +pub struct ToolContext { + pub mcp_executor: Option>, + pub web_search: Option>, + pub vector_store: Option>, + pub max_iterations: usize, +} ``` -Save final response + messages to store. Skip when `state.store_enabled` is false. Takes `&AgenticState` (not `&mut`) because it only reads accumulated state — no mutations at this point. - -**Gateway filter:** `response_store` (response phase) +**Size:** ~400 lines | **Blocked by:** PR #46 merge | **Target:** first feature PR (Phase 2 of committer track) --- -#### A7 (standalone only): `execute` - -```rust -pub async fn execute( - body: Value, - config: AgenticConfig, - deps: &AgenticDeps, -) -> Result -``` +### Phase 3: Streaming Tee (depends on PR #46) -Standalone entry point. 
Composes Phase A steps with default loop logic: +**PR scope:** `executor/stream_tee.rs`, refactor `run_stream` path. -```text -state = validate_request(body, config)? -rehydrate_conversation(&mut state, &*deps.store).await? +PR #46's streaming path accumulates everything before emitting to client. This replaces it with a tee: -loop { - let (stream, format) = call_inference(&state, &*deps.inference).await? - let _events = transform_stream(&mut state, stream, format).await? - // state.tool_calls, state.output_items, state.usage now populated - - match dispatch_tools(&mut state, &deps).await? { - LoopDecision::Continue => continue, - LoopDecision::Done => break, - LoopDecision::Incomplete(r) => { state.status = Incomplete(r); break; } - } +```rust +pub struct StreamTee { + client_tx: mpsc::Sender, // forward to client + accumulator: ResponseAccumulator, // detect tool calls } -persist_response(&state, &*deps.store).await? -Ok(state.response_object) +impl StreamTee { + pub fn split( + raw_stream: impl Stream>, + conversation_id: Option<&str>, + ) -> (BoxStream, impl Future) +} ``` -Note: `execute` uses the non-streaming `transform_stream` variant. For streaming to clients, `agentic-server` uses `transform_stream_live` and drives the event stream directly. +Returns two handles: +- `BoxStream` — yields SSE events to client in real-time +- `Future` — resolves when stream completes, contains accumulated output for tool detection ---- +This enables ADR-01's requirement: "SSE stream to the client is interleaved with the tool loop — events go out in real time, not buffered until done." -### Phase B: Tool Executors +**Size:** ~300 lines | **Blocked by:** PR #46 merge | **Target:** feature PR -Trait implementations for the tool dispatch layer. Phase A defines the trait signatures; Phase B provides concrete implementations. +--- -Note: `HashMap` below refers to `std::collections::HashMap` (used for HTTP headers in MCP server configs). 
+### Phase 4: Tool Executor Traits + Mock Implementations (depends on Phase 2) -#### B1: `McpToolExecutor` +**PR scope:** `executor/tools/` module. ```rust #[async_trait] pub trait McpToolExecutor: Send + Sync { - /// Execute an MCP tool call. Session management is internal to the implementation. async fn execute( &self, tool_name: &str, arguments: &Value, server_config: &Value, - ) -> Result; -} -``` - -Implementations manage MCP sessions internally (create/reuse/close keyed on endpoint + headers). The session management traits below are implementation details, not part of the public API: - -```rust -// Internal to MCP executor implementations — not in public API -pub trait McpSessionManager: Send + Sync { - fn get_or_create_session( - &self, - endpoint_key: &str, - server_url: &str, - headers: &HashMap, - ) -> Result, AgenticError>; + ) -> Result; } -#[async_trait] -pub trait McpSession: Send + Sync { - async fn call_tool(&self, name: &str, arguments: &Value) -> Result; - async fn close(&self) -> Result<(), AgenticError>; -} -``` - -#### B2: `McpToolProvider` - -```rust -#[async_trait] -pub trait McpToolProvider: Send + Sync { - async fn list_tools( - &self, - server_url: &str, - headers: &HashMap, - ) -> Result, AgenticError>; -} -``` - -#### B3: `WebSearchProvider` - -```rust #[async_trait] pub trait WebSearchProvider: Send + Sync { - async fn search(&self, query: &str, context_size: ContextSize) -> Result; + async fn search(&self, query: &str, context_size: ContextSize) -> Result; } -pub enum ContextSize { Low, Medium, High } -``` - -#### B4: `VectorStoreClient` - -```rust #[async_trait] pub trait VectorStoreClient: Send + Sync { async fn search( &self, store_id: &str, query: &str, - options: &FileSearchOptions, - ) -> Result, AgenticError>; -} - -pub struct FileSearchOptions { - pub max_num_results: u32, - pub filters: Option, - pub ranking_options: Option, -} -``` - ---- - -### Phase C: Advanced Features - -Optional steps that enhance the loop but aren't 
required for MVP. - -**Note on ordering:** C1 (`init_store`) and C3 (`parse_tools`) logically run early in the pipeline (before inference). In Phase A without them, `dispatch_tools` works because it reads `state.tool_calls` populated by `transform_stream` from the LLM's output — tool *definitions* are forwarded as-is in `state.tools` from the original request. `parse_tools` adds MCP listing and normalization on top of that basic passthrough. - -#### C1: `init_store` - -```rust -pub async fn init_store( - state: &AgenticState, - store: &dyn ResponseStore, -) -> Result<(), AgenticError> -``` - -Create initial response record (status=queued for background, status=in_progress otherwise). Runs before rehydration so the response ID is persisted early. - -#### C2: `resolve_files` - -```rust -pub async fn resolve_files( - state: &mut AgenticState, - file_store: &dyn FileStore, -) -> Result<(), AgenticError> -``` - -Walk `state.messages`, resolve `file_id` references to inline content via `FileStore` trait. - -#### C3: `parse_tools` - -```rust -pub async fn parse_tools( - state: &mut AgenticState, - mcp_provider: &dyn McpToolProvider, -) -> Result<(), AgenticError> -``` - -Parse tool definitions. For MCP: call `tools/list`, build tool map. Normalize `tool_choice`. Writes to `state.tools` and `state.tool_choice`. - -#### C4: `compact_context` (opt-in via config) - -```rust -pub async fn compact_context( - state: &mut AgenticState, - inference: &dyn InferenceClient, - store: &dyn ResponseStore, -) -> Result -``` - -Token counting + summarization. Only runs when `config.compaction_enabled` is true and threshold is exceeded. Returns `true` if compaction occurred. - -#### C5: `summarize_reasoning` (opt-in via config) - -```rust -pub async fn summarize_reasoning( - state: &mut AgenticState, - inference: &dyn InferenceClient, -) -> Result>, AgenticError> -``` - -Post-streaming reasoning summary generation. Only runs when `config.reasoning_summary_enabled` is true. 
Runs after the tool loop completes. - -#### C6: `FileStore` trait - -```rust -#[async_trait] -pub trait FileStore: Send + Sync { - async fn get_file( - &self, - file_id: &str, - ) -> Result, AgenticError>; -} - -pub struct FileContent { - pub data: Vec, - pub mime_type: String, - pub filename: Option, + max_results: u32, + ) -> Result, ExecutorError>; } ``` ---- - -## Dependency Bundle - -```rust -pub struct AgenticDeps { - // Required (Phase A) - pub store: Arc, - pub inference: Arc, - - // Phase B: tool executors (None = tool calls treated as client-side) - pub mcp_executor: Option>, - pub web_search: Option>, - pub vector_store: Option>, - - // Phase C: advanced features - pub file_store: Option>, - pub mcp_provider: Option>, -} -``` +This PR includes mock implementations for integration testing (in-memory tool executors that return canned responses). Real implementations (MCP client, Brave search, Qdrant) come in later PRs. -Only `store` and `inference` are required. A minimal deployment (proxy + persistence, no server-side tools) only needs those two. All `Option` fields are populated as their respective phases are implemented — the struct definition is stable across all phases. - -**Note on `McpSessionManager`:** Session lifecycle is internal to the `McpToolExecutor` implementation. The executor manages its own session pool — callers don't need to provide sessions explicitly. This keeps the public API surface simple while allowing different session strategies per implementation. +**Size:** ~500 lines | **Blocked by:** Phase 2 | **Target:** feature PR --- -## Traits - -### `ResponseStore` - -The orchestration-level store trait. PR #33's concrete `ConversationStore` + `ResponseStore` types are the first implementation. This trait abstracts over them for the step functions. 
- -```rust -#[async_trait] -pub trait ResponseStore: Send + Sync { - async fn get_response(&self, response_id: &str) -> Result, AgenticError>; - async fn insert_response(&self, response: &Value) -> Result<(), AgenticError>; - async fn update_response(&self, response_id: &str, update: &Value) -> Result<(), AgenticError>; - - async fn get_messages(&self, response_id: &str) -> Result>, AgenticError>; - async fn store_messages(&self, response_id: &str, messages: &[Value]) -> Result<(), AgenticError>; - - async fn list_input_items( - &self, response_id: &str, limit: u32, cursor: Option<&str>, - ) -> Result; -} -``` - -**Note:** PR #33 has `ConversationStore` (conversation-level ops) and `ResponseStore` (response-level ops) as separate concrete types. The orchestration trait above unifies them — the implementation delegates to both under the hood. Whether to keep this unified or split into two traits is an open question for review. +## Design Decisions -Implementations: SQLx wrapper around PR #33 (default), OGX (PR #34), InMemory (testing). 
- -### `InferenceClient` - -```rust -#[async_trait] -pub trait InferenceClient: Send + Sync { - async fn call( - &self, - request: &Value, - config: &AgenticConfig, - ) -> Result<(InferenceStream, BackendFormat), AgenticError>; -} - -pub type InferenceStream = Pin> + Send>>; -``` - -### `AgenticError` - -```rust -#[derive(Debug, thiserror::Error)] -pub enum AgenticError { - #[error("validation: {0}")] - Validation(String), - - #[error("store: {0}")] - Store(#[source] Box), - - #[error("inference: {0}")] - Inference(String), - - #[error("inference timeout after {timeout_s}s")] - InferenceTimeout { timeout_s: f64 }, - - #[error("tool dispatch: {tool_name}: {message}")] - ToolDispatch { tool_name: String, message: String }, - - #[error("max iterations ({max}) reached")] - MaxIterations { max: u32 }, - - #[error("response not found: {0}")] - NotFound(String), - - #[error("stream transform: {0}")] - StreamTransform(String), -} -``` +| # | Decision | Rationale | +|---|----------|-----------| +| D1 | `ToolContext` separate from `ExecutionContext` | Keeps PR #46's struct focused on inference; tool deps are additive | +| D2 | `LoopDecision` carries tool results directly | Avoids mutating shared state between dispatch and re-entry | +| D3 | Streaming tee as separate module, not refactor of accumulator | Preserves PR #46's non-streaming path unchanged | +| D4 | Traits for tool executors, not concrete types | Enables OGX (PR #34), mock testing, and future providers | +| D5 | Phase 1 independent of PR #46 | Unblocks progress while base loop is in review | --- -## Gateway Integration (Praxis) - -Each step function maps to exactly one Praxis filter. 
The "Filter #" column shows the order in the Praxis filter chain (from @leseb's proposal): - -| Step | Core Function | Praxis Filter # | Praxis Filter Name | Phase | -|------|---------------|----------------|--------------------|-------| -| A1 | `validate_request()` | 0 | `request_validate` | A | -| A2 | `rehydrate_conversation()` | 2 | `rehydrate` | A | -| A3 | `call_inference()` | 5 | `responses_proxy` | A | -| A4 | `transform_stream()` | 6 | `stream_events` | A | -| A5 | `dispatch_tools()` | 7 | `tool_dispatch` | A | -| A6 | `persist_response()` | 13 | `response_store` | A | -| B1 | `McpToolExecutor::execute()` | 8 | `mcp_tool` | B | -| B3 | `WebSearchProvider::search()` | 9 | `web_search` | B | -| B4 | `VectorStoreClient::search()` | 10 | `file_search` | B | -| C1 | `init_store()` | 1 | `response_store` (init) | C | -| C2 | `resolve_files()` | 3 | `file_resolve` | C | -| C3 | `parse_tools()` | 4 | `tool_parse` | C | -| C4 | `compact_context()` | 11 | `compact` | C | -| C5 | `summarize_reasoning()` | 12 | `reasoning` | C | - -**Note:** Phase B entries (B1, B3, B4) are trait method calls invoked internally by A5 (`dispatch_tools`). In Praxis, @leseb's proposal exposes them as separate filters (8, 9, 10) for per-tool-type observability and independent configuration. B2 (`McpToolProvider`) has no corresponding filter — it's a setup-time operation used by C3 (`parse_tools`). 
- -Tool dispatch uses Praxis branch chains for loop control: -```yaml -- filter: tool_dispatch - branch_chains: - - name: tool-loop - on_result: { filter: tool_dispatch, key: action, result: loop } - rejoin: responses_proxy - max_iterations: 10 -``` +## Praxis Filter Mapping + +How the complete pipeline maps to @leseb's proposed filter chain: + +| # | Praxis Filter | Core Function | Phase | Owner | +|---|---------------|---------------|-------|-------| +| 0 | `request_validate` | `validate_request()` | Future | — | +| 1 | `response_store` (init) | `init_store()` | Future | — | +| 2 | `rehydrate` | `rehydrate_conversation()` | PR #46 | @maralbahari | +| 3 | `file_resolve` | `resolve_files()` | Future | — | +| 4 | `tool_parse` | `parse_tools()` | Future | — | +| 5 | `responses_proxy` | `call_inference()` | PR #46 | @maralbahari | +| 6 | `stream_events` | `transform_stream()` / tee | Phase 3 | @ashwing | +| 7 | `tool_dispatch` | `dispatch_tools()` | Phase 2 | @ashwing | +| 8 | `mcp_tool` | `McpToolExecutor::execute()` | Phase 4 | @ashwing | +| 9 | `web_search` | `WebSearchProvider::search()` | Phase 4 | @ashwing | +| 10 | `file_search` | `VectorStoreClient::search()` | Phase 4 | @ashwing | +| 11 | `compact` | `compact_context()` | Future | — | +| 12 | `reasoning` | `summarize_reasoning()` | Future | — | +| 13 | `response_store` (resp) | `persist_response()` | PR #46 | @maralbahari | --- ## Open Questions -1. **Compact/reasoning as opt-in or mandatory?** Currently proposed as opt-in via `AgenticConfig` flags. If mandatory, they add latency on every request. -2. **Per-tool-type executors: public functions or trait methods?** This proposal keeps them as trait methods called by `dispatch_tools`. Alternative: expose as standalone functions per leseb's proposal. -3. **Praxis #354 status:** Is the filter decomposition accepted? Affects how tightly we couple step numbering. -4. **ResponseStore: unified or split?** PR #33 has separate `ConversationStore` + `ResponseStore`. 
Should the orchestration trait unify them or keep them separate? -5. **`transform_stream` borrow strategy:** Need to validate in Rust whether `&mut state` + returned stream works, or if we need interior mutability (`RefCell`/`Mutex`) or a consume-and-return pattern. -6. **`AgenticState` field visibility:** All fields are `pub` for simplicity. Should we add accessor methods to enforce read/write contracts per step? +1. **`execute_loop` vs refactoring `execute`:** Should the loop wrapper be a new function or replace PR #46's `execute()`? Pending maralbahari's response on PR #46 review. +2. **Streaming tee ownership model:** `Arc<Mutex<_>>` vs channel-based accumulation. Will prototype both in Phase 3 PR. +3. **ResponseStore trait unification:** PR #33 has separate `ConversationStore` + `ResponseStore`. Keep separate or unify? Defer until Phase 4 when we need to abstract over them. From 3b11635c530c9f5d112dd6a148e7e6e8b731aa2a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 3 Jun 2026 17:13:32 -0700 Subject: [PATCH 3/3] docs: address review findings on design doc - Phase 1 correctly depends on PR #46 (accumulator.rs lives there) - call_inference is sync fn returning lazy stream, not async - persist_response takes explicit handler params (noted) - Native async traits instead of #[async_trait] (Rust 1.85) - Removed undefined ContextSize type, use &str - Phase 2 explicitly non-streaming (streaming gated on Phase 3) - Removed max_iterations redundancy from dispatch_tools params - ADR-01 reference reworded as paraphrase not quote Signed-off-by: Ashwin Giridharan --- docs/design/core-public-api.md | 55 +++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/docs/design/core-public-api.md b/docs/design/core-public-api.md index 8d4c1e6..1eee1de 100644 --- a/docs/design/core-public-api.md +++ b/docs/design/core-public-api.md @@ -14,8 +14,8 @@ |----------|------|--------------| | `execute()` | `executor/engine.rs` | Entry point —
rehydrate → infer → persist | | `rehydrate_conversation()` | `executor/engine.rs` | Load history from store, build enriched request | -| `call_inference()` | `executor/engine.rs` | Stream SSE from vLLM backend | -| `persist_response()` | `executor/engine.rs` | Save response + items to store | +| `call_inference()` | `executor/engine.rs` | Returns `impl Stream` of SSE lines (sync fn, not async — stream is lazy) | +| `persist_response()` | `executor/engine.rs` | Save response + items to store (takes handlers as explicit params) | | `ResponseAccumulator` | `executor/accumulator.rs` | SSE state machine — collects stream into ResponsePayload | | `ExecutionContext` | `executor/request.rs` | Runtime deps: handlers, HTTP client, LLM URL | | `RequestContext` | `executor/request.rs` | Per-turn state: original + enriched request, IDs | @@ -40,14 +40,16 @@ The base loop handles text messages. This design extends it with: Each phase = one PR with tests. Phases are ordered by dependency. -### Phase 1: SSE Event Extension (no dependency on PR #46) +### Phase 1: SSE Event Extension (lands after PR #46 merges) -**PR scope:** Extend `types/event.rs` and `accumulator.rs` on current main. +**PR scope:** Extend `types/event.rs` and `executor/accumulator.rs` (both introduced by PR #46). - Add ~12 new `SSEEventType` variants for function_call, reasoning, output_item lifecycle -- Extend `ResponseAccumulator::process_sse_line()` to detect and collect `OutputItem::FunctionCall` +- Modify `ResponseAccumulator::process_sse_line()` (private method — changes are within the same file) to detect and collect `OutputItem::FunctionCall` from streaming deltas - Unit tests verifying accumulator correctly builds FunctionCall items from SSE deltas +**Note:** `accumulator.rs` and `types/event.rs` are introduced by PR #46. This phase lands immediately after PR #46 merges — it's a small, focused follow-up with no other dependencies. 
+ ```rust // New variants in SSEEventType pub enum SSEEventType { @@ -70,7 +72,7 @@ pub enum SSEEventType { } ``` -**Size:** ~200 lines | **Blocked by:** nothing | **Target:** 3rd merged PR +**Size:** ~200 lines | **Blocked by:** PR #46 merge | **Target:** 3rd merged PR (fast follow-up) --- @@ -91,22 +93,24 @@ pub async fn dispatch_tools( output: &[OutputItem], tool_ctx: &ToolContext, iteration: usize, - max_iterations: usize, ) -> ExecutorResult +/// Initially non-streaming only (returns the fully accumulated response). Streaming support added in Phase 3. pub async fn execute_loop( request: RequestPayload, exec_ctx: Arc, tool_ctx: &ToolContext, -) -> ExecutorResult> +) -> ExecutorResult ``` -`execute_loop` wraps PR #46's `execute()`: -1. Rehydrate (from PR #46) -2. Call inference (from PR #46) -3. Accumulate response -4. Check output for `OutputItem::FunctionCall` → dispatch → loop or done -5. Persist final response +`execute_loop` wraps PR #46's functions in a tool-dispatch loop: +1. Rehydrate (delegates to PR #46's `rehydrate_conversation`) +2. Call inference (delegates to PR #46's `call_inference` — returns stream lazily) +3. Accumulate response (via `ResponseAccumulator::from_stream`) +4. Check output for `OutputItem::FunctionCall` → `dispatch_tools` → loop or done +5. Persist final response (delegates to PR #46's `persist_response` with explicit handlers) + +**Phase 2 is non-streaming only.** The tool loop inspects the full accumulated response before deciding. Streaming + tool dispatch (forwarding events to client while detecting tool calls) requires Phase 3's tee pattern. `ToolContext` holds optional executor references: @@ -147,7 +151,7 @@ Returns two handles: - `BoxStream` — yields SSE events to client in real-time - `Future` — resolves when stream completes, contains accumulated output for tool detection -This enables ADR-01's requirement: "SSE stream to the client is interleaved with the tool loop — events go out in real time, not buffered until done."
+This enables the real-time streaming requirement from ADR-01 §3 — events should reach the client as they arrive, interleaved with the tool loop, rather than buffered until completion. **Size:** ~300 lines | **Blocked by:** PR #46 merge | **Target:** feature PR @@ -158,29 +162,32 @@ This enables ADR-01's requirement: "SSE stream to the client is interleaved with **PR scope:** `executor/tools/` module. ```rust -#[async_trait] +// Native async traits (Rust 1.75+, no #[async_trait] boxing needed) pub trait McpToolExecutor: Send + Sync { - async fn execute( + fn execute( &self, tool_name: &str, arguments: &Value, server_config: &Value, - ) -> Result; + ) -> impl Future> + Send; } -#[async_trait] pub trait WebSearchProvider: Send + Sync { - async fn search(&self, query: &str, context_size: ContextSize) -> Result; + /// context_size: "low" | "medium" | "high" — controls result verbosity + fn search( + &self, + query: &str, + context_size: &str, + ) -> impl Future> + Send; } -#[async_trait] pub trait VectorStoreClient: Send + Sync { - async fn search( + fn search( &self, store_id: &str, query: &str, max_results: u32, - ) -> Result, ExecutorError>; + ) -> impl Future, ExecutorError>> + Send; } ``` @@ -198,7 +205,7 @@ This PR includes mock implementations for integration testing (in-memory tool ex | D2 | `LoopDecision` carries tool results directly | Avoids mutating shared state between dispatch and re-entry | | D3 | Streaming tee as separate module, not refactor of accumulator | Preserves PR #46's non-streaming path unchanged | | D4 | Traits for tool executors, not concrete types | Enables OGX (PR #34), mock testing, and future providers | -| D5 | Phase 1 independent of PR #46 | Unblocks progress while base loop is in review | +| D5 | Phase 1 is a fast follow-up to PR #46 | Small scope, lands immediately after merge — unblocks Phase 2 quickly | ---