From f1cafea324273486ec74ec0b2c5955899363e261 Mon Sep 17 00:00:00 2001 From: Daniel Vaseekaran Date: Tue, 16 Jun 2026 14:41:23 -0400 Subject: [PATCH 1/2] Sync Scaleops Resource and Affinities --- api/v1beta1/kafkacluster_types.go | 12 +- charts/kafka-operator/crds/kafkaclusters.yaml | 8 +- .../kafka.banzaicloud.io_kafkaclusters.yaml | 8 +- pkg/resources/kafka/kafka.go | 8 + pkg/resources/kafka/util.go | 188 ++++++ pkg/resources/kafka/util_test.go | 563 ++++++++++++++++++ 6 files changed, 782 insertions(+), 5 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 7ccae1cee..02166f53c 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -142,9 +142,15 @@ type KafkaClusterSpec struct { // This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode. // +kubebuilder:default=false // +optional - KRaftMode bool `json:"kRaft"` - HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` - ListenersConfig ListenersConfig `json:"listenersConfig"` + KRaftMode bool `json:"kRaft"` + HeadlessServiceEnabled bool `json:"headlessServiceEnabled"` + // Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods. + // This Disables CPU and Memory request reconciliation from the desired state defined in + // the KafkaCluster to the current state in the Kubernetes Cluster + // +kubebuilder:default=false + // +optional + ScaleOpsEnabled bool `json:"scaleOpsEnabled"` + ListenersConfig ListenersConfig `json:"listenersConfig,omitempty"` // Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"` // ZKAddresses specifies the ZooKeeper connection string diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index 7abba845c..8505a6815 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -23002,6 +23002,13 @@ spec: required: - failureThreshold type: object + scaleOpsEnabled: + default: false + description: |- + Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods. + This Disables CPU and Memory request reconciliation from the desired state defined in + the KafkaCluster to the current state in the Kubernetes Cluster + type: boolean taintedBrokersSelector: description: Selector for broker pods that need to be recycled/reconciled properties: @@ -23067,7 +23074,6 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled - - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 7abba845c..8505a6815 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -23002,6 +23002,13 @@ spec: required: - failureThreshold type: object + scaleOpsEnabled: + default: false + description: |- + Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods. + This Disables CPU and Memory request reconciliation from the desired state defined in + the KafkaCluster to the current state in the Kubernetes Cluster + type: boolean taintedBrokersSelector: description: Selector for broker pods that need to be recycled/reconciled properties: @@ -23067,7 +23074,6 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled - - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index e2a5c1168..354a63de7 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -956,6 +956,14 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo } desiredPod.Spec.Tolerations = uniqueTolerations } + if r.KafkaCluster.Spec.ScaleOpsEnabled { + // if resources requets are updated by scale ops, we need to sync them to desiredPod, + // otherwise they will be removed and cause pod restart + syncResourceRequests(desiredPod, currentPod) + // If current pod had affinities created by ScaleOps, we need to sync them to desiredPod, + // otherwise they will be removed and cause pod restart + syncScaleOpsAffinities(desiredPod, currentPod) + } // Check if the resource actually updated or if labels match TaintedBrokersSelector patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod) switch { diff --git a/pkg/resources/kafka/util.go b/pkg/resources/kafka/util.go index cfafbae14..d3c9c8c67 100644 --- a/pkg/resources/kafka/util.go +++ b/pkg/resources/kafka/util.go @@ -18,9 +18,11 @@ package kafka import ( "encoding/base64" "fmt" + "reflect" "sort" "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -73,3 +75,189 @@ func generateRandomClusterID() string { randomUUID := uuid.New() return base64.URLEncoding.EncodeToString(randomUUID[:]) } + +// syncResourceRequests overwrites CPU and memory requests in desiredPod's containers +// with the values from currentPod so that request-only changes do not trigger a pod restart. +func syncResourceRequests(desiredPod, currentPod *corev1.Pod) { + syncContainerResourceRequests(desiredPod.Spec.Containers, currentPod.Spec.Containers) + syncContainerResourceRequests(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers) +} + +func syncContainerResourceRequests(desired, current []corev1.Container) { + index := make(map[string]corev1.ResourceList, len(current)) + for _, c := range current { + index[c.Name] = c.Resources.Requests + } + for i := range desired { + c := &desired[i] + reqs, ok := index[c.Name] + if !ok { + continue + } + if c.Resources.Requests == nil { + c.Resources.Requests = make(corev1.ResourceList) + } + for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} { + if val, exists := reqs[res]; exists { + c.Resources.Requests[res] = val + } else { + delete(c.Resources.Requests, res) + } + } + } +} + +// syncScaleOpsAffinities syncs all scale ops related affinities from the current pod to the desired pod. +// This includes pod affinities with "scaleops.sh/managed-unevictable" label selector +// and node affinities with "scaleops.sh/node-packing=true" selector. +func syncScaleOpsAffinities(desiredPod, currentPod *corev1.Pod) { + syncScaleOpsPodAffinities(desiredPod, currentPod) + syncScaleOpsNodeAffinities(desiredPod, currentPod) +} + +// syncScaleOpsPodAffinities syncs preferred pod affinities with "scaleops.sh/managed-unevictable" +// label selector from current pod to desired pod. +func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) { + if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.PodAffinity == nil { + return + } + + currentPodAffinity := currentPod.Spec.Affinity.PodAffinity + + // Filter preferred pod affinities with "scaleops.sh/managed-unevictable" label selector + var scaleOpsPreferredAffinities []corev1.WeightedPodAffinityTerm + if currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + if term.PodAffinityTerm.LabelSelector != nil { + hasScaleOpsLabel := false + + // Check MatchExpressions + for _, requirement := range term.PodAffinityTerm.LabelSelector.MatchExpressions { + if requirement.Key == "scaleops.sh/managed-unevictable" { + hasScaleOpsLabel = true + break + } + } + + // Check MatchLabels if not found in MatchExpressions + if !hasScaleOpsLabel { + if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels["scaleops.sh/managed-unevictable"]; exists { + hasScaleOpsLabel = true + } + } + + if hasScaleOpsLabel { + scaleOpsPreferredAffinities = append(scaleOpsPreferredAffinities, term) + } + } + } + } + + // If we found any scale ops preferred affinities, add them to the desired pod + if len(scaleOpsPreferredAffinities) > 0 { + if desiredPod.Spec.Affinity == nil { + desiredPod.Spec.Affinity = &corev1.Affinity{} + } + if desiredPod.Spec.Affinity.PodAffinity == nil { + desiredPod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{} + } + + // Merge scale ops preferred affinities, avoiding duplicates + existingTerms := desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution + for _, newTerm := range scaleOpsPreferredAffinities { + // Check if this term already exists + found := false + for _, existing := range existingTerms { + if reflect.DeepEqual(existing.PodAffinityTerm, newTerm.PodAffinityTerm) && existing.Weight == newTerm.Weight { + found = true + break + } + } + if !found { + existingTerms = append(existingTerms, newTerm) + } + } + desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms + } +} + +// syncScaleOpsNodeAffinities syncs preferred node affinities with "scaleops.sh/node-packing=true" +// selector from current pod to desired pod. +func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) { + if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.NodeAffinity == nil { + return + } + + currentNodeAffinity := currentPod.Spec.Affinity.NodeAffinity + + // Filter preferred node affinities with "scaleops.sh/node-packing=true" selector + var scaleOpsPreferredTerms []corev1.PreferredSchedulingTerm + if currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil { + for _, term := range currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + hasScaleOpsNodePacking := false + + // Check MatchExpressions + for _, requirement := range term.Preference.MatchExpressions { + if requirement.Key == "scaleops.sh/node-packing" { + for _, val := range requirement.Values { + if val == "true" { + hasScaleOpsNodePacking = true + break + } + } + if hasScaleOpsNodePacking { + break + } + } + } + + // Check MatchFields if not found in MatchExpressions + if !hasScaleOpsNodePacking { + for _, requirement := range term.Preference.MatchFields { + if requirement.Key == "scaleops.sh/node-packing" { + for _, val := range requirement.Values { + if val == "true" { + hasScaleOpsNodePacking = true + break + } + } + if hasScaleOpsNodePacking { + break + } + } + } + } + + if hasScaleOpsNodePacking { + scaleOpsPreferredTerms = append(scaleOpsPreferredTerms, term) + } + } + } + + // If we found any scale ops node affinities, add them to the desired pod + if len(scaleOpsPreferredTerms) > 0 { + if desiredPod.Spec.Affinity == nil { + desiredPod.Spec.Affinity = &corev1.Affinity{} + } + if desiredPod.Spec.Affinity.NodeAffinity == nil { + desiredPod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{} + } + + // Merge scale ops node affinities, avoiding duplicates + existingTerms := desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution + for _, newTerm := range scaleOpsPreferredTerms { + // Check if this term already exists + found := false + for _, existing := range existingTerms { + if reflect.DeepEqual(existing.Preference, newTerm.Preference) && existing.Weight == newTerm.Weight { + found = true + break + } + } + if !found { + existingTerms = append(existingTerms, newTerm) + } + } + desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms + } +} diff --git a/pkg/resources/kafka/util_test.go b/pkg/resources/kafka/util_test.go index d4c04045e..96f9db5eb 100644 --- a/pkg/resources/kafka/util_test.go +++ b/pkg/resources/kafka/util_test.go @@ -20,6 +20,9 @@ import ( "reflect" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/banzaicloud/koperator/api/v1beta1" ) @@ -402,3 +405,563 @@ func TestGenerateQuorumVoters(t *testing.T) { }) } } + +func TestSyncScaleOpsPodAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectedPodAffinity bool + expectedTermCount int + }{ + { + name: "no affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: false, + expectedTermCount: 0, + }, + { + name: "no pod affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: false, + expectedTermCount: 0, + }, + { + name: "pod affinity with scaleops managed-unevictable in MatchLabels", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: true, + expectedTermCount: 1, + }, + { + name: "pod affinity with scaleops managed-unevictable in MatchExpressions", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 50, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "scaleops.sh/managed-unevictable", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: true, + expectedTermCount: 1, + }, + { + name: "pod affinity with mixed terms, only scaleops managed-unevictable should be synced", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "other", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + { + Weight: 50, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedPodAffinity: true, + expectedTermCount: 1, + }, + { + name: "desired pod already has pod affinity, scaleops affinity should be merged", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 80, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "myapp", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + }, + }, + }, + expectedPodAffinity: true, + expectedTermCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncScaleOpsPodAffinities(tt.desiredPod, tt.currentPod) + + if !tt.expectedPodAffinity { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.PodAffinity != nil { + t.Errorf("expected no pod affinity, but got one") + } + return + } + + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.PodAffinity == nil { + t.Errorf("expected pod affinity to be set") + return + } + + gotTermCount := len(tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if gotTermCount != tt.expectedTermCount { + t.Errorf("expected %d pod affinity terms, got %d", tt.expectedTermCount, gotTermCount) + } + + // Verify all synced terms have the scaleops label + for _, term := range tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution { + if term.PodAffinityTerm.LabelSelector != nil { + hasScaleOpsLabel := false + for _, req := range term.PodAffinityTerm.LabelSelector.MatchExpressions { + if req.Key == "scaleops.sh/managed-unevictable" { + hasScaleOpsLabel = true + break + } + } + if !hasScaleOpsLabel { + if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels["scaleops.sh/managed-unevictable"]; !exists { + // This term should have been filtered out if it doesn't have scaleops label + // unless it came from the original desired pod + } + } + } + } + }) + } +} + +func TestSyncScaleOpsNodeAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectedNodeAffinity bool + expectedTermCount int + }{ + { + name: "no affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: false, + expectedTermCount: 0, + }, + { + name: "no node affinity in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{}, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: false, + expectedTermCount: 0, + }, + { + name: "node affinity with scaleops node-packing in MatchExpressions", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "node affinity with scaleops node-packing in MatchFields", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchFields: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "node affinity with mixed terms, only scaleops node-packing should be synced", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "disktype", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"ssd"}, + }, + }, + }, + }, + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectedNodeAffinity: true, + expectedTermCount: 1, + }, + { + name: "desired pod already has node affinity, scaleops affinity should be merged", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 100, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 80, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "disktype", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"ssd"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + expectedNodeAffinity: true, + expectedTermCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncScaleOpsNodeAffinities(tt.desiredPod, tt.currentPod) + + if !tt.expectedNodeAffinity { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.NodeAffinity != nil { + t.Errorf("expected no node affinity, but got one") + } + return + } + + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.NodeAffinity == nil { + t.Errorf("expected node affinity to be set") + return + } + + gotTermCount := len(tt.desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution) + if gotTermCount != tt.expectedTermCount { + t.Errorf("expected %d node affinity terms, got %d", tt.expectedTermCount, gotTermCount) + } + }) + } +} + +func TestSyncScaleOpsAffinities(t *testing.T) { + tests := []struct { + name string + currentPod *corev1.Pod + desiredPod *corev1.Pod + expectPodAffinity bool + expectNodeAffinity bool + }{ + { + name: "no affinities in current pod", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectPodAffinity: false, + expectNodeAffinity: false, + }, + { + name: "both pod and node affinities with scaleops labels", + currentPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{ + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{ + { + Weight: 100, + PodAffinityTerm: corev1.PodAffinityTerm{ + LabelSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "scaleops.sh/managed-unevictable": "true", + }, + }, + TopologyKey: "kubernetes.io/hostname", + }, + }, + }, + }, + NodeAffinity: &corev1.NodeAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []corev1.PreferredSchedulingTerm{ + { + Weight: 50, + Preference: corev1.NodeSelectorTerm{ + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "scaleops.sh/node-packing", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"true"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + desiredPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + Spec: corev1.PodSpec{}, + }, + expectPodAffinity: true, + expectNodeAffinity: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncScaleOpsAffinities(tt.desiredPod, tt.currentPod) + + if tt.expectPodAffinity { + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.PodAffinity == nil { + t.Errorf("expected pod affinity to be set") + } + } else { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.PodAffinity != nil { + if len(tt.desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0 { + t.Errorf("expected no pod affinity") + } + } + } + + if tt.expectNodeAffinity { + if tt.desiredPod.Spec.Affinity == nil || tt.desiredPod.Spec.Affinity.NodeAffinity == nil { + t.Errorf("expected node affinity to be set") + } + } else { + if tt.desiredPod.Spec.Affinity != nil && tt.desiredPod.Spec.Affinity.NodeAffinity != nil { + if len(tt.desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0 { + t.Errorf("expected no node affinity") + } + } + } + }) + } +} From 030ca424ed2fd9fb6a8c5b472a81386384b536cb Mon Sep 17 00:00:00 2001 From: Daniel Vaseekaran Date: Tue, 16 Jun 2026 14:47:33 -0400 Subject: [PATCH 2/2] Sync Scaleops Resource and Affinities --- api/v1beta1/kafkacluster_types.go | 4 ++-- charts/kafka-operator/crds/kafkaclusters.yaml | 1 + config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 02166f53c..4d4c3b817 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -149,8 +149,8 @@ type KafkaClusterSpec struct { // the KafkaCluster to the current state in the Kubernetes Cluster // +kubebuilder:default=false // +optional - ScaleOpsEnabled bool `json:"scaleOpsEnabled"` - ListenersConfig ListenersConfig `json:"listenersConfig,omitempty"` + ScaleOpsEnabled bool `json:"scaleOpsEnabled,omitempty"` + ListenersConfig ListenersConfig `json:"listenersConfig"` // Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"` // ZKAddresses specifies the ZooKeeper connection string diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index 8505a6815..5be31977d 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -23074,6 +23074,7 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled + - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 8505a6815..5be31977d 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -23074,6 +23074,7 @@ spec: - brokers - cruiseControlConfig - headlessServiceEnabled + - listenersConfig - oneBrokerPerNode - rollingUpgradeConfig type: object