Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/vmcp/cli/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,16 @@ func Serve(ctx context.Context, cfg ServeConfig) error {
}

envReader := &env.OSReader{}
var backendRequestTimeout time.Duration
if vmcpCfg.Operational != nil && vmcpCfg.Operational.Timeouts != nil {
backendRequestTimeout = time.Duration(vmcpCfg.Operational.Timeouts.Default)
}
sessionFactory, err := createSessionFactory(
envReader.Getenv("VMCP_SESSION_HMAC_SECRET"),
runtime.IsKubernetesRuntimeWithEnv(envReader),
outgoingRegistry,
agg,
backendRequestTimeout,
)
if err != nil {
return err
Expand Down Expand Up @@ -386,6 +391,7 @@ func Serve(ctx context.Context, cfg ServeConfig) error {
OptimizerConfig: optCfg,
SessionFactory: sessionFactory,
SessionStorage: vmcpCfg.SessionStorage,
RequestTimeout: backendRequestTimeout, // see createSessionFactory; same value
}

// Assign Watcher only when backendWatcher is non-nil. A typed nil
Expand Down Expand Up @@ -640,13 +646,17 @@ func createSessionFactory(
isKubernetes bool,
outgoingRegistry vmcpauth.OutgoingAuthRegistry,
agg aggregator.Aggregator,
backendRequestTimeout time.Duration,
) (vmcpsession.MultiSessionFactory, error) {
const minRecommendedSecretLen = 32

opts := []vmcpsession.MultiSessionFactoryOption{}
if agg != nil {
opts = append(opts, vmcpsession.WithAggregator(agg))
}
if backendRequestTimeout > 0 {
opts = append(opts, vmcpsession.WithBackendRequestTimeout(backendRequestTimeout))
}

if hmacSecret != "" {
if secretLen := len(hmacSecret); secretLen < minRecommendedSecretLen {
Expand Down
10 changes: 5 additions & 5 deletions pkg/vmcp/cli/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,39 +168,39 @@ func newSessionFactoryMocks(t *testing.T) (*clientmocks.MockOutgoingAuthRegistry
func TestCreateSessionFactory_WithHMACSecret(t *testing.T) {
t.Parallel()
registry, agg := newSessionFactoryMocks(t)
factory, err := createSessionFactory("a-sufficiently-long-hmac-secret-value-32b", false, registry, agg)
factory, err := createSessionFactory("a-sufficiently-long-hmac-secret-value-32b", false, registry, agg, 0)
require.NoError(t, err)
require.NotNil(t, factory)
}

func TestCreateSessionFactory_HMACSecretExactly32Bytes(t *testing.T) {
t.Parallel()
registry, agg := newSessionFactoryMocks(t)
factory, err := createSessionFactory("12345678901234567890123456789012", false, registry, agg)
factory, err := createSessionFactory("12345678901234567890123456789012", false, registry, agg, 0)
require.NoError(t, err)
require.NotNil(t, factory)
}

func TestCreateSessionFactory_ShortHMACSecret(t *testing.T) {
t.Parallel()
registry, agg := newSessionFactoryMocks(t)
factory, err := createSessionFactory("short", false, registry, agg)
factory, err := createSessionFactory("short", false, registry, agg, 0)
require.NoError(t, err)
require.NotNil(t, factory)
}

func TestCreateSessionFactory_NoSecretNonKubernetes(t *testing.T) {
t.Parallel()
registry, agg := newSessionFactoryMocks(t)
factory, err := createSessionFactory("", false, registry, agg)
factory, err := createSessionFactory("", false, registry, agg, 0)
require.NoError(t, err)
require.NotNil(t, factory)
}

func TestCreateSessionFactory_NoSecretKubernetes(t *testing.T) {
t.Parallel()
registry, agg := newSessionFactoryMocks(t)
factory, err := createSessionFactory("", true, registry, agg)
factory, err := createSessionFactory("", true, registry, agg, 0)
require.Error(t, err)
require.ErrorContains(t, err, "an HMAC secret is required when running in Kubernetes")
require.Nil(t, factory)
Expand Down
18 changes: 16 additions & 2 deletions pkg/vmcp/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ type Config struct {
// session persistence; the Redis password is read from the
// THV_SESSION_REDIS_PASSWORD environment variable.
SessionStorage *vmcpconfig.SessionStorageConfig

// RequestTimeout overrides the inbound http.Server's ReadTimeout and
// WriteTimeout (defaults: 30s). When 0, the defaults are used.
// Applies to non-SSE routes; SSE GETs clear their per-request write
// deadline via transportmiddleware.WriteTimeout. Plumb the same value
// you pass to session.WithBackendRequestTimeout so a tool call that
// takes longer than 30s upstream can still complete its inbound POST.
RequestTimeout time.Duration
}

// Server is the Virtual MCP Server that aggregates multiple backends.
Expand Down Expand Up @@ -689,12 +697,18 @@ func (s *Server) Start(ctx context.Context) error {

// Create HTTP server
addr := fmt.Sprintf("%s:%d", s.config.Host, s.config.Port)
readTimeout := defaultReadTimeout
writeTimeout := defaultWriteTimeout
if s.config.RequestTimeout > 0 {
readTimeout = s.config.RequestTimeout
writeTimeout = s.config.RequestTimeout
}
s.httpServer = &http.Server{
Addr: addr,
Handler: handler,
ReadHeaderTimeout: defaultReadHeaderTimeout,
ReadTimeout: defaultReadTimeout,
WriteTimeout: defaultWriteTimeout,
ReadTimeout: readTimeout,
WriteTimeout: writeTimeout,
IdleTimeout: defaultIdleTimeout,
MaxHeaderBytes: defaultMaxHeaderBytes,
}
Expand Down
34 changes: 28 additions & 6 deletions pkg/vmcp/session/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ type backendConnector func(

// defaultMultiSessionFactory is the production MultiSessionFactory implementation.
type defaultMultiSessionFactory struct {
connector backendConnector
maxConcurrency int
backendInitTimeout time.Duration
hmacSecret []byte // Server-managed secret for HMAC-SHA256 token hashing
aggregator aggregator.Aggregator // Optional: applies tool transforms (overrides, conflict resolution, filter)
connector backendConnector
maxConcurrency int
backendInitTimeout time.Duration
backendRequestTimeout time.Duration // Per-tool-call timeout; 0 = use upstream default (30s).
hmacSecret []byte // Server-managed secret for HMAC-SHA256 token hashing
aggregator aggregator.Aggregator // Optional: applies tool transforms (overrides, conflict resolution, filter)
}

// MultiSessionFactoryOption configures a defaultMultiSessionFactory.
Expand All @@ -158,6 +159,19 @@ func WithBackendInitTimeout(d time.Duration) MultiSessionFactoryOption {
}
}

// WithBackendRequestTimeout sets the per-tool-call wall-clock deadline for
// streamable-HTTP backend requests. Plumbs through to the http.Client and
// the mark3labs SDK constructed in backend.NewHTTPConnector. Defaults to 30s
// (the upstream default) when not set or set to 0. SSE backends ignore this
// value because their stream lifetime is unbounded by design.
func WithBackendRequestTimeout(d time.Duration) MultiSessionFactoryOption {
return func(f *defaultMultiSessionFactory) {
if d > 0 {
f.backendRequestTimeout = d
}
}
}

// WithHMACSecret sets the server-managed secret used for HMAC-SHA256 token hashing.
// The secret should be 32+ bytes and loaded from secure configuration (e.g., environment
// variable, secret management system).
Expand Down Expand Up @@ -191,7 +205,15 @@ func WithAggregator(agg aggregator.Aggregator) MultiSessionFactoryOption {
// NewSessionFactory creates a MultiSessionFactory that connects to backends
// over HTTP using the given outgoing auth registry.
func NewSessionFactory(registry vmcpauth.OutgoingAuthRegistry, opts ...MultiSessionFactoryOption) MultiSessionFactory {
return newSessionFactoryWithConnector(backend.NewHTTPConnector(registry), opts...)
// Pre-apply opts to a sentinel to extract config the connector needs at
// construction time. The same opts run a second time inside
// newSessionFactoryWithConnector to populate the real factory; option
// functions are pure setters, so double-application is idempotent.
pre := &defaultMultiSessionFactory{}
for _, opt := range opts {
opt(pre)
}
return newSessionFactoryWithConnector(backend.NewHTTPConnector(registry, pre.backendRequestTimeout), opts...)
}

// newSessionFactoryWithConnector creates a MultiSessionFactory backed by an
Expand Down
26 changes: 19 additions & 7 deletions pkg/vmcp/session/internal/backend/mcp_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ const (
// see createMCPClient for the rationale.
maxBackendResponseSize = 100 * 1024 * 1024 // 100 MB

// defaultBackendRequestTimeout is the wall-clock deadline for individual
// streamable-HTTP requests. Applied at both the http.Client and SDK layers
// (defense-in-depth). Not used for SSE, whose stream lifetime is unbounded.
// defaultBackendRequestTimeout is the fallback wall-clock deadline for
// individual streamable-HTTP requests when the caller passes 0. Applied at
// both the http.Client and SDK layers (defense-in-depth). Not used for SSE,
// whose stream lifetime is unbounded. Override via NewHTTPConnector's
// requestTimeout argument, which is plumbed from the
// VirtualMCPServer.spec.config.operational.timeouts.default CRD field.
defaultBackendRequestTimeout = 30 * time.Second
)

Expand Down Expand Up @@ -195,7 +198,12 @@ func (c *mcpSession) GetPrompt(
//
// registry provides the authentication strategy for outgoing backend requests.
// Pass a registry configured with the "unauthenticated" strategy to disable auth.
func NewHTTPConnector(registry vmcpauth.OutgoingAuthRegistry) func(
//
// requestTimeout sets the per-tool-call wall-clock deadline applied to the
// streamable-HTTP http.Client and the mark3labs SDK. Pass 0 to use the
// upstream default (defaultBackendRequestTimeout, 30s). SSE backends ignore
// this argument because their stream lifetime is unbounded by design.
func NewHTTPConnector(registry vmcpauth.OutgoingAuthRegistry, requestTimeout time.Duration) func(
ctx context.Context,
target *vmcp.BackendTarget,
identity *auth.Identity,
Expand All @@ -207,7 +215,7 @@ func NewHTTPConnector(registry vmcpauth.OutgoingAuthRegistry) func(
identity *auth.Identity,
sessionHint string,
) (Session, *vmcp.CapabilityList, error) {
c, err := createMCPClient(target, identity, registry, sessionHint)
c, err := createMCPClient(target, identity, registry, sessionHint, requestTimeout)
if err != nil {
return nil, nil, fmt.Errorf("failed to create MCP client for backend %s: %w", target.WorkloadID, err)
}
Expand Down Expand Up @@ -242,7 +250,11 @@ func createMCPClient(
identity *auth.Identity,
registry vmcpauth.OutgoingAuthRegistry,
sessionHint string,
requestTimeout time.Duration,
) (*mcpclient.Client, error) {
if requestTimeout <= 0 {
requestTimeout = defaultBackendRequestTimeout
}
// Resolve and validate the auth strategy once at client creation time.
strategyName := authtypes.StrategyTypeUnauthenticated
if target.AuthConfig != nil {
Expand Down Expand Up @@ -297,10 +309,10 @@ func createMCPClient(
})
httpClient := &http.Client{
Transport: sizeLimited,
Timeout: defaultBackendRequestTimeout,
Timeout: requestTimeout,
}
streamableOpts := []mcptransport.StreamableHTTPCOption{
mcptransport.WithHTTPTimeout(defaultBackendRequestTimeout),
mcptransport.WithHTTPTimeout(requestTimeout),
mcptransport.WithHTTPBasicClient(httpClient),
}
if sessionHint != "" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/vmcp/session/internal/backend/mcp_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestCreateMCPClient_UnsupportedTransport(t *testing.T) {
TransportType: transport,
}

_, err := createMCPClient(target, nil, newTestRegistry(t), "")
_, err := createMCPClient(target, nil, newTestRegistry(t), "", 0)
require.Error(t, err)
assert.ErrorIs(t, err, vmcp.ErrUnsupportedTransport,
"transport %q should return ErrUnsupportedTransport", transport)
Expand Down