Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 26 additions & 37 deletions cmd/thv-operator/controllers/mcpserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,57 +1455,46 @@ func (r *MCPServerReconciler) updateMCPServerStatus(ctx context.Context, m *mcpv
return r.Status().Update(ctx, m)
}

// deleteIfExists fetches a Kubernetes object by name and namespace, and deletes it if it exists.
// Returns nil if the object was not found or was successfully deleted.
func (r *MCPServerReconciler) deleteIfExists(ctx context.Context, obj client.Object, name, namespace, kind string) error {
err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, obj)
if err == nil {
if delErr := r.Delete(ctx, obj); delErr != nil && !errors.IsNotFound(delErr) {
Comment thread
JAORMX marked this conversation as resolved.
return fmt.Errorf("failed to delete %s %s: %w", kind, name, delErr)
}
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("failed to check %s %s: %w", kind, name, err)
}
return nil
}

// finalizeMCPServer performs the finalizer logic for the MCPServer
func (r *MCPServerReconciler) finalizeMCPServer(ctx context.Context, m *mcpv1alpha1.MCPServer) error {
ctxLogger := log.FromContext(ctx)
// Update the MCPServer status
m.Status.Phase = mcpv1alpha1.MCPServerPhaseTerminating
m.Status.Message = "MCP server is being terminated"
if err := r.Status().Update(ctx, m); err != nil {
return err
}

// Step 2: Attempt to delete associated StatefulSet by name
sts := &appsv1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{Name: m.Name, Namespace: m.Namespace}, sts)
if err == nil {
// StatefulSet found, delete it
if delErr := r.Delete(ctx, sts); delErr != nil && !errors.IsNotFound(delErr) {
return fmt.Errorf("failed to delete StatefulSet %s: %w", m.Name, delErr)
}
} else if !errors.IsNotFound(err) {
// Unexpected error (not just "not found")
return fmt.Errorf("failed to get StatefulSet %s: %w", m.Name, err)
// Delete associated StatefulSet
if err := r.deleteIfExists(ctx, &appsv1.StatefulSet{}, m.Name, m.Namespace, "StatefulSet"); err != nil {
return err
}

// Step 3: Attempt to delete associated service by name
svc := &corev1.Service{}
serviceName := fmt.Sprintf("mcp-%s-headless", m.Name)
err = r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: m.Namespace}, svc)
if err == nil {
if delErr := r.Delete(ctx, svc); delErr != nil && !errors.IsNotFound(delErr) {
return fmt.Errorf("failed to delete Service %s: %w", serviceName, delErr)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to check Service %s: %w", serviceName, err)
// Delete associated services
if err := r.deleteIfExists(ctx, &corev1.Service{}, fmt.Sprintf("mcp-%s-headless", m.Name), m.Namespace, "Service"); err != nil {
return err
}

// Step 4: Delete associated RunConfig ConfigMap
runConfigName := fmt.Sprintf("%s-runconfig", m.Name)
runConfigMap := &corev1.ConfigMap{}
err = r.Get(ctx, types.NamespacedName{Name: runConfigName, Namespace: m.Namespace}, runConfigMap)
if err == nil {
if delErr := r.Delete(ctx, runConfigMap); delErr != nil && !errors.IsNotFound(delErr) {
return fmt.Errorf("failed to delete RunConfig ConfigMap %s: %w", runConfigName, delErr)
}
ctxLogger.Info("Deleted RunConfig ConfigMap", "name", runConfigName, "namespace", m.Namespace)
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to check RunConfig ConfigMap %s: %w", runConfigName, err)
if err := r.deleteIfExists(ctx, &corev1.Service{}, fmt.Sprintf("mcp-%s", m.Name), m.Namespace, "Service"); err != nil {
return err
}

// The owner references will automatically delete the deployment and service
// when the MCPServer is deleted, so we don't need to do anything here.
return nil
// Delete associated RunConfig ConfigMap
return r.deleteIfExists(ctx, &corev1.ConfigMap{}, fmt.Sprintf("%s-runconfig", m.Name), m.Namespace, "ConfigMap")
}

// deploymentNeedsUpdate checks if the deployment needs to be updated
Expand Down
175 changes: 137 additions & 38 deletions pkg/container/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/tools/watch"
"k8s.io/utils/ptr"

"github.com/stacklok/toolhive-core/permissions"
"github.com/stacklok/toolhive/pkg/container/runtime"
Expand All @@ -46,6 +47,8 @@ const (
mcpContainerName = "mcp"
// defaultNamespace is the default Kubernetes namespace
defaultNamespace = "default"
// serviceFieldManager is the field manager name for server-side apply operations
serviceFieldManager = "toolhive-container-manager"
)

// RuntimeName is the name identifier for the Kubernetes runtime
Expand Down Expand Up @@ -328,6 +331,8 @@ func (c *Client) GetWorkloadLogs(ctx context.Context, workloadName string, follo
}

// DeployWorkload implements runtime.Runtime.
//
//nolint:gocyclo
Comment thread
JAORMX marked this conversation as resolved.
func (c *Client) DeployWorkload(ctx context.Context,
image string,
containerName string,
Expand Down Expand Up @@ -407,7 +412,7 @@ func (c *Client) DeployWorkload(ctx context.Context,
WithTemplate(podTemplateSpec))

// Apply the statefulset using server-side apply
fieldManager := "toolhive-container-manager"
fieldManager := serviceFieldManager
createdStatefulSet, err := c.client.AppsV1().StatefulSets(namespace).
Apply(ctx, statefulSetApply, metav1.ApplyOptions{
FieldManager: fieldManager,
Expand All @@ -420,12 +425,27 @@ func (c *Client) DeployWorkload(ctx context.Context,
//nolint:gosec // G706: statefulset name from Kubernetes API response
slog.Info("applied statefulset", "name", createdStatefulSet.Name)

if transportTypeRequiresHeadlessService(transportType) && options != nil {
// Create a headless service for SSE transport
err := c.createHeadlessService(ctx, containerName, namespace, containerLabels, options)
if transportTypeRequiresBackendServices(transportType) && options != nil {
stsOwner := &metav1.OwnerReference{
APIVersion: appsv1.SchemeGroupVersion.String(),
Kind: "StatefulSet",
Name: createdStatefulSet.Name,
UID: createdStatefulSet.UID,
BlockOwnerDeletion: ptr.To(true),
Controller: ptr.To(true),
}

// Create a headless service for DNS discovery
Comment thread
JAORMX marked this conversation as resolved.
err := c.createHeadlessService(ctx, containerName, namespace, containerLabels, options, stsOwner)
if err != nil {
return 0, fmt.Errorf("failed to create headless service: %w", err)
}

// Create a regular ClusterIP service with session affinity for the proxy-runner target
err = c.createMCPService(ctx, containerName, namespace, containerLabels, options, stsOwner)
if err != nil {
return 0, fmt.Errorf("failed to create MCP service: %w", err)
}
}

// Wait for the statefulset to be ready
Expand Down Expand Up @@ -888,65 +908,144 @@ func createServicePorts(options *runtime.DeployWorkloadOptions) ([]*corev1apply.
return servicePorts, nil
}

// createHeadlessService creates a headless Kubernetes service for the StatefulSet
func (c *Client) createHeadlessService(
// serviceConfig describes the variant of Kubernetes service that applyService
// should create or update.
type serviceConfig struct {
	// nameSuffix is appended to "mcp-<containerName>" to build the service name:
	// "-headless" for the headless service, "" for the MCP service.
	nameSuffix string
	// headless requests a headless service (ClusterIP: None).
	headless bool
	// sessionAffinity turns on ClientIP session affinity, using
	// sessionAffinityTimeoutSeconds as the timeout.
	sessionAffinity bool
	// sessionAffinityTimeoutSeconds is the ClientIP session affinity timeout.
	// It is only consulted when sessionAffinity is true; applyService forwards the
	// value unchanged in that case, so callers should set it explicitly.
	// (Kubernetes defaults to 10800s (3h) when the field is left unset.)
	sessionAffinityTimeoutSeconds int32
}

// applyService creates or updates a Kubernetes service using server-side apply.
// If owner is non-nil, it is set as an owner reference so Kubernetes garbage-collects
// the service when the owner is deleted.
func (c *Client) applyService(
ctx context.Context,
containerName string,
namespace string,
labels map[string]string,
options *runtime.DeployWorkloadOptions,
) error {
// Create service ports from the container ports
cfg serviceConfig,
owner *metav1.OwnerReference,
) (string, error) {
servicePorts, err := createServicePorts(options)
if err != nil {
return err
return "", err
}

// If no ports were configured, don't create a service
if len(servicePorts) == 0 {
slog.Info("no ports configured for SSE transport, skipping service creation")
return nil
slog.Debug("no ports configured, skipping service creation")
return "", nil
}

// Create service type based on whether we have node ports
svcName := fmt.Sprintf("mcp-%s%s", containerName, cfg.nameSuffix)

// Determine service type based on whether any ports have NodePort set.
// Headless services (ClusterIP: None) cannot be NodePort, so skip the
// promotion for those — Kubernetes rejects clusterIP=None + type=NodePort.
serviceType := corev1.ServiceTypeClusterIP
for _, sp := range servicePorts {
if sp.NodePort != nil {
serviceType = corev1.ServiceTypeNodePort
break
if !cfg.headless {
for _, sp := range servicePorts {
if sp.NodePort != nil {
serviceType = corev1.ServiceTypeNodePort
break
}
}
}

// we want to generate a service name that is unique for the headless service
// to avoid conflicts with the proxy service
svcName := fmt.Sprintf("mcp-%s-headless", containerName)
spec := corev1apply.ServiceSpec().
WithSelector(map[string]string{
"app": containerName,
}).
WithPorts(servicePorts...).
WithType(serviceType)

if cfg.headless {
spec = spec.WithClusterIP("None")
}

if cfg.sessionAffinity {
spec = spec.
WithSessionAffinity(corev1.ServiceAffinityClientIP).
WithSessionAffinityConfig(corev1apply.SessionAffinityConfig().
WithClientIP(corev1apply.ClientIPConfig().
WithTimeoutSeconds(cfg.sessionAffinityTimeoutSeconds)))
}

// Create the service apply configuration
serviceApply := corev1apply.Service(svcName, namespace).
WithLabels(labels).
WithSpec(corev1apply.ServiceSpec().
WithSelector(map[string]string{
"app": containerName,
}).
WithPorts(servicePorts...).
WithType(serviceType).
WithClusterIP("None")) // "None" makes it a headless service

// Apply the service using server-side apply
fieldManager := "toolhive-container-manager"
WithSpec(spec)

if owner != nil {
serviceApply = serviceApply.WithOwnerReferences(metav1apply.OwnerReference().
WithAPIVersion(owner.APIVersion).
WithKind(owner.Kind).
WithName(owner.Name).
WithUID(owner.UID).
WithBlockOwnerDeletion(true).
WithController(true))
}

_, err = c.client.CoreV1().Services(namespace).
Apply(ctx, serviceApply, metav1.ApplyOptions{
FieldManager: fieldManager,
FieldManager: serviceFieldManager,
Force: true,
})

if err != nil {
return fmt.Errorf("failed to apply service: %w", err)
return "", fmt.Errorf("failed to apply service %s: %w", svcName, err)
}

slog.Info("created headless service for HTTP transport", "name", containerName)
slog.Debug("applied service", "name", svcName)
return svcName, nil
}

// createHeadlessService applies the headless service ("mcp-<containerName>-headless")
// for the StatefulSet by delegating to applyService with a headless configuration.
// The resulting service name is discarded; only the error is reported to the caller.
func (c *Client) createHeadlessService(
	ctx context.Context,
	containerName string,
	namespace string,
	labels map[string]string,
	options *runtime.DeployWorkloadOptions,
	owner *metav1.OwnerReference,
) error {
	headlessCfg := serviceConfig{
		nameSuffix: "-headless",
		headless:   true,
	}
	if _, err := c.applyService(ctx, containerName, namespace, labels, options, headlessCfg, owner); err != nil {
		return err
	}
	return nil
}
Comment thread
JAORMX marked this conversation as resolved.

// mcpServiceSessionAffinityTimeout is the ClientIP session affinity timeout, in seconds
// (1800s = 30 minutes), used for the MCP service. It controls how long kube-proxy keeps
// a client IP pinned to the same backend pod.
// Note: this gives proxy-runner-level stickiness (L4), not per-MCP-session stickiness (L7).
// True per-session routing would require Mcp-Session-Id-based routing at the proxy layer.
const mcpServiceSessionAffinityTimeout int32 = 1800

options.SSEHeadlessServiceName = svcName
// createMCPService creates a regular ClusterIP service with SessionAffinity for the MCP server StatefulSet.
Comment thread
JAORMX marked this conversation as resolved.
// This service provides load balancing with client-IP-based session stickiness, which the proxy-runner
// uses as its target host. The headless service is retained for DNS discovery purposes.
func (c *Client) createMCPService(
ctx context.Context,
containerName string,
namespace string,
labels map[string]string,
options *runtime.DeployWorkloadOptions,
Comment thread
JAORMX marked this conversation as resolved.
owner *metav1.OwnerReference,
) error {
svcName, err := c.applyService(ctx, containerName, namespace, labels, options, serviceConfig{
sessionAffinity: true,
sessionAffinityTimeoutSeconds: mcpServiceSessionAffinityTimeout,
}, owner)
if err != nil {
return err
}
options.MCPServiceName = svcName
return nil
}

Expand All @@ -967,8 +1066,8 @@ func extractPortMappingsFromPod(pod *corev1.Pod) []runtime.PortMapping {
return ports
}

// transportTypeRequiresHeadlessService returns true if the transport type requires a headless service
func transportTypeRequiresHeadlessService(transportType string) bool {
// transportTypeRequiresBackendServices reports whether transportType is one of the
// transports that need backend services: SSE or streamable HTTP.
func transportTypeRequiresBackendServices(transportType string) bool {
	switch transportType {
	case string(transtypes.TransportTypeSSE),
		string(transtypes.TransportTypeStreamableHTTP):
		return true
	default:
		return false
	}
}

Expand Down
Loading
Loading