From 5e8f9abd5e52a99a515f12e2ff7f29f099421d3f Mon Sep 17 00:00:00 2001 From: DarkIsDude Date: Wed, 3 Jun 2026 15:17:34 +0200 Subject: [PATCH 1/4] =?UTF-8?q?=F0=9F=90=9B=20propagate=20Kafka=20broker?= =?UTF-8?q?=20exit=20code=20to=20pod=20phase?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The entrypoint script called `rm /var/run/wait/do-not-exit-yet` after kafka-server-start.sh, and since `rm` always exits 0, the broker pod would show phase Succeeded even after a crash (e.g. KafkaStorageException). Koperator would then recreate the pod with no backoff and no visible failure. Capture Kafka's exit code before the rm and exit with it, so a crashed broker produces pod phase Failed instead of Succeeded. --- pkg/resources/kafka/wait-for-envoy-sidecar.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/resources/kafka/wait-for-envoy-sidecar.sh b/pkg/resources/kafka/wait-for-envoy-sidecar.sh index 6063e03c6..8147d99f4 100644 --- a/pkg/resources/kafka/wait-for-envoy-sidecar.sh +++ b/pkg/resources/kafka/wait-for-envoy-sidecar.sh @@ -54,4 +54,6 @@ if [[ -n "${CLUSTER_ID}" ]]; then fi /opt/kafka/bin/kafka-server-start.sh /config/broker-config +KAFKA_EXIT=$? rm /var/run/wait/do-not-exit-yet +exit $KAFKA_EXIT From f1ac2b6db683b3106ae002f68a578427d6aaeb10 Mon Sep 17 00:00:00 2001 From: DarkIsDude Date: Fri, 5 Jun 2026 16:42:13 +0200 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=90=9B=20suppress=20signal-based=20ex?= =?UTF-8?q?it=20codes=20during=20controlled=20shutdowns?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When koperator performs a rolling upgrade, Kubernetes sends SIGTERM to the Kafka container. If Kafka's shutdown hook does not complete within the grace period, SIGKILL is sent instead. 
Both result in exit codes ≥ 128 (128 + signal number), which our propagation logic was marking as pod failures, causing an extra koperator reconcile cycle and pushing the multi-disk-removal E2E test over its 3-minute readiness window. Signal-based exits (≥ 128) are controlled shutdowns orchestrated by koperator, not genuine Kafka failures. Only exit codes 1–127 indicate a real Kafka crash (e.g. KafkaStorageException) and should set the pod phase to Failed. --- pkg/resources/kafka/wait-for-envoy-sidecar.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/resources/kafka/wait-for-envoy-sidecar.sh b/pkg/resources/kafka/wait-for-envoy-sidecar.sh index 8147d99f4..954f755aa 100644 --- a/pkg/resources/kafka/wait-for-envoy-sidecar.sh +++ b/pkg/resources/kafka/wait-for-envoy-sidecar.sh @@ -56,4 +56,10 @@ fi /opt/kafka/bin/kafka-server-start.sh /config/broker-config KAFKA_EXIT=$? rm /var/run/wait/do-not-exit-yet +# Exit codes >= 128 mean the process was killed by a signal (128 + signal number). +# Signal-based exits are controlled shutdowns (e.g. SIGTERM during rolling upgrades), +# not genuine Kafka failures, so suppress them to avoid marking the pod as Failed. +if [ $KAFKA_EXIT -ge 128 ]; then + exit 0 +fi exit $KAFKA_EXIT From 393a148cf2c6ffc831caf47868adb5e5ef294e9a Mon Sep 17 00:00:00 2001 From: DarkIsDude Date: Fri, 5 Jun 2026 18:32:09 +0200 Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=90=9B=20suppress=20signal-based=20ex?= =?UTF-8?q?it=20codes=20during=20controlled=20shutdowns?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Only suppress exit code 143 (128+SIGTERM), which is sent by koperator during rolling upgrades. All other non-zero codes, including 137 (OOMKilled), are genuine failures and are propagated so the pod phase is set to Failed. 
--- pkg/resources/kafka/wait-for-envoy-sidecar.sh | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/resources/kafka/wait-for-envoy-sidecar.sh b/pkg/resources/kafka/wait-for-envoy-sidecar.sh index 954f755aa..6af18de77 100644 --- a/pkg/resources/kafka/wait-for-envoy-sidecar.sh +++ b/pkg/resources/kafka/wait-for-envoy-sidecar.sh @@ -56,10 +56,8 @@ fi /opt/kafka/bin/kafka-server-start.sh /config/broker-config KAFKA_EXIT=$? rm /var/run/wait/do-not-exit-yet -# Exit codes >= 128 mean the process was killed by a signal (128 + signal number). -# Signal-based exits are controlled shutdowns (e.g. SIGTERM during rolling upgrades), -# not genuine Kafka failures, so suppress them to avoid marking the pod as Failed. -if [ $KAFKA_EXIT -ge 128 ]; then +# 143 = 128 + SIGTERM: controlled shutdown by koperator; not a Kafka crash. +if [ $KAFKA_EXIT -eq 143 ]; then exit 0 fi exit $KAFKA_EXIT From e1e8c293bac9c04ee81a4a137855c6f31f85db2e Mon Sep 17 00:00:00 2001 From: DarkIsDude Date: Mon, 15 Jun 2026 11:08:08 +0200 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A7=AA=20add=20graceful=20shutdown=20?= =?UTF-8?q?test=20for=20wait-for-envoy-sidecar.sh?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make KAFKA_HOME and WAIT_DIR configurable (defaulting to the current hardcoded values) so the script can be exercised in unit tests without a real Kafka installation. 
Add TestWaitForEnvoySidecarExitCodes covering: - exit 143 (SIGTERM) → 0 (controlled shutdown) - exit 137 (SIGKILL/OOM) → 137 - exit 1 (generic crash) → 1 - exit 0 (normal) → 0 --- pkg/resources/kafka/wait-for-envoy-sidecar.sh | 11 ++- .../kafka/wait_for_envoy_sidecar_test.go | 99 +++++++++++++++++++ 2 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 pkg/resources/kafka/wait_for_envoy_sidecar_test.go diff --git a/pkg/resources/kafka/wait-for-envoy-sidecar.sh b/pkg/resources/kafka/wait-for-envoy-sidecar.sh index 6af18de77..5f0eb8189 100644 --- a/pkg/resources/kafka/wait-for-envoy-sidecar.sh +++ b/pkg/resources/kafka/wait-for-envoy-sidecar.sh @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +KAFKA_HOME=${KAFKA_HOME:-/opt/kafka} +WAIT_DIR=${WAIT_DIR:-/var/run/wait} + if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then COUNT=0 MAXCOUNT=${1:-30} @@ -27,14 +30,14 @@ if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then fi done fi -touch /var/run/wait/do-not-exit-yet +touch ${WAIT_DIR}/do-not-exit-yet # A few necessary steps if we are in KRaft mode if [[ -n "${CLUSTER_ID}" ]]; then # If the storage is already formatted (e.g. broker restarts), the kafka-storage.sh will skip formatting for that storage # thus we can safely run the storage format command regardless if the storage has been formatted or not echo "Formatting KRaft storage with cluster ID ${CLUSTER_ID}" - /opt/kafka/bin/kafka-storage.sh format --cluster-id "${CLUSTER_ID}" --ignore-formatted -c /config/broker-config + ${KAFKA_HOME}/bin/kafka-storage.sh format --cluster-id "${CLUSTER_ID}" --ignore-formatted -c /config/broker-config # Adding or removing controller nodes to the Kafka cluster would trigger cluster rolling upgrade so all the nodes in the cluster are aware of the newly added/removed controllers. 
# When this happens, Kafka's local quorum state file would be outdated since it is static and the Kafka server can't be started with conflicting controllers info (compared to info stored in ConfigMap), @@ -53,9 +56,9 @@ if [[ -n "${CLUSTER_ID}" ]]; then fi fi -/opt/kafka/bin/kafka-server-start.sh /config/broker-config +${KAFKA_HOME}/bin/kafka-server-start.sh /config/broker-config KAFKA_EXIT=$? -rm /var/run/wait/do-not-exit-yet +rm ${WAIT_DIR}/do-not-exit-yet # 143 = 128 + SIGTERM: controlled shutdown by koperator; not a Kafka crash. if [ $KAFKA_EXIT -eq 143 ]; then exit 0 diff --git a/pkg/resources/kafka/wait_for_envoy_sidecar_test.go b/pkg/resources/kafka/wait_for_envoy_sidecar_test.go new file mode 100644 index 000000000..e8aae489e --- /dev/null +++ b/pkg/resources/kafka/wait_for_envoy_sidecar_test.go @@ -0,0 +1,99 @@ +// Copyright © 2020 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" +) + +// TestWaitForEnvoySidecarExitCodes verifies that the wait-for-envoy-sidecar.sh script +// correctly translates exit code 143 (SIGTERM controlled shutdown) to 0, and propagates +// all other exit codes unchanged. 
+func TestWaitForEnvoySidecarExitCodes(t *testing.T) {
+	// Each case makes the stubbed Kafka start script exit with kafkaExit and states the
+	// exit code the wrapper script is then expected to return.
+	tests := []struct {
+		name         string
+		kafkaExit    int
+		expectedExit int
+	}{
+		{name: "graceful shutdown (SIGTERM → exit 0)", kafkaExit: 143, expectedExit: 0},
+		{name: "OOM kill (SIGKILL) propagated", kafkaExit: 137, expectedExit: 137},
+		{name: "generic crash propagated", kafkaExit: 1, expectedExit: 1},
+		{name: "normal exit propagated", kafkaExit: 0, expectedExit: 0},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			// Per-case scratch directory, removed automatically when the subtest ends.
+			tmp := t.TempDir()
+
+			// Stub kafka-server-start.sh under $KAFKA_HOME/bin that exits with the desired code.
+			binDir := filepath.Join(tmp, "bin")
+			if err := os.MkdirAll(binDir, 0o755); err != nil {
+				t.Fatalf("create bin dir: %v", err)
+			}
+			stub := filepath.Join(binDir, "kafka-server-start.sh")
+			stubContent := fmt.Sprintf("#!/bin/bash\nexit %d\n", tc.kafkaExit)
+			if err := os.WriteFile(stub, []byte(stubContent), 0o755); err != nil {
+				t.Fatalf("write stub: %v", err)
+			}
+
+			// Directory in which the script creates and later removes its marker file.
+			waitDir := filepath.Join(tmp, "wait")
+			if err := os.MkdirAll(waitDir, 0o755); err != nil {
+				t.Fatalf("create wait dir: %v", err)
+			}
+
+			cmd := exec.Command("bash", "-c", envoySidecarScript)
+			// Minimal environment: ENVOY_SIDECAR_STATUS and CLUSTER_ID stay unset so the
+			// script skips those blocks; PATH is forwarded so touch and rm can be resolved.
+			cmd.Env = []string{
+				"KAFKA_HOME=" + tmp,
+				"WAIT_DIR=" + waitDir,
+				"PATH=" + os.Getenv("PATH"),
+			}
+
+			// Capture stdout and stderr so a failing case shows what the script printed.
+			out, err := cmd.CombinedOutput()
+
+			// A nil error means exit status 0; *exec.ExitError carries a non-zero status.
+			// Any other error means the command did not run normally (e.g. bash is missing).
+			got := 0
+			if err != nil {
+				exitErr, ok := err.(*exec.ExitError)
+				if !ok {
+					t.Fatalf("unexpected error running script: %v\noutput:\n%s", err, out)
+				}
+				got = exitErr.ExitCode()
+			}
+
+			// 143 must be mapped to 0; every other status must pass through unchanged.
+			if got != tc.expectedExit {
+				t.Errorf("kafka exit %d: want script exit %d, got %d\noutput:\n%s", tc.kafkaExit, tc.expectedExit, got, out)
+			}
+
+			// The script removes its marker file before exiting, whatever Kafka returned.
+			marker := filepath.Join(waitDir, "do-not-exit-yet")
+			if _, err := os.Stat(marker); !os.IsNotExist(err) {
+				t.Errorf("marker file %s was not removed (stat error: %v)", marker, err)
+			}
+		})
+	}
+}