From 979bf7d201df8aa61ad6561683edd6d46758faa2 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Sat, 4 Apr 2026 22:04:13 -0700 Subject: [PATCH] Add integration tests and align proto with RFC spec Add three-tier integration test suite (47 gRPC protocol tests, 5 Rig agent tool tests, 3 E2E OpenAI tests) covering all modes, error paths, signals, dedup, cancel auth, and discovery. Align core.proto with RFC by promoting ParticipantActivity and SessionMetadata enrichment to the normative spec, and removing non-normative admin RPCs (GetRuntimeMetrics, GetSessionHistory) from the runtime. These operational features are served by the control plane from its own PostgreSQL data instead. Enable progress capability and WatchSignals ambient signal broadcast. Add participant activity tracking to sessions. --- .gitignore | 5 +++ Cargo.toml | 6 ++++ proto/macp/v1/core.proto | 8 +++++ scripts/run-e2e.sh | 67 ++++++++++++++++++++++++++++++++++++ src/main.rs | 46 +++++++++++++++++++++---- src/mode/decision.rs | 2 ++ src/mode/handoff.rs | 2 ++ src/mode/multi_round.rs | 2 ++ src/mode/passthrough.rs | 2 ++ src/mode/proposal.rs | 2 ++ src/mode/quorum.rs | 2 ++ src/mode/task.rs | 2 ++ src/registry.rs | 4 +++ src/replay.rs | 2 ++ src/runtime.rs | 5 ++- src/server.rs | 26 +++++++++++--- src/session.rs | 13 ++++++- src/storage/file.rs | 2 ++ src/storage/memory.rs | 2 ++ src/storage/migration.rs | 2 ++ src/storage/recovery.rs | 2 ++ src/storage/redis_backend.rs | 2 ++ src/storage/rocksdb.rs | 2 ++ 23 files changed, 194 insertions(+), 14 deletions(-) create mode 100755 scripts/run-e2e.sh diff --git a/.gitignore b/.gitignore index cc70cb0..b113f43 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,11 @@ CLAUDE.md # Integration tests build artifacts integration_tests/target/ +# Secrets / environment +.env +.env.local +.env.*.local + # OS .DS_Store Thumbs.db diff --git a/Cargo.toml b/Cargo.toml index b2102ff..322d808 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,13 @@ name = "macp-runtime" version = "0.4.0" edition = "2021" +default-run = "macp-runtime" [features] default = [] rocksdb-backend = ["dep:rocksdb"] redis-backend = ["dep:redis"] +otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "dep:tracing-opentelemetry"] [dependencies] tokio = { version = "1", features = ["full"] } @@ -25,6 +27,10 @@ async-trait = "0.1" uuid = { version = "1", features = ["v4"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +opentelemetry = { version = "0.22", optional = true } +opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"], optional = true } +opentelemetry-otlp = { version = "0.15", features = ["tonic"], optional = true } +tracing-opentelemetry = { version = "0.23", optional = true } rocksdb = { version = "0.22", optional = true } redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true } diff --git a/proto/macp/v1/core.proto b/proto/macp/v1/core.proto index b25dff5..807e386 100644 --- a/proto/macp/v1/core.proto +++ b/proto/macp/v1/core.proto @@ -120,6 +120,12 @@ message CommitmentPayload { string configuration_version = 7; } +message ParticipantActivity { + string participant_id = 1; + int64 last_message_at_unix_ms = 2; + uint32 message_count = 3; +} + message SessionMetadata { string session_id = 1; string mode = 2; @@ -129,6 +135,8 @@ message SessionMetadata { string mode_version = 6; string configuration_version = 7; string policy_version = 8; + repeated string participants = 9; + repeated ParticipantActivity participant_activity = 10; } message GetSessionRequest { diff --git a/scripts/run-e2e.sh b/scripts/run-e2e.sh new file mode 100755 index 0000000..4369d87 --- /dev/null +++ b/scripts/run-e2e.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash +# Run MACP Tier 3 E2E tests with real LLM agents. +# +# Usage: +# ./scripts/run-e2e.sh # Run all E2E tests +# ./scripts/run-e2e.sh decision # Run only the decision test +# ./scripts/run-e2e.sh task # Run only the task test +# ./scripts/run-e2e.sh signals # Run only the decision+signals test +# +# Prerequisites: +# 1. Set OPENAI_API_KEY in .env or export it in your shell +# 2. Rust toolchain + protoc installed + +set -euo pipefail +cd "$(dirname "$0")/.." + +# Load .env if present (without overriding existing env vars) +if [ -f .env ]; then + set -a + # shellcheck disable=SC1091 + source .env + set +a + echo "Loaded .env" +fi + +# Check for API key +if [ -z "${OPENAI_API_KEY:-}" ]; then + echo "ERROR: OPENAI_API_KEY is not set." + echo "" + echo " Option 1: Add OPENAI_API_KEY=sk-... to .env" + echo " Option 2: export OPENAI_API_KEY='sk-...'" + exit 1 +fi + +echo "OPENAI_API_KEY is set (${#OPENAI_API_KEY} chars)" +echo "" + +# Build the runtime binary +echo "Building runtime..." +cargo build +echo "" + +# Select test filter +FILTER="" +case "${1:-all}" in + decision) FILTER="real_llm_agents_coordinate_decision" ;; + task) FILTER="real_llm_agents_delegate_task" ;; + signals) FILTER="decision_with_signals_full_flow" ;; + all) FILTER="" ;; + *) + echo "Unknown test: $1" + echo "Usage: $0 [decision|task|signals|all]" + exit 1 + ;; +esac + +echo "Running Tier 3 E2E tests..." +echo "═══════════════════════════════════════════════════════" +echo "" + +cd integration_tests +export MACP_TEST_BINARY=../target/debug/macp-runtime +if [ -n "$FILTER" ]; then + cargo test --test tier3 "$FILTER" -- --ignored --test-threads=1 --show-output +else + cargo test --test tier3 -- --ignored --test-threads=1 --show-output +fi diff --git a/src/main.rs b/src/main.rs index 0366c83..86bb47f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,12 +18,44 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; #[tokio::main] async fn main() -> Result<(), Box> { - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), - ) - .init(); + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + + #[cfg(feature = "otel")] + { + use opentelemetry::trace::TracerProvider; + use opentelemetry_otlp::WithExportConfig; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") + .unwrap_or_else(|_| "http://localhost:4317".into()); + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(&otlp_endpoint); + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("failed to init OTEL tracer"); + let tracer = provider.tracer("macp-runtime"); + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer()) + .with(otel_layer) + .init(); + tracing::info!( + "OpenTelemetry tracing enabled (endpoint: {})", + otlp_endpoint + ); + } + + #[cfg(not(feature = "otel"))] + { + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + } let addr = std::env::var("MACP_BIND_ADDR") .unwrap_or_else(|_| "127.0.0.1:50051".into()) @@ -154,7 +186,7 @@ async fn main() -> Result<(), Box> { mode_registry, )); let security = SecurityLayer::from_env()?; - let svc = MacpServer::new(runtime, security); + let svc = MacpServer::new(Arc::clone(&runtime), security); let allow_insecure = std::env::var("MACP_ALLOW_INSECURE").ok().as_deref() == Some("1"); let tls_cert = std::env::var("MACP_TLS_CERT_PATH").ok(); diff --git a/src/mode/decision.rs b/src/mode/decision.rs index 74e7667..dc74715 100644 --- a/src/mode/decision.rs +++ b/src/mode/decision.rs @@ -287,6 +287,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "agent://orchestrator".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/handoff.rs b/src/mode/handoff.rs index 9c100c3..2cf3ba4 100644 --- a/src/mode/handoff.rs +++ b/src/mode/handoff.rs @@ -242,6 +242,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "owner".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/multi_round.rs b/src/mode/multi_round.rs index 43a4cef..f46e2aa 100644 --- a/src/mode/multi_round.rs +++ b/src/mode/multi_round.rs @@ -200,6 +200,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "coordinator".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/passthrough.rs b/src/mode/passthrough.rs index 210cc37..9943e18 100644 --- a/src/mode/passthrough.rs +++ b/src/mode/passthrough.rs @@ -86,6 +86,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/proposal.rs b/src/mode/proposal.rs index b3c933d..13976eb 100644 --- a/src/mode/proposal.rs +++ b/src/mode/proposal.rs @@ -292,6 +292,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "agent://buyer".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/quorum.rs b/src/mode/quorum.rs index 6af00ff..7b426f9 100644 --- a/src/mode/quorum.rs +++ b/src/mode/quorum.rs @@ -227,6 +227,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "coordinator".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/task.rs b/src/mode/task.rs index f4e5ab1..42c34e9 100644 --- a/src/mode/task.rs +++ b/src/mode/task.rs @@ -309,6 +309,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "planner".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/registry.rs b/src/registry.rs index 3e71911..af34fea 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -105,6 +105,8 @@ impl From for Session { }) .collect(), initiator_sender: session.initiator_sender, + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } } @@ -242,6 +244,8 @@ mod tests { name: "r1".into(), }], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/replay.rs b/src/replay.rs index 3b38bc2..db2ddbd 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -194,6 +194,8 @@ fn replay_from_start( context: start_payload.context.clone(), roots: start_payload.roots.clone(), initiator_sender: start_entry.sender.clone(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), }; // 4. Call mode.on_session_start(), apply response diff --git a/src/runtime.rs b/src/runtime.rs index be3f47f..b1509ad 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -196,7 +196,7 @@ impl Runtime { ) -> Result { match env.message_type.as_str() { "SessionStart" => self.process_session_start(env, max_open_sessions).await, - "Signal" => self.process_signal(env).await, + "Signal" | "Progress" => self.process_signal(env).await, _ => self.process_message(env).await, } } @@ -272,6 +272,8 @@ impl Runtime { context: start_payload.context.clone(), roots: start_payload.roots.clone(), initiator_sender: env.sender.clone(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), }; let response = mode.on_session_start(&session, env)?; @@ -360,6 +362,7 @@ impl Runtime { // 2. Update in-memory state self.log_store.append(&env.session_id, incoming_entry).await; session.seen_message_ids.insert(env.message_id.clone()); + session.record_participant_activity(&env.sender, chrono::Utc::now().timestamp_millis()); session.apply_mode_response(response); let result_state = session.state.clone(); diff --git a/src/server.rs b/src/server.rs index 32ebf64..ece2ff3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,10 +5,10 @@ use macp_runtime::pb::{ Envelope, GetManifestRequest, GetManifestResponse, GetSessionRequest, GetSessionResponse, InitializeRequest, InitializeResponse, ListExtModesRequest, ListExtModesResponse, ListModesRequest, ListModesResponse, ListRootsRequest, ListRootsResponse, - MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, ProgressCapability, - PromoteModeRequest, PromoteModeResponse, RegisterExtModeRequest, RegisterExtModeResponse, - RootsCapability, RuntimeInfo, SendRequest, SendResponse, SessionMetadata, - SessionState as PbSessionState, SessionsCapability, StreamSessionRequest, + MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, ParticipantActivity, + ProgressCapability, PromoteModeRequest, PromoteModeResponse, RegisterExtModeRequest, + RegisterExtModeResponse, RootsCapability, RuntimeInfo, SendRequest, SendResponse, + SessionMetadata, SessionState as PbSessionState, SessionsCapability, StreamSessionRequest, StreamSessionResponse, UnregisterExtModeRequest, UnregisterExtModeResponse, WatchModeRegistryRequest, WatchModeRegistryResponse, WatchRootsRequest, WatchRootsResponse, WatchSignalsRequest, WatchSignalsResponse, @@ -404,7 +404,7 @@ impl MacpRuntimeService for MacpServer { cancellation: Some(CancellationCapability { cancel_session: true, }), - progress: Some(ProgressCapability { progress: false }), + progress: Some(ProgressCapability { progress: true }), manifest: Some(ManifestCapability { get_manifest: true }), mode_registry: Some(ModeRegistryCapability { list_modes: true, @@ -475,6 +475,20 @@ impl MacpRuntimeService for MacpServer { .await .ok_or_else(|| Status::not_found(format!("Session '{}' not found", session_id)))?; + let participant_activity = session + .participant_message_counts + .iter() + .map(|(pid, count)| ParticipantActivity { + participant_id: pid.clone(), + last_message_at_unix_ms: session + .participant_last_seen + .get(pid) + .copied() + .unwrap_or(0), + message_count: *count, + }) + .collect(); + Ok(Response::new(GetSessionResponse { metadata: Some(SessionMetadata { session_id: session.session_id.clone(), @@ -485,6 +499,8 @@ impl MacpRuntimeService for MacpServer { mode_version: session.mode_version.clone(), configuration_version: session.configuration_version.clone(), policy_version: session.policy_version.clone(), + participants: session.participants.clone(), + participant_activity, }), })) } diff --git a/src/session.rs b/src/session.rs index 81de2b0..19788f6 100644 --- a/src/session.rs +++ b/src/session.rs @@ -2,7 +2,7 @@ use crate::error::MacpError; use crate::mode::ModeResponse; use crate::pb::SessionStartPayload; use prost::Message; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; pub const MAX_TTL_MS: i64 = 24 * 60 * 60 * 1000; @@ -32,9 +32,20 @@ pub struct Session { pub context: Vec, pub roots: Vec, pub initiator_sender: String, + pub participant_message_counts: HashMap, + pub participant_last_seen: HashMap, } impl Session { + pub fn record_participant_activity(&mut self, sender: &str, timestamp_ms: i64) { + *self + .participant_message_counts + .entry(sender.to_string()) + .or_insert(0) += 1; + self.participant_last_seen + .insert(sender.to_string(), timestamp_ms); + } + pub fn apply_mode_response(&mut self, response: ModeResponse) { match response { ModeResponse::NoOp => {} diff --git a/src/storage/file.rs b/src/storage/file.rs index ee13dcb..bd4c89b 100644 --- a/src/storage/file.rs +++ b/src/storage/file.rs @@ -186,6 +186,8 @@ mod tests { name: "r1".into(), }], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index b755b8d..1628bf8 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -67,6 +67,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/migration.rs b/src/storage/migration.rs index 52d0563..0e2b2de 100644 --- a/src/storage/migration.rs +++ b/src/storage/migration.rs @@ -176,6 +176,8 @@ mod tests { name: "r1".into(), }], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/recovery.rs b/src/storage/recovery.rs index 38953a9..b11c95e 100644 --- a/src/storage/recovery.rs +++ b/src/storage/recovery.rs @@ -73,6 +73,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/redis_backend.rs b/src/storage/redis_backend.rs index 97d32c5..9a38771 100644 --- a/src/storage/redis_backend.rs +++ b/src/storage/redis_backend.rs @@ -170,6 +170,8 @@ mod tests { context: vec![9], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 00c6326..450860d 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -258,6 +258,8 @@ mod tests { context: vec![9], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } }