Skip to content

Commit d7e9b1b

Browse files
syn-zhuclaude
andcommitted
feat: add MCP tool server discovery and invocation endpoint
Adds three new MCP tools to the kagent-controller's existing MCP endpoint at :8083/mcp, enabling dynamic discovery and invocation of tools across all tool source types: - list_tool_servers: Lists all tool servers (RemoteMCPServer, Service with kagent.dev/mcp-service=true label, MCPServer CRs) - list_tools: Connects to a tool server and returns its tool catalog - call_tool: Invokes a specific tool on a specific tool server This moves the functionality originally proposed in kagent-dev/kmcp#123 into kagent per reviewer feedback, since kagent already watches all three resource types and has the existing MCP handler infrastructure. Key design decisions: - Unified ref format: Kind/namespace/name (e.g. RemoteMCPServer/default/my-server) - Session caching with evict-and-retry for stale connections - Reuses existing ConvertServiceToRemoteMCPServer and ConvertMCPServerToRemoteMCPServer from the translator package - MCPServer CRD is optional (graceful degradation if not installed) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Simon Zhu <simon.zhu@mongodb.com>
1 parent 97a5014 commit d7e9b1b

5 files changed

Lines changed: 1072 additions & 58 deletions

File tree

go/core/internal/controller/reconciler/reconciler.go

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/hex"
77
"errors"
88
"fmt"
9-
"net/http"
109
"reflect"
1110
"slices"
1211
"strings"
@@ -27,6 +26,7 @@ import (
2726
"github.com/kagent-dev/kagent/go/core/internal/controller/provider"
2827
agent_translator "github.com/kagent-dev/kagent/go/core/internal/controller/translator/agent"
2928
"github.com/kagent-dev/kagent/go/core/internal/utils"
29+
mcputil "github.com/kagent-dev/kagent/go/core/internal/mcp"
3030
"github.com/kagent-dev/kagent/go/core/internal/version"
3131
"github.com/modelcontextprotocol/go-sdk/mcp"
3232
corev1 "k8s.io/api/core/v1"
@@ -784,7 +784,7 @@ func (a *kagentReconciler) upsertToolServerForRemoteMCPServer(ctx context.Contex
784784
return nil, fmt.Errorf("failed to store toolServer %s: %w", toolServer.Name, err)
785785
}
786786

787-
tsp, err := a.createMcpTransport(ctx, remoteMcpServer)
787+
tsp, err := mcputil.CreateMCPTransport(ctx, a.kube, remoteMcpServer)
788788
if err != nil {
789789
return nil, fmt.Errorf("failed to create client for toolServer %s: %w", toolServer.Name, err)
790790
}
@@ -809,59 +809,6 @@ func (a *kagentReconciler) isNamespaceWatched(namespace string) bool {
809809
return slices.Contains(a.watchedNamespaces, namespace)
810810
}
811811

812-
func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.RemoteMCPServer) (mcp.Transport, error) {
813-
headers, err := s.ResolveHeaders(ctx, a.kube)
814-
if err != nil {
815-
return nil, err
816-
}
817-
818-
httpClient := newHTTPClient(headers)
819-
820-
switch s.Spec.Protocol {
821-
case v1alpha2.RemoteMCPServerProtocolSse:
822-
return &mcp.SSEClientTransport{
823-
Endpoint: s.Spec.URL,
824-
HTTPClient: httpClient,
825-
}, nil
826-
default:
827-
return &mcp.StreamableClientTransport{
828-
Endpoint: s.Spec.URL,
829-
HTTPClient: httpClient,
830-
}, nil
831-
}
832-
}
833-
834-
// go-sdk does not have a WithHeaders option when initializing transport
835-
// so we need to create a custom HTTP client that adds headers to all requests.
836-
func newHTTPClient(headers map[string]string) *http.Client {
837-
if len(headers) == 0 {
838-
return http.DefaultClient
839-
}
840-
return &http.Client{
841-
Transport: &headerTransport{
842-
headers: headers,
843-
base: http.DefaultTransport,
844-
},
845-
}
846-
}
847-
848-
// headerTransport is an http.RoundTripper that adds custom headers to requests.
849-
type headerTransport struct {
850-
headers map[string]string
851-
base http.RoundTripper
852-
}
853-
854-
func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
855-
req = req.Clone(req.Context())
856-
for k, v := range t.headers {
857-
req.Header.Set(k, v)
858-
}
859-
if t.base == nil {
860-
t.base = http.DefaultTransport
861-
}
862-
return t.base.RoundTrip(req)
863-
}
864-
865812
func (a *kagentReconciler) listTools(ctx context.Context, tsp mcp.Transport, toolServer *database.ToolServer) ([]*v1alpha2.MCPTool, error) {
866813
impl := &mcp.Implementation{
867814
Name: "kagent-controller",

go/core/internal/mcp/mcp_handler.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ import (
2222
)
2323

2424
// MCPHandler handles MCP requests and bridges them to A2A endpoints
25+
// and tool server discovery/invocation.
2526
type MCPHandler struct {
2627
kubeClient client.Client
2728
a2aBaseURL string
2829
authenticator auth.AuthProvider
2930
httpHandler *mcpsdk.StreamableHTTPHandler
3031
server *mcpsdk.Server
3132
a2aClients sync.Map
33+
sessions sync.Map // cached MCP client sessions keyed by "Kind/namespace/name"
3234
}
3335

3436
// Input types for MCP tools
@@ -92,6 +94,36 @@ func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator au
9294
handler.handleInvokeAgent,
9395
)
9496

97+
// Add list_tool_servers tool
98+
mcpsdk.AddTool[ListToolServersInput, ListToolServersOutput](
99+
server,
100+
&mcpsdk.Tool{
101+
Name: "list_tool_servers",
102+
Description: "List all MCP tool servers in the cluster (RemoteMCPServer, Service, MCPServer)",
103+
},
104+
handler.handleListToolServers,
105+
)
106+
107+
// Add list_tools tool
108+
mcpsdk.AddTool[ListToolsInput, ListToolsOutput](
109+
server,
110+
&mcpsdk.Tool{
111+
Name: "list_tools",
112+
Description: "Connect to a tool server and list its available tools",
113+
},
114+
handler.handleListTools,
115+
)
116+
117+
// Add call_tool tool
118+
mcpsdk.AddTool[CallToolInput, CallToolOutput](
119+
server,
120+
&mcpsdk.Tool{
121+
Name: "call_tool",
122+
Description: "Invoke a specific tool on a specific tool server",
123+
},
124+
handler.handleCallTool,
125+
)
126+
95127
// Create HTTP handler
96128
handler.httpHandler = mcpsdk.NewStreamableHTTPHandler(
97129
func(*http.Request) *mcpsdk.Server {
@@ -309,9 +341,14 @@ func (h *MCPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
309341
h.httpHandler.ServeHTTP(w, r)
310342
}
311343

312-
// Shutdown gracefully shuts down the MCP handler
344+
// Shutdown gracefully shuts down the MCP handler and closes cached sessions.
313345
func (h *MCPHandler) Shutdown(ctx context.Context) error {
314-
// The new SDK doesn't have an explicit Shutdown method on StreamableHTTPHandler
315-
// The server will be shut down when the context is cancelled
346+
h.sessions.Range(func(key, value any) bool {
347+
if session, ok := value.(*mcpsdk.ClientSession); ok {
348+
session.Close()
349+
}
350+
h.sessions.Delete(key)
351+
return true
352+
})
316353
return nil
317354
}

0 commit comments

Comments
 (0)