diff --git a/build.gradle b/build.gradle index d07f7bdf53c..ff0f46d132f 100644 --- a/build.gradle +++ b/build.gradle @@ -329,9 +329,9 @@ subprojects { // when actually using the new protocol. Example to pin KME to v12 when introducing v13: // project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY) def versionOverrides = [ - // StoreMetaValue v44 and AdminOperation v99 stage targetRegionPromoted, storageMode, and externalStorageReadMode. - // Pinned to the current active versions until the corresponding Java wiring lands in follow-up PRs. - project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v43', PathValidation.DIRECTORY), + // AdminOperation v99 stages targetRegionPromoted, storageMode, and externalStorageReadMode admin setters. + // Pinned to v98 until the admin-tool / controller wiring lands in a follow-up PR. StoreMetaValue v44 is + // active so the storageMode field can be read from StoreVersion by the VPJ dual-write gating predicate. project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v98', PathValidation.DIRECTORY) ] diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/ExternalStorageWriteUtils.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/ExternalStorageWriteUtils.java new file mode 100644 index 00000000000..4ec25f761d6 --- /dev/null +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/ExternalStorageWriteUtils.java @@ -0,0 +1,39 @@ +package com.linkedin.venice.vpj; + +import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS; + +import com.linkedin.venice.meta.StorageMode; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.utils.VeniceProperties; + + +/** + * Helpers for the VPJ external-storage dual-write path. Currently exposes only the gating predicate that + * answers "should VPJ dual-write this version's records to external storage in parallel with Kafka produce?" + * The actual writer implementation and wiring land in a follow-up PR. + */ +public final class ExternalStorageWriteUtils { + private ExternalStorageWriteUtils() { + } + + /** + * Returns true iff both halves of the dual-write gate are set: + * + * If either half is missing, dual-write stays off and VPJ behaves as today (Kafka-only produce). + */ + public static boolean isDualWriteToExternalStorageFromVpjEnabled(VeniceProperties jobProps, Version version) { + if (jobProps == null || version == null) { + return false; + } + String writerClass = jobProps.getString(PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS, ""); + if (writerClass.isEmpty()) { + return false; + } + return version.getStorageMode() == StorageMode.DUAL_WRITE; + } +} diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java index 6d7aef852bc..200695c8729 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java @@ -494,6 +494,14 @@ private VenicePushJobConstants() { */ public static final String DATA_WRITER_COMPUTE_JOB_CLASS = "data.writer.compute.job.class"; + /** + * Fully-qualified class name of the external storage writer used by the VPJ dual-write path. The class must + * implement the SPI defined by the dual-write writer interface (added in a follow-up PR). Presence of a + * non-empty value here is the VPJ-side half of the dual-write gating predicate; the store-version-side half + * is {@code Version.getStorageMode() == StorageMode.DUAL_WRITE}. + */ + public static final String PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS = "push.job.external.storage.writer.class"; + public static final String PUSH_TO_SEPARATE_REALTIME_TOPIC = "push.to.separate.realtime.topic"; public static final String STORE_SEPARATE_REALTIME_TOPIC_ENABLED = "store.separate.realtime.topic.enabled"; diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/vpj/ExternalStorageWriteUtilsTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/vpj/ExternalStorageWriteUtilsTest.java new file mode 100644 index 00000000000..f072ef4d3c0 --- /dev/null +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/vpj/ExternalStorageWriteUtilsTest.java @@ -0,0 +1,81 @@ +package com.linkedin.venice.vpj; + +import static com.linkedin.venice.vpj.ExternalStorageWriteUtils.isDualWriteToExternalStorageFromVpjEnabled; +import static com.linkedin.venice.vpj.VenicePushJobConstants.PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import com.linkedin.venice.meta.StorageMode; +import com.linkedin.venice.meta.Version; +import com.linkedin.venice.meta.VersionImpl; +import com.linkedin.venice.utils.VeniceProperties; +import java.util.Properties; +import org.testng.annotations.Test; + + +public class ExternalStorageWriteUtilsTest { + private static final String STORE = "test_store"; + private static final String IMPL_CLASS = "com.example.LiveExternalWriter"; + + private static VeniceProperties propsWithClass(String value) { + Properties props = new Properties(); + if (value != null) { + props.setProperty(PUSH_JOB_EXTERNAL_STORAGE_WRITER_CLASS, value); + } + return new VeniceProperties(props); + } + + private static Version versionWithMode(StorageMode mode) { + Version version = new VersionImpl(STORE, 1, "push-id"); + version.setStorageMode(mode); + return version; + } + + @Test + public void bothHalvesSetReturnsTrue() { + assertTrue( + isDualWriteToExternalStorageFromVpjEnabled( + propsWithClass(IMPL_CLASS), + versionWithMode(StorageMode.DUAL_WRITE))); + } + + @Test + public void writerClassMissingReturnsFalse() { + assertFalse( + isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(null), versionWithMode(StorageMode.DUAL_WRITE))); + } + + @Test + public void writerClassEmptyReturnsFalse() { + assertFalse( + isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(""), versionWithMode(StorageMode.DUAL_WRITE))); + } + + @Test + public void storageModeInternalReturnsFalse() { + assertFalse( + isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), versionWithMode(StorageMode.INTERNAL))); + } + + @Test + public void storageModeExternalReturnsFalse() { + assertFalse( + isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), versionWithMode(StorageMode.EXTERNAL))); + } + + @Test + public void defaultVersionStorageModeIsInternal() { + Version version = new VersionImpl(STORE, 1, "push-id"); + assertFalse(isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), version)); + } + + @Test + public void nullVersionReturnsFalse() { + assertFalse(isDualWriteToExternalStorageFromVpjEnabled(propsWithClass(IMPL_CLASS), null)); + } + + @Test + public void nullPropsReturnsFalse() { + assertFalse(isDualWriteToExternalStorageFromVpjEnabled(null, versionWithMode(StorageMode.DUAL_WRITE))); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java index 48de5511a5a..026916ad8dc 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java @@ -379,6 +379,16 @@ public void setCompressionStrategy(CompressionStrategy compressionStrategy) { throw new UnsupportedOperationException(); } + @Override + public StorageMode getStorageMode() { + return this.delegate.getStorageMode(); + } + + @Override + public void setStorageMode(StorageMode storageMode) { + throw new UnsupportedOperationException(); + } + @Override public boolean isNativeReplicationEnabled() { return this.delegate.isNativeReplicationEnabled(); @@ -1999,6 +2009,7 @@ private static StoreVersion convertVersion(Version version) { storeVersion.setKeyUrnFields(version.getKeyUrnFields().stream().map(String::toString).collect(Collectors.toList())); storeVersion.setRepushTtlSeconds(version.getRepushTtlSeconds()); storeVersion.setPreviousCurrentVersion(version.getPreviousCurrentVersion()); + storeVersion.setStorageMode(version.getStorageMode().getValue()); // Set fields to default values - fields exist in schema but not yet exposed via Version interface storeVersion.setRollbackTrigger("NOT_ROLLED_BACK"); storeVersion.setTransientRecordCacheEnabled(false); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/StorageMode.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StorageMode.java new file mode 100644 index 00000000000..c6328a93fa5 --- /dev/null +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/StorageMode.java @@ -0,0 +1,47 @@ +package com.linkedin.venice.meta; + +import com.linkedin.venice.utils.EnumUtils; +import com.linkedin.venice.utils.VeniceEnumValue; +import java.util.List; + + +/** + * Storage mode for a Venice store version. Selects where version data is persisted in addition + * to (or instead of) Venice local storage. The value is set per-version on + * {@code StoreVersion.storageMode}; the controller copies it from the store-level + * {@code UpdateStore} admin op when a new version is created. + */ +public enum StorageMode implements VeniceEnumValue { + /** Default. Venice-only — today's behavior. */ + INTERNAL(0), + + /** + * Data is written to both Venice local storage and the configured external storage. The specific + * dual-write implementation (leader-consumer pipeline vs Venice Push Job) is selected by separate + * server/VPJ configuration, not by this enum. + */ + DUAL_WRITE(1), + + /** + * External-storage-only. Venice's data partition becomes NoOp; metadata partitions are still + * persisted locally for checkpointing. + */ + EXTERNAL(2); + + private final int value; + + private static final List TYPES = EnumUtils.getEnumValuesList(StorageMode.class); + + StorageMode(int value) { + this.value = value; + } + + @Override + public int getValue() { + return value; + } + + public static StorageMode valueOf(int value) { + return EnumUtils.valueOf(TYPES, value, StorageMode.class); + } +} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java index 153389d1e15..e3633f6cf14 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java @@ -159,6 +159,10 @@ public VeniceMetricsDimensions getDimensionName() { void setCompressionStrategy(CompressionStrategy compressionStrategy); + StorageMode getStorageMode(); + + void setStorageMode(StorageMode storageMode); + default boolean isLeaderFollowerModelEnabled() { return true; } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java index 3838c85d592..b2f8dca6a0f 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java @@ -130,6 +130,16 @@ public void setCompressionStrategy(CompressionStrategy compressionStrategy) { this.storeVersion.compressionStrategy = compressionStrategy.getValue(); } + @Override + public StorageMode getStorageMode() { + return StorageMode.valueOf(this.storeVersion.storageMode); + } + + @Override + public void setStorageMode(StorageMode storageMode) { + this.storeVersion.storageMode = storageMode.getValue(); + } + @Override public boolean isNativeReplicationEnabled() { return this.storeVersion.nativeReplicationEnabled; diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/StorageModeTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/StorageModeTest.java new file mode 100644 index 00000000000..572d1584474 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/StorageModeTest.java @@ -0,0 +1,21 @@ +package com.linkedin.venice.meta; + +import com.linkedin.venice.utils.CollectionUtils; +import com.linkedin.venice.utils.VeniceEnumValueTest; +import java.util.Map; + + +public class StorageModeTest extends VeniceEnumValueTest { + public StorageModeTest() { + super(StorageMode.class); + } + + @Override + protected Map expectedMapping() { + return CollectionUtils.mapBuilder() + .put(0, StorageMode.INTERNAL) + .put(1, StorageMode.DUAL_WRITE) + .put(2, StorageMode.EXTERNAL) + .build(); + } +}