Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
2d6706f
chore: add go-sdk dependency
freeznet Jan 18, 2026
6841554
feat: add go-sdk server wrapper
freeznet Jan 18, 2026
e883083
refactor: migrate stdio transport to go-sdk
freeznet Jan 18, 2026
febb1d5
chore(gitignore): add .agents directory to ignore list
freeznet Jan 18, 2026
78dfe8b
refactor: update tool builder interface for go-sdk
freeznet Jan 18, 2026
ae088ee
feat(mcp): add go-sdk schema helpers
freeznet Jan 18, 2026
c0d839b
fix: complete kafka topics tool migration
freeznet Jan 18, 2026
164c2f2
refactor: migrate kafka groups tools to go-sdk
freeznet Jan 18, 2026
67d978c
feat: migrate kafka produce tool to go-sdk
freeznet Jan 19, 2026
5abc1ce
feat: migrate kafka schema registry tool to go-sdk
freeznet Jan 19, 2026
55d7c63
feat: migrate kafka consume tool to go-sdk
freeznet Jan 19, 2026
678e461
feat: migrate pulsar admin topic tool to go-sdk
freeznet Jan 19, 2026
78546b9
feat: migrate pulsar admin tenant tool to go-sdk
freeznet Jan 19, 2026
3f7cce9
feat: migrate pulsar namespace tool to go-sdk
freeznet Jan 19, 2026
a904d8d
feat: migrate pulsar schema tool to go-sdk
freeznet Jan 19, 2026
28ea496
feat: migrate pulsar admin functions tool to go-sdk
freeznet Jan 19, 2026
d9e8d1d
feat: migrate pulsar admin brokers/cluster/sources/sinks/subscription
freeznet Jan 19, 2026
4785eb5
feat: migrate pulsar broker stats tool
freeznet Jan 19, 2026
d3b8847
feat: migrate functions worker tool to go-sdk
freeznet Jan 19, 2026
b55718e
feat: migrate namespace policy tools to go-sdk
freeznet Jan 19, 2026
81aa67f
feat: migrate pulsar topic policy tool
freeznet Jan 19, 2026
b19fcad
feat: migrate ns isolation policy tool to go-sdk
freeznet Jan 19, 2026
8e6867c
feat: migrate pulsar packages tool to go-sdk
freeznet Jan 19, 2026
ac65065
feat: migrate pulsar resource quotas tool
freeznet Jan 19, 2026
ccf0b20
feat: migrate pulsar client produce tool to go-sdk
freeznet Jan 19, 2026
2c84753
feat: migrate pulsar consume tool to go-sdk
freeznet Jan 19, 2026
a424ac1
feat: migrate pftools types to go-sdk
freeznet Jan 19, 2026
98601e3
fix: migrate pftools schema conversion
freeznet Jan 19, 2026
3b8ff3c
refactor: migrate pftools manager to go-sdk
freeznet Jan 19, 2026
e38bf85
refactor: migrate pftools invocation results
freeznet Jan 19, 2026
3cba28b
fix: align functions-as-tools integration with go-sdk
freeznet Jan 19, 2026
c7f9b46
feat: add MCP request context helpers
freeznet Jan 19, 2026
2574867
fix: align go-sdk middleware recovery
freeznet Jan 19, 2026
a9a7856
test: add testcontainers e2e scaffolding
freeznet Jan 19, 2026
eb84f86
test: migrate e2e client to go-sdk
freeznet Jan 19, 2026
b344cdb
test: add pulsar admin e2e coverage
freeznet Jan 19, 2026
d767266
test(e2e): add kafka e2e coverage
freeznet Jan 19, 2026
579ef8d
ci: add full test workflow for migration
freeznet Jan 19, 2026
eb32071
test: verify auth isolation in e2e runner
freeznet Jan 19, 2026
a7f0628
docs(mcp): update architecture and tool implementation details
freeznet Jan 19, 2026
ef54a62
stash
freeznet Jan 19, 2026
aad6af4
go mod tidy
freeznet Jan 19, 2026
2853cb9
stash
freeznet Jan 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: CI
on: [push, pull_request]

permissions:
contents: read

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: Check out code
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: "go.mod"

- name: Download dependencies
run: go mod download

- name: Run unit tests
run: go test -race ./...

- name: Run e2e test suite
run: go test ./cmd/snmcp-e2e/... -v
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
bin/
dist/

.DS_Store
.DS_Store
tmp/
*.log

Expand All @@ -12,3 +12,4 @@ vendor
agents/
.serena/
.envrc
.agents/
64 changes: 48 additions & 16 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ go test -v -run TestName ./pkg/... # Run a single test

## Architecture Overview

The StreamNative MCP Server implements the Model Context Protocol using the `mark3labs/mcp-go` library to enable AI agents to interact with Apache Kafka, Apache Pulsar, and StreamNative Cloud resources.
The StreamNative MCP Server implements the Model Context Protocol using the `modelcontextprotocol/go-sdk` library to enable AI agents to interact with Apache Kafka, Apache Pulsar, and StreamNative Cloud resources.

### Request Flow

