Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,15 @@ type KafkaClusterSpec struct {
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=false
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
// Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
// This Disables CPU and Memory request reconciliation from the desired state defined in
// the KafkaCluster to the current state in the Kubernetes Cluster
// +kubebuilder:default=false
// +optional
ScaleOpsEnabled bool `json:"scaleOpsEnabled,omitempty"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
Expand Down
7 changes: 7 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23002,6 +23002,13 @@ spec:
required:
- failureThreshold
type: object
scaleOpsEnabled:
default: false
description: |-
Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
This Disables CPU and Memory request reconciliation from the desired state defined in
the KafkaCluster to the current state in the Kubernetes Cluster
type: boolean
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
Expand Down
7 changes: 7 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23002,6 +23002,13 @@ spec:
required:
- failureThreshold
type: object
scaleOpsEnabled:
default: false
description: |-
Allows ScaleOps to manage Memory and CPU Resource Requests for Kafka Broker Pods.
This Disables CPU and Memory request reconciliation from the desired state defined in
the KafkaCluster to the current state in the Kubernetes Cluster
type: boolean
taintedBrokersSelector:
description: Selector for broker pods that need to be recycled/reconciled
properties:
Expand Down
8 changes: 8 additions & 0 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,14 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
}
desiredPod.Spec.Tolerations = uniqueTolerations
}
if r.KafkaCluster.Spec.ScaleOpsEnabled {
// if resources requets are updated by scale ops, we need to sync them to desiredPod,
// otherwise they will be removed and cause pod restart
syncResourceRequests(desiredPod, currentPod)
// If current pod had affinities created by ScaleOps, we need to sync them to desiredPod,
// otherwise they will be removed and cause pod restart
syncScaleOpsAffinities(desiredPod, currentPod)
}
// Check if the resource actually updated or if labels match TaintedBrokersSelector
patchResult, err := patch.DefaultPatchMaker.Calculate(currentPod, desiredPod)
switch {
Expand Down
188 changes: 188 additions & 0 deletions pkg/resources/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import (
"encoding/base64"
"fmt"
"reflect"
"sort"

"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"

"github.com/banzaicloud/koperator/api/v1beta1"
)
Expand Down Expand Up @@ -73,3 +75,189 @@
randomUUID := uuid.New()
return base64.URLEncoding.EncodeToString(randomUUID[:])
}

// syncResourceRequests overwrites CPU and memory requests in desiredPod's containers
// with the values from currentPod so that request-only changes do not trigger a pod restart.
func syncResourceRequests(desiredPod, currentPod *corev1.Pod) {
syncContainerResourceRequests(desiredPod.Spec.Containers, currentPod.Spec.Containers)
syncContainerResourceRequests(desiredPod.Spec.InitContainers, currentPod.Spec.InitContainers)
}

func syncContainerResourceRequests(desired, current []corev1.Container) {
index := make(map[string]corev1.ResourceList, len(current))
for _, c := range current {
index[c.Name] = c.Resources.Requests
}
for i := range desired {
c := &desired[i]
reqs, ok := index[c.Name]
if !ok {
continue
}
if c.Resources.Requests == nil {
c.Resources.Requests = make(corev1.ResourceList)
}
for _, res := range []corev1.ResourceName{corev1.ResourceCPU, corev1.ResourceMemory} {
if val, exists := reqs[res]; exists {
c.Resources.Requests[res] = val
} else {
delete(c.Resources.Requests, res)
}
}
}
}

// syncScaleOpsAffinities syncs all scale ops related affinities from the current pod to the desired pod.
// This includes pod affinities with "scaleops.sh/managed-unevictable" label selector
// and node affinities with "scaleops.sh/node-packing=true" selector.
func syncScaleOpsAffinities(desiredPod, currentPod *corev1.Pod) {
syncScaleOpsPodAffinities(desiredPod, currentPod)
syncScaleOpsNodeAffinities(desiredPod, currentPod)
}

// syncScaleOpsPodAffinities syncs preferred pod affinities with "scaleops.sh/managed-unevictable"
// label selector from current pod to desired pod.
func syncScaleOpsPodAffinities(desiredPod, currentPod *corev1.Pod) {
if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.PodAffinity == nil {
return
}

currentPodAffinity := currentPod.Spec.Affinity.PodAffinity

// Filter preferred pod affinities with "scaleops.sh/managed-unevictable" label selector
var scaleOpsPreferredAffinities []corev1.WeightedPodAffinityTerm
if currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
for _, term := range currentPodAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
if term.PodAffinityTerm.LabelSelector != nil {
hasScaleOpsLabel := false

// Check MatchExpressions
for _, requirement := range term.PodAffinityTerm.LabelSelector.MatchExpressions {
if requirement.Key == "scaleops.sh/managed-unevictable" {

Check failure on line 136 in pkg/resources/kafka/util.go

View workflow job for this annotation

GitHub Actions / Build

string `scaleops.sh/managed-unevictable` has 7 occurrences, make it a constant (goconst)
hasScaleOpsLabel = true
break
}
}

// Check MatchLabels if not found in MatchExpressions
if !hasScaleOpsLabel {
if _, exists := term.PodAffinityTerm.LabelSelector.MatchLabels["scaleops.sh/managed-unevictable"]; exists {
hasScaleOpsLabel = true
}
}

if hasScaleOpsLabel {
scaleOpsPreferredAffinities = append(scaleOpsPreferredAffinities, term)
}
}
}
}

// If we found any scale ops preferred affinities, add them to the desired pod
if len(scaleOpsPreferredAffinities) > 0 {
if desiredPod.Spec.Affinity == nil {
desiredPod.Spec.Affinity = &corev1.Affinity{}
}
if desiredPod.Spec.Affinity.PodAffinity == nil {
desiredPod.Spec.Affinity.PodAffinity = &corev1.PodAffinity{}
}

// Merge scale ops preferred affinities, avoiding duplicates
existingTerms := desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution
for _, newTerm := range scaleOpsPreferredAffinities {
// Check if this term already exists
found := false
for _, existing := range existingTerms {
if reflect.DeepEqual(existing.PodAffinityTerm, newTerm.PodAffinityTerm) && existing.Weight == newTerm.Weight {
found = true
break
}
}
if !found {
existingTerms = append(existingTerms, newTerm)
}
}
desiredPod.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms
}
}

// syncScaleOpsNodeAffinities syncs preferred node affinities with "scaleops.sh/node-packing=true"
// selector from current pod to desired pod.
func syncScaleOpsNodeAffinities(desiredPod, currentPod *corev1.Pod) {
if currentPod.Spec.Affinity == nil || currentPod.Spec.Affinity.NodeAffinity == nil {
return
}

currentNodeAffinity := currentPod.Spec.Affinity.NodeAffinity

// Filter preferred node affinities with "scaleops.sh/node-packing=true" selector
var scaleOpsPreferredTerms []corev1.PreferredSchedulingTerm
if currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
for _, term := range currentNodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
hasScaleOpsNodePacking := false

// Check MatchExpressions
for _, requirement := range term.Preference.MatchExpressions {
if requirement.Key == "scaleops.sh/node-packing" {

Check failure on line 201 in pkg/resources/kafka/util.go

View workflow job for this annotation

GitHub Actions / Build

string `scaleops.sh/node-packing` has 7 occurrences, make it a constant (goconst)
for _, val := range requirement.Values {
if val == "true" {

Check failure on line 203 in pkg/resources/kafka/util.go

View workflow job for this annotation

GitHub Actions / Build

string `true` has 14 occurrences, but such constant `configValueTrue` already exists (goconst)
hasScaleOpsNodePacking = true
break
}
}
if hasScaleOpsNodePacking {
break
}
}
}

// Check MatchFields if not found in MatchExpressions
if !hasScaleOpsNodePacking {
for _, requirement := range term.Preference.MatchFields {
if requirement.Key == "scaleops.sh/node-packing" {
for _, val := range requirement.Values {
if val == "true" {
hasScaleOpsNodePacking = true
break
}
}
if hasScaleOpsNodePacking {
break
}
}
}
}

if hasScaleOpsNodePacking {
scaleOpsPreferredTerms = append(scaleOpsPreferredTerms, term)
}
}
}

// If we found any scale ops node affinities, add them to the desired pod
if len(scaleOpsPreferredTerms) > 0 {
if desiredPod.Spec.Affinity == nil {
desiredPod.Spec.Affinity = &corev1.Affinity{}
}
if desiredPod.Spec.Affinity.NodeAffinity == nil {
desiredPod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{}
}

// Merge scale ops node affinities, avoiding duplicates
existingTerms := desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution
for _, newTerm := range scaleOpsPreferredTerms {
// Check if this term already exists
found := false
for _, existing := range existingTerms {
if reflect.DeepEqual(existing.Preference, newTerm.Preference) && existing.Weight == newTerm.Weight {
found = true
break
}
}
if !found {
existingTerms = append(existingTerms, newTerm)
}
}
desiredPod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution = existingTerms
}
}
Loading
Loading