Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ CLAUDE.md
# Integration tests build artifacts
integration_tests/target/

# Secrets / environment
.env
.env.local
.env.*.local

# OS
.DS_Store
Thumbs.db
Expand Down
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
name = "macp-runtime"
version = "0.4.0"
edition = "2021"
default-run = "macp-runtime"

[features]
default = []
rocksdb-backend = ["dep:rocksdb"]
redis-backend = ["dep:redis"]
otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "dep:tracing-opentelemetry"]

[dependencies]
tokio = { version = "1", features = ["full"] }
Expand All @@ -25,6 +27,10 @@ async-trait = "0.1"
uuid = { version = "1", features = ["v4"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
opentelemetry = { version = "0.22", optional = true }
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.15", features = ["tonic"], optional = true }
tracing-opentelemetry = { version = "0.23", optional = true }
rocksdb = { version = "0.22", optional = true }
redis = { version = "0.27", features = ["tokio-comp", "aio"], optional = true }

Expand Down
8 changes: 8 additions & 0 deletions proto/macp/v1/core.proto
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ message CommitmentPayload {
string configuration_version = 7;
}

// Per-participant message activity for a session, reported through
// SessionMetadata.participant_activity.
message ParticipantActivity {
// Participant this entry describes. The runtime keys its activity records by
// the sender string of the messages it processes.
string participant_id = 1;
// Time of the participant's most recently recorded message, in milliseconds
// since the Unix epoch. The server reports 0 when it has a message count for
// the participant but no recorded timestamp.
int64 last_message_at_unix_ms = 2;
// Number of messages the runtime has recorded for this participant.
uint32 message_count = 3;
}

message SessionMetadata {
string session_id = 1;
string mode = 2;
Expand All @@ -129,6 +135,8 @@ message SessionMetadata {
string mode_version = 6;
string configuration_version = 7;
string policy_version = 8;
repeated string participants = 9;
repeated ParticipantActivity participant_activity = 10;
}

message GetSessionRequest {
Expand Down
67 changes: 67 additions & 0 deletions scripts/run-e2e.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env bash
# Run MACP Tier 3 E2E tests with real LLM agents.
#
# Usage:
#   ./scripts/run-e2e.sh            # Run all E2E tests
#   ./scripts/run-e2e.sh decision   # Run only the decision test
#   ./scripts/run-e2e.sh task       # Run only the task test
#   ./scripts/run-e2e.sh signals    # Run only the decision+signals test
#
# Prerequisites:
#   1. Set OPENAI_API_KEY in .env or export it in your shell
#   2. Rust toolchain + protoc installed
#
# Exit status: 1 when OPENAI_API_KEY is missing or the test name is unknown;
# otherwise the status of the failing cargo command (set -e), or 0.

set -euo pipefail

# Work from the repository root regardless of where the script was invoked.
cd "$(dirname "$0")/.."

# Load .env if present, without overriding variables the caller already
# exported. Sourcing alone would overwrite them, so snapshot the exported
# environment first (`export -p` prints re-evaluable `declare -x` statements)
# and re-apply it afterwards. This must stay at top level: inside a function
# `declare` would create locals instead of restoring the globals.
if [ -f .env ]; then
  caller_env="$(export -p)"
  set -a
  # shellcheck disable=SC1091
  source .env
  set +a
  # `|| true`: restoring a read-only exported variable fails harmlessly and
  # must not abort the script under `set -e`.
  eval "$caller_env" || true
  unset caller_env
  echo "Loaded .env"
fi

# Check for API key
if [ -z "${OPENAI_API_KEY:-}" ]; then
  {
    echo "ERROR: OPENAI_API_KEY is not set."
    echo ""
    echo " Option 1: Add OPENAI_API_KEY=sk-... to .env"
    echo " Option 2: export OPENAI_API_KEY='sk-...'"
  } >&2
  exit 1
fi

# Report only the key length so the secret never reaches the terminal or logs.
echo "OPENAI_API_KEY is set (${#OPENAI_API_KEY} chars)"
echo ""

# Build the runtime binary
echo "Building runtime..."
cargo build
echo ""

# Select test filter (empty means "run every tier3 test")
FILTER=""
case "${1:-all}" in
  decision) FILTER="real_llm_agents_coordinate_decision" ;;
  task)     FILTER="real_llm_agents_delegate_task" ;;
  signals)  FILTER="decision_with_signals_full_flow" ;;
  all)      FILTER="" ;;
  *)
    echo "Unknown test: $1" >&2
    echo "Usage: $0 [decision|task|signals|all]" >&2
    exit 1
    ;;
