diff --git a/e2e-tests/data/bluegreen-abort-ignore-drift.yaml b/e2e-tests/data/bluegreen-abort-ignore-drift.yaml new file mode 100644 index 0000000000..49bbc02607 --- /dev/null +++ b/e2e-tests/data/bluegreen-abort-ignore-drift.yaml @@ -0,0 +1,55 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: flink.apache.org/v1beta1 +kind: FlinkBlueGreenDeployment +metadata: + name: bg-abort-ignore-drift +spec: + configuration: + kubernetes.operator.bluegreen.deployment-deletion.delay: "2s" + # Short abort grace period so the test doesn't wait too long + kubernetes.operator.bluegreen.abort.grace-period: "30s" + kubernetes.operator.bluegreen.reconciliation.reschedule-interval: "5s" + template: + spec: + image: flink:1.20 + flinkVersion: v1_20 + flinkConfiguration: + rest.port: "8081" + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + jobManager: + resource: + memory: 1G + cpu: 1 + taskManager: + resource: + memory: 2G + cpu: 1 + job: + jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar + parallelism: 1 + entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample + args: + - "--error-rate" + - "0.15" + - "--sleep" + - "30" + upgradeMode: stateless + mode: native diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java index 104f195efa..6e66c6805a 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java @@ -44,6 +44,12 @@ public class FlinkBlueGreenDeploymentStatus { /** Last reconciled (serialized) deployment spec. */ private String lastReconciledSpec; + /** + * Last stable deployment spec. Tracks the last known-good spec where the active child was + * healthy. Restored to lastReconciledSpec when a transition is aborted. + */ + private String lastStableSpec; + /** Timestamp of last reconciliation. */ private String lastReconciledTimestamp; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java index 9543fecabd..6e8fdb9d1f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java @@ -57,7 +57,10 @@ import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.isSavepointRequired; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.millisToInstantStr; +import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.markLastReconciledSpecAsStable; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.prepareFlinkDeployment; +import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.reconcileSpecAsStable; +import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.restoreLastStableSpec; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setLastReconciledSpec; import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.triggerSavepoint; @@ -113,6 +116,11 @@ public UpdateControl initiateDeployment( public UpdateControl checkAndInitiateDeployment( BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) { + if (context.getDeploymentStatus().getLastStableSpec() == null + && context.getDeploymentStatus().getLastReconciledSpec() != null) { + markLastReconciledSpecAsStable(context); + } + BlueGreenDiffType specDiff = getSpecDiff(context); if (specDiff != BlueGreenDiffType.IGNORE) { @@ -120,7 +128,7 @@ public UpdateControl checkAndInitiateDeployment( context.getDeploymentByType(currentBlueGreenDeploymentType); if (specDiff == BlueGreenDiffType.SUSPEND && currentFlinkDeployment != null) { - setLastReconciledSpec(context); + reconcileSpecAsStable(context); LOG.info( "In-place suspension for '{}'", currentFlinkDeployment.getMetadata().getName()); @@ -128,7 +136,7 @@ public UpdateControl checkAndInitiateDeployment( } if (specDiff == BlueGreenDiffType.RESUME && currentFlinkDeployment != null) { - setLastReconciledSpec(context); + reconcileSpecAsStable(context); LOG.info( "In-place resume for '{}'", currentFlinkDeployment.getMetadata().getName()); return patchFlinkDeployment(context, currentBlueGreenDeploymentType); @@ -136,7 +144,7 @@ public UpdateControl checkAndInitiateDeployment( // Check if child is currently suspended - if so, just patch specs without restart if (isChildSuspended(currentFlinkDeployment)) { - setLastReconciledSpec(context); + reconcileSpecAsStable(context); LOG.info( "Spec change while suspended for '{}'", currentFlinkDeployment.getMetadata().getName()); @@ -168,6 +176,7 @@ public UpdateControl checkAndInitiateDeployment( context, currentBlueGreenDeploymentType, currentFlinkDeployment); } catch (Exception e) { var error = "Could not start Transition. Details: " + e.getMessage(); + restoreLastStableSpec(context); context.getDeploymentStatus().setSavepointTriggerId(null); return markDeploymentFailing(context, error); } @@ -188,10 +197,11 @@ public UpdateControl checkAndInitiateDeployment( var error = "Could not start Savepoint Redeploy Transition. Details: " + e.getMessage(); + restoreLastStableSpec(context); return markDeploymentFailing(context, error); } } else { - setLastReconciledSpec(context); + reconcileSpecAsStable(context); LOG.info( "Patching FlinkDeployment '{}' during checkAndInitiateDeployment", currentFlinkDeployment.getMetadata().getName()); @@ -509,6 +519,7 @@ private UpdateControl finalizeSuspendedDeployment( context.getDeploymentStatus().setDeploymentReadyTimestamp(millisToInstantStr(0)); context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0)); context.getDeploymentStatus().setSavepointTriggerId(null); + markLastReconciledSpecAsStable(context); return patchStatusUpdateControl(context, nextState, JobStatus.SUSPENDED, null) .rescheduleAfter(0); @@ -685,6 +696,7 @@ private UpdateControl abortDeployment( FlinkBlueGreenDeploymentState previousState = getPreviousState(nextState, context.getDeployments()); context.getDeploymentStatus().setBlueGreenState(previousState); + restoreLastStableSpec(context); var error = String.format( @@ -733,6 +745,7 @@ public UpdateControl finalizeBlueGreenDeployment( context.getDeploymentStatus().setDeploymentReadyTimestamp(millisToInstantStr(0)); context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0)); context.getDeploymentStatus().setSavepointTriggerId(null); + markLastReconciledSpecAsStable(context); updateBlueGreenIngress(context, nextState); diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java index af3ccbd65c..47e79aeec5 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java @@ -94,6 +94,36 @@ public static void setLastReconciledSpec(BlueGreenContext context) { deploymentStatus.setLastReconciledTimestamp(Instant.now().toString()); } + /** + * Sets lastReconciledSpec AND marks it as stable. Use for non-transition operations + * (PATCH_CHILD, SUSPEND, RESUME, etc.) where the active child absorbs the change immediately. + */ + public static void reconcileSpecAsStable(BlueGreenContext context) { + setLastReconciledSpec(context); + markLastReconciledSpecAsStable(context); + } + + /** + * Promotes the current lastReconciledSpec to lastStableSpec. Use at transition finalization + * when the new child is confirmed healthy. + */ + public static void markLastReconciledSpecAsStable(BlueGreenContext context) { + context.getDeploymentStatus() + .setLastStableSpec(context.getDeploymentStatus().getLastReconciledSpec()); + } + + /** + * Restores lastReconciledSpec from lastStableSpec. Use when a transition is aborted or fails + * to start, so that the BG controller's view of the reconciled spec matches the active child. + */ + public static void restoreLastStableSpec(BlueGreenContext context) { + String stableSpec = context.getDeploymentStatus().getLastStableSpec(); + if (stableSpec != null) { + context.getDeploymentStatus().setLastReconciledSpec(stableSpec); + LOG.info("Restored lastReconciledSpec from lastStableSpec after transition failure"); + } + } + public static void revertToLastSpec(BlueGreenContext context) { context.getBgDeployment() .setSpec( diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java index 528b61727d..537e399e6b 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java @@ -1540,6 +1540,164 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers return new FlinkBlueGreenDeploymentSpec(configuration, null, flinkDeploymentTemplateSpec); } + // ==================== Abort + Ignore-field Drift Tests ==================== + + /** + * Reproduces the bug where lastReconciledSpec is not reverted during abort, causing a + * subsequent IGNORE-only spec change to trigger an unintended in-place restart of the active + * child. + * + *

