Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ subprojects {
// when actually using the new protocol. Example to pin KME to v12 when introducing v13:
// project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
def versionOverrides = [
// StoreMetaValue v44 and AdminOperation v99 stage targetRegionPromoted, storageMode, and externalStorageReadMode.
// Pinned to the current active versions until the corresponding Java wiring lands in follow-up PRs.
project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v43', PathValidation.DIRECTORY),
// AdminOperation v99 stages targetRegionPromoted, storageMode, and externalStorageReadMode admin setters.
// Pinned to v98 until the admin-tool / controller wiring lands in a follow-up PR. StoreMetaValue v44 is
// active so the storageMode field can be read from StoreVersion by the VPJ dual-write gating predicate.
project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v98', PathValidation.DIRECTORY)
]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.linkedin.venice.vpj;

import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS;

import com.linkedin.venice.meta.StorageMode;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.VeniceProperties;


/**
* Helpers for the VPJ external-storage dual-write path. Currently exposes only the gating predicate that
* answers "should VPJ dual-write this version's records to external storage in parallel with Kafka produce?"
* The actual writer implementation and wiring land in a follow-up PR.
*/
public final class ExternalStorageWriteUtils {
private ExternalStorageWriteUtils() {
}

/**
* Returns true iff both halves of the dual-write gate are set:
* <ul>
* <li>VPJ-side: {@link VenicePushJobConstants#PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS} is configured to a
* non-empty class name (the impl is shipped on the executor classpath separately).</li>
* <li>Store-version-side: the target {@code Version}'s {@link Version#getStorageMode()} is
* {@link StorageMode#DUAL_WRITE}.</li>
* </ul>
* If either half is missing, dual-write stays off and VPJ behaves as today (Kafka-only produce).
*/
public static boolean isDualWriteToExternalStorageFromVpjEnabled(VeniceProperties jobProps, Version version) {
if (jobProps == null || version == null) {
return false;
}
String writerClass = jobProps.getString(PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS, "");
if (writerClass.isEmpty()) {
return false;
}
return version.getStorageMode() == StorageMode.DUAL_WRITE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,14 @@ private VenicePushJobConstants() {
*/
public static final String DATA_WRITER_COMPUTE_JOB_CLASS = "data.writer.compute.job.class";

/**
* Fully-qualified class name of the external storage writer used by the VPJ dual-write path. The class must
* implement the SPI defined by the dual-write writer interface (added in a follow-up PR). Presence of a
* non-empty value here is the VPJ-side half of the dual-write gating predicate; the store-version-side half
* is {@code Version.getStorageMode() == StorageMode.DUAL_WRITE}.
*/
public static final String PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS = "push.job.external.storage.writer.class";

public static final String PUSH_TO_SEPARATE_REALTIME_TOPIC = "push.to.separate.realtime.topic";
public static final String STORE_SEPARATE_REALTIME_TOPIC_ENABLED = "store.separate.realtime.topic.enabled";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.linkedin.venice.vpj;

import static com.linkedin.venice.vpj.ExternalStorageWriteUtils.isDualWriteToExternalStorageFromVpjEnabled;
import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.meta.StorageMode;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
import org.testng.annotations.Test;


public class ExternalStorageWriteUtilsTest {
private static final String STORE = "test_store";
private static final String IMPL_CLASS = "com.example.LiveExternalWriter";

private static VeniceProperties propsWithClass(String value) {
Properties props = new Properties();
if (value != null) {
props.setProperty(PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS, value);
}
return new VeniceProperties(props);
}

private static Version versionWithMode(StorageMode mode) {
Version version = new VersionImpl(STORE, 1, "push-id");
version.setStorageMode(mode);
return version;
}

@Test
public void bothHalvesSetReturnsTrue() {
assertTrue(
isDualWriteToExternalStorageFromVpjEnabled(
propsWithClass(IMPL_CLASS),
versionWithMode(StorageMode.DUAL_WRITE)));
}

@Test
public void writerClassMissingReturnsFalse() {
assertFalse(
isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(null), versionWithMode(StorageMode.DUAL_WRITE)));
}

@Test
public void writerClassEmptyReturnsFalse() {
assertFalse(
isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(""), versionWithMode(StorageMode.DUAL_WRITE)));
}

@Test
public void storageModeInternalReturnsFalse() {
assertFalse(
isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), versionWithMode(StorageMode.INTERNAL)));
}

@Test
public void storageModeExternalReturnsFalse() {
assertFalse(
isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), versionWithMode(StorageMode.EXTERNAL)));
}

