From 9257e2371aa25aad1adea54a5ff3fcfae56ce84e Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 21 May 2026 19:35:19 +0200 Subject: [PATCH 1/2] feat(rmcp): Move MCP server builder from core to rmcp crate --- Cargo.lock | 5 +- Cargo.toml | 4 +- md/design.md | 5 +- .../Cargo.toml | 1 + .../tests/mcp_integration/proxy.rs | 1 + .../tests/mcp_server_handler_chain.rs | 1 + .../tests/scoped_mcp_server.rs | 1 + .../tests/standalone_mcp_server.rs | 1 + .../tests/test_mcp_tool_output_types.rs | 1 + .../tests/test_session_id_in_mcp_tools.rs | 1 + .../tests/test_tool_enable_disable.rs | 1 + .../tests/test_tool_fn.rs | 1 + .../tests/trace_client_mcp_server.rs | 1 + src/agent-client-protocol-cookbook/src/lib.rs | 14 ++-- src/agent-client-protocol-rmcp/CHANGELOG.md | 4 ++ src/agent-client-protocol-rmcp/Cargo.toml | 7 +- src/agent-client-protocol-rmcp/README.md | 13 +++- .../src}/builder.rs | 68 +++++++++---------- src/agent-client-protocol-rmcp/src/lib.rs | 40 +++++++---- .../src}/responder.rs | 23 ++++--- src/agent-client-protocol/CHANGELOG.md | 4 ++ src/agent-client-protocol/Cargo.toml | 5 +- src/agent-client-protocol/README.md | 1 + .../src/concepts/proxies.rs | 6 +- .../src/concepts/sessions.rs | 3 +- src/agent-client-protocol/src/lib.rs | 10 ++- .../src/mcp_server/mod.rs | 41 ++++------- .../src/mcp_server/server.rs | 31 +++------ .../src/mcp_server/tool.rs | 4 +- src/agent-client-protocol/src/session.rs | 6 +- 30 files changed, 170 insertions(+), 134 deletions(-) rename src/{agent-client-protocol/src/mcp_server => agent-client-protocol-rmcp/src}/builder.rs (90%) rename src/{agent-client-protocol/src/mcp_server => agent-client-protocol-rmcp/src}/responder.rs (86%) diff --git a/Cargo.lock b/Cargo.lock index 179f588b..bd1043e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -16,7 +16,6 @@ dependencies = [ "futures", "futures-concurrency", "jsonrpcmsg", - "rmcp", "rustc-hash", "schemars 1.2.1", "serde", @@ -34,6 +33,7 @@ version = "0.12.1" dependencies = [ "agent-client-protocol", "agent-client-protocol-polyfill", + "agent-client-protocol-rmcp", "agent-client-protocol-test", "agent-client-protocol-trace-viewer", "agent-client-protocol-yopo", @@ -101,10 +101,13 @@ name = "agent-client-protocol-rmcp" version = "0.11.2" dependencies = [ "agent-client-protocol", + "futures", "futures-concurrency", "rmcp", + "rustc-hash", "schemars 1.2.1", "serde", + "serde_json", "tokio", "tokio-util", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 9a5c89c7..1cf3ea04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,11 +2,11 @@ members = [ "src/agent-client-protocol", "src/agent-client-protocol-derive", - # Keep polyfill before conductor: release-plz currently ignores versioned + # Keep polyfill and rmcp before conductor: release-plz currently ignores versioned # dev-dependency edges, but cargo publish still validates them. "src/agent-client-protocol-polyfill", - "src/agent-client-protocol-conductor", "src/agent-client-protocol-rmcp", + "src/agent-client-protocol-conductor", "src/agent-client-protocol-test", "src/agent-client-protocol-trace-viewer", "src/yopo", diff --git a/md/design.md b/md/design.md index 65c44311..05268d0a 100644 --- a/md/design.md +++ b/md/design.md @@ -14,7 +14,7 @@ The core SDK. Provides: - **Connection builders** (`builder()`, `connect_to()`, `connect_with()`) - **Message handling** (`on_receive_request`, `on_receive_notification`, `on_receive_dispatch`) - **Protocol types** (`agent_client_protocol::schema::*`) - all ACP message types -- **MCP server builder** - for adding tools to proxies +- **MCP server attachment** - runtime-agnostic interfaces for wiring MCP servers into ACP sessions ### agent-client-protocol-tokio @@ -27,6 +27,7 @@ Tokio-specific utilities: Integration with the [rmcp](https://docs.rs/rmcp) crate: +- **`McpServer::builder()`** - define MCP tools in Rust code - **`McpServer::from_rmcp()`** - wrap an rmcp server as an ACP MCP server ## Role System @@ -156,7 +157,7 @@ stateDiagram-v2 | `src/agent-client-protocol/src/component.rs` | ConnectTo and Builder traits | | `src/agent-client-protocol/src/handler.rs` | Connection builder implementation | | `src/agent-client-protocol/src/typed.rs` | Dispatch type and handler matching | -| `src/agent-client-protocol/src/mcp_server/` | MCP server builder | +| `src/agent-client-protocol/src/mcp_server/` | Runtime-agnostic MCP server attachment | | `src/agent-client-protocol/src/concepts/` | Rustdoc concept explanations | ## Design Decisions diff --git a/src/agent-client-protocol-conductor/Cargo.toml b/src/agent-client-protocol-conductor/Cargo.toml index b248723b..389e312b 100644 --- a/src/agent-client-protocol-conductor/Cargo.toml +++ b/src/agent-client-protocol-conductor/Cargo.toml @@ -37,6 +37,7 @@ uuid.workspace = true [dev-dependencies] agent-client-protocol = { workspace = true, features = ["unstable_mcp_over_acp"] } +agent-client-protocol-rmcp.workspace = true agent-client-protocol-test.workspace = true yopo.workspace = true expect-test.workspace = true diff --git a/src/agent-client-protocol-conductor/tests/mcp_integration/proxy.rs b/src/agent-client-protocol-conductor/tests/mcp_integration/proxy.rs index 1359cf18..77c62d4b 100644 --- a/src/agent-client-protocol-conductor/tests/mcp_integration/proxy.rs +++ b/src/agent-client-protocol-conductor/tests/mcp_integration/proxy.rs @@ -2,6 +2,7 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, Proxy}; +use agent_client_protocol_rmcp::McpServerExt as _; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs b/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs index a55b404c..d91b4993 100644 --- a/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs +++ b/src/agent-client-protocol-conductor/tests/mcp_server_handler_chain.rs @@ -12,6 +12,7 @@ use agent_client_protocol::schema::{ }; use agent_client_protocol::{Agent, Client, Conductor, ConnectTo, DynConnectTo, Proxy}; use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; +use agent_client_protocol_rmcp::McpServerExt as _; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::path::PathBuf; diff --git a/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs b/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs index 549beca9..c052078f 100644 --- a/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs +++ b/src/agent-client-protocol-conductor/tests/scoped_mcp_server.rs @@ -8,6 +8,7 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Agent, Conductor, ConnectTo, Proxy, Role, RunWithConnectionTo}; use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; +use agent_client_protocol_rmcp::McpServerExt as _; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/agent-client-protocol-conductor/tests/standalone_mcp_server.rs b/src/agent-client-protocol-conductor/tests/standalone_mcp_server.rs index 4501c977..9fff00e0 100644 --- a/src/agent-client-protocol-conductor/tests/standalone_mcp_server.rs +++ b/src/agent-client-protocol-conductor/tests/standalone_mcp_server.rs @@ -6,6 +6,7 @@ use agent_client_protocol::{ ByteStreams, ConnectTo, RunWithConnectionTo, mcp_server::McpServer, role::mcp, util::run_until, }; +use agent_client_protocol_rmcp::McpServerExt as _; use rmcp::{ClientHandler, ServiceExt, model::ClientInfo}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs b/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs index 791f069e..bf65b65f 100644 --- a/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs +++ b/src/agent-client-protocol-conductor/tests/test_mcp_tool_output_types.rs @@ -7,6 +7,7 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy, RunWithConnectionTo}; use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; +use agent_client_protocol_rmcp::McpServerExt as _; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs b/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs index 5d6a5f52..c10d59d6 100644 --- a/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs +++ b/src/agent-client-protocol-conductor/tests/test_session_id_in_mcp_tools.rs @@ -13,6 +13,7 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy}; use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; +use agent_client_protocol_rmcp::McpServerExt as _; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs b/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs index 6c68b7ba..0ccfd1e2 100644 --- a/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs +++ b/src/agent-client-protocol-conductor/tests/test_tool_enable_disable.rs @@ -7,6 +7,7 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy, RunWithConnectionTo}; use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; +use agent_client_protocol_rmcp::McpServerExt as _; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/agent-client-protocol-conductor/tests/test_tool_fn.rs b/src/agent-client-protocol-conductor/tests/test_tool_fn.rs index b9128479..27ae6149 100644 --- a/src/agent-client-protocol-conductor/tests/test_tool_fn.rs +++ b/src/agent-client-protocol-conductor/tests/test_tool_fn.rs @@ -7,6 +7,7 @@ use agent_client_protocol::mcp_server::McpServer; use agent_client_protocol::{Conductor, ConnectTo, DynConnectTo, Proxy, RunWithConnectionTo}; use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; +use agent_client_protocol_rmcp::McpServerExt as _; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; diff --git a/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs b/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs index 471a3628..f338adf5 100644 --- a/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs +++ b/src/agent-client-protocol-conductor/tests/trace_client_mcp_server.rs @@ -15,6 +15,7 @@ use agent_client_protocol::{Client, Role, RunWithConnectionTo}; use agent_client_protocol_conductor::trace::TraceEvent; use agent_client_protocol_conductor::{ConductorImpl, ProxiesAndAgent}; use agent_client_protocol_polyfill::mcp_over_acp::McpOverAcpPolyfill; +use agent_client_protocol_rmcp::McpServerExt as _; use agent_client_protocol_test::testy::{Testy, TestyCommand}; use expect_test::expect; use futures::StreamExt; diff --git a/src/agent-client-protocol-cookbook/src/lib.rs b/src/agent-client-protocol-cookbook/src/lib.rs index 9d19b5a6..11655abe 100644 --- a/src/agent-client-protocol-cookbook/src/lib.rs +++ b/src/agent-client-protocol-cookbook/src/lib.rs @@ -455,6 +455,7 @@ pub mod global_mcp_server { //! //! ``` //! use agent_client_protocol::mcp_server::McpServer; + //! use agent_client_protocol_rmcp::McpServerExt; //! use agent_client_protocol::{ConnectTo, RunWithConnectionTo, Proxy, Conductor}; //! use schemars::JsonSchema; //! use serde::{Deserialize, Serialize}; @@ -556,7 +557,7 @@ pub mod global_mcp_server { //! 2. Passes the modified request through to the next handler //! 3. Handles incoming MCP protocol messages (tool calls, etc.) for its URL //! - //! [`McpServer::builder`]: agent_client_protocol::mcp_server::McpServer::builder + //! [`McpServer::builder`]: agent_client_protocol_rmcp::McpServerExt::builder //! [`McpServer::from_rmcp`]: agent_client_protocol_rmcp::McpServerExt::from_rmcp //! [`with_mcp_server`]: agent_client_protocol::Builder::with_mcp_server } @@ -581,6 +582,7 @@ pub mod per_session_mcp_server { //! //! ``` //! use agent_client_protocol::mcp_server::McpServer; + //! use agent_client_protocol_rmcp::McpServerExt; //! use agent_client_protocol::schema::NewSessionRequest; //! use agent_client_protocol::{Client, Proxy, Conductor, ConnectTo}; //! @@ -639,6 +641,7 @@ pub mod per_session_mcp_server { //! //! ``` //! # use agent_client_protocol::mcp_server::McpServer; + //! # use agent_client_protocol_rmcp::McpServerExt; //! # use agent_client_protocol::schema::NewSessionRequest; //! # use agent_client_protocol::{Client, Proxy, Conductor, ConnectTo}; //! # async fn run_proxy(transport: impl ConnectTo) -> Result<(), agent_client_protocol::Error> { @@ -694,6 +697,7 @@ pub mod filtering_tools { //! //! ``` //! use agent_client_protocol::mcp_server::McpServer; + //! use agent_client_protocol_rmcp::McpServerExt; //! use agent_client_protocol::{Conductor, RunWithConnectionTo}; //! use schemars::JsonSchema; //! use serde::Deserialize; @@ -730,6 +734,7 @@ pub mod filtering_tools { //! //! ``` //! use agent_client_protocol::mcp_server::McpServer; + //! use agent_client_protocol_rmcp::McpServerExt; //! use agent_client_protocol::{Conductor, RunWithConnectionTo}; //! use schemars::JsonSchema; //! use serde::Deserialize; @@ -763,6 +768,7 @@ pub mod filtering_tools { //! //! ``` //! use agent_client_protocol::mcp_server::McpServer; + //! use agent_client_protocol_rmcp::McpServerExt; //! use agent_client_protocol::Conductor; //! //! // This will error because "ech" is not a registered tool @@ -775,9 +781,9 @@ pub mod filtering_tools { //! Calling enable/disable on an already enabled/disabled tool is not an error - //! the operations are idempotent. //! - //! [`disable_tool`]: agent_client_protocol::mcp_server::McpServerBuilder::disable_tool - //! [`enable_tool`]: agent_client_protocol::mcp_server::McpServerBuilder::enable_tool - //! [`disable_all_tools`]: agent_client_protocol::mcp_server::McpServerBuilder::disable_all_tools + //! [`disable_tool`]: agent_client_protocol_rmcp::McpServerBuilder::disable_tool + //! [`enable_tool`]: agent_client_protocol_rmcp::McpServerBuilder::enable_tool + //! [`disable_all_tools`]: agent_client_protocol_rmcp::McpServerBuilder::disable_all_tools } pub mod running_proxies_with_conductor { diff --git a/src/agent-client-protocol-rmcp/CHANGELOG.md b/src/agent-client-protocol-rmcp/CHANGELOG.md index 7db895d6..5ec1a169 100644 --- a/src/agent-client-protocol-rmcp/CHANGELOG.md +++ b/src/agent-client-protocol-rmcp/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Add the MCP server builder APIs moved out of `agent-client-protocol`, keeping `rmcp` and Tokio dependencies in this integration crate. + ## [0.11.2](https://github.com/agentclientprotocol/rust-sdk/compare/agent-client-protocol-rmcp-v0.11.1...agent-client-protocol-rmcp-v0.11.2) - 2026-05-16 ### Other diff --git a/src/agent-client-protocol-rmcp/Cargo.toml b/src/agent-client-protocol-rmcp/Cargo.toml index bc84c54e..d885e172 100644 --- a/src/agent-client-protocol-rmcp/Cargo.toml +++ b/src/agent-client-protocol-rmcp/Cargo.toml @@ -12,14 +12,17 @@ categories = ["development-tools"] [dependencies] agent-client-protocol.workspace = true +futures.workspace = true futures-concurrency.workspace = true rmcp.workspace = true +rustc-hash.workspace = true +schemars.workspace = true +serde.workspace = true +serde_json.workspace = true tokio.workspace = true tokio-util.workspace = true [dev-dependencies] -schemars.workspace = true -serde.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/src/agent-client-protocol-rmcp/README.md b/src/agent-client-protocol-rmcp/README.md index 7c2738ae..b72807a6 100644 --- a/src/agent-client-protocol-rmcp/README.md +++ b/src/agent-client-protocol-rmcp/README.md @@ -4,11 +4,20 @@ ## Overview -This crate bridges [rmcp](https://docs.rs/rmcp)-based MCP server implementations with the ACP MCP server framework from `agent-client-protocol`. It lets you use any rmcp service as an MCP server in an ACP proxy. +This crate bridges [rmcp](https://docs.rs/rmcp)-based MCP server implementations with the ACP MCP server framework from `agent-client-protocol`. It lets you define MCP tools in Rust or use any rmcp service as an MCP server in an ACP proxy. ## Usage -Use the `McpServerExt` trait to create an MCP server from an rmcp service: +Use the `McpServerExt` trait to build an MCP server with tools: + +```rust +use agent_client_protocol::mcp_server::McpServer; +use agent_client_protocol_rmcp::McpServerExt; + +let server = McpServer::builder("my-tools").build(); +``` + +Or create an MCP server from an rmcp service: ```rust use agent_client_protocol::mcp_server::McpServer; diff --git a/src/agent-client-protocol/src/mcp_server/builder.rs b/src/agent-client-protocol-rmcp/src/builder.rs similarity index 90% rename from src/agent-client-protocol/src/mcp_server/builder.rs rename to src/agent-client-protocol-rmcp/src/builder.rs index 733b4c16..0bdef3e9 100644 --- a/src/agent-client-protocol/src/mcp_server/builder.rs +++ b/src/agent-client-protocol-rmcp/src/builder.rs @@ -47,25 +47,26 @@ use schemars::JsonSchema; use serde::{Serialize, de::DeserializeOwned}; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; -use super::{McpConnectionTo, McpTool}; -use crate::{ - ByteStreams, ConnectTo, DynConnectTo, - jsonrpc::run::{ChainRun, NullRun, RunWithConnectionTo}, - mcp_server::{ - McpServer, McpServerConnect, - responder::{ToolCall, ToolFnMutResponder, ToolFnResponder}, - }, +use agent_client_protocol as acp; +use agent_client_protocol::{ + ByteStreams, ChainRun, ConnectTo, DynConnectTo, NullRun, RunWithConnectionTo, + mcp_server::{McpConnectionTo, McpServer, McpServerConnect, McpTool}, role::{self, Role}, }; +use crate::responder::{ToolCall, ToolFnMutResponder, ToolFnResponder}; + /// Builder for creating MCP servers with tools. /// -/// Use [`McpServer::builder`] to create a new builder, then chain methods to +/// Use [`crate::McpServerExt::builder`] to create a new builder, then chain methods to /// configure the server and call [`build`](Self::build) to create the server. /// /// # Example /// /// ```rust,ignore +/// use agent_client_protocol::mcp_server::McpServer; +/// use agent_client_protocol_rmcp::McpServerExt; +/// /// let server = McpServer::builder("my-server".to_string()) /// .instructions("A helpful assistant") /// .tool(EchoTool) @@ -73,7 +74,7 @@ use crate::{ /// "greet", /// "Greet someone by name", /// async |input: GreetInput, _cx| Ok(format!("Hello, {}!", input.name)), -/// agent_client_protocol::tool_fn!(), +/// agent_client_protocol_rmcp::tool_fn!(), /// ) /// .build(); /// ``` @@ -179,9 +180,9 @@ where /// Disable a specific tool by name. /// /// Returns an error if the tool is not registered. - pub fn disable_tool(mut self, name: &str) -> Result { + pub fn disable_tool(mut self, name: &str) -> Result { if !self.data.tools.contains_key(name) { - return Err(crate::Error::invalid_request().data(format!("unknown tool: {name}"))); + return Err(acp::Error::invalid_request().data(format!("unknown tool: {name}"))); } match &mut self.data.enabled_tools { EnabledTools::DenyList(deny) => { @@ -197,9 +198,9 @@ where /// Enable a specific tool by name. /// /// Returns an error if the tool is not registered. - pub fn enable_tool(mut self, name: &str) -> Result { + pub fn enable_tool(mut self, name: &str) -> Result { if !self.data.tools.contains_key(name) { - return Err(crate::Error::invalid_request().data(format!("unknown tool: {name}"))); + return Err(acp::Error::invalid_request().data(format!("unknown tool: {name}"))); } match &mut self.data.enabled_tools { EnabledTools::DenyList(deny) => { @@ -259,14 +260,14 @@ where &'a mut F, P, McpConnectionTo, - ) -> BoxFuture<'a, Result> + ) -> BoxFuture<'a, Result> + Send + 'static, ) -> McpServerBuilder> where P: JsonSchema + DeserializeOwned + 'static + Send, Ret: JsonSchema + Serialize + 'static + Send, - F: AsyncFnMut(P, McpConnectionTo) -> Result + Send, + F: AsyncFnMut(P, McpConnectionTo) -> Result + Send, { let (call_tx, call_rx) = mpsc::channel(128); self.tool_with_responder( @@ -312,7 +313,7 @@ where &'a F, P, McpConnectionTo, - ) -> BoxFuture<'a, Result> + ) -> BoxFuture<'a, Result> + Send + Sync + 'static, @@ -320,7 +321,7 @@ where where P: JsonSchema + DeserializeOwned + 'static + Send, Ret: JsonSchema + Serialize + 'static + Send, - F: AsyncFn(P, McpConnectionTo) -> Result + F: AsyncFn(P, McpConnectionTo) -> Result + Send + Sync + 'static, @@ -342,8 +343,8 @@ where /// Create an MCP server from this builder. /// - /// This builder can be attached to new sessions (see [`SessionBuilder::with_mcp_server`](`crate::SessionBuilder::with_mcp_server`)) - /// or served up as part of a proxy (see [`Builder::with_mcp_server`](`crate::Builder::with_mcp_server`)). + /// This builder can be attached to new sessions (see [`SessionBuilder::with_mcp_server`](`agent_client_protocol::SessionBuilder::with_mcp_server`)) + /// or served up as part of a proxy (see [`Builder::with_mcp_server`](`agent_client_protocol::Builder::with_mcp_server`)). pub fn build(self) -> McpServer { McpServer::new( McpServerBuilt { @@ -383,10 +384,7 @@ pub(crate) struct McpServerConnection { } impl ConnectTo for McpServerConnection { - async fn connect_to( - self, - client: impl ConnectTo, - ) -> Result<(), crate::Error> { + async fn connect_to(self, client: impl ConnectTo) -> Result<(), acp::Error> { // Create tokio byte streams that rmcp expects let (mcp_server_stream, mcp_client_stream) = tokio::io::duplex(8192); let (mcp_server_read, mcp_server_write) = tokio::io::split(mcp_server_stream); @@ -403,14 +401,14 @@ impl ConnectTo for McpServerConnection: Send + Sync { &self, input: serde_json::Value, connection: McpConnectionTo, - ) -> BoxFuture<'_, Result>; + ) -> BoxFuture<'_, Result>; } /// Create an `rmcp` tool model from our [`McpTool`] trait. @@ -554,11 +552,11 @@ fn make_erased_mcp_tool<'s, R: Role, M: McpTool + 's>( &self, input: serde_json::Value, context: McpConnectionTo, - ) -> BoxFuture<'_, Result> { + ) -> BoxFuture<'_, Result> { Box::pin(async move { - let input = serde_json::from_value(input).map_err(crate::util::internal_error)?; + let input = serde_json::from_value(input).map_err(acp::util::internal_error)?; serde_json::to_value(self.tool.call_tool(input, context).await?) - .map_err(crate::util::internal_error) + .map_err(acp::util::internal_error) }) } } @@ -566,8 +564,8 @@ fn make_erased_mcp_tool<'s, R: Role, M: McpTool + 's>( Arc::new(ErasedMcpToolImpl { tool }) } -/// Convert a [`crate::Error`] into an [`rmcp::ErrorData`]. -fn to_rmcp_error(error: crate::Error) -> rmcp::ErrorData { +/// Convert an [`agent_client_protocol::Error`] into an [`rmcp::ErrorData`]. +fn to_rmcp_error(error: acp::Error) -> rmcp::ErrorData { rmcp::ErrorData { code: rmcp::model::ErrorCode(error.code.into()), message: error.message.into(), @@ -604,7 +602,7 @@ where &self, params: P, mcp_connection: McpConnectionTo, - ) -> Result { + ) -> Result { let (result_tx, result_rx) = oneshot::channel(); self.call_tx @@ -615,8 +613,8 @@ where result_tx, }) .await - .map_err(crate::util::internal_error)?; + .map_err(acp::util::internal_error)?; - result_rx.await.map_err(crate::util::internal_error)? + result_rx.await.map_err(acp::util::internal_error)? } } diff --git a/src/agent-client-protocol-rmcp/src/lib.rs b/src/agent-client-protocol-rmcp/src/lib.rs index 7faf143e..a9d45efa 100644 --- a/src/agent-client-protocol-rmcp/src/lib.rs +++ b/src/agent-client-protocol-rmcp/src/lib.rs @@ -5,7 +5,16 @@ //! //! ## Usage //! -//! Create an MCP server from an rmcp service using the extension trait: +//! Build an MCP server with tools using the extension trait: +//! +//! ```ignore +//! use agent_client_protocol::mcp_server::McpServer; +//! use agent_client_protocol_rmcp::McpServerExt; +//! +//! let server = McpServer::builder("my-tools").build(); +//! ``` +//! +//! Or create an MCP server from an rmcp service: //! //! ```ignore //! use agent_client_protocol::mcp_server::McpServer; @@ -21,21 +30,31 @@ //! ``` use agent_client_protocol::mcp_server::{McpConnectionTo, McpServer, McpServerConnect}; -use agent_client_protocol::role::{self, HasPeer}; -use agent_client_protocol::{Agent, ByteStreams, ConnectTo, DynConnectTo, NullRun, Role}; +use agent_client_protocol::role; +use agent_client_protocol::{ByteStreams, ConnectTo, DynConnectTo, NullRun, Role}; use futures_concurrency::future::TryJoin as _; use rmcp::ServiceExt; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; -pub trait McpServerExt -where - Counterpart: HasPeer, -{ +mod builder; +mod responder; + +pub use agent_client_protocol::mcp_server::McpTool; +pub use agent_client_protocol::{tool_fn, tool_fn_mut}; +pub use builder::{EnabledTools, McpServerBuilder}; + +/// Extension constructors for ACP MCP servers backed by `rmcp`. +pub trait McpServerExt { + /// Create an MCP server builder for defining tools in Rust code. + fn builder(name: impl ToString) -> McpServerBuilder { + McpServerBuilder::new(name.to_string()) + } + /// Create an MCP server from something that implements the [`McpServerConnect`] trait. /// /// # See also /// - /// See [`McpServer::builder`] to construct MCP servers from Rust code. + /// See [`Self::builder`] to construct MCP servers from Rust code. fn from_rmcp( name: impl ToString, new_fn: impl Fn() -> S + Send + Sync + 'static, @@ -77,10 +96,7 @@ where } } -impl McpServerExt for McpServer where - Counterpart: HasPeer -{ -} +impl McpServerExt for McpServer {} /// Component wrapper for rmcp services. struct RmcpServerComponent { diff --git a/src/agent-client-protocol/src/mcp_server/responder.rs b/src/agent-client-protocol-rmcp/src/responder.rs similarity index 86% rename from src/agent-client-protocol/src/mcp_server/responder.rs rename to src/agent-client-protocol-rmcp/src/responder.rs index aed19d88..9bfef416 100644 --- a/src/agent-client-protocol/src/mcp_server/responder.rs +++ b/src/agent-client-protocol-rmcp/src/responder.rs @@ -6,15 +6,16 @@ use futures::{ future::BoxFuture, }; -use crate::{ - ConnectionTo, jsonrpc::run::RunWithConnectionTo, mcp_server::McpConnectionTo, role::Role, +use agent_client_protocol as acp; +use agent_client_protocol::{ + ConnectionTo, RunWithConnectionTo, mcp_server::McpConnectionTo, role::Role, }; /// A tool call request sent through the channel. pub(super) struct ToolCall { pub(crate) params: P, pub(crate) mcp_connection: McpConnectionTo, - pub(crate) result_tx: futures::channel::oneshot::Sender>, + pub(crate) result_tx: futures::channel::oneshot::Sender>, } /// Responder for a `tool_fn` closure that receives tool calls through a channel @@ -27,7 +28,7 @@ pub(super) struct ToolFnMutResponder { &'a mut F, P, McpConnectionTo, - ) -> BoxFuture<'a, Result> + ) -> BoxFuture<'a, Result> + Send, >, } @@ -44,7 +45,7 @@ where async fn run_with_connection_to( self, _connection: ConnectionTo, - ) -> Result<(), crate::Error> { + ) -> Result<(), acp::Error> { let ToolFnMutResponder { mut func, mut call_rx, @@ -59,7 +60,7 @@ where let result = tool_future_fn(&mut func, params, mcp_connection).await; result_tx .send(result) - .map_err(|_| crate::util::internal_error("failed to send MCP result"))?; + .map_err(|_| acp::util::internal_error("failed to send MCP result"))?; } Ok(()) } @@ -75,7 +76,7 @@ pub(super) struct ToolFnResponder { &'a F, P, McpConnectionTo, - ) -> BoxFuture<'a, Result> + ) -> BoxFuture<'a, Result> + Send + Sync, >, @@ -93,13 +94,13 @@ where async fn run_with_connection_to( self, _connection: ConnectionTo, - ) -> Result<(), crate::Error> { + ) -> Result<(), acp::Error> { let ToolFnResponder { func, call_rx, tool_future_fn, } = self; - crate::util::process_stream_concurrently( + acp::util::process_stream_concurrently( call_rx, async |tool_call| { fn hack<'a, F, P, R, MyRole>( @@ -111,11 +112,11 @@ where &'a F, P, McpConnectionTo, - ) -> BoxFuture<'a, Result> + ) -> BoxFuture<'a, Result> + Send + Sync ), - result_tx: oneshot::Sender>, + result_tx: oneshot::Sender>, ) -> BoxFuture<'a, ()> where MyRole: Role, diff --git a/src/agent-client-protocol/CHANGELOG.md b/src/agent-client-protocol/CHANGELOG.md index f1aadd07..ff9368a8 100644 --- a/src/agent-client-protocol/CHANGELOG.md +++ b/src/agent-client-protocol/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Changed + +- Move the `rmcp`-backed MCP server builder to `agent-client-protocol-rmcp`, removing `tokio`, `tokio-util`, and `rmcp` from the core crate's normal dependency graph. + ## [0.12.1](https://github.com/agentclientprotocol/rust-sdk/compare/v0.12.0...v0.12.1) - 2026-05-17 ### Other diff --git a/src/agent-client-protocol/Cargo.toml b/src/agent-client-protocol/Cargo.toml index 79469426..c3e46554 100644 --- a/src/agent-client-protocol/Cargo.toml +++ b/src/agent-client-protocol/Cargo.toml @@ -46,7 +46,6 @@ futures.workspace = true futures-concurrency.workspace = true rustc-hash.workspace = true jsonrpcmsg.workspace = true -rmcp = { workspace = true, features = ["server"] } schemars.workspace = true serde.workspace = true serde_json.workspace = true @@ -55,13 +54,13 @@ uuid.workspace = true async-process.workspace = true blocking.workspace = true shell-words.workspace = true -tokio.workspace = true -tokio-util.workspace = true [dev-dependencies] agent-client-protocol-test.workspace = true clap.workspace = true expect-test.workspace = true +tokio.workspace = true +tokio-util.workspace = true [lints] workspace = true diff --git a/src/agent-client-protocol/README.md b/src/agent-client-protocol/README.md index 1f6b12a1..81f4f2c2 100644 --- a/src/agent-client-protocol/README.md +++ b/src/agent-client-protocol/README.md @@ -46,6 +46,7 @@ See the [crate documentation](https://docs.rs/agent-client-protocol) for: ## Related Crates - **[agent-client-protocol-tokio](../agent-client-protocol-tokio/)** — Tokio utilities for spawning agent processes +- **[agent-client-protocol-rmcp](../agent-client-protocol-rmcp/)** — MCP tool builders and `rmcp` integration - **[agent-client-protocol-derive](../agent-client-protocol-derive/)** — Derive macros for JSON-RPC traits - **[agent-client-protocol-trace-viewer](../agent-client-protocol-trace-viewer/)** — Interactive trace visualization diff --git a/src/agent-client-protocol/src/concepts/proxies.rs b/src/agent-client-protocol/src/concepts/proxies.rs index 9b985aaa..571d681a 100644 --- a/src/agent-client-protocol/src/concepts/proxies.rs +++ b/src/agent-client-protocol/src/concepts/proxies.rs @@ -66,9 +66,10 @@ //! //! ## Global MCP Server //! -//! ``` +//! ```ignore //! # use agent_client_protocol::{Proxy, Conductor, ConnectTo}; //! # use agent_client_protocol::mcp_server::McpServer; +//! # use agent_client_protocol_rmcp::McpServerExt; //! # async fn example(transport: impl ConnectTo) -> Result<(), agent_client_protocol::Error> { //! # let my_mcp_server = McpServer::::builder("tools").build(); //! Proxy.builder() @@ -81,10 +82,11 @@ //! //! ## Per-Session MCP Server //! -//! ``` +//! ```ignore //! # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo}; //! # use agent_client_protocol::schema::NewSessionRequest; //! # use agent_client_protocol::mcp_server::McpServer; +//! # use agent_client_protocol_rmcp::McpServerExt; //! # async fn example(transport: impl ConnectTo) -> Result<(), agent_client_protocol::Error> { //! Proxy.builder() //! .on_receive_request_from(Client, async |req: NewSessionRequest, responder, cx| { diff --git a/src/agent-client-protocol/src/concepts/sessions.rs b/src/agent-client-protocol/src/concepts/sessions.rs index 142350a4..00370946 100644 --- a/src/agent-client-protocol/src/concepts/sessions.rs +++ b/src/agent-client-protocol/src/concepts/sessions.rs @@ -77,9 +77,10 @@ //! You can attach MCP (Model Context Protocol) servers to a session to provide //! tools to the agent: //! -//! ``` +//! ```ignore //! # use agent_client_protocol::{Client, Agent, ConnectTo}; //! # use agent_client_protocol::mcp_server::McpServer; +//! # use agent_client_protocol_rmcp::McpServerExt; //! # async fn example(transport: impl ConnectTo) -> Result<(), agent_client_protocol::Error> { //! # let my_mcp_server = McpServer::::builder("tools").build(); //! # Client.builder().connect_with(transport, async |cx| { diff --git a/src/agent-client-protocol/src/lib.rs b/src/agent-client-protocol/src/lib.rs index 9a28f59c..59780cfa 100644 --- a/src/agent-client-protocol/src/lib.rs +++ b/src/agent-client-protocol/src/lib.rs @@ -147,7 +147,10 @@ mod stdio; pub use stdio::Stdio; /// This is a hack that must be given as the final argument of -/// [`McpServerBuilder::tool_fn`](`crate::mcp_server::McpServerBuilder::tool_fn`) when defining tools. +/// the MCP server builder's `tool_fn_mut` method when defining tools. +/// +/// The `agent-client-protocol-rmcp` crate provides the builder this macro is +/// typically used with. /// Look away, lest ye be blinded by its vileness! /// /// Fine, if you MUST know, it's a horrific workaround for not having @@ -162,7 +165,10 @@ macro_rules! tool_fn_mut { } /// This is a hack that must be given as the final argument of -/// [`McpServerBuilder::tool_fn`](`crate::mcp_server::McpServerBuilder::tool_fn`) when defining stateless concurrent tools. +/// the MCP server builder's `tool_fn` method when defining stateless concurrent tools. +/// +/// The `agent-client-protocol-rmcp` crate provides the builder this macro is +/// typically used with. /// See [`tool_fn_mut!`] for the gory details. #[macro_export] macro_rules! tool_fn { diff --git a/src/agent-client-protocol/src/mcp_server/mod.rs b/src/agent-client-protocol/src/mcp_server/mod.rs index 25f5416f..82a4c0bd 100644 --- a/src/agent-client-protocol/src/mcp_server/mod.rs +++ b/src/agent-client-protocol/src/mcp_server/mod.rs @@ -1,59 +1,44 @@ -//! MCP server support for providing MCP tools over ACP. +//! Runtime-agnostic MCP server support for providing MCP tools over ACP. //! -//! This module provides the infrastructure for building MCP servers that -//! integrate with ACP connections. +//! This module provides the infrastructure for attaching MCP servers to ACP +//! connections without tying the core SDK to a particular MCP implementation or +//! async runtime. //! -//! ## Quick Start +//! ## Building MCP servers with tools //! -//! ```rust,ignore -//! use agent_client_protocol::mcp_server::{McpServer, McpTool}; -//! -//! // Create an MCP server with tools -//! let server = McpServer::builder("my-server".to_string()) -//! .instructions("A helpful assistant") -//! .tool(MyTool) -//! .build(); -//! -//! // Use the server as a handler on your connection -//! Proxy.builder() -//! .with_handler(server) -//! .serve(client) -//! .await?; -//! ``` +//! The `agent-client-protocol-rmcp` crate provides the builder APIs for MCP +//! tools backed by the `rmcp` crate. //! //! ## Custom MCP Server Implementations //! //! You can implement [`McpServerConnect`](`crate::mcp_server::McpServerConnect`) to create custom MCP servers: //! //! ```rust,ignore -//! use agent_client_protocol::mcp_server::{McpServer, McpServerConnect, McpContext}; -//! use agent_client_protocol::{DynComponent, JrLink}; +//! use agent_client_protocol::mcp_server::{McpConnectionTo, McpServer, McpServerConnect}; +//! use agent_client_protocol::{DynConnectTo, NullRun, Role, role}; //! //! struct MyCustomServer; //! -//! impl McpServerConnect for MyCustomServer { +//! impl McpServerConnect for MyCustomServer { //! fn name(&self) -> String { //! "my-custom-server".to_string() //! } //! -//! fn connect(&self, cx: McpContext) -> DynComponent { +//! fn connect(&self, cx: McpConnectionTo) -> DynConnectTo { //! // Return a component that serves MCP requests -//! DynComponent::new(my_mcp_component) +//! DynConnectTo::new(my_mcp_component(cx)) //! } //! } //! -//! let server = McpServer::new(MyCustomServer); +//! let server = McpServer::new(MyCustomServer, NullRun); //! ``` mod active_session; -mod builder; mod connect; mod context; -mod responder; mod server; mod tool; -pub use builder::{EnabledTools, McpServerBuilder}; pub use connect::McpServerConnect; pub use context::McpConnectionTo; pub use server::McpServer; diff --git a/src/agent-client-protocol/src/mcp_server/server.rs b/src/agent-client-protocol/src/mcp_server/server.rs index f746ba46..868483a2 100644 --- a/src/agent-client-protocol/src/mcp_server/server.rs +++ b/src/agent-client-protocol/src/mcp_server/server.rs @@ -1,4 +1,4 @@ -//! MCP server builder for creating MCP servers. +//! MCP server attachment and routing for ACP sessions. use std::{marker::PhantomData, sync::Arc}; @@ -13,10 +13,7 @@ use crate::{ DynamicHandlerRegistration, run::{NullRun, RunWithConnectionTo}, }, - mcp_server::{ - McpConnectionTo, McpServerConnect, active_session::McpActiveSession, - builder::McpServerBuilder, - }, + mcp_server::{McpConnectionTo, McpServerConnect, active_session::McpActiveSession}, role::{self, HasPeer}, util::MatchDispatchFrom, }; @@ -30,14 +27,8 @@ use crate::{ /// /// # Creating an MCP Server /// -/// Use [`McpServer::builder`] to create a server with tools: -/// -/// ```rust,ignore -/// let server = McpServer::builder("my-server".to_string()) -/// .instructions("A helpful assistant") -/// .tool(MyTool) -/// .build(); -/// ``` +/// The `agent-client-protocol-rmcp` crate provides builder APIs for MCP tools +/// backed by the `rmcp` crate. /// /// Or implement [`McpServerConnect`](`super::McpServerConnect`) for custom server behavior: /// @@ -58,8 +49,8 @@ pub struct McpServer { /// Some futures direct messages back through channels to this future which actually /// handles responding to the client. /// - /// This is how we bridge the gap between the rmcp implementation, - /// which requires `'static`, and our APIs, which do not. + /// Some connector implementations use this to run support tasks alongside + /// the message handler. responder: Run, } @@ -75,13 +66,6 @@ impl std::fmt::Debug } } -impl McpServer { - /// Create an empty server with no content. - pub fn builder(name: impl ToString) -> McpServerBuilder { - McpServerBuilder::new(name.to_string()) - } -} - impl McpServer where Run: RunWithConnectionTo, @@ -90,7 +74,8 @@ where /// /// # See also /// - /// See [`Self::builder`] to construct MCP servers from Rust code. + /// See `agent-client-protocol-rmcp` to construct MCP servers from Rust code + /// with `rmcp`. pub fn new(c: impl McpServerConnect, responder: Run) -> Self { McpServer { phantom: PhantomData, diff --git a/src/agent-client-protocol/src/mcp_server/tool.rs b/src/agent-client-protocol/src/mcp_server/tool.rs index 1b2c450e..7690451c 100644 --- a/src/agent-client-protocol/src/mcp_server/tool.rs +++ b/src/agent-client-protocol/src/mcp_server/tool.rs @@ -16,7 +16,7 @@ use super::McpConnectionTo; /// # Example /// /// ```rust,ignore -/// use agent_client_protocol::mcp_server::{McpTool, McpContext}; +/// use agent_client_protocol::mcp_server::{McpConnectionTo, McpTool}; /// use schemars::JsonSchema; /// use serde::{Deserialize, Serialize}; /// @@ -47,7 +47,7 @@ use super::McpConnectionTo; /// async fn call_tool( /// &self, /// input: EchoInput, -/// _context: McpContext, +/// _context: McpConnectionTo, /// ) -> Result { /// Ok(EchoOutput { /// echoed: format!("Echo: {}", input.message), diff --git a/src/agent-client-protocol/src/session.rs b/src/agent-client-protocol/src/session.rs index 144d4030..97f70d1f 100644 --- a/src/agent-client-protocol/src/session.rs +++ b/src/agent-client-protocol/src/session.rs @@ -181,9 +181,10 @@ where /// /// # Example /// - /// ``` + /// ```ignore /// # use agent_client_protocol::{Client, Agent, ConnectTo}; /// # use agent_client_protocol::mcp_server::McpServer; + /// # use agent_client_protocol_rmcp::McpServerExt; /// # async fn example(transport: impl ConnectTo) -> Result<(), agent_client_protocol::Error> { /// # Client.builder().connect_with(transport, async |cx| { /// # let mcp = McpServer::::builder("tools").build(); @@ -255,10 +256,11 @@ where /// /// # Example /// - /// ``` + /// ```ignore /// # use agent_client_protocol::{Proxy, Client, Conductor, ConnectTo}; /// # use agent_client_protocol::schema::NewSessionRequest; /// # use agent_client_protocol::mcp_server::McpServer; + /// # use agent_client_protocol_rmcp::McpServerExt; /// # async fn example(transport: impl ConnectTo) -> Result<(), agent_client_protocol::Error> { /// Proxy.builder() /// .on_receive_request_from(Client, async |request: NewSessionRequest, responder, cx| { From 9704e6fe24886978064f6e4f8f3a4f1631577ae0 Mon Sep 17 00:00:00 2001 From: Ben Brandt Date: Thu, 21 May 2026 20:14:08 +0200 Subject: [PATCH 2/2] refactor(rmcp): move MCP tool registry to core --- Cargo.lock | 1 - src/agent-client-protocol-rmcp/Cargo.toml | 1 - src/agent-client-protocol-rmcp/src/builder.rs | 288 ++------------- src/agent-client-protocol-rmcp/src/lib.rs | 5 +- .../src/responder.rs | 147 -------- .../src/mcp_server/mod.rs | 6 + .../src/mcp_server/registry.rs | 333 ++++++++++++++++++ .../src/mcp_server/tool_fn.rs | 255 ++++++++++++++ 8 files changed, 633 insertions(+), 403 deletions(-) delete mode 100644 src/agent-client-protocol-rmcp/src/responder.rs create mode 100644 src/agent-client-protocol/src/mcp_server/registry.rs create mode 100644 src/agent-client-protocol/src/mcp_server/tool_fn.rs diff --git a/Cargo.lock b/Cargo.lock index bd1043e9..0e3e66e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,7 +104,6 @@ dependencies = [ "futures", "futures-concurrency", "rmcp", - "rustc-hash", "schemars 1.2.1", "serde", "serde_json", diff --git a/src/agent-client-protocol-rmcp/Cargo.toml b/src/agent-client-protocol-rmcp/Cargo.toml index d885e172..fbd453b1 100644 --- a/src/agent-client-protocol-rmcp/Cargo.toml +++ b/src/agent-client-protocol-rmcp/Cargo.toml @@ -15,7 +15,6 @@ agent-client-protocol.workspace = true futures.workspace = true futures-concurrency.workspace = true rmcp.workspace = true -rustc-hash.workspace = true schemars.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/agent-client-protocol-rmcp/src/builder.rs b/src/agent-client-protocol-rmcp/src/builder.rs index 0bdef3e9..47ff491e 100644 --- a/src/agent-client-protocol-rmcp/src/builder.rs +++ b/src/agent-client-protocol-rmcp/src/builder.rs @@ -1,46 +1,11 @@ //! MCP server builder for creating MCP servers. -use std::{collections::HashSet, marker::PhantomData, pin::pin, sync::Arc}; +use std::{marker::PhantomData, pin::pin, sync::Arc}; -use futures::{ - SinkExt, - channel::{mpsc, oneshot}, - future::{BoxFuture, Either}, -}; +use futures::future::{BoxFuture, Either}; use futures_concurrency::future::TryJoin; -use rustc_hash::FxHashMap; - -/// Tracks which tools are enabled. -/// -/// - `DenyList`: All tools enabled except those in the set (default) -/// - `AllowList`: Only tools in the set are enabled -#[derive(Clone, Debug)] -pub enum EnabledTools { - /// All tools enabled except those in the deny set. - DenyList(HashSet), - /// Only tools in the allow set are enabled. - AllowList(HashSet), -} - -impl Default for EnabledTools { - fn default() -> Self { - EnabledTools::DenyList(HashSet::new()) - } -} - -impl EnabledTools { - /// Check if a tool is enabled. - #[must_use] - pub fn is_enabled(&self, name: &str) -> bool { - match self { - EnabledTools::DenyList(deny) => !deny.contains(name), - EnabledTools::AllowList(allow) => allow.contains(name), - } - } -} use rmcp::{ ErrorData, ServerHandler, - handler::server::tool::{schema_for_output, schema_for_type}, model::{CallToolResult, ListToolsResult, Tool}, }; use schemars::JsonSchema; @@ -50,12 +15,12 @@ use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use agent_client_protocol as acp; use agent_client_protocol::{ ByteStreams, ChainRun, ConnectTo, DynConnectTo, NullRun, RunWithConnectionTo, - mcp_server::{McpConnectionTo, McpServer, McpServerConnect, McpTool}, + mcp_server::{ + McpConnectionTo, McpServer, McpServerConnect, McpTool, McpToolMetadata, McpToolRegistry, + }, role::{self, Role}, }; -use crate::responder::{ToolCall, ToolFnMutResponder, ToolFnResponder}; - /// Builder for creating MCP servers with tools. /// /// Use [`crate::McpServerExt::builder`] to create a new builder, then chain methods to @@ -85,50 +50,16 @@ where { phantom: PhantomData, name: String, - data: McpServerData, + data: McpToolRegistry, responder: Responder, } -#[derive(Debug)] -struct McpServerData { - instructions: Option, - tool_models: Vec, - tools: FxHashMap>, - enabled_tools: EnabledTools, -} - -/// A registered tool with its metadata. -struct RegisteredTool { - tool: Arc>, - /// Whether this tool returns structured output (i.e., has an output_schema). - has_structured_output: bool, -} - -impl std::fmt::Debug for RegisteredTool { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RegisteredTool") - .field("has_structured_output", &self.has_structured_output) - .finish_non_exhaustive() - } -} - -impl Default for McpServerData { - fn default() -> Self { - Self { - instructions: None, - tool_models: Vec::new(), - tools: FxHashMap::default(), - enabled_tools: EnabledTools::default(), - } - } -} - impl McpServerBuilder { pub(super) fn new(name: String) -> Self { Self { name, phantom: PhantomData, - data: McpServerData::default(), + data: McpToolRegistry::default(), responder: NullRun, } } @@ -141,23 +72,14 @@ where /// Set the server instructions that are provided to the client. #[must_use] pub fn instructions(mut self, instructions: impl ToString) -> Self { - self.data.instructions = Some(instructions.to_string()); + self.data.set_instructions(instructions); self } /// Add a tool to the server. #[must_use] pub fn tool(mut self, tool: impl McpTool + 'static) -> Self { - let tool_model = make_tool_model(&tool); - let has_structured_output = tool_model.output_schema.is_some(); - self.data.tool_models.push(tool_model); - self.data.tools.insert( - tool.name(), - RegisteredTool { - tool: make_erased_mcp_tool(tool), - has_structured_output, - }, - ); + self.data.register_tool(tool); self } @@ -165,7 +87,7 @@ where /// with [`enable_tool`](Self::enable_tool) will be available. #[must_use] pub fn disable_all_tools(mut self) -> Self { - self.data.enabled_tools = EnabledTools::AllowList(HashSet::new()); + self.data.disable_all_tools(); self } @@ -173,7 +95,7 @@ where /// except those explicitly disabled with [`disable_tool`](Self::disable_tool). #[must_use] pub fn enable_all_tools(mut self) -> Self { - self.data.enabled_tools = EnabledTools::DenyList(HashSet::new()); + self.data.enable_all_tools(); self } @@ -181,17 +103,7 @@ where /// /// Returns an error if the tool is not registered. pub fn disable_tool(mut self, name: &str) -> Result { - if !self.data.tools.contains_key(name) { - return Err(acp::Error::invalid_request().data(format!("unknown tool: {name}"))); - } - match &mut self.data.enabled_tools { - EnabledTools::DenyList(deny) => { - deny.insert(name.to_string()); - } - EnabledTools::AllowList(allow) => { - allow.remove(name); - } - } + self.data.disable_tool(name)?; Ok(self) } @@ -199,17 +111,7 @@ where /// /// Returns an error if the tool is not registered. pub fn enable_tool(mut self, name: &str) -> Result { - if !self.data.tools.contains_key(name) { - return Err(acp::Error::invalid_request().data(format!("unknown tool: {name}"))); - } - match &mut self.data.enabled_tools { - EnabledTools::DenyList(deny) => { - deny.remove(name); - } - EnabledTools::AllowList(allow) => { - allow.insert(name.to_string()); - } - } + self.data.enable_tool(name)?; Ok(self) } @@ -269,19 +171,9 @@ where Ret: JsonSchema + Serialize + 'static + Send, F: AsyncFnMut(P, McpConnectionTo) -> Result + Send, { - let (call_tx, call_rx) = mpsc::channel(128); - self.tool_with_responder( - ToolFnTool { - name: name.to_string(), - description: description.to_string(), - call_tx, - }, - ToolFnMutResponder { - func, - call_rx, - tool_future_fn: Box::new(tool_future_hack), - }, - ) + let (tool, responder) = + acp::mcp_server::tool_fn_mut(name, description, func, tool_future_hack); + self.tool_with_responder(tool, responder) } /// Convenience wrapper for defining a stateless tool that can run concurrently. @@ -326,19 +218,8 @@ where + Sync + 'static, { - let (call_tx, call_rx) = mpsc::channel(128); - self.tool_with_responder( - ToolFnTool { - name: name.to_string(), - description: description.to_string(), - call_tx, - }, - ToolFnResponder { - func, - call_rx, - tool_future_fn: Box::new(tool_future_hack), - }, - ) + let (tool, responder) = acp::mcp_server::tool_fn(name, description, func, tool_future_hack); + self.tool_with_responder(tool, responder) } /// Create an MCP server from this builder. @@ -358,7 +239,7 @@ where struct McpServerBuilt { name: String, - data: Arc>, + data: Arc>, } impl McpServerConnect for McpServerBuilt { @@ -379,7 +260,7 @@ impl McpServerConnect for McpServerBuilt { - data: Arc>, + data: Arc>, mcp_connection: McpConnectionTo, } @@ -423,30 +304,20 @@ impl ServerHandler for McpServerConnection { context: rmcp::service::RequestContext, ) -> Result { // Lookup the tool definition, erroring if not found or disabled - let Some(registered) = self.data.tools.get(&request.name[..]) else { + let Some(registered) = self.data.enabled_tool(&request.name) else { return Err(rmcp::model::ErrorData::invalid_params( format!("tool `{}` not found", request.name), None, )); }; - // Treat disabled tools as not found - if !self.data.enabled_tools.is_enabled(&request.name) { - return Err(rmcp::model::ErrorData::invalid_params( - format!("tool `{}` not found", request.name), - None, - )); - } - // Convert input into JSON let serde_value = serde_json::to_value(request.arguments).expect("valid json"); // Execute the user's tool, unless cancellation occurs - let has_structured_output = registered.has_structured_output; + let has_structured_output = registered.has_structured_output(); match futures::future::select( - registered - .tool - .call_tool(serde_value, self.mcp_connection.clone()), + registered.call_tool(serde_value, self.mcp_connection.clone()), pin!(context.ct.cancelled()), ) .await @@ -481,10 +352,8 @@ impl ServerHandler for McpServerConnection { // Return only enabled tools let tools: Vec<_> = self .data - .tool_models - .iter() - .filter(|t| self.data.enabled_tools.is_enabled(&t.name)) - .cloned() + .enabled_tools() + .map(|tool| make_tool_model(tool.metadata())) .collect(); Ok(ListToolsResult::with_all_items(tools)) } @@ -499,69 +368,32 @@ impl ServerHandler for McpServerConnection { .with_server_info(rmcp::model::Implementation::default()) .with_protocol_version(rmcp::model::ProtocolVersion::default()); - if let Some(instr) = self.data.instructions.clone() { - base.with_instructions(instr) + if let Some(instructions) = self.data.instructions() { + base.with_instructions(instructions.to_string()) } else { base } } } -/// Erased version of the MCP tool trait that is dyn-compatible. -trait ErasedMcpTool: Send + Sync { - fn call_tool( - &self, - input: serde_json::Value, - connection: McpConnectionTo, - ) -> BoxFuture<'_, Result>; -} - -/// Create an `rmcp` tool model from our [`McpTool`] trait. -fn make_tool_model>(tool: &M) -> Tool { +/// Create an `rmcp` tool model from runtime-neutral MCP tool metadata. +fn make_tool_model(metadata: &McpToolMetadata) -> Tool { let mut tool = rmcp::model::Tool::new( - tool.name(), - tool.description(), - schema_for_type::(), + metadata.name().to_string(), + metadata.description().to_string(), + metadata.input_schema().clone(), ) .with_execution(rmcp::model::ToolExecution::new()); - if let Ok(schema) = schema_for_output::() { - // schema_for_output returns Err for non-object types (strings, integers, etc.) - // since MCP structured output requires JSON objects. We set - // output_schema to None for these tools, signaling unstructured output. - tool = tool.with_raw_output_schema(schema); + if let Some(title) = metadata.title() { + tool = tool.with_title(title.to_string()); } - tool -} - -/// Create a [`ErasedMcpTool`] from a [`McpTool`], erasing the type details. -fn make_erased_mcp_tool<'s, R: Role, M: McpTool + 's>( - tool: M, -) -> Arc + 's> { - struct ErasedMcpToolImpl { - tool: M, - } - - impl ErasedMcpTool for ErasedMcpToolImpl - where - R: Role, - M: McpTool, - { - fn call_tool( - &self, - input: serde_json::Value, - context: McpConnectionTo, - ) -> BoxFuture<'_, Result> { - Box::pin(async move { - let input = serde_json::from_value(input).map_err(acp::util::internal_error)?; - serde_json::to_value(self.tool.call_tool(input, context).await?) - .map_err(acp::util::internal_error) - }) - } + if let Some(schema) = metadata.output_schema() { + tool = tool.with_raw_output_schema(schema.clone()); } - Arc::new(ErasedMcpToolImpl { tool }) + tool } /// Convert an [`agent_client_protocol::Error`] into an [`rmcp::ErrorData`]. @@ -572,49 +404,3 @@ fn to_rmcp_error(error: acp::Error) -> rmcp::ErrorData { data: error.data, } } - -/// MCP tool used for `tool_fn` and `tooL_fn_mut`. -/// Each time it is invoked, it sends a `ToolCall` message to `call_tx`. -struct ToolFnTool { - name: String, - description: String, - call_tx: mpsc::Sender>, -} - -impl McpTool for ToolFnTool -where - R: Role, - P: JsonSchema + DeserializeOwned + 'static + Send, - Ret: JsonSchema + Serialize + 'static + Send, -{ - type Input = P; - type Output = Ret; - - fn name(&self) -> String { - self.name.clone() - } - - fn description(&self) -> String { - self.description.clone() - } - - async fn call_tool( - &self, - params: P, - mcp_connection: McpConnectionTo, - ) -> Result { - let (result_tx, result_rx) = oneshot::channel(); - - self.call_tx - .clone() - .send(ToolCall { - params, - mcp_connection, - result_tx, - }) - .await - .map_err(acp::util::internal_error)?; - - result_rx.await.map_err(acp::util::internal_error)? - } -} diff --git a/src/agent-client-protocol-rmcp/src/lib.rs b/src/agent-client-protocol-rmcp/src/lib.rs index a9d45efa..7d47c52c 100644 --- a/src/agent-client-protocol-rmcp/src/lib.rs +++ b/src/agent-client-protocol-rmcp/src/lib.rs @@ -37,11 +37,10 @@ use rmcp::ServiceExt; use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; mod builder; -mod responder; -pub use agent_client_protocol::mcp_server::McpTool; +pub use agent_client_protocol::mcp_server::{EnabledTools, McpTool}; pub use agent_client_protocol::{tool_fn, tool_fn_mut}; -pub use builder::{EnabledTools, McpServerBuilder}; +pub use builder::McpServerBuilder; /// Extension constructors for ACP MCP servers backed by `rmcp`. pub trait McpServerExt { diff --git a/src/agent-client-protocol-rmcp/src/responder.rs b/src/agent-client-protocol-rmcp/src/responder.rs deleted file mode 100644 index 9bfef416..00000000 --- a/src/agent-client-protocol-rmcp/src/responder.rs +++ /dev/null @@ -1,147 +0,0 @@ -//! MCP-specific responder types. - -use futures::{ - StreamExt, - channel::{mpsc, oneshot}, - future::BoxFuture, -}; - -use agent_client_protocol as acp; -use agent_client_protocol::{ - ConnectionTo, RunWithConnectionTo, mcp_server::McpConnectionTo, role::Role, -}; - -/// A tool call request sent through the channel. -pub(super) struct ToolCall { - pub(crate) params: P, - pub(crate) mcp_connection: McpConnectionTo, - pub(crate) result_tx: futures::channel::oneshot::Sender>, -} - -/// Responder for a `tool_fn` closure that receives tool calls through a channel -/// and invokes the user's async function. -pub(super) struct ToolFnMutResponder { - pub(crate) func: F, - pub(crate) call_rx: mpsc::Receiver>, - pub(crate) tool_future_fn: Box< - dyn for<'a> Fn( - &'a mut F, - P, - McpConnectionTo, - ) -> BoxFuture<'a, Result> - + Send, - >, -} - -impl RunWithConnectionTo - for ToolFnMutResponder -where - Counterpart: Role, - Counterpart1: Role, - P: Send, - R: Send, - F: Send, -{ - async fn run_with_connection_to( - self, - _connection: ConnectionTo, - ) -> Result<(), acp::Error> { - let ToolFnMutResponder { - mut func, - mut call_rx, - tool_future_fn, - } = self; - while let Some(ToolCall { - params, - mcp_connection, - result_tx, - }) = call_rx.next().await - { - let result = tool_future_fn(&mut func, params, mcp_connection).await; - result_tx - .send(result) - .map_err(|_| acp::util::internal_error("failed to send MCP result"))?; - } - Ok(()) - } -} - -/// Responder for a `tool_fn` closure that receives tool calls through a channel -/// and invokes the user's async function concurrently. -pub(super) struct ToolFnResponder { - pub(crate) func: F, - pub(crate) call_rx: mpsc::Receiver>, - pub(crate) tool_future_fn: Box< - dyn for<'a> Fn( - &'a F, - P, - McpConnectionTo, - ) -> BoxFuture<'a, Result> - + Send - + Sync, - >, -} - -impl RunWithConnectionTo - for ToolFnResponder -where - Counterpart: Role, - Counterpart1: Role, - P: Send, - R: Send, - F: Send + Sync, -{ - async fn run_with_connection_to( - self, - _connection: ConnectionTo, - ) -> Result<(), acp::Error> { - let ToolFnResponder { - func, - call_rx, - tool_future_fn, - } = self; - acp::util::process_stream_concurrently( - call_rx, - async |tool_call| { - fn hack<'a, F, P, R, MyRole>( - func: &'a F, - params: P, - mcp_connection: McpConnectionTo, - tool_future_fn: &'a ( - dyn Fn( - &'a F, - P, - McpConnectionTo, - ) -> BoxFuture<'a, Result> - + Send - + Sync - ), - result_tx: oneshot::Sender>, - ) -> BoxFuture<'a, ()> - where - MyRole: Role, - P: Send, - R: Send, - F: Send + Sync, - { - Box::pin(async move { - let result = tool_future_fn(func, params, mcp_connection).await; - // Ignore send errors - the receiver may have been dropped - drop(result_tx.send(result)); - }) - } - - let ToolCall { - params, - mcp_connection, - result_tx, - } = tool_call; - - hack(&func, params, mcp_connection, &*tool_future_fn, result_tx).await; - Ok(()) - }, - |a, b| Box::pin(a(b)), - ) - .await - } -} diff --git a/src/agent-client-protocol/src/mcp_server/mod.rs b/src/agent-client-protocol/src/mcp_server/mod.rs index 82a4c0bd..6914dc7f 100644 --- a/src/agent-client-protocol/src/mcp_server/mod.rs +++ b/src/agent-client-protocol/src/mcp_server/mod.rs @@ -36,10 +36,16 @@ mod active_session; mod connect; mod context; +mod registry; mod server; mod tool; +mod tool_fn; pub use connect::McpServerConnect; pub use context::McpConnectionTo; +pub use registry::{ + EnabledTools, McpToolMetadata, McpToolRegistry, McpToolSchema, RegisteredMcpTool, +}; pub use server::McpServer; pub use tool::McpTool; +pub use tool_fn::{tool_fn, tool_fn_mut}; diff --git a/src/agent-client-protocol/src/mcp_server/registry.rs b/src/agent-client-protocol/src/mcp_server/registry.rs new file mode 100644 index 00000000..c012edcc --- /dev/null +++ b/src/agent-client-protocol/src/mcp_server/registry.rs @@ -0,0 +1,333 @@ +//! Runtime-neutral MCP tool registration and dispatch. + +use std::{collections::HashSet, sync::Arc}; + +use futures::future::BoxFuture; +use rustc_hash::FxHashMap; +use schemars::{JsonSchema, generate::SchemaSettings}; +use serde_json::{Map, Value}; + +use crate::{Error, Role}; + +use super::{McpConnectionTo, McpTool}; + +/// JSON Schema object used to describe MCP tool inputs and outputs. +pub type McpToolSchema = Map; + +/// Tracks which tools are enabled. +/// +/// - `DenyList`: All tools enabled except those in the set (default) +/// - `AllowList`: Only tools in the set are enabled +#[derive(Clone, Debug)] +pub enum EnabledTools { + /// All tools enabled except those in the deny set. + DenyList(HashSet), + /// Only tools in the allow set are enabled. + AllowList(HashSet), +} + +impl Default for EnabledTools { + fn default() -> Self { + EnabledTools::DenyList(HashSet::new()) + } +} + +impl EnabledTools { + /// Check if a tool is enabled. + #[must_use] + pub fn is_enabled(&self, name: &str) -> bool { + match self { + EnabledTools::DenyList(deny) => !deny.contains(name), + EnabledTools::AllowList(allow) => allow.contains(name), + } + } +} + +/// Runtime-neutral metadata for an MCP tool. +#[derive(Clone, Debug)] +pub struct McpToolMetadata { + name: String, + title: Option, + description: String, + input_schema: Arc, + output_schema: Option>, +} + +impl McpToolMetadata { + fn from_tool>(tool: &M) -> Self { + Self { + name: tool.name(), + title: tool.title(), + description: tool.description(), + input_schema: schema_for_type::(), + output_schema: schema_for_output::(), + } + } + + /// The tool name. + #[must_use] + pub fn name(&self) -> &str { + &self.name + } + + /// A human-readable title for the tool. + #[must_use] + pub fn title(&self) -> Option<&str> { + self.title.as_deref() + } + + /// A description of what the tool does. + #[must_use] + pub fn description(&self) -> &str { + &self.description + } + + /// JSON Schema object defining the expected parameters for the tool. + #[must_use] + pub fn input_schema(&self) -> &Arc { + &self.input_schema + } + + /// Optional JSON Schema object defining the structure of the tool's output. + #[must_use] + pub fn output_schema(&self) -> Option<&Arc> { + self.output_schema.as_ref() + } +} + +/// A registered MCP tool that can be dispatched with erased JSON values. +pub struct RegisteredMcpTool { + metadata: McpToolMetadata, + tool: Arc>, +} + +impl std::fmt::Debug for RegisteredMcpTool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RegisteredMcpTool") + .field("metadata", &self.metadata) + .field("has_structured_output", &self.has_structured_output()) + .finish_non_exhaustive() + } +} + +impl RegisteredMcpTool { + fn new(tool: impl McpTool + 'static) -> Self { + let metadata = McpToolMetadata::from_tool(&tool); + Self { + metadata, + tool: make_erased_mcp_tool(tool), + } + } + + /// Tool metadata. + #[must_use] + pub fn metadata(&self) -> &McpToolMetadata { + &self.metadata + } + + /// The tool name. + #[must_use] + pub fn name(&self) -> &str { + self.metadata.name() + } + + /// Whether the tool returns structured output. + #[must_use] + pub fn has_structured_output(&self) -> bool { + self.metadata.output_schema().is_some() + } + + /// Invoke the registered tool using JSON input and output values. + pub fn call_tool( + &self, + input: Value, + connection: McpConnectionTo, + ) -> BoxFuture<'_, Result> { + self.tool.call_tool(input, connection) + } +} + +/// Runtime-neutral registry for MCP tools. +#[derive(Debug)] +pub struct McpToolRegistry { + instructions: Option, + tool_indices: FxHashMap, + tools: Vec>, + enabled_tools: EnabledTools, +} + +impl Default for McpToolRegistry { + fn default() -> Self { + Self { + instructions: None, + tool_indices: FxHashMap::default(), + tools: Vec::new(), + enabled_tools: EnabledTools::default(), + } + } +} + +impl McpToolRegistry { + /// Set the server instructions that are provided to the client. + pub fn set_instructions(&mut self, instructions: impl ToString) { + self.instructions = Some(instructions.to_string()); + } + + /// Server instructions provided to the client. + #[must_use] + pub fn instructions(&self) -> Option<&str> { + self.instructions.as_deref() + } + + /// Register a tool. + pub fn register_tool(&mut self, tool: impl McpTool + 'static) { + let registered_tool = RegisteredMcpTool::new(tool); + let name = registered_tool.name().to_string(); + + if let Some(&index) = self.tool_indices.get(&name) { + self.tools[index] = registered_tool; + } else { + self.tool_indices.insert(name, self.tools.len()); + self.tools.push(registered_tool); + } + } + + /// Return all registered tools in registration order. + pub fn tools(&self) -> impl Iterator> { + self.tools.iter() + } + + /// Return enabled registered tools in registration order. + pub fn enabled_tools(&self) -> impl Iterator> { + self.tools + .iter() + .filter(|tool| self.enabled_tools.is_enabled(tool.name())) + } + + /// Return a registered tool by name, even if it is disabled. + #[must_use] + pub fn tool(&self, name: &str) -> Option<&RegisteredMcpTool> { + self.tool_indices + .get(name) + .and_then(|&index| self.tools.get(index)) + } + + /// Return an enabled tool by name. + #[must_use] + pub fn enabled_tool(&self, name: &str) -> Option<&RegisteredMcpTool> { + self.tool(name) + .filter(|tool| self.enabled_tools.is_enabled(tool.name())) + } + + /// Check whether a tool is registered. + #[must_use] + pub fn contains_tool(&self, name: &str) -> bool { + self.tool_indices.contains_key(name) + } + + /// Disable all tools. After calling this, only tools explicitly enabled + /// with [`enable_tool`](Self::enable_tool) will be available. + pub fn disable_all_tools(&mut self) { + self.enabled_tools = EnabledTools::AllowList(HashSet::new()); + } + + /// Enable all tools. After calling this, all tools will be available + /// except those explicitly disabled with [`disable_tool`](Self::disable_tool). + pub fn enable_all_tools(&mut self) { + self.enabled_tools = EnabledTools::DenyList(HashSet::new()); + } + + /// Disable a specific tool by name. + /// + /// Returns an error if the tool is not registered. + pub fn disable_tool(&mut self, name: &str) -> Result<(), Error> { + if !self.contains_tool(name) { + return Err(Error::invalid_request().data(format!("unknown tool: {name}"))); + } + match &mut self.enabled_tools { + EnabledTools::DenyList(deny) => { + deny.insert(name.to_string()); + } + EnabledTools::AllowList(allow) => { + allow.remove(name); + } + } + Ok(()) + } + + /// Enable a specific tool by name. + /// + /// Returns an error if the tool is not registered. + pub fn enable_tool(&mut self, name: &str) -> Result<(), Error> { + if !self.contains_tool(name) { + return Err(Error::invalid_request().data(format!("unknown tool: {name}"))); + } + match &mut self.enabled_tools { + EnabledTools::DenyList(deny) => { + deny.remove(name); + } + EnabledTools::AllowList(allow) => { + allow.insert(name.to_string()); + } + } + Ok(()) + } +} + +/// Erased version of the MCP tool trait that is dyn-compatible. +trait ErasedMcpTool: Send + Sync { + fn call_tool( + &self, + input: Value, + connection: McpConnectionTo, + ) -> BoxFuture<'_, Result>; +} + +fn make_erased_mcp_tool(tool: M) -> Arc> +where + R: Role, + M: McpTool + 'static, +{ + struct ErasedMcpToolImpl { + tool: M, + } + + impl ErasedMcpTool for ErasedMcpToolImpl + where + R: Role, + M: McpTool, + { + fn call_tool( + &self, + input: Value, + context: McpConnectionTo, + ) -> BoxFuture<'_, Result> { + Box::pin(async move { + let input = serde_json::from_value(input).map_err(crate::util::internal_error)?; + serde_json::to_value(self.tool.call_tool(input, context).await?) + .map_err(crate::util::internal_error) + }) + } + } + + Arc::new(ErasedMcpToolImpl { tool }) +} + +fn schema_for_type() -> Arc { + let settings = SchemaSettings::draft2020_12(); + let generator = settings.into_generator(); + let schema = generator.into_root_schema_for::(); + let object = serde_json::to_value(schema).expect("failed to serialize schema"); + let Value::Object(object) = object else { + panic!("Schema serialization produced non-object value: expected JSON object"); + }; + Arc::new(object) +} + +fn schema_for_output() -> Option> { + let schema = schema_for_type::(); + match schema.get("type") { + Some(Value::String(t)) if t == "object" => Some(schema), + _ => None, + } +} diff --git a/src/agent-client-protocol/src/mcp_server/tool_fn.rs b/src/agent-client-protocol/src/mcp_server/tool_fn.rs new file mode 100644 index 00000000..cadf1bab --- /dev/null +++ b/src/agent-client-protocol/src/mcp_server/tool_fn.rs @@ -0,0 +1,255 @@ +//! Runtime-neutral helpers for registering function-backed MCP tools. + +use futures::{ + SinkExt, StreamExt, + channel::{mpsc, oneshot}, + future::BoxFuture, +}; +use schemars::JsonSchema; +use serde::{Serialize, de::DeserializeOwned}; + +use crate::{ConnectionTo, Error, Role, RunWithConnectionTo}; + +use super::{McpConnectionTo, McpTool}; + +struct ToolCall { + params: P, + mcp_connection: McpConnectionTo, + result_tx: futures::channel::oneshot::Sender>, +} + +struct ToolFnMutResponder { + func: F, + call_rx: mpsc::Receiver>, + tool_future_fn: Box< + dyn for<'a> Fn( + &'a mut F, + P, + McpConnectionTo, + ) -> BoxFuture<'a, Result> + + Send, + >, +} + +impl RunWithConnectionTo + for ToolFnMutResponder +where + Counterpart: Role, + Counterpart1: Role, + P: Send, + R: Send, + F: Send, +{ + async fn run_with_connection_to( + self, + _connection: ConnectionTo, + ) -> Result<(), Error> { + let ToolFnMutResponder { + mut func, + mut call_rx, + tool_future_fn, + } = self; + while let Some(ToolCall { + params, + mcp_connection, + result_tx, + }) = call_rx.next().await + { + let result = tool_future_fn(&mut func, params, mcp_connection).await; + result_tx + .send(result) + .map_err(|_| crate::util::internal_error("failed to send MCP result"))?; + } + Ok(()) + } +} + +struct ToolFnResponder { + func: F, + call_rx: mpsc::Receiver>, + tool_future_fn: Box< + dyn for<'a> Fn(&'a F, P, McpConnectionTo) -> BoxFuture<'a, Result> + + Send + + Sync, + >, +} + +impl RunWithConnectionTo + for ToolFnResponder +where + Counterpart: Role, + Counterpart1: Role, + P: Send, + R: Send, + F: Send + Sync, +{ + async fn run_with_connection_to( + self, + _connection: ConnectionTo, + ) -> Result<(), Error> { + let ToolFnResponder { + func, + call_rx, + tool_future_fn, + } = self; + crate::util::process_stream_concurrently( + call_rx, + async |tool_call| { + fn hack<'a, F, P, R, MyRole>( + func: &'a F, + params: P, + mcp_connection: McpConnectionTo, + tool_future_fn: &'a ( + dyn Fn( + &'a F, + P, + McpConnectionTo, + ) -> BoxFuture<'a, Result> + + Send + + Sync + ), + result_tx: oneshot::Sender>, + ) -> BoxFuture<'a, ()> + where + MyRole: Role, + P: Send, + R: Send, + F: Send + Sync, + { + Box::pin(async move { + let result = tool_future_fn(func, params, mcp_connection).await; + drop(result_tx.send(result)); + }) + } + + let ToolCall { + params, + mcp_connection, + result_tx, + } = tool_call; + + hack(&func, params, mcp_connection, &*tool_future_fn, result_tx).await; + Ok(()) + }, + |a, b| Box::pin(a(b)), + ) + .await + } +} + +struct ToolFnTool { + name: String, + description: String, + call_tx: mpsc::Sender>, +} + +impl McpTool for ToolFnTool +where + R: Role, + P: JsonSchema + DeserializeOwned + 'static + Send, + Ret: JsonSchema + Serialize + 'static + Send, +{ + type Input = P; + type Output = Ret; + + fn name(&self) -> String { + self.name.clone() + } + + fn description(&self) -> String { + self.description.clone() + } + + async fn call_tool(&self, params: P, mcp_connection: McpConnectionTo) -> Result { + let (result_tx, result_rx) = oneshot::channel(); + + self.call_tx + .clone() + .send(ToolCall { + params, + mcp_connection, + result_tx, + }) + .await + .map_err(crate::util::internal_error)?; + + result_rx.await.map_err(crate::util::internal_error)? + } +} + +/// Create a "single-threaded" function-backed MCP tool and its responder. +/// +/// Only one invocation of the tool can be running at a time. +pub fn tool_fn_mut( + name: impl ToString, + description: impl ToString, + func: F, + tool_future_fn: impl for<'a> Fn( + &'a mut F, + P, + McpConnectionTo, + ) -> BoxFuture<'a, Result> + + Send + + 'static, +) -> ( + impl McpTool + 'static, + impl RunWithConnectionTo, +) +where + Counterpart: Role, + P: JsonSchema + DeserializeOwned + 'static + Send, + Ret: JsonSchema + Serialize + 'static + Send, + F: AsyncFnMut(P, McpConnectionTo) -> Result + Send, +{ + let (call_tx, call_rx) = mpsc::channel(128); + ( + ToolFnTool { + name: name.to_string(), + description: description.to_string(), + call_tx, + }, + ToolFnMutResponder { + func, + call_rx, + tool_future_fn: Box::new(tool_future_fn), + }, + ) +} + +/// Create a stateless function-backed MCP tool and its concurrent responder. +pub fn tool_fn( + name: impl ToString, + description: impl ToString, + func: F, + tool_future_fn: impl for<'a> Fn( + &'a F, + P, + McpConnectionTo, + ) -> BoxFuture<'a, Result> + + Send + + Sync + + 'static, +) -> ( + impl McpTool + 'static, + impl RunWithConnectionTo, +) +where + Counterpart: Role, + P: JsonSchema + DeserializeOwned + 'static + Send, + Ret: JsonSchema + Serialize + 'static + Send, + F: AsyncFn(P, McpConnectionTo) -> Result + Send + Sync + 'static, +{ + let (call_tx, call_rx) = mpsc::channel(128); + ( + ToolFnTool { + name: name.to_string(), + description: description.to_string(), + call_tx, + }, + ToolFnResponder { + func, + call_rx, + tool_future_fn: Box::new(tool_future_fn), + }, + ) +}