```
Client Request → MCP Server (pkg/mcp/server.go)
Client Request → Transport (stdio/SSE in pkg/cmd/mcp/)
SSE/stdio transport layer (pkg/cmd/mcp/)
MCP Server (go-sdk, pkg/mcp/server_new.go)
Tool Handler (from builders)
Tool Handler (typed input/output)
Context Functions (pkg/mcp/ctx.go)
Expand All @@ -37,13 +37,16 @@ Client Request → MCP Server (pkg/mcp/server.go)

### Core Components

1. **Server & Sessions** (`pkg/mcp/server.go`)
- `Server` struct holds `MCPServer`, `KafkaSession`, `PulsarSession`, and `SNCloudSession`
- Sessions provide lazy-initialized clients for each service
1. **Server & Sessions** (`pkg/mcp/server_new.go`)
- `Server` wraps go-sdk `*mcp.Server` as `MCPServer` and holds `KafkaSession`, `PulsarSession`, and `SNCloudSession`
- `NewServer` configures server capabilities plus logging/recovery middleware
- Context functions (`pkg/mcp/ctx.go`) inject/retrieve sessions from request context

2. **Tool Builders Framework** (`pkg/mcp/builders/`)
- `ToolBuilder` interface: `GetName()`, `GetRequiredFeatures()`, `BuildTools()`, `Validate()`
- `ToolBuilder` interface: `GetName()`, `GetRequiredFeatures()`, `BuildTools(ctx, config)`, `Validate(config)`
- `ToolDefinition` provides `Definition()` and `Register(*mcp.Server)` for typed tool installs
- `ServerTool[In, Out]` pairs a `*mcp.Tool` with `mcp.ToolHandlerFor[In, Out]`
- Tool schemas are generated via `jsonschema-go` helpers in `pkg/mcp/schema.go`
- `BaseToolBuilder` provides common feature validation logic
- `ToolRegistry` manages all tool builders with concurrent-safe registration
- `ToolBuildConfig` specifies build parameters (ReadOnly, Features, Options)
Expand All @@ -57,6 +60,7 @@ Client Request → MCP Server (pkg/mcp/server.go)
4. **Tool Registration** (`pkg/mcp/*_tools.go`)
- Each `*_tools.go` file creates a builder, builds tools, and adds them to the server
- Tools are conditionally registered based on `--features` flag
- Registration uses `tool.Register(server)` to install typed handlers on the go-sdk server
- Feature constants defined in `pkg/mcp/features.go`

5. **PFTools - Functions as Tools** (`pkg/mcp/pftools/`)
Expand All @@ -69,8 +73,9 @@ Client Request → MCP Server (pkg/mcp/server.go)
- `pulsar_session_manager.go` - LRU session cache with TTL cleanup for multi-session mode

7. **Transport Layer** (`pkg/cmd/mcp/`)
- `sse.go` - SSE transport with health endpoints (`/healthz`, `/readyz`) and auth middleware
- `server.go` - Stdio transport and common server initialization
- `stdio.go` - Stdio transport via `mcp.StdioTransport` (optional `mcp.LoggingTransport`)
- `sse.go` - SSE transport via `mcp.SSEServerTransport` with message endpoint, health endpoints, and auth middleware
- `server.go` - Common server initialization and tool registration

### Key Design Patterns

Expand Down Expand Up @@ -101,24 +106,48 @@ Client Request → MCP Server (pkg/mcp/server.go)
}
}

func (b *MyToolBuilder) BuildTools(ctx context.Context, config builders.ToolBuildConfig) ([]server.ServerTool, error) {
func (b *MyToolBuilder) BuildTools(ctx context.Context, config builders.ToolBuildConfig) ([]builders.ToolDefinition, error) {
if !b.HasAnyRequiredFeature(config.Features) {
return nil, nil
}
// Build and return tools
if err := b.Validate(config); err != nil {
return nil, err
}

inputSchema, err := mcp.InputSchema[MyInput]()
if err != nil {
return nil, err
}

tool := &sdk.Tool{
Name: "my_tool",
Description: "Tool description",
InputSchema: inputSchema,
}

handler := func(ctx context.Context, _ *sdk.CallToolRequest, input MyInput) (*sdk.CallToolResult, MyOutput, error) {
// Handler logic here
return &sdk.CallToolResult{
Content: []sdk.Content{&sdk.TextContent{Text: "ok"}},
}, MyOutput{}, nil
}

return []builders.ToolDefinition{
builders.ServerTool[MyInput, MyOutput]{Tool: tool, Handler: handler},
}, nil
}
```

2. **Add Feature Constant** in `pkg/mcp/features.go` if needed

3. **Create Registration File** `pkg/mcp/my_tools.go`:
```go
func AddMyTools(s *server.MCPServer, readOnly bool, features []string) {
func AddMyTools(s *sdk.Server, readOnly bool, features []string) {
builder := kafkabuilders.NewMyToolBuilder()
config := builders.ToolBuildConfig{ReadOnly: readOnly, Features: features}
tools, _ := builder.BuildTools(context.Background(), config)
for _, tool := range tools {
s.AddTool(tool.Tool, tool.Handler)
tool.Register(s)
}
}
```
Expand All @@ -127,7 +156,10 @@ Client Request → MCP Server (pkg/mcp/server.go)
```go
session := mcp.GetKafkaSession(ctx) // or GetPulsarSession
if session == nil {
return mcp.NewToolResultError("session not found"), nil
return &sdk.CallToolResult{
IsError: true,
Content: []sdk.Content{&sdk.TextContent{Text: "session not found"}},
}, nil, nil
}
admin, err := session.GetAdminClient()
```
Expand Down Expand Up @@ -242,6 +274,6 @@ The project includes generated SDK packages:
## Error Handling

- Wrap errors: `fmt.Errorf("failed to X: %w", err)`
- Return tool errors: `mcp.NewToolResultError("message")`
- Return tool errors by setting `IsError: true` on `sdk.CallToolResult`
- Check session nil before operations
- For PFTools, use circuit breaker to handle repeated failures
Loading
Loading