Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,22 @@ includes:
tasks:
docs:
desc: Regenerate the docs
deps: [swagger-install, helm-docs]
deps: [swagger-install, helm-docs-install]
cmds:
- rm -rf docs/cli/*
- go run cmd/help/main.go --dir docs/cli
- swag init -g pkg/api/server.go --v3.1 -o docs/server
- task: helm-docs
- helm-docs --chart-search-root=deploy/charts --template-files=./_templates.gotmpl --template-files=README.md.gotmpl

swagger-install:
desc: Install the swag tool for OpenAPI/Swagger generation
cmds:
- go install github.com/swaggo/swag/v2/cmd/swag@latest

helm-docs:
desc: Generate Helm chart documentation
helm-docs-install:
desc: Install the helm-docs tool to regenerate Helm chart documentation
cmds:
- command -v helm-docs >/dev/null 2>&1 || go install github.com/norwoodj/helm-docs/cmd/helm-docs@latest
- helm-docs --chart-search-root=deploy/charts
- go install github.com/norwoodj/helm-docs/cmd/helm-docs@latest

mock-install:
desc: Install the mockgen tool for mock generation
Expand Down Expand Up @@ -177,6 +176,11 @@ tasks:
desc: Run all tests (unit and e2e)
deps: [test, test-e2e]

test-optimizer:
desc: Run optimizer integration tests with sqlite-vec
cmds:
- ./scripts/test-optimizer-with-sqlite-vec.sh

build:
desc: Build the binary
deps: [gen]
Expand Down Expand Up @@ -220,12 +224,12 @@ tasks:
cmds:
- cmd: mkdir -p bin
platforms: [linux, darwin]
- cmd: go build -ldflags "-s -w -X github.com/stacklok/toolhive/pkg/versions.Version={{.VERSION}} -X github.com/stacklok/toolhive/pkg/versions.Commit={{.COMMIT}} -X github.com/stacklok/toolhive/pkg/versions.BuildDate={{.BUILD_DATE}}" -o bin/vmcp ./cmd/vmcp
- cmd: go build -tags="fts5" -ldflags "-s -w -X github.com/stacklok/toolhive/pkg/versions.Version={{.VERSION}} -X github.com/stacklok/toolhive/pkg/versions.Commit={{.COMMIT}} -X github.com/stacklok/toolhive/pkg/versions.BuildDate={{.BUILD_DATE}}" -o bin/vmcp ./cmd/vmcp
platforms: [linux, darwin]
- cmd: cmd.exe /c mkdir bin
platforms: [windows]
ignore_error: true
- cmd: go build -ldflags "-s -w -X github.com/stacklok/toolhive/pkg/versions.Version={{.VERSION}} -X github.com/stacklok/toolhive/pkg/versions.Commit={{.COMMIT}} -X github.com/stacklok/toolhive/pkg/versions.BuildDate={{.BUILD_DATE}}" -o bin/vmcp.exe ./cmd/vmcp
- cmd: go build -tags="fts5" -ldflags "-s -w -X github.com/stacklok/toolhive/pkg/versions.Version={{.VERSION}} -X github.com/stacklok/toolhive/pkg/versions.Commit={{.COMMIT}} -X github.com/stacklok/toolhive/pkg/versions.BuildDate={{.BUILD_DATE}}" -o bin/vmcp.exe ./cmd/vmcp
platforms: [windows]

install-vmcp:
Expand All @@ -237,7 +241,7 @@ tasks:
sh: git rev-parse --short HEAD || echo "unknown"
BUILD_DATE: '{{dateInZone "2006-01-02T15:04:05Z" (now) "UTC"}}'
cmds:
- go install -ldflags "-s -w -X github.com/stacklok/toolhive/pkg/versions.Version={{.VERSION}} -X github.com/stacklok/toolhive/pkg/versions.Commit={{.COMMIT}} -X github.com/stacklok/toolhive/pkg/versions.BuildDate={{.BUILD_DATE}}" -v ./cmd/vmcp
- go install -tags="fts5" -ldflags "-s -w -X github.com/stacklok/toolhive/pkg/versions.Version={{.VERSION}} -X github.com/stacklok/toolhive/pkg/versions.Commit={{.COMMIT}} -X github.com/stacklok/toolhive/pkg/versions.BuildDate={{.BUILD_DATE}}" -v ./cmd/vmcp

all:
desc: Run linting, tests, and build
Expand Down
26 changes: 20 additions & 6 deletions cmd/thv-operator/controllers/mcpserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1137,12 +1137,13 @@ func (r *MCPServerReconciler) deploymentForMCPServer(
Spec: corev1.PodSpec{
ServiceAccountName: ctrlutil.ProxyRunnerServiceAccountName(m.Name),
Containers: []corev1.Container{{
Image: getToolhiveRunnerImage(),
Name: "toolhive",
Args: args,
Env: env,
VolumeMounts: volumeMounts,
Resources: resources,
Image: getToolhiveRunnerImage(),
Name: "toolhive",
ImagePullPolicy: getImagePullPolicyForToolhiveRunner(),
Args: args,
Env: env,
VolumeMounts: volumeMounts,
Resources: resources,
Ports: []corev1.ContainerPort{{
ContainerPort: m.GetProxyPort(),
Name: "http",
Expand Down Expand Up @@ -1700,6 +1701,19 @@ func getToolhiveRunnerImage() string {
return image
}

// getImagePullPolicyForToolhiveRunner selects the imagePullPolicy for the toolhive runner container.
// An image reference beginning with "kind.local/" or "localhost/" is treated as a local image and
// gets PullNever; any other image gets PullIfNotPresent.
func getImagePullPolicyForToolhiveRunner() corev1.PullPolicy {
	runnerImage := getToolhiveRunnerImage()

	// Prefixes that mark an image as local, so no pull is attempted for it.
	for _, localPrefix := range []string{"kind.local/", "localhost/"} {
		if strings.HasPrefix(runnerImage, localPrefix) {
			return corev1.PullNever
		}
	}

	// Everything else may be pulled, but only when it is not already present.
	return corev1.PullIfNotPresent
}

// handleExternalAuthConfig validates and tracks the hash of the referenced MCPExternalAuthConfig.
// It updates the MCPServer status when the external auth configuration changes.
func (r *MCPServerReconciler) handleExternalAuthConfig(ctx context.Context, m *mcpv1alpha1.MCPServer) error {
Expand Down
16 changes: 11 additions & 5 deletions cmd/thv-operator/pkg/vmcpconfig/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,24 @@ func (c *Converter) Convert(
// are handled by kubebuilder annotations in pkg/telemetry/config.go and applied by the API server.
config.Telemetry = spectoconfig.NormalizeTelemetryConfig(vmcp.Spec.Config.Telemetry, vmcp.Name)

// Convert audit config
c.convertAuditConfig(config, vmcp)

// Apply operational defaults (fills missing values)
config.EnsureOperationalDefaults()

return config, nil
}

// convertAuditConfig converts audit configuration from CRD to vmcp config.
func (*Converter) convertAuditConfig(config *vmcpconfig.Config, vmcp *mcpv1alpha1.VirtualMCPServer) {
if vmcp.Spec.Config.Audit != nil && vmcp.Spec.Config.Audit.Enabled {
config.Audit = vmcp.Spec.Config.Audit
}

if config.Audit != nil && config.Audit.Component == "" {
config.Audit.Component = vmcp.Name
}

// Apply operational defaults (fills missing values)
config.EnsureOperationalDefaults()

return config, nil
}

// convertIncomingAuth converts IncomingAuthConfig from CRD to vmcp config.
Expand Down
67 changes: 37 additions & 30 deletions cmd/vmcp/app/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"github.com/stacklok/toolhive/pkg/vmcp/discovery"
"github.com/stacklok/toolhive/pkg/vmcp/health"
"github.com/stacklok/toolhive/pkg/vmcp/k8s"
"github.com/stacklok/toolhive/pkg/vmcp/optimizer"
vmcpoptimizer "github.com/stacklok/toolhive/pkg/vmcp/optimizer"
vmcprouter "github.com/stacklok/toolhive/pkg/vmcp/router"
vmcpserver "github.com/stacklok/toolhive/pkg/vmcp/server"
vmcpstatus "github.com/stacklok/toolhive/pkg/vmcp/status"
Expand Down Expand Up @@ -188,17 +188,6 @@
return "dev"
}

// getStatusReportingInterval reads the status reporting interval out of cfg.
// It returns 0 when the Operational or FailureHandling section is missing, or when
// the configured interval is not positive; per the original contract, 0 means the
// default interval is used.
func getStatusReportingInterval(cfg *config.Config) time.Duration {
	operational := cfg.Operational
	if operational == nil {
		return 0
	}

	failureHandling := operational.FailureHandling
	if failureHandling == nil {
		return 0
	}

	// Only a strictly positive interval counts as configured.
	if failureHandling.StatusReportingInterval > 0 {
		return time.Duration(failureHandling.StatusReportingInterval)
	}
	return 0
}

// loadAndValidateConfig loads and validates the vMCP configuration file
func loadAndValidateConfig(configPath string) (*config.Config, error) {
logger.Infof("Loading configuration from: %s", configPath)
Expand Down Expand Up @@ -443,24 +432,42 @@
}

serverCfg := &vmcpserver.Config{
Name: cfg.Name,
Version: getVersion(),
GroupRef: cfg.Group,
Host: host,
Port: port,
AuthMiddleware: authMiddleware,
AuthInfoHandler: authInfoHandler,
TelemetryProvider: telemetryProvider,
AuditConfig: cfg.Audit,
HealthMonitorConfig: healthMonitorConfig,
StatusReportingInterval: getStatusReportingInterval(cfg),
Watcher: backendWatcher,
StatusReporter: statusReporter,
}

if cfg.Optimizer != nil {
// TODO: update this with the real optimizer.
serverCfg.OptimizerFactory = optimizer.NewDummyOptimizer
Name: cfg.Name,
Version: getVersion(),
GroupRef: cfg.Group,
Host: host,
Port: port,
AuthMiddleware: authMiddleware,
AuthInfoHandler: authInfoHandler,
TelemetryProvider: telemetryProvider,
AuditConfig: cfg.Audit,
HealthMonitorConfig: healthMonitorConfig,
Watcher: backendWatcher,
StatusReporter: statusReporter,
}

// Configure optimizer if enabled in YAML config
if cfg.Optimizer != nil && cfg.Optimizer.Enabled {

Check failure on line 450 in cmd/vmcp/app/commands.go

View workflow job for this annotation

GitHub Actions / Security Scan / Go Vulnerability Check

cfg.Optimizer.Enabled undefined (type *"github.com/stacklok/toolhive/pkg/vmcp/config".OptimizerConfig has no field or method Enabled)

Check failure on line 450 in cmd/vmcp/app/commands.go

View workflow job for this annotation

GitHub Actions / Go Vulnerability Check

cfg.Optimizer.Enabled undefined (type *"github.com/stacklok/toolhive/pkg/vmcp/config".OptimizerConfig has no field or method Enabled)
logger.Info("🔬 Optimizer enabled via configuration (chromem-go)")
serverCfg.OptimizerFactory = vmcpoptimizer.NewEmbeddingOptimizer

Check failure on line 452 in cmd/vmcp/app/commands.go

View workflow job for this annotation

GitHub Actions / Security Scan / Go Vulnerability Check

undefined: vmcpoptimizer.NewEmbeddingOptimizer

Check failure on line 452 in cmd/vmcp/app/commands.go

View workflow job for this annotation

GitHub Actions / Go Vulnerability Check

undefined: vmcpoptimizer.NewEmbeddingOptimizer
serverCfg.OptimizerConfig = cfg.Optimizer
persistInfo := "in-memory"
if cfg.Optimizer.PersistPath != "" {
persistInfo = cfg.Optimizer.PersistPath
}
// FTS5 is always enabled with configurable semantic/BM25 ratio
ratio := 70 // Default (70%)
if cfg.Optimizer.HybridSearchRatio != nil {
ratio = *cfg.Optimizer.HybridSearchRatio
}
searchMode := fmt.Sprintf("hybrid (%d%% semantic, %d%% BM25)",
ratio,
100-ratio)
logger.Infof("Optimizer configured: backend=%s, dimension=%d, persistence=%s, search=%s",
cfg.Optimizer.EmbeddingBackend,
cfg.Optimizer.EmbeddingDimension,
persistInfo,
searchMode)
}

// Convert composite tool configurations to workflow definitions
Expand Down
9 changes: 8 additions & 1 deletion pkg/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,16 @@ func NewProvider(ctx context.Context, config Config) (*Provider, error) {
return nil, err
}

// Apply default for ServiceVersion if not provided
// Documentation states: "When omitted, defaults to the ToolHive version"
serviceVersion := config.ServiceVersion
if serviceVersion == "" {
serviceVersion = versions.GetVersionInfo().Version
}

telemetryOptions := []providers.ProviderOption{
providers.WithServiceName(config.ServiceName),
providers.WithServiceVersion(config.ServiceVersion),
providers.WithServiceVersion(serviceVersion),
providers.WithOTLPEndpoint(config.Endpoint),
providers.WithHeaders(config.Headers),
providers.WithInsecure(config.Insecure),
Expand Down
17 changes: 16 additions & 1 deletion pkg/vmcp/aggregator/default_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func (a *defaultAggregator) QueryCapabilities(ctx context.Context, backend vmcp.
// Query capabilities using the backend client
capabilities, err := a.backendClient.ListCapabilities(ctx, target)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("%w: %s: %w", ErrBackendQueryFailed, backend.ID, err)
}

Expand Down Expand Up @@ -166,11 +168,16 @@ func (a *defaultAggregator) QueryAllCapabilities(

// Wait for all queries to complete
if err := g.Wait(); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("capability queries failed: %w", err)
}

if len(capabilities) == 0 {
return nil, fmt.Errorf("no backends returned capabilities")
err := fmt.Errorf("no backends returned capabilities")
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

span.SetAttributes(
Expand Down Expand Up @@ -215,6 +222,8 @@ func (a *defaultAggregator) ResolveConflicts(
if a.conflictResolver != nil {
resolvedTools, err = a.conflictResolver.ResolveToolConflicts(ctx, toolsByBackend)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("conflict resolution failed: %w", err)
}
} else {
Expand Down Expand Up @@ -434,18 +443,24 @@ func (a *defaultAggregator) AggregateCapabilities(
// Step 2: Query all backends
capabilities, err := a.QueryAllCapabilities(ctx, backends)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("failed to query backends: %w", err)
}

// Step 3: Resolve conflicts
resolved, err := a.ResolveConflicts(ctx, capabilities)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("failed to resolve conflicts: %w", err)
}

// Step 4: Merge into final view with full backend information
aggregated, err := a.MergeCapabilities(ctx, resolved, registry)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("failed to merge capabilities: %w", err)
}

Expand Down
13 changes: 2 additions & 11 deletions pkg/vmcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,22 +277,15 @@ func wrapBackendError(err error, backendID string, operation string) error {
vmcp.ErrCancelled, operation, backendID, err)
}

// 2. Type-based detection: Check for io.EOF errors
// These indicate the connection was closed unexpectedly
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return fmt.Errorf("%w: failed to %s for backend %s (connection closed): %v",
vmcp.ErrBackendUnavailable, operation, backendID, err)
}

// 3. Type-based detection: Check for net.Error with Timeout() method
// 2. Type-based detection: Check for net.Error with Timeout() method
// This handles network timeouts from the standard library
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return fmt.Errorf("%w: failed to %s for backend %s (timeout): %v",
vmcp.ErrTimeout, operation, backendID, err)
}

// 4. String-based detection: Fall back to pattern matching for cases where
// 3. String-based detection: Fall back to pattern matching for cases where
// we don't have structured error types (MCP SDK, HTTP libraries with embedded status codes)
// Authentication errors (401, 403, auth failures)
if vmcp.IsAuthenticationError(err) {
Expand Down Expand Up @@ -707,8 +700,6 @@ func (h *httpBackendClient) ReadResource(
// Extract _meta field from backend response
meta := conversion.FromMCPMeta(result.Meta)

// Note: Due to MCP SDK limitations, the SDK's ReadResourceResult may not include Meta.
// This preserves it for future SDK improvements.
return &vmcp.ResourceReadResult{
Contents: data,
MimeType: mimeType,
Expand Down
13 changes: 12 additions & 1 deletion pkg/vmcp/discovery/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,19 @@ func TestMiddleware_CapabilitiesInContext(t *testing.T) {
},
}

// Use Do to capture and verify backends separately, since order may vary
mockMgr.EXPECT().
Discover(gomock.Any(), unorderedBackendsMatcher{backends}).
Discover(gomock.Any(), gomock.Any()).
Do(func(_ context.Context, actualBackends []vmcp.Backend) {
// Verify that we got the expected backends regardless of order
assert.Len(t, actualBackends, 2)
backendIDs := make(map[string]bool)
for _, b := range actualBackends {
backendIDs[b.ID] = true
}
assert.True(t, backendIDs["backend1"], "backend1 should be present")
assert.True(t, backendIDs["backend2"], "backend2 should be present")
}).
Return(expectedCaps, nil)

// Create handler that inspects context in detail
Expand Down
Loading
Loading