Skip to content

[dvc][cc] Fix unsafe DVRT CDC version swap#2795

Open
kvargha wants to merge 7 commits into
linkedin:mainfrom
kvargha:worktree-dvrt-cdc-aa-version-swap
Open

[dvc][cc] Fix unsafe DVRT CDC version swap#2795
kvargha wants to merge 7 commits into
linkedin:mainfrom
kvargha:worktree-dvrt-cdc-aa-version-swap

Conversation

@kvargha
Copy link
Copy Markdown
Contributor

@kvargha kvargha commented May 13, 2026

Problem Statement

This is the DVRT-based CDC counterpart to #2280, which added the same version-swap-by-control-message handling to the legacy VeniceChangelogConsumerImpl. Both PRs sit on top of #2245, which introduced the multi-region VSM broadcast on the controller/server side.

The current version-swap behavior in VeniceChangelogConsumerDaVinciRecordTransformerImpl is unsafe in three ways:

  1. Acts on VSMs meant for another region. Under A/A, the same logical swap is broadcast per source region. A consumer in region X should only act on VSMs originating from region X (sourceRegion), but today's per-partition flip ignores sourceRegion and acts on the first VSM it sees.
  2. Acts on stale VSMs from a previous version. Re-pushes replay previous VTs' version swap messages into the new VT. Without filtering by generationId and oldServingVersionTopic, the consumer can be misled into a swap by a stale message left over from an earlier push (or by historical VSMs replayed after restarting from EARLIEST, or after a rollback to a lower version).
  3. No protection against the future version's ingestion outpacing the current version's. The current-version transformer is throttled by the user's post-poll processing (records have to be drained through poll() before more are ingested), but the future-version transformer's ingestion bypasses user post-poll entirely. Whenever post-poll is slow — or under A/A where DCs ingest at different rates across versions — the future version's transformer can consume past the swap point before the current version has reached it. Records that should have surfaced on the current side are then silently swallowed by the future side: data loss at swap time.

Solution

Introduce RecordTransformerVersionSwapCoordinator and route the DVRT CDC consumer's onVersionSwap through it when versionSwapByControlMessageEnabled = true. The coordinator addresses the three issues above:

  1. Region filter. isRelevant() drops VSMs whose sourceRegion is not this client's region.
  2. Generation / old-VT filter. isRelevant() drops VSMs with generationId == -1, VSMs targeting a version not greater than the highest already promoted to serving (defends against rollback + replay-from-EARLIEST), and — once a swap is armed — VSMs whose generationId or oldServingVersionTopic / newServingVersionTopic don't match the in-progress swap.
  3. Future-side pause. Once a partition has observed VSMs from every destinationRegion on the future side, the coordinator pauses Kafka prefetch on that partition until the cross-partition barrier closes. The cutover atomically flips partitionToVersionToServe and resumes the paused partitions; a watchdog force-commits if the barrier doesn't close within versionSwapTimeoutInMs (default 30 min), bounding the vulnerable window.

Backward compatible: legacy per-partition flip is preserved when the config is false (the default).

