Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions controllers/apps/transformer_component_workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (t *componentWorkloadTransformer) reconcileWorkload(synthesizedComp *compon
*protoITS.Spec.Selector = *runningITS.Spec.Selector
protoITS.Spec.Template.Labels = intctrlutil.MergeMetadataMaps(runningITS.Spec.Template.Labels, synthesizedComp.UserDefinedLabels)
}
injectMySQLBinlogPathCompatInitContainer(synthesizedComp, runningITS, protoITS)

buildInstanceSetPlacementAnnotation(comp, protoITS)

Expand All @@ -158,6 +159,164 @@ func (t *componentWorkloadTransformer) reconcileWorkload(synthesizedComp *compon
return nil
}

const (
mysqlBinlogPathCompatInitContainerName = "kb-compat-mysql-binlog-path"
mysqlBinlogPathCompatDataRoot = "/data/mysql"
mysqlBinlogPathCompatScript = `set -eu

data_root=${KB_COMPAT_MYSQL_DATA_ROOT:-/data/mysql}
old_index="$data_root/data/mysql-bin.index"
new_dir="$data_root/binlog"
new_index="$new_dir/mysql-bin.index"
marker="$new_dir/.kb-binlog-path-migrated"

[ -s "$old_index" ] || exit 0
mkdir -p "$new_dir"
[ ! -f "$marker" ] || exit 0

tmp_index="$new_index.kb-migrate.$$"
trap 'rm -f "$tmp_index"' EXIT
awk -v dir="$new_dir" '{
base=$0
sub(/^.*\//, "", base)
if (base != "") print dir "/" base
}' "$old_index" > "$tmp_index"

[ -s "$tmp_index" ] || exit 0
all_new_binlogs_exist=true
while IFS= read -r path || [ -n "$path" ]; do
[ -n "$path" ] || continue
if [ ! -s "$path" ]; then
all_new_binlogs_exist=false
break
fi
done < "$tmp_index"

if [ -s "$new_index" ] && cmp -s "$new_index" "$tmp_index" && [ "$all_new_binlogs_exist" = true ]; then
date > "$marker" 2>/dev/null || true
exit 0
fi

if [ -s "$new_index" ]; then
non_bootstrap=$(awk 'NF && $0 !~ /(^|\/)mysql-bin\.000001$/ { print; exit }' "$new_index")
if [ -n "$non_bootstrap" ]; then
echo "existing mysql binlog index is already beyond bootstrap; skip migration: $non_bootstrap"
date > "$marker" 2>/dev/null || true
exit 0
fi
fi

ts=$(date +%Y%m%d%H%M%S 2>/dev/null || echo now)
backup="$data_root/repair-binlog-path-$ts"
mkdir -p "$backup"
cp -a "$old_index" "$backup/mysql-bin.index.from-data"
cp -a "$new_index" "$backup/mysql-bin.index.from-binlog" 2>/dev/null || true

while IFS= read -r entry || [ -n "$entry" ]; do
[ -n "$entry" ] || continue
base=${entry##*/}
src="$data_root/data/$base"
dst="$new_dir/$base"
if [ ! -s "$src" ]; then
echo "missing source binlog: $src" >&2
exit 1
fi
if [ -e "$dst" ] && ! cmp -s "$src" "$dst"; then
mv "$dst" "$backup/$base.existing"
fi
[ -e "$dst" ] || cp -a "$src" "$dst"
done < "$old_index"

mv "$tmp_index" "$new_index"
chown -R mysql:mysql "$new_dir" 2>/dev/null || true
date > "$marker" 2>/dev/null || true
sync
echo "migrated mysql binlog index to $new_index"`
)

func injectMySQLBinlogPathCompatInitContainer(synthesizedComp *component.SynthesizedComponent, runningITS, protoITS *workloads.InstanceSet) {
if runningITS == nil || protoITS == nil {
return
}
spec := &protoITS.Spec.Template.Spec
if _, container := intctrlutil.GetContainerByName(spec.InitContainers, mysqlBinlogPathCompatInitContainerName); container != nil {
return
}
mysqlContainer := findMySQLBinlogPathCompatContainer(synthesizedComp, spec.Containers)
if mysqlContainer == nil {
return
}
dataMount, ok := findMySQLBinlogPathCompatDataMount(mysqlContainer.VolumeMounts)
if !ok {
return
}
initContainer := buildMySQLBinlogPathCompatInitContainer(mysqlContainer, dataMount)
if _, runningContainer := intctrlutil.GetContainerByName(runningITS.Spec.Template.Spec.Containers, mysqlContainer.Name); runningContainer != nil && runningContainer.Image != "" {
initContainer.Image = runningContainer.Image
initContainer.ImagePullPolicy = runningContainer.ImagePullPolicy
}
spec.InitContainers = append(spec.InitContainers, initContainer)
}

func findMySQLBinlogPathCompatContainer(synthesizedComp *component.SynthesizedComponent, containers []corev1.Container) *corev1.Container {
for i := range containers {
if isApeCloudMySQLImage(containers[i].Image) {
return &containers[i]
}
}
if !isApeCloudMySQLComponent(synthesizedComp) {
return nil
}
for i := range containers {
if containers[i].Name == "mysql" {
return &containers[i]
}
}
return nil
}

func isApeCloudMySQLImage(image string) bool {
image = strings.ToLower(image)
return strings.Contains(image, "apecloud-mysql-server")
}

func isApeCloudMySQLComponent(synthesizedComp *component.SynthesizedComponent) bool {
if synthesizedComp == nil {
return false
}
characterType := strings.ToLower(synthesizedComp.CharacterType)
compDefName := strings.ToLower(synthesizedComp.CompDefName)
clusterDefName := strings.ToLower(synthesizedComp.ClusterDefName)
return characterType == "wesql" || strings.Contains(compDefName, "apecloud-mysql") ||
strings.Contains(compDefName, "wesql") || strings.Contains(clusterDefName, "apecloud-mysql")
}

func findMySQLBinlogPathCompatDataMount(mounts []corev1.VolumeMount) (corev1.VolumeMount, bool) {
for _, mount := range mounts {
if strings.TrimRight(mount.MountPath, "/") == mysqlBinlogPathCompatDataRoot {
return mount, true
}
}
return corev1.VolumeMount{}, false
}

func buildMySQLBinlogPathCompatInitContainer(mysqlContainer *corev1.Container, dataMount corev1.VolumeMount) corev1.Container {
dataMount.MountPath = strings.TrimRight(dataMount.MountPath, "/")
return corev1.Container{
Name: mysqlBinlogPathCompatInitContainerName,
Image: mysqlContainer.Image,
ImagePullPolicy: mysqlContainer.ImagePullPolicy,
Command: []string{"/bin/sh", "-ec", mysqlBinlogPathCompatScript},
Env: []corev1.EnvVar{{
Name: "KB_COMPAT_MYSQL_DATA_ROOT",
Value: dataMount.MountPath,
}},
Resources: mysqlContainer.Resources,
SecurityContext: mysqlContainer.SecurityContext,
VolumeMounts: []corev1.VolumeMount{dataMount},
}
}

func isCompStopped(synthesizedComp *component.SynthesizedComponent) bool {
if synthesizedComp.Stop != nil && *synthesizedComp.Stop {
return true
Expand Down
82 changes: 82 additions & 0 deletions controllers/apps/transformer_component_workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,88 @@ func TestMemberJoinEnabled(t *testing.T) {
})
}