@Test
public void defaultVersionStorageModeIsInternal() {
Version version = new VersionImpl(STORE, 1, "push-id");
assertFalse(isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), version));
}

@Test
public void nullVersionReturnsFalse() {
assertFalse(isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), null));
}

@Test
public void nullPropsReturnsFalse() {
assertFalse(isDualWriteToExternalStorageFromVpjEnabled(null, versionWithMode(StorageMode.DUAL_WRITE)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,16 @@ public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
throw new UnsupportedOperationException();
}

@Override
public StorageMode getStorageMode() {
return this.delegate.getStorageMode();
}

@Override
public void setStorageMode(StorageMode storageMode) {
throw new UnsupportedOperationException();
}

@Override
public boolean isNativeReplicationEnabled() {
return this.delegate.isNativeReplicationEnabled();
Expand Down Expand Up @@ -1999,6 +2009,7 @@ private static StoreVersion convertVersion(Version version) {
storeVersion.setKeyUrnFields(version.getKeyUrnFields().stream().map(String::toString).collect(Collectors.toList()));
storeVersion.setRepushTtlSeconds(version.getRepushTtlSeconds());
storeVersion.setPreviousCurrentVersion(version.getPreviousCurrentVersion());
storeVersion.setStorageMode(version.getStorageMode().getValue());
// Set fields to default values - fields exist in schema but not yet exposed via Version interface
storeVersion.setRollbackTrigger("NOT_ROLLED_BACK");
storeVersion.setTransientRecordCacheEnabled(false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.linkedin.venice.meta;

import com.linkedin.venice.utils.EnumUtils;
import com.linkedin.venice.utils.VeniceEnumValue;
import java.util.List;


/**
* Storage mode for a Venice store version. Selects where version data is persisted in addition
* to (or instead of) Venice local storage. The value is set per-version on
* {@code StoreVersion.storageMode}; the controller copies it from the store-level
* {@code UpdateStore} admin op when a new version is created.
*/
public enum StorageMode implements VeniceEnumValue {
/** Default. Venice-only — today's behavior. */
INTERNAL(0),

/**
* Data is written to both Venice local storage and the configured external storage. The specific
* dual-write implementation (leader-consumer pipeline vs Venice Push Job) is selected by separate
* server/VPJ configuration, not by this enum.
*/
DUAL_WRITE(1),

/**
* External-storage-only. Venice's data partition becomes NoOp; metadata partitions are still
* persisted locally for checkpointing.
*/
EXTERNAL(2);

private final int value;

private static final List<StorageMode> TYPES = EnumUtils.getEnumValuesList(StorageMode.class);

StorageMode(int value) {
this.value = value;
}

@Override
public int getValue() {
return value;
}

public static StorageMode valueOf(int value) {
return EnumUtils.valueOf(TYPES, value, StorageMode.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public VeniceMetricsDimensions getDimensionName() {

void setCompressionStrategy(CompressionStrategy compressionStrategy);

StorageMode getStorageMode();

void setStorageMode(StorageMode storageMode);

default boolean isLeaderFollowerModelEnabled() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
this.storeVersion.compressionStrategy = compressionStrategy.getValue();
}

@Override
public StorageMode getStorageMode() {
return StorageMode.valueOf(this.storeVersion.storageMode);
}

@Override
public void setStorageMode(StorageMode storageMode) {
this.storeVersion.storageMode = storageMode.getValue();
}

@Override
public boolean isNativeReplicationEnabled() {
return this.storeVersion.nativeReplicationEnabled;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.linkedin.venice.meta;

import com.linkedin.venice.utils.CollectionUtils;
import com.linkedin.venice.utils.VeniceEnumValueTest;
import java.util.Map;


public class StorageModeTest extends VeniceEnumValueTest<StorageMode> {
public StorageModeTest() {
super(StorageMode.class);
}

@Override
protected Map<Integer, StorageMode> expectedMapping() {
return CollectionUtils.<Integer, StorageMode>mapBuilder()
.put(0, StorageMode.INTERNAL)
.put(1, StorageMode.DUAL_WRITE)
.put(2, StorageMode.EXTERNAL)
.build();
}
}
Loading