From e16bdaecd9e41d5dc1b0f5aa14650f35b00b0e90 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Thu, 5 Mar 2026 12:33:11 -0800 Subject: [PATCH 1/5] The basic runtime for MACP --- .gitignore | 26 ++ Cargo.toml | 18 ++ README.md | 17 ++ build.rs | 6 + docs/README.md | 119 ++++++++ docs/architecture.md | 399 +++++++++++++++++++++++++++ docs/examples.md | 497 +++++++++++++++++++++++++++++++++ docs/protocol.md | 505 ++++++++++++++++++++++++++++++++++ proto/macp.proto | 38 +++ src/bin/client.rs | 82 ++++++ src/bin/fuzz_client.rs | 348 +++++++++++++++++++++++ src/bin/multi_round_client.rs | 149 ++++++++++ src/error.rs | 25 ++ src/lib.rs | 10 + src/log_store.rs | 114 ++++++++ src/main.rs | 28 ++ src/mode/decision.rs | 96 +++++++ src/mode/mod.rs | 33 +++ src/mode/multi_round.rs | 474 +++++++++++++++++++++++++++++++ src/registry.rs | 33 +++ src/runtime.rs | 395 ++++++++++++++++++++++++++ src/server.rs | 231 ++++++++++++++++ src/session.rs | 130 +++++++++ 23 files changed, 3773 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 build.rs create mode 100644 docs/README.md create mode 100644 docs/architecture.md create mode 100644 docs/examples.md create mode 100644 docs/protocol.md create mode 100644 proto/macp.proto create mode 100644 src/bin/client.rs create mode 100644 src/bin/fuzz_client.rs create mode 100644 src/bin/multi_round_client.rs create mode 100644 src/error.rs create mode 100644 src/lib.rs create mode 100644 src/log_store.rs create mode 100644 src/main.rs create mode 100644 src/mode/decision.rs create mode 100644 src/mode/mod.rs create mode 100644 src/mode/multi_round.rs create mode 100644 src/registry.rs create mode 100644 src/runtime.rs create mode 100644 src/server.rs create mode 100644 src/session.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..74ef82e --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# Rust +/target/ +**/*.rs.bk +*.pdb +Cargo.lock + +# Build artifacts +/out/ + +# IDE / Editor +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Claude Code +.claude/ +CLAUDE.md +/plans/ +/tmp/ +/temp/ + +# OS +.DS_Store +Thumbs.db diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bd7a008 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "macp-runtime" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1", features = ["full"] } +tonic = "0.11" +prost = "0.12" +prost-types = "0.12" +uuid = { version = "1", features = ["v4"] } +thiserror = "1" +chrono = "0.4" +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +[build-dependencies] +tonic-build = "0.11" diff --git a/README.md b/README.md new file mode 100644 index 0000000..a0f0159 --- /dev/null +++ b/README.md @@ -0,0 +1,17 @@ +# macp-runtime v0.1 + +Minimal Coordination Runtime (MCR) + +## Run + +Install protoc first. + +Then: + + cargo build + cargo run + +Server runs on 127.0.0.1:50051 + +Send SessionStart, then Message. +If payload == "resolve", session transitions to RESOLVED. diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..73024d6 --- /dev/null +++ b/build.rs @@ -0,0 +1,6 @@ +fn main() -> Result<(), Box> { + tonic_build::configure() + .build_server(true) + .compile(&["proto/macp.proto"], &["proto"])?; + Ok(()) +} diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 0000000..e645943 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,119 @@ +# MACP Runtime Documentation + +Welcome to the Multi-Agent Coordination Protocol (MACP) Runtime documentation. This guide explains everything about the system in plain language, even if you don't know Rust. + +## What Is This Project? + +The MACP Runtime (also called Minimal Coordination Runtime or MCR) is a **server** that helps multiple AI agents or programs coordinate with each other. Think of it as a traffic controller for conversations between different agents. + +### Real-World Analogy + +Imagine you're organizing a meeting: +1. Someone starts the meeting (SessionStart) +2. People send messages back and forth +3. Eventually, the meeting reaches a decision (Resolved state) +4. Once resolved, no more messages can be sent + +The MACP Runtime manages this entire lifecycle automatically and enforces the rules. + +## What Problem Does It Solve? + +When multiple AI agents or programs need to work together, they need a way to: +- **Start a conversation** (create a session) +- **Exchange messages** safely +- **Track the state** of the conversation +- **Know when it's done** (resolved) +- **Prevent messages after it's done** (enforce invariants) + +Without a coordination runtime, each agent would need to implement all this logic themselves, leading to bugs and inconsistencies. + +## Key Concepts + +### Sessions +A **session** is like a conversation thread. Each session has: +- A unique ID +- A current state (Open, Resolved, or Expired) +- A time-to-live (TTL) - how long before it expires +- Optional resolution data (the final outcome) + +### Messages +**Messages** are sent within a session. Each message includes: +- Which session it belongs to +- A unique message ID +- Who sent it +- When it was sent +- The actual content (payload) + +### States +Sessions go through different **states**: +1. **Open** - Active, accepting messages +2. **Resolved** - Decision made, no more messages allowed +3. **Expired** - TTL expired (planned feature) + +### The Protocol +The **MACP protocol** defines the rules for: +- How to format messages +- What fields are required +- How sessions transition between states +- What errors can occur + +## How It Works (High Level) + +``` +Client MACP Runtime + | | + |--SessionStart("s1")------------->| + |<-------Ack(accepted=true)--------| + | | + |--Message("hello")--------------->| + |<-------Ack(accepted=true)--------| + | | + |--Message("resolve")------------->| (session now RESOLVED) + |<-------Ack(accepted=true)--------| + | | + |--Message("more")---------------->| + |<---Ack(accepted=false, ----------| + | error="SessionNotOpen") | +``` + +## What's Built With + +- **gRPC**: A high-performance communication protocol (like HTTP but faster) +- **Protocol Buffers**: A way to define structured data (like JSON but more efficient) +- **Rust**: A programming language known for safety and speed + +You don't need to know Rust to understand the concepts - the documentation explains everything in plain language. + +## Components + +This runtime consists of: + +1. **Server** (`macp-runtime`) - The main runtime that manages sessions +2. **Client** (`client`) - A test client demonstrating basic usage +3. **Fuzz Client** (`fuzz_client`) - A test client that tries to break the rules + +## Documentation Structure + +- **[architecture.md](./architecture.md)** - How the system is designed internally +- **[protocol.md](./protocol.md)** - The MACP protocol specification +- **[examples.md](./examples.md)** - Step-by-step usage examples + +## Quick Start + +**Terminal 1** - Start the server: +```bash +cargo run +``` + +**Terminal 2** - Run a test client: +```bash +cargo run --bin client +``` + +You'll see the client send messages and the server respond with acknowledgments. + +## Next Steps + +1. Read [protocol.md](./protocol.md) to understand the MACP protocol +2. Read [architecture.md](./architecture.md) to understand how it's built +3. Read [examples.md](./examples.md) to see practical usage diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..281ad0c --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,399 @@ +# Architecture + +This document explains how the MACP Runtime is built internally. We'll explain each component in plain language, without assuming Rust knowledge. + +## System Overview + +``` +┌─────────────────────────────────────────────────┐ +│ Clients │ +│ (Other programs/agents wanting to coordinate) │ +└────────────┬────────────────────┬────────────────┘ + │ │ + │ gRPC calls │ gRPC calls + │ │ + ▼ ▼ +┌─────────────────────────────────────────────────┐ +│ MACP Runtime Server │ +│ ┌───────────────────────────────────────────┐ │ +│ │ MacpServer (gRPC Adapter Layer) │ │ +│ │ - Receives messages │ │ +│ │ - Validates transport-level fields │ │ +│ │ - Delegates to Runtime kernel │ │ +│ └──────────────┬────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────────────────────────────┐ │ +│ │ Runtime (Kernel) │ │ +│ │ - Resolves mode │ │ +│ │ - Enforces TTL / session invariants │ │ +│ │ - Dispatches to Mode implementations │ │ +│ │ - Applies ModeResponse │ │ +│ └──────┬───────────────┬────────────────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌─────────────┐ ┌─────────────────────┐ │ +│ │ Mode │ │ Mode │ │ +│ │ Dispatcher │ │ Implementations │ │ +│ │ │ │ - DecisionMode │ │ +│ │ │ │ - MultiRoundMode │ │ +│ └─────────────┘ └─────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────────────────────────────────┐ │ +│ │ SessionRegistry LogStore │ │ +│ │ (Session State) (Event Log) │ │ +│ │ HashMap: id->Session HashMap: id->Vec │ │ +│ └───────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────┘ +``` + +## Core Components + +### 1. Protocol Definitions (proto/macp.proto) + +This file defines the "language" that clients and server use to communicate. It's like a contract that both sides agree to follow. + +**What's defined:** +- **Envelope**: The wrapper for every message + - Contains metadata (who, when, which session, which mode) + - Contains the actual payload (the message content) +- **Ack**: The response the server sends back + - `accepted`: true if the message was accepted, false if rejected + - `error`: explanation if rejected +- **SessionQuery / SessionInfo**: For querying session state +- **MACPService**: The service interface + - `SendMessage`: Send an Envelope, get an Ack + - `GetSession`: Query session state by ID + +**Why Protocol Buffers?** +Instead of JSON (text-based), Protocol Buffers use a binary format that's: +- Faster to send/receive +- Smaller in size +- Type-safe (can't accidentally send wrong data types) + +### 2. Generated Code (build.rs + target/debug/build/) + +The `build.rs` script runs before compilation and automatically generates Rust code from the `.proto` file. This generated code handles all the low-level serialization/deserialization. + +**What you need to know:** +- You never edit the generated code +- Changes to `.proto` automatically update the generated code +- The generated code appears as the `pb` module (protocol buffers) + +### 3. Main Server (src/main.rs) + +This is the entry point — where the program starts. + +**What it does:** +1. Creates a `SessionRegistry` (session state storage) +2. Creates a `LogStore` (event log storage) +3. Creates a `Runtime` (coordination kernel with registered modes) +4. Creates a `MacpServer` (gRPC adapter wrapping the runtime) +5. Starts a gRPC server on `127.0.0.1:50051` +6. Waits for incoming connections + +### 4. Error Types (src/error.rs) + +The system defines specific errors for each problem: + +```rust +pub enum MacpError { + InvalidMacpVersion, // Version != "v1" + InvalidEnvelope, // Missing required fields or invalid TTL payload + DuplicateSession, // SessionStart for existing session + UnknownSession, // Message for non-existent session + SessionNotOpen, // Message sent to resolved/expired session + TtlExpired, // Session TTL has elapsed + InvalidTtl, // TTL value out of range (<=0 or >24h) + UnknownMode, // Mode not registered in runtime + InvalidModeState, // Mode state bytes can't be deserialized + InvalidPayload, // Payload doesn't match mode's expected format +} +``` + +**Error conversion:** +Errors are never "panics" (crashes). They're converted to Ack responses: +```rust +Ack { + accepted: false, + error: "SessionNotOpen" +} +``` + +### 5. Session Types (src/session.rs) + +Each **Session** stores information about one coordination: + +```rust +pub struct Session { + pub session_id: String, // e.g., "s1" + pub state: SessionState, // Open, Resolved, or Expired + pub ttl_expiry: i64, // Unix timestamp (milliseconds) + pub resolution: Option>, // Final result (if resolved) + pub mode: String, // Mode name (e.g., "decision", "multi_round") + pub mode_state: Vec, // Mode-specific state (opaque bytes) + pub participants: Vec, // Participant list (for multi_round) +} +``` + +**Session States:** +- **Open**: Session is active, can receive messages +- **Resolved**: Decision made (via mode), no more messages allowed +- **Expired**: TTL has elapsed, enforced on next message receipt + +### 6. Session Registry (src/registry.rs) + +The **SessionRegistry** is like a database in memory. It stores all active sessions. + +**Data structure:** +``` +SessionRegistry { + sessions: HashMap +} +``` + +**Thread-safety:** +Multiple clients can connect at the same time. The `RwLock` ensures: +- Multiple readers can read simultaneously (efficient) +- Only one writer at a time (prevents corruption) +- Readers wait while someone is writing + +### 7. Log Store (src/log_store.rs) + +The **LogStore** maintains an append-only event log per session: + +```rust +pub struct LogEntry { + pub message_id: String, + pub received_at_ms: i64, + pub sender: String, + pub message_type: String, + pub raw_payload: Vec, + pub entry_kind: EntryKind, // Incoming or Internal +} +``` + +- **Incoming** entries: messages received from clients +- **Internal** entries: runtime-generated events (e.g., TtlExpired) +- Entries are always appended before state mutation (log-before-mutate ordering) + +### 8. Mode System (src/mode/) + +The **Mode** trait defines the interface for coordination logic: + +```rust +pub trait Mode: Send + Sync { + fn on_session_start(&self, session: &Session, env: &Envelope) + -> Result; + fn on_message(&self, session: &Session, env: &Envelope) + -> Result; +} +``` + +Modes receive **immutable** session state and return a `ModeResponse`: +- `NoOp` — no state change +- `PersistState(bytes)` — update mode-specific state +- `Resolve(bytes)` — resolve the session +- `PersistAndResolve{state, resolution}` — both at once + +**DecisionMode** (`src/mode/decision.rs`): +- `on_session_start()` → `NoOp` +- `on_message()` → if `payload == b"resolve"` then `Resolve` else `NoOp` + +**MultiRoundMode** (`src/mode/multi_round.rs`): +- Tracks participants, contributions per round, and convergence +- `on_session_start()` → parses config, returns `PersistState` with initial state +- `on_message()` → updates contributions, checks convergence, returns `PersistState` or `PersistAndResolve` + +### 9. Runtime Kernel (src/runtime.rs) + +The **Runtime** is the coordination kernel that orchestrates everything: + +```rust +pub struct Runtime { + pub registry: Arc, + pub log_store: Arc, + modes: HashMap>, +} +``` + +**Processing flow:** +``` +1. Receive Envelope (from MacpServer) +2. For SessionStart: + a. Resolve mode name (empty → "decision") + b. Look up mode implementation → error if unknown + c. Parse TTL from payload + d. Acquire write lock, check for duplicate session + e. Create session log, append Incoming entry + f. Call mode.on_session_start() + g. Insert session, apply ModeResponse +3. For other messages: + a. Acquire write lock, find session + b. TTL check → if expired, log Internal entry, set Expired + c. State check → if not Open, reject + d. Append Incoming log entry + e. Call mode.on_message() + f. Apply ModeResponse +4. Return Ok/Err to MacpServer +``` + +**`apply_mode_response()`** is the single mutation point: +- `NoOp` → nothing +- `PersistState(s)` → `session.mode_state = s` +- `Resolve(r)` → `session.state = Resolved, session.resolution = Some(r)` +- `PersistAndResolve{s,r}` → both + +### 10. MacpServer (src/server.rs) + +The **MacpServer** is now a thin gRPC adapter: + +**Responsibilities:** +1. Validate transport-level fields (version, required fields) +2. Delegate to `Runtime::process()` +3. Convert results to `Ack` responses +4. Handle `GetSession` queries + +All coordination logic lives in the Runtime and Mode implementations. + +## Data Flow Example + +Let's trace what happens when a client sends a multi-round convergence message: + +### Step 1: Client sends SessionStart +``` +Client → gRPC → MacpServer::send_message() + → validate() + → Runtime::process() + → resolve mode = "multi_round" + → parse TTL + → create session log + → MultiRoundMode::on_session_start() + → PersistState(initial_state) + → insert session with mode_state + ← Ack(accepted=true) +``` + +### Step 2: Client sends Contribute +``` +Client → gRPC → MacpServer::send_message() + → validate() + → Runtime::process() + → find session, check TTL, check Open + → append Incoming log entry + → MultiRoundMode::on_message() + → update contributions, check convergence + → PersistState(updated_state) or PersistAndResolve + → apply response + ← Ack(accepted=true) +``` + +### Step 3: Client queries state +``` +Client → gRPC → MacpServer::get_session() + → registry.get_session(id) + ← SessionInfo(state, mode, resolution, ...) +``` + +## Concurrency Model + +**Question:** What happens if two clients send messages at the same time? + +**Answer:** They're handled safely: +1. Both enter `send_message()` simultaneously +2. Both validate independently (no conflicts here) +3. First one acquires the write lock +4. First one processes through the mode, modifies the registry +5. First one releases the lock +6. Second one acquires the write lock +7. Second one processes through the mode, modifies the registry +8. Second one releases the lock + +The `RwLock` ensures they don't interfere with each other. + +## File Structure + +``` +runtime/ +├── proto/ +│ └── macp.proto # Protocol definition (Envelope, Ack, SessionQuery, SessionInfo) +├── src/ +│ ├── main.rs # Entry point, wires up Runtime + gRPC server +│ ├── lib.rs # Shared library (pb module + public module exports) +│ ├── server.rs # Thin gRPC adapter delegating to Runtime +│ ├── error.rs # MacpError enum (all error variants) +│ ├── session.rs # Session, SessionState, TTL parsing +│ ├── registry.rs # SessionRegistry (thread-safe session store) +│ ├── log_store.rs # Append-only LogStore for session event logs +│ ├── runtime.rs # Runtime kernel (dispatch + apply ModeResponse) +│ ├── mode/ +│ │ ├── mod.rs # Mode trait + ModeResponse enum +│ │ ├── decision.rs # DecisionMode (payload=="resolve" → Resolve) +│ │ └── multi_round.rs # MultiRoundMode (convergence-based resolution) +│ └── bin/ +│ ├── client.rs # Test client (happy path) +│ ├── fuzz_client.rs # Test client (error paths + multi-round) +│ └── multi_round_client.rs # Multi-round convergence demo +├── build.rs # Generates code from .proto +├── Cargo.toml # Dependencies and project config +└── target/ # Build output (binaries, generated code) +``` + +## Build Process + +1. `build.rs` runs first + - Reads `proto/macp.proto` + - Generates Rust code in `target/debug/build/*/out/macp.v1.rs` + +2. Rust compiler compiles: + - `src/lib.rs` (library with all modules) + - `src/main.rs` (server binary) + - `src/bin/client.rs` (client binary) + - `src/bin/fuzz_client.rs` (fuzz binary) + - `src/bin/multi_round_client.rs` (multi-round demo binary) + +3. Output binaries: + - `target/debug/macp-runtime` (server) + - `target/debug/client` + - `target/debug/fuzz_client` + - `target/debug/multi_round_client` + +## Design Principles + +### 1. Separation of Concerns +- **Protocol definition** (`.proto`) separate from implementation +- **State management** (`SessionRegistry`) separate from coordination logic +- **Mode logic** separate from runtime kernel +- **Validation** happens before state mutation +- **Logging** happens before mode dispatch + +### 2. Pluggable Coordination +- Runtime provides "physics" (invariants, TTL, logging, routing) +- Modes provide "coordination logic" (when to resolve, what state to track) +- New modes can be added without modifying the runtime kernel + +### 3. Fail-Safe +- Invalid messages are rejected, not ignored +- No partial state updates (atomic operations via single mutation point) +- Errors are explicit, not silent + +### 4. Minimal Coordination +- Server doesn't interpret payloads (except through Mode implementations) +- Sessions are independent (no cross-session coordination) +- Modes receive immutable state and return responses + +### 5. Structural Invariants +The system enforces structural rules: +- Can't start a session twice +- Can't send to non-existent session +- Can't send to resolved/expired session +- Must use correct version +- Must reference registered mode + +These are **protocol-level** invariants, not domain-specific business rules. + +## Next Steps + +- Read [protocol.md](./protocol.md) for the full protocol specification +- Read [examples.md](./examples.md) for practical usage examples diff --git a/docs/examples.md b/docs/examples.md new file mode 100644 index 0000000..872db9d --- /dev/null +++ b/docs/examples.md @@ -0,0 +1,497 @@ +# Examples and Usage + +This document provides step-by-step examples of using the MACP Runtime. Even if you don't know Rust, you can follow along and understand what's happening. + +## Quick Start + +### Running the Server + +**Terminal 1:** +```bash +cd /path/to/runtime +cargo run +``` + +You should see: +``` +macp-runtime v0.1 listening on 127.0.0.1:50051 +``` + +The server is now ready to accept connections. + +### Running the Test Client + +**Terminal 2:** +```bash +cargo run --bin client +``` + +You should see output like: +``` +SessionStart ack: accepted=true error='' +Message ack: accepted=true error='' +Resolve ack: accepted=true error='' +After-resolve ack: accepted=false error='SessionNotOpen' +``` + +**What happened?** +1. Client created a session (decision mode) +2. Client sent a normal message +3. Client sent a "resolve" message (session transitions to Resolved) +4. Client tried to send another message (rejected because session is Resolved) + +## Example 1: Basic Client Walkthrough + +Let's walk through the client code step by step (src/bin/client.rs). + +### Step 1: Connect to the Server + +```rust +let mut client = MacpServiceClient::connect("http://127.0.0.1:50051").await?; +``` + +**What this does:** +- Creates a gRPC client +- Connects to the server at `127.0.0.1:50051` +- If server isn't running, this will fail + +### Step 2: Create an Envelope + +```rust +let start = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "ajit".into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: vec![], +}; +``` + +**What each field means:** +- `macp_version`: Protocol version (must be "v1") +- `mode`: Coordination mode ("decision" for simple resolve, "multi_round" for convergence) +- `message_type`: "SessionStart" to create a session +- `message_id`: Unique ID for this message ("m1") +- `session_id`: Which session ("s1") +- `sender`: Who's sending it ("ajit") +- `timestamp_unix_ms`: When sent (Unix timestamp in milliseconds) +- `payload`: Message content (empty for basic SessionStart) + +### Step 3: Send and Receive Ack + +```rust +let ack = client.send_message(start).await?.into_inner(); +println!("SessionStart ack: accepted={} error='{}'", ack.accepted, ack.error); +``` + +### Step 4: Resolve the Session + +```rust +let resolve = Envelope { + // ... + payload: b"resolve".to_vec(), // Magic payload for decision mode +}; +``` + +In decision mode, payload `"resolve"` triggers session resolution. + +## Example 2: Multi-Round Convergence + +The multi-round mode enables participant-based convergence. Run the demo: + +**Terminal 2:** +```bash +cargo run --bin multi_round_client +``` + +**Expected output:** +``` +=== Multi-Round Convergence Demo === + +[session_start] accepted=true error='' +[alice_contributes_a] accepted=true error='' +[bob_contributes_b] accepted=true error='' +[get_session] state=Open mode=multi_round participants=["alice", "bob"] +[bob_revises_to_a] accepted=true error='' +[get_session] state=Resolved resolution={"converged_value":"option_a","round":3,"final":{"alice":"option_a","bob":"option_a"}} +[after_convergence] accepted=false error='SessionNotOpen' + +=== Demo Complete === +``` + +### Step-by-Step Walkthrough + +#### 1. Create a Multi-Round Session + +```rust +let payload = serde_json::json!({ + "participants": ["alice", "bob"], + "convergence": {"type": "all_equal"}, + "ttl_ms": 60000 +}); + +let start = Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "SessionStart".into(), + message_id: "m0".into(), + session_id: "mr1".into(), + sender: "coordinator".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: payload.to_string().into_bytes(), +}; +``` + +**SessionStart payload for multi_round mode:** +- `participants`: List of participant IDs who will contribute +- `convergence.type`: "all_equal" — resolve when all participants submit the same value +- `ttl_ms`: Optional TTL override + +#### 2. Submit Contributions + +```rust +let contribute = Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m1".into(), + session_id: "mr1".into(), + sender: "alice".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: br#"{"value":"option_a"}"#.to_vec(), +}; +``` + +Each participant sends a `Contribute` message with `{"value": "..."}` payload. + +#### 3. Convergence + +When all participants have contributed and all values are identical, the session auto-resolves with: +```json +{ + "converged_value": "option_a", + "round": 3, + "final": { + "alice": "option_a", + "bob": "option_a" + } +} +``` + +#### 4. Revisions + +Participants can revise their contributions. Each new value increments the round counter. Re-submitting the same value does not increment the round. + +## Example 3: Fuzz Client (Testing Error Paths) + +The fuzz client tests all the ways things can go wrong, including multi-round scenarios: + +**Terminal 2:** +```bash +cargo run --bin fuzz_client +``` + +**Expected output:** +``` +[wrong_version] accepted=false error='InvalidMacpVersion' +[missing_fields] accepted=false error='InvalidEnvelope' +[unknown_session_message] accepted=false error='UnknownSession' +[session_start_ok] accepted=true error='' +[session_start_duplicate] accepted=false error='DuplicateSession' +[message_ok] accepted=true error='' +[resolve] accepted=true error='' +[after_resolve] accepted=false error='SessionNotOpen' +[ttl_session_start] accepted=true error='' +[ttl_expired_message] accepted=false error='TtlExpired' +[invalid_ttl_zero] accepted=false error='InvalidTtl' +[invalid_ttl_negative] accepted=false error='InvalidTtl' +[invalid_ttl_exceeds_max] accepted=false error='InvalidTtl' +[multi_round_start] accepted=true error='' +[multi_round_alice] accepted=true error='' +[multi_round_bob_diff] accepted=true error='' +[multi_round_bob_converge] accepted=true error='' +[multi_round_after_resolve] accepted=false error='SessionNotOpen' +``` + +The last 5 lines show the multi-round scenario: +1. Create multi-round session with alice and bob +2. Alice contributes "option_a" +3. Bob contributes "option_b" (values differ, no convergence) +4. Bob revises to "option_a" (all equal → convergence → auto-resolved) +5. Alice tries to contribute after resolution → `SessionNotOpen` + +## Example 4: Using GetSession + +Query session state at any time: + +```rust +use macp_runtime::pb::SessionQuery; + +let info = client.get_session(SessionQuery { + session_id: "s1".into(), +}).await?.into_inner(); + +println!("Session {} is in state {}", info.session_id, info.state); +println!("Mode: {}", info.mode); +println!("Participants: {:?}", info.participants); + +if !info.resolution.is_empty() { + println!("Resolution: {}", String::from_utf8_lossy(&info.resolution)); +} +``` + +**Response fields:** +- `session_id`, `mode`, `state` ("Open"/"Resolved"/"Expired") +- `ttl_expiry` (Unix ms timestamp) +- `resolution` (bytes, empty if not resolved) +- `mode_state` (bytes, mode-specific internal state) +- `participants` (list of participant IDs) + +If the session doesn't exist, the RPC returns a gRPC `NOT_FOUND` status. + +## Example 5: Common Patterns + +### Pattern 1: Error Handling + +Always check the Ack: + +```rust +let ack = client.send_message(envelope).await?.into_inner(); + +if ack.accepted { + println!("Success!"); +} else { + match ack.error.as_str() { + "InvalidMacpVersion" => println!("Use version v1"), + "InvalidEnvelope" => println!("Check required fields"), + "DuplicateSession" => println!("Session already exists"), + "UnknownSession" => println!("Session doesn't exist"), + "SessionNotOpen" => println!("Session is resolved/expired"), + "TtlExpired" => println!("Session TTL has elapsed, create a new session"), + "InvalidTtl" => println!("TTL must be 1..=86400000 ms"), + "UnknownMode" => println!("Use 'decision' or 'multi_round'"), + "InvalidModeState" => println!("Internal mode state error"), + "InvalidPayload" => println!("Check mode-specific payload format"), + _ => println!("Unknown error: {}", ack.error), + } +} +``` + +### Pattern 2: Unique Message IDs + +Use UUIDs for message IDs: + +```rust +use uuid::Uuid; + +let message_id = Uuid::new_v4().to_string(); + +let envelope = Envelope { + message_id: message_id, + // ... other fields +}; +``` + +### Pattern 3: Current Timestamp + +Use the current time for timestamps: + +```rust +use chrono::Utc; + +let timestamp = Utc::now().timestamp_millis(); + +let envelope = Envelope { + timestamp_unix_ms: timestamp, + // ... other fields +}; +``` + +### Pattern 4: Helper Function + +Create a helper to build envelopes: + +```rust +fn create_envelope( + mode: &str, + message_type: &str, + session_id: &str, + sender: &str, + payload: &[u8], +) -> Envelope { + Envelope { + macp_version: "v1".into(), + mode: mode.into(), + message_type: message_type.into(), + message_id: Uuid::new_v4().to_string(), + session_id: session_id.into(), + sender: sender.into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: payload.to_vec(), + } +} + +// Usage: +let start = create_envelope("decision", "SessionStart", "s1", "alice", b""); +let msg = create_envelope("decision", "Message", "s1", "alice", b"hello"); +let contribute = create_envelope("multi_round", "Contribute", "mr1", "alice", + br#"{"value":"option_a"}"#); +``` + +## Example 6: Multi-Agent Scenario + +Imagine two agents coordinating via multi-round convergence: + +### Coordinator starts the session + +```rust +let payload = serde_json::json!({ + "participants": ["alpha", "beta"], + "convergence": {"type": "all_equal"} +}); + +let start = create_envelope( + "multi_round", "SessionStart", "decision-001", "coordinator", + payload.to_string().as_bytes() +); +client.send_message(start).await?; +``` + +### Agent Alpha contributes + +```rust +let contribute = create_envelope( + "multi_round", "Contribute", "decision-001", "alpha", + br#"{"value":"option_a"}"# +); +client.send_message(contribute).await?; +``` + +### Agent Beta contributes (different value) + +```rust +let contribute = create_envelope( + "multi_round", "Contribute", "decision-001", "beta", + br#"{"value":"option_b"}"# +); +client.send_message(contribute).await?; +// Session still Open — values differ +``` + +### Agent Beta revises + +```rust +let contribute = create_envelope( + "multi_round", "Contribute", "decision-001", "beta", + br#"{"value":"option_a"}"# +); +client.send_message(contribute).await?; +// Session auto-resolved! All participants agreed on "option_a" +``` + +### Check resolution + +```rust +let info = client.get_session(SessionQuery { + session_id: "decision-001".into(), +}).await?.into_inner(); + +assert_eq!(info.state, "Resolved"); +// info.resolution contains {"converged_value":"option_a","round":3,"final":{...}} +``` + +## Example 7: Session with Custom TTL + +You can configure a session's time-to-live by providing a JSON payload in `SessionStart`: + +```rust +// Start a session with a 5-second TTL +let start = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s_ttl_demo".into(), + sender: "agent-1".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: br#"{"ttl_ms": 5000}"#.to_vec(), +}; + +let ack = client.send_message(start).await?.into_inner(); +assert!(ack.accepted); // Session created with 5s TTL + +// Wait for TTL to expire +tokio::time::sleep(Duration::from_secs(6)).await; + +// This message will be rejected with TtlExpired +let late_msg = Envelope { + // ... + payload: b"too late".to_vec(), +}; + +let ack = client.send_message(late_msg).await?.into_inner(); +assert!(!ack.accepted); +assert_eq!(ack.error, "TtlExpired"); +``` + +**TTL rules:** +- Empty payload → default 60-second TTL +- `{"ttl_ms": 5000}` → 5-second TTL +- `ttl_ms` must be between 1 and 86,400,000 (24 hours) +- Values outside this range are rejected with `InvalidTtl` + +## Common Questions + +### Q: Can I send messages from different senders to the same session? + +**A:** Yes! The `sender` field is informational. Any sender can send to any session. + +### Q: What if I use the same message_id twice? + +**A:** The server doesn't currently check for duplicate message IDs. This is a validation you'd add in your client. + +### Q: Can I create multiple sessions with the same ID? + +**A:** No. The second SessionStart will be rejected with `DuplicateSession`. + +### Q: How long do sessions last? + +**A:** Sessions have a configurable TTL (default 60 seconds, max 24 hours). Specify a custom TTL by including `{"ttl_ms": }` in the `SessionStart` payload. + +### Q: Can I "unresolve" a session? + +**A:** No. Resolved is a terminal state. You'd need to create a new session. + +### Q: What happens if the server crashes? + +**A:** All session state is lost (it's in-memory only). Clients would need to reconnect and restart sessions. + +### Q: What modes are available? + +**A:** Currently two modes: +- `decision` (default): Simple resolve-on-payload mode +- `multi_round`: Multi-round convergence with participant tracking + +### Q: What happens if I use an empty mode field? + +**A:** It defaults to `"decision"` for backward compatibility. + +### Q: Can non-participants contribute in multi_round mode? + +**A:** Currently yes — participant membership gating is planned for a future release. + +## Next Steps + +Now that you've seen examples, you can: + +1. **Modify the test clients** - Try different scenarios +2. **Build your own client** - In Python, JavaScript, Go, etc. +3. **Add business logic** - Interpret payloads for your use case +4. **Extend the protocol** - Add new modes with the Mode trait + +For deeper understanding: +- Read [architecture.md](./architecture.md) to see how it's implemented +- Read [protocol.md](./protocol.md) for the complete specification diff --git a/docs/protocol.md b/docs/protocol.md new file mode 100644 index 0000000..3f9a834 --- /dev/null +++ b/docs/protocol.md @@ -0,0 +1,505 @@ +# MACP Protocol Specification + +This document describes the Multi-Agent Coordination Protocol (MACP) in detail. We explain what each field means, what rules apply, and why they exist. + +## Protocol Version + +Current version: **v1** + +All messages must specify `macp_version: "v1"` or they will be rejected. + +## Core Concepts + +### What is a Protocol? + +A protocol is a set of rules that everyone agrees to follow. Like how English grammar has rules (subject-verb-object), the MACP protocol has rules for: +- What information must be included in each message +- What order things must happen +- What's allowed and what's forbidden + +### Why Have a Protocol? + +Without a protocol: +- Different agents might format messages differently +- State transitions could be inconsistent +- Errors would be ambiguous +- Debugging would be impossible + +With a protocol: +- Everyone speaks the same "language" +- Behavior is predictable +- Tools can be built to work with any MACP-compliant system + +## Message Types + +### Envelope + +Every message sent to the server is wrapped in an **Envelope**. Think of it as an addressed package. + +**Fields:** + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `macp_version` | string | Yes | Protocol version (must be "v1") | +| `mode` | string | Yes | Coordination mode (e.g., "decision", "multi_round") | +| `message_type` | string | Yes | Type of message ("SessionStart", "Message", "Contribute", etc.) | +| `message_id` | string | Yes | Unique ID for this message | +| `session_id` | string | Yes | Which session this belongs to | +| `sender` | string | Yes | Who is sending this message | +| `timestamp_unix_ms` | int64 | Yes | When sent (Unix timestamp in milliseconds) | +| `payload` | bytes | No | The actual message content | + +**Example (JSON representation for readability):** +```json +{ + "macp_version": "v1", + "mode": "decision", + "message_type": "SessionStart", + "message_id": "m1", + "session_id": "s1", + "sender": "agent-alpha", + "timestamp_unix_ms": 1700000000000, + "payload": "" +} +``` + +### Ack (Acknowledgment) + +Every message receives an **Ack** response. It tells you if the message was accepted or rejected. + +**Fields:** + +| Field | Type | Description | +|-------|------|-------------| +| `accepted` | bool | `true` if accepted, `false` if rejected | +| `error` | string | Empty if accepted, error name if rejected | + +**Success example:** +```json +{ + "accepted": true, + "error": "" +} +``` + +**Error example:** +```json +{ + "accepted": false, + "error": "SessionNotOpen" +} +``` + +### SessionQuery / SessionInfo + +The `GetSession` RPC allows querying session state: + +**SessionQuery:** + +| Field | Type | Description | +|-------|------|-------------| +| `session_id` | string | The session to query | + +**SessionInfo:** + +| Field | Type | Description | +|-------|------|-------------| +| `session_id` | string | Session identifier | +| `mode` | string | Coordination mode | +| `state` | string | "Open", "Resolved", or "Expired" | +| `ttl_expiry` | int64 | TTL expiry timestamp (Unix ms) | +| `resolution` | bytes | Resolution data (if resolved) | +| `mode_state` | bytes | Current mode-specific state | +| `participants` | repeated string | Session participants | + +## Mode Dispatcher + +The runtime uses a **Mode Dispatcher** to route messages to the appropriate coordination mode. The `mode` field in the Envelope determines which Mode handles the message. + +### Available Modes + +| Mode | Description | Default? | +|------|-------------|----------| +| `decision` | Simple resolve-on-payload mode | Yes (empty mode field defaults here) | +| `multi_round` | Multi-round convergence with participant tracking | No | + +### How Modes Work + +1. Runtime receives an Envelope +2. Resolves the mode name (empty string → "decision") +3. Looks up the registered Mode implementation +4. Calls `mode.on_session_start()` or `mode.on_message()` +5. Mode returns a `ModeResponse` (`NoOp`, `PersistState`, `Resolve`, `PersistAndResolve`) +6. Runtime applies the response to mutate session state + +Modes are **pure logic** — they receive immutable session state and return a response. The runtime kernel handles all mutation. + +## Message Types + +### SessionStart + +Creates a new session. This must be the first message for any session. + +**Requirements:** +- `message_type` must be `"SessionStart"` +- `session_id` must be unique (not already exist) +- `mode` must reference a registered mode (or be empty for default) +- All required Envelope fields must be present + +**What happens:** +1. Server resolves mode (empty → "decision") +2. If mode is unknown → reject with `UnknownMode` +3. Server parses TTL from payload (see TTL Configuration below) +4. If TTL is invalid → reject with `InvalidTtl` +5. Server checks if session already exists → reject with `DuplicateSession` +6. Creates session log, appends incoming entry +7. Calls `mode.on_session_start()` → may return `PersistState` with initial mode state +8. Creates new session with state: `Open`, configured TTL, and mode state + +**Example (decision mode, default TTL):** +```json +{ + "macp_version": "v1", + "mode": "decision", + "message_type": "SessionStart", + "message_id": "msg-001", + "session_id": "session-alpha", + "sender": "agent-1", + "timestamp_unix_ms": 1700000000000, + "payload": "" +} +``` + +**Example (multi_round mode):** +```json +{ + "macp_version": "v1", + "mode": "multi_round", + "message_type": "SessionStart", + "message_id": "msg-001", + "session_id": "session-beta", + "sender": "coordinator", + "timestamp_unix_ms": 1700000000000, + "payload": "{\"participants\":[\"alice\",\"bob\"],\"convergence\":{\"type\":\"all_equal\"},\"ttl_ms\":60000}" +} +``` + +### TTL Configuration + +The `SessionStart` payload can optionally contain a JSON object to configure session TTL: + +```json +{"ttl_ms": 5000} +``` + +**Rules:** + +| Payload | Behavior | +|---------|----------| +| Empty (`b""`) | Default TTL: 60 seconds | +| `{"ttl_ms": 5000}` | Custom TTL: 5 seconds | +| `{}` or `{"ttl_ms": null}` | Default TTL: 60 seconds | +| `{"ttl_ms": 0}` or negative | Rejected with `InvalidTtl` | +| `{"ttl_ms": 86400001}` (>24h) | Rejected with `InvalidTtl` | +| Invalid JSON or non-UTF-8 | Rejected with `InvalidEnvelope` | + +**Bounds:** `ttl_ms` must be in range `1..=86,400,000` (1ms to 24 hours). + +For multi_round mode, the TTL is part of the mode-specific payload alongside `participants` and `convergence`. + +### Regular Message (Decision Mode) + +Sends content within an existing decision mode session. + +**Requirements:** +- `message_type` can be anything except `"SessionStart"` +- `session_id` must reference an existing session +- Session must be in `Open` state + +**What happens:** +1. Server finds the session +2. If not found → reject with `UnknownSession` +3. If found, Open, and TTL has expired → log internal entry, transition to `Expired`, reject with `TtlExpired` +4. If found but not Open → reject with `SessionNotOpen` +5. Append incoming log entry +6. Call `mode.on_message()`: + - Decision mode: if payload is `"resolve"` → `Resolve`, else → `NoOp` +7. Apply mode response + +### Contribute Message (Multi-Round Mode) + +Submits a contribution in a multi-round convergence session. + +**Requirements:** +- `message_type` must be `"Contribute"` +- `session_id` must reference an existing multi_round session +- `sender` should be one of the registered participants +- `payload` must be JSON: `{"value": ""}` + +**What happens:** +1. Mode decodes session's `mode_state` +2. If sender's value changed from previous → increment round counter +3. Check convergence: all participants contributed + all values identical +4. If converged → `PersistAndResolve` with resolution `{"converged_value":"...","round":N,"final":{...}}` +5. If not converged → `PersistState` with updated contributions + +## Multi-Round Mode Specification + +### SessionStart Payload + +```json +{ + "participants": ["alice", "bob"], + "convergence": {"type": "all_equal"}, + "ttl_ms": 60000 +} +``` + +- `participants`: Non-empty list of participant identifiers +- `convergence.type`: Must be `"all_equal"` (only supported strategy) +- `ttl_ms`: Optional TTL override + +### Convergence Strategy: `all_equal` + +The session resolves automatically when: +1. All listed participants have submitted a contribution +2. All contribution values are identical + +### Round Counting + +- Round starts at 0 +- Each time a participant submits a **new or changed** value, the round increments +- Re-submitting the same value does not increment the round + +### Resolution Payload + +When convergence is reached, the resolution contains: +```json +{ + "converged_value": "option_a", + "round": 3, + "final": { + "alice": "option_a", + "bob": "option_a" + } +} +``` + +## Session State Machine + +Sessions follow a strict state machine: + +``` + SessionStart + ↓ + ┌────────┐ + │ OPEN │ ← Initial state + └────────┘ + ↓ + (mode returns Resolve or + PersistAndResolve) + ↓ + ┌──────────┐ + │ RESOLVED │ ← Terminal state + └──────────┘ + + (Alternative path) + ↓ + (TTL expires) + ↓ + ┌─────────┐ + │ EXPIRED │ ← Terminal state + └─────────┘ +``` + +**State descriptions:** + +| State | Can receive messages? | Can transition to | +|-------|----------------------|-------------------| +| OPEN | Yes | RESOLVED, EXPIRED | +| RESOLVED | No | (none - terminal) | +| EXPIRED | No | (none - terminal) | + +Resolution is now **mode-driven** — the runtime applies whatever `ModeResponse` the mode returns, rather than checking for hardcoded payloads. + +## Validation Rules + +The server validates every message before processing. Here are all the checks: + +### 1. Version Check +``` +IF macp_version != "v1" +THEN reject with InvalidMacpVersion +``` + +### 2. Required Fields Check +``` +IF session_id is empty OR message_id is empty +THEN reject with InvalidEnvelope +``` + +### 3. Mode Check (for SessionStart) +``` +IF mode is not registered +THEN reject with UnknownMode +``` + +### 4. Session Existence (for SessionStart) +``` +IF message_type == "SessionStart" AND session exists +THEN reject with DuplicateSession +``` + +### 5. Session Existence (for other messages) +``` +IF message_type != "SessionStart" AND session does not exist +THEN reject with UnknownSession +``` + +### 6. Session State Check +``` +IF session exists AND session.state != OPEN +THEN reject with SessionNotOpen +``` + +### 7. TTL Payload Check (for SessionStart) +``` +IF message_type == "SessionStart" AND payload is non-empty +THEN parse payload as JSON {"ttl_ms": } +IF invalid UTF-8 or invalid JSON THEN reject with InvalidEnvelope +IF ttl_ms <= 0 OR ttl_ms > 86400000 THEN reject with InvalidTtl +``` + +### 8. TTL Expiry Check (for non-SessionStart) +``` +IF session.state == OPEN AND current_time > session.ttl_expiry +THEN log internal TtlExpired entry, transition session to EXPIRED, reject with TtlExpired +``` + +## Error Codes + +All possible errors: + +| Error Code | When it occurs | How to fix | +|------------|----------------|------------| +| `InvalidMacpVersion` | `macp_version` is not "v1" | Use `macp_version: "v1"` | +| `InvalidEnvelope` | Missing required fields | Include all required fields | +| `DuplicateSession` | SessionStart for existing session | Use a different `session_id` | +| `UnknownSession` | Message for non-existent session | Send SessionStart first | +| `SessionNotOpen` | Message to resolved/expired session | Can't send more messages | +| `TtlExpired` | Session TTL has elapsed | Create a new session | +| `InvalidTtl` | TTL value out of range (<=0 or >24h) | Use ttl_ms in range 1..=86400000 | +| `UnknownMode` | Mode field references unregistered mode | Use "decision" or "multi_round" | +| `InvalidModeState` | Internal mode state is corrupted | Typically an internal error | +| `InvalidPayload` | Payload doesn't match mode's expected format | Check mode-specific payload requirements | + +## gRPC Service Definition + +In Protocol Buffers syntax: + +```protobuf +service MACPService { + rpc SendMessage(Envelope) returns (Ack); + rpc GetSession(SessionQuery) returns (SessionInfo); +} +``` + +**What this means:** +- Service name: `MACPService` +- Two operations: `SendMessage` and `GetSession` +- `SendMessage`: Takes `Envelope`, returns `Ack` +- `GetSession`: Takes `SessionQuery`, returns `SessionInfo` (or gRPC NOT_FOUND) +- Communication: Synchronous (client waits for response) + +## Transport + +The protocol uses **gRPC over HTTP/2**: + +**Advantages:** +- Binary protocol (efficient) +- Type-safe (schema enforcement) +- Streaming support (future extension) +- Wide language support +- Built-in authentication (TLS) + +**Default address:** `127.0.0.1:50051` + +## Future Extensions (Planned) + +### 1. Background TTL Cleanup +Currently, TTL is enforced on message receipt (lazy expiry). Future versions will: +- Run a background task to periodically remove expired sessions from memory +- Reduce memory footprint for long-running servers + +### 2. Replay Engine +Replay session logs to reconstruct state for debugging and auditing. + +### 3. GetSessionLog RPC +New RPC to query session event logs: +```protobuf +rpc GetSessionLog(SessionQuery) returns (SessionLog); +``` + +### 4. Participant Membership Gating +Enforce that only registered participants can send messages to a session. + +### 5. Additional Convergence Strategies +- `majority` — resolve when a majority of participants agree +- `threshold` — resolve when N participants agree + +### 6. Streaming +Support for bidirectional streaming: +```protobuf +rpc StreamMessages(stream Envelope) returns (stream Ack); +``` + +## Comparison to Other Protocols + +### vs HTTP REST +- MACP: Binary, type-safe, generated clients +- REST: Text-based, flexible, manual clients + +### vs WebSockets +- MACP: RPC-style (request/response pairs) +- WebSockets: Raw bidirectional streaming + +### vs Message Queues (RabbitMQ, Kafka) +- MACP: Synchronous acknowledgment, session-oriented +- Message Queues: Asynchronous, topic-oriented + +## Best Practices + +### For Clients + +1. **Always check Ack.accepted** + ```rust + let ack = client.send_message(env).await?.into_inner(); + if !ack.accepted { + println!("Error: {}", ack.error); + } + ``` + +2. **Use unique message IDs** + - UUIDs are recommended + - Helps with debugging and tracing + +3. **Send SessionStart first** + - Before any other messages + - Keep track of which sessions you've started + +4. **Handle all error codes** + - Don't just check `accepted` + - Log specific errors for debugging + +5. **Respect Resolved state** + - Don't send messages after resolve + - Cache the state locally to avoid unnecessary calls + +6. **Use GetSession to check state** + - Query session state before sending messages + - Useful for resuming after disconnection + +## Next Steps + +- Read [architecture.md](./architecture.md) to understand how this is implemented +- Read [examples.md](./examples.md) for practical code examples diff --git a/proto/macp.proto b/proto/macp.proto new file mode 100644 index 0000000..160ae9f --- /dev/null +++ b/proto/macp.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package macp.v1; + +message Envelope { + string macp_version = 1; + string mode = 2; + string message_type = 3; + string message_id = 4; + string session_id = 5; + string sender = 6; + int64 timestamp_unix_ms = 7; + bytes payload = 8; +} + +message Ack { + bool accepted = 1; + string error = 2; +} + +message SessionQuery { + string session_id = 1; +} + +message SessionInfo { + string session_id = 1; + string mode = 2; + string state = 3; + int64 ttl_expiry = 4; + bytes resolution = 5; + bytes mode_state = 6; + repeated string participants = 7; +} + +service MACPService { + rpc SendMessage(Envelope) returns (Ack); + rpc GetSession(SessionQuery) returns (SessionInfo); +} diff --git a/src/bin/client.rs b/src/bin/client.rs new file mode 100644 index 0000000..e2779ba --- /dev/null +++ b/src/bin/client.rs @@ -0,0 +1,82 @@ +use macp_runtime::pb::macp_service_client::MacpServiceClient; +use macp_runtime::pb::Envelope; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Server address from main.rs + let mut client = MacpServiceClient::connect("http://127.0.0.1:50051").await?; + + // 1) SessionStart + let start = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "ajit".into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: vec![], + }; + + let ack = client.send_message(start).await?.into_inner(); + println!( + "SessionStart ack: accepted={} error='{}'", + ack.accepted, ack.error + ); + + // 2) Normal message + let msg = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m2".into(), + session_id: "s1".into(), + sender: "ajit".into(), + timestamp_unix_ms: 1_700_000_000_001, + payload: b"hello".to_vec(), + }; + + let ack = client.send_message(msg).await?.into_inner(); + println!( + "Message ack: accepted={} error='{}'", + ack.accepted, ack.error + ); + + // 3) Resolve message (DecisionMode resolves when payload == "resolve") + let resolve = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m3".into(), + session_id: "s1".into(), + sender: "ajit".into(), + timestamp_unix_ms: 1_700_000_000_002, + payload: b"resolve".to_vec(), + }; + + let ack = client.send_message(resolve).await?.into_inner(); + println!( + "Resolve ack: accepted={} error='{}'", + ack.accepted, ack.error + ); + + // 4) Message after resolve (should be rejected: SessionNotOpen) + let after = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m4".into(), + session_id: "s1".into(), + sender: "ajit".into(), + timestamp_unix_ms: 1_700_000_000_003, + payload: b"should-fail".to_vec(), + }; + + let ack = client.send_message(after).await?.into_inner(); + println!( + "After-resolve ack: accepted={} error='{}'", + ack.accepted, ack.error + ); + + Ok(()) +} diff --git a/src/bin/fuzz_client.rs b/src/bin/fuzz_client.rs new file mode 100644 index 0000000..6e317b6 --- /dev/null +++ b/src/bin/fuzz_client.rs @@ -0,0 +1,348 @@ +use macp_runtime::pb::macp_service_client::MacpServiceClient; +use macp_runtime::pb::Envelope; +use tokio::time::{sleep, Duration}; + +#[allow(clippy::too_many_arguments)] +fn env( + macp_version: &str, + mode: &str, + message_type: &str, + message_id: &str, + session_id: &str, + sender: &str, + ts: i64, + payload: &[u8], +) -> Envelope { + Envelope { + macp_version: macp_version.into(), + mode: mode.into(), + message_type: message_type.into(), + message_id: message_id.into(), + session_id: session_id.into(), + sender: sender.into(), + timestamp_unix_ms: ts, + payload: payload.to_vec(), + } +} + +async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { + match client.send_message(e).await { + Ok(resp) => { + let ack = resp.into_inner(); + println!("[{label}] accepted={} error='{}'", ack.accepted, ack.error); + } + Err(status) => { + println!("[{label}] grpc error: {status}"); + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = MacpServiceClient::connect("http://127.0.0.1:50051").await?; + + // --- 1) Wrong MACP version (should reject InvalidMacpVersion) + send( + &mut client, + "wrong_version", + env( + "v0", + "decision", + "SessionStart", + "badv1", + "sv", + "ajit", + 1_700_000_000_100, + b"", + ), + ) + .await; + + // --- 2) Missing required fields (should reject InvalidEnvelope) + // message_id empty + sender empty + ts <= 0 + send( + &mut client, + "missing_fields", + env( + "v1", + "decision", + "SessionStart", + "", + "s_missing", + "", + 0, + b"", + ), + ) + .await; + + // --- 3) Message to unknown session (should reject UnknownSession) + send( + &mut client, + "unknown_session_message", + env( + "v1", + "decision", + "Message", + "m_unknown", + "no_such_session", + "ajit", + 1_700_000_000_101, + b"hello", + ), + ) + .await; + + // --- 4) Valid SessionStart (should accept) + send( + &mut client, + "session_start_ok", + env( + "v1", + "decision", + "SessionStart", + "m1", + "s1", + "ajit", + 1_700_000_000_102, + b"", + ), + ) + .await; + + // --- 5) Duplicate SessionStart (should reject DuplicateSession) + send( + &mut client, + "session_start_duplicate", + env( + "v1", + "decision", + "SessionStart", + "m1_dup", + "s1", + "ajit", + 1_700_000_000_103, + b"", + ), + ) + .await; + + // --- 6) Valid Message (should accept) + send( + &mut client, + "message_ok", + env( + "v1", + "decision", + "Message", + "m2", + "s1", + "ajit", + 1_700_000_000_104, + b"hello", + ), + ) + .await; + + // --- 7) Resolve (payload == "resolve" => session becomes RESOLVED) + send( + &mut client, + "resolve", + env( + "v1", + "decision", + "Message", + "m3", + "s1", + "ajit", + 1_700_000_000_105, + b"resolve", + ), + ) + .await; + + // --- 8) Message after resolved (should reject SessionNotOpen) + send( + &mut client, + "after_resolve", + env( + "v1", + "decision", + "Message", + "m4", + "s1", + "ajit", + 1_700_000_000_106, + b"should_fail", + ), + ) + .await; + + // --- 9) TTL Expiry: SessionStart with short TTL, wait, then send message + send( + &mut client, + "ttl_session_start", + env( + "v1", + "decision", + "SessionStart", + "m_ttl1", + "s_ttl", + "ajit", + 1_700_000_000_200, + br#"{"ttl_ms":1000}"#, + ), + ) + .await; + + sleep(Duration::from_millis(1200)).await; + + send( + &mut client, + "ttl_expired_message", + env( + "v1", + "decision", + "Message", + "m_ttl2", + "s_ttl", + "ajit", + 1_700_000_000_201, + b"should_expire", + ), + ) + .await; + + // --- 10) Invalid TTL values + send( + &mut client, + "invalid_ttl_zero", + env( + "v1", + "decision", + "SessionStart", + "m_bad_ttl1", + "s_bad_ttl1", + "ajit", + 1_700_000_000_300, + br#"{"ttl_ms":0}"#, + ), + ) + .await; + + send( + &mut client, + "invalid_ttl_negative", + env( + "v1", + "decision", + "SessionStart", + "m_bad_ttl2", + "s_bad_ttl2", + "ajit", + 1_700_000_000_301, + br#"{"ttl_ms":-5000}"#, + ), + ) + .await; + + send( + &mut client, + "invalid_ttl_exceeds_max", + env( + "v1", + "decision", + "SessionStart", + "m_bad_ttl3", + "s_bad_ttl3", + "ajit", + 1_700_000_000_302, + br#"{"ttl_ms":86400001}"#, + ), + ) + .await; + + // --- 11) Multi-round convergence test + let mr_payload = r#"{"participants":["alice","bob"],"convergence":{"type":"all_equal"}}"#; + send( + &mut client, + "multi_round_start", + env( + "v1", + "multi_round", + "SessionStart", + "m_mr0", + "s_mr", + "creator", + 1_700_000_000_400, + mr_payload.as_bytes(), + ), + ) + .await; + + send( + &mut client, + "multi_round_alice", + env( + "v1", + "multi_round", + "Contribute", + "m_mr1", + "s_mr", + "alice", + 1_700_000_000_401, + br#"{"value":"option_a"}"#, + ), + ) + .await; + + send( + &mut client, + "multi_round_bob_diff", + env( + "v1", + "multi_round", + "Contribute", + "m_mr2", + "s_mr", + "bob", + 1_700_000_000_402, + br#"{"value":"option_b"}"#, + ), + ) + .await; + + send( + &mut client, + "multi_round_bob_converge", + env( + "v1", + "multi_round", + "Contribute", + "m_mr3", + "s_mr", + "bob", + 1_700_000_000_403, + br#"{"value":"option_a"}"#, + ), + ) + .await; + + send( + &mut client, + "multi_round_after_resolve", + env( + "v1", + "multi_round", + "Contribute", + "m_mr4", + "s_mr", + "alice", + 1_700_000_000_404, + br#"{"value":"option_c"}"#, + ), + ) + .await; + + Ok(()) +} diff --git a/src/bin/multi_round_client.rs b/src/bin/multi_round_client.rs new file mode 100644 index 0000000..e9d699b --- /dev/null +++ b/src/bin/multi_round_client.rs @@ -0,0 +1,149 @@ +use macp_runtime::pb::macp_service_client::MacpServiceClient; +use macp_runtime::pb::{Envelope, SessionQuery}; + +async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { + match client.send_message(e).await { + Ok(resp) => { + let ack = resp.into_inner(); + println!("[{label}] accepted={} error='{}'", ack.accepted, ack.error); + } + Err(status) => { + println!("[{label}] grpc error: {status}"); + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = MacpServiceClient::connect("http://127.0.0.1:50051").await?; + + println!("=== Multi-Round Convergence Demo ===\n"); + + // 1) SessionStart with multi_round mode + let payload = serde_json::json!({ + "participants": ["alice", "bob"], + "convergence": {"type": "all_equal"}, + "ttl_ms": 60000 + }); + send( + &mut client, + "session_start", + Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "SessionStart".into(), + message_id: "m0".into(), + session_id: "mr1".into(), + sender: "coordinator".into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload: payload.to_string().into_bytes(), + }, + ) + .await; + + // 2) Alice contributes "option_a" + send( + &mut client, + "alice_contributes_a", + Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m1".into(), + session_id: "mr1".into(), + sender: "alice".into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload: br#"{"value":"option_a"}"#.to_vec(), + }, + ) + .await; + + // 3) Bob contributes "option_b" (no convergence yet) + send( + &mut client, + "bob_contributes_b", + Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m2".into(), + session_id: "mr1".into(), + sender: "bob".into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload: br#"{"value":"option_b"}"#.to_vec(), + }, + ) + .await; + + // Query session state — should be Open + match client + .get_session(SessionQuery { + session_id: "mr1".into(), + }) + .await + { + Ok(resp) => { + let info = resp.into_inner(); + println!( + "[get_session] state={} mode={} participants={:?}", + info.state, info.mode, info.participants + ); + } + Err(status) => println!("[get_session] error: {status}"), + } + + // 4) Bob revises to "option_a" (convergence → auto-resolved) + send( + &mut client, + "bob_revises_to_a", + Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m3".into(), + session_id: "mr1".into(), + sender: "bob".into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload: br#"{"value":"option_a"}"#.to_vec(), + }, + ) + .await; + + // Query session state — should be Resolved + match client + .get_session(SessionQuery { + session_id: "mr1".into(), + }) + .await + { + Ok(resp) => { + let info = resp.into_inner(); + println!( + "[get_session] state={} resolution={}", + info.state, + String::from_utf8_lossy(&info.resolution) + ); + } + Err(status) => println!("[get_session] error: {status}"), + } + + // 5) Another message — should be rejected: SessionNotOpen + send( + &mut client, + "after_convergence", + Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m4".into(), + session_id: "mr1".into(), + sender: "alice".into(), + timestamp_unix_ms: chrono::Utc::now().timestamp_millis(), + payload: br#"{"value":"option_c"}"#.to_vec(), + }, + ) + .await; + + println!("\n=== Demo Complete ==="); + Ok(()) +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..2b4940c --- /dev/null +++ b/src/error.rs @@ -0,0 +1,25 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum MacpError { + #[error("InvalidMacpVersion")] + InvalidMacpVersion, + #[error("InvalidEnvelope")] + InvalidEnvelope, + #[error("DuplicateSession")] + DuplicateSession, + #[error("UnknownSession")] + UnknownSession, + #[error("SessionNotOpen")] + SessionNotOpen, + #[error("TtlExpired")] + TtlExpired, + #[error("InvalidTtl")] + InvalidTtl, + #[error("UnknownMode")] + UnknownMode, + #[error("InvalidModeState")] + InvalidModeState, + #[error("InvalidPayload")] + InvalidPayload, +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..57e9c24 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,10 @@ +pub mod pb { + tonic::include_proto!("macp.v1"); +} + +pub mod error; +pub mod log_store; +pub mod mode; +pub mod registry; +pub mod runtime; +pub mod session; diff --git a/src/log_store.rs b/src/log_store.rs new file mode 100644 index 0000000..65de2d7 --- /dev/null +++ b/src/log_store.rs @@ -0,0 +1,114 @@ +use std::collections::HashMap; +use tokio::sync::RwLock; + +#[derive(Clone, Debug, PartialEq)] +pub enum EntryKind { + Incoming, + Internal, +} + +#[derive(Clone, Debug)] +pub struct LogEntry { + pub message_id: String, + pub received_at_ms: i64, + pub sender: String, + pub message_type: String, + pub raw_payload: Vec, + pub entry_kind: EntryKind, +} + +pub struct LogStore { + logs: RwLock>>, +} + +impl Default for LogStore { + fn default() -> Self { + Self::new() + } +} + +impl LogStore { + pub fn new() -> Self { + Self { + logs: RwLock::new(HashMap::new()), + } + } + + /// Create an empty log for a session. + pub async fn create_session_log(&self, session_id: &str) { + let mut guard = self.logs.write().await; + guard.entry(session_id.to_string()).or_default(); + } + + /// Append a log entry. Auto-creates the session log if it doesn't exist. + pub async fn append(&self, session_id: &str, entry: LogEntry) { + let mut guard = self.logs.write().await; + guard.entry(session_id.to_string()).or_default().push(entry); + } + + /// Get the log for a session. Returns None if session was never logged. + pub async fn get_log(&self, session_id: &str) -> Option> { + let guard = self.logs.read().await; + guard.get(session_id).cloned() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn entry(id: &str, kind: EntryKind) -> LogEntry { + LogEntry { + message_id: id.into(), + received_at_ms: 1_700_000_000_000, + sender: "test".into(), + message_type: "Message".into(), + raw_payload: vec![], + entry_kind: kind, + } + } + + #[tokio::test] + async fn create_append_get_round_trip() { + let store = LogStore::new(); + store.create_session_log("s1").await; + store.append("s1", entry("m1", EntryKind::Incoming)).await; + store.append("s1", entry("m2", EntryKind::Incoming)).await; + + let log = store.get_log("s1").await.unwrap(); + assert_eq!(log.len(), 2); + assert_eq!(log[0].message_id, "m1"); + assert_eq!(log[1].message_id, "m2"); + } + + #[tokio::test] + async fn auto_create_on_append() { + let store = LogStore::new(); + // No explicit create_session_log call + store.append("s2", entry("m1", EntryKind::Incoming)).await; + + let log = store.get_log("s2").await.unwrap(); + assert_eq!(log.len(), 1); + } + + #[tokio::test] + async fn ordering_preserved() { + let store = LogStore::new(); + for i in 0..5 { + store + .append("s1", entry(&format!("m{}", i), EntryKind::Incoming)) + .await; + } + + let log = store.get_log("s1").await.unwrap(); + for (i, e) in log.iter().enumerate() { + assert_eq!(e.message_id, format!("m{}", i)); + } + } + + #[tokio::test] + async fn unknown_session_returns_none() { + let store = LogStore::new(); + assert!(store.get_log("nonexistent").await.is_none()); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..67901f8 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,28 @@ +mod server; + +use macp_runtime::log_store::LogStore; +use macp_runtime::registry::SessionRegistry; +use macp_runtime::runtime::Runtime; +use server::MacpServer; +use std::sync::Arc; +use tonic::transport::Server; + +use macp_runtime::pb; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = "127.0.0.1:50051".parse()?; + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + let runtime = Arc::new(Runtime::new(registry, log_store)); + let svc = MacpServer::new(runtime); + + println!("macp-runtime v0.1 listening on {}", addr); + + Server::builder() + .add_service(pb::macp_service_server::MacpServiceServer::new(svc)) + .serve(addr) + .await?; + + Ok(()) +} diff --git a/src/mode/decision.rs b/src/mode/decision.rs new file mode 100644 index 0000000..263dff4 --- /dev/null +++ b/src/mode/decision.rs @@ -0,0 +1,96 @@ +use crate::error::MacpError; +use crate::mode::{Mode, ModeResponse}; +use crate::pb::Envelope; +use crate::session::Session; + +/// DecisionMode wraps the original `payload == b"resolve"` behavior. +/// This preserves backward compatibility for existing clients. +pub struct DecisionMode; + +impl Mode for DecisionMode { + fn on_session_start( + &self, + _session: &Session, + _env: &Envelope, + ) -> Result { + Ok(ModeResponse::NoOp) + } + + fn on_message(&self, _session: &Session, env: &Envelope) -> Result { + if env.payload == b"resolve" { + Ok(ModeResponse::Resolve(env.payload.clone())) + } else { + Ok(ModeResponse::NoOp) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::SessionState; + + fn test_session() -> Session { + Session { + session_id: "s1".into(), + state: SessionState::Open, + ttl_expiry: i64::MAX, + resolution: None, + mode: "decision".into(), + mode_state: vec![], + participants: vec![], + } + } + + fn test_envelope(payload: &[u8]) -> Envelope { + Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: payload.to_vec(), + } + } + + #[test] + fn session_start_returns_noop() { + let mode = DecisionMode; + let session = test_session(); + let env = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "SessionStart".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "test".into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: vec![], + }; + + let result = mode.on_session_start(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::NoOp)); + } + + #[test] + fn resolve_payload_returns_resolve() { + let mode = DecisionMode; + let session = test_session(); + let env = test_envelope(b"resolve"); + + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::Resolve(_))); + } + + #[test] + fn other_payload_returns_noop() { + let mode = DecisionMode; + let session = test_session(); + let env = test_envelope(b"hello world"); + + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::NoOp)); + } +} diff --git a/src/mode/mod.rs b/src/mode/mod.rs new file mode 100644 index 0000000..d455ff0 --- /dev/null +++ b/src/mode/mod.rs @@ -0,0 +1,33 @@ +pub mod decision; +pub mod multi_round; + +use crate::error::MacpError; +use crate::pb::Envelope; +use crate::session::Session; + +/// The result of a Mode processing a message. +/// The runtime applies this response to mutate session state. +#[derive(Debug)] +pub enum ModeResponse { + /// No state change needed. + NoOp, + /// Persist updated mode state. + PersistState(Vec), + /// Resolve the session with the given resolution data. + Resolve(Vec), + /// Persist mode state and resolve in one step. + PersistAndResolve { state: Vec, resolution: Vec }, +} + +/// Trait that coordination modes implement. +/// Modes receive immutable session references and return a ModeResponse. +/// The runtime kernel is responsible for applying the response. +pub trait Mode: Send + Sync { + fn on_session_start( + &self, + session: &Session, + env: &Envelope, + ) -> Result; + + fn on_message(&self, session: &Session, env: &Envelope) -> Result; +} diff --git a/src/mode/multi_round.rs b/src/mode/multi_round.rs new file mode 100644 index 0000000..9b06dd5 --- /dev/null +++ b/src/mode/multi_round.rs @@ -0,0 +1,474 @@ +use crate::error::MacpError; +use crate::mode::{Mode, ModeResponse}; +use crate::pb::Envelope; +use crate::session::Session; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +/// Convergence strategy configuration. +#[derive(Debug, Clone, Deserialize)] +pub struct ConvergenceConfig { + #[serde(rename = "type")] + pub convergence_type: String, +} + +/// SessionStart payload for multi_round mode. +#[derive(Debug, Clone, Deserialize)] +pub struct MultiRoundConfig { + pub participants: Vec, + pub convergence: ConvergenceConfig, + pub ttl_ms: Option, +} + +/// Internal state tracked across rounds. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiRoundState { + pub round: u64, + pub participants: Vec, + pub contributions: BTreeMap, +} + +/// Payload for Contribute messages. +#[derive(Debug, Clone, Deserialize)] +struct ContributePayload { + value: String, +} + +/// Resolution payload emitted on convergence. +#[derive(Debug, Serialize)] +struct ResolutionPayload { + converged_value: String, + round: u64, + #[serde(rename = "final")] + final_values: BTreeMap, +} + +pub struct MultiRoundMode; + +impl MultiRoundMode { + fn encode_state(state: &MultiRoundState) -> Vec { + serde_json::to_vec(state).expect("MultiRoundState is always serializable") + } + + fn decode_state(data: &[u8]) -> Result { + serde_json::from_slice(data).map_err(|_| MacpError::InvalidModeState) + } +} + +impl Mode for MultiRoundMode { + fn on_session_start( + &self, + _session: &Session, + env: &Envelope, + ) -> Result { + let text = std::str::from_utf8(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + let config: MultiRoundConfig = + serde_json::from_str(text).map_err(|_| MacpError::InvalidPayload)?; + + if config.participants.is_empty() { + return Err(MacpError::InvalidPayload); + } + + if config.convergence.convergence_type != "all_equal" { + return Err(MacpError::InvalidPayload); + } + + let state = MultiRoundState { + round: 0, + participants: config.participants, + contributions: BTreeMap::new(), + }; + + Ok(ModeResponse::PersistState(Self::encode_state(&state))) + } + + fn on_message(&self, session: &Session, env: &Envelope) -> Result { + if env.message_type != "Contribute" { + return Ok(ModeResponse::NoOp); + } + + let mut state = Self::decode_state(&session.mode_state)?; + + let text = std::str::from_utf8(&env.payload).map_err(|_| MacpError::InvalidPayload)?; + let contribute: ContributePayload = + serde_json::from_str(text).map_err(|_| MacpError::InvalidPayload)?; + + // Check if the value changed from previous contribution + let previous = state.contributions.get(&env.sender); + let value_changed = previous.is_none_or(|prev| *prev != contribute.value); + + if value_changed { + state.round += 1; + state + .contributions + .insert(env.sender.clone(), contribute.value); + } + + // Check convergence: all participants contributed + all values identical + let all_contributed = state + .participants + .iter() + .all(|p| state.contributions.contains_key(p)); + + if all_contributed { + let values: Vec<&String> = state.contributions.values().collect(); + let all_equal = values.windows(2).all(|w| w[0] == w[1]); + + if all_equal { + let converged_value = values[0].clone(); + let resolution = ResolutionPayload { + converged_value, + round: state.round, + final_values: state.contributions.clone(), + }; + let resolution_bytes = serde_json::to_vec(&resolution) + .expect("ResolutionPayload is always serializable"); + return Ok(ModeResponse::PersistAndResolve { + state: Self::encode_state(&state), + resolution: resolution_bytes, + }); + } + } + + Ok(ModeResponse::PersistState(Self::encode_state(&state))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::SessionState; + + fn base_session() -> Session { + Session { + session_id: "s1".into(), + state: SessionState::Open, + ttl_expiry: i64::MAX, + resolution: None, + mode: "multi_round".into(), + mode_state: vec![], + participants: vec![], + } + } + + fn session_start_env(payload: &str) -> Envelope { + Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "SessionStart".into(), + message_id: "m0".into(), + session_id: "s1".into(), + sender: "creator".into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: payload.as_bytes().to_vec(), + } + } + + fn contribute_env(sender: &str, value: &str) -> Envelope { + let payload = serde_json::json!({"value": value}).to_string(); + Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: format!("m_{}", sender), + session_id: "s1".into(), + sender: sender.into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: payload.into_bytes(), + } + } + + fn session_with_state(state: &MultiRoundState) -> Session { + let mut s = base_session(); + s.mode_state = MultiRoundMode::encode_state(state); + s + } + + #[test] + fn session_start_parses_valid_config() { + let mode = MultiRoundMode; + let session = base_session(); + let payload = r#"{"participants":["alice","bob"],"convergence":{"type":"all_equal"}}"#; + let env = session_start_env(payload); + + let result = mode.on_session_start(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let state: MultiRoundState = serde_json::from_slice(&data).unwrap(); + assert_eq!(state.round, 0); + assert_eq!(state.participants, vec!["alice", "bob"]); + assert!(state.contributions.is_empty()); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn session_start_with_ttl() { + let mode = MultiRoundMode; + let session = base_session(); + let payload = + r#"{"participants":["alice"],"convergence":{"type":"all_equal"},"ttl_ms":5000}"#; + let env = session_start_env(payload); + + let result = mode.on_session_start(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistState(_))); + } + + #[test] + fn session_start_rejects_empty_participants() { + let mode = MultiRoundMode; + let session = base_session(); + let payload = r#"{"participants":[],"convergence":{"type":"all_equal"}}"#; + let env = session_start_env(payload); + + let err = mode.on_session_start(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn session_start_rejects_unknown_convergence() { + let mode = MultiRoundMode; + let session = base_session(); + let payload = r#"{"participants":["alice"],"convergence":{"type":"majority"}}"#; + let env = session_start_env(payload); + + let err = mode.on_session_start(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn session_start_rejects_invalid_json() { + let mode = MultiRoundMode; + let session = base_session(); + let env = session_start_env("not json"); + + let err = mode.on_session_start(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn contribute_first_value_increments_round() { + let mode = MultiRoundMode; + let state = MultiRoundState { + round: 0, + participants: vec!["alice".into(), "bob".into()], + contributions: BTreeMap::new(), + }; + let session = session_with_state(&state); + let env = contribute_env("alice", "option_a"); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); + assert_eq!(new_state.round, 1); + assert_eq!(new_state.contributions.get("alice").unwrap(), "option_a"); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn resubmit_same_value_does_not_increment_round() { + let mode = MultiRoundMode; + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 1, + participants: vec!["alice".into(), "bob".into()], + contributions, + }; + let session = session_with_state(&state); + let env = contribute_env("alice", "option_a"); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); + assert_eq!(new_state.round, 1); // unchanged + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn revise_value_increments_round() { + let mode = MultiRoundMode; + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 1, + participants: vec!["alice".into(), "bob".into()], + contributions, + }; + let session = session_with_state(&state); + let env = contribute_env("alice", "option_b"); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistState(data) => { + let new_state: MultiRoundState = serde_json::from_slice(&data).unwrap(); + assert_eq!(new_state.round, 2); + assert_eq!(new_state.contributions.get("alice").unwrap(), "option_b"); + } + _ => panic!("Expected PersistState"), + } + } + + #[test] + fn convergence_when_all_equal() { + let mode = MultiRoundMode; + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 1, + participants: vec!["alice".into(), "bob".into()], + contributions, + }; + let session = session_with_state(&state); + let env = contribute_env("bob", "option_a"); + + let result = mode.on_message(&session, &env).unwrap(); + match result { + ModeResponse::PersistAndResolve { + state: state_bytes, + resolution, + } => { + let final_state: MultiRoundState = serde_json::from_slice(&state_bytes).unwrap(); + assert_eq!(final_state.round, 2); + + let res: serde_json::Value = serde_json::from_slice(&resolution).unwrap(); + assert_eq!(res["converged_value"], "option_a"); + assert_eq!(res["round"], 2); + assert_eq!(res["final"]["alice"], "option_a"); + assert_eq!(res["final"]["bob"], "option_a"); + } + _ => panic!("Expected PersistAndResolve"), + } + } + + #[test] + fn no_convergence_when_values_differ() { + let mode = MultiRoundMode; + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 1, + participants: vec!["alice".into(), "bob".into()], + contributions, + }; + let session = session_with_state(&state); + let env = contribute_env("bob", "option_b"); + + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistState(_))); + } + + #[test] + fn no_convergence_when_not_all_contributed() { + let mode = MultiRoundMode; + let state = MultiRoundState { + round: 0, + participants: vec!["alice".into(), "bob".into(), "carol".into()], + contributions: BTreeMap::new(), + }; + let session = session_with_state(&state); + let env = contribute_env("alice", "option_a"); + + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistState(_))); + } + + #[test] + fn non_contribute_message_returns_noop() { + let mode = MultiRoundMode; + let state = MultiRoundState { + round: 0, + participants: vec!["alice".into()], + contributions: BTreeMap::new(), + }; + let session = session_with_state(&state); + let env = Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Message".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "alice".into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: b"hello".to_vec(), + }; + + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::NoOp)); + } + + #[test] + fn contribute_invalid_payload_returns_error() { + let mode = MultiRoundMode; + let state = MultiRoundState { + round: 0, + participants: vec!["alice".into()], + contributions: BTreeMap::new(), + }; + let session = session_with_state(&state); + let env = Envelope { + macp_version: "v1".into(), + mode: "multi_round".into(), + message_type: "Contribute".into(), + message_id: "m1".into(), + session_id: "s1".into(), + sender: "alice".into(), + timestamp_unix_ms: 1_700_000_000_000, + payload: b"not json".to_vec(), + }; + + let err = mode.on_message(&session, &env).unwrap_err(); + assert_eq!(err.to_string(), "InvalidPayload"); + } + + #[test] + fn encode_decode_round_trip() { + let mut contributions = BTreeMap::new(); + contributions.insert("alice".into(), "value_a".into()); + let original = MultiRoundState { + round: 5, + participants: vec!["alice".into(), "bob".into()], + contributions, + }; + + let encoded = MultiRoundMode::encode_state(&original); + let decoded = MultiRoundMode::decode_state(&encoded).unwrap(); + + assert_eq!(decoded.round, original.round); + assert_eq!(decoded.participants, original.participants); + assert_eq!(decoded.contributions, original.contributions); + } + + #[test] + fn decode_invalid_state_returns_error() { + let err = MultiRoundMode::decode_state(b"garbage").unwrap_err(); + assert_eq!(err.to_string(), "InvalidModeState"); + } + + #[test] + fn three_participant_convergence() { + let mode = MultiRoundMode; + + // Alice and Bob already contributed "option_a" + let mut contributions = BTreeMap::new(); + contributions.insert("alice".to_string(), "option_a".to_string()); + contributions.insert("bob".to_string(), "option_a".to_string()); + let state = MultiRoundState { + round: 2, + participants: vec!["alice".into(), "bob".into(), "carol".into()], + contributions, + }; + let session = session_with_state(&state); + let env = contribute_env("carol", "option_a"); + + let result = mode.on_message(&session, &env).unwrap(); + assert!(matches!(result, ModeResponse::PersistAndResolve { .. })); + } +} diff --git a/src/registry.rs b/src/registry.rs new file mode 100644 index 0000000..1e8528e --- /dev/null +++ b/src/registry.rs @@ -0,0 +1,33 @@ +use crate::session::Session; +use std::collections::HashMap; +use tokio::sync::RwLock; + +pub struct SessionRegistry { + pub(crate) sessions: RwLock>, +} + +impl Default for SessionRegistry { + fn default() -> Self { + Self::new() + } +} + +impl SessionRegistry { + pub fn new() -> Self { + Self { + sessions: RwLock::new(HashMap::new()), + } + } + + /// Get a clone of a session by ID. Returns None if not found. + pub async fn get_session(&self, session_id: &str) -> Option { + let guard = self.sessions.read().await; + guard.get(session_id).cloned() + } + + /// Insert a session directly. Used by tests and for pre-populating state. + pub async fn insert_session_for_test(&self, session_id: String, session: Session) { + let mut guard = self.sessions.write().await; + guard.insert(session_id, session); + } +} diff --git a/src/runtime.rs b/src/runtime.rs new file mode 100644 index 0000000..797692b --- /dev/null +++ b/src/runtime.rs @@ -0,0 +1,395 @@ +use chrono::Utc; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::error::MacpError; +use crate::log_store::{EntryKind, LogEntry, LogStore}; +use crate::mode::decision::DecisionMode; +use crate::mode::multi_round::MultiRoundMode; +use crate::mode::{Mode, ModeResponse}; +use crate::pb::Envelope; +use crate::registry::SessionRegistry; +use crate::session::{parse_session_start_ttl_ms, Session, SessionState}; + +pub struct Runtime { + pub registry: Arc, + pub log_store: Arc, + modes: HashMap>, +} + +impl Runtime { + pub fn new(registry: Arc, log_store: Arc) -> Self { + let mut modes: HashMap> = HashMap::new(); + modes.insert("decision".into(), Box::new(DecisionMode)); + modes.insert("multi_round".into(), Box::new(MultiRoundMode)); + + Self { + registry, + log_store, + modes, + } + } + + fn resolve_mode_name(mode_field: &str) -> &str { + if mode_field.is_empty() { + "decision" + } else { + mode_field + } + } + + fn make_incoming_entry(env: &Envelope) -> LogEntry { + LogEntry { + message_id: env.message_id.clone(), + received_at_ms: Utc::now().timestamp_millis(), + sender: env.sender.clone(), + message_type: env.message_type.clone(), + raw_payload: env.payload.clone(), + entry_kind: EntryKind::Incoming, + } + } + + fn make_internal_entry(message_type: &str, payload: &[u8]) -> LogEntry { + LogEntry { + message_id: String::new(), + received_at_ms: Utc::now().timestamp_millis(), + sender: "_runtime".into(), + message_type: message_type.into(), + raw_payload: payload.to_vec(), + entry_kind: EntryKind::Internal, + } + } + + fn apply_mode_response(session: &mut Session, response: ModeResponse) { + match response { + ModeResponse::NoOp => {} + ModeResponse::PersistState(s) => { + session.mode_state = s; + } + ModeResponse::Resolve(r) => { + session.state = SessionState::Resolved; + session.resolution = Some(r); + } + ModeResponse::PersistAndResolve { state, resolution } => { + session.mode_state = state; + session.state = SessionState::Resolved; + session.resolution = Some(resolution); + } + } + } + + pub async fn process(&self, env: &Envelope) -> Result<(), MacpError> { + if env.message_type == "SessionStart" { + self.process_session_start(env).await + } else { + self.process_message(env).await + } + } + + async fn process_session_start(&self, env: &Envelope) -> Result<(), MacpError> { + let mode_name = Self::resolve_mode_name(&env.mode); + let mode = self.modes.get(mode_name).ok_or(MacpError::UnknownMode)?; + + let ttl_ms = parse_session_start_ttl_ms(&env.payload)?; + + let mut guard = self.registry.sessions.write().await; + + if guard.contains_key(&env.session_id) { + return Err(MacpError::DuplicateSession); + } + + let ttl_expiry = Utc::now().timestamp_millis() + ttl_ms; + + // Create session log and append incoming entry + self.log_store.create_session_log(&env.session_id).await; + self.log_store + .append(&env.session_id, Self::make_incoming_entry(env)) + .await; + + // Create session with initial state + let session = Session { + session_id: env.session_id.clone(), + state: SessionState::Open, + ttl_expiry, + resolution: None, + mode: mode_name.to_string(), + mode_state: vec![], + participants: vec![], + }; + + // Call mode's on_session_start + let response = mode.on_session_start(&session, env)?; + + // Insert session and apply response + let mut session = session; + Self::apply_mode_response(&mut session, response); + + // Extract participants from mode_state for multi_round + if mode_name == "multi_round" && !session.mode_state.is_empty() { + if let Ok(state) = serde_json::from_slice::( + &session.mode_state, + ) { + session.participants = state.participants.clone(); + } + } + + guard.insert(env.session_id.clone(), session); + + Ok(()) + } + + async fn process_message(&self, env: &Envelope) -> Result<(), MacpError> { + let mut guard = self.registry.sessions.write().await; + + let session = guard + .get_mut(&env.session_id) + .ok_or(MacpError::UnknownSession)?; + + // TTL check + let now = Utc::now().timestamp_millis(); + if session.state == SessionState::Open && now > session.ttl_expiry { + // Log internal TTL expiry event before state mutation + self.log_store + .append( + &env.session_id, + Self::make_internal_entry("TtlExpired", b""), + ) + .await; + session.state = SessionState::Expired; + return Err(MacpError::TtlExpired); + } + + if session.state != SessionState::Open { + return Err(MacpError::SessionNotOpen); + } + + // Log incoming message before mode dispatch + self.log_store + .append(&env.session_id, Self::make_incoming_entry(env)) + .await; + + let mode_name = session.mode.clone(); + let mode = self.modes.get(&mode_name).ok_or(MacpError::UnknownMode)?; + + let response = mode.on_message(session, env)?; + Self::apply_mode_response(session, response); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_runtime() -> Runtime { + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + Runtime::new(registry, log_store) + } + + fn env( + mode: &str, + message_type: &str, + message_id: &str, + session_id: &str, + sender: &str, + payload: &[u8], + ) -> Envelope { + Envelope { + macp_version: "v1".into(), + mode: mode.into(), + message_type: message_type.into(), + message_id: message_id.into(), + session_id: session_id.into(), + sender: sender.into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: payload.to_vec(), + } + } + + #[tokio::test] + async fn decision_mode_full_flow() { + let rt = make_runtime(); + + // SessionStart + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + // Normal message + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + rt.process(&e).await.unwrap(); + + // Resolve + let e = env("decision", "Message", "m3", "s1", "alice", b"resolve"); + rt.process(&e).await.unwrap(); + + // After resolve + let e = env("decision", "Message", "m4", "s1", "alice", b"nope"); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "SessionNotOpen"); + } + + #[tokio::test] + async fn empty_mode_defaults_to_decision() { + let rt = make_runtime(); + + let e = env("", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let guard = rt.registry.sessions.read().await; + assert_eq!(guard["s1"].mode, "decision"); + } + + #[tokio::test] + async fn unknown_mode_rejected() { + let rt = make_runtime(); + + let e = env("nonexistent", "SessionStart", "m1", "s1", "alice", b""); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "UnknownMode"); + } + + #[tokio::test] + async fn multi_round_flow() { + let rt = make_runtime(); + + let payload = r#"{"participants":["alice","bob"],"convergence":{"type":"all_equal"}}"#; + let e = env( + "multi_round", + "SessionStart", + "m0", + "s1", + "creator", + payload.as_bytes(), + ); + rt.process(&e).await.unwrap(); + + // Alice contributes + let e = env( + "multi_round", + "Contribute", + "m1", + "s1", + "alice", + br#"{"value":"option_a"}"#, + ); + rt.process(&e).await.unwrap(); + + // Bob contributes different value — no convergence + let e = env( + "multi_round", + "Contribute", + "m2", + "s1", + "bob", + br#"{"value":"option_b"}"#, + ); + rt.process(&e).await.unwrap(); + + { + let guard = rt.registry.sessions.read().await; + assert_eq!(guard["s1"].state, SessionState::Open); + } + + // Bob revises to match alice — convergence + let e = env( + "multi_round", + "Contribute", + "m3", + "s1", + "bob", + br#"{"value":"option_a"}"#, + ); + rt.process(&e).await.unwrap(); + + { + let guard = rt.registry.sessions.read().await; + assert_eq!(guard["s1"].state, SessionState::Resolved); + let resolution = guard["s1"].resolution.as_ref().unwrap(); + let res: serde_json::Value = serde_json::from_slice(resolution).unwrap(); + assert_eq!(res["converged_value"], "option_a"); + } + } + + #[tokio::test] + async fn mode_response_apply_noop() { + let mut session = Session { + session_id: "s".into(), + state: SessionState::Open, + ttl_expiry: i64::MAX, + resolution: None, + mode: "decision".into(), + mode_state: vec![], + participants: vec![], + }; + Runtime::apply_mode_response(&mut session, ModeResponse::NoOp); + assert_eq!(session.state, SessionState::Open); + assert!(session.resolution.is_none()); + } + + #[tokio::test] + async fn mode_response_apply_persist_and_resolve() { + let mut session = Session { + session_id: "s".into(), + state: SessionState::Open, + ttl_expiry: i64::MAX, + resolution: None, + mode: "multi_round".into(), + mode_state: vec![], + participants: vec![], + }; + Runtime::apply_mode_response( + &mut session, + ModeResponse::PersistAndResolve { + state: b"new_state".to_vec(), + resolution: b"resolved_data".to_vec(), + }, + ); + assert_eq!(session.state, SessionState::Resolved); + assert_eq!(session.mode_state, b"new_state"); + assert_eq!(session.resolution, Some(b"resolved_data".to_vec())); + } + + #[tokio::test] + async fn log_before_mutate_ordering() { + let rt = make_runtime(); + + let e = env("decision", "SessionStart", "m1", "s1", "alice", b""); + rt.process(&e).await.unwrap(); + + let log = rt.log_store.get_log("s1").await.unwrap(); + assert_eq!(log.len(), 1); + assert_eq!(log[0].message_type, "SessionStart"); + assert_eq!(log[0].entry_kind, EntryKind::Incoming); + } + + #[tokio::test] + async fn ttl_expiry_logs_internal_entry() { + let rt = make_runtime(); + + // Create session with very short TTL + let e = env( + "decision", + "SessionStart", + "m1", + "s1", + "alice", + br#"{"ttl_ms":1}"#, + ); + rt.process(&e).await.unwrap(); + + // Wait for TTL to expire + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let e = env("decision", "Message", "m2", "s1", "alice", b"hello"); + let err = rt.process(&e).await.unwrap_err(); + assert_eq!(err.to_string(), "TtlExpired"); + + let log = rt.log_store.get_log("s1").await.unwrap(); + // Should have: SessionStart (incoming) + TtlExpired (internal) + assert_eq!(log.len(), 2); + assert_eq!(log[1].entry_kind, EntryKind::Internal); + assert_eq!(log[1].message_type, "TtlExpired"); + } +} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..723a6fd --- /dev/null +++ b/src/server.rs @@ -0,0 +1,231 @@ +use macp_runtime::error::MacpError; +use macp_runtime::pb::macp_service_server::MacpService; +use macp_runtime::pb::{Ack, Envelope, SessionInfo, SessionQuery}; +use macp_runtime::runtime::Runtime; +use macp_runtime::session::SessionState; +use std::sync::Arc; +use tonic::{Request, Response, Status}; + +pub struct MacpServer { + runtime: Arc, +} + +impl MacpServer { + pub fn new(runtime: Arc) -> Self { + Self { runtime } + } + + fn validate(env: &Envelope) -> Result<(), MacpError> { + if env.macp_version != "v1" { + return Err(MacpError::InvalidMacpVersion); + } + if env.session_id.is_empty() || env.message_id.is_empty() { + return Err(MacpError::InvalidEnvelope); + } + Ok(()) + } +} + +#[tonic::async_trait] +impl MacpService for MacpServer { + async fn send_message(&self, request: Request) -> Result, Status> { + let env = request.into_inner(); + + let result = async { + Self::validate(&env)?; + self.runtime.process(&env).await + } + .await; + + let ack = match result { + Ok(_) => Ack { + accepted: true, + error: "".into(), + }, + Err(e) => Ack { + accepted: false, + error: e.to_string(), + }, + }; + + Ok(Response::new(ack)) + } + + async fn get_session( + &self, + request: Request, + ) -> Result, Status> { + let query = request.into_inner(); + + match self.runtime.registry.get_session(&query.session_id).await { + Some(session) => { + let state_str = match session.state { + SessionState::Open => "Open", + SessionState::Resolved => "Resolved", + SessionState::Expired => "Expired", + }; + + Ok(Response::new(SessionInfo { + session_id: session.session_id.clone(), + mode: session.mode.clone(), + state: state_str.into(), + ttl_expiry: session.ttl_expiry, + resolution: session.resolution.clone().unwrap_or_default(), + mode_state: session.mode_state.clone(), + participants: session.participants.clone(), + })) + } + None => Err(Status::not_found(format!( + "Session '{}' not found", + query.session_id + ))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use macp_runtime::log_store::LogStore; + use macp_runtime::registry::SessionRegistry; + use macp_runtime::session::Session; + + fn make_server() -> (MacpServer, Arc) { + let registry = Arc::new(SessionRegistry::new()); + let log_store = Arc::new(LogStore::new()); + let runtime = Arc::new(Runtime::new(registry, log_store)); + let server = MacpServer::new(runtime.clone()); + (server, runtime) + } + + #[tokio::test] + async fn expired_session_transitions_to_expired() { + let (_, runtime) = make_server(); + let server = MacpServer::new(runtime.clone()); + + // Insert a session that expired 1 second ago + let expired_ttl = Utc::now().timestamp_millis() - 1000; + runtime + .registry + .insert_session_for_test( + "s_expired".into(), + Session { + session_id: "s_expired".into(), + state: SessionState::Open, + ttl_expiry: expired_ttl, + resolution: None, + mode: "decision".into(), + mode_state: vec![], + participants: vec![], + }, + ) + .await; + + let env = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m1".into(), + session_id: "s_expired".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + + let resp = server.send_message(Request::new(env)).await.unwrap(); + let ack = resp.into_inner(); + assert!(!ack.accepted); + assert_eq!(ack.error, "TtlExpired"); + + // Verify the session state was set to Expired + let s = runtime.registry.get_session("s_expired").await.unwrap(); + assert_eq!(s.state, SessionState::Expired); + } + + #[tokio::test] + async fn non_expired_session_stays_open() { + let (_, runtime) = make_server(); + let server = MacpServer::new(runtime.clone()); + + // Insert a session that expires far in the future + let future_ttl = Utc::now().timestamp_millis() + 60_000; + runtime + .registry + .insert_session_for_test( + "s_alive".into(), + Session { + session_id: "s_alive".into(), + state: SessionState::Open, + ttl_expiry: future_ttl, + resolution: None, + mode: "decision".into(), + mode_state: vec![], + participants: vec![], + }, + ) + .await; + + let env = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m1".into(), + session_id: "s_alive".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + + let resp = server.send_message(Request::new(env)).await.unwrap(); + let ack = resp.into_inner(); + assert!(ack.accepted); + + let s = runtime.registry.get_session("s_alive").await.unwrap(); + assert_eq!(s.state, SessionState::Open); + } + + #[tokio::test] + async fn resolved_session_not_overwritten_to_expired() { + let (_, runtime) = make_server(); + let server = MacpServer::new(runtime.clone()); + + // Insert a resolved session with an expired TTL + let expired_ttl = Utc::now().timestamp_millis() - 1000; + runtime + .registry + .insert_session_for_test( + "s_resolved".into(), + Session { + session_id: "s_resolved".into(), + state: SessionState::Resolved, + ttl_expiry: expired_ttl, + resolution: Some(b"resolve".to_vec()), + mode: "decision".into(), + mode_state: vec![], + participants: vec![], + }, + ) + .await; + + let env = Envelope { + macp_version: "v1".into(), + mode: "decision".into(), + message_type: "Message".into(), + message_id: "m1".into(), + session_id: "s_resolved".into(), + sender: "test".into(), + timestamp_unix_ms: Utc::now().timestamp_millis(), + payload: b"hello".to_vec(), + }; + + let resp = server.send_message(Request::new(env)).await.unwrap(); + let ack = resp.into_inner(); + assert!(!ack.accepted); + assert_eq!(ack.error, "SessionNotOpen"); + + // State should still be Resolved, not overwritten to Expired + let s = runtime.registry.get_session("s_resolved").await.unwrap(); + assert_eq!(s.state, SessionState::Resolved); + } +} diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..a3ad16a --- /dev/null +++ b/src/session.rs @@ -0,0 +1,130 @@ +use crate::error::MacpError; +use serde::Deserialize; + +pub const DEFAULT_TTL_MS: i64 = 60_000; +pub const MAX_TTL_MS: i64 = 24 * 60 * 60 * 1000; + +#[derive(Clone, Debug, PartialEq)] +pub enum SessionState { + Open, + Resolved, + Expired, +} + +#[derive(Clone, Debug)] +pub struct Session { + pub session_id: String, + pub state: SessionState, + pub ttl_expiry: i64, + pub resolution: Option>, + pub mode: String, + pub mode_state: Vec, + pub participants: Vec, +} + +#[derive(Debug, Deserialize)] +struct SessionStartConfig { + ttl_ms: Option, +} + +pub fn parse_session_start_ttl_ms(payload: &[u8]) -> Result { + if payload.is_empty() { + return Ok(DEFAULT_TTL_MS); + } + + let text = std::str::from_utf8(payload).map_err(|_| MacpError::InvalidEnvelope)?; + let config: SessionStartConfig = + serde_json::from_str(text).map_err(|_| MacpError::InvalidEnvelope)?; + + match config.ttl_ms { + None => Ok(DEFAULT_TTL_MS), + Some(ms) if !(1..=MAX_TTL_MS).contains(&ms) => Err(MacpError::InvalidTtl), + Some(ms) => Ok(ms), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_ttl_empty_payload_returns_default() { + let result = parse_session_start_ttl_ms(b""); + assert_eq!(result.unwrap(), DEFAULT_TTL_MS); + } + + #[test] + fn parse_ttl_valid_json_with_ttl() { + let payload = br#"{"ttl_ms": 5000}"#; + assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), 5000); + } + + #[test] + fn parse_ttl_json_missing_field_returns_default() { + let payload = br#"{}"#; + assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), DEFAULT_TTL_MS); + } + + #[test] + fn parse_ttl_json_null_field_returns_default() { + let payload = br#"{"ttl_ms": null}"#; + assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), DEFAULT_TTL_MS); + } + + #[test] + fn parse_ttl_boundary_min_valid() { + let payload = br#"{"ttl_ms": 1}"#; + assert_eq!(parse_session_start_ttl_ms(payload).unwrap(), 1); + } + + #[test] + fn parse_ttl_boundary_max_valid() { + let payload = format!(r#"{{"ttl_ms": {}}}"#, MAX_TTL_MS); + assert_eq!( + parse_session_start_ttl_ms(payload.as_bytes()).unwrap(), + MAX_TTL_MS + ); + } + + #[test] + fn parse_ttl_zero_returns_invalid() { + let payload = br#"{"ttl_ms": 0}"#; + let err = parse_session_start_ttl_ms(payload).unwrap_err(); + assert_eq!(err.to_string(), "InvalidTtl"); + } + + #[test] + fn parse_ttl_negative_returns_invalid() { + let payload = br#"{"ttl_ms": -5000}"#; + let err = parse_session_start_ttl_ms(payload).unwrap_err(); + assert_eq!(err.to_string(), "InvalidTtl"); + } + + #[test] + fn parse_ttl_exceeds_max_returns_invalid() { + let payload = format!(r#"{{"ttl_ms": {}}}"#, MAX_TTL_MS + 1); + let err = parse_session_start_ttl_ms(payload.as_bytes()).unwrap_err(); + assert_eq!(err.to_string(), "InvalidTtl"); + } + + #[test] + fn parse_ttl_invalid_utf8_returns_invalid_envelope() { + let payload: &[u8] = &[0xff, 0xfe, 0xfd]; + let err = parse_session_start_ttl_ms(payload).unwrap_err(); + assert_eq!(err.to_string(), "InvalidEnvelope"); + } + + #[test] + fn parse_ttl_invalid_json_returns_invalid_envelope() { + let payload = b"not json at all"; + let err = parse_session_start_ttl_ms(payload).unwrap_err(); + assert_eq!(err.to_string(), "InvalidEnvelope"); + } + + #[test] + fn parse_ttl_wrong_type_returns_invalid_envelope() { + let payload = br#"{"ttl_ms": "five thousand"}"#; + let err = parse_session_start_ttl_ms(payload).unwrap_err(); + assert_eq!(err.to_string(), "InvalidEnvelope"); + } +} From ed0734938436a57f7473416d220a92056e6fee5b Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Thu, 5 Mar 2026 12:43:52 -0800 Subject: [PATCH 2/5] Add CI/CD --- .github/ISSUE_TEMPLATE/bug_report.yml | 31 ++++ .github/ISSUE_TEMPLATE/rfc_proposal.yml | 31 ++++ .github/PULL_REQUEST_TEMPLATE.md | 24 ++++ .github/workflows/ci.yml | 182 ++++++++++++++++++++++++ 4 files changed, 268 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.yml create mode 100644 .github/ISSUE_TEMPLATE/rfc_proposal.yml create mode 100644 .github/PULL_REQUEST_TEMPLATE.md create mode 100644 .github/workflows/ci.yml diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml new file mode 100644 index 0000000..c7e4ec0 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -0,0 +1,31 @@ + +name: Bug Report +description: Report a problem with the MACP runtime +title: "[BUG] " +labels: ["bug"] +body: + - type: textarea + id: description + attributes: + label: Description + description: Describe the issue clearly. + validations: + required: true + + - type: textarea + id: reproduction + attributes: + label: Reproduction Steps + description: Steps or example envelope/client code that demonstrates the issue. + + - type: textarea + id: expected + attributes: + label: Expected Behavior + description: What should happen according to the protocol? + + - type: textarea + id: environment + attributes: + label: Environment + description: Rust version, OS, protoc version, etc. diff --git a/.github/ISSUE_TEMPLATE/rfc_proposal.yml b/.github/ISSUE_TEMPLATE/rfc_proposal.yml new file mode 100644 index 0000000..34abd54 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/rfc_proposal.yml @@ -0,0 +1,31 @@ + +name: RFC / Enhancement Proposal +description: Propose a change or extension to the MACP runtime +title: "[RFC] " +labels: ["enhancement"] +body: + - type: textarea + id: summary + attributes: + label: Summary + description: High-level overview of the proposal. + validations: + required: true + + - type: textarea + id: motivation + attributes: + label: Motivation + description: Why is this needed? + + - type: textarea + id: specification + attributes: + label: Proposed Changes + description: Detailed specification changes. + + - type: textarea + id: compatibility + attributes: + label: Backward Compatibility + description: Does this introduce breaking changes? diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..6f2e022 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,24 @@ + +## Summary + +Describe the changes introduced by this pull request. + +## Type of Change + +- [ ] New feature +- [ ] Bug fix +- [ ] Refactoring +- [ ] New mode implementation +- [ ] Proto schema change +- [ ] Documentation improvement +- [ ] CI / tooling change + +## Checklist + +- [ ] `cargo test` passes +- [ ] `cargo clippy` has no warnings +- [ ] `cargo fmt` has been run +- [ ] Protobuf definitions compile successfully +- [ ] New modes implement the `Mode` trait correctly (if applicable) +- [ ] Error variants added to `MacpError` (if applicable) +- [ ] This change preserves MACP Core invariants diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..8a4d5c0 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,182 @@ +name: MACP Runtime CI + +on: + pull_request: + push: + branches: [ main ] + +env: + CARGO_TERM_COLOR: always + +jobs: + check: + name: Check + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo registry and build + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo- + + - name: Install protoc + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler + + - name: Cargo check + run: cargo check --all-targets + + fmt: + name: Format + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + + - name: Check formatting + run: cargo fmt --all -- --check + + clippy: + name: Clippy + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: clippy + + - name: Cache cargo registry and build + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo- + + - name: Install protoc + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler + + - name: Run clippy + run: cargo clippy --all-targets -- -D warnings + + test: + name: Test + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo registry and build + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo- + + - name: Install protoc + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler + + - name: Run tests + run: cargo test --all-targets + + build: + name: Build + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache cargo registry and build + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + restore-keys: ${{ runner.os }}-cargo- + + - name: Install protoc + run: | + sudo apt-get update + sudo apt-get install -y protobuf-compiler + + - name: Build release + run: cargo build --release + + lint-protobuf: + name: Lint Protocol Buffers + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install buf + uses: bufbuild/buf-setup-action@v1 + with: + github_token: ${{ secrets.GITHUB_TOKEN }} + + - name: Lint protobuf with buf + run: buf lint + + - name: Check for breaking changes + if: github.event_name == 'pull_request' + run: | + git fetch origin main + buf breaking --against '.git#branch=origin/main' + + ci-pass: + name: All Checks Passed + runs-on: ubuntu-latest + needs: [check, fmt, clippy, test, build, lint-protobuf] + + steps: + - name: Summary + run: | + echo "All checks passed successfully" + echo " - cargo check" + echo " - cargo fmt" + echo " - cargo clippy" + echo " - cargo test" + echo " - cargo build --release" + echo " - protobuf lint" From bb87589c81cd16d2b9f0eea1259bdac8d5a699eb Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Thu, 5 Mar 2026 12:48:18 -0800 Subject: [PATCH 3/5] Fixing CI/CD issue --- buf.yaml | 3 +++ build.rs | 2 +- proto/{ => macp/v1}/macp.proto | 14 ++++++---- src/bin/client.rs | 30 +++++++++++++++++---- src/bin/fuzz_client.rs | 13 ++++++--- src/bin/multi_round_client.rs | 17 ++++++++---- src/server.rs | 49 +++++++++++++++++++++++++--------- 7 files changed, 96 insertions(+), 32 deletions(-) create mode 100644 buf.yaml rename proto/{ => macp/v1}/macp.proto (65%) diff --git a/buf.yaml b/buf.yaml new file mode 100644 index 0000000..ba5ddf5 --- /dev/null +++ b/buf.yaml @@ -0,0 +1,3 @@ +version: v2 +modules: + - path: proto diff --git a/build.rs b/build.rs index 73024d6..1404cba 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,6 @@ fn main() -> Result<(), Box> { tonic_build::configure() .build_server(true) - .compile(&["proto/macp.proto"], &["proto"])?; + .compile(&["macp/v1/macp.proto"], &["proto"])?; Ok(()) } diff --git a/proto/macp.proto b/proto/macp/v1/macp.proto similarity index 65% rename from proto/macp.proto rename to proto/macp/v1/macp.proto index 160ae9f..794156c 100644 --- a/proto/macp.proto +++ b/proto/macp/v1/macp.proto @@ -13,16 +13,20 @@ message Envelope { bytes payload = 8; } -message Ack { +message SendMessageRequest { + Envelope envelope = 1; +} + +message SendMessageResponse { bool accepted = 1; string error = 2; } -message SessionQuery { +message GetSessionRequest { string session_id = 1; } -message SessionInfo { +message GetSessionResponse { string session_id = 1; string mode = 2; string state = 3; @@ -33,6 +37,6 @@ message SessionInfo { } service MACPService { - rpc SendMessage(Envelope) returns (Ack); - rpc GetSession(SessionQuery) returns (SessionInfo); + rpc SendMessage(SendMessageRequest) returns (SendMessageResponse); + rpc GetSession(GetSessionRequest) returns (GetSessionResponse); } diff --git a/src/bin/client.rs b/src/bin/client.rs index e2779ba..a1d1621 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -1,5 +1,5 @@ use macp_runtime::pb::macp_service_client::MacpServiceClient; -use macp_runtime::pb::Envelope; +use macp_runtime::pb::{Envelope, SendMessageRequest}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -18,7 +18,12 @@ async fn main() -> Result<(), Box> { payload: vec![], }; - let ack = client.send_message(start).await?.into_inner(); + let ack = client + .send_message(SendMessageRequest { + envelope: Some(start), + }) + .await? + .into_inner(); println!( "SessionStart ack: accepted={} error='{}'", ack.accepted, ack.error @@ -36,7 +41,12 @@ async fn main() -> Result<(), Box> { payload: b"hello".to_vec(), }; - let ack = client.send_message(msg).await?.into_inner(); + let ack = client + .send_message(SendMessageRequest { + envelope: Some(msg), + }) + .await? + .into_inner(); println!( "Message ack: accepted={} error='{}'", ack.accepted, ack.error @@ -54,7 +64,12 @@ async fn main() -> Result<(), Box> { payload: b"resolve".to_vec(), }; - let ack = client.send_message(resolve).await?.into_inner(); + let ack = client + .send_message(SendMessageRequest { + envelope: Some(resolve), + }) + .await? + .into_inner(); println!( "Resolve ack: accepted={} error='{}'", ack.accepted, ack.error @@ -72,7 +87,12 @@ async fn main() -> Result<(), Box> { payload: b"should-fail".to_vec(), }; - let ack = client.send_message(after).await?.into_inner(); + let ack = client + .send_message(SendMessageRequest { + envelope: Some(after), + }) + .await? + .into_inner(); println!( "After-resolve ack: accepted={} error='{}'", ack.accepted, ack.error diff --git a/src/bin/fuzz_client.rs b/src/bin/fuzz_client.rs index 6e317b6..174a870 100644 --- a/src/bin/fuzz_client.rs +++ b/src/bin/fuzz_client.rs @@ -1,5 +1,5 @@ use macp_runtime::pb::macp_service_client::MacpServiceClient; -use macp_runtime::pb::Envelope; +use macp_runtime::pb::{Envelope, SendMessageRequest}; use tokio::time::{sleep, Duration}; #[allow(clippy::too_many_arguments)] @@ -25,8 +25,15 @@ fn env( } } -async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { - match client.send_message(e).await { +async fn send( + client: &mut MacpServiceClient, + label: &str, + e: Envelope, +) { + let req = SendMessageRequest { + envelope: Some(e), + }; + match client.send_message(req).await { Ok(resp) => { let ack = resp.into_inner(); println!("[{label}] accepted={} error='{}'", ack.accepted, ack.error); diff --git a/src/bin/multi_round_client.rs b/src/bin/multi_round_client.rs index e9d699b..94baf90 100644 --- a/src/bin/multi_round_client.rs +++ b/src/bin/multi_round_client.rs @@ -1,8 +1,15 @@ use macp_runtime::pb::macp_service_client::MacpServiceClient; -use macp_runtime::pb::{Envelope, SessionQuery}; +use macp_runtime::pb::{Envelope, GetSessionRequest, SendMessageRequest}; -async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { - match client.send_message(e).await { +async fn send( + client: &mut MacpServiceClient, + label: &str, + e: Envelope, +) { + let req = SendMessageRequest { + envelope: Some(e), + }; + match client.send_message(req).await { Ok(resp) => { let ack = resp.into_inner(); println!("[{label}] accepted={} error='{}'", ack.accepted, ack.error); @@ -77,7 +84,7 @@ async fn main() -> Result<(), Box> { // Query session state — should be Open match client - .get_session(SessionQuery { + .get_session(GetSessionRequest { session_id: "mr1".into(), }) .await @@ -111,7 +118,7 @@ async fn main() -> Result<(), Box> { // Query session state — should be Resolved match client - .get_session(SessionQuery { + .get_session(GetSessionRequest { session_id: "mr1".into(), }) .await diff --git a/src/server.rs b/src/server.rs index 723a6fd..d89d240 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,8 @@ use macp_runtime::error::MacpError; use macp_runtime::pb::macp_service_server::MacpService; -use macp_runtime::pb::{Ack, Envelope, SessionInfo, SessionQuery}; +use macp_runtime::pb::{ + Envelope, GetSessionRequest, GetSessionResponse, SendMessageRequest, SendMessageResponse, +}; use macp_runtime::runtime::Runtime; use macp_runtime::session::SessionState; use std::sync::Arc; @@ -28,8 +30,14 @@ impl MacpServer { #[tonic::async_trait] impl MacpService for MacpServer { - async fn send_message(&self, request: Request) -> Result, Status> { - let env = request.into_inner(); + async fn send_message( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + let env = req + .envelope + .ok_or_else(|| Status::invalid_argument("missing envelope"))?; let result = async { Self::validate(&env)?; @@ -37,24 +45,24 @@ impl MacpService for MacpServer { } .await; - let ack = match result { - Ok(_) => Ack { + let resp = match result { + Ok(_) => SendMessageResponse { accepted: true, error: "".into(), }, - Err(e) => Ack { + Err(e) => SendMessageResponse { accepted: false, error: e.to_string(), }, }; - Ok(Response::new(ack)) + Ok(Response::new(resp)) } async fn get_session( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { let query = request.into_inner(); match self.runtime.registry.get_session(&query.session_id).await { @@ -65,7 +73,7 @@ impl MacpService for MacpServer { SessionState::Expired => "Expired", }; - Ok(Response::new(SessionInfo { + Ok(Response::new(GetSessionResponse { session_id: session.session_id.clone(), mode: session.mode.clone(), state: state_str.into(), @@ -99,6 +107,12 @@ mod tests { (server, runtime) } + fn wrap(env: Envelope) -> SendMessageRequest { + SendMessageRequest { + envelope: Some(env), + } + } + #[tokio::test] async fn expired_session_transitions_to_expired() { let (_, runtime) = make_server(); @@ -133,7 +147,10 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server.send_message(Request::new(env)).await.unwrap(); + let resp = server + .send_message(Request::new(wrap(env))) + .await + .unwrap(); let ack = resp.into_inner(); assert!(!ack.accepted); assert_eq!(ack.error, "TtlExpired"); @@ -177,7 +194,10 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server.send_message(Request::new(env)).await.unwrap(); + let resp = server + .send_message(Request::new(wrap(env))) + .await + .unwrap(); let ack = resp.into_inner(); assert!(ack.accepted); @@ -219,7 +239,10 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server.send_message(Request::new(env)).await.unwrap(); + let resp = server + .send_message(Request::new(wrap(env))) + .await + .unwrap(); let ack = resp.into_inner(); assert!(!ack.accepted); assert_eq!(ack.error, "SessionNotOpen"); From 18186b111260c87a4b4cbafcbade4cf4d8d088e8 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Thu, 5 Mar 2026 12:53:17 -0800 Subject: [PATCH 4/5] Fixing CI/CD Buf Issue --- .githooks/pre-commit | 11 +++++++++++ .github/workflows/ci.yml | 4 ++-- Makefile | 20 ++++++++++++++++++++ buf.yaml | 3 --- proto/buf.yaml | 7 +++++++ src/bin/fuzz_client.rs | 10 ++-------- src/bin/multi_round_client.rs | 10 ++-------- src/server.rs | 15 +++------------ 8 files changed, 47 insertions(+), 33 deletions(-) create mode 100755 .githooks/pre-commit create mode 100644 Makefile delete mode 100644 buf.yaml create mode 100644 proto/buf.yaml diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 0000000..25bbb51 --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,11 @@ +#!/usr/bin/env bash +set -e + +echo "Running cargo fmt check..." +cargo fmt --all -- --check +if [ $? -ne 0 ]; then + echo "" + echo "Commit rejected: code is not formatted." + echo "Run 'cargo fmt --all' to fix formatting, then try again." + exit 1 +fi diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8a4d5c0..a76d9b5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -157,13 +157,13 @@ jobs: github_token: ${{ secrets.GITHUB_TOKEN }} - name: Lint protobuf with buf - run: buf lint + run: buf lint proto - name: Check for breaking changes if: github.event_name == 'pull_request' run: | git fetch origin main - buf breaking --against '.git#branch=origin/main' + buf breaking proto --against '.git#branch=origin/main,subdir=proto' ci-pass: name: All Checks Passed diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e8d4253 --- /dev/null +++ b/Makefile @@ -0,0 +1,20 @@ +.PHONY: setup build test fmt clippy check + +## First-time setup: configure git hooks +setup: + git config core.hooksPath .githooks + @echo "Git hooks configured." + +build: + cargo build + +test: + cargo test + +fmt: + cargo fmt --all + +clippy: + cargo clippy --all-targets -- -D warnings + +check: fmt clippy test diff --git a/buf.yaml b/buf.yaml deleted file mode 100644 index ba5ddf5..0000000 --- a/buf.yaml +++ /dev/null @@ -1,3 +0,0 @@ -version: v2 -modules: - - path: proto diff --git a/proto/buf.yaml b/proto/buf.yaml new file mode 100644 index 0000000..0ed83f4 --- /dev/null +++ b/proto/buf.yaml @@ -0,0 +1,7 @@ +version: v1 +lint: + use: + - STANDARD +breaking: + use: + - FILE diff --git a/src/bin/fuzz_client.rs b/src/bin/fuzz_client.rs index 174a870..508deef 100644 --- a/src/bin/fuzz_client.rs +++ b/src/bin/fuzz_client.rs @@ -25,14 +25,8 @@ fn env( } } -async fn send( - client: &mut MacpServiceClient, - label: &str, - e: Envelope, -) { - let req = SendMessageRequest { - envelope: Some(e), - }; +async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { + let req = SendMessageRequest { envelope: Some(e) }; match client.send_message(req).await { Ok(resp) => { let ack = resp.into_inner(); diff --git a/src/bin/multi_round_client.rs b/src/bin/multi_round_client.rs index 94baf90..00265ae 100644 --- a/src/bin/multi_round_client.rs +++ b/src/bin/multi_round_client.rs @@ -1,14 +1,8 @@ use macp_runtime::pb::macp_service_client::MacpServiceClient; use macp_runtime::pb::{Envelope, GetSessionRequest, SendMessageRequest}; -async fn send( - client: &mut MacpServiceClient, - label: &str, - e: Envelope, -) { - let req = SendMessageRequest { - envelope: Some(e), - }; +async fn send(client: &mut MacpServiceClient, label: &str, e: Envelope) { + let req = SendMessageRequest { envelope: Some(e) }; match client.send_message(req).await { Ok(resp) => { let ack = resp.into_inner(); diff --git a/src/server.rs b/src/server.rs index d89d240..11bc534 100644 --- a/src/server.rs +++ b/src/server.rs @@ -147,10 +147,7 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server - .send_message(Request::new(wrap(env))) - .await - .unwrap(); + let resp = server.send_message(Request::new(wrap(env))).await.unwrap(); let ack = resp.into_inner(); assert!(!ack.accepted); assert_eq!(ack.error, "TtlExpired"); @@ -194,10 +191,7 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server - .send_message(Request::new(wrap(env))) - .await - .unwrap(); + let resp = server.send_message(Request::new(wrap(env))).await.unwrap(); let ack = resp.into_inner(); assert!(ack.accepted); @@ -239,10 +233,7 @@ mod tests { payload: b"hello".to_vec(), }; - let resp = server - .send_message(Request::new(wrap(env))) - .await - .unwrap(); + let resp = server.send_message(Request::new(wrap(env))).await.unwrap(); let ack = resp.into_inner(); assert!(!ack.accepted); assert_eq!(ack.error, "SessionNotOpen"); From dbcedcdf2b66b38496509296b7e7bac778bb9207 Mon Sep 17 00:00:00 2001 From: Ajit Koti Date: Thu, 5 Mar 2026 12:55:22 -0800 Subject: [PATCH 5/5] Fixing CI/CD Buf Issue --- .github/workflows/ci.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a76d9b5..2228476 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -163,7 +163,12 @@ jobs: if: github.event_name == 'pull_request' run: | git fetch origin main - buf breaking proto --against '.git#branch=origin/main,subdir=proto' + # Skip if main branch doesn't have a buf module yet + if git show origin/main:proto/buf.yaml > /dev/null 2>&1; then + buf breaking proto --against '.git#branch=origin/main,subdir=proto' + else + echo "No buf module found on main branch, skipping breaking change check" + fi ci-pass: name: All Checks Passed