From 831b79a1cfd698d8de4a4a9e364c063d6e11e8f7 Mon Sep 17 00:00:00 2001 From: Thomas Andal Date: Wed, 6 May 2026 15:00:19 +0200 Subject: [PATCH] feat(vmcp): plumb operational.timeouts.default into backend request timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CRD field `VirtualMCPServer.spec.config.operational.timeouts.default` exists and is documented as "the default timeout for backend requests" but is dead config in v0.26.1: it's parsed (config.go:524), defaulted (defaults.go:54) and validated (validator.go:367) but never read by the request-path code. The actual ceiling is hardcoded at `pkg/vmcp/session/internal/backend/mcp_session.go:35` (`defaultBackendRequestTimeout = 30 * time.Second`) and applied on lines 298-304 to both the streamable-HTTP `http.Client.Timeout` and the mark3labs SDK's `WithHTTPTimeout`. Tool calls that take >30s upstream — long ClickHouse aggregations, deep Perplexity research, heavy GitHub searches — surface as 502 from any nginx in front of vmcp because vmcp closes the backend HTTP client before the response arrives. Plumb the existing config through: - `NewHTTPConnector` and `createMCPClient` accept a `requestTimeout time.Duration`; 0 falls back to the existing 30s constant so callers that pass nothing keep the old behavior. - New `session.WithBackendRequestTimeout` factory option mirrors the existing `WithBackendInitTimeout`. - `NewSessionFactory` pre-applies opts to a sentinel struct so the connector can be constructed with the configured timeout (option funcs are pure setters, so double-application is idempotent). - `cmd/vmcp/cli/serve.go` reads `cfg.Operational.Timeouts.Default` and passes it to `createSessionFactory`, which forwards it via the new option. Nil-safe — falls back to 30s when `Operational` or `Timeouts` is unset, preserving upstream defaults. SSE backends are unaffected: their stream lifetime is unbounded by design and `http.Client.Timeout` is intentionally omitted on that code path. All existing tests pass; updated four call sites in mcp_session_test.go and serve_test.go to pass 0 for the new timeout arg (preserves prior behavior). --- pkg/vmcp/cli/serve.go | 10 ++++++ pkg/vmcp/cli/serve_test.go | 10 +++--- pkg/vmcp/server/server.go | 18 ++++++++-- pkg/vmcp/session/factory.go | 34 +++++++++++++++---- .../session/internal/backend/mcp_session.go | 26 ++++++++++---- .../internal/backend/mcp_session_test.go | 2 +- 6 files changed, 79 insertions(+), 21 deletions(-) diff --git a/pkg/vmcp/cli/serve.go b/pkg/vmcp/cli/serve.go index ffb501115c..2b6a34d439 100644 --- a/pkg/vmcp/cli/serve.go +++ b/pkg/vmcp/cli/serve.go @@ -329,11 +329,16 @@ func Serve(ctx context.Context, cfg ServeConfig) error { } envReader := &env.OSReader{} + var backendRequestTimeout time.Duration + if vmcpCfg.Operational != nil && vmcpCfg.Operational.Timeouts != nil { + backendRequestTimeout = time.Duration(vmcpCfg.Operational.Timeouts.Default) + } sessionFactory, err := createSessionFactory( envReader.Getenv("VMCP_SESSION_HMAC_SECRET"), runtime.IsKubernetesRuntimeWithEnv(envReader), outgoingRegistry, agg, + backendRequestTimeout, ) if err != nil { return err @@ -386,6 +391,7 @@ func Serve(ctx context.Context, cfg ServeConfig) error { OptimizerConfig: optCfg, SessionFactory: sessionFactory, SessionStorage: vmcpCfg.SessionStorage, + RequestTimeout: backendRequestTimeout, // see createSessionFactory; same value } // Assign Watcher only when backendWatcher is non-nil. A typed nil @@ -640,6 +646,7 @@ func createSessionFactory( isKubernetes bool, outgoingRegistry vmcpauth.OutgoingAuthRegistry, agg aggregator.Aggregator, + backendRequestTimeout time.Duration, ) (vmcpsession.MultiSessionFactory, error) { const minRecommendedSecretLen = 32 @@ -647,6 +654,9 @@ func createSessionFactory( if agg != nil { opts = append(opts, vmcpsession.WithAggregator(agg)) } + if backendRequestTimeout > 0 { + opts = append(opts, vmcpsession.WithBackendRequestTimeout(backendRequestTimeout)) + } if hmacSecret != "" { if secretLen := len(hmacSecret); secretLen < minRecommendedSecretLen { diff --git a/pkg/vmcp/cli/serve_test.go b/pkg/vmcp/cli/serve_test.go index 667b285779..08fdb80e0a 100644 --- a/pkg/vmcp/cli/serve_test.go +++ b/pkg/vmcp/cli/serve_test.go @@ -168,7 +168,7 @@ func newSessionFactoryMocks(t *testing.T) (*clientmocks.MockOutgoingAuthRegistry func TestCreateSessionFactory_WithHMACSecret(t *testing.T) { t.Parallel() registry, agg := newSessionFactoryMocks(t) - factory, err := createSessionFactory("a-sufficiently-long-hmac-secret-value-32b", false, registry, agg) + factory, err := createSessionFactory("a-sufficiently-long-hmac-secret-value-32b", false, registry, agg, 0) require.NoError(t, err) require.NotNil(t, factory) } @@ -176,7 +176,7 @@ func TestCreateSessionFactory_WithHMACSecret(t *testing.T) { func TestCreateSessionFactory_HMACSecretExactly32Bytes(t *testing.T) { t.Parallel() registry, agg := newSessionFactoryMocks(t) - factory, err := createSessionFactory("12345678901234567890123456789012", false, registry, agg) + factory, err := createSessionFactory("12345678901234567890123456789012", false, registry, agg, 0) require.NoError(t, err) require.NotNil(t, factory) } @@ -184,7 +184,7 @@ func TestCreateSessionFactory_HMACSecretExactly32Bytes(t *testing.T) { func TestCreateSessionFactory_ShortHMACSecret(t *testing.T) { t.Parallel() registry, agg := newSessionFactoryMocks(t) - factory, err := createSessionFactory("short", false, registry, agg) + factory, err := createSessionFactory("short", false, registry, agg, 0) require.NoError(t, err) require.NotNil(t, factory) } @@ -192,7 +192,7 @@ func TestCreateSessionFactory_ShortHMACSecret(t *testing.T) { func TestCreateSessionFactory_NoSecretNonKubernetes(t *testing.T) { t.Parallel() registry, agg := newSessionFactoryMocks(t) - factory, err := createSessionFactory("", false, registry, agg) + factory, err := createSessionFactory("", false, registry, agg, 0) require.NoError(t, err) require.NotNil(t, factory) } @@ -200,7 +200,7 @@ func TestCreateSessionFactory_NoSecretNonKubernetes(t *testing.T) { func TestCreateSessionFactory_NoSecretKubernetes(t *testing.T) { t.Parallel() registry, agg := newSessionFactoryMocks(t) - factory, err := createSessionFactory("", true, registry, agg) + factory, err := createSessionFactory("", true, registry, agg, 0) require.Error(t, err) require.ErrorContains(t, err, "an HMAC secret is required when running in Kubernetes") require.Nil(t, factory) diff --git a/pkg/vmcp/server/server.go b/pkg/vmcp/server/server.go index 23c415a551..2e1e1e4b79 100644 --- a/pkg/vmcp/server/server.go +++ b/pkg/vmcp/server/server.go @@ -181,6 +181,14 @@ type Config struct { // session persistence; the Redis password is read from the // THV_SESSION_REDIS_PASSWORD environment variable. SessionStorage *vmcpconfig.SessionStorageConfig + + // RequestTimeout overrides the inbound http.Server's ReadTimeout and + // WriteTimeout (defaults: 30s). When 0, the defaults are used. + // Applies to non-SSE routes; SSE GETs clear their per-request write + // deadline via transportmiddleware.WriteTimeout. Plumb the same value + // you pass to session.WithBackendRequestTimeout so a tool call that + // takes longer than 30s upstream can still complete its inbound POST. + RequestTimeout time.Duration } // Server is the Virtual MCP Server that aggregates multiple backends. @@ -689,12 +697,18 @@ func (s *Server) Start(ctx context.Context) error { // Create HTTP server addr := fmt.Sprintf("%s:%d", s.config.Host, s.config.Port) + readTimeout := defaultReadTimeout + writeTimeout := defaultWriteTimeout + if s.config.RequestTimeout > 0 { + readTimeout = s.config.RequestTimeout + writeTimeout = s.config.RequestTimeout + } s.httpServer = &http.Server{ Addr: addr, Handler: handler, ReadHeaderTimeout: defaultReadHeaderTimeout, - ReadTimeout: defaultReadTimeout, - WriteTimeout: defaultWriteTimeout, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, IdleTimeout: defaultIdleTimeout, MaxHeaderBytes: defaultMaxHeaderBytes, } diff --git a/pkg/vmcp/session/factory.go b/pkg/vmcp/session/factory.go index a5e115eacd..751483e810 100644 --- a/pkg/vmcp/session/factory.go +++ b/pkg/vmcp/session/factory.go @@ -128,11 +128,12 @@ type backendConnector func( // defaultMultiSessionFactory is the production MultiSessionFactory implementation. type defaultMultiSessionFactory struct { - connector backendConnector - maxConcurrency int - backendInitTimeout time.Duration - hmacSecret []byte // Server-managed secret for HMAC-SHA256 token hashing - aggregator aggregator.Aggregator // Optional: applies tool transforms (overrides, conflict resolution, filter) + connector backendConnector + maxConcurrency int + backendInitTimeout time.Duration + backendRequestTimeout time.Duration // Per-tool-call timeout; 0 = use upstream default (30s). + hmacSecret []byte // Server-managed secret for HMAC-SHA256 token hashing + aggregator aggregator.Aggregator // Optional: applies tool transforms (overrides, conflict resolution, filter) } // MultiSessionFactoryOption configures a defaultMultiSessionFactory. @@ -158,6 +159,19 @@ func WithBackendInitTimeout(d time.Duration) MultiSessionFactoryOption { } } +// WithBackendRequestTimeout sets the per-tool-call wall-clock deadline for +// streamable-HTTP backend requests. Plumbs through to the http.Client and +// the mark3labs SDK constructed in backend.NewHTTPConnector. Defaults to 30s +// (the upstream default) when not set or set to 0. SSE backends ignore this +// value because their stream lifetime is unbounded by design. +func WithBackendRequestTimeout(d time.Duration) MultiSessionFactoryOption { + return func(f *defaultMultiSessionFactory) { + if d > 0 { + f.backendRequestTimeout = d + } + } +} + // WithHMACSecret sets the server-managed secret used for HMAC-SHA256 token hashing. // The secret should be 32+ bytes and loaded from secure configuration (e.g., environment // variable, secret management system). @@ -191,7 +205,15 @@ func WithAggregator(agg aggregator.Aggregator) MultiSessionFactoryOption { // NewSessionFactory creates a MultiSessionFactory that connects to backends // over HTTP using the given outgoing auth registry. func NewSessionFactory(registry vmcpauth.OutgoingAuthRegistry, opts ...MultiSessionFactoryOption) MultiSessionFactory { - return newSessionFactoryWithConnector(backend.NewHTTPConnector(registry), opts...) + // Pre-apply opts to a sentinel to extract config the connector needs at + // construction time. The same opts run a second time inside + // newSessionFactoryWithConnector to populate the real factory; option + // functions are pure setters, so double-application is idempotent. + pre := &defaultMultiSessionFactory{} + for _, opt := range opts { + opt(pre) + } + return newSessionFactoryWithConnector(backend.NewHTTPConnector(registry, pre.backendRequestTimeout), opts...) } // newSessionFactoryWithConnector creates a MultiSessionFactory backed by an diff --git a/pkg/vmcp/session/internal/backend/mcp_session.go b/pkg/vmcp/session/internal/backend/mcp_session.go index 75425e98f6..a130b0d63a 100644 --- a/pkg/vmcp/session/internal/backend/mcp_session.go +++ b/pkg/vmcp/session/internal/backend/mcp_session.go @@ -29,9 +29,12 @@ const ( // see createMCPClient for the rationale. maxBackendResponseSize = 100 * 1024 * 1024 // 100 MB - // defaultBackendRequestTimeout is the wall-clock deadline for individual - // streamable-HTTP requests. Applied at both the http.Client and SDK layers - // (defense-in-depth). Not used for SSE, whose stream lifetime is unbounded. + // defaultBackendRequestTimeout is the fallback wall-clock deadline for + // individual streamable-HTTP requests when the caller passes 0. Applied at + // both the http.Client and SDK layers (defense-in-depth). Not used for SSE, + // whose stream lifetime is unbounded. Override via NewHTTPConnector's + // requestTimeout argument, which is plumbed from the + // VirtualMCPServer.spec.config.operational.timeouts.default CRD field. defaultBackendRequestTimeout = 30 * time.Second ) @@ -195,7 +198,12 @@ func (c *mcpSession) GetPrompt( // // registry provides the authentication strategy for outgoing backend requests. // Pass a registry configured with the "unauthenticated" strategy to disable auth. -func NewHTTPConnector(registry vmcpauth.OutgoingAuthRegistry) func( +// +// requestTimeout sets the per-tool-call wall-clock deadline applied to the +// streamable-HTTP http.Client and the mark3labs SDK. Pass 0 to use the +// upstream default (defaultBackendRequestTimeout, 30s). SSE backends ignore +// this argument because their stream lifetime is unbounded by design. +func NewHTTPConnector(registry vmcpauth.OutgoingAuthRegistry, requestTimeout time.Duration) func( ctx context.Context, target *vmcp.BackendTarget, identity *auth.Identity, @@ -207,7 +215,7 @@ func NewHTTPConnector(registry vmcpauth.OutgoingAuthRegistry) func( identity *auth.Identity, sessionHint string, ) (Session, *vmcp.CapabilityList, error) { - c, err := createMCPClient(target, identity, registry, sessionHint) + c, err := createMCPClient(target, identity, registry, sessionHint, requestTimeout) if err != nil { return nil, nil, fmt.Errorf("failed to create MCP client for backend %s: %w", target.WorkloadID, err) } @@ -242,7 +250,11 @@ func createMCPClient( identity *auth.Identity, registry vmcpauth.OutgoingAuthRegistry, sessionHint string, + requestTimeout time.Duration, ) (*mcpclient.Client, error) { + if requestTimeout <= 0 { + requestTimeout = defaultBackendRequestTimeout + } // Resolve and validate the auth strategy once at client creation time. strategyName := authtypes.StrategyTypeUnauthenticated if target.AuthConfig != nil { @@ -297,10 +309,10 @@ func createMCPClient( }) httpClient := &http.Client{ Transport: sizeLimited, - Timeout: defaultBackendRequestTimeout, + Timeout: requestTimeout, } streamableOpts := []mcptransport.StreamableHTTPCOption{ - mcptransport.WithHTTPTimeout(defaultBackendRequestTimeout), + mcptransport.WithHTTPTimeout(requestTimeout), mcptransport.WithHTTPBasicClient(httpClient), } if sessionHint != "" { diff --git a/pkg/vmcp/session/internal/backend/mcp_session_test.go b/pkg/vmcp/session/internal/backend/mcp_session_test.go index 7daf5decce..a2d7691853 100644 --- a/pkg/vmcp/session/internal/backend/mcp_session_test.go +++ b/pkg/vmcp/session/internal/backend/mcp_session_test.go @@ -40,7 +40,7 @@ func TestCreateMCPClient_UnsupportedTransport(t *testing.T) { TransportType: transport, } - _, err := createMCPClient(target, nil, newTestRegistry(t), "") + _, err := createMCPClient(target, nil, newTestRegistry(t), "", 0) require.Error(t, err) assert.ErrorIs(t, err, vmcp.ErrUnsupportedTransport, "transport %q should return ErrUnsupportedTransport", transport)