func TestInjectMySQLBinlogPathCompatInitContainer(t *testing.T) {
t.Run("injects for ApeCloud MySQL workloads", func(t *testing.T) {
runningITS := newBinlogCompatTestITS("mirror.local/apecloud/apecloud-mysql-server:8.0.30", corev1.PullIfNotPresent)
protoITS := runningITS.DeepCopy()
protoITS.Spec.Template.Spec.Containers = []corev1.Container{{
Name: "mysql",
Image: "docker.io/apecloud/apecloud-mysql-server:latest",
ImagePullPolicy: corev1.PullAlways,
VolumeMounts: []corev1.VolumeMount{{
Name: "data",
MountPath: "/data/mysql/",
}},
}}

injectMySQLBinlogPathCompatInitContainer(&component.SynthesizedComponent{
CharacterType: constant.MySQLCharacterType,
ClusterDefName: "apecloud-mysql",
}, runningITS, protoITS)

if len(protoITS.Spec.Template.Spec.InitContainers) != 1 {
t.Fatalf("expected one init container, got %d", len(protoITS.Spec.Template.Spec.InitContainers))
}
initContainer := protoITS.Spec.Template.Spec.InitContainers[0]
if initContainer.Name != mysqlBinlogPathCompatInitContainerName {
t.Fatalf("unexpected init container name: %s", initContainer.Name)
}
if initContainer.Image != "mirror.local/apecloud/apecloud-mysql-server:8.0.30" {
t.Fatalf("expected running image to be reused, got %s", initContainer.Image)
}
if initContainer.ImagePullPolicy != corev1.PullIfNotPresent {
t.Fatalf("expected running image pull policy to be reused, got %s", initContainer.ImagePullPolicy)
}
if len(initContainer.Command) != 3 || initContainer.Command[2] != mysqlBinlogPathCompatScript {
t.Fatalf("unexpected init command: %#v", initContainer.Command)
}
if len(initContainer.Env) != 1 || initContainer.Env[0].Name != "KB_COMPAT_MYSQL_DATA_ROOT" || initContainer.Env[0].Value != "/data/mysql" {
t.Fatalf("unexpected env: %#v", initContainer.Env)
}
if len(initContainer.VolumeMounts) != 1 || initContainer.VolumeMounts[0].Name != "data" || initContainer.VolumeMounts[0].MountPath != "/data/mysql" {
t.Fatalf("unexpected volume mounts: %#v", initContainer.VolumeMounts)
}

injectMySQLBinlogPathCompatInitContainer(&component.SynthesizedComponent{}, runningITS, protoITS)
if len(protoITS.Spec.Template.Spec.InitContainers) != 1 {
t.Fatalf("expected injection to be idempotent, got %d init containers", len(protoITS.Spec.Template.Spec.InitContainers))
}
})

t.Run("skips non ApeCloud MySQL workloads", func(t *testing.T) {
runningITS := newBinlogCompatTestITS("mysql:8.0", corev1.PullIfNotPresent)
protoITS := runningITS.DeepCopy()
protoITS.Spec.Template.Spec.Containers[0].VolumeMounts = []corev1.VolumeMount{{
Name: "data",
MountPath: "/data/mysql",
}}

injectMySQLBinlogPathCompatInitContainer(&component.SynthesizedComponent{
CharacterType: constant.MySQLCharacterType,
}, runningITS, protoITS)

if len(protoITS.Spec.Template.Spec.InitContainers) != 0 {
t.Fatalf("expected no init containers, got %#v", protoITS.Spec.Template.Spec.InitContainers)
}
})
}

func newBinlogCompatTestITS(image string, pullPolicy corev1.PullPolicy) *workloads.InstanceSet {
return &workloads.InstanceSet{
Spec: workloads.InstanceSetSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "mysql",
Image: image,
ImagePullPolicy: pullPolicy,
}},
},
},
},
}
}

func TestDetectPodsToMemberJoin(t *testing.T) {
t.Run("no leader present", func(t *testing.T) {
op := newMemberJoinOps(3, []workloads.MemberStatus{{PodName: "pod-0"}}, "pod-0", "pod-1")
Expand Down
Loading