Skip to content

Commit 7a0f7be

Browse files
JAORMX and claude committed
Address review feedback for MCP service session affinity
- Extract shared applyService helper to reduce duplication between createHeadlessService and createMCPService
- Add explicit SessionAffinityConfig.ClientIP.TimeoutSeconds (1800s) instead of relying on Kubernetes 3h default
- Rename SSEHeadlessServiceName to MCPServiceName across types, setup, and client packages
- Rename transportTypeRequiresHeadlessService to transportTypeRequiresBackendServices
- Extract deleteIfExists helper in operator finalizer, removing nolint:gocyclo
- Fix headless+NodePort conflict: skip NodePort type promotion for headless services (Kubernetes rejects clusterIP=None + type=NodePort)
- Use slog.Debug instead of slog.Info for service creation success paths
- Add spec.type and sessionAffinityConfig assertions to e2e tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent b589c1f commit 7a0f7be

8 files changed

Lines changed: 133 additions & 130 deletions

File tree

cmd/thv-operator/controllers/mcpserver_controller.go

Lines changed: 26 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -1455,71 +1455,46 @@ func (r *MCPServerReconciler) updateMCPServerStatus(ctx context.Context, m *mcpv
14551455
return r.Status().Update(ctx, m)
14561456
}
14571457

