Skip to content

Commit 9e28406

Browse files
author
nigel brown
committed
Infrastructure improvements and bugfixes for vMCP
- Add OpenTelemetry tracing to capability aggregation - Add singleflight deduplication for discovery requests - Add health checker self-check prevention - Add HTTP client timeout fixes - Improve E2E test reliability - Various build and infrastructure improvements
1 parent 53d6023 commit 9e28406

14 files changed

Lines changed: 991 additions & 133 deletions

File tree

.gitignore

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,11 @@ cmd/thv-operator/.task/checksum/crdref-gen
4242
# Test coverage
4343
coverage*
4444

45-
crd-helm-wrapper
45+
crd-helm-wrapper
46+
cmd/vmcp/__debug_bin*
47+
48+
# Demo files
49+
examples/operator/virtual-mcps/vmcp_optimizer.yaml
50+
scripts/k8s_vmcp_optimizer_demo.sh
51+
examples/ingress/mcp-servers-ingress.yaml
52+
/vmcp

.golangci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ linters:
139139
- third_party$
140140
- builtin$
141141
- examples$
142+
- scripts$
142143
formatters:
143144
enable:
144145
- gci
@@ -155,3 +156,4 @@ formatters:
155156
- third_party$
156157
- builtin$
157158
- examples$
159+
- scripts$

codecov.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ coverage:
1313
- "**/mocks/**/*"
1414
- "**/mock_*.go"
1515
- "**/zz_generated.deepcopy.go"
16+
- "**/*_test.go"
17+
- "**/*_test_coverage.go"
1618
status:
1719
project:
1820
default:

deploy/charts/operator-crds/Chart.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ apiVersion: v2
22
name: toolhive-operator-crds
33
description: A Helm chart for installing the ToolHive Operator CRDs into Kubernetes.
44
type: application
5-
version: 0.0.103
5+
version: 0.0.102
66
appVersion: "0.0.1"

pkg/runner/config_builder_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,8 +1076,8 @@ func TestRunConfigBuilder_WithRegistryProxyPort(t *testing.T) {
10761076
ProxyPort: 8976,
10771077
TargetPort: 8976,
10781078
},
1079-
cliProxyPort: 9000,
1080-
expectedProxyPort: 9000,
1079+
cliProxyPort: 9999,
1080+
expectedProxyPort: 9999,
10811081
},
10821082
{
10831083
name: "random port when neither CLI nor registry specified",

pkg/vmcp/aggregator/default_aggregator.go

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import (
88
"fmt"
99
"sync"
1010

11+
"go.opentelemetry.io/otel"
12+
"go.opentelemetry.io/otel/attribute"
13+
"go.opentelemetry.io/otel/codes"
14+
"go.opentelemetry.io/otel/trace"
1115
"golang.org/x/sync/errgroup"
1216

1317
"github.com/stacklok/toolhive/pkg/logger"
@@ -21,6 +25,7 @@ type defaultAggregator struct {
2125
backendClient vmcp.BackendClient
2226
conflictResolver ConflictResolver
2327
toolConfigMap map[string]*config.WorkloadToolConfig // Maps backend ID to tool config
28+
tracer trace.Tracer
2429
}
2530

2631
// NewDefaultAggregator creates a new default aggregator implementation.
@@ -43,12 +48,20 @@ func NewDefaultAggregator(
4348
backendClient: backendClient,
4449
conflictResolver: conflictResolver,
4550
toolConfigMap: toolConfigMap,
51+
tracer: otel.Tracer("github.com/stacklok/toolhive/pkg/vmcp/aggregator"),
4652
}
4753
}
4854

4955
// QueryCapabilities queries a single backend for its MCP capabilities.
5056
// Returns the raw capabilities (tools, resources, prompts) from the backend.
5157
func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.Backend) (*BackendCapabilities, error) {
58+
ctx, span := a.tracer.Start(ctx, "aggregator.QueryCapabilities",
59+
trace.WithAttributes(
60+
attribute.String("backend.id", backend.ID),
61+
),
62+
)
63+
defer span.End()
64+
5265
logger.Debugf("Querying capabilities from backend %s", backend.ID)
5366

5467
// Create a BackendTarget from the Backend
@@ -58,6 +71,8 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
5871
// Query capabilities using the backend client
5972
capabilities, err := a.backendClient.ListCapabilities(ctx, target)
6073
if err != nil {
74+
span.RecordError(err)
75+
span.SetStatus(codes.Error, err.Error())
6176
return nil, fmt.Errorf("%w: %s: %w", ErrBackendQueryFailed, backend.ID, err)
6277
}
6378

@@ -74,6 +89,12 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
7489
SupportsSampling: capabilities.SupportsSampling,
7590
}
7691