esac

echo "Running Tier 3 E2E tests..."
echo "═══════════════════════════════════════════════════════"
echo ""

# The tests live in their own crate; the binary path is relative to it.
cd integration_tests
export MACP_TEST_BINARY=../target/debug/macp-runtime
if [ -n "$FILTER" ]; then
  cargo test --test tier3 "$FILTER" -- --ignored --test-threads=1 --show-output
else
  cargo test --test tier3 -- --ignored --test-threads=1 --show-output
fi
46 changes: 39 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,44 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));

#[cfg(feature = "otel")]
{
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
.unwrap_or_else(|_| "http://localhost:4317".into());
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(&otlp_endpoint);
let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(exporter)
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("failed to init OTEL tracer");
let tracer = provider.tracer("macp-runtime");
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);

tracing_subscriber::registry()
.with(env_filter)
.with(tracing_subscriber::fmt::layer())
.with(otel_layer)
.init();
tracing::info!(
"OpenTelemetry tracing enabled (endpoint: {})",
otlp_endpoint
);
}

#[cfg(not(feature = "otel"))]
{
tracing_subscriber::fmt().with_env_filter(env_filter).init();
}

let addr = std::env::var("MACP_BIND_ADDR")
.unwrap_or_else(|_| "127.0.0.1:50051".into())
Expand Down Expand Up @@ -154,7 +186,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
mode_registry,
));
let security = SecurityLayer::from_env()?;
let svc = MacpServer::new(runtime, security);
let svc = MacpServer::new(Arc::clone(&runtime), security);

