Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions api/v1/installation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ type InstallationSpec struct {
// +optional
NodeUpdateStrategy appsv1.DaemonSetUpdateStrategy `json:"nodeUpdateStrategy,omitempty"`

// StalePodIPRecovery enables automatic detection and deletion of host-networked Calico pods
// (calico-typha, calico-node, calico-node-windows) whose status.podIPs no longer matches their
// node's current InternalIP. When stale pod IPs are detected (e.g., after a node reboot pulls
// a new DHCP lease), the operator deletes affected pods so the Deployment / DaemonSet
// controllers recreate them with the correct IP. This works around an upstream Kubernetes
// limitation where status.podIPs is immutable for hostNetwork pods.
// Default: Enabled
// +kubebuilder:validation:Enum=Enabled;Disabled
// +optional
StalePodIPRecovery *StalePodIPRecoveryType `json:"stalePodIPRecovery,omitempty"`

// Deprecated. Please use CalicoNodeDaemonSet, TyphaDeployment, and KubeControllersDeployment.
// ComponentResources can be used to customize the resource requirements for each component.
// Node, Typha, and KubeControllers are supported for installations.
Expand Down Expand Up @@ -350,6 +361,15 @@ const (
FIPSModeDisabled FIPSMode = "Disabled"
)

// StalePodIPRecoveryType controls whether the operator automatically detects and recreates
// host-networked Calico pods with stale status.podIPs after a node IP change.
type StalePodIPRecoveryType string

const (
StalePodIPRecoveryEnabled StalePodIPRecoveryType = "Enabled"
StalePodIPRecoveryDisabled StalePodIPRecoveryType = "Disabled"
)

// Deprecated. Please use TyphaDeployment instead.
// TyphaAffinity allows configuration of node affinity characteristics for Typha pods.
type TyphaAffinity struct {
Expand Down Expand Up @@ -1109,6 +1129,12 @@ func IsFIPSModeEnabledString(mode *FIPSMode) string {
return fmt.Sprintf("%t", IsFIPSModeEnabled(mode))
}

// IsStalePodIPRecoveryEnabled returns whether stale pod IP recovery is enabled. The behavior
// is default-on, so a nil reference means enabled.
func IsStalePodIPRecoveryEnabled(s *StalePodIPRecoveryType) bool {
return s == nil || *s == StalePodIPRecoveryEnabled
}

type WindowsNodeSpec struct {
// CNIBinDir is the path to the CNI binaries directory on Windows, it must match what is used as 'bin_dir' under
// [plugins]
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions pkg/controller/installation/core_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,25 @@ func newReconciler(mgr manager.Manager, opts options.ControllerOptions) (*Reconc
nodeIndexInformer := cache.NewSharedIndexInformer(nodeListWatch, &corev1.Node{}, 0, cache.Indexers{})
go nodeIndexInformer.Run(opts.ShutdownContext.Done())

// Create a Typha autoscaler.
// Create a Typha autoscaler. Stale pod IP recovery defers to the current
// Installation.Spec.StalePodIPRecovery setting on each tick (default-on if unset).
// If the Installation can't be read for any reason, fail-open and let the recovery
// run — it's the safer behavior for the kubelet bug we're working around.
mgrClient := mgr.GetClient()
typhaListWatch := cache.NewListWatchFromClient(opts.K8sClientset.AppsV1().RESTClient(), "deployments", "calico-system", fields.OneTermEqualSelector("metadata.name", "calico-typha"))
typhaScaler := newTyphaAutoscaler(opts.K8sClientset, nodeIndexInformer, typhaListWatch, statusManager)
typhaScaler := newTyphaAutoscaler(
opts.K8sClientset,
nodeIndexInformer,
typhaListWatch,
statusManager,
typhaAutoscalerOptionStalePodIPRecoveryEnabled(func() bool {
inst := &operatorv1.Installation{}
if err := mgrClient.Get(opts.ShutdownContext, types.NamespacedName{Name: "default"}, inst); err != nil {
return true
}
return operatorv1.IsStalePodIPRecoveryEnabled(inst.Spec.StalePodIPRecovery)
}),
)

r := &ReconcileInstallation{
config: mgr.GetConfig(),
Expand Down
197 changes: 197 additions & 0 deletions pkg/controller/installation/typha_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
logf "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -57,6 +58,10 @@ type typhaAutoscaler struct {
typhaIndexer cache.Store
nonClusterHost bool

// stalePodIPRecoveryEnabled returns whether the operator should detect and delete
// host-networked pods with stale status.podIPs each tick. Default-on if nil.
stalePodIPRecoveryEnabled func() bool

// Number of currently running replicas.
activeReplicas int32
}
Expand All @@ -77,6 +82,15 @@ func typhaAutoscalerOptionNonclusterHost(nonClusterHost bool) typhaAutoscalerOpt
}
}

// typhaAutoscalerOptionStalePodIPRecoveryEnabled provides a getter that the autoscaler will
// call each tick to determine whether stale-IP pod recovery is enabled. If unset, recovery
// is always enabled.
func typhaAutoscalerOptionStalePodIPRecoveryEnabled(enabled func() bool) typhaAutoscalerOption {
return func(t *typhaAutoscaler) {
t.stalePodIPRecoveryEnabled = enabled
}
}

// newTyphaAutoscaler creates a new Typha autoscaler, optionally applying any options to the default autoscaler instance.
// The default sync period is 10 seconds.
func newTyphaAutoscaler(cs kubernetes.Interface, indexInformer cache.SharedIndexInformer, typhaListWatch cache.ListerWatcher, statusManager status.StatusManager, options ...typhaAutoscalerOption) *typhaAutoscaler {
Expand Down Expand Up @@ -156,6 +170,40 @@ func (t *typhaAutoscaler) start(ctx context.Context) {
} else {
degraded = false
}

// Check for host-networked pods with stale IPs (e.g., after a node
// IP change) and delete them so they get recreated with the correct
// IP. Typha is checked first; if any Typha pod was deleted this
// cycle, calico-node deletions are skipped to give the new Typha a
// clean window to come up before churning calico-node pods that
// depend on it.
//
// Skip the entire check if stale-IP recovery has been disabled
// via Installation.Spec.StalePodIPRecovery.
if t.stalePodIPRecoveryEnabled == nil || t.stalePodIPRecoveryEnabled() {
typhaBatch := t.resolveTyphaMaxUnavailable()
deletedTypha := t.deleteStaleHostNetworkPods(
"calico-typha",
fmt.Sprintf("%s=%s", render.AppLabelName, render.TyphaK8sAppName),
typhaBatch,
) > 0
if !deletedTypha {
// Linux and Windows DaemonSets are paced independently of each
// other.
linuxBatch := t.resolveDaemonSetMaxUnavailable(render.CalicoNodeObjectName)
t.deleteStaleHostNetworkPods(
"calico-node",
fmt.Sprintf("%s=%s", render.AppLabelName, render.CalicoNodeObjectName),
linuxBatch,
)
windowsBatch := t.resolveDaemonSetMaxUnavailable(render.WindowsNodeObjectName)
t.deleteStaleHostNetworkPods(
"calico-node-windows",
fmt.Sprintf("%s=%s", render.AppLabelName, render.WindowsNodeObjectName),
windowsBatch,
)
}
}
case errCh := <-t.triggerRunChan:
if err := t.autoscaleReplicas(); err != nil {
degraded = true
Expand Down Expand Up @@ -280,6 +328,155 @@ func (t *typhaAutoscaler) getNodeCounts() (int, int) {
return schedulable, linuxNodes
}

// deleteStaleHostNetworkPods lists pods matching labelSelector in the
// calico-system namespace and compares each pod's status.podIPs against the
// current InternalIP of the node the pod is running on. If the IPs don't match
// (stale pod IP after a node IP change), up to maxBatch pods are deleted so the
// owning controller (Deployment / DaemonSet) recreates them with the correct IP.
//
// This is necessary because Kubernetes does not update status.podIPs for
// existing hostNetwork pods when the node's IP changes — it is explicitly
// immutable in the kubelet:
// https://github.com/kubernetes/kubernetes/issues/93897.
//
// Returns the number of pods deleted in this call.
//
// workloadName is used only for logging.
func (t *typhaAutoscaler) deleteStaleHostNetworkPods(workloadName, labelSelector string, maxBatch int) int {
if t.nonClusterHost {
return 0
}
if maxBatch < 1 {
maxBatch = 1
}

pods, err := t.client.CoreV1().Pods(common.CalicoNamespace).List(context.Background(), metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
typhaLog.V(5).Info("Failed to list pods for stale IP check", "workload", workloadName, "error", err)
return 0
}

// Build a map of node name → InternalIP from the informer cache.
nodeInternalIPs := map[string]string{}
for _, obj := range t.indexInformer.GetIndexer().List() {
n := obj.(*v1.Node)
for _, addr := range n.Status.Addresses {
if addr.Type == v1.NodeInternalIP {
nodeInternalIPs[n.Name] = addr.Address
break
}
}
}

deleted := 0
for i := range pods.Items {
if deleted >= maxBatch {
break
}
pod := &pods.Items[i]
if pod.Spec.NodeName == "" {
continue
}
nodeIP, ok := nodeInternalIPs[pod.Spec.NodeName]
if !ok {
continue
}

// Check if any of the pod's IPs match the node's current InternalIP.
match := false
for _, podIP := range pod.Status.PodIPs {
if podIP.IP == nodeIP {
match = true
break
}
}
if match {
continue
}

// Pod IP is stale — delete the pod so the owning controller recreates
// it with the correct IP.
podIPs := make([]string, len(pod.Status.PodIPs))
for j, pip := range pod.Status.PodIPs {
podIPs[j] = pip.IP
}
typhaLog.Info("Pod has stale IP after node IP change; deleting pod so it gets recreated with the correct IP",
"workload", workloadName, "pod", pod.Name, "node", pod.Spec.NodeName,
"podIPs", podIPs, "nodeInternalIP", nodeIP)
if err := t.client.CoreV1().Pods(common.CalicoNamespace).Delete(context.Background(), pod.Name, metav1.DeleteOptions{}); err != nil {
typhaLog.Error(err, "Failed to delete pod with stale IP", "workload", workloadName, "pod", pod.Name)
continue
}
deleted++
}
return deleted
}

// resolveTyphaMaxUnavailable reads the maxUnavailable value from the Typha
// PodDisruptionBudget and resolves it to an absolute pod count using the
// current Typha replica count. Returns 1 if the PDB doesn't exist, doesn't
// have maxUnavailable set, or if the resolved value is < 1 (so progress
// is always guaranteed).
func (t *typhaAutoscaler) resolveTyphaMaxUnavailable() int {
const fallback = 1
pdb, err := t.client.PolicyV1().PodDisruptionBudgets(common.CalicoNamespace).Get(
context.Background(), common.TyphaDeploymentName, metav1.GetOptions{},
)
if err != nil || pdb.Spec.MaxUnavailable == nil {
return fallback
}
replicas := int(t.activeReplicas)
if replicas <= 0 {
// activeReplicas is populated by the informer; fall back to fetching
// the deployment if it hasn't been observed yet.
typha, err := t.client.AppsV1().Deployments(common.CalicoNamespace).Get(
context.Background(), common.TyphaDeploymentName, metav1.GetOptions{},
)
if err == nil && typha.Spec.Replicas != nil {
replicas = int(*typha.Spec.Replicas)
}
}
if replicas < 1 {
return fallback
}
val, err := intstr.GetScaledValueFromIntOrPercent(pdb.Spec.MaxUnavailable, replicas, true)
if err != nil || val < 1 {
return fallback
}
return val
}

// resolveDaemonSetMaxUnavailable reads the maxUnavailable value from the named
// DaemonSet's update strategy and resolves it to an absolute pod count using
// the desired DaemonSet pod count. Returns 1 if the DaemonSet doesn't exist,
// doesn't have a RollingUpdate strategy, or if the resolved value is < 1.
func (t *typhaAutoscaler) resolveDaemonSetMaxUnavailable(name string) int {
const fallback = 1
ds, err := t.client.AppsV1().DaemonSets(common.CalicoNamespace).Get(
context.Background(), name, metav1.GetOptions{},
)
if err != nil {
return fallback
}
if ds.Spec.UpdateStrategy.RollingUpdate == nil ||
ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil {
return fallback
}
desired := int(ds.Status.DesiredNumberScheduled)
if desired < 1 {
return fallback
}
val, err := intstr.GetScaledValueFromIntOrPercent(
ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desired, true,
)
if err != nil || val < 1 {
return fallback
}
return val
}

// getHostEndpointCounts returns the number of host endpoints in the cluster that are not created by the kube-controllers.
func (t *typhaAutoscaler) getHostEndpointCounts() int {
heps := 0
Expand Down
Loading
Loading