diff --git a/.gitignore b/.gitignore index cc70cb0..b113f43 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,11 @@ CLAUDE.md # Integration tests build artifacts integration_tests/target/ +# Secrets / environment +.env +.env.local +.env.*.local + # OS .DS_Store Thumbs.db diff --git a/Cargo.toml b/Cargo.toml index b2102ff..322d808 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,13 @@ name = "macp-runtime" version = "0.4.0" edition = "2021" +default-run = "macp-runtime" [features] default = [] rocksdb-backend = ["dep:rocksdb"] redis-backend = ["dep:redis"] +otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "dep:tracing-opentelemetry"] [dependencies] tokio = { version = "1", features = ["full"] } @@ -25,6 +27,10 @@ async-trait = "0.1" uuid = { version = "1", features = ["v4"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +opentelemetry = { version = "0.22", optional = true } +opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"], optional = true } +opentelemetry-otlp = { version = "0.15", features = ["tonic"], optional = true } +tracing-opentelemetry = { version = "0.23", optional = true } rocksdb = { version = "0.22", optional = true } redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true } diff --git a/proto/macp/v1/core.proto b/proto/macp/v1/core.proto index b25dff5..807e386 100644 --- a/proto/macp/v1/core.proto +++ b/proto/macp/v1/core.proto @@ -120,6 +120,12 @@ message CommitmentPayload { string configuration_version = 7; } +message ParticipantActivity { + string participant_id = 1; + int64 last_message_at_unix_ms = 2; + uint32 message_count = 3; +} + message SessionMetadata { string session_id = 1; string mode = 2; @@ -129,6 +135,8 @@ message SessionMetadata { string mode_version = 6; string configuration_version = 7; string policy_version = 8; + repeated string participants = 9; + repeated ParticipantActivity participant_activity = 10; } message 
GetSessionRequest { diff --git a/scripts/run-e2e.sh b/scripts/run-e2e.sh new file mode 100755 index 0000000..4369d87 --- /dev/null +++ b/scripts/run-e2e.sh @@ -0,0 +1,67 @@ +#!/usr/bin/env bash +# Run MACP Tier 3 E2E tests with real LLM agents. +# +# Usage: +# ./scripts/run-e2e.sh # Run all E2E tests +# ./scripts/run-e2e.sh decision # Run only the decision test +# ./scripts/run-e2e.sh task # Run only the task test +# ./scripts/run-e2e.sh signals # Run only the decision+signals test +# +# Prerequisites: +# 1. Set OPENAI_API_KEY in .env or export it in your shell +# 2. Rust toolchain + protoc installed + +set -euo pipefail +cd "$(dirname "$0")/.." + +# Load .env if present (note: values assigned in .env override variables already set in the environment) +if [ -f .env ]; then + set -a + # shellcheck disable=SC1091 + source .env + set +a + echo "Loaded .env" +fi + +# Check for API key +if [ -z "${OPENAI_API_KEY:-}" ]; then + echo "ERROR: OPENAI_API_KEY is not set." + echo "" + echo " Option 1: Add OPENAI_API_KEY=sk-... to .env" + echo " Option 2: export OPENAI_API_KEY='sk-...'" + exit 1 +fi + +echo "OPENAI_API_KEY is set (${#OPENAI_API_KEY} chars)" +echo "" + +# Build the runtime binary +echo "Building runtime..." +cargo build +echo "" + +# Select test filter +FILTER="" +case "${1:-all}" in + decision) FILTER="real_llm_agents_coordinate_decision" ;; + task) FILTER="real_llm_agents_delegate_task" ;; + signals) FILTER="decision_with_signals_full_flow" ;; + all) FILTER="" ;; + *) + echo "Unknown test: $1" + echo "Usage: $0 [decision|task|signals|all]" + exit 1 + ;; +esac + +echo "Running Tier 3 E2E tests..." 
+echo "═══════════════════════════════════════════════════════" +echo "" + +cd integration_tests +export MACP_TEST_BINARY=../target/debug/macp-runtime +if [ -n "$FILTER" ]; then + cargo test --test tier3 "$FILTER" -- --ignored --test-threads=1 --show-output +else + cargo test --test tier3 -- --ignored --test-threads=1 --show-output +fi diff --git a/src/main.rs b/src/main.rs index 0366c83..86bb47f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,12 +18,44 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; #[tokio::main] async fn main() -> Result<(), Box> { - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), - ) - .init(); + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + + #[cfg(feature = "otel")] + { + use opentelemetry::trace::TracerProvider; + use opentelemetry_otlp::WithExportConfig; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") + .unwrap_or_else(|_| "http://localhost:4317".into()); + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(&otlp_endpoint); + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(exporter) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .expect("failed to init OTEL tracer"); + let tracer = provider.tracer("macp-runtime"); + let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer()) + .with(otel_layer) + .init(); + tracing::info!( + "OpenTelemetry tracing enabled (endpoint: {})", + otlp_endpoint + ); + } + + #[cfg(not(feature = "otel"))] + { + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + } let addr = 
std::env::var("MACP_BIND_ADDR") .unwrap_or_else(|_| "127.0.0.1:50051".into()) @@ -154,7 +186,7 @@ async fn main() -> Result<(), Box> { mode_registry, )); let security = SecurityLayer::from_env()?; - let svc = MacpServer::new(runtime, security); + let svc = MacpServer::new(Arc::clone(&runtime), security); let allow_insecure = std::env::var("MACP_ALLOW_INSECURE").ok().as_deref() == Some("1"); let tls_cert = std::env::var("MACP_TLS_CERT_PATH").ok(); diff --git a/src/mode/decision.rs b/src/mode/decision.rs index 74e7667..dc74715 100644 --- a/src/mode/decision.rs +++ b/src/mode/decision.rs @@ -287,6 +287,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "agent://orchestrator".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/handoff.rs b/src/mode/handoff.rs index 9c100c3..2cf3ba4 100644 --- a/src/mode/handoff.rs +++ b/src/mode/handoff.rs @@ -242,6 +242,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "owner".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/multi_round.rs b/src/mode/multi_round.rs index 43a4cef..f46e2aa 100644 --- a/src/mode/multi_round.rs +++ b/src/mode/multi_round.rs @@ -200,6 +200,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "coordinator".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/passthrough.rs b/src/mode/passthrough.rs index 210cc37..9943e18 100644 --- a/src/mode/passthrough.rs +++ b/src/mode/passthrough.rs @@ -86,6 +86,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/proposal.rs 
b/src/mode/proposal.rs index b3c933d..13976eb 100644 --- a/src/mode/proposal.rs +++ b/src/mode/proposal.rs @@ -292,6 +292,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "agent://buyer".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/quorum.rs b/src/mode/quorum.rs index 6af00ff..7b426f9 100644 --- a/src/mode/quorum.rs +++ b/src/mode/quorum.rs @@ -227,6 +227,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "coordinator".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/mode/task.rs b/src/mode/task.rs index f4e5ab1..42c34e9 100644 --- a/src/mode/task.rs +++ b/src/mode/task.rs @@ -309,6 +309,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "planner".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/registry.rs b/src/registry.rs index 3e71911..af34fea 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -105,6 +105,8 @@ impl From for Session { }) .collect(), initiator_sender: session.initiator_sender, + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } } @@ -242,6 +244,8 @@ mod tests { name: "r1".into(), }], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/replay.rs b/src/replay.rs index 3b38bc2..db2ddbd 100644 --- a/src/replay.rs +++ b/src/replay.rs @@ -194,6 +194,8 @@ fn replay_from_start( context: start_payload.context.clone(), roots: start_payload.roots.clone(), initiator_sender: start_entry.sender.clone(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: 
std::collections::HashMap::new(), }; // 4. Call mode.on_session_start(), apply response diff --git a/src/runtime.rs b/src/runtime.rs index be3f47f..b1509ad 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -196,7 +196,7 @@ impl Runtime { ) -> Result { match env.message_type.as_str() { "SessionStart" => self.process_session_start(env, max_open_sessions).await, - "Signal" => self.process_signal(env).await, + "Signal" | "Progress" => self.process_signal(env).await, _ => self.process_message(env).await, } } @@ -272,6 +272,8 @@ impl Runtime { context: start_payload.context.clone(), roots: start_payload.roots.clone(), initiator_sender: env.sender.clone(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), }; let response = mode.on_session_start(&session, env)?; @@ -360,6 +362,7 @@ impl Runtime { // 2. Update in-memory state self.log_store.append(&env.session_id, incoming_entry).await; session.seen_message_ids.insert(env.message_id.clone()); + session.record_participant_activity(&env.sender, chrono::Utc::now().timestamp_millis()); session.apply_mode_response(response); let result_state = session.state.clone(); diff --git a/src/server.rs b/src/server.rs index 32ebf64..ece2ff3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -5,10 +5,10 @@ use macp_runtime::pb::{ Envelope, GetManifestRequest, GetManifestResponse, GetSessionRequest, GetSessionResponse, InitializeRequest, InitializeResponse, ListExtModesRequest, ListExtModesResponse, ListModesRequest, ListModesResponse, ListRootsRequest, ListRootsResponse, - MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, ProgressCapability, - PromoteModeRequest, PromoteModeResponse, RegisterExtModeRequest, RegisterExtModeResponse, - RootsCapability, RuntimeInfo, SendRequest, SendResponse, SessionMetadata, - SessionState as PbSessionState, SessionsCapability, StreamSessionRequest, + MacpError as PbMacpError, ManifestCapability, 
ModeRegistryCapability, ParticipantActivity, + ProgressCapability, PromoteModeRequest, PromoteModeResponse, RegisterExtModeRequest, + RegisterExtModeResponse, RootsCapability, RuntimeInfo, SendRequest, SendResponse, + SessionMetadata, SessionState as PbSessionState, SessionsCapability, StreamSessionRequest, StreamSessionResponse, UnregisterExtModeRequest, UnregisterExtModeResponse, WatchModeRegistryRequest, WatchModeRegistryResponse, WatchRootsRequest, WatchRootsResponse, WatchSignalsRequest, WatchSignalsResponse, @@ -404,7 +404,7 @@ impl MacpRuntimeService for MacpServer { cancellation: Some(CancellationCapability { cancel_session: true, }), - progress: Some(ProgressCapability { progress: false }), + progress: Some(ProgressCapability { progress: true }), manifest: Some(ManifestCapability { get_manifest: true }), mode_registry: Some(ModeRegistryCapability { list_modes: true, @@ -475,6 +475,20 @@ impl MacpRuntimeService for MacpServer { .await .ok_or_else(|| Status::not_found(format!("Session '{}' not found", session_id)))?; + let participant_activity = session + .participant_message_counts + .iter() + .map(|(pid, count)| ParticipantActivity { + participant_id: pid.clone(), + last_message_at_unix_ms: session + .participant_last_seen + .get(pid) + .copied() + .unwrap_or(0), + message_count: *count, + }) + .collect(); + Ok(Response::new(GetSessionResponse { metadata: Some(SessionMetadata { session_id: session.session_id.clone(), @@ -485,6 +499,8 @@ impl MacpRuntimeService for MacpServer { mode_version: session.mode_version.clone(), configuration_version: session.configuration_version.clone(), policy_version: session.policy_version.clone(), + participants: session.participants.clone(), + participant_activity, }), })) } diff --git a/src/session.rs b/src/session.rs index 81de2b0..19788f6 100644 --- a/src/session.rs +++ b/src/session.rs @@ -2,7 +2,7 @@ use crate::error::MacpError; use crate::mode::ModeResponse; use crate::pb::SessionStartPayload; use prost::Message; -use 
std::collections::HashSet; +use std::collections::{HashMap, HashSet}; pub const MAX_TTL_MS: i64 = 24 * 60 * 60 * 1000; @@ -32,9 +32,20 @@ pub struct Session { pub context: Vec, pub roots: Vec, pub initiator_sender: String, + pub participant_message_counts: HashMap<String, u32>, + pub participant_last_seen: HashMap<String, i64>, } impl Session { + pub fn record_participant_activity(&mut self, sender: &str, timestamp_ms: i64) { + *self + .participant_message_counts + .entry(sender.to_string()) + .or_insert(0) += 1; + self.participant_last_seen + .insert(sender.to_string(), timestamp_ms); + } + pub fn apply_mode_response(&mut self, response: ModeResponse) { match response { ModeResponse::NoOp => {} diff --git a/src/storage/file.rs b/src/storage/file.rs index ee13dcb..bd4c89b 100644 --- a/src/storage/file.rs +++ b/src/storage/file.rs @@ -186,6 +186,8 @@ mod tests { name: "r1".into(), }], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index b755b8d..1628bf8 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -67,6 +67,8 @@ mod tests { context: vec![], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/migration.rs b/src/storage/migration.rs index 52d0563..0e2b2de 100644 --- a/src/storage/migration.rs +++ b/src/storage/migration.rs @@ -176,6 +176,8 @@ mod tests { name: "r1".into(), }], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/recovery.rs b/src/storage/recovery.rs index 38953a9..b11c95e 100644 --- a/src/storage/recovery.rs +++ b/src/storage/recovery.rs @@ -73,6 +73,8 @@ mod tests { context: vec![], roots: vec![], 
initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/redis_backend.rs b/src/storage/redis_backend.rs index 97d32c5..9a38771 100644 --- a/src/storage/redis_backend.rs +++ b/src/storage/redis_backend.rs @@ -170,6 +170,8 @@ mod tests { context: vec![9], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } } diff --git a/src/storage/rocksdb.rs b/src/storage/rocksdb.rs index 00c6326..450860d 100644 --- a/src/storage/rocksdb.rs +++ b/src/storage/rocksdb.rs @@ -258,6 +258,8 @@ mod tests { context: vec![9], roots: vec![], initiator_sender: "alice".into(), + participant_message_counts: std::collections::HashMap::new(), + participant_last_seen: std::collections::HashMap::new(), } }