let allow_insecure = std::env::var("MACP_ALLOW_INSECURE").ok().as_deref() == Some("1");
let tls_cert = std::env::var("MACP_TLS_CERT_PATH").ok();
Expand Down
2 changes: 2 additions & 0 deletions src/mode/decision.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ mod tests {
context: vec![],
roots: vec![],
initiator_sender: "agent://orchestrator".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/mode/handoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ mod tests {
context: vec![],
roots: vec![],
initiator_sender: "owner".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/mode/multi_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ mod tests {
context: vec![],
roots: vec![],
initiator_sender: "coordinator".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/mode/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ mod tests {
context: vec![],
roots: vec![],
initiator_sender: "alice".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/mode/proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ mod tests {
context: vec![],
roots: vec![],
initiator_sender: "agent://buyer".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/mode/quorum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ mod tests {
context: vec![],
roots: vec![],
initiator_sender: "coordinator".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/mode/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ mod tests {
context: vec![],
roots: vec![],
initiator_sender: "planner".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ impl From<PersistedSession> for Session {
})
.collect(),
initiator_sender: session.initiator_sender,
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}
}
Expand Down Expand Up @@ -242,6 +244,8 @@ mod tests {
name: "r1".into(),
}],
initiator_sender: "alice".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ fn replay_from_start(
context: start_payload.context.clone(),
roots: start_payload.roots.clone(),
initiator_sender: start_entry.sender.clone(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
};

// 4. Call mode.on_session_start(), apply response
Expand Down
5 changes: 4 additions & 1 deletion src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Runtime {
) -> Result<ProcessResult, MacpError> {
match env.message_type.as_str() {
"SessionStart" => self.process_session_start(env, max_open_sessions).await,
"Signal" => self.process_signal(env).await,
"Signal" | "Progress" => self.process_signal(env).await,
_ => self.process_message(env).await,
}
}
Expand Down Expand Up @@ -272,6 +272,8 @@ impl Runtime {
context: start_payload.context.clone(),
roots: start_payload.roots.clone(),
initiator_sender: env.sender.clone(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
};

let response = mode.on_session_start(&session, env)?;
Expand Down Expand Up @@ -360,6 +362,7 @@ impl Runtime {
// 2. Update in-memory state
self.log_store.append(&env.session_id, incoming_entry).await;
session.seen_message_ids.insert(env.message_id.clone());
session.record_participant_activity(&env.sender, chrono::Utc::now().timestamp_millis());
session.apply_mode_response(response);
let result_state = session.state.clone();

Expand Down
26 changes: 21 additions & 5 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use macp_runtime::pb::{
Envelope, GetManifestRequest, GetManifestResponse, GetSessionRequest, GetSessionResponse,
InitializeRequest, InitializeResponse, ListExtModesRequest, ListExtModesResponse,
ListModesRequest, ListModesResponse, ListRootsRequest, ListRootsResponse,
MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, ProgressCapability,
PromoteModeRequest, PromoteModeResponse, RegisterExtModeRequest, RegisterExtModeResponse,
RootsCapability, RuntimeInfo, SendRequest, SendResponse, SessionMetadata,
SessionState as PbSessionState, SessionsCapability, StreamSessionRequest,
MacpError as PbMacpError, ManifestCapability, ModeRegistryCapability, ParticipantActivity,
ProgressCapability, PromoteModeRequest, PromoteModeResponse, RegisterExtModeRequest,
RegisterExtModeResponse, RootsCapability, RuntimeInfo, SendRequest, SendResponse,
SessionMetadata, SessionState as PbSessionState, SessionsCapability, StreamSessionRequest,
StreamSessionResponse, UnregisterExtModeRequest, UnregisterExtModeResponse,
WatchModeRegistryRequest, WatchModeRegistryResponse, WatchRootsRequest, WatchRootsResponse,
WatchSignalsRequest, WatchSignalsResponse,
Expand Down Expand Up @@ -404,7 +404,7 @@ impl MacpRuntimeService for MacpServer {
cancellation: Some(CancellationCapability {
cancel_session: true,
}),
progress: Some(ProgressCapability { progress: false }),
progress: Some(ProgressCapability { progress: true }),
manifest: Some(ManifestCapability { get_manifest: true }),
mode_registry: Some(ModeRegistryCapability {
list_modes: true,
Expand Down Expand Up @@ -475,6 +475,20 @@ impl MacpRuntimeService for MacpServer {
.await
.ok_or_else(|| Status::not_found(format!("Session '{}' not found", session_id)))?;

let participant_activity = session
.participant_message_counts
.iter()
.map(|(pid, count)| ParticipantActivity {
participant_id: pid.clone(),
last_message_at_unix_ms: session
.participant_last_seen
.get(pid)
.copied()
.unwrap_or(0),
message_count: *count,
})
.collect();

Ok(Response::new(GetSessionResponse {
metadata: Some(SessionMetadata {
session_id: session.session_id.clone(),
Expand All @@ -485,6 +499,8 @@ impl MacpRuntimeService for MacpServer {
mode_version: session.mode_version.clone(),
configuration_version: session.configuration_version.clone(),
policy_version: session.policy_version.clone(),
participants: session.participants.clone(),
participant_activity,
}),
}))
}
Expand Down
13 changes: 12 additions & 1 deletion src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::error::MacpError;
use crate::mode::ModeResponse;
use crate::pb::SessionStartPayload;
use prost::Message;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

pub const MAX_TTL_MS: i64 = 24 * 60 * 60 * 1000;

Expand Down Expand Up @@ -32,9 +32,20 @@ pub struct Session {
pub context: Vec<u8>,
pub roots: Vec<crate::pb::Root>,
pub initiator_sender: String,
pub participant_message_counts: HashMap<String, u32>,
pub participant_last_seen: HashMap<String, i64>,
}

impl Session {
/// Records one message from `sender`, observed at `timestamp_ms`.
///
/// Increments the sender's entry in `participant_message_counts` (starting
/// from zero if the sender has no entry yet) and overwrites the sender's
/// entry in `participant_last_seen` with `timestamp_ms`. The timestamp is
/// stored as given; no ordering or unit check is performed here.
pub fn record_participant_activity(&mut self, sender: &str, timestamp_ms: i64) {
    let count = self
        .participant_message_counts
        .entry(sender.to_owned())
        .or_insert(0);
    // Saturate instead of `+= 1`: at u32::MAX a plain increment panics in
    // debug builds and silently wraps to 0 in release builds.
    *count = count.saturating_add(1);

    self.participant_last_seen
        .insert(sender.to_owned(), timestamp_ms);
}

pub fn apply_mode_response(&mut self, response: ModeResponse) {
match response {
ModeResponse::NoOp => {}
Expand Down
2 changes: 2 additions & 0 deletions src/storage/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ mod tests {
name: "r1".into(),
}],
initiator_sender: "alice".into(),
participant_message_counts: std::collections::HashMap::new(),
participant_last_seen: std::collections::HashMap::new(),
}
}

Expand Down
Loading
Loading