diff --git a/pkg/resources/kafka/wait-for-envoy-sidecar.sh b/pkg/resources/kafka/wait-for-envoy-sidecar.sh index 6063e03c6..5f0eb8189 100644 --- a/pkg/resources/kafka/wait-for-envoy-sidecar.sh +++ b/pkg/resources/kafka/wait-for-envoy-sidecar.sh @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # +KAFKA_HOME=${KAFKA_HOME:-/opt/kafka} +WAIT_DIR=${WAIT_DIR:-/var/run/wait} + if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then COUNT=0 MAXCOUNT=${1:-30} @@ -27,14 +30,14 @@ if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then fi done fi -touch /var/run/wait/do-not-exit-yet +touch ${WAIT_DIR}/do-not-exit-yet # A few necessary steps if we are in KRaft mode if [[ -n "${CLUSTER_ID}" ]]; then # If the storage is already formatted (e.g. broker restarts), the kafka-storage.sh will skip formatting for that storage # thus we can safely run the storage format command regardless if the storage has been formatted or not echo "Formatting KRaft storage with cluster ID ${CLUSTER_ID}" - /opt/kafka/bin/kafka-storage.sh format --cluster-id "${CLUSTER_ID}" --ignore-formatted -c /config/broker-config + ${KAFKA_HOME}/bin/kafka-storage.sh format --cluster-id "${CLUSTER_ID}" --ignore-formatted -c /config/broker-config # Adding or removing controller nodes to the Kafka cluster would trigger cluster rolling upgrade so all the nodes in the cluster are aware of the newly added/removed controllers. # When this happens, Kafka's local quorum state file would be outdated since it is static and the Kafka server can't be started with conflicting controllers info (compared to info stored in ConfigMap), @@ -53,5 +56,11 @@ if [[ -n "${CLUSTER_ID}" ]]; then fi fi -/opt/kafka/bin/kafka-server-start.sh /config/broker-config -rm /var/run/wait/do-not-exit-yet +${KAFKA_HOME}/bin/kafka-server-start.sh /config/broker-config +KAFKA_EXIT=$? +rm ${WAIT_DIR}/do-not-exit-yet +# 143 = 128 + SIGTERM: controlled shutdown by koperator; not a Kafka crash. +if [ $KAFKA_EXIT -eq 143 ]; then + exit 0 +fi +exit $KAFKA_EXIT diff --git a/pkg/resources/kafka/wait_for_envoy_sidecar_test.go b/pkg/resources/kafka/wait_for_envoy_sidecar_test.go new file mode 100644 index 000000000..e8aae489e --- /dev/null +++ b/pkg/resources/kafka/wait_for_envoy_sidecar_test.go @@ -0,0 +1,99 @@ +// Copyright © 2020 Cisco Systems, Inc. and/or its affiliates +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" +) + +// TestWaitForEnvoySidecarExitCodes verifies that the wait-for-envoy-sidecar.sh script +// correctly translates exit code 143 (SIGTERM controlled shutdown) to 0, and propagates +// all other exit codes unchanged. +func TestWaitForEnvoySidecarExitCodes(t *testing.T) { + tests := []struct { + name string + kafkaExit int + expectedExit int + }{ + { + name: "graceful shutdown (SIGTERM → exit 0)", + kafkaExit: 143, + expectedExit: 0, + }, + { + name: "OOM kill (SIGKILL) propagated", + kafkaExit: 137, + expectedExit: 137, + }, + { + name: "generic crash propagated", + kafkaExit: 1, + expectedExit: 1, + }, + { + name: "normal exit propagated", + kafkaExit: 0, + expectedExit: 0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tmp := t.TempDir() + + // Create mock kafka-server-start.sh that exits with the desired code. + binDir := filepath.Join(tmp, "bin") + if err := os.MkdirAll(binDir, 0o755); err != nil { + t.Fatalf("create bin dir: %v", err) + } + stub := filepath.Join(binDir, "kafka-server-start.sh") + stubContent := fmt.Sprintf("#!/bin/bash\nexit %d\n", tc.kafkaExit) + if err := os.WriteFile(stub, []byte(stubContent), 0o755); err != nil { + t.Fatalf("write stub: %v", err) + } + + waitDir := filepath.Join(tmp, "wait") + if err := os.MkdirAll(waitDir, 0o755); err != nil { + t.Fatalf("create wait dir: %v", err) + } + + cmd := exec.Command("bash", "-c", envoySidecarScript) + cmd.Env = []string{ + "KAFKA_HOME=" + tmp, + "WAIT_DIR=" + waitDir, + // Leave ENVOY_SIDECAR_STATUS and CLUSTER_ID unset to skip those blocks. + } + + err := cmd.Run() + + got := 0 + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + got = exitErr.ExitCode() + } else { + t.Fatalf("unexpected error running script: %v", err) + } + } + + if got != tc.expectedExit { + t.Errorf("kafka exit %d: want script exit %d, got %d", tc.kafkaExit, tc.expectedExit, got) + } + }) + } +}