1458+
// deleteIfExists fetches a Kubernetes object by name and namespace, and deletes it if it exists.
1459+
// Returns nil if the object was not found or was successfully deleted.
1460+
func (r *MCPServerReconciler) deleteIfExists(ctx context.Context, obj client.Object, name, namespace, kind string) error {
1461+
err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, obj)
1462+
if err == nil {
1463+
if delErr := r.Delete(ctx, obj); delErr != nil && !errors.IsNotFound(delErr) {
1464+
return fmt.Errorf("failed to delete %s %s: %w", kind, name, delErr)
1465+
}
1466+
return nil
1467+
}
1468+
if !errors.IsNotFound(err) {
1469+
return fmt.Errorf("failed to check %s %s: %w", kind, name, err)
1470+
}
1471+
return nil
1472+
}
1473+
14581474
// finalizeMCPServer performs the finalizer logic for the MCPServer
1459-
//
1460-
//nolint:gocyclo
14611475
func (r *MCPServerReconciler) finalizeMCPServer(ctx context.Context, m *mcpv1alpha1.MCPServer) error {
1462-
ctxLogger := log.FromContext(ctx)
14631476
// Update the MCPServer status
14641477
m.Status.Phase = mcpv1alpha1.MCPServerPhaseTerminating
14651478
m.Status.Message = "MCP server is being terminated"
14661479
if err := r.Status().Update(ctx, m); err != nil {
14671480
return err
14681481
}
14691482

1470-
// Step 2: Attempt to delete associated StatefulSet by name
1471-
sts := &appsv1.StatefulSet{}
1472-
err := r.Get(ctx, types.NamespacedName{Name: m.Name, Namespace: m.Namespace}, sts)
1473-
if err == nil {
1474-
// StatefulSet found, delete it
1475-
if delErr := r.Delete(ctx, sts); delErr != nil && !errors.IsNotFound(delErr) {
1476-
return fmt.Errorf("failed to delete StatefulSet %s: %w", m.Name, delErr)
1477-
}
1478-
} else if !errors.IsNotFound(err) {
1479-
// Unexpected error (not just "not found")
1480-
return fmt.Errorf("failed to get StatefulSet %s: %w", m.Name, err)
1481-
}
1482-
1483-
// Step 3: Attempt to delete associated services by name
1484-
svc := &corev1.Service{}
1485-
headlessSvcName := fmt.Sprintf("mcp-%s-headless", m.Name)
1486-
err = r.Get(ctx, types.NamespacedName{Name: headlessSvcName, Namespace: m.Namespace}, svc)
1487-
if err == nil {
1488-
if delErr := r.Delete(ctx, svc); delErr != nil && !errors.IsNotFound(delErr) {
1489-
return fmt.Errorf("failed to delete Service %s: %w", headlessSvcName, delErr)
1490-
}
1491-
} else if !errors.IsNotFound(err) {
1492-
return fmt.Errorf("failed to check Service %s: %w", headlessSvcName, err)
1483+
// Delete associated StatefulSet
1484+
if err := r.deleteIfExists(ctx, &appsv1.StatefulSet{}, m.Name, m.Namespace, "StatefulSet"); err != nil {
1485+
return err
14931486
}
14941487

1495-
// Delete the MCP ClusterIP service with session affinity
1496-
mcpSvc := &corev1.Service{}
1497-
mcpSvcName := fmt.Sprintf("mcp-%s", m.Name)
1498-
err = r.Get(ctx, types.NamespacedName{Name: mcpSvcName, Namespace: m.Namespace}, mcpSvc)
1499-
if err == nil {
1500-
if delErr := r.Delete(ctx, mcpSvc); delErr != nil && !errors.IsNotFound(delErr) {
1501-
return fmt.Errorf("failed to delete Service %s: %w", mcpSvcName, delErr)
1502-
}
1503-
} else if !errors.IsNotFound(err) {
1504-
return fmt.Errorf("failed to check Service %s: %w", mcpSvcName, err)
1488+
// Delete associated services
1489+
if err := r.deleteIfExists(ctx, &corev1.Service{}, fmt.Sprintf("mcp-%s-headless", m.Name), m.Namespace, "Service"); err != nil {
1490+
return err
15051491
}
1506-
1507-
// Step 4: Delete associated RunConfig ConfigMap
1508-
runConfigName := fmt.Sprintf("%s-runconfig", m.Name)
1509-
runConfigMap := &corev1.ConfigMap{}
1510-
err = r.Get(ctx, types.NamespacedName{Name: runConfigName, Namespace: m.Namespace}, runConfigMap)
1511-
if err == nil {
1512-
if delErr := r.Delete(ctx, runConfigMap); delErr != nil && !errors.IsNotFound(delErr) {
1513-
return fmt.Errorf("failed to delete RunConfig ConfigMap %s: %w", runConfigName, delErr)
1514-
}
1515-
ctxLogger.Info("Deleted RunConfig ConfigMap", "name", runConfigName, "namespace", m.Namespace)
1516-
} else if !errors.IsNotFound(err) {
1517-
return fmt.Errorf("failed to check RunConfig ConfigMap %s: %w", runConfigName, err)
1492+
if err := r.deleteIfExists(ctx, &corev1.Service{}, fmt.Sprintf("mcp-%s", m.Name), m.Namespace, "Service"); err != nil {
1493+
return err
15181494
}
15191495

1520-
// The owner references will automatically delete the deployment and service
1521-
// when the MCPServer is deleted, so we don't need to do anything here.
1522-
return nil
1496+
// Delete associated RunConfig ConfigMap
1497+
return r.deleteIfExists(ctx, &corev1.ConfigMap{}, fmt.Sprintf("%s-runconfig", m.Name), m.Namespace, "ConfigMap")
15231498
}
15241499

15251500
// deploymentNeedsUpdate checks if the deployment needs to be updated

pkg/container/kubernetes/client.go

Lines changed: 84 additions & 73 deletions
Original file line number | Diff line number | Diff line change
@@ -424,7 +424,7 @@ func (c *Client) DeployWorkload(ctx context.Context,
424424
//nolint:gosec // G706: statefulset name from Kubernetes API response
425425
slog.Info("applied statefulset", "name", createdStatefulSet.Name)
426426

427-
if transportTypeRequiresHeadlessService(transportType) && options != nil {
427+
if transportTypeRequiresBackendServices(transportType) && options != nil {
428428
// Create a headless service for DNS discovery
429429
err := c.createHeadlessService(ctx, containerName, namespace, containerLabels, options)
430430
if err != nil {
@@ -898,67 +898,111 @@ func createServicePorts(options *runtime.DeployWorkloadOptions) ([]*corev1apply.
898898
return servicePorts, nil
899899
}
900900

901-
// createHeadlessService creates a headless Kubernetes service for the StatefulSet
902-
func (c *Client) createHeadlessService(
901+
// serviceConfig holds the configuration for creating a Kubernetes service via applyService.
902+
type serviceConfig struct {
903+
// nameSuffix is appended to "mcp-<containerName>" to form the service name.
904+
// Use "-headless" for the headless service or "" for the MCP service.
905+
nameSuffix string
906+
// headless makes the service a headless service (ClusterIP: None).
907+
headless bool
908+
// sessionAffinity enables ClientIP session affinity with the given timeout.
909+
sessionAffinity bool
910+
// sessionAffinityTimeoutSeconds sets the timeout for ClientIP session affinity.
911+
// Only used when sessionAffinity is true. Kubernetes defaults to 10800s (3h) if unset.
912+
sessionAffinityTimeoutSeconds int32
913+
}
914+
915+
// applyService creates or updates a Kubernetes service using server-side apply.
916+
func (c *Client) applyService(
903917
ctx context.Context,
904918
containerName string,
905919
namespace string,
906920
labels map[string]string,
907921
options *runtime.DeployWorkloadOptions,
908-
) error {
909-
// Create service ports from the container ports
922+
cfg serviceConfig,
923+
) (string, error) {
910924
servicePorts, err := createServicePorts(options)
911925
if err != nil {
912-
return err
926+
return "", err
913927
}
914928

915-
// If no ports were configured, don't create a service
916929
if len(servicePorts) == 0 {
917-
slog.Info("no ports configured for SSE transport, skipping service creation")
918-
return nil
930+
slog.Debug("no ports configured, skipping service creation")
931+
return "", nil
919932
}
920933

921-
// Create service type based on whether we have node ports
934+
svcName := fmt.Sprintf("mcp-%s%s", containerName, cfg.nameSuffix)
935+
936+
// Determine service type based on whether any ports have NodePort set.
937+
// Headless services (ClusterIP: None) cannot be NodePort, so skip the
938+
// promotion for those — Kubernetes rejects clusterIP=None + type=NodePort.
922939
serviceType := corev1.ServiceTypeClusterIP
923-
for _, sp := range servicePorts {
924-
if sp.NodePort != nil {
925-
serviceType = corev1.ServiceTypeNodePort
926-
break
940+
if !cfg.headless {
941+
for _, sp := range servicePorts {
942+
if sp.NodePort != nil {
943+
serviceType = corev1.ServiceTypeNodePort
944+
break
945+
}
927946
}
928947
}
929948

930-
// we want to generate a service name that is unique for the headless service
931-
// to avoid conflicts with the proxy service
932-
svcName := fmt.Sprintf("mcp-%s-headless", containerName)
949+
spec := corev1apply.ServiceSpec().
950+
WithSelector(map[string]string{
951+
"app": containerName,
952+
}).
953+
WithPorts(servicePorts...).
954+
WithType(serviceType)
955+
956+
if cfg.headless {
957+
spec = spec.WithClusterIP("None")
958+
}
959+
960+
if cfg.sessionAffinity {
961+
spec = spec.
962+
WithSessionAffinity(corev1.ServiceAffinityClientIP).
963+
WithSessionAffinityConfig(corev1apply.SessionAffinityConfig().
964+
WithClientIP(corev1apply.ClientIPConfig().
965+
WithTimeoutSeconds(cfg.sessionAffinityTimeoutSeconds)))
966+
}
933967

934-
// Create the service apply configuration
935968
serviceApply := corev1apply.Service(svcName, namespace).
936969
WithLabels(labels).
937-
WithSpec(corev1apply.ServiceSpec().
938-
WithSelector(map[string]string{
939-
"app": containerName,
940-
}).
941-
WithPorts(servicePorts...).
942-
WithType(serviceType).
943-
WithClusterIP("None")) // "None" makes it a headless service
944-
945-
// Apply the service using server-side apply
946-
fieldManager := serviceFieldManager
970+
WithSpec(spec)
971+
947972
_, err = c.client.CoreV1().Services(namespace).
948973
Apply(ctx, serviceApply, metav1.ApplyOptions{
949-
FieldManager: fieldManager,
974+
FieldManager: serviceFieldManager,
950975
Force: true,
951976
})
952-
953977
if err != nil {
954-
return fmt.Errorf("failed to apply service: %w", err)
978+
return "", fmt.Errorf("failed to apply service %s: %w", svcName, err)
955979
}
956980

957-
slog.Info("created headless service for HTTP transport", "name", containerName)
981+
slog.Debug("applied service", "name", svcName)
982+
return svcName, nil
983+
}
958984

959-
return nil
985+
// createHeadlessService creates a headless Kubernetes service for the StatefulSet
986+
func (c *Client) createHeadlessService(
987+
ctx context.Context,
988+
containerName string,
989+
namespace string,
990+
labels map[string]string,
991+
options *runtime.DeployWorkloadOptions,
992+
) error {
993+
_, err := c.applyService(ctx, containerName, namespace, labels, options, serviceConfig{
994+
nameSuffix: "-headless",
995+
headless: true,
996+
})
997+
return err
960998
}
961999

1000+
// mcpServiceSessionAffinityTimeout is the timeout in seconds for ClientIP session affinity
1001+
// on the MCP service. This controls how long kube-proxy pins a client IP to the same backend pod.
1002+
// Note: this provides proxy-runner-level stickiness (L4), not per-MCP-session stickiness (L7).
1003+
// True per-session routing would require Mcp-Session-Id-based routing at the proxy layer.
1004+
const mcpServiceSessionAffinityTimeout int32 = 1800
1005+
9621006
// createMCPService creates a regular ClusterIP service with SessionAffinity for the MCP server StatefulSet.
9631007
// This service provides load balancing with client-IP-based session stickiness, which the proxy-runner
9641008
// uses as its target host. The headless service is retained for DNS discovery purposes.
@@ -969,47 +1013,14 @@ func (c *Client) createMCPService(
9691013
labels map[string]string,
9701014
options *runtime.DeployWorkloadOptions,
9711015
) error {
972-
// Create service ports from the container ports
973-
servicePorts, err := createServicePorts(options)
1016+
svcName, err := c.applyService(ctx, containerName, namespace, labels, options, serviceConfig{
1017+
sessionAffinity: true,
1018+
sessionAffinityTimeoutSeconds: mcpServiceSessionAffinityTimeout,
1019+
})
9741020
if err != nil {
9751021
return err
9761022
}
977-
978-
// If no ports were configured, don't create a service
979-
if len(servicePorts) == 0 {
980-
slog.Info("no ports configured for MCP transport, skipping service creation")
981-
return nil
982-
}
983-
984-
svcName := fmt.Sprintf("mcp-%s", containerName)
985-
986-
// Create the service apply configuration with SessionAffinity
987-
serviceApply := corev1apply.Service(svcName, namespace).
988-
WithLabels(labels).
989-
WithSpec(corev1apply.ServiceSpec().
990-
WithSelector(map[string]string{
991-
"app": containerName,
992-
}).
993-
WithPorts(servicePorts...).
994-
WithType(corev1.ServiceTypeClusterIP).
995-
WithSessionAffinity(corev1.ServiceAffinityClientIP))
996-
997-
// Apply the service using server-side apply
998-
fieldManager := serviceFieldManager
999-
_, err = c.client.CoreV1().Services(namespace).
1000-
Apply(ctx, serviceApply, metav1.ApplyOptions{
1001-
FieldManager: fieldManager,
1002-
Force: true,
1003-
})
1004-
1005-
if err != nil {
1006-
return fmt.Errorf("failed to apply MCP service: %w", err)
1007-
}
1008-
1009-
slog.Info("created MCP service with session affinity", "name", svcName)
1010-
1011-
// TODO: rename SSEHeadlessServiceName to MCPServiceName
1012-
options.SSEHeadlessServiceName = svcName
1023+
options.MCPServiceName = svcName
10131024
return nil
10141025
}
10151026

@@ -1030,8 +1041,8 @@ func extractPortMappingsFromPod(pod *corev1.Pod) []runtime.PortMapping {
10301041
return ports
10311042
}
10321043

1033-
// transportTypeRequiresHeadlessService returns true if the transport type requires a headless service
1034-
func transportTypeRequiresHeadlessService(transportType string) bool {
1044+
// transportTypeRequiresBackendServices returns true if the transport type requires backend services
1045+
func transportTypeRequiresBackendServices(transportType string) bool {
10351046
return transportType == string(transtypes.TransportTypeSSE) || transportType == string(transtypes.TransportTypeStreamableHTTP)
10361047
}
10371048

pkg/container/runtime/types.go

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -256,9 +256,10 @@ type DeployWorkloadOptions struct {
256256
// Only applicable when using Kubernetes runtime
257257
K8sPodTemplatePatch string
258258

259-
// SSEHeadlessServiceName is the name of the Kubernetes service to use for the workload
260-
// Only applicable when using Kubernetes runtime and SSE transport
261-
SSEHeadlessServiceName string
259+
// MCPServiceName is the name of the Kubernetes ClusterIP service used as the
260+
// proxy-runner target for MCP server workloads.
261+
// Only applicable when using Kubernetes runtime with SSE or streamable-http transport.
262+
MCPServiceName string
262263

263264
// IgnoreConfig contains configuration for ignore patterns and tmpfs overlays
264265
// Used to filter bind mount contents by hiding sensitive files

pkg/runtime/setup.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -124,9 +124,9 @@ func Setup(
124124
} else {
125125
// Update target host and port if needed (for Kubernetes)
126126
if (transportType == types.TransportTypeSSE || transportType == types.TransportTypeStreamableHTTP) && rt.IsKubernetesRuntime() {
127-
// If the SSEHeadlessServiceName is set, use it as the target host
128-
if containerOptions.SSEHeadlessServiceName != "" {
129-
result.TargetHost = containerOptions.SSEHeadlessServiceName
127+
// If the MCPServiceName is set, use it as the target host
128+
if containerOptions.MCPServiceName != "" {
129+
result.TargetHost = containerOptions.MCPServiceName
130130
}
131131
}
132132

test/e2e/chainsaw/operator/multi-tenancy/test-scenarios/sse/assert-mcpserver-svc.yaml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,4 +4,8 @@ metadata:
44
name: mcp-yardstick
55
namespace: test-namespace
66
spec:
7+
type: ClusterIP
78
sessionAffinity: ClientIP
9+
sessionAffinityConfig:
10+
clientIP:
11+
timeoutSeconds: 1800

test/e2e/chainsaw/operator/multi-tenancy/test-scenarios/streamable-http/assert-mcpserver-svc.yaml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,4 +4,8 @@ metadata:
44
name: mcp-yardstick
55
namespace: test-namespace
66
spec:
7+
type: ClusterIP
78
sessionAffinity: ClientIP
9+
sessionAffinityConfig:
10+
clientIP:
11+
timeoutSeconds: 1800

test/e2e/chainsaw/operator/single-tenancy/test-scenarios/sse/assert-mcpserver-svc.yaml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,4 +4,8 @@ metadata:
44
name: mcp-st-sse
55
namespace: toolhive-system
66
spec:
7+
type: ClusterIP
78
sessionAffinity: ClientIP
9+
sessionAffinityConfig:
10+
clientIP:
11+
timeoutSeconds: 1800

test/e2e/chainsaw/operator/single-tenancy/test-scenarios/streamable-http/assert-mcpserver-svc.yaml

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -4,4 +4,8 @@ metadata:
44
name: mcp-st-streamable-http
55
namespace: toolhive-system
66
spec:
7+
type: ClusterIP
78
sessionAffinity: ClientIP
9+
sessionAffinityConfig:
10+
clientIP:
11+
timeoutSeconds: 1800

0 commit comments

Comments
 (0)