Skip to content
Draft

WIP #21

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions e2e-tests/data/bluegreen-abort-ignore-drift.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

apiVersion: flink.apache.org/v1beta1
kind: FlinkBlueGreenDeployment
metadata:
name: bg-abort-ignore-drift
spec:
configuration:
kubernetes.operator.bluegreen.deployment-deletion.delay: "2s"
# Short abort grace period so the test doesn't wait too long
kubernetes.operator.bluegreen.abort.grace-period: "30s"
kubernetes.operator.bluegreen.reconciliation.reschedule-interval: "5s"
template:
spec:
image: flink:1.20
flinkVersion: v1_20
flinkConfiguration:
rest.port: "8081"
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: 1G
cpu: 1
taskManager:
resource:
memory: 2G
cpu: 1
job:
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
parallelism: 1
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
args:
- "--error-rate"
- "0.15"
- "--sleep"
- "30"
upgradeMode: stateless
mode: native
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class FlinkBlueGreenDeploymentStatus {
/** Last reconciled (serialized) deployment spec. */
private String lastReconciledSpec;

/**
* Last stable deployment spec. Tracks the last known-good spec where the active child was
* healthy. Restored to lastReconciledSpec when a transition is aborted.
*/
private String lastStableSpec;

/** Timestamp of last reconciliation. */
private String lastReconciledTimestamp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.instantStrToMillis;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.isSavepointRequired;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.millisToInstantStr;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.markLastReconciledSpecAsStable;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.prepareFlinkDeployment;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.reconcileSpecAsStable;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.restoreLastStableSpec;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setLastReconciledSpec;
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.triggerSavepoint;

Expand Down Expand Up @@ -113,30 +116,35 @@ public UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
BlueGreenContext context, BlueGreenDeploymentType currentBlueGreenDeploymentType) {

if (context.getDeploymentStatus().getLastStableSpec() == null
&& context.getDeploymentStatus().getLastReconciledSpec() != null) {
markLastReconciledSpecAsStable(context);
}

BlueGreenDiffType specDiff = getSpecDiff(context);

if (specDiff != BlueGreenDiffType.IGNORE) {
FlinkDeployment currentFlinkDeployment =
context.getDeploymentByType(currentBlueGreenDeploymentType);

if (specDiff == BlueGreenDiffType.SUSPEND && currentFlinkDeployment != null) {
setLastReconciledSpec(context);
reconcileSpecAsStable(context);
LOG.info(
"In-place suspension for '{}'",
currentFlinkDeployment.getMetadata().getName());
return patchFlinkDeployment(context, currentBlueGreenDeploymentType);
}

if (specDiff == BlueGreenDiffType.RESUME && currentFlinkDeployment != null) {
setLastReconciledSpec(context);
reconcileSpecAsStable(context);
LOG.info(
"In-place resume for '{}'", currentFlinkDeployment.getMetadata().getName());
return patchFlinkDeployment(context, currentBlueGreenDeploymentType);
}

// Check if child is currently suspended - if so, just patch specs without restart
if (isChildSuspended(currentFlinkDeployment)) {
setLastReconciledSpec(context);
reconcileSpecAsStable(context);
LOG.info(
"Spec change while suspended for '{}'",
currentFlinkDeployment.getMetadata().getName());
Expand Down Expand Up @@ -168,6 +176,7 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
context, currentBlueGreenDeploymentType, currentFlinkDeployment);
} catch (Exception e) {
var error = "Could not start Transition. Details: " + e.getMessage();
restoreLastStableSpec(context);
context.getDeploymentStatus().setSavepointTriggerId(null);
return markDeploymentFailing(context, error);
}
Expand All @@ -188,10 +197,11 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
var error =
"Could not start Savepoint Redeploy Transition. Details: "
+ e.getMessage();
restoreLastStableSpec(context);
return markDeploymentFailing(context, error);
}
} else {
setLastReconciledSpec(context);
reconcileSpecAsStable(context);
LOG.info(
"Patching FlinkDeployment '{}' during checkAndInitiateDeployment",
currentFlinkDeployment.getMetadata().getName());
Expand Down Expand Up @@ -509,6 +519,7 @@ private UpdateControl<FlinkBlueGreenDeployment> finalizeSuspendedDeployment(
context.getDeploymentStatus().setDeploymentReadyTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setSavepointTriggerId(null);
markLastReconciledSpecAsStable(context);

return patchStatusUpdateControl(context, nextState, JobStatus.SUSPENDED, null)
.rescheduleAfter(0);
Expand Down Expand Up @@ -685,6 +696,7 @@ private UpdateControl<FlinkBlueGreenDeployment> abortDeployment(
FlinkBlueGreenDeploymentState previousState =
getPreviousState(nextState, context.getDeployments());
context.getDeploymentStatus().setBlueGreenState(previousState);
restoreLastStableSpec(context);

var error =
String.format(
Expand Down Expand Up @@ -733,6 +745,7 @@ public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
context.getDeploymentStatus().setDeploymentReadyTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setSavepointTriggerId(null);
markLastReconciledSpecAsStable(context);

updateBlueGreenIngress(context, nextState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,36 @@ public static void setLastReconciledSpec(BlueGreenContext context) {
deploymentStatus.setLastReconciledTimestamp(Instant.now().toString());
}

/**
* Sets lastReconciledSpec AND marks it as stable. Use for non-transition operations
* (PATCH_CHILD, SUSPEND, RESUME, etc.) where the active child absorbs the change immediately.
*/
public static void reconcileSpecAsStable(BlueGreenContext context) {
setLastReconciledSpec(context);
markLastReconciledSpecAsStable(context);
}

/**
* Promotes the current lastReconciledSpec to lastStableSpec. Use at transition finalization
* when the new child is confirmed healthy.
*/
public static void markLastReconciledSpecAsStable(BlueGreenContext context) {
context.getDeploymentStatus()
.setLastStableSpec(context.getDeploymentStatus().getLastReconciledSpec());
}

/**
* Restores lastReconciledSpec from lastStableSpec. Use when a transition is aborted or fails
* to start, so that the BG controller's view of the reconciled spec matches the active child.
*/
public static void restoreLastStableSpec(BlueGreenContext context) {
String stableSpec = context.getDeploymentStatus().getLastStableSpec();
if (stableSpec != null) {
context.getDeploymentStatus().setLastReconciledSpec(stableSpec);
LOG.info("Restored lastReconciledSpec from lastStableSpec after transition failure");
}
}

public static void revertToLastSpec(BlueGreenContext context) {
context.getBgDeployment()
.setSpec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1540,6 +1540,164 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers
return new FlinkBlueGreenDeploymentSpec(configuration, null, flinkDeploymentTemplateSpec);
}

// ==================== Abort + Ignore-field Drift Tests ====================

/**
* Reproduces the bug where lastReconciledSpec is not reverted during abort, causing a
* subsequent IGNORE-only spec change to trigger an unintended in-place restart of the active
* child.
*
* <p>Scenario:
*
* <ol>
* <li>Blue is ACTIVE with spec v1
* <li>User applies v2 (UPGRADE-level change via custom-configuration-field) → transition to
* Green starts
* <li>Green fails to start → abort grace period expires → abort rolls back to ACTIVE_BLUE
* <li>lastReconciledSpec is now v2 (the bug: it should be v1)
* <li>User applies v3 that differs from v2 only in IGNORE-level fields (e.g.,
* kubernetes.operator.reconcile.interval)
* <li>BG controller diffs lastReconciledSpec(v2) vs v3 → PATCH_CHILD (only IGNORE fields
* changed)
* <li>PATCH_CHILD pushes v3 to Blue child, but Blue's FDC lastReconciledSpec is still v1
* <li>FDC diffs v1 vs v3 → sees UPGRADE-level changes from the original v1→v2 transition →
* unintended in-place restart
* </ol>
*/
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void verifyIgnoreFieldChangeAfterAbortCausesUnintendedRestart(FlinkVersion flinkVersion)
throws Exception {
// ---- Phase 1: Deploy Blue with v1 spec ----
var blueGreenDeployment =
buildSessionCluster(
TEST_DEPLOYMENT_NAME,
TEST_NAMESPACE,
flinkVersion,
null,
UpgradeMode.STATELESS);

var abortGracePeriodMs = 1200;
var reconciliationReschedulingIntervalMs = 3000;
Map<String, String> configuration = blueGreenDeployment.getSpec().getConfiguration();
configuration.put(ABORT_GRACE_PERIOD.key(), String.valueOf(abortGracePeriodMs));
configuration.put(
RECONCILIATION_RESCHEDULING_INTERVAL.key(),
String.valueOf(reconciliationReschedulingIntervalMs));

var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false, null);

assertEquals(
FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState());
assertEquals(1, getFlinkDeployments().size());

// Capture Blue child's FDC lastReconciledSpec (v1)
var blueChild = getFlinkDeployments().get(0);
var blueChildLastReconciledSpecV1 =
blueChild.getStatus().getReconciliationStatus().getLastReconciledSpec();
assertNotNull(blueChildLastReconciledSpecV1, "Blue child should have lastReconciledSpec");

// Capture BG lastReconciledSpec (v1)
var bgLastReconciledSpecV1 = rs.reconciledStatus.getLastReconciledSpec();
assertNotNull(bgLastReconciledSpecV1, "BG should have lastReconciledSpec after deploy");

// ---- Phase 2: Apply v2 (UPGRADE-level change) → trigger transition to Green ----
String v2CustomValue = UUID.randomUUID().toString();
simulateChangeInSpec(rs.deployment, v2CustomValue, 0, null);

rs = reconcile(rs.deployment);
assertEquals(
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
rs.reconciledStatus.getBlueGreenState());
assertEquals(2, getFlinkDeployments().size());

// BG lastReconciledSpec is now v2
var bgLastReconciledSpecV2 = rs.reconciledStatus.getLastReconciledSpec();
assertNotEquals(
bgLastReconciledSpecV1,
bgLastReconciledSpecV2,
"BG lastReconciledSpec should have changed to v2");

// ---- Phase 3: Green fails to start → abort ----
// Simulating the Green deployment doesn't start (status remains unchanged)
Long reschedDelayMs = 0L;
for (int i = 0; i < 2; i++) {
rs = reconcile(rs.deployment);
assertTrue(rs.updateControl.isPatchStatus());
reschedDelayMs = rs.updateControl.getScheduleDelay().get();
}

// Wait for abort grace period to expire
Thread.sleep(reschedDelayMs);

// Abort should fire
rs = reconcile(rs.deployment);
assertTrue(rs.updateControl.isPatchStatus());
assertFailingJobStatus(rs);
assertEquals(
FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
rs.reconciledStatus.getBlueGreenState(),
"Should roll back to ACTIVE_BLUE after abort");

// After abort, lastReconciledSpec SHOULD be reverted to match what the active
// Blue child is actually running (v1). This ensures subsequent diffs are correct.
var bgLastReconciledSpecAfterAbort = rs.reconciledStatus.getLastReconciledSpec();
assertEquals(
bgLastReconciledSpecV1,
bgLastReconciledSpecAfterAbort,
"After abort, BG lastReconciledSpec should be reverted to v1 (the active "
+ "child's spec), not left as v2 (the failed transition spec)");

// Verify Blue child is still running v1 (unchanged by the abort)
var flinkDeployments = getFlinkDeployments();
blueChild =
flinkDeployments.stream()
.filter(fd -> fd.getMetadata().getName().endsWith("-blue"))
.findFirst()
.orElseThrow();
assertEquals(
JobStatus.RUNNING,
blueChild.getStatus().getJobStatus().getState(),
"Blue child should still be RUNNING");
assertEquals(
blueChildLastReconciledSpecV1,
blueChild.getStatus().getReconciliationStatus().getLastReconciledSpec(),
"Blue child FDC lastReconciledSpec should still be v1");

// ---- Phase 4: Apply v3 with ONLY IGNORE-level field changes relative to v2 ----
// Change only kubernetes.operator.* config (DiffType.IGNORE) — no UPGRADE-level changes
FlinkDeploymentSpec spec = rs.deployment.getSpec().getTemplate().getSpec();
spec.getFlinkConfiguration().put("kubernetes.operator.reconcile.interval", "999 SECONDS");
rs.deployment.getSpec().getTemplate().setSpec(spec);
kubernetesClient.resource(rs.deployment).createOrReplace();

// ---- Phase 5: Reconcile — with correct lastReconciledSpec (v1), the BG controller
// should detect the UPGRADE-level changes from v1→v3 and trigger a proper TRANSITION,
// NOT a PATCH_CHILD that would cause an unintended in-place restart. ----
rs = reconcile(rs.deployment);

// With lastReconciledSpec correctly reverted to v1, the diff v1→v3 includes
// UPGRADE-level changes (custom-configuration-field). This should trigger a full
// blue-green TRANSITION, not a PATCH_CHILD.
assertEquals(
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
rs.reconciledStatus.getBlueGreenState(),
"v1→v3 contains UPGRADE-level changes (custom-configuration-field from v2), "
+ "so BG controller should trigger a TRANSITION, not PATCH_CHILD");

// Verify Green was created for the transition (2 deployments = transition in progress)
flinkDeployments = getFlinkDeployments();
assertEquals(
2,
flinkDeployments.stream()
.filter(
fd ->
fd.getMetadata().getName().endsWith("-blue")
|| fd.getMetadata().getName().endsWith("-green"))
.count(),
"A proper transition should create a Green deployment alongside Blue");
}

// ==================== Ingress Helper Methods ====================

private void assertIngressPointsToService(String expectedServiceName) {
Expand Down
Loading