diff --git a/api/v1/installation_types.go b/api/v1/installation_types.go
index b64227400d..b97c1c4d91 100644
--- a/api/v1/installation_types.go
+++ b/api/v1/installation_types.go
@@ -160,6 +160,17 @@ type InstallationSpec struct {
 	// +optional
 	NodeUpdateStrategy appsv1.DaemonSetUpdateStrategy `json:"nodeUpdateStrategy,omitempty"`
 
+	// StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods
+	// (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their
+	// node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls
+	// a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet
+	// controllers recreate them with the correct IP. This works around an upstream Kubernetes
+	// limitation where status.podIPs is immutable for hostNetwork pods.
+	// Default: Enabled
+	// +kubebuilder:validation:Enum=Enabled;Disabled
+	// +optional
+	StalePodIPRecovery *StalePodIPRecoveryType `json:"stalePodIPRecovery,omitempty"`
+
 	// Deprecated. Please use CalicoNodeDaemonSet, TyphaDeployment, and KubeControllersDeployment.
 	// ComponentResources can be used to customize the resource requirements for each component.
 	// Node, Typha, and KubeControllers are supported for installations.
@@ -350,6 +361,15 @@ const (
 	FIPSModeDisabled FIPSMode = "Disabled"
 )
 
+// StalePodIPRecoveryType controls whether the operator automatically detects and recreates
+// host-networked Calico pods with stale status.podIPs after a node IP change.
+type StalePodIPRecoveryType string
+
+const (
+	StalePodIPRecoveryEnabled  StalePodIPRecoveryType = "Enabled"
+	StalePodIPRecoveryDisabled StalePodIPRecoveryType = "Disabled"
+)
+
 // Deprecated. Please use TyphaDeployment instead.
 // TyphaAffinity allows configuration of node affinity characteristics for Typha pods.
 type TyphaAffinity struct {
@@ -1109,6 +1129,12 @@ func IsFIPSModeEnabledString(mode *FIPSMode) string {
 	return fmt.Sprintf("%t", IsFIPSModeEnabled(mode))
 }
 
+// IsStalePodIPRecoveryEnabled returns whether stale pod IP recovery is enabled. The behavior
+// is default-on, so a nil pointer means enabled.
+func IsStalePodIPRecoveryEnabled(s *StalePodIPRecoveryType) bool {
+	return s == nil || *s == StalePodIPRecoveryEnabled
+}
+
 type WindowsNodeSpec struct {
 	// CNIBinDir is the path to the CNI binaries directory on Windows, it must match what is used as 'bin_dir' under
 	// [plugins]
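Aside on the API shape above: the optional pointer field plus the IsStalePodIPRecoveryEnabled helper is what makes the feature default-on. An unset (nil) field and an explicit Enabled both turn recovery on; only an explicit Disabled opts out. A minimal standalone sketch of those semantics, with the type and helper re-declared locally so the snippet runs on its own (illustration only, not part of the patch):

package main

import "fmt"

// Local re-declaration mirroring StalePodIPRecoveryType above (sketch only).
type StalePodIPRecoveryType string

const (
	StalePodIPRecoveryEnabled  StalePodIPRecoveryType = "Enabled"
	StalePodIPRecoveryDisabled StalePodIPRecoveryType = "Disabled"
)

// Same shape as IsStalePodIPRecoveryEnabled: a nil pointer (field unset) means enabled.
func isEnabled(s *StalePodIPRecoveryType) bool {
	return s == nil || *s == StalePodIPRecoveryEnabled
}

func main() {
	disabled := StalePodIPRecoveryDisabled
	fmt.Println(isEnabled(nil))       // true: unset defaults to enabled
	fmt.Println(isEnabled(&disabled)) // false: only an explicit Disabled opts out
}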
diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go
index 0bade7102c..732d27e5f0 100644
--- a/api/v1/zz_generated.deepcopy.go
+++ b/api/v1/zz_generated.deepcopy.go
@@ -5971,6 +5971,11 @@ func (in *InstallationSpec) DeepCopyInto(out *InstallationSpec) {
 		**out = **in
 	}
 	in.NodeUpdateStrategy.DeepCopyInto(&out.NodeUpdateStrategy)
+	if in.StalePodIPRecovery != nil {
+		in, out := &in.StalePodIPRecovery, &out.StalePodIPRecovery
+		*out = new(StalePodIPRecoveryType)
+		**out = **in
+	}
 	if in.ComponentResources != nil {
 		in, out := &in.ComponentResources, &out.ComponentResources
 		*out = make([]ComponentResource, len(*in))
diff --git a/pkg/controller/installation/core_controller.go b/pkg/controller/installation/core_controller.go
index 2812fe1f97..4e1855504f 100644
--- a/pkg/controller/installation/core_controller.go
+++ b/pkg/controller/installation/core_controller.go
@@ -322,9 +322,25 @@ func newReconciler(mgr manager.Manager, opts options.ControllerOptions) (*Reconc
 	nodeIndexInformer := cache.NewSharedIndexInformer(nodeListWatch, &corev1.Node{}, 0, cache.Indexers{})
 	go nodeIndexInformer.Run(opts.ShutdownContext.Done())
 
-	// Create a Typha autoscaler.
+	// Create a Typha autoscaler. Stale pod IP recovery defers to the current
+	// Installation.Spec.StalePodIPRecovery setting on each tick (default-on if unset).
+	// If the Installation can't be read for any reason, fail open and let the recovery
+	// run; that is the safer behavior for the kubelet bug we're working around.
+	mgrClient := mgr.GetClient()
 	typhaListWatch := cache.NewListWatchFromClient(opts.K8sClientset.AppsV1().RESTClient(), "deployments", "calico-system", fields.OneTermEqualSelector("metadata.name", "calico-typha"))
-	typhaScaler := newTyphaAutoscaler(opts.K8sClientset, nodeIndexInformer, typhaListWatch, statusManager)
+	typhaScaler := newTyphaAutoscaler(
+		opts.K8sClientset,
+		nodeIndexInformer,
+		typhaListWatch,
+		statusManager,
+		typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool {
+			inst := &operatorv1.Installation{}
+			if err := mgrClient.Get(opts.ShutdownContext, types.NamespacedName{Name: "default"}, inst); err != nil {
+				return true
+			}
+			return operatorv1.IsStalePodIPRecoveryEnabled(inst.Spec.StalePodIPRecovery)
+		}),
+	)
 
 	r := &ReconcileInstallation{
 		config: mgr.GetConfig(),
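The closure passed to typhaAutoscalerOptionStalePodIPRecoveryEnabled in the wiring above is the only coupling between the controller and the feature gate: it is re-evaluated on every autoscaler tick, so toggling the Installation takes effect without an operator restart, and any read error counts as enabled. A minimal dependency-free sketch of that fail-open pattern, where loadSetting is a hypothetical stand-in for the client Get against the default Installation (not part of the patch):

package main

import (
	"errors"
	"fmt"
)

// loadSetting is a hypothetical stand-in for reading the Installation resource;
// it returns the configured value or an error if the read fails.
func loadSetting() (string, error) {
	return "", errors.New("installation not found")
}

// newGate returns a getter consulted on every tick. Read errors fail open
// (return true), matching the controller wiring above.
func newGate() func() bool {
	return func() bool {
		v, err := loadSetting()
		if err != nil {
			return true // fail open: safer for the kubelet bug being worked around
		}
		return v != "Disabled" // unset or "Enabled" both mean enabled
	}
}

func main() {
	gate := newGate()
	fmt.Println(gate()) // true: the read failed, so recovery stays enabled
}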
diff --git a/pkg/controller/installation/typha_autoscaler.go b/pkg/controller/installation/typha_autoscaler.go
index 8d675591c4..fc7ffa9c0c 100644
--- a/pkg/controller/installation/typha_autoscaler.go
+++ b/pkg/controller/installation/typha_autoscaler.go
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	logf "sigs.k8s.io/controller-runtime/pkg/log"
@@ -57,6 +58,10 @@ type typhaAutoscaler struct {
 	typhaIndexer   cache.Store
 	nonClusterHost bool
 
+	// stalePodIPRecoveryEnabled returns whether the operator should detect and delete
+	// host-networked pods with stale status.podIPs each tick. Default-on if nil.
+	stalePodIPRecoveryEnabled func() bool
+
 	// Number of currently running replicas.
 	activeReplicas int32
 }
@@ -77,6 +82,15 @@ func typhaAutoscalerOptionNonclusterHost(nonClusterHost bool) typhaAutoscalerOpt
 	}
 }
 
+// typhaAutoscalerOptionStalePodIPRecoveryEnabled provides a getter that the autoscaler will
+// call each tick to determine whether stale-IP pod recovery is enabled. If unset, recovery
+// is always enabled.
+func typhaAutoscalerOptionStalePodIPRecoveryEnabled(enabled func() bool) typhaAutoscalerOption {
+	return func(t *typhaAutoscaler) {
+		t.stalePodIPRecoveryEnabled = enabled
+	}
+}
+
 // newTyphaAutoscaler creates a new Typha autoscaler, optionally applying any options to the default autoscaler instance.
 // The default sync period is 10 seconds.
 func newTyphaAutoscaler(cs kubernetes.Interface, indexInformer cache.SharedIndexInformer, typhaListWatch cache.ListerWatcher, statusManager status.StatusManager, options ...typhaAutoscalerOption) *typhaAutoscaler {
@@ -156,6 +170,40 @@ func (t *typhaAutoscaler) start(ctx context.Context) {
 			} else {
 				degraded = false
 			}
+
+			// Check for host-networked pods with stale IPs (e.g., after a node
+			// IP change) and delete them so they get recreated with the correct
+			// IP. Typha is checked first; if any Typha pod was deleted this
+			// cycle, calico-node deletions are skipped to give the new Typha a
+			// clean window to come up before churning calico-node pods that
+			// depend on it.
+			//
+			// Skip the entire check if stale-IP recovery has been disabled
+			// via Installation.Spec.StalePodIPRecovery.
+			if t.stalePodIPRecoveryEnabled == nil || t.stalePodIPRecoveryEnabled() {
+				typhaBatch := t.resolveTyphaMaxUnavailable()
+				deletedTypha := t.deleteStaleHostNetworkPods(
+					"calico-typha",
+					fmt.Sprintf("%s=%s", render.AppLabelName, render.TyphaK8sAppName),
+					typhaBatch,
+				) > 0
+				if !deletedTypha {
+					// Linux and Windows DaemonSets are paced independently of each
+					// other.
+					linuxBatch := t.resolveDaemonSetMaxUnavailable(render.CalicoNodeObjectName)
+					t.deleteStaleHostNetworkPods(
+						"calico-node",
+						fmt.Sprintf("%s=%s", render.AppLabelName, render.CalicoNodeObjectName),
+						linuxBatch,
+					)
+					windowsBatch := t.resolveDaemonSetMaxUnavailable(render.WindowsNodeObjectName)
+					t.deleteStaleHostNetworkPods(
+						"calico-node-windows",
+						fmt.Sprintf("%s=%s", render.AppLabelName, render.WindowsNodeObjectName),
+						windowsBatch,
+					)
+				}
+			}
 		case errCh := <-t.triggerRunChan:
 			if err := t.autoscaleReplicas(); err != nil {
 				degraded = true
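// Aside (not part of the patch): a runnable sketch of the pacing rule in the
// tick above. Typha is checked first, and calico-node recovery is deferred for
// a cycle whenever a Typha pod was deleted, so the replacement Typha gets a
// window to come up. The two func parameters are hypothetical stand-ins for
// the deleteStaleHostNetworkPods calls; each returns the number of pods deleted.
package main

import "fmt"

func reconcileTick(deleteStaleTypha, deleteStaleNodes func() int) {
	if deleteStaleTypha() > 0 {
		fmt.Println("deleted stale typha; deferring calico-node recovery one tick")
		return
	}
	deleteStaleNodes()
}

func main() {
	// Tick 1: a stale Typha pod exists, so calico-node churn is skipped.
	reconcileTick(func() int { return 1 }, func() int { panic("unreachable") })
	// Tick 2: Typha is clean, so stale calico-node pods are handled.
	reconcileTick(func() int { return 0 }, func() int {
		fmt.Println("recovering stale calico-node pods")
		return 2
	})
}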
@@ -280,6 +328,155 @@ func (t *typhaAutoscaler) getNodeCounts() (int, int) {
 	return schedulable, linuxNodes
 }
 
+// deleteStaleHostNetworkPods lists pods matching labelSelector in the
+// calico-system namespace and compares each pod's status.podIPs against the
+// current InternalIP of the node the pod is running on. If the IPs don't match
+// (stale pod IP after a node IP change), up to maxBatch pods are deleted so the
+// owning controller (Deployment / DaemonSet) recreates them with the correct IP.
+//
+// This is necessary because Kubernetes does not update status.podIPs for
+// existing hostNetwork pods when the node's IP changes; it is explicitly
+// immutable in the kubelet:
+// https://github.com/kubernetes/kubernetes/issues/93897
+//
+// Returns the number of pods deleted in this call.
+//
+// workloadName is used only for logging.
+func (t *typhaAutoscaler) deleteStaleHostNetworkPods(workloadName, labelSelector string, maxBatch int) int {
+	if t.nonClusterHost {
+		return 0
+	}
+	if maxBatch < 1 {
+		maxBatch = 1
+	}
+
+	pods, err := t.client.CoreV1().Pods(common.CalicoNamespace).List(context.Background(), metav1.ListOptions{
+		LabelSelector: labelSelector,
+	})
+	if err != nil {
+		typhaLog.V(5).Info("Failed to list pods for stale IP check", "workload", workloadName, "error", err)
+		return 0
+	}
+
+	// Build a map of node name → InternalIP from the informer cache.
+	nodeInternalIPs := map[string]string{}
+	for _, obj := range t.indexInformer.GetIndexer().List() {
+		n := obj.(*v1.Node)
+		for _, addr := range n.Status.Addresses {
+			if addr.Type == v1.NodeInternalIP {
+				nodeInternalIPs[n.Name] = addr.Address
+				break
+			}
+		}
+	}
+
+	deleted := 0
+	for i := range pods.Items {
+		if deleted >= maxBatch {
+			break
+		}
+		pod := &pods.Items[i]
+		if pod.Spec.NodeName == "" {
+			continue
+		}
+		nodeIP, ok := nodeInternalIPs[pod.Spec.NodeName]
+		if !ok {
+			continue
+		}
+
+		// Check if any of the pod's IPs match the node's current InternalIP.
+		match := false
+		for _, podIP := range pod.Status.PodIPs {
+			if podIP.IP == nodeIP {
+				match = true
+				break
+			}
+		}
+		if match {
+			continue
+		}
+
+		// Pod IP is stale; delete the pod so the owning controller recreates
+		// it with the correct IP.
+		podIPs := make([]string, len(pod.Status.PodIPs))
+		for j, pip := range pod.Status.PodIPs {
+			podIPs[j] = pip.IP
+		}
+		typhaLog.Info("Pod has stale IP after node IP change; deleting pod so it gets recreated with the correct IP",
+			"workload", workloadName, "pod", pod.Name, "node", pod.Spec.NodeName,
+			"podIPs", podIPs, "nodeInternalIP", nodeIP)
+		if err := t.client.CoreV1().Pods(common.CalicoNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
+			typhaLog.Error(err, "Failed to delete pod with stale IP", "workload", workloadName, "pod", pod.Name)
+			continue
+		}
+		deleted++
+	}
+	return deleted
+}
+
+// resolveTyphaMaxUnavailable reads the maxUnavailable value from the Typha
+// PodDisruptionBudget and resolves it to an absolute pod count using the
+// current Typha replica count. Returns 1 if the PDB doesn't exist, doesn't
+// have maxUnavailable set, or if the resolved value is < 1 (so progress
+// is always guaranteed).
+func (t *typhaAutoscaler) resolveTyphaMaxUnavailable() int {
+	const fallback = 1
+	pdb, err := t.client.PolicyV1().PodDisruptionBudgets(common.CalicoNamespace).Get(
+		context.Background(), common.TyphaDeploymentName, metav1.GetOptions{},
+	)
+	if err != nil || pdb.Spec.MaxUnavailable == nil {
+		return fallback
+	}
+	replicas := int(t.activeReplicas)
+	if replicas <= 0 {
+		// activeReplicas is populated by the informer; fall back to fetching
+		// the deployment if it hasn't been observed yet.
+		typha, err := t.client.AppsV1().Deployments(common.CalicoNamespace).Get(
+			context.Background(), common.TyphaDeploymentName, metav1.GetOptions{},
+		)
+		if err == nil && typha.Spec.Replicas != nil {
+			replicas = int(*typha.Spec.Replicas)
+		}
+	}
+	if replicas < 1 {
+		return fallback
+	}
+	val, err := intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, replicas, true)
+	if err != nil || val < 1 {
+		return fallback
+	}
+	return val
+}
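// Aside (not part of the patch): intstr.GetScaledValueFromIntOrPercent with
// roundUp=true is why "25%" of 10 replicas resolves to 3 (2.5 rounded up) in
// the tests below, and the < 1 clamp is what guarantees minimum progress. A
// runnable sketch, assuming only the k8s.io/apimachinery module:
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

// resolve mirrors the clamping in the resolvers above: scale against total,
// round up, and fall back to 1 so at least one pod can always be recovered.
func resolve(mu intstr.IntOrString, total int) int {
	val, err := intstr.GetScaledValueFromIntOrPercent(&mu, total, true)
	if err != nil || val < 1 {
		return 1
	}
	return val
}

func main() {
	fmt.Println(resolve(intstr.FromString("25%"), 10))  // 3: 2.5 rounds up
	fmt.Println(resolve(intstr.FromString("10%"), 100)) // 10
	fmt.Println(resolve(intstr.FromInt(0), 5))          // 1: clamped for minimum progress
}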
+// resolveDaemonSetMaxUnavailable reads the maxUnavailable value from the named
+// DaemonSet's update strategy and resolves it to an absolute pod count using
+// the desired DaemonSet pod count. Returns 1 if the DaemonSet doesn't exist,
+// doesn't have a RollingUpdate strategy, or if the resolved value is < 1.
+func (t *typhaAutoscaler) resolveDaemonSetMaxUnavailable(name string) int {
+	const fallback = 1
+	ds, err := t.client.AppsV1().DaemonSets(common.CalicoNamespace).Get(
+		context.Background(), name, metav1.GetOptions{},
+	)
+	if err != nil {
+		return fallback
+	}
+	if ds.Spec.UpdateStrategy.RollingUpdate == nil ||
+		ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil {
+		return fallback
+	}
+	desired := int(ds.Status.DesiredNumberScheduled)
+	if desired < 1 {
+		return fallback
+	}
+	val, err := intstr.GetScaledValueFromIntOrPercent(
+		ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desired, true,
+	)
+	if err != nil || val < 1 {
+		return fallback
+	}
+	return val
+}
+
 // getHostEndpointCounts returns the number of host endpoints in the cluster that are not created by the kube-controllers.
 func (t *typhaAutoscaler) getHostEndpointCounts() int {
 	heps := 0
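The staleness test at the heart of deleteStaleHostNetworkPods is plain set membership: a hostNetwork pod is stale when none of its recorded status.podIPs equals the node's current InternalIP. A dependency-free sketch of that predicate, using plain strings in place of the corev1 types (illustration only, not part of the patch):

package main

import "fmt"

// isStale reports whether a hostNetwork pod's recorded IPs no longer include
// the node's current InternalIP. Pods with no node IP on record are skipped by
// the caller, so this only answers the membership question.
func isStale(podIPs []string, nodeInternalIP string) bool {
	for _, ip := range podIPs {
		if ip == nodeInternalIP {
			return false // at least one recorded IP still matches the node
		}
	}
	return true // no recorded IP matches: the pod kept a pre-reboot address
}

func main() {
	// Node rebooted and picked up 10.0.0.2 from DHCP; the pod still reports
	// the old 10.0.0.1, so it should be deleted and recreated.
	fmt.Println(isStale([]string{"10.0.0.1"}, "10.0.0.2")) // true
	fmt.Println(isStale([]string{"10.0.0.2"}, "10.0.0.2")) // false
}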
diff --git a/pkg/controller/installation/typha_autoscaler_test.go b/pkg/controller/installation/typha_autoscaler_test.go
index 3ddedcef51..f16ff0ef49 100644
--- a/pkg/controller/installation/typha_autoscaler_test.go
+++ b/pkg/controller/installation/typha_autoscaler_test.go
@@ -29,8 +29,10 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/kubernetes"
 	kfake "k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
@@ -273,6 +275,384 @@ var _ = Describe("Test typha autoscaler ", func() {
 		statusManager.AssertExpectations(GinkgoT())
 	})
+
+	Context("stale pod IP detection", func() {
+		var ta *typhaAutoscaler
+
+		const (
+			typhaSelector       = "k8s-app=calico-typha"
+			calicoNodeSelector  = "k8s-app=calico-node"
+			windowsNodeSelector = "k8s-app=calico-node-windows"
+		)
+
+		BeforeEach(func() {
+			// The autoscaler may degrade if the first tick fires before nodes are
+			// added by the test body (race against ta.start). We don't care about
+			// scaling behavior in these tests; allow any SetDegraded call.
+			statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe()
+			ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond))
+			ta.start(ctx)
+		})
+
+		createNodeWithIP := func(name, ip string) *corev1.Node {
+			node := CreateNode(c, name, map[string]string{"kubernetes.io/os": "linux"}, nil)
+			node.Status.Addresses = []corev1.NodeAddress{
+				{Type: corev1.NodeInternalIP, Address: ip},
+			}
+			var err error
+			node, err = c.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{})
+			Expect(err).To(BeNil())
+			return node
+		}
+
+		createPodWithLabel := func(name, nodeName, podIP, k8sApp string) *corev1.Pod {
+			pod := &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      name,
+					Namespace: "calico-system",
+					Labels:    map[string]string{"k8s-app": k8sApp},
+				},
+				Spec: corev1.PodSpec{
+					NodeName: nodeName,
+				},
+				Status: corev1.PodStatus{
+					PodIPs: []corev1.PodIP{{IP: podIP}},
+				},
+			}
+			var err error
+			pod, err = c.CoreV1().Pods("calico-system").Create(ctx, pod, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+			// fake client doesn't persist status on Create; update it separately.
+			pod.Status.PodIPs = []corev1.PodIP{{IP: podIP}}
+			pod, err = c.CoreV1().Pods("calico-system").UpdateStatus(ctx, pod, metav1.UpdateOptions{})
+			Expect(err).To(BeNil())
+			return pod
+		}
+
+		// waitForNodes blocks until the node informer has observed n schedulable nodes.
+		waitForNodes := func(n int) {
+			EventuallyWithOffset(1, func() int {
+				all, _ := ta.getNodeCounts()
+				return all
+			}, 5*time.Second).Should(Equal(n))
+		}
+
+		listPods := func(labelSelector string) []corev1.Pod {
+			pods, err := c.CoreV1().Pods("calico-system").List(ctx, metav1.ListOptions{
+				LabelSelector: labelSelector,
+			})
+			Expect(err).To(BeNil())
+			return pods.Items
+		}
+
+		It("returns 0 and deletes nothing when all pod IPs match the node InternalIP", func() {
+			createNodeWithIP("node1", "10.0.0.1")
+			createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha")
+			waitForNodes(1)
+
+			deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)
+			Expect(deleted).To(Equal(0))
+			Expect(listPods(typhaSelector)).To(HaveLen(1))
+		})
+
+		It("returns 1 and deletes a Typha pod whose IP doesn't match the node InternalIP", func() {
+			createNodeWithIP("node1", "10.0.0.2")
+			createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha")
+			waitForNodes(1)
+
+			deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)
+			Expect(deleted).To(Equal(1))
+			Expect(listPods(typhaSelector)).To(HaveLen(0))
+		})
+
+		It("deletes a stale calico-node (Linux) pod", func() {
+			createNodeWithIP("node1", "10.0.0.2")
+			createPodWithLabel("calico-node-abc", "node1", "10.0.0.1", "calico-node")
+			waitForNodes(1)
+
+			deleted := ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 1)
+			Expect(deleted).To(Equal(1))
+			Expect(listPods(calicoNodeSelector)).To(HaveLen(0))
+		})
+
+		It("deletes a stale calico-node-windows pod", func() {
+			createNodeWithIP("node1", "10.0.0.2")
+			createPodWithLabel("calico-node-windows-abc", "node1", "10.0.0.1", "calico-node-windows")
+			waitForNodes(1)
+
+			deleted := ta.deleteStaleHostNetworkPods("calico-node-windows", windowsNodeSelector, 1)
+			Expect(deleted).To(Equal(1))
+			Expect(listPods(windowsNodeSelector)).To(HaveLen(0))
+		})
+
+		It("respects maxBatch=1 (default): exactly one stale pod deleted per call", func() {
+			createNodeWithIP("node1", "10.0.0.2")
+			createNodeWithIP("node2", "10.0.0.4")
+			createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha")
+			createPodWithLabel("typha-def", "node2", "10.0.0.3", "calico-typha")
+			waitForNodes(2)
+
+			Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)).To(Equal(1))
+			Expect(listPods(typhaSelector)).To(HaveLen(1))
+
+			Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)).To(Equal(1))
+			Expect(listPods(typhaSelector)).To(HaveLen(0))
+		})
+
+		It("respects maxBatch=N (>1): up to N stale pods deleted per call", func() {
+			for i := 0; i < 5; i++ {
+				nodeName := fmt.Sprintf("node%d", i)
+				createNodeWithIP(nodeName, fmt.Sprintf("10.0.0.%d", 100+i))
+				createPodWithLabel(fmt.Sprintf("calico-node-%d", i), nodeName, fmt.Sprintf("10.0.0.%d", i+1), "calico-node")
+			}
+			waitForNodes(5)
+
+			// maxBatch=3 → delete 3 of the 5 stale pods.
+			Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 3)).To(Equal(3))
+			Expect(listPods(calicoNodeSelector)).To(HaveLen(2))
+
+			// Next call cleans up the remaining 2.
+			Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 3)).To(Equal(2))
+			Expect(listPods(calicoNodeSelector)).To(HaveLen(0))
+		})
+
+		It("treats maxBatch < 1 as 1 (minimum-progress fallback)", func() {
+			createNodeWithIP("node1", "10.0.0.2")
+			createPodWithLabel("typha-abc", "node1", "10.0.0.1", "calico-typha")
+			waitForNodes(1)
+
+			// maxBatch=0 should still delete one pod (minimum-progress fallback).
+			Expect(ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 0)).To(Equal(1))
+			Expect(listPods(typhaSelector)).To(HaveLen(0))
+		})
+
+		It("does not delete a pod whose node is not in the informer cache", func() {
+			createPodWithLabel("typha-abc", "unknown-node", "10.0.0.1", "calico-typha")
+
+			deleted := ta.deleteStaleHostNetworkPods("calico-typha", typhaSelector, 1)
+			Expect(deleted).To(Equal(0))
+			Expect(listPods(typhaSelector)).To(HaveLen(1))
+		})
+
+		It("paces Linux and Windows DaemonSets independently of each other", func() {
+			createNodeWithIP("node-linux", "10.0.0.2")
+			createNodeWithIP("node-win", "10.0.1.2")
+			createPodWithLabel("calico-node-abc", "node-linux", "10.0.0.1", "calico-node")
+			createPodWithLabel("calico-node-windows-abc", "node-win", "10.0.1.1", "calico-node-windows")
+			waitForNodes(2)
+
+			// Both DaemonSets have a stale pod; each call deletes its own.
+			Expect(ta.deleteStaleHostNetworkPods("calico-node", calicoNodeSelector, 1)).To(Equal(1))
+			Expect(ta.deleteStaleHostNetworkPods("calico-node-windows", windowsNodeSelector, 1)).To(Equal(1))
+			Expect(listPods(calicoNodeSelector)).To(HaveLen(0))
+			Expect(listPods(windowsNodeSelector)).To(HaveLen(0))
+		})
+	})
+
+	Context("maxUnavailable resolution", func() {
+		var ta *typhaAutoscaler
+
+		BeforeEach(func() {
+			// Allow any degradation that may happen due to race with autoscaler ticks.
+			statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe()
+			ta = newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond))
+			ta.start(ctx)
+		})
+
+		It("returns 1 for Typha when the PDB does not exist", func() {
+			Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(1))
+		})
+
+		It("resolves an int Typha PDB maxUnavailable", func() {
+			var replicas int32 = 5
+			_, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"},
+				Spec:       appsv1.DeploymentSpec{Replicas: &replicas},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			mu := intstr.FromInt(2)
+			_, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"},
+				Spec:       policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(2))
+		})
+
+		It("resolves a percentage Typha PDB maxUnavailable against replica count", func() {
+			var replicas int32 = 10
+			_, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"},
+				Spec:       appsv1.DeploymentSpec{Replicas: &replicas},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			mu := intstr.FromString("25%")
+			_, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"},
+				Spec:       policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			// 25% of 10 = 3 (rounded up from 2.5).
+			Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(3))
+		})
+
+		It("returns 1 for Typha when maxUnavailable resolves to 0", func() {
+			var replicas int32 = 5
+			_, err := c.AppsV1().Deployments("calico-system").Create(ctx, &appsv1.Deployment{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"},
+				Spec:       appsv1.DeploymentSpec{Replicas: &replicas},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			mu := intstr.FromInt(0)
+			_, err = c.PolicyV1().PodDisruptionBudgets("calico-system").Create(ctx, &policyv1.PodDisruptionBudget{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-typha", Namespace: "calico-system"},
+				Spec:       policyv1.PodDisruptionBudgetSpec{MaxUnavailable: &mu},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			Expect(ta.resolveTyphaMaxUnavailable()).To(Equal(1))
+		})
+
+		It("returns 1 for a DaemonSet without RollingUpdate", func() {
+			_, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"},
+				Spec: appsv1.DaemonSetSpec{
+					UpdateStrategy: appsv1.DaemonSetUpdateStrategy{Type: appsv1.OnDeleteDaemonSetStrategyType},
+				},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(1))
+		})
+
+		It("resolves an int DaemonSet maxUnavailable", func() {
+			mu := intstr.FromInt(4)
+			_, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"},
+				Spec: appsv1.DaemonSetSpec{
+					UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+						Type: appsv1.RollingUpdateDaemonSetStrategyType,
+						RollingUpdate: &appsv1.RollingUpdateDaemonSet{
+							MaxUnavailable: &mu,
+						},
+					},
+				},
+				Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 100},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(4))
+		})
+
+		It("resolves a percentage DaemonSet maxUnavailable against desired pod count", func() {
+			mu := intstr.FromString("10%")
+			_, err := c.AppsV1().DaemonSets("calico-system").Create(ctx, &appsv1.DaemonSet{
+				ObjectMeta: metav1.ObjectMeta{Name: "calico-node", Namespace: "calico-system"},
+				Spec: appsv1.DaemonSetSpec{
+					UpdateStrategy: appsv1.DaemonSetUpdateStrategy{
+						Type: appsv1.RollingUpdateDaemonSetStrategyType,
+						RollingUpdate: &appsv1.RollingUpdateDaemonSet{
+							MaxUnavailable: &mu,
+						},
+					},
+				},
+				Status: appsv1.DaemonSetStatus{DesiredNumberScheduled: 100},
+			}, metav1.CreateOptions{})
+			Expect(err).To(BeNil())
+
+			// 10% of 100 = 10.
+			Expect(ta.resolveDaemonSetMaxUnavailable("calico-node")).To(Equal(10))
+		})
+	})
+
+	Context("stalePodIPRecoveryEnabled gate", func() {
+		const (
+			typhaSelector      = "k8s-app=calico-typha"
+			calicoNodeSelector = "k8s-app=calico-node"
+		)
+
+		BeforeEach(func() {
+			// The autoscaler may degrade if there aren't enough linux nodes to satisfy
+			// the expected typha scale. We don't care about scaling behavior here, so
+			// allow any SetDegraded call.
+			statusManager.On("SetDegraded", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe()
+		})
+
+		// stale-node has InternalIP 10.0.0.2; pods are placed on it with podIP 10.0.0.1
+		// to make them stale.
+		ensureStaleNode := func() {
+			if _, err := c.CoreV1().Nodes().Get(ctx, "stale-node", metav1.GetOptions{}); err == nil {
+				return
+			}
+			node := CreateNode(c, "stale-node", map[string]string{"kubernetes.io/os": "linux"}, nil)
+			node.Status.Addresses = []corev1.NodeAddress{{Type: corev1.NodeInternalIP, Address: "10.0.0.2"}}
+			_, err := c.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{})
+			Expect(err).NotTo(HaveOccurred())
+		}
+
+		createStalePod := func(name, k8sApp string) {
+			ensureStaleNode()
+			pod := &corev1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: name, Namespace: "calico-system",
+					Labels: map[string]string{"k8s-app": k8sApp},
+				},
+				Spec:   corev1.PodSpec{NodeName: "stale-node"},
+				Status: corev1.PodStatus{PodIPs: []corev1.PodIP{{IP: "10.0.0.1"}}},
+			}
+			pod, err := c.CoreV1().Pods("calico-system").Create(ctx, pod, metav1.CreateOptions{})
+			Expect(err).NotTo(HaveOccurred())
+			pod.Status.PodIPs = []corev1.PodIP{{IP: "10.0.0.1"}}
+			_, err = c.CoreV1().Pods("calico-system").UpdateStatus(ctx, pod, metav1.UpdateOptions{})
+			Expect(err).NotTo(HaveOccurred())
+		}
+
+		listPods := func(selector string) []corev1.Pod {
+			pods, err := c.CoreV1().Pods("calico-system").List(ctx, metav1.ListOptions{LabelSelector: selector})
+			Expect(err).NotTo(HaveOccurred())
+			return pods.Items
+		}
+
+		It("nil getter (default) is treated as enabled", func() {
+			ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager, typhaAutoscalerOptionPeriod(10*time.Millisecond))
+			ta.start(ctx)
+
+			createStalePod("typha-abc", "calico-typha")
+			Eventually(func() int { return len(listPods(typhaSelector)) }, 5*time.Second).Should(Equal(0))
+		})
+
+		It("getter returning true allows deletions to proceed", func() {
+			ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager,
+				typhaAutoscalerOptionPeriod(10*time.Millisecond),
+				typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { return true }),
+			)
+			ta.start(ctx)
+
+			createStalePod("typha-abc", "calico-typha")
+			Eventually(func() int { return len(listPods(typhaSelector)) }, 5*time.Second).Should(Equal(0))
+		})
+
+		It("getter returning false suppresses all deletions", func() {
+			ta := newTyphaAutoscaler(c, nodeIndexInformer, tlw, statusManager,
+				typhaAutoscalerOptionPeriod(10*time.Millisecond),
+				typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool { return false }),
+			)
+			ta.start(ctx)
+
+			createStalePod("typha-abc", "calico-typha")
+			createStalePod("calico-node-abc", "calico-node")
+
+			// Wait long enough for several ticks; nothing should be deleted.
+			Consistently(func() int {
+				return len(listPods(typhaSelector)) + len(listPods(calicoNodeSelector))
+			}, 200*time.Millisecond, 20*time.Millisecond).Should(Equal(2))
+		})
+	})
 })
 
 func verifyTyphaReplicas(c kubernetes.Interface, expectedReplicas int) {
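The CRD change below is what surfaces the knob to users. For reference, a cluster administrator opting out would set the field on the default Installation; a hypothetical sketch of that usage, matching the schema below:

apiVersion: operator.tigera.io/v1
kind: Installation
metadata:
  name: default
spec:
  # Opt out of automatic deletion of host-networked pods with stale IPs.
  # Omitting the field (or setting "Enabled") keeps the default-on behavior.
  stalePodIPRecovery: Disabled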
diff --git a/pkg/imports/crds/operator/operator.tigera.io_installations.yaml b/pkg/imports/crds/operator/operator.tigera.io_installations.yaml
index 3e6fa9d2ea..7de68a7da8 100644
--- a/pkg/imports/crds/operator/operator.tigera.io_installations.yaml
+++ b/pkg/imports/crds/operator/operator.tigera.io_installations.yaml
@@ -7358,6 +7358,19 @@ spec:
                 items:
                   type: string
                 type: array
+              stalePodIPRecovery:
+                description: |-
+                  StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods
+                  (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their
+                  node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls
+                  a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet
+                  controllers recreate them with the correct IP. This works around an upstream Kubernetes
+                  limitation where status.podIPs is immutable for hostNetwork pods.
+                  Default: Enabled
+                enum:
+                - Enabled
+                - Disabled
+                type: string
               tlsCipherSuites:
                 description: TLSCipherSuites defines the cipher suite list that the
@@ -16685,6 +16698,19 @@ spec:
                 items:
                   type: string
                 type: array
+              stalePodIPRecovery:
+                description: |-
+                  StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods
+                  (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their
+                  node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls
+                  a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet
+                  controllers recreate them with the correct IP. This works around an upstream Kubernetes
+                  limitation where status.podIPs is immutable for hostNetwork pods.
+                  Default: Enabled
+                enum:
+                - Enabled
+                - Disabled
+                type: string
               tlsCipherSuites:
                 description: TLSCipherSuites defines the cipher suite list that