Trade-off (documented in the coordinator's class javadoc): Kafka's consumer.pause() doesn't truncate the in-flight batch, so records past the VSM in the same batch flow through processPut on both sides — surfaced on current, silently dropped on future. The design accepts this assuming approximate poll-batch alignment between current and future leaders; the watchdog bounds the window.

Code changes

  • Gated behind existing config versionSwapByControlMessageEnabled (default false). Watchdog uses existing versionSwapTimeoutInMs (default 30 min). No new configs.
  • New log lines (bounded by partitions × regions per swap; rate-limit not needed).

Concurrency-Specific Checks

  • No races: all state-mutating coordinator methods are synchronized on the instance.
  • Cutover (flip + resume) runs under the coordinator's monitor.
  • No blocking calls in the critical section; watchdog runs on a daemon executor.
  • Region accumulators are only touched under the coordinator's monitor.
  • Watchdog/commit exceptions are captured and surfaced via poll(), not silently dropped.

How was this PR tested?

  • Unit tests added: RecordTransformerVersionSwapCoordinatorTest (18), VeniceChangelogConsumerDaVinciRecordTransformerAaVersionSwapTest (7).
  • Integration tests added: TestAaVersionSwapRecordTransformer (6 e2e scenarios — pre/post-swap, mid-swap restart, watchdog timeout, back-to-back swaps, rollback, buffer pressure).
  • Extended existing tests for the new onVersionSwap() signature.
  • Backward compat verified: flag default false keeps legacy behavior.

Does this PR introduce any user-facing or breaking changes?

  • No — gated behind versionSwapByControlMessageEnabled (default false).

Introduce RecordTransformerVersionSwapCoordinator, a barrier that gates
version swap until every assigned partition observes VSMs from every
region on both current and future version topics. Replaces the legacy
per-partition immediate flip, which is incorrect under AA topology
because sibling regions may not have replicated their VSMs yet.

- Coordinator: state machine IDLE -> IN_PROGRESS ->
{COMMITTED,TIMED_OUT,FAILED};
  atomic cutover across all assigned partitions on barrier completion.
- Pauses Kafka prefetch on future-side partitions until commit via new
  pause/resume hooks on InternalDaVinciRecordTransformer + StoreIngestionTask.
- Watchdog timeout force-commits if the barrier does not close in time.
- Legacy per-partition path preserved when versionSwapByControlMessageEnabled
is false.

Tests: 18 coordinator unit tests, 7 CDC consumer AA tests, and 6 end-to-end
TestAaVersionSwapRecordTransformer scenarios.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 13, 2026 23:09
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a new RecordTransformerVersionSwapCoordinator that gates the DVRT-based CDC consumer's version-swap cutover until every assigned partition has observed VSMs from every region on both the current and future version topics, then atomically flips partitionToVersionToServe for all partitions in one synchronized block. The coordinator is gated behind the existing versionSwapByControlMessageEnabled flag (default false), so legacy per-partition flip behavior is preserved.

Changes:

  • New RecordTransformerVersionSwapCoordinator (state machine: IDLE → IN_PROGRESS → {COMMITTED, TIMED_OUT, FAILED}) plumbed into VeniceChangelogConsumerDaVinciRecordTransformerImpl.
  • InternalDaVinciRecordTransformer gains pause/resume Kafka-prefetch hooks (wired by StoreIngestionTask) and onVersionSwap now takes the VersionSwap payload.
  • New unit + integration tests covering AA cutover, timeout watchdog, rollback, restart, back-to-back swaps, and buffer pressure.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 14 comments.

Show a summary per file
File Description
clients/da-vinci-client/.../RecordTransformerVersionSwapCoordinator.java New cross-region barrier coordinator with state machine, watchdog, and per-side accumulators
clients/da-vinci-client/.../VeniceChangelogConsumerDaVinciRecordTransformerImpl.java Wires the coordinator, surfaces watchdog failures via poll(), branches between AA and legacy paths in onVersionSwap
clients/da-vinci-client/.../InternalDaVinciRecordTransformer.java New pause/resume handlers and onVersionSwap(VersionSwap, …) signature; back-reference initialization
clients/da-vinci-client/.../StoreIngestionTask.java Wires pause/resume handlers from the SIT into the transformer; passes VersionSwap payload
clients/da-vinci-client/test/.../RecordTransformerVersionSwapCoordinatorTest.java 18 unit tests for the coordinator's relevance/accumulation/state-machine behaviour
clients/da-vinci-client/test/.../VeniceChangelogConsumerDaVinciRecordTransformerAaVersionSwapTest.java 7 unit tests covering pause/resume, foreign-region/legacy/stale filtering, atomic flip, failure surfacing
clients/da-vinci-client/test/.../VeniceChangelogConsumerDaVinciRecordTransformerImplTest.java Updates legacy onVersionSwap call sites to the new 4-arg signature
clients/da-vinci-client/test/.../RecordTransformerTest.java Updates onVersionSwap call site and adds pause/resume wiring test
internal/venice-test-common/.../TestAaVersionSwapRecordTransformer.java New end-to-end integration tests: pre/post swap, restart, watchdog timeout, back-to-back swaps, rollback, buffer pressure

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

}
} catch (Exception exception) {
versionSwapCoordinator.failSwap(exception);
throw exception;
Comment on lines +161 to +196
public synchronized boolean isRelevant(VersionSwap vsm, boolean isCurrentSide, int transformerVersion) {
if (vsm == null || vsm.getGenerationId() == -1) {
return false;
}
if (vsm.getSourceRegion() == null || !clientRegionName.equals(vsm.getSourceRegion().toString())) {
return false;
}
String oldTopic = vsm.getOldServingVersionTopic().toString();
String newTopic = vsm.getNewServingVersionTopic().toString();
if (!Version.isVersionTopic(newTopic)) {
return false;
}
String transformerTopic = Version.composeKafkaTopic(storeName, transformerVersion);
if (isCurrentSide) {
if (!transformerTopic.equals(oldTopic)) {
return false;
}
} else {
if (!transformerTopic.equals(newTopic)) {
return false;
}
}
int newVersion = Version.parseVersionFromVersionTopicName(newTopic);
if (newVersion <= computeMaxServedVersion()) {
return false;
}
if (state.get() == State.IN_PROGRESS) {
if (vsm.getGenerationId() != activeGenerationId) {
return false;
}
if (!oldTopic.equals(activeOldVersionTopic) || !newTopic.equals(activeNewVersionTopic)) {
return false;
}
}
return true;
}
Comment on lines +275 to +326
public synchronized void commitSwap() {
if (!state.compareAndSet(State.IN_PROGRESS, State.COMMITTED)) {
return;
}
cancelWatchdog();
try {
flipServingVersion();
resumeFutureSide();
} catch (Exception flipFailure) {
handleTerminalFailure(flipFailure);
throw flipFailure;
}
if (changeCaptureStats != null) {
changeCaptureStats.emitVersionSwapCountMetrics(SUCCESS);
}
LOGGER.info(
"Version swap committed for store: {}, swap: {} -> {}, partitions: {}",
storeName,
activeOldVersionTopic,
activeNewVersionTopic,
assignedPartitionsSnapshot);
clearSwapState();
}

/**
* Forces the cutover after the configured timeout has elapsed without all VSMs being observed.
* Counts as a recoverable success — emits a single SUCCESS metric and a WARN log; no separate
* timeout counter exists by design.
*/
public synchronized void timeoutSwap() {
if (!state.compareAndSet(State.IN_PROGRESS, State.TIMED_OUT)) {
return;
}
cancelWatchdog();
try {
flipServingVersion();
resumeFutureSide();
} catch (Exception flipFailure) {
handleTerminalFailure(flipFailure);
return;
}
if (changeCaptureStats != null) {
changeCaptureStats.emitVersionSwapCountMetrics(SUCCESS);
}
LOGGER.warn(
"Version swap timed out (forced cutover) for store: {}, swap: {} -> {}, partitions: {}",
storeName,
activeOldVersionTopic,
activeNewVersionTopic,
assignedPartitionsSnapshot);
clearSwapState();
}
private void flipServingVersion() {
for (int partition: assignedPartitionsSnapshot) {
partitionToVersionToServe.put(partition, activeNewVersion);
}
Comment on lines +456 to +464
private int computeMaxServedVersion() {
int max = -1;
for (int version: partitionToVersionToServe.values()) {
if (version > max) {
max = version;
}
}
return max;
}
Comment on lines +636 to +645
private static void drainForDuration(
PollFunction pollFn,
Map<String, ChangeEvent<GenericRecord>> seen,
List<String> orderedKeys,
long drainSeconds) {
long deadline = System.currentTimeMillis() + drainSeconds * 1000;
while (System.currentTimeMillis() < deadline) {
drain(pollFn, seen, orderedKeys);
}
}
Comment on lines +601 to +613
private static void pollUntilRangeObserved(
PollFunction pollFn,
Map<String, ChangeEvent<GenericRecord>> seen,
List<String> orderedKeys,
int startInclusive,
int endExclusive) {
TestUtils.waitForNonDeterministicAssertion(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS, true, () -> {
drain(pollFn, seen, orderedKeys);
for (int i = startInclusive; i < endExclusive; i++) {
assertNotNull(seen.get(String.valueOf(i)), "Key " + i + " not yet observed");
}
});
}
Comment on lines +123 to +124
this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(
new DaemonThreadFactory("RecordTransformerVersionSwapCoordinator-" + storeName));
Comment on lines +534 to +548
/** Convenience overload — produces records to dc-0's RT, which is where consumers subscribe. */
private void writeRecords(String storeName, int startIndex, int endIndexExclusive) {
try (VeniceSystemProducer producer = IntegrationTestPushUtils.getSamzaProducer(
childDatacenters.get(0).getClusters().get(CLUSTER_NAMES[0]),
storeName,
Version.PushType.STREAM)) {
producer.start();
for (int i = startIndex; i < endIndexExclusive; i++) {
TestChangelogKey key = new TestChangelogKey();
key.id = i;
TestChangelogValue value = new TestChangelogValue();
value.firstName = "first_" + i;
value.lastName = "last_" + i;
sendStreamingRecord(producer, storeName, key, value, null);
}
Comment on lines +428 to +432
private void flipServingVersion() {
for (int partition: assignedPartitionsSnapshot) {
partitionToVersionToServe.put(partition, activeNewVersion);
}
}
@kvargha kvargha changed the title [dvc][cc] Add cross-region version-swap coordinator for AA DVRT CDC [dvc][cc] Handle new A/A version swap messages in DVRT CDC consumer May 13, 2026
@kvargha kvargha changed the title [dvc][cc] Handle new A/A version swap messages in DVRT CDC consumer [dvc][cc] Fix unsafe DVRT CDC version swap May 13, 2026
kvargha and others added 2 commits May 13, 2026 16:32
worktree-dvrt-cdc-aa-version-swap
Fixes 2 SpotBugs failures in the AA version-swap tests:

- DP_DO_INSIDE_DO_PRIVILEGED (5x Field.setAccessible calls in the unit
  test's setUp + 3 test methods): replaced with @VisibleForTesting
  getters/setters on VeniceChangelogConsumerDaVinciRecordTransformerImpl.
  Drops `final` from changeCaptureStats and versionSwapCoordinator so
  tests can swap them in. Matches the @VisibleForTesting idiom already
  used 8x in the same production class.
- DE_MIGHT_IGNORE (TestAaVersionSwapRecordTransformer.deleteStoreQuietly
  silently swallowed Exception): now logs at debug level.

Verified locally: spotbugsTest + spotbugsIntegrationTest both pass; all
8 tests in VeniceChangelogConsumerDaVinciRecordTransformerAaVersionSwapTest
still pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 14, 2026 17:26
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 15 comments.

Comment on lines +500 to +505
Exception versionSwapException = versionSwapThreadException.get();
if (versionSwapException != null) {
throw new VeniceException(
"Version Swap failed for store: " + storeName + " due to exception:",
versionSwapException);
}
Comment on lines +275 to +297
public synchronized void commitSwap() {
if (!state.compareAndSet(State.IN_PROGRESS, State.COMMITTED)) {
return;
}
cancelWatchdog();
try {
flipServingVersion();
resumeFutureSide();
} catch (Exception flipFailure) {
handleTerminalFailure(flipFailure);
throw flipFailure;
}
if (changeCaptureStats != null) {
changeCaptureStats.emitVersionSwapCountMetrics(SUCCESS);
}
LOGGER.info(
"Version swap committed for store: {}, swap: {} -> {}, partitions: {}",
storeName,
activeOldVersionTopic,
activeNewVersionTopic,
assignedPartitionsSnapshot);
clearSwapState();
}
futureVersionRegionsConsumed.clear();
pausedCurrentPartitions.clear();
pausedFuturePartitions.clear();
assignedPartitionsSnapshot.clear();
Comment on lines +168 to +183
String oldTopic = vsm.getOldServingVersionTopic().toString();
String newTopic = vsm.getNewServingVersionTopic().toString();
if (!Version.isVersionTopic(newTopic)) {
return false;
}
String transformerTopic = Version.composeKafkaTopic(storeName, transformerVersion);
if (isCurrentSide) {
if (!transformerTopic.equals(oldTopic)) {
return false;
}
} else {
if (!transformerTopic.equals(newTopic)) {
return false;
}
}
int newVersion = Version.parseVersionFromVersionTopicName(newTopic);
Comment on lines +398 to +415
private void armIfNeeded(VersionSwap vsm) {
State current = state.get();
if (current == State.IN_PROGRESS) {
return;
}
activeGenerationId = vsm.getGenerationId();
activeOldVersionTopic = vsm.getOldServingVersionTopic().toString();
activeNewVersionTopic = vsm.getNewServingVersionTopic().toString();
activeNewVersion = Version.parseVersionFromVersionTopicName(activeNewVersionTopic);
currentVersionRegionsConsumed.clear();
futureVersionRegionsConsumed.clear();
pausedCurrentPartitions.clear();
pausedFuturePartitions.clear();
assignedPartitionsSnapshot.clear();
assignedPartitionsSnapshot.addAll(subscribedPartitions);
state.set(State.IN_PROGRESS);
scheduleTimeoutWatchdog();
}
Comment on lines +684 to +692
private void deleteStoreQuietly(String storeName) {
CompletableFuture.runAsync(() -> {
try {
parentControllerClient.disableAndDeleteStore(storeName);
} catch (Exception e) {
LOGGER.debug("Best-effort cleanup of store {} failed", storeName, e);
}
});
}
Comment on lines +469 to +478
java.util.List<Integer> paused = new java.util.ArrayList<>();
java.util.List<Integer> resumed = new java.util.ArrayList<>();
internalRecordTransformer.setPartitionPauseHandlers(paused::add, resumed::add);

internalRecordTransformer.pausePartitionConsumption(3);
internalRecordTransformer.pausePartitionConsumption(7);
internalRecordTransformer.resumePartitionConsumption(3);

assertEquals(paused, java.util.Arrays.asList(3, 7));
assertEquals(resumed, java.util.Collections.singletonList(3));
Comment on lines +63 to +109
org.mockito.Mockito.when(schemaReader.getKeySchema()).thenReturn(keySchema);
org.mockito.Mockito.when(schemaReader.getValueSchema(1)).thenReturn(valueSchema);

changelogClientConfig = new ChangelogClientConfig<>().setD2ControllerClient(mock(D2ControllerClient.class))
.setSchemaReader(schemaReader)
.setStoreName(STORE)
.setControllerD2ServiceName(D2_SERVICE_NAME)
.setD2ServiceName(DEFAULT_CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setConsumerProperties(new Properties())
.setLocalD2ZkHosts("test_zookeeper")
.setD2Client(mock(D2Client.class))
.setVersionSwapByControlMessageEnabled(true)
.setClientRegionName(CLIENT_REGION)
.setTotalRegionCount(TOTAL_REGIONS);
changelogClientConfig.getInnerClientConfig()
.setMetricsRepository(getVeniceMetricsRepository(CHANGE_DATA_CAPTURE_CLIENT, CONSUMER_METRIC_ENTITIES, true));

VeniceChangelogConsumerClientFactory factory =
spy(new VeniceChangelogConsumerClientFactory(changelogClientConfig, null));
consumer = spy(new VeniceChangelogConsumerDaVinciRecordTransformerImpl<>(changelogClientConfig, factory));

// Spy stats so we can verify metric emissions
stats = spy(consumer.getChangeCaptureStats());
consumer.setChangeCaptureStats(stats);

// Coordinator was initialized with the original (pre-spy) stats reference; rebuild it so its
// metric emissions are routed through the spy.
consumer.setVersionSwapCoordinator(
new RecordTransformerVersionSwapCoordinator(
STORE,
CLIENT_REGION,
TOTAL_REGIONS,
changelogClientConfig.getVersionSwapTimeoutInMs(),
stats,
consumer.getPartitionToVersionToServe(),
consumer.getSubscribedPartitions(),
consumer.getVersionSwapThreadException()::set));

currentTransformer = consumer.new DaVinciRecordTransformerChangelogConsumer(STORE, CURRENT_VERSION, keySchema,
valueSchema, valueSchema, mock(com.linkedin.davinci.client.DaVinciRecordTransformerConfig.class));
futureTransformer = consumer.new DaVinciRecordTransformerChangelogConsumer(STORE, FUTURE_VERSION, keySchema,
valueSchema, valueSchema, mock(com.linkedin.davinci.client.DaVinciRecordTransformerConfig.class));

currentInternal =
(InternalDaVinciRecordTransformer<Integer, Integer, Integer>) mock(InternalDaVinciRecordTransformer.class);
futureInternal =
(InternalDaVinciRecordTransformer<Integer, Integer, Integer>) mock(InternalDaVinciRecordTransformer.class);
Comment on lines 116 to +127
/**
* Lifecycle event triggered when a version swap is detected for partitionId
* It is used for DVRT CDC.
* Lifecycle event triggered when a version swap is detected for partitionId.
* It is used for the DaVinciRecordTransformer CDC consumer. The {@code versionSwap} payload is
* required for AA-aware version-swap coordination (region filtering, generation-id matching);
* it may be null when invoked from legacy or test code paths.
*/
public void onVersionSwap(int currentVersion, int futureVersion, int partitionId) {
public void onVersionSwap(VersionSwap versionSwap, int currentVersion, int futureVersion, int partitionId) {
if (isCDCRecordTransformer()) {
((VeniceChangelogConsumerDaVinciRecordTransformerImpl.DaVinciRecordTransformerChangelogConsumer) this.recordTransformer)
.onVersionSwap(currentVersion, futureVersion, partitionId);
.onVersionSwap(versionSwap, currentVersion, futureVersion, partitionId);
}
}
Comment on lines +393 to +397
public synchronized void shutdown() {
cancelWatchdog();
timeoutExecutor.shutdownNow();
}

kvargha and others added 2 commits May 14, 2026 11:13
testAaVersionSwapRollback was asserting parent.currentVersion == 2 after
rolling back from v3, but a bug in upstream PR linkedin#2785's
VeniceParentHelixAdmin.updateParentVersionStatusAfterRollback
double-decrements parent.currentVersion when 3+ versions exist: the
admin-task handler decrements 3->2 (via VeniceHelixAdmin), then
updateParentVersionStatusAfterRollback reads store.getCurrentVersion()
(now 2), computes getBackupVersionNumber(versions, 2) = 1, and
decrements again to 1.

Child controllers decrement correctly to 2, so switch the assertion to
getColoToCurrentVersions() which reflects each child's view. This will
need to be tracked for upstream fix separately; the test sidesteps the
parent-side double decrement.

Verified locally: testAaVersionSwapRollback PASSED (222s).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The class lands in the catch-all IntegrationTests_99 by default. With 7
tests totaling ~13 minutes, that shard overflows the 15-min CI limit
(cancelled in PR linkedin#2795 CI). Assign it to its own shard so the runtime
sits well within the per-shard budget.

This is a manual entry pending the next run of
scripts/ci/rebalance_test_shards.py with collected timing data.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 14, 2026 21:31
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 10 comments.

Comment on lines +328 to +342
private void handleTerminalFailure(Exception error) {
if (changeCaptureStats != null) {
changeCaptureStats.emitVersionSwapCountMetrics(FAIL);
}
if (failureSurface != null) {
failureSurface.accept(error);
}
LOGGER.error(
"Version swap commit failed for store: {}, swap: {} -> {}",
storeName,
activeOldVersionTopic,
activeNewVersionTopic,
error);
clearSwapState();
}
private void flipServingVersion() {
for (int partition: assignedPartitionsSnapshot) {
partitionToVersionToServe.put(partition, activeNewVersion);
}
Comment on lines +500 to +505
Exception versionSwapException = versionSwapThreadException.get();
if (versionSwapException != null) {
throw new VeniceException(
"Version Swap failed for store: " + storeName + " due to exception:",
versionSwapException);
}
}
if (vsm.getSourceRegion() == null || !clientRegionName.equals(vsm.getSourceRegion().toString())) {
return false;
}
Comment on lines +967 to +990
try {
boolean isCurrentSide = currentVersion == getStoreVersion();
boolean isFutureSide = futureVersion == getStoreVersion();
if (!versionSwapCoordinator.isRelevant(versionSwap, isCurrentSide, getStoreVersion())) {
return;
}

if (isCurrentSide) {
if (versionSwapCoordinator.recordCurrentVsm(versionSwap, partitionId, internalRecordTransformer)) {
internalRecordTransformer.pausePartitionConsumption(partitionId);
}
} else if (isFutureSide) {
if (versionSwapCoordinator.recordFutureVsm(versionSwap, partitionId, internalRecordTransformer)) {
internalRecordTransformer.pausePartitionConsumption(partitionId);
}
}

if (versionSwapCoordinator.allPartitionsBothSidesComplete()) {
versionSwapCoordinator.commitSwap();
}
} catch (Exception exception) {
versionSwapCoordinator.failSwap(exception);
throw exception;
}
Comment on lines +689 to +697
private void deleteStoreQuietly(String storeName) {
CompletableFuture.runAsync(() -> {
try {
parentControllerClient.disableAndDeleteStore(storeName);
} catch (Exception e) {
LOGGER.debug("Best-effort cleanup of store {} failed", storeName, e);
}
});
}
Comment on lines +644 to +687
private static void drainForDuration(
PollFunction pollFn,
Map<String, ChangeEvent<GenericRecord>> seen,
List<String> orderedKeys,
long drainSeconds) {
long deadline = System.currentTimeMillis() + drainSeconds * 1000;
while (System.currentTimeMillis() < deadline) {
drain(pollFn, seen, orderedKeys);
}
}

/**
* Functional adapter so the same drain helpers work with both {@link VeniceChangelogConsumer}
* and {@link StatefulVeniceChangelogConsumer}, which don't share an interface.
*/
@FunctionalInterface
private interface PollFunction {
@SuppressWarnings("rawtypes")
Collection<? extends PubSubMessage> poll(long timeoutMs);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private static void drain(
PollFunction pollFn,
Map<String, ChangeEvent<GenericRecord>> seen,
List<String> orderedKeys) {
Collection<? extends PubSubMessage> messages = pollFn.poll(500);
for (PubSubMessage message: messages) {
if (message.getKey() == null) {
continue; // skip control messages
}
String keyId;
Object key = message.getKey();
if (key instanceof GenericRecord) {
keyId = String.valueOf(((GenericRecord) key).get("id"));
} else if (key instanceof TestChangelogKey) {
keyId = String.valueOf(((TestChangelogKey) key).id);
} else {
keyId = String.valueOf(key);
}
seen.put(keyId, (ChangeEvent<GenericRecord>) message.getValue());
orderedKeys.add(keyId);
}
}
Comment on lines +469 to +478
java.util.List<Integer> paused = new java.util.ArrayList<>();
java.util.List<Integer> resumed = new java.util.ArrayList<>();
internalRecordTransformer.setPartitionPauseHandlers(paused::add, resumed::add);

internalRecordTransformer.pausePartitionConsumption(3);
internalRecordTransformer.pausePartitionConsumption(7);
internalRecordTransformer.resumePartitionConsumption(3);

assertEquals(paused, java.util.Arrays.asList(3, 7));
assertEquals(resumed, java.util.Collections.singletonList(3));
Comment on lines +63 to +267
org.mockito.Mockito.when(schemaReader.getKeySchema()).thenReturn(keySchema);
org.mockito.Mockito.when(schemaReader.getValueSchema(1)).thenReturn(valueSchema);

changelogClientConfig = new ChangelogClientConfig<>().setD2ControllerClient(mock(D2ControllerClient.class))
.setSchemaReader(schemaReader)
.setStoreName(STORE)
.setControllerD2ServiceName(D2_SERVICE_NAME)
.setD2ServiceName(DEFAULT_CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setConsumerProperties(new Properties())
.setLocalD2ZkHosts("test_zookeeper")
.setD2Client(mock(D2Client.class))
.setVersionSwapByControlMessageEnabled(true)
.setClientRegionName(CLIENT_REGION)
.setTotalRegionCount(TOTAL_REGIONS);
changelogClientConfig.getInnerClientConfig()
.setMetricsRepository(getVeniceMetricsRepository(CHANGE_DATA_CAPTURE_CLIENT, CONSUMER_METRIC_ENTITIES, true));

VeniceChangelogConsumerClientFactory factory =
spy(new VeniceChangelogConsumerClientFactory(changelogClientConfig, null));
consumer = spy(new VeniceChangelogConsumerDaVinciRecordTransformerImpl<>(changelogClientConfig, factory));

// Spy stats so we can verify metric emissions
stats = spy(consumer.getChangeCaptureStats());
consumer.setChangeCaptureStats(stats);

// Coordinator was initialized with the original (pre-spy) stats reference; rebuild it so its
// metric emissions are routed through the spy.
consumer.setVersionSwapCoordinator(
new RecordTransformerVersionSwapCoordinator(
STORE,
CLIENT_REGION,
TOTAL_REGIONS,
changelogClientConfig.getVersionSwapTimeoutInMs(),
stats,
consumer.getPartitionToVersionToServe(),
consumer.getSubscribedPartitions(),
consumer.getVersionSwapThreadException()::set));

currentTransformer = consumer.new DaVinciRecordTransformerChangelogConsumer(STORE, CURRENT_VERSION, keySchema,
valueSchema, valueSchema, mock(com.linkedin.davinci.client.DaVinciRecordTransformerConfig.class));
futureTransformer = consumer.new DaVinciRecordTransformerChangelogConsumer(STORE, FUTURE_VERSION, keySchema,
valueSchema, valueSchema, mock(com.linkedin.davinci.client.DaVinciRecordTransformerConfig.class));

currentInternal =
(InternalDaVinciRecordTransformer<Integer, Integer, Integer>) mock(InternalDaVinciRecordTransformer.class);
futureInternal =
(InternalDaVinciRecordTransformer<Integer, Integer, Integer>) mock(InternalDaVinciRecordTransformer.class);
currentTransformer.setInternalRecordTransformer(currentInternal);
futureTransformer.setInternalRecordTransformer(futureInternal);

// Both partitions assigned and starting on CURRENT_VERSION
Map<Integer, Integer> partitionToVersionToServe = consumer.getPartitionToVersionToServe();
Set<Integer> subscribed = consumer.getSubscribedPartitions();
for (int p = 0; p < TOTAL_REGIONS; p++) {
partitionToVersionToServe.put(p, CURRENT_VERSION);
subscribed.add(p);
}
}

private VersionSwap newVsm(long generationId, String sourceRegion, String destRegion, int oldV, int newV) {
VersionSwap vs = new VersionSwap();
vs.oldServingVersionTopic = new Utf8(Version.composeKafkaTopic(STORE, oldV));
vs.newServingVersionTopic = new Utf8(Version.composeKafkaTopic(STORE, newV));
vs.sourceRegion = new Utf8(sourceRegion);
vs.destinationRegion = new Utf8(destRegion);
vs.generationId = generationId;
return vs;
}

@Test
public void testOnVersionSwapAaPathPausesCurrentOnPartitionComplete() {
VersionSwap fromA = newVsm(1L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(1L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

// First region — partition 0 not yet complete
currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);

// Second region — partition 0 now complete on current side; pause expected
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, times(1)).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathPausesFutureOnPartitionComplete() {
VersionSwap fromA = newVsm(2L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(2L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(futureInternal, never()).pausePartitionConsumption(0);

futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(futureInternal, times(1)).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathDropsForeignRegionVsm() {
VersionSwap foreign = newVsm(3L, "different-region", DEST_A, CURRENT_VERSION, FUTURE_VERSION);
currentTransformer.onVersionSwap(foreign, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);
verify(stats, never()).emitVersionSwapCountMetrics(SUCCESS);
}

@Test
public void testOnVersionSwapAaPathDropsLegacyGenerationIdVsm() {
VersionSwap legacy = newVsm(-1L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
currentTransformer.onVersionSwap(legacy, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);
verify(stats, never()).emitVersionSwapCountMetrics(SUCCESS);
}

@Test
public void testOnVersionSwapAaPathDropsStaleGenerationIdVsm() {
VersionSwap firstA = newVsm(10L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap stale = newVsm(9L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

currentTransformer.onVersionSwap(firstA, CURRENT_VERSION, FUTURE_VERSION, 0);
// Stale generation arrives mid-swap — must be ignored
currentTransformer.onVersionSwap(stale, CURRENT_VERSION, FUTURE_VERSION, 0);
// Only the original region was accumulated; partition not yet complete
verify(currentInternal, never()).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathDropsStaleRollbackVsm() {
Map<Integer, Integer> partitionToVersionToServe = consumer.getPartitionToVersionToServe();
// Already serving v6 — rollback re-emitting v3->v4 must be ignored
partitionToVersionToServe.put(0, 6);
partitionToVersionToServe.put(1, 6);

VersionSwap rollback = newVsm(50L, CLIENT_REGION, DEST_A, 3, 4);
currentTransformer.onVersionSwap(rollback, 3, 4, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathFullSwapEmitsSingleSuccessAndFlipsAllPartitions() {
VersionSwap fromA = newVsm(20L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(20L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

for (int p = 0; p < TOTAL_REGIONS; p++) {
currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, p);
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, p);
futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, p);
futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, p);
}

Map<Integer, Integer> partitionToVersionToServe = consumer.getPartitionToVersionToServe();
for (int p = 0; p < TOTAL_REGIONS; p++) {
assertNotNull(partitionToVersionToServe.get(p));
org.testng.Assert.assertEquals(partitionToVersionToServe.get(p).intValue(), FUTURE_VERSION);
}
// Single SUCCESS metric for the entire swap
verify(stats, times(1)).emitVersionSwapCountMetrics(SUCCESS);
verify(stats, never()).emitVersionSwapCountMetrics(FAIL);
// Future side resumed for every partition
for (int p = 0; p < TOTAL_REGIONS; p++) {
verify(futureInternal, atLeastOnce()).resumePartitionConsumption(p);
}
}

@Test
public void testOnVersionSwapAaPathFailureSurfacesViaPoll() {
// Wrap partitionToVersionToServe in a map that throws on the second put — simulates an
// unexpected failure mid-commit. The coordinator catches via failSwap and stashes the exception
// so the next poll() throws.
java.util.Map<Integer, Integer> throwingMap = new java.util.concurrent.ConcurrentHashMap<Integer, Integer>() {
@Override
public Integer put(Integer key, Integer value) {
throw new IllegalStateException("induced failure during commit");
}

@Override
public java.util.Collection<Integer> values() {
return java.util.Collections.singletonList(CURRENT_VERSION);
}
};

RecordTransformerVersionSwapCoordinator failingCoordinator = new RecordTransformerVersionSwapCoordinator(
STORE,
CLIENT_REGION,
TOTAL_REGIONS,
changelogClientConfig.getVersionSwapTimeoutInMs(),
stats,
throwingMap,
consumer.getSubscribedPartitions(),
consumer.getVersionSwapThreadException()::set);
consumer.setVersionSwapCoordinator(failingCoordinator);

VersionSwap fromA = newVsm(30L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(30L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0);
currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 1);
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 1);
futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 1);
futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 1);
// Last call completes the barrier and triggers commitSwap which throws via the throwing map
assertThrows(Exception.class, () -> futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0));

verify(stats, times(1)).emitVersionSwapCountMetrics(FAIL);
// Next poll() observes the stashed exception
assertThrows(VeniceException.class, () -> consumer.poll(1L));
Comment on lines +429 to +430
for (int partition: assignedPartitionsSnapshot) {
partitionToVersionToServe.put(partition, activeNewVersion);
kvargha and others added 2 commits May 14, 2026 15:41
The monolithic TestAaVersionSwapRecordTransformer (~13 min runtime across
7 tests) exceeds the 15-min per-shard CI budget even when given its own
shard. Split by topic into two classes that each fit comfortably, and
rename to better reflect what's actually under test (the DVRT-based CDC
consumer's behavior under AA version swaps, not the RecordTransformer
class itself).

- AbstractDvrtCdcAaVersionSwapTest — shared multi-region cluster setup,
  helpers (createAaHybridStore, emptyPush, writeRecords, buildClientConfig,
  pollUntilRangeObserved, drainForDuration, assertNoLoss, deleteStoreQuietly),
  and constants. Pattern matches AbstractTestRepush.
- TestDvrtCdcAaVersionSwap — single-swap basic + edge scenarios: stateful,
  stateless, mid-swap restart, watchdog timeout, buffer pressure (~440s).
- TestDvrtCdcAaVersionSwapMultiVersion — multi-version scenarios:
  back-to-back swaps, rollback (~387s).

Each subclass lands in its own dedicated shard (86 and 87).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ivate

SpotBugs MS_PKGPROTECT flagged the protected static final String[] — a
mutable array exposed to subclasses in any package. Both concrete
subclasses are in the same package, so package-private access is
sufficient and silences the warning.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 15, 2026 03:00
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 13 out of 13 changed files in this pull request and generated 12 comments.

Comment on lines +500 to +505
Exception versionSwapException = versionSwapThreadException.get();
if (versionSwapException != null) {
throw new VeniceException(
"Version Swap failed for store: " + storeName + " due to exception:",
versionSwapException);
}
Comment on lines +207 to +246
public synchronized boolean recordCurrentVsm(
VersionSwap vsm,
int partition,
InternalDaVinciRecordTransformer<?, ?, ?> currentTransformer) {
armIfNeeded(vsm);
currentTransformerRef = currentTransformer;
if (!assignedPartitionsSnapshot.contains(partition)) {
// Partition was added mid-swap; it will be considered on the next swap.
return false;
}
Set<String> regions = currentVersionRegionsConsumed.computeIfAbsent(partition, p -> new HashSet<>());
regions.add(vsm.getDestinationRegion().toString());
boolean complete = regions.size() >= totalRegionCount;
if (complete) {
pausedCurrentPartitions.add(partition);
}
return complete;
}

/**
* Records a future-side (new-version) VSM observation for {@code partition}. Returns true when
* this partition has now observed one VSM from every destination-region on the future side.
*/
public synchronized boolean recordFutureVsm(
VersionSwap vsm,
int partition,
InternalDaVinciRecordTransformer<?, ?, ?> futureTransformer) {
armIfNeeded(vsm);
futureTransformerRef = futureTransformer;
if (!assignedPartitionsSnapshot.contains(partition)) {
return false;
}
Set<String> regions = futureVersionRegionsConsumed.computeIfAbsent(partition, p -> new HashSet<>());
regions.add(vsm.getDestinationRegion().toString());
boolean complete = regions.size() >= totalRegionCount;
if (complete) {
pausedFuturePartitions.add(partition);
}
return complete;
}
Comment on lines +398 to +419
private void armIfNeeded(VersionSwap vsm) {
State current = state.get();
if (current == State.IN_PROGRESS) {
return;
}
activeGenerationId = vsm.getGenerationId();
activeOldVersionTopic = vsm.getOldServingVersionTopic().toString();
activeNewVersionTopic = vsm.getNewServingVersionTopic().toString();
activeNewVersion = Version.parseVersionFromVersionTopicName(activeNewVersionTopic);
currentVersionRegionsConsumed.clear();
futureVersionRegionsConsumed.clear();
pausedCurrentPartitions.clear();
pausedFuturePartitions.clear();
assignedPartitionsSnapshot.clear();
assignedPartitionsSnapshot.addAll(subscribedPartitions);
state.set(State.IN_PROGRESS);
scheduleTimeoutWatchdog();
}

private void scheduleTimeoutWatchdog() {
timeoutWatchdog = timeoutExecutor.schedule(this::timeoutSwap, versionSwapTimeoutInMs, TimeUnit.MILLISECONDS);
}
Comment on lines +168 to +172
String oldTopic = vsm.getOldServingVersionTopic().toString();
String newTopic = vsm.getNewServingVersionTopic().toString();
if (!Version.isVersionTopic(newTopic)) {
return false;
}
Comment on lines +217 to +240
Set<String> regions = currentVersionRegionsConsumed.computeIfAbsent(partition, p -> new HashSet<>());
regions.add(vsm.getDestinationRegion().toString());
boolean complete = regions.size() >= totalRegionCount;
if (complete) {
pausedCurrentPartitions.add(partition);
}
return complete;
}

/**
* Records a future-side (new-version) VSM observation for {@code partition}. Returns true when
* this partition has now observed one VSM from every destination-region on the future side.
*/
public synchronized boolean recordFutureVsm(
VersionSwap vsm,
int partition,
InternalDaVinciRecordTransformer<?, ?, ?> futureTransformer) {
armIfNeeded(vsm);
futureTransformerRef = futureTransformer;
if (!assignedPartitionsSnapshot.contains(partition)) {
return false;
}
Set<String> regions = futureVersionRegionsConsumed.computeIfAbsent(partition, p -> new HashSet<>());
regions.add(vsm.getDestinationRegion().toString());
Comment on lines +359 to +367
protected void deleteStoreQuietly(String storeName) {
CompletableFuture.runAsync(() -> {
try {
parentControllerClient.disableAndDeleteStore(storeName);
} catch (Exception e) {
LOGGER.debug("Best-effort cleanup of store {} failed", storeName, e);
}
});
}
Comment on lines +469 to +478
java.util.List<Integer> paused = new java.util.ArrayList<>();
java.util.List<Integer> resumed = new java.util.ArrayList<>();
internalRecordTransformer.setPartitionPauseHandlers(paused::add, resumed::add);

internalRecordTransformer.pausePartitionConsumption(3);
internalRecordTransformer.pausePartitionConsumption(7);
internalRecordTransformer.resumePartitionConsumption(3);

assertEquals(paused, java.util.Arrays.asList(3, 7));
assertEquals(resumed, java.util.Collections.singletonList(3));
Comment on lines +62 to +267
Schema valueSchema = Schema.create(Schema.Type.INT);
org.mockito.Mockito.when(schemaReader.getKeySchema()).thenReturn(keySchema);
org.mockito.Mockito.when(schemaReader.getValueSchema(1)).thenReturn(valueSchema);

changelogClientConfig = new ChangelogClientConfig<>().setD2ControllerClient(mock(D2ControllerClient.class))
.setSchemaReader(schemaReader)
.setStoreName(STORE)
.setControllerD2ServiceName(D2_SERVICE_NAME)
.setD2ServiceName(DEFAULT_CLUSTER_DISCOVERY_D2_SERVICE_NAME)
.setConsumerProperties(new Properties())
.setLocalD2ZkHosts("test_zookeeper")
.setD2Client(mock(D2Client.class))
.setVersionSwapByControlMessageEnabled(true)
.setClientRegionName(CLIENT_REGION)
.setTotalRegionCount(TOTAL_REGIONS);
changelogClientConfig.getInnerClientConfig()
.setMetricsRepository(getVeniceMetricsRepository(CHANGE_DATA_CAPTURE_CLIENT, CONSUMER_METRIC_ENTITIES, true));

VeniceChangelogConsumerClientFactory factory =
spy(new VeniceChangelogConsumerClientFactory(changelogClientConfig, null));
consumer = spy(new VeniceChangelogConsumerDaVinciRecordTransformerImpl<>(changelogClientConfig, factory));

// Spy stats so we can verify metric emissions
stats = spy(consumer.getChangeCaptureStats());
consumer.setChangeCaptureStats(stats);

// Coordinator was initialized with the original (pre-spy) stats reference; rebuild it so its
// metric emissions are routed through the spy.
consumer.setVersionSwapCoordinator(
new RecordTransformerVersionSwapCoordinator(
STORE,
CLIENT_REGION,
TOTAL_REGIONS,
changelogClientConfig.getVersionSwapTimeoutInMs(),
stats,
consumer.getPartitionToVersionToServe(),
consumer.getSubscribedPartitions(),
consumer.getVersionSwapThreadException()::set));

currentTransformer = consumer.new DaVinciRecordTransformerChangelogConsumer(STORE, CURRENT_VERSION, keySchema,
valueSchema, valueSchema, mock(com.linkedin.davinci.client.DaVinciRecordTransformerConfig.class));
futureTransformer = consumer.new DaVinciRecordTransformerChangelogConsumer(STORE, FUTURE_VERSION, keySchema,
valueSchema, valueSchema, mock(com.linkedin.davinci.client.DaVinciRecordTransformerConfig.class));

currentInternal =
(InternalDaVinciRecordTransformer<Integer, Integer, Integer>) mock(InternalDaVinciRecordTransformer.class);
futureInternal =
(InternalDaVinciRecordTransformer<Integer, Integer, Integer>) mock(InternalDaVinciRecordTransformer.class);
currentTransformer.setInternalRecordTransformer(currentInternal);
futureTransformer.setInternalRecordTransformer(futureInternal);

// Both partitions assigned and starting on CURRENT_VERSION
Map<Integer, Integer> partitionToVersionToServe = consumer.getPartitionToVersionToServe();
Set<Integer> subscribed = consumer.getSubscribedPartitions();
for (int p = 0; p < TOTAL_REGIONS; p++) {
partitionToVersionToServe.put(p, CURRENT_VERSION);
subscribed.add(p);
}
}

private VersionSwap newVsm(long generationId, String sourceRegion, String destRegion, int oldV, int newV) {
VersionSwap vs = new VersionSwap();
vs.oldServingVersionTopic = new Utf8(Version.composeKafkaTopic(STORE, oldV));
vs.newServingVersionTopic = new Utf8(Version.composeKafkaTopic(STORE, newV));
vs.sourceRegion = new Utf8(sourceRegion);
vs.destinationRegion = new Utf8(destRegion);
vs.generationId = generationId;
return vs;
}

@Test
public void testOnVersionSwapAaPathPausesCurrentOnPartitionComplete() {
VersionSwap fromA = newVsm(1L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(1L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

// First region — partition 0 not yet complete
currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);

// Second region — partition 0 now complete on current side; pause expected
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, times(1)).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathPausesFutureOnPartitionComplete() {
VersionSwap fromA = newVsm(2L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(2L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(futureInternal, never()).pausePartitionConsumption(0);

futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(futureInternal, times(1)).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathDropsForeignRegionVsm() {
VersionSwap foreign = newVsm(3L, "different-region", DEST_A, CURRENT_VERSION, FUTURE_VERSION);
currentTransformer.onVersionSwap(foreign, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);
verify(stats, never()).emitVersionSwapCountMetrics(SUCCESS);
}

@Test
public void testOnVersionSwapAaPathDropsLegacyGenerationIdVsm() {
VersionSwap legacy = newVsm(-1L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
currentTransformer.onVersionSwap(legacy, CURRENT_VERSION, FUTURE_VERSION, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);
verify(stats, never()).emitVersionSwapCountMetrics(SUCCESS);
}

@Test
public void testOnVersionSwapAaPathDropsStaleGenerationIdVsm() {
VersionSwap firstA = newVsm(10L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap stale = newVsm(9L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

currentTransformer.onVersionSwap(firstA, CURRENT_VERSION, FUTURE_VERSION, 0);
// Stale generation arrives mid-swap — must be ignored
currentTransformer.onVersionSwap(stale, CURRENT_VERSION, FUTURE_VERSION, 0);
// Only the original region was accumulated; partition not yet complete
verify(currentInternal, never()).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathDropsStaleRollbackVsm() {
Map<Integer, Integer> partitionToVersionToServe = consumer.getPartitionToVersionToServe();
// Already serving v6 — rollback re-emitting v3->v4 must be ignored
partitionToVersionToServe.put(0, 6);
partitionToVersionToServe.put(1, 6);

VersionSwap rollback = newVsm(50L, CLIENT_REGION, DEST_A, 3, 4);
currentTransformer.onVersionSwap(rollback, 3, 4, 0);
verify(currentInternal, never()).pausePartitionConsumption(0);
}

@Test
public void testOnVersionSwapAaPathFullSwapEmitsSingleSuccessAndFlipsAllPartitions() {
VersionSwap fromA = newVsm(20L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(20L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

for (int p = 0; p < TOTAL_REGIONS; p++) {
currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, p);
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, p);
futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, p);
futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, p);
}

Map<Integer, Integer> partitionToVersionToServe = consumer.getPartitionToVersionToServe();
for (int p = 0; p < TOTAL_REGIONS; p++) {
assertNotNull(partitionToVersionToServe.get(p));
org.testng.Assert.assertEquals(partitionToVersionToServe.get(p).intValue(), FUTURE_VERSION);
}
// Single SUCCESS metric for the entire swap
verify(stats, times(1)).emitVersionSwapCountMetrics(SUCCESS);
verify(stats, never()).emitVersionSwapCountMetrics(FAIL);
// Future side resumed for every partition
for (int p = 0; p < TOTAL_REGIONS; p++) {
verify(futureInternal, atLeastOnce()).resumePartitionConsumption(p);
}
}

@Test
public void testOnVersionSwapAaPathFailureSurfacesViaPoll() {
// Wrap partitionToVersionToServe in a map that throws on the second put — simulates an
// unexpected failure mid-commit. The coordinator catches via failSwap and stashes the exception
// so the next poll() throws.
java.util.Map<Integer, Integer> throwingMap = new java.util.concurrent.ConcurrentHashMap<Integer, Integer>() {
@Override
public Integer put(Integer key, Integer value) {
throw new IllegalStateException("induced failure during commit");
}

@Override
public java.util.Collection<Integer> values() {
return java.util.Collections.singletonList(CURRENT_VERSION);
}
};

RecordTransformerVersionSwapCoordinator failingCoordinator = new RecordTransformerVersionSwapCoordinator(
STORE,
CLIENT_REGION,
TOTAL_REGIONS,
changelogClientConfig.getVersionSwapTimeoutInMs(),
stats,
throwingMap,
consumer.getSubscribedPartitions(),
consumer.getVersionSwapThreadException()::set);
consumer.setVersionSwapCoordinator(failingCoordinator);

VersionSwap fromA = newVsm(30L, CLIENT_REGION, DEST_A, CURRENT_VERSION, FUTURE_VERSION);
VersionSwap fromB = newVsm(30L, CLIENT_REGION, DEST_B, CURRENT_VERSION, FUTURE_VERSION);

currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0);
currentTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 1);
currentTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 1);
futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 0);
futureTransformer.onVersionSwap(fromA, CURRENT_VERSION, FUTURE_VERSION, 1);
futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 1);
// Last call completes the barrier and triggers commitSwap which throws via the throwing map
assertThrows(Exception.class, () -> futureTransformer.onVersionSwap(fromB, CURRENT_VERSION, FUTURE_VERSION, 0));

verify(stats, times(1)).emitVersionSwapCountMetrics(FAIL);
// Next poll() observes the stashed exception
assertThrows(VeniceException.class, () -> consumer.poll(1L));
Comment on lines +428 to +432
private void flipServingVersion() {
for (int partition: assignedPartitionsSnapshot) {
partitionToVersionToServe.put(partition, activeNewVersion);
}
}
Comment on lines +502 to +504
throw new VeniceException(
"Version Swap failed for store: " + storeName + " due to exception:",
versionSwapException);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants