Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b0272b4
feat: implement storage CRUD layer with SQLx and benchmarks
maralbahari May 21, 2026
473485d
use rust criterion for benchmarking and clean code
maralbahari May 22, 2026
0bb4eb8
clean code
maralbahari May 22, 2026
9ad6c66
cover more unit tests and add integration tests
maralbahari May 22, 2026
c6b9968
Merge remote-tracking branch 'origin/main' into impl-database-crud
maralbahari May 29, 2026
b681865
avoid unnecessary clone
maralbahari May 29, 2026
4a29e6f
move integration test in agentic-core
maralbahari May 29, 2026
0f9d2c3
fix multi-thread unit test and clean the main cargo.toml
maralbahari May 29, 2026
3ec60a2
fix cargo clippy
maralbahari May 29, 2026
06be1e1
fix clippy errors in benchmark
maralbahari May 29, 2026
fdf5696
Merge remote-tracking branch 'origin/main' into agentic-core-executor
maralbahari Jun 3, 2026
ff666a4
feat: implement agentic loop executor (ADR-03)
maralbahari Jun 3, 2026
89f8f8c
add integration test based on pre-recorded cassets from openai
maralbahari Jun 3, 2026
b024ef2
clean code and fix cargo clippy
maralbahari Jun 3, 2026
da38e55
improve error handling
maralbahari Jun 3, 2026
8d2d843
improve apis
maralbahari Jun 3, 2026
080aabe
simplify call_inference
maralbahari Jun 3, 2026
04a0271
fix benchmarking to record per turn pref and merge all benches into s…
maralbahari Jun 4, 2026
2d17980
Merge remote-tracking branch 'origin/main' into agentic-core-executor
maralbahari Jun 4, 2026
fc25c96
fix cargo clippy
maralbahari Jun 4, 2026
734cae1
clean code
maralbahari Jun 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 182 additions & 63 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ pedantic = { level = "warn", priority = -1 }

[workspace.dependencies]
agentic-core = { path = "crates/agentic-core" }
async-stream = "0.3"
axum = "0.8"
either = "1"
bytes = "1"
clap = { version = "4", features = ["derive", "env"] }
criterion = { version = "0.5", features = ["async_tokio"] }
Expand Down
8 changes: 6 additions & 2 deletions crates/agentic-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ license.workspace = true
repository.workspace = true

[dependencies]
async-stream.workspace = true
bytes.workspace = true
either.workspace = true
futures.workspace = true
http.workspace = true
reqwest = { workspace = true, features = ["default-tls", "stream"] }
Expand All @@ -22,12 +24,14 @@ chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["v7", "serde"] }

[dev-dependencies]
axum.workspace = true
criterion = { workspace = true }
serde_yaml = "0.9"
tokio = { workspace = true, features = ["full"] }

[[bench]]
name = "storage_crud"
name = "benches"
harness = false


[lints]
workspace = true
6 changes: 6 additions & 0 deletions crates/agentic-core/benches/benches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod executor_throughput;
mod storage_crud;

use criterion::criterion_main;

criterion_main!(storage_crud::storage_benches, executor_throughput::executor_benches);
304 changes: 304 additions & 0 deletions crates/agentic-core/benches/executor_throughput.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
//! Throughput benchmarks for the executor agentic loop (`execute`).
//!
//! Measures wall-clock time per turn across chain depths 1–N, for both
//! blocking (non-streaming) and streaming execution paths.
//!
//! | Group | What grows with depth |
//! |--------------------|----------------------------------------------------|
//! | `execute/blocking` | rehydrate cost (DB reads) + JSON fetch + persist |
//! | `execute/streaming`| rehydrate cost + SSE accumulate + persist |
//! | `rehydrate_only` | pure rehydrate step, no LLM call |
//!
//! # Configuring max depth
//!
//! Set `BENCH_MAX_DEPTH` before running to control how many depths are swept:
//!
//! ```bash
//! BENCH_MAX_DEPTH=3 cargo bench --bench executor_throughput
//! ```
//!
//! Defaults to 5 when the variable is unset.
//!
//! # Sample size
//!
//! Pass `-- --sample-size=N` (criterion flag) to override the number of
//! iterations criterion collects per benchmark:
//!
//! ```bash
//! cargo bench --bench executor_throughput -- --sample-size=20
//! ```

use std::sync::{Arc, Mutex};

use axum::Router;
use axum::http::header;
use axum::response::IntoResponse;
use axum::routing::post;
use criterion::{BatchSize, BenchmarkId, Criterion, black_box, criterion_group};
use either::Either;
use futures::StreamExt;

use agentic_core::executor::{ConversationHandler, ExecutionContext, ResponseHandler, execute, rehydrate_conversation};
use agentic_core::storage::{ConversationStore, DbPool, ResponseStore, create_pool_with_schema};
use agentic_core::types::io::{ResponsesInput, ToolChoice};
use agentic_core::types::request_response::RequestPayload;

fn max_depth() -> usize {
std::env::var("BENCH_MAX_DEPTH")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.unwrap_or(5)
.max(1)
}

const NON_STREAMING_BODY: &str = r#"{
"id": "resp_bench_upstream",
"object": "response",
"created_at": 1700000000,
"status": "completed",
"model": "test-model",
"output": [{
"type": "message",
"id": "msg_bench",
"role": "assistant",
"status": "completed",
"content": [{"type": "output_text", "text": "OK", "annotations": []}]
}],
"usage": {
"input_tokens": 5, "output_tokens": 1, "total_tokens": 6,
"input_tokens_details": {"cached_tokens": 0},
"output_tokens_details": {"reasoning_tokens": 0}
}
}"#;

const STREAMING_BODY: &str = concat!(
"data: {\"type\":\"response.created\",\"response\":{\"id\":\"resp_bench_upstream\",\"status\":\"in_progress\"}}\n\n",
"data: {\"type\":\"response.output_item.added\",\"item\":{\"id\":\"msg_bench\",\"type\":\"message\",\"status\":\"in_progress\",\"content\":[],\"role\":\"assistant\"}}\n\n",
"data: {\"type\":\"response.output_text.delta\",\"delta\":\"OK\"}\n\n",
"data: {\"type\":\"response.completed\",\"response\":{",
"\"id\":\"resp_bench_upstream\",\"object\":\"response\",\"created_at\":1700000000,",
"\"status\":\"completed\",\"model\":\"test-model\",",
"\"output\":[{\"type\":\"message\",\"id\":\"msg_bench\",\"role\":\"assistant\",",
"\"status\":\"completed\",\"content\":[{\"type\":\"output_text\",\"text\":\"OK\",\"annotations\":[]}]}],",
"\"usage\":{\"input_tokens\":5,\"output_tokens\":1,\"total_tokens\":6,",
"\"input_tokens_details\":{\"cached_tokens\":0},",
"\"output_tokens_details\":{\"reasoning_tokens\":0}}",
"}}\n\n",
"data: [DONE]\n\n",
);

fn start_mock_server(rt: &tokio::runtime::Runtime) -> String {
let listener = rt.block_on(async { tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap() });
let addr = listener.local_addr().unwrap();

rt.spawn(async move {
let app = Router::new()
.route(
"/v1/responses",
post(|body: axum::body::Bytes| async move {
let is_stream = serde_json::from_slice::<serde_json::Value>(&body)
.ok()
.and_then(|j| j["stream"].as_bool())
.unwrap_or(false);

if is_stream {
axum::http::Response::builder()
.status(200)
.header(header::CONTENT_TYPE, "text/event-stream; charset=utf-8")
.body(axum::body::Body::from(STREAMING_BODY))
.unwrap()
.into_response()
} else {
axum::http::Response::builder()
.status(200)
.header(header::CONTENT_TYPE, "application/json")
.body(axum::body::Body::from(NON_STREAMING_BODY))
.unwrap()
.into_response()
}
}),
)
.route(
"/v1/conversations",
post(|| async { (axum::http::StatusCode::OK, "{}") }),
);
axum::serve(listener, app).await.ok();
});

format!("http://{addr}")
}

fn make_request(input: &str, stream: bool, prev_id: Option<String>) -> RequestPayload {
RequestPayload {
model: "test-model".to_string(),
input: ResponsesInput::Text(input.to_string()),
instructions: None,
previous_response_id: prev_id,
conversation_id: None,
tools: None,
tool_choice: ToolChoice::Auto,
stream,
store: true,
include: None,
temperature: None,
top_p: None,
max_output_tokens: None,
truncation: None,
metadata: None,
}
}

fn build_exec_ctx(rt: &tokio::runtime::Runtime, mock_url: String) -> (Arc<ExecutionContext>, Arc<DbPool>) {
let pool = rt.block_on(async { create_pool_with_schema(None).await.expect("bench pool creation failed") });
let conv_handler = ConversationHandler::new(ConversationStore::new(pool.clone()));
let resp_handler = ResponseHandler::new(ResponseStore::new(pool.clone()));
let client = Arc::new(reqwest::Client::new());
let exec_ctx = Arc::new(ExecutionContext::new(
conv_handler,
resp_handler,
client,
mock_url,
None,
));
(exec_ctx, pool)
}

/// Delete all rows from every table so the next bench group starts with a
/// clean state. Accumulated rows from setup closures are removed; this
/// prevents cross-contamination between groups and unbounded DB growth.
fn clear_db(rt: &tokio::runtime::Runtime, pool: &DbPool) {
rt.block_on(async {
sqlx::query("DELETE FROM items").execute(pool).await.ok();
sqlx::query("DELETE FROM responses").execute(pool).await.ok();
sqlx::query("DELETE FROM conversations").execute(pool).await.ok();
});
}

/// Build a chain of `depth - 1` non-streaming turns and return the last
/// response ID. Called in the setup closure — cost does NOT count toward the
/// benchmark measurement.
async fn seed_chain(exec_ctx: &Arc<ExecutionContext>, depth: usize) -> Option<String> {
let mut prev_id: Option<String> = None;
for i in 0..depth.saturating_sub(1) {
let req = make_request(&format!("seed {i}"), false, prev_id.take());
if let Either::Left(p) = execute(req, Arc::clone(exec_ctx)).await.expect("seed") {
prev_id = Some(p.id);
}
}
prev_id
}

// Bench: blocking path, depths 1–max_depth
//
// The chain of N-1 prior turns is seeded with `rt.block_on()` BEFORE criterion
// starts the measurement loop, so only turn N is timed.
fn bench_execute_blocking(c: &mut Criterion, exec_ctx: &Arc<ExecutionContext>) {
let mut group = c.benchmark_group("execute/blocking");
let rt = tokio::runtime::Runtime::new().unwrap();

for depth in 1..=max_depth() {
// Pre-seed N-1 turns outside criterion — their cost is NOT measured.
let prev_id = rt.block_on(seed_chain(exec_ctx, depth));

group.bench_with_input(BenchmarkId::new("turns", depth), &depth, |b, _| {
b.to_async(tokio::runtime::Runtime::new().unwrap()).iter_batched(
// Synchronous setup: just hand the pre-seeded prev_id to each sample.
|| prev_id.clone(),
|prev_id| {
let exec_ctx = Arc::clone(exec_ctx);
async move {
let req = make_request("bench turn", false, black_box(prev_id));
execute(req, exec_ctx).await.expect("execute")
}
},
BatchSize::SmallInput,
);
});
}
group.finish();
}

// Bench: streaming path, depths 1–max_depth (same pre-seed approach).
fn bench_execute_streaming(c: &mut Criterion, exec_ctx: &Arc<ExecutionContext>) {
let mut group = c.benchmark_group("execute/streaming");
let rt = tokio::runtime::Runtime::new().unwrap();

for depth in 1..=max_depth() {
let prev_id = rt.block_on(seed_chain(exec_ctx, depth));

group.bench_with_input(BenchmarkId::new("turns", depth), &depth, |b, _| {
b.to_async(tokio::runtime::Runtime::new().unwrap()).iter_batched(
|| prev_id.clone(),
|prev_id| {
let exec_ctx = Arc::clone(exec_ctx);
async move {
let req = make_request("bench turn", true, black_box(prev_id));
let result = execute(req, exec_ctx).await.expect("execute");
if let Either::Right(stream) = result {
let mut stream = Box::pin(stream);
while stream.next().await.is_some() {}
}
}
},
BatchSize::SmallInput,
);
});
}
group.finish();
}

fn bench_rehydrate_only(c: &mut Criterion, exec_ctx: &Arc<ExecutionContext>) {
let mut group = c.benchmark_group("rehydrate_only");

// Grow the shared chain incrementally so deeper depths include all prior
// history items; the chain_tip tracks the latest response ID.
let chain_tip: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let rt = tokio::runtime::Runtime::new().unwrap();

for depth in 1..=max_depth() {
// Extend the chain to `depth` turns if not already deep enough.
rt.block_on(async {
let has_tip = chain_tip.lock().unwrap().is_some();
if depth == 1 || !has_tip {
let prev_id = chain_tip.lock().unwrap().clone();
let req = make_request("seed", false, prev_id);
if let Either::Left(p) = execute(req, Arc::clone(exec_ctx)).await.expect("seed") {
*chain_tip.lock().unwrap() = Some(p.id);
}
}
});

group.bench_with_input(BenchmarkId::new("prev_response_depth", depth), &depth, |b, _| {
b.to_async(tokio::runtime::Runtime::new().unwrap()).iter_batched(
|| chain_tip.lock().unwrap().clone(),
|prev_id| {
let exec_ctx = Arc::clone(exec_ctx);
async move {
let req = make_request("bench", false, black_box(prev_id));
rehydrate_conversation(req, &exec_ctx).await.expect("rehydrate")
}
},
BatchSize::SmallInput,
);
});
}

group.finish();
}

fn init_benches(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
let mock_url = start_mock_server(&rt);
let (exec_ctx, pool) = build_exec_ctx(&rt, mock_url);

bench_execute_blocking(c, &exec_ctx);
clear_db(&rt, &pool);

bench_execute_streaming(c, &exec_ctx);
clear_db(&rt, &pool);

bench_rehydrate_only(c, &exec_ctx);
clear_db(&rt, &pool);
}

criterion_group!(executor_benches, init_benches);
5 changes: 2 additions & 3 deletions crates/agentic-core/benches/storage_crud.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use criterion::{BatchSize, Criterion, black_box, criterion_group, criterion_main};
use criterion::{BatchSize, Criterion, black_box, criterion_group};

use agentic_core::storage::{ConversationStore, InOutItem, ResponseMetadata, ResponseStore, create_pool_with_schema};
use agentic_core::types::io::{InputItem, InputMessage, InputMessageContent, OutputItem, OutputMessage};
Expand Down Expand Up @@ -205,5 +205,4 @@ fn init_benches(c: &mut Criterion) {
});
}

criterion_group!(benches, init_benches);
criterion_main!(benches);
criterion_group!(storage_benches, init_benches);
Loading