Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/api/config/crd/bases/kagent.dev_remotemcpservers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ spec:
default: true
type: boolean
timeout:
default: 30s
type: string
url:
minLength: 1
Expand Down
1 change: 1 addition & 0 deletions go/api/v1alpha2/remotemcpserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type RemoteMCPServerSpec struct {
// +optional
HeadersFrom []ValueRef `json:"headersFrom,omitempty"`
// +optional
// +kubebuilder:default="30s"
Timeout *metav1.Duration `json:"timeout,omitempty"`
// +optional
SseReadTimeout *metav1.Duration `json:"sseReadTimeout,omitempty"`
Expand Down
44 changes: 38 additions & 6 deletions go/core/internal/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"slices"
"strings"
"time"

"github.com/hashicorp/go-multierror"
reconcilerutils "github.com/kagent-dev/kagent/go/core/internal/controller/reconciler/utils"
Expand Down Expand Up @@ -45,8 +46,25 @@ var (
const (
AgentReadyReasonDeploymentReady = "DeploymentReady"
AgentReadyReasonWorkloadReady = "WorkloadReady"

// mcpRegistrationTimeout is the default deadline applied to a RemoteMCPServer
// registration attempt (header resolution + MCP connect + tool listing) when
// .spec.timeout is not set. A hung or unreachable endpoint is bounded to this
// duration, ensuring the reconciler goroutine is always released and does not
// block subsequent RemoteMCPServer reconciliations.
mcpRegistrationTimeout = 30 * time.Second
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd almost prefer setting this as a default value on the spec.Timeout field in remotemcpserver_types.go instead, so it's more explicit.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - added // +kubebuilder:default="30s" on Timeout in remotemcpserver_types.go and regenerated both the base CRD and the Helm CRD template.
The Go fallback in remoteMCPRegistrationTimeout() is kept for existing resources that were created before this default takes effect (nil Timeout at admission time).

)

// remoteMCPRegistrationTimeout returns the effective registration deadline for
// a RemoteMCPServer. It uses .spec.timeout when that field is set to a positive
// duration, and falls back to mcpRegistrationTimeout otherwise (nil server,
// unset field, or a zero/negative value).
//
// Non-positive values are treated as unset because the result is used both as
// a context.WithTimeout duration and as an http.Client Timeout: a context
// created with a timeout of zero or less is already expired, while an
// http.Client Timeout of zero means "no timeout" at all.
func remoteMCPRegistrationTimeout(s *v1alpha2.RemoteMCPServer) time.Duration {
	if s == nil || s.Spec.Timeout == nil {
		return mcpRegistrationTimeout
	}
	if d := s.Spec.Timeout.Duration; d > 0 {
		return d
	}
	return mcpRegistrationTimeout
}

type KagentReconciler interface {
ReconcileKagentAgent(ctx context.Context, req ctrl.Request) error
ReconcileKagentSandboxAgent(ctx context.Context, req ctrl.Request) error
Expand Down Expand Up @@ -576,16 +594,20 @@ func (a *kagentReconciler) ReconcileKagentRemoteMCPServer(ctx context.Context, r
GroupKind: server.GroupVersionKind().GroupKind().String(),
}

l.Info("registering remote MCP server", "url", server.Spec.URL, "protocol", server.Spec.Protocol)
start := time.Now()
tools, err := a.upsertToolServerForRemoteMCPServer(ctx, dbServer, server)
if err != nil {
l.Error(err, "failed to upsert tool server for remote mcp server")
l.Error(err, "failed to upsert tool server for remote mcp server", "duration", time.Since(start))

// Fetch previously discovered tools from database if possible
var discoveryErr error
tools, discoveryErr = a.getDiscoveredMCPTools(ctx, serverRef)
if discoveryErr != nil {
err = multierror.Append(err, discoveryErr)
}
} else {
l.Info("successfully registered remote MCP server", "url", server.Spec.URL, "toolCount", len(tools), "duration", time.Since(start))
}

// update the tool server status as the agents depend on it
Expand Down Expand Up @@ -968,12 +990,19 @@ func (a *kagentReconciler) upsertToolServerForRemoteMCPServer(ctx context.Contex
return nil, fmt.Errorf("failed to store toolServer %s: %w", toolServer.Name, err)
}

tsp, err := a.createMcpTransport(ctx, remoteMcpServer)
// Bound the entire registration sequence (header resolution + MCP connect +
// tool listing) to the effective per-resource timeout so that a hung or
// unreachable endpoint cannot block this goroutine — and therefore all
// subsequent RemoteMCPServer reconciliations — indefinitely.
tCtx, cancel := context.WithTimeout(ctx, remoteMCPRegistrationTimeout(remoteMcpServer))
defer cancel()

tsp, err := a.createMcpTransport(tCtx, remoteMcpServer)
Comment on lines +993 to +1000
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed - upsertToolServerForRemoteMCPServer now calls context.WithTimeout(ctx, remoteMCPRegistrationTimeout(remoteMcpServer)) so operators can tune the deadline per resource via .spec.timeout without any code changes.

if err != nil {
return nil, fmt.Errorf("failed to create client for toolServer %s: %w", toolServer.Name, err)
}

tools, err := a.listTools(ctx, tsp, toolServer)
tools, err := a.listTools(tCtx, tsp, toolServer)
Comment on lines +993 to +1005
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added TestRemoteMCPRegistrationTimeout (covers nil server, nil spec.timeout, and a custom value) and TestNewHTTPClient (covers nil/empty/with-headers - all assert the correct timeout and transport type). These are in reconciler_test.go in the same package.

if err != nil {
return nil, fmt.Errorf("failed to fetch tools for toolServer %s: %w", toolServer.Name, err)
}
Expand All @@ -999,7 +1028,7 @@ func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.R
return nil, err
}

httpClient := newHTTPClient(headers)
httpClient := newHTTPClient(headers, remoteMCPRegistrationTimeout(s))

switch s.Spec.Protocol {
case v1alpha2.RemoteMCPServerProtocolSse:
Expand All @@ -1017,11 +1046,14 @@ func (a *kagentReconciler) createMcpTransport(ctx context.Context, s *v1alpha2.R

// go-sdk does not have a WithHeaders option when initializing transport
// so we need to create a custom HTTP client that adds headers to all requests.
func newHTTPClient(headers map[string]string) *http.Client {
func newHTTPClient(headers map[string]string, timeout time.Duration) *http.Client {
if len(headers) == 0 {
return http.DefaultClient
return &http.Client{
Timeout: timeout,
}
}
return &http.Client{
Timeout: timeout,
Transport: &headerTransport{
headers: headers,
base: http.DefaultTransport,
Expand Down
61 changes: 61 additions & 0 deletions go/core/internal/controller/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reconciler
import (
"context"
"testing"
"time"

"github.com/kagent-dev/kagent/go/api/v1alpha2"
"github.com/kagent-dev/kagent/go/core/internal/utils"
Expand Down Expand Up @@ -672,3 +673,63 @@ func TestValidateCrossNamespaceReferences(t *testing.T) {
})
}
}

// TestRemoteMCPRegistrationTimeout checks the deadline chosen by
// remoteMCPRegistrationTimeout: an explicit spec.timeout wins, while a nil
// server or an unset spec.timeout yields mcpRegistrationTimeout.
func TestRemoteMCPRegistrationTimeout(t *testing.T) {
	const override = 10 * time.Second

	type testCase struct {
		name   string
		server *v1alpha2.RemoteMCPServer
		want   time.Duration
	}

	// Server whose spec carries an explicit timeout.
	serverWithTimeout := &v1alpha2.RemoteMCPServer{
		Spec: v1alpha2.RemoteMCPServerSpec{
			Timeout: &metav1.Duration{Duration: override},
		},
	}

	cases := []testCase{
		{name: "nil server returns default", server: nil, want: mcpRegistrationTimeout},
		{name: "nil spec.timeout returns default", server: &v1alpha2.RemoteMCPServer{}, want: mcpRegistrationTimeout},
		{name: "spec.timeout overrides default", server: serverWithTimeout, want: override},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := remoteMCPRegistrationTimeout(tc.server)
			assert.Equal(t, tc.want, got)
		})
	}
}

// TestNewHTTPClient checks that every client built by newHTTPClient carries
// the requested timeout, with or without custom headers, and that a non-empty
// header map results in a headerTransport being installed.
func TestNewHTTPClient(t *testing.T) {
	const want = 5 * time.Second

	cases := []struct {
		name    string
		headers map[string]string
		// wantHeaderTransport enables the extra transport-type assertion.
		wantHeaderTransport bool
	}{
		{name: "no headers", headers: nil},
		{name: "empty headers", headers: map[string]string{}},
		{
			name:                "with headers sets timeout and custom transport",
			headers:             map[string]string{"X-Key": "val"},
			wantHeaderTransport: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			client := newHTTPClient(tc.headers, want)
			assert.Equal(t, want, client.Timeout)

			if !tc.wantHeaderTransport {
				return
			}
			_, isHeaderTransport := client.Transport.(*headerTransport)
			assert.True(t, isHeaderTransport, "expected headerTransport")
		})
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ spec:
default: true
type: boolean
timeout:
default: 30s
type: string
url:
minLength: 1
Expand Down
Loading