Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions pkg/resources/kafka/wait-for-envoy-sidecar.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
KAFKA_HOME=${KAFKA_HOME:-/opt/kafka}
WAIT_DIR=${WAIT_DIR:-/var/run/wait}

if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then
COUNT=0
MAXCOUNT=${1:-30}
Expand All @@ -27,14 +30,14 @@ if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then
fi
done
fi
touch /var/run/wait/do-not-exit-yet
touch ${WAIT_DIR}/do-not-exit-yet

# A few necessary steps if we are in KRaft mode
if [[ -n "${CLUSTER_ID}" ]]; then
# If the storage is already formatted (e.g. after a broker restart), kafka-storage.sh skips formatting for that storage,
# so we can safely run the storage format command regardless of whether the storage has already been formatted.
echo "Formatting KRaft storage with cluster ID ${CLUSTER_ID}"
/opt/kafka/bin/kafka-storage.sh format --cluster-id "${CLUSTER_ID}" --ignore-formatted -c /config/broker-config
${KAFKA_HOME}/bin/kafka-storage.sh format --cluster-id "${CLUSTER_ID}" --ignore-formatted -c /config/broker-config

# Adding or removing controller nodes to the Kafka cluster would trigger cluster rolling upgrade so all the nodes in the cluster are aware of the newly added/removed controllers.
# When this happens, Kafka's local quorum state file would be outdated since it is static and the Kafka server can't be started with conflicting controllers info (compared to info stored in ConfigMap),
Expand All @@ -53,5 +56,11 @@ if [[ -n "${CLUSTER_ID}" ]]; then
fi
fi

/opt/kafka/bin/kafka-server-start.sh /config/broker-config
rm /var/run/wait/do-not-exit-yet
${KAFKA_HOME}/bin/kafka-server-start.sh /config/broker-config
KAFKA_EXIT=$?
rm ${WAIT_DIR}/do-not-exit-yet
# 143 = 128 + SIGTERM: controlled shutdown by koperator; not a Kafka crash.
if [ $KAFKA_EXIT -eq 143 ]; then
exit 0

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm - not sure about this.
e.g. 137 can be an actual OOM/SIGKILL of the Java process; if the wrapper exits 0,
the broker pod looks Succeeded.

Why not simply pass through the exact value of $KAFKA_EXIT?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests were failing, so I'm trying this one (not sure whether the failure was flaky or not 😬). I can of course revert.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amuraru tests are now green. It's a valid concern, so I changed my approach 🙏. Would you prefer that I change the tests instead?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for a graceful shutdown and check whether the SIGTERM signal is actually handled correctly by the operator?

fi
exit $KAFKA_EXIT
99 changes: 99 additions & 0 deletions pkg/resources/kafka/wait_for_envoy_sidecar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright © 2020 Cisco Systems, Inc. and/or its affiliates
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
	"errors"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"testing"
)

// TestWaitForEnvoySidecarExitCodes runs the wait-for-envoy-sidecar.sh script against a stub
// kafka-server-start.sh and verifies that exit code 143 (SIGTERM controlled shutdown) is
// translated to 0, while all other exit codes are propagated unchanged.
//
// Each case gets its own temporary KAFKA_HOME and WAIT_DIR, so the script never touches
// the real /opt/kafka or /var/run/wait locations.
func TestWaitForEnvoySidecarExitCodes(t *testing.T) {
	tests := []struct {
		name         string
		kafkaExit    int // exit code produced by the stubbed kafka-server-start.sh
		expectedExit int // exit code the wrapper script is expected to return
	}{
		{
			name:         "graceful shutdown (SIGTERM → exit 0)",
			kafkaExit:    143,
			expectedExit: 0,
		},
		{
			name:         "OOM kill (SIGKILL) propagated",
			kafkaExit:    137,
			expectedExit: 137,
		},
		{
			name:         "generic crash propagated",
			kafkaExit:    1,
			expectedExit: 1,
		},
		{
			name:         "normal exit propagated",
			kafkaExit:    0,
			expectedExit: 0,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			tmp := t.TempDir()

			// Create mock kafka-server-start.sh that exits with the desired code.
			binDir := filepath.Join(tmp, "bin")
			if err := os.MkdirAll(binDir, 0o755); err != nil {
				t.Fatalf("create bin dir: %v", err)
			}
			stub := filepath.Join(binDir, "kafka-server-start.sh")
			stubContent := fmt.Sprintf("#!/bin/bash\nexit %d\n", tc.kafkaExit)
			if err := os.WriteFile(stub, []byte(stubContent), 0o755); err != nil {
				t.Fatalf("write stub: %v", err)
			}

			waitDir := filepath.Join(tmp, "wait")
			if err := os.MkdirAll(waitDir, 0o755); err != nil {
				t.Fatalf("create wait dir: %v", err)
			}

			cmd := exec.Command("bash", "-c", envoySidecarScript)
			cmd.Env = []string{
				"KAFKA_HOME=" + tmp,
				"WAIT_DIR=" + waitDir,
				// Leave ENVOY_SIDECAR_STATUS and CLUSTER_ID unset to skip those blocks.
			}

			// Capture stdout and stderr so a failing case shows what the script printed.
			out, err := cmd.CombinedOutput()

			got := 0
			if err != nil {
				// Only a non-zero exit status is an expected failure mode; anything else
				// (e.g. bash could not be started) is a problem with the test environment.
				var exitErr *exec.ExitError
				if !errors.As(err, &exitErr) {
					t.Fatalf("unexpected error running script: %v\nscript output:\n%s", err, out)
				}
				got = exitErr.ExitCode()
			}

			if got != tc.expectedExit {
				t.Errorf("kafka exit %d: want script exit %d, got %d\nscript output:\n%s",
					tc.kafkaExit, tc.expectedExit, got, out)
			}
		})
	}
}
Loading