Scenario: + * + *

    + *
  1. Blue is ACTIVE with spec v1 + *
  2. User applies v2 (UPGRADE-level change via custom-configuration-field) → transition to + * Green starts + *
  3. Green fails to start → abort grace period expires → abort rolls back to ACTIVE_BLUE + *
  4. lastReconciledSpec is now v2 (the bug: it should be v1) + *
  5. User applies v3 that differs from v2 only in IGNORE-level fields (e.g., + * kubernetes.operator.reconcile.interval) + *
  6. BG controller diffs lastReconciledSpec(v2) vs v3 → PATCH_CHILD (only IGNORE fields + * changed) + *
  7. PATCH_CHILD pushes v3 to Blue child, but Blue's FDC lastReconciledSpec is still v1 + *
  8. FDC diffs v1 vs v3 → sees UPGRADE-level changes from the original v1→v2 transition → + * unintended in-place restart + *
+ */ + @ParameterizedTest + @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions") + public void verifyIgnoreFieldChangeAfterAbortCausesUnintendedRestart(FlinkVersion flinkVersion) + throws Exception { + // ---- Phase 1: Deploy Blue with v1 spec ---- + var blueGreenDeployment = + buildSessionCluster( + TEST_DEPLOYMENT_NAME, + TEST_NAMESPACE, + flinkVersion, + null, + UpgradeMode.STATELESS); + + var abortGracePeriodMs = 1200; + var reconciliationReschedulingIntervalMs = 3000; + Map configuration = blueGreenDeployment.getSpec().getConfiguration(); + configuration.put(ABORT_GRACE_PERIOD.key(), String.valueOf(abortGracePeriodMs)); + configuration.put( + RECONCILIATION_RESCHEDULING_INTERVAL.key(), + String.valueOf(reconciliationReschedulingIntervalMs)); + + var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null); + + assertEquals( + FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState()); + assertEquals(1, getFlinkDeployments().size()); + + // Capture Blue child's FDC lastReconciledSpec (v1) + var blueChild = getFlinkDeployments().get(0); + var blueChildLastReconciledSpecV1 = + blueChild.getStatus().getReconciliationStatus().getLastReconciledSpec(); + assertNotNull(blueChildLastReconciledSpecV1, "Blue child should have lastReconciledSpec"); + + // Capture BG lastReconciledSpec (v1) + var bgLastReconciledSpecV1 = rs.reconciledStatus.getLastReconciledSpec(); + assertNotNull(bgLastReconciledSpecV1, "BG should have lastReconciledSpec after deploy"); + + // ---- Phase 2: Apply v2 (UPGRADE-level change) → trigger transition to Green ---- + String v2CustomValue = UUID.randomUUID().toString(); + simulateChangeInSpec(rs.deployment, v2CustomValue, 0, null); + + rs = reconcile(rs.deployment); + assertEquals( + FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN, + rs.reconciledStatus.getBlueGreenState()); + assertEquals(2, getFlinkDeployments().size()); + + // BG lastReconciledSpec is now v2 + var bgLastReconciledSpecV2 = rs.reconciledStatus.getLastReconciledSpec(); + assertNotEquals( + bgLastReconciledSpecV1, + bgLastReconciledSpecV2, + "BG lastReconciledSpec should have changed to v2"); + + // ---- Phase 3: Green fails to start → abort ---- + // Simulating the Green deployment doesn't start (status remains unchanged) + Long reschedDelayMs = 0L; + for (int i = 0; i < 2; i++) { + rs = reconcile(rs.deployment); + assertTrue(rs.updateControl.isPatchStatus()); + reschedDelayMs = rs.updateControl.getScheduleDelay().get(); + } + + // Wait for abort grace period to expire + Thread.sleep(reschedDelayMs); + + // Abort should fire + rs = reconcile(rs.deployment); + assertTrue(rs.updateControl.isPatchStatus()); + assertFailingJobStatus(rs); + assertEquals( + FlinkBlueGreenDeploymentState.ACTIVE_BLUE, + rs.reconciledStatus.getBlueGreenState(), + "Should roll back to ACTIVE_BLUE after abort"); + + // After abort, lastReconciledSpec SHOULD be reverted to match what the active + // Blue child is actually running (v1). This ensures subsequent diffs are correct. + var bgLastReconciledSpecAfterAbort = rs.reconciledStatus.getLastReconciledSpec(); + assertEquals( + bgLastReconciledSpecV1, + bgLastReconciledSpecAfterAbort, + "After abort, BG lastReconciledSpec should be reverted to v1 (the active " + + "child's spec), not left as v2 (the failed transition spec)"); + + // Verify Blue child is still running v1 (unchanged by the abort) + var flinkDeployments = getFlinkDeployments(); + blueChild = + flinkDeployments.stream() + .filter(fd -> fd.getMetadata().getName().endsWith("-blue")) + .findFirst() + .orElseThrow(); + assertEquals( + JobStatus.RUNNING, + blueChild.getStatus().getJobStatus().getState(), + "Blue child should still be RUNNING"); + assertEquals( + blueChildLastReconciledSpecV1, + blueChild.getStatus().getReconciliationStatus().getLastReconciledSpec(), + "Blue child FDC lastReconciledSpec should still be v1"); + + // ---- Phase 4: Apply v3 with ONLY IGNORE-level field changes relative to v2 ---- + // Change only kubernetes.operator.* config (DiffType.IGNORE) — no UPGRADE-level changes + FlinkDeploymentSpec spec = rs.deployment.getSpec().getTemplate().getSpec(); + spec.getFlinkConfiguration().put("kubernetes.operator.reconcile.interval", "999 SECONDS"); + rs.deployment.getSpec().getTemplate().setSpec(spec); + kubernetesClient.resource(rs.deployment).createOrReplace(); + + // ---- Phase 5: Reconcile — with correct lastReconciledSpec (v1), the BG controller + // should detect the UPGRADE-level changes from v1→v3 and trigger a proper TRANSITION, + // NOT a PATCH_CHILD that would cause an unintended in-place restart. ---- + rs = reconcile(rs.deployment); + + // With lastReconciledSpec correctly reverted to v1, the diff v1→v3 includes + // UPGRADE-level changes (custom-configuration-field). This should trigger a full + // blue-green TRANSITION, not a PATCH_CHILD. + assertEquals( + FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN, + rs.reconciledStatus.getBlueGreenState(), + "v1→v3 contains UPGRADE-level changes (custom-configuration-field from v2), " + + "so BG controller should trigger a TRANSITION, not PATCH_CHILD"); + + // Verify Green was created for the transition (2 deployments = transition in progress) + flinkDeployments = getFlinkDeployments(); + assertEquals( + 2, + flinkDeployments.stream() + .filter( + fd -> + fd.getMetadata().getName().endsWith("-blue") + || fd.getMetadata().getName().endsWith("-green")) + .count(), + "A proper transition should create a Green deployment alongside Blue"); + } + // ==================== Ingress Helper Methods ==================== private void assertIngressPointsToService(String expectedServiceName) {