92+
span.SetAttributes(
93+
attribute.Int("tools.count", len(result.Tools)),
94+
attribute.Int("resources.count", len(result.Resources)),
95+
attribute.Int("prompts.count", len(result.Prompts)),
96+
)
97+
7798
logger.Debugf("Backend %s: %d tools (after filtering/overrides), %d resources, %d prompts",
7899
backend.ID, len(result.Tools), len(result.Resources), len(result.Prompts))
79100

@@ -86,6 +107,13 @@ func (a *defaultAggregator) QueryAllCapabilities(
86107
ctx context.Context,
87108
backends []vmcp.Backend,
88109
) (map[string]*BackendCapabilities, error) {
110+
ctx, span := a.tracer.Start(ctx, "aggregator.QueryAllCapabilities",
111+
trace.WithAttributes(
112+
attribute.Int("backends.count", len(backends)),
113+
),
114+
)
115+
defer span.End()
116+
89117
logger.Infof("Querying capabilities from %d backends", len(backends))
90118

91119
// Use errgroup for parallel queries with context cancellation
@@ -118,13 +146,22 @@ func (a *defaultAggregator) QueryAllCapabilities(
118146

119147
// Wait for all queries to complete
120148
if err := g.Wait(); err != nil {
149+
span.RecordError(err)
150+
span.SetStatus(codes.Error, err.Error())
121151
return nil, fmt.Errorf("capability queries failed: %w", err)
122152
}
123153

124154
if len(capabilities) == 0 {
125-
return nil, fmt.Errorf("no backends returned capabilities")
155+
err := fmt.Errorf("no backends returned capabilities")
156+
span.RecordError(err)
157+
span.SetStatus(codes.Error, err.Error())
158+
return nil, err
126159
}
127160

161+
span.SetAttributes(
162+
attribute.Int("successful.backends", len(capabilities)),
163+
)
164+
128165
logger.Infof("Successfully queried %d/%d backends", len(capabilities), len(backends))
129166
return capabilities, nil
130167
}
@@ -135,6 +172,13 @@ func (a *defaultAggregator) ResolveConflicts(
135172
ctx context.Context,
136173
capabilities map[string]*BackendCapabilities,
137174
) (*ResolvedCapabilities, error) {
175+
ctx, span := a.tracer.Start(ctx, "aggregator.ResolveConflicts",
176+
trace.WithAttributes(
177+
attribute.Int("backends.count", len(capabilities)),
178+
),
179+
)
180+
defer span.End()
181+
138182
logger.Debugf("Resolving conflicts across %d backends", len(capabilities))
139183

140184
// Group tools by backend for conflict resolution
@@ -150,6 +194,8 @@ func (a *defaultAggregator) ResolveConflicts(
150194
if a.conflictResolver != nil {
151195
resolvedTools, err = a.conflictResolver.ResolveToolConflicts(ctx, toolsByBackend)
152196
if err != nil {
197+
span.RecordError(err)
198+
span.SetStatus(codes.Error, err.Error())
153199
return nil, fmt.Errorf("conflict resolution failed: %w", err)
154200
}
155201
} else {
@@ -191,6 +237,12 @@ func (a *defaultAggregator) ResolveConflicts(
191237
resolved.SupportsSampling = resolved.SupportsSampling || caps.SupportsSampling
192238
}
193239

240+
span.SetAttributes(
241+
attribute.Int("resolved.tools", len(resolved.Tools)),
242+
attribute.Int("resolved.resources", len(resolved.Resources)),
243+
attribute.Int("resolved.prompts", len(resolved.Prompts)),
244+
)
245+
194246
logger.Debugf("Resolved %d unique tools, %d resources, %d prompts",
195247
len(resolved.Tools), len(resolved.Resources), len(resolved.Prompts))
196248

@@ -199,11 +251,20 @@ func (a *defaultAggregator) ResolveConflicts(
199251

200252
// MergeCapabilities creates the final unified capability view and routing table.
201253
// Uses the backend registry to populate full BackendTarget information for routing.
202-
func (*defaultAggregator) MergeCapabilities(
254+
func (a *defaultAggregator) MergeCapabilities(
203255
ctx context.Context,
204256
resolved *ResolvedCapabilities,
205257
registry vmcp.BackendRegistry,
206258
) (*AggregatedCapabilities, error) {
259+
ctx, span := a.tracer.Start(ctx, "aggregator.MergeCapabilities",
260+
trace.WithAttributes(
261+
attribute.Int("resolved.tools", len(resolved.Tools)),
262+
attribute.Int("resolved.resources", len(resolved.Resources)),
263+
attribute.Int("resolved.prompts", len(resolved.Prompts)),
264+
),
265+
)
266+
defer span.End()
267+
207268
logger.Debugf("Merging capabilities into final view")
208269

209270
// Create routing table
@@ -304,6 +365,13 @@ func (*defaultAggregator) MergeCapabilities(
304365
},
305366
}
306367

368+
span.SetAttributes(
369+
attribute.Int("aggregated.tools", aggregated.Metadata.ToolCount),
370+
attribute.Int("aggregated.resources", aggregated.Metadata.ResourceCount),
371+
attribute.Int("aggregated.prompts", aggregated.Metadata.PromptCount),
372+
attribute.String("conflict.strategy", string(aggregated.Metadata.ConflictStrategy)),
373+
)
374+
307375
logger.Infof("Merged capabilities: %d tools, %d resources, %d prompts",
308376
aggregated.Metadata.ToolCount, aggregated.Metadata.ResourceCount, aggregated.Metadata.PromptCount)
309377

@@ -316,6 +384,13 @@ func (*defaultAggregator) MergeCapabilities(
316384
// 3. Resolve conflicts
317385
// 4. Merge into final view with full backend information
318386
func (a *defaultAggregator) AggregateCapabilities(ctx context.Context, backends []vmcp.Backend) (*AggregatedCapabilities, error) {
387+
ctx, span := a.tracer.Start(ctx, "aggregator.AggregateCapabilities",
388+
trace.WithAttributes(
389+
attribute.Int("backends.count", len(backends)),
390+
),
391+
)
392+
defer span.End()
393+
319394
logger.Infof("Starting capability aggregation for %d backends", len(backends))
320395

321396
// Step 1: Create registry from discovered backends
@@ -325,24 +400,38 @@ func (a *defaultAggregator) AggregateCapabilities(ctx context.Context, backends
325400
// Step 2: Query all backends
326401
capabilities, err := a.QueryAllCapabilities(ctx, backends)
327402
if err != nil {
403+
span.RecordError(err)
404+
span.SetStatus(codes.Error, err.Error())
328405
return nil, fmt.Errorf("failed to query backends: %w", err)
329406
}
330407

331408
// Step 3: Resolve conflicts
332409
resolved, err := a.ResolveConflicts(ctx, capabilities)
333410
if err != nil {
411+
span.RecordError(err)
412+
span.SetStatus(codes.Error, err.Error())
334413
return nil, fmt.Errorf("failed to resolve conflicts: %w", err)
335414
}
336415

337416
// Step 4: Merge into final view with full backend information
338417
aggregated, err := a.MergeCapabilities(ctx, resolved, registry)
339418
if err != nil {
419+
span.RecordError(err)
420+
span.SetStatus(codes.Error, err.Error())
340421
return nil, fmt.Errorf("failed to merge capabilities: %w", err)
341422
}
342423

343424
// Update metadata with backend count
344425
aggregated.Metadata.BackendCount = len(backends)
345426

427+
span.SetAttributes(
428+
attribute.Int("aggregated.backends", aggregated.Metadata.BackendCount),
429+
attribute.Int("aggregated.tools", aggregated.Metadata.ToolCount),
430+
attribute.Int("aggregated.resources", aggregated.Metadata.ResourceCount),
431+
attribute.Int("aggregated.prompts", aggregated.Metadata.PromptCount),
432+
attribute.String("conflict.strategy", string(aggregated.Metadata.ConflictStrategy)),
433+
)
434+
346435
logger.Infof("Capability aggregation complete: %d backends, %d tools, %d resources, %d prompts",
347436
aggregated.Metadata.BackendCount, aggregated.Metadata.ToolCount,
348437
aggregated.Metadata.ResourceCount, aggregated.Metadata.PromptCount)

0 commit comments

Comments
 (0)