
Commit ebf0f55

Author: Nigel Brown
Infrastructure improvements and bugfixes for vMCP (#3439)
* Infrastructure improvements and bugfixes for vMCP
  - Add OpenTelemetry tracing to capability aggregation
  - Add singleflight deduplication for discovery requests
  - Add health checker self-check prevention
  - Add HTTP client timeout fixes
  - Improve E2E test reliability
  - Various build and infrastructure improvements

* fix: Update CallTool and GetPrompt signatures to match BackendClient interface
  - Add conversion import for meta field handling
  - Update CallTool to accept a meta parameter and return *vmcp.ToolCallResult
  - Update GetPrompt to return *vmcp.PromptGetResult
  - Add convertContent helper function

* fix: Update ReadResource signature to match BackendClient interface
  - Update ReadResource to return *vmcp.ResourceReadResult instead of []byte
  - Extract and include the meta field from the backend response
  - Include MIME type in the result

* fix: Pass selfURL parameter to health.NewMonitor
  - Construct selfURL from Host, Port, and EndpointPath
  - Prevents the health checker from checking itself

* Fix NewHealthChecker calls in checker_test.go to include the selfURL parameter

* Fix NewMonitor calls in monitor_test.go to include the selfURL parameter
  All 10 calls to NewMonitor in monitor_test.go were missing the new selfURL
  parameter that was added to the function signature, causing compilation
  failures in CI.

* Fix Go import formatting issues (gci linter). Fixed import ordering in:
  - pkg/vmcp/client/client.go
  - pkg/vmcp/health/checker_test.go
  - pkg/vmcp/health/monitor_test.go

* Fix Chart.yaml version - restore to 0.0.103
  The version was incorrectly downgraded to 0.0.102. Restore it to 0.0.103 to
  match the main branch.

* Bump Chart.yaml version to 0.0.104
  The chart-testing tool requires version bumps to be higher than the base
  branch version (0.0.103).

* Update README.md version badge to 0.0.104
  Match the Chart.yaml version update to satisfy the helm-docs pre-commit hook.

* Refactor vMCP tracing and remove health checker self-check
  Move telemetry provider initialization earlier in the vmcp serve command to
  enable distributed tracing in the aggregator. The aggregator now accepts an
  explicit tracer provider parameter instead of using the global otel tracer,
  following dependency injection best practices.

  Improve tracing error handling by using named return values and deferred
  error recording in aggregator methods, ensuring errors are properly captured
  in traces.

  Remove the health checker self-check functionality that prevented the server
  from checking its own health endpoint. This simplifies the implementation
  and removes unnecessary URL normalization logic.

  Changes:
  - Add tracerProvider parameter to aggregator.NewDefaultAggregator
  - Use a noop tracer when the provider is nil
  - Improve span error handling with deferred recording
  - Remove selfURL parameter from health.NewHealthChecker
  - Delete pkg/vmcp/health/checker_selfcheck_test.go
  - Update all tests to match the new function signatures
  - Add debug logging for auth strategy application in the client

* Add explanatory comment for MCP SDK Meta limitations
  Restores a comment explaining why Meta field preservation is important for
  ReadResource, in anticipation of future SDK improvements. This addresses PR
  feedback to maintain context about the SDK's current limitations regarding
  Meta field handling.

* Update test helper comments to clarify the pod readiness contract
  - Clarify that checkPodsReady waits for at least one pod (not all pods)
  - Add context that the helpers are used for single-replica deployments
  - Update comments on WaitForPodsReady and WaitForVirtualMCPServerReady
  Addresses code review feedback from the PR review.

* Complete error capture pattern in MergeCapabilities defer
  - Add named return value (retErr error) to MergeCapabilities
  - Add error capture in the defer statement with span.RecordError and span.SetStatus
  - Ensures a consistent error handling pattern across all aggregator methods
  This completes the implementation of the error capture pattern suggested in
  code review for all methods with tracing spans.

* Remove singleflight race condition fix
  Moving the singleflight deduplication logic to a separate PR, as it
  addresses a different race condition from the one fixed in #3450. The fix
  prevents duplicate capability aggregation when multiple concurrent requests
  arrive simultaneously at startup.

* Add SPDX license headers to manager.go
1 parent 6c2a0c5 commit ebf0f55

14 files changed: 405 additions & 57 deletions


.gitignore

Lines changed: 2 additions & 1 deletion
@@ -42,4 +42,5 @@ cmd/thv-operator/.task/checksum/crdref-gen
 # Test coverage
 coverage*
 
-crd-helm-wrapper
+crd-helm-wrapper
+cmd/vmcp/__debug_bin*

.golangci.yml

Lines changed: 2 additions & 0 deletions
@@ -139,6 +139,7 @@ linters:
       - third_party$
       - builtin$
       - examples$
+      - scripts$
 formatters:
   enable:
     - gci
@@ -155,3 +156,4 @@ formatters:
       - third_party$
       - builtin$
       - examples$
+      - scripts$

cmd/vmcp/app/commands.go

Lines changed: 24 additions & 17 deletions
@@ -12,6 +12,7 @@ import (
 
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
+	"go.opentelemetry.io/otel/trace"
 	"k8s.io/client-go/rest"
 
 	"github.com/stacklok/toolhive/pkg/audit"
@@ -310,8 +311,27 @@ func runServe(cmd *cobra.Command, _ []string) error {
 		return fmt.Errorf("failed to create conflict resolver: %w", err)
 	}
 
-	// Create aggregator
-	agg := aggregator.NewDefaultAggregator(backendClient, conflictResolver, cfg.Aggregation.Tools)
+	// If telemetry is configured, create the provider early so aggregator can use it
+	var telemetryProvider *telemetry.Provider
+	if cfg.Telemetry != nil {
+		telemetryProvider, err = telemetry.NewProvider(ctx, *cfg.Telemetry)
+		if err != nil {
+			return fmt.Errorf("failed to create telemetry provider: %w", err)
+		}
+		defer func() {
+			err := telemetryProvider.Shutdown(ctx)
+			if err != nil {
+				logger.Errorf("failed to shutdown telemetry provider: %v", err)
+			}
+		}()
+	}
+
+	// Create aggregator with tracer provider (nil if telemetry not configured)
+	var tracerProvider trace.TracerProvider
+	if telemetryProvider != nil {
+		tracerProvider = telemetryProvider.TracerProvider()
+	}
+	agg := aggregator.NewDefaultAggregator(backendClient, conflictResolver, cfg.Aggregation.Tools, tracerProvider)
 
 	// Use DynamicRegistry for version-based cache invalidation
 	// Works in both standalone (CLI with YAML config) and Kubernetes (operator-deployed) modes
@@ -381,21 +401,8 @@ func runServe(cmd *cobra.Command, _ []string) error {
 	host, _ := cmd.Flags().GetString("host")
 	port, _ := cmd.Flags().GetInt("port")
 
-	// If telemetry is configured, create the provider.
-	var telemetryProvider *telemetry.Provider
-	if cfg.Telemetry != nil {
-		var err error
-		telemetryProvider, err = telemetry.NewProvider(ctx, *cfg.Telemetry)
-		if err != nil {
-			return fmt.Errorf("failed to create telemetry provider: %w", err)
-		}
-		defer func() {
-			err := telemetryProvider.Shutdown(ctx)
-			if err != nil {
-				logger.Errorf("failed to shutdown telemetry provider: %v", err)
-			}
-		}()
-	}
+	// Note: telemetryProvider was already created earlier (before aggregator creation)
+	// to enable tracing in the aggregator
 
 	// Configure health monitoring if enabled
 	var healthMonitorConfig *health.MonitorConfig

codecov.yaml

Lines changed: 2 additions & 0 deletions
@@ -13,6 +13,8 @@ coverage:
       - "**/mocks/**/*"
       - "**/mock_*.go"
       - "**/zz_generated.deepcopy.go"
+      - "**/*_test.go"
+      - "**/*_test_coverage.go"
 status:
   project:
     default:

pkg/runner/config_builder_test.go

Lines changed: 2 additions & 2 deletions
@@ -1079,8 +1079,8 @@ func TestRunConfigBuilder_WithRegistryProxyPort(t *testing.T) {
 				ProxyPort:  testPort,
 				TargetPort: testPort,
 			},
-			cliProxyPort:      9000,
-			expectedProxyPort: 9000,
+			cliProxyPort:      9999,
+			expectedProxyPort: 9999,
 		},
 		{
 			name: "random port when neither CLI nor registry specified",

pkg/vmcp/aggregator/default_aggregator.go

Lines changed: 123 additions & 6 deletions
@@ -8,6 +8,10 @@ import (
 	"fmt"
 	"sync"
 
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
+	"go.opentelemetry.io/otel/trace/noop"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/stacklok/toolhive/pkg/logger"
@@ -21,15 +25,18 @@ type defaultAggregator struct {
 	backendClient    vmcp.BackendClient
 	conflictResolver ConflictResolver
 	toolConfigMap    map[string]*config.WorkloadToolConfig // Maps backend ID to tool config
+	tracer           trace.Tracer
 }
 
 // NewDefaultAggregator creates a new default aggregator implementation.
 // conflictResolver handles tool name conflicts across backends.
 // workloadConfigs specifies per-backend tool filtering and overrides.
+// tracerProvider is used to create a tracer for distributed tracing (pass nil for no tracing).
 func NewDefaultAggregator(
 	backendClient vmcp.BackendClient,
 	conflictResolver ConflictResolver,
 	workloadConfigs []*config.WorkloadToolConfig,
+	tracerProvider trace.TracerProvider,
 ) Aggregator {
 	// Build tool config map for quick lookup by backend ID
 	toolConfigMap := make(map[string]*config.WorkloadToolConfig)
@@ -39,16 +46,38 @@ func NewDefaultAggregator(
 		}
 	}
 
+	// Create tracer from provider (use noop tracer if provider is nil)
+	var tracer trace.Tracer
+	if tracerProvider != nil {
+		tracer = tracerProvider.Tracer("github.com/stacklok/toolhive/pkg/vmcp/aggregator")
+	} else {
+		tracer = noop.NewTracerProvider().Tracer("github.com/stacklok/toolhive/pkg/vmcp/aggregator")
+	}
+
 	return &defaultAggregator{
 		backendClient:    backendClient,
 		conflictResolver: conflictResolver,
 		toolConfigMap:    toolConfigMap,
+		tracer:           tracer,
 	}
 }
 
 // QueryCapabilities queries a single backend for its MCP capabilities.
 // Returns the raw capabilities (tools, resources, prompts) from the backend.
-func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.Backend) (*BackendCapabilities, error) {
+func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.Backend) (_ *BackendCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.QueryCapabilities",
+		trace.WithAttributes(
+			attribute.String("backend.id", backend.ID),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Debugf("Querying capabilities from backend %s", backend.ID)
 
 	// Create a BackendTarget from the Backend
@@ -74,6 +103,12 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
 		SupportsSampling: capabilities.SupportsSampling,
 	}
 
+	span.SetAttributes(
+		attribute.Int("tools.count", len(result.Tools)),
+		attribute.Int("resources.count", len(result.Resources)),
+		attribute.Int("prompts.count", len(result.Prompts)),
+	)
+
 	logger.Debugf("Backend %s: %d tools (after filtering/overrides), %d resources, %d prompts",
 		backend.ID, len(result.Tools), len(result.Resources), len(result.Prompts))
 
@@ -85,7 +120,20 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
 func (a *defaultAggregator) QueryAllCapabilities(
 	ctx context.Context,
 	backends []vmcp.Backend,
-) (map[string]*BackendCapabilities, error) {
+) (_ map[string]*BackendCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.QueryAllCapabilities",
+		trace.WithAttributes(
+			attribute.Int("backends.count", len(backends)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Infof("Querying capabilities from %d backends", len(backends))
 
 	// Use errgroup for parallel queries with context cancellation
@@ -125,6 +173,10 @@ func (a *defaultAggregator) QueryAllCapabilities(
 		return nil, fmt.Errorf("no backends returned capabilities")
 	}
 
+	span.SetAttributes(
+		attribute.Int("successful.backends", len(capabilities)),
+	)
+
 	logger.Infof("Successfully queried %d/%d backends", len(capabilities), len(backends))
 	return capabilities, nil
 }
@@ -134,7 +186,20 @@ func (a *defaultAggregator) QueryAllCapabilities(
 func (a *defaultAggregator) ResolveConflicts(
 	ctx context.Context,
 	capabilities map[string]*BackendCapabilities,
-) (*ResolvedCapabilities, error) {
+) (_ *ResolvedCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.ResolveConflicts",
+		trace.WithAttributes(
+			attribute.Int("backends.count", len(capabilities)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Debugf("Resolving conflicts across %d backends", len(capabilities))
 
 	// Group tools by backend for conflict resolution
@@ -191,6 +256,12 @@ func (a *defaultAggregator) ResolveConflicts(
 		resolved.SupportsSampling = resolved.SupportsSampling || caps.SupportsSampling
 	}
 
+	span.SetAttributes(
+		attribute.Int("resolved.tools", len(resolved.Tools)),
+		attribute.Int("resolved.resources", len(resolved.Resources)),
+		attribute.Int("resolved.prompts", len(resolved.Prompts)),
+	)
+
 	logger.Debugf("Resolved %d unique tools, %d resources, %d prompts",
 		len(resolved.Tools), len(resolved.Resources), len(resolved.Prompts))
 
@@ -199,11 +270,26 @@ func (a *defaultAggregator) ResolveConflicts(
 
 // MergeCapabilities creates the final unified capability view and routing table.
 // Uses the backend registry to populate full BackendTarget information for routing.
-func (*defaultAggregator) MergeCapabilities(
+func (a *defaultAggregator) MergeCapabilities(
 	ctx context.Context,
 	resolved *ResolvedCapabilities,
 	registry vmcp.BackendRegistry,
-) (*AggregatedCapabilities, error) {
+) (_ *AggregatedCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.MergeCapabilities",
+		trace.WithAttributes(
+			attribute.Int("resolved.tools", len(resolved.Tools)),
+			attribute.Int("resolved.resources", len(resolved.Resources)),
+			attribute.Int("resolved.prompts", len(resolved.Prompts)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Debugf("Merging capabilities into final view")
 
 	// Create routing table
@@ -304,6 +390,13 @@ func (*defaultAggregator) MergeCapabilities(
 		},
 	}
 
+	span.SetAttributes(
+		attribute.Int("aggregated.tools", aggregated.Metadata.ToolCount),
+		attribute.Int("aggregated.resources", aggregated.Metadata.ResourceCount),
+		attribute.Int("aggregated.prompts", aggregated.Metadata.PromptCount),
+		attribute.String("conflict.strategy", string(aggregated.Metadata.ConflictStrategy)),
+	)
+
 	logger.Infof("Merged capabilities: %d tools, %d resources, %d prompts",
 		aggregated.Metadata.ToolCount, aggregated.Metadata.ResourceCount, aggregated.Metadata.PromptCount)
 
@@ -315,7 +408,23 @@ func (*defaultAggregator) MergeCapabilities(
 // 2. Query all backends
 // 3. Resolve conflicts
 // 4. Merge into final view with full backend information
-func (a *defaultAggregator) AggregateCapabilities(ctx context.Context, backends []vmcp.Backend) (*AggregatedCapabilities, error) {
+func (a *defaultAggregator) AggregateCapabilities(
+	ctx context.Context,
+	backends []vmcp.Backend,
+) (_ *AggregatedCapabilities, retErr error) {
+	ctx, span := a.tracer.Start(ctx, "aggregator.AggregateCapabilities",
+		trace.WithAttributes(
+			attribute.Int("backends.count", len(backends)),
+		),
+	)
+	defer func() {
+		if retErr != nil {
+			span.RecordError(retErr)
+			span.SetStatus(codes.Error, retErr.Error())
+		}
+		span.End()
+	}()
+
 	logger.Infof("Starting capability aggregation for %d backends", len(backends))
 
 	// Step 1: Create registry from discovered backends
@@ -343,6 +452,14 @@ func (a *defaultAggregator) AggregateCapabilities(ctx context.Context, backends
 	// Update metadata with backend count
 	aggregated.Metadata.BackendCount = len(backends)
 
+	span.SetAttributes(
+		attribute.Int("aggregated.backends", aggregated.Metadata.BackendCount),
+		attribute.Int("aggregated.tools", aggregated.Metadata.ToolCount),
+		attribute.Int("aggregated.resources", aggregated.Metadata.ResourceCount),
+		attribute.Int("aggregated.prompts", aggregated.Metadata.PromptCount),
+		attribute.String("conflict.strategy", string(aggregated.Metadata.ConflictStrategy)),
+	)
+
 	logger.Infof("Capability aggregation complete: %d backends, %d tools, %d resources, %d prompts",
 		aggregated.Metadata.BackendCount, aggregated.Metadata.ToolCount,
 		aggregated.Metadata.ResourceCount, aggregated.Metadata.PromptCount)
