[fast-client] Store-config-change listener on StoreMetadata#2819
Open
ymuppala wants to merge 4 commits into
Open
[fast-client] Store-config-change listener on StoreMetadata#2819ymuppala wants to merge 4 commits into
ymuppala wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR enhances Fast Client metadata observability by adding a store-level config change hook (snapshot + diff + listener dispatch) so consumers can react to operator-driven store config updates without polling. As part of the stacked series, it also includes the version-switch listener plumbing and the prerequisite external-storage metadata surface (enums + Store/Version accessors + transient backing fields) needed by the snapshot.
Changes:
- Add
StoreConfigSnapshot+StoreConfigChangeListener, plus registration APIs onStoreMetadataand dispatch logic inAbstractStoreMetadata. - Wire snapshot materialization/diffing into
RequestBasedMetadata.updateCache()and add unit tests for initial-fire and suppression-on-no-change. - Introduce external-storage enums and Store/Version accessors (transient fields until Avro pins move), with corresponding unit tests.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/venice-common/src/main/java/com/linkedin/venice/meta/Store.java | Adds store-level externalStorageReadMode accessor/mutator. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ZKStore.java | Stores externalStorageReadMode as a transient field (schema still pinned), copies it in copy ctor. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java | Delegates getter to shared store; setter remains unsupported. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/StoreInfo.java | Mirrors externalStorageReadMode in JSON-facing StoreInfo. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/Version.java | Adds per-version external-storage fields (mode + external table identity/status). |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/VersionImpl.java | Implements new Version fields as transient members; copies them in cloneVersion(). |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlyStore.java | Read-only delegation for new Store/Version getters; setters throw UOE. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ExternalStorageReadMode.java | New enum + valueOf(int) mapping for store-level read routing. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/StorageMode.java | New enum + valueOf(int) mapping for per-version storage mode. |
| internal/venice-common/src/main/java/com/linkedin/venice/meta/ExternalTableStatus.java | New enum + valueOf(int) mapping for external table lifecycle state. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/TestZKStore.java | Tests default/round-trip/clone preservation for externalStorageReadMode. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/TestStoreInfo.java | Tests StoreInfo defaulting/null coercion for externalStorageReadMode. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/TestVersion.java | Tests default/round-trip/clone preservation for new Version external-storage fields. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/ExternalStorageReadModeTest.java | Validates enum int mapping contract via VeniceEnumValueTest. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/StorageModeTest.java | Validates enum int mapping contract via VeniceEnumValueTest. |
| internal/venice-common/src/test/java/com/linkedin/venice/meta/ExternalTableStatusTest.java | Validates enum int mapping contract via VeniceEnumValueTest. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/StoreMetadata.java | Adds default no-op registration APIs for version-switch and store-config-change listeners. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/AbstractStoreMetadata.java | Owns listener lists and provides protected fireVersionSwitch / fireStoreConfigChange. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadata.java | Tracks last snapshot and fires store-config-change after successful cache refresh. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/StoreVersionSwitchListener.java | New functional interface for current-version transitions. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/StoreConfigChangeListener.java | New functional interface for snapshot transitions. |
| clients/venice-client/src/main/java/com/linkedin/venice/fastclient/meta/StoreConfigSnapshot.java | New immutable value object for store-level config tracked by Fast Client. |
| clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/AbstractStoreMetadataTest.java | Exercises listener registration, exception isolation, suppression, unregister, null handling. |
| clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/RequestBasedMetadataTest.java | Wiring tests for initial-fire and suppression-on-unchanged refresh for both listener types. |
| clients/venice-client/src/test/java/com/linkedin/venice/fastclient/meta/StoreConfigSnapshotTest.java | Equality/hashCode/default-coercion/accessor tests for snapshot. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| currentVersion.set(fetchedCurrentVersion); | ||
| clusterStats.updateCurrentVersion(fetchedCurrentVersion); | ||
| fetchedCurrentVersionPartitionResourceInCompletionRetries.set(0); | ||
| fireVersionSwitch(previousVersion, fetchedCurrentVersion); |
Comment on lines
+538
to
+542
| // Diff and fire the store-config-change callback. Done last so listeners see a fully committed cache. | ||
| StoreConfigSnapshot newSnapshot = buildStoreConfigSnapshot(); | ||
| StoreConfigSnapshot previousSnapshot = lastStoreConfigSnapshot; | ||
| lastStoreConfigSnapshot = newSnapshot; | ||
| fireStoreConfigChange(previousSnapshot, newSnapshot); |
Comment on lines
+12
to
+13
| * <p>Version-granularity changes — current version flips, partition / routing updates — are delivered by | ||
| * {@link StoreVersionSwitchListener}, not this callback. |
Comment on lines
+12
to
+13
| * <p>Only fields that change at store-granularity (not per-version) belong here. Per-version state — current version, | ||
| * partition count, replicas, compression dictionary — is delivered via {@link StoreVersionSwitchListener} instead. |
5882e3c to
e368a8f
Compare
e368a8f to
dfa62f2
Compare
Comment on lines
+538
to
+542
| // Diff and fire the store-config-change callback. Done last so listeners see a fully committed cache. | ||
| StoreConfigSnapshot newSnapshot = buildStoreConfigSnapshot(); | ||
| StoreConfigSnapshot previousSnapshot = lastStoreConfigSnapshot; | ||
| lastStoreConfigSnapshot = newSnapshot; | ||
| fireStoreConfigChange(previousSnapshot, newSnapshot); |
| public final class StoreConfigSnapshot { | ||
| private final int batchGetLimit; | ||
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
Comment on lines
+193
to
+195
| * Concrete subclass of {@link AbstractStoreMetadata} used purely to drive the listener-plumbing tests. All | ||
| * methods unrelated to version-switch notification are stubbed to throw, which guarantees that any future | ||
| * accidental dependency on them in a listener test would fail loudly rather than silently. |
dfa62f2 to
87e62e2
Compare
87e62e2 to
4aa18fd
Compare
Comment on lines
+538
to
+542
| // Diff and fire the store-config-change callback. Done last so listeners see a fully committed cache. | ||
| StoreConfigSnapshot newSnapshot = buildStoreConfigSnapshot(); | ||
| StoreConfigSnapshot previousSnapshot = lastStoreConfigSnapshot; | ||
| lastStoreConfigSnapshot = newSnapshot; | ||
| fireStoreConfigChange(previousSnapshot, newSnapshot); |
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
Comment on lines
+197
to
+203
| private static final class TestStoreMetadata extends AbstractStoreMetadata { | ||
| private final String storeName; | ||
|
|
||
| TestStoreMetadata(String storeName) { | ||
| super(buildClientConfig(storeName)); | ||
| this.storeName = storeName; | ||
| } |
| }, | ||
| {"name": "accessControlled", "type": "boolean", "default": true, "doc": "Store-level ACL switch. When disabled, Venice Router should accept every request."}, | ||
| {"name": "compressionStrategy", "type": "int", "default": 0, "doc": "Strategy used to compress/decompress Record's value, and default is 'NO_OP'"}, | ||
| {"name": "clientDecompressionEnabled", "type": "boolean", "default": true, "doc": "le/Disable client-side record decompression (default: true)"}, |
Comment on lines
96
to
103
| storeInfo.setLatestVersionPromoteToCurrentTimestamp(store.getLatestVersionPromoteToCurrentTimestamp()); | ||
| storeInfo.setKeyUrnCompressionEnabled(store.isKeyUrnCompressionEnabled()); | ||
| storeInfo.setKeyUrnFields(store.getKeyUrnFields()); | ||
| storeInfo.setFlinkVeniceViewsEnabled(store.isFlinkVeniceViewsEnabled()); | ||
| storeInfo.setPreviousCurrentVersion(store.getPreviousCurrentVersion()); | ||
| storeInfo.setSeparateRealTimeTopicEnabled(store.isSeparateRealTimeTopicEnabled()); | ||
| storeInfo.setExternalStorageReadMode(store.getExternalStorageReadMode()); | ||
| return storeInfo; |
4aa18fd to
a20123b
Compare
a20123b to
2d4fe0c
Compare
| public final class StoreConfigSnapshot { | ||
| private final int batchGetLimit; | ||
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
Comment on lines
+794
to
+804
| /** | ||
| * Materialize the store-level config snapshot from the local cache. Called once at the end of every successful | ||
| * {@link #updateCache(boolean)}; the result is diffed against {@link #lastStoreConfigSnapshot} to decide whether | ||
| * to fire {@link StoreConfigChangeListener}s. | ||
| * | ||
| * <p>External-storage fields default to safe values until {@link MetadataResponseRecord} is extended to carry them | ||
| * over the wire. The TODO on {@link StoreConfigSnapshot} tracks the gap. | ||
| */ | ||
| private StoreConfigSnapshot buildStoreConfigSnapshot() { | ||
| return new StoreConfigSnapshot(batchGetLimit.get(), com.linkedin.venice.meta.ExternalStorageReadMode.VENICE_ONLY); | ||
| } |
2d4fe0c to
abe7b8f
Compare
abe7b8f to
e909fed
Compare
Comment on lines
+742
to
+746
| @Override | ||
| public StorageMode getStorageMode() { | ||
| return this.delegate.getStorageMode(); | ||
| } | ||
|
|
Comment on lines
+1363
to
+1367
| @Override | ||
| public ExternalStorageReadMode getExternalStorageReadMode() { | ||
| return this.delegate.getExternalStorageReadMode(); | ||
| } | ||
|
|
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
e909fed to
3ec701b
Compare
3ec701b to
cdcb822
Compare
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
cdcb822 to
1c2ef5c
Compare
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
Comment on lines
+77
to
+79
| * snapshot (e.g. operator-driven {@link com.linkedin.venice.meta.ExternalStorageReadMode} flip). The callback is invoked | ||
| * after the new snapshot has been committed to the local cache. See {@link StoreConfigChangeListener} for | ||
| * threading and exception semantics. |
Comment on lines
+538
to
+542
| // Diff and fire the store-config-change callback. Done last so listeners see a fully committed cache. | ||
| StoreConfigSnapshot newSnapshot = buildStoreConfigSnapshot(); | ||
| StoreConfigSnapshot previousSnapshot = lastStoreConfigSnapshot; | ||
| lastStoreConfigSnapshot = newSnapshot; | ||
| fireStoreConfigChange(previousSnapshot, newSnapshot); |
dcd1146 to
bf2bd71
Compare
bf2bd71 to
24dcc14
Compare
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
24dcc14 to
0ccbcb2
Compare
0ccbcb2 to
90daebb
Compare
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
Comment on lines
+794
to
+804
| /** | ||
| * Materialize the store-level config snapshot from the local cache. Called once at the end of every successful | ||
| * {@link #updateCache(boolean)}; the result is diffed against {@link #lastStoreConfigSnapshot} to decide whether | ||
| * to fire {@link StoreConfigChangeListener}s. | ||
| * | ||
| * <p>External-storage fields default to safe values until {@link MetadataResponseRecord} is extended to carry them | ||
| * over the wire. The TODO on {@link StoreConfigSnapshot} tracks the gap. | ||
| */ | ||
| private StoreConfigSnapshot buildStoreConfigSnapshot() { | ||
| return new StoreConfigSnapshot(batchGetLimit.get(), com.linkedin.venice.meta.ExternalStorageReadMode.VENICE_ONLY); | ||
| } |
90daebb to
4b70a09
Compare
4b70a09 to
90daebb
Compare
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
Comment on lines
+15
to
+16
| * @param previousVersion the version that was current before this refresh, or {@code -1} for the first refresh | ||
| * after client start |
90daebb to
0473521
Compare
0473521 to
ee3fdf7
Compare
Comment on lines
+538
to
+542
| // Diff and fire the store-config-change callback. Done last so listeners see a fully committed cache. | ||
| StoreConfigSnapshot newSnapshot = buildStoreConfigSnapshot(); | ||
| StoreConfigSnapshot previousSnapshot = lastStoreConfigSnapshot; | ||
| lastStoreConfigSnapshot = newSnapshot; | ||
| fireStoreConfigChange(previousSnapshot, newSnapshot); |
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
| /** | ||
| * Store-level read routing for clients backed by both Venice local storage and a configured external storage | ||
| * system. Defaults to {@link ExternalStorageReadMode#VENICE_ONLY} (no external-storage involvement). Mirrors the | ||
| * {@code externalStorageReadMode} field staged on {@code StoreProperties} (StoreMetaValue v44) by PR #2814. |
| /** | ||
| * Per-version storage mode controlling where data is persisted relative to the configured external storage system. | ||
| * Defaults to {@link StorageMode#INTERNAL}. Mirrors the {@code storageMode} field staged on {@code StoreVersion} | ||
| * (StoreMetaValue v44) by PR #2814. |
Comment on lines
+10
to
+11
| * external storage system. Mirrors the {@code storageMode} field staged on {@code StoreVersion} (StoreMetaValue v44) | ||
| * by PR #2814. |
| /** | ||
| * Store-level read routing for clients backed by both Venice local storage and a configured external storage system. | ||
| * Applies to the store as a whole, not per-version. Mirrors the {@code externalStorageReadMode} field staged on | ||
| * {@code StoreProperties} (StoreMetaValue v44) by PR #2814. |
| currentVersion.set(fetchedCurrentVersion); | ||
| clusterStats.updateCurrentVersion(fetchedCurrentVersion); | ||
| fetchedCurrentVersionPartitionResourceInCompletionRetries.set(0); | ||
| fireVersionSwitch(previousVersion, fetchedCurrentVersion); |
ee3fdf7 to
da584a0
Compare
…dminOperation v99 + Java accessors
Predecessor PR (the schema PR in this stack) stages StoreMetaValue v44 and
AdminOperation v99 with these external-storage-related fields:
* StoreVersion: storageMode, externalDbName, externalTableName (per-version,
all newly-added in v44).
* StoreProperties: externalStorageReadMode (store-level, per-store).
* UpdateStore (admin op): storageMode, externalStorageReadMode,
externalDbName, externalTableName (mirrors for operator-override).
build.gradle's `versionOverrides` list still forces the Avro compiler to use
v43 / v98 instead of letting it pick the highest-numeric directory, so v44 /
v99 are not yet wire-real. Downstream consumers (Fast Client metadata
refresh, controller, server, VPJ producers) need typed Java accessors to
start integrating against.
Drop the `versionOverrides` entries so the Avro compiler picks the latest
schema directories (v44 / v99) naturally — that's exactly the
remove-the-override step the comment above `versionOverrides` describes
("remove the override in a follow-up PR when actually using the new
protocol"). Bump `AvroProtocolDefinition` constants in lockstep so the
in-process protocol version matches the compiled schema. Add Java accessors
that read/write directly from the now-Avro-backed `StoreProperties` /
`StoreVersion` records (no transient POJO mirroring), so updates round-trip
through ZK / metadata-system-store serialization.
- `build.gradle`: remove the `versionOverrides` entries for StoreMetaValue
and AdminOperation. The Avro generator now selects v44 / v99 by the
default max-numeric rule.
- `AvroProtocolDefinition`: bump constants
(`METADATA_SYSTEM_SCHEMA_STORE` 43 -> 44; `ADMIN_OPERATION` 98 -> 99)
so the in-process protocol version matches the compiled schema.
- New enums (`VeniceEnumValue`, `EnumUtils`-backed `valueOf`):
* `StorageMode { INTERNAL, DUAL_WRITE, EXTERNAL }` — mirrors
`StoreVersion.storageMode`.
* `ExternalStorageReadMode { VENICE_ONLY, DUAL_MODE_CONSISTENCY_CHECK,
DUAL_MODE_EARLY_RETURN, EXTERNAL_ONLY }` — mirrors
`StoreProperties.externalStorageReadMode`.
- Store-level Java accessor backed by `StoreProperties` Avro record:
* `Store.get/setExternalStorageReadMode` → `storeProperties.externalStorageReadMode`.
Implemented on `ZKStore` (int <-> enum). `ReadOnlyStore` delegates the
getter and throws UOE on the setter; `SystemStore` does the same via its
existing `throwUnsupportedOperationException` helper. `StoreInfo.fromStore`
mirrors the field for JSON.
- Version-level Java accessors backed by `StoreVersion` Avro record:
* `Version.get/setStorageMode` → `storeVersion.storageMode` (int <-> enum).
* `Version.get/setExternalDbName` → `storeVersion.externalDbName`
(null-coerces to "NOT_SPECIFIED", the schema default).
* `Version.get/setExternalTableName` → `storeVersion.externalTableName`
(null-coerces to "NOT_SPECIFIED", the schema default).
Implemented on `VersionImpl`; `cloneVersion` propagates all three through
the Avro-record copy. `ReadOnlyVersion` delegates getters and throws UOE
on setters.
- `AdminMessageType.UPDATE_STORE.getNewInstance()` initializes the two new
`UpdateStore` string fields (externalDbName, externalTableName) to "" at
construction so the Avro generic writer does not NPE on null values. The
Avro schema default for these fields on UpdateStore is "" (the "no-op for
this admin op" sentinel), which is only applied at deserialization time;
explicit initialization is required so existing controller code paths
that call `getNewInstance()` and then conditionally set fields can
serialize the result without populating every new field.
- [ ] Added new code behind a config. (no.)
- [ ] Introduced new log lines. (no.)
- [x] No race conditions: new fields are plain Avro-record fields on the
existing storeProperties / storeVersion containers, accessed under
the same synchronization invariants as the fields they sit alongside.
- [x] No new synchronization primitives required.
- [x] No blocking calls introduced.
- [x] No new collections; only enum singletons and a primitive-or-enum-or-String
field per type.
- [x] No new threading.
- [x] New unit tests added: `ExternalStorageReadModeTest`, `StorageModeTest`
(`VeniceEnumValueTest`-based int-mapping coverage); new test methods
in `TestStoreInfo`, `TestZKStore`, `TestVersion` for defaults /
round-trip / clone preservation / null coercion for storageMode,
externalStorageReadMode, externalDbName, externalTableName; plus
"persists through Avro data model" tests on `TestZKStore` and
`TestVersion` that read the value back through `dataModel()` to
guard against a regression to transient POJO mirroring.
- [x] Modified or extended existing tests: yes (`TestStoreInfo` /
`TestZKStore` / `TestVersion`).
- [x] Verified the controller-side createStore admin-op flow no longer
NPEs on the new UpdateStore fields (TestVeniceParentHelixAdmin
passes locally; previously failed with NPE on
`externalDbName.toString()` during AdminOperationSerializer write).
- [x] Verified backward compatibility: the v44 schema is forward-compatible
with v43 because the new fields have defaults; downgrading a process
that writes v44 records to one that reads v43 records works as long
as both sides treat unknown fields as ignorable, which Avro does for
record fields with defaults.
Activating the v44 / v99 schemas changes what the controller writes to ZK
and what controllers exchange via the admin topic. All Venice services
must be redeployed at this version before any caller starts setting
`storageMode` / `externalStorageReadMode` / `externalDbName` /
`externalTableName` to non-default values, otherwise a v43-only service
performing a read-modify-write could silently drop the new fields.
- [x] No (defaults preserve existing behavior; new fields are inert until
callers opt in).
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Problem Statement
Fast Client consumers that wrap the Venice client (e.g. shadow-clients
maintaining a parallel cache, version-aware routing layers) currently have
no way to be notified when the metadata refresh observes a new current
serving version. They have to poll `getCurrentStoreVersion()` from outside
the refresh loop, which is racy with respect to the rest of the cache
(replicas, partition count, schemas) being updated atomically inside
`RequestBasedMetadata.updateCache`.
## Solution
Add a registration-style callback that fires from inside the refresh loop
after the new current version has been committed to the local cache, so
listeners may safely read the rest of the cache and observe the new value.
- New `StoreVersionSwitchListener` functional interface:
`onVersionSwitch(int previousVersion, int newVersion)`. First refresh
after client start delivers `previousVersion = -1` as a sentinel.
- `StoreMetadata` gains `register/unregisterVersionSwitchListener` with
`default {}` no-op bodies so existing test fakes / mocks aren't forced
to implement them.
- `AbstractStoreMetadata` owns the listener list (`CopyOnWriteArrayList`
for rare-writer / frequent-reader access) and exposes a protected
`fireVersionSwitch(prev, next)` that catches per-listener `Throwable`
and logs at ERROR, so one buggy listener cannot break the refresh
thread or starve sibling listeners.
- `RequestBasedMetadata.updateCache` captures `currentVersion.get()`
immediately before `.set(fetchedCurrentVersion)`, then calls
`fireVersionSwitch(previousVersion, fetchedCurrentVersion)` after the
stat update. The fire happens only on the branch that actually accepts
a new current version (i.e. when `whetherToSwitchToFetchedCurrentVersion`
is true), so transitions that get deferred for partition-resource
readiness do not generate spurious callbacks.
### Code changes
- [ ] Added new code behind a config. (no)
- [ ] Introduced new log lines. (one ERROR-level log on listener exception in
`AbstractStoreMetadata.fireVersionSwitch`. Bounded by listener count, fires
only on bad listener — no rate limiting needed.)
### Concurrency-Specific Checks
- [x] No race conditions: listeners are stored in a `CopyOnWriteArrayList`; the
iterator in `fireVersionSwitch` is snapshot-stable across concurrent
register/unregister calls.
- [x] Synchronization: registration is intrinsically thread-safe via
`CopyOnWriteArrayList.addIfAbsent` / `remove`; fire walks the snapshot
iterator without holding any lock.
- [x] No blocking calls inside critical sections — `fireVersionSwitch` is
called from `RequestBasedMetadata.updateCache` which is `synchronized`,
but the listener iteration itself acquires no further locks. Listener
contracts (Javadoc) require short, non-blocking work; exceptions are
isolated per listener.
- [x] Thread-safe collection used (`CopyOnWriteArrayList`).
- [x] Validated proper exception handling — per-listener `try/catch (Throwable)`
ensures one bad listener cannot kill the refresh thread.
## How was this PR tested?
- [x] New unit tests added: `AbstractStoreMetadataTest` (8 cases: register+fire,
no-listeners-safe, multi-listener fan-out, exception isolation, unregister,
dedup on double-register, null-reject on register, null/unknown tolerance
on unregister).
- [x] Modified or extended existing tests: yes — `RequestBasedMetadataTest`
gains two wiring tests (listener fires on initial `start()` with sentinel
`-1`; no fire when a subsequent `updateCache` returns the same version).
- [x] Verified backward compatibility: yes — the two new `StoreMetadata` methods
are `default` no-ops, so existing implementations of `StoreMetadata`
compile and run unchanged.
## Does this PR introduce any user-facing or breaking changes?
- [x] No.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
## Problem Statement
Sibling to the version-switch listener: Fast Client consumers also need a
hook for store-level config changes (operator-driven flips to
`externalStorageReadMode`, future tweaks to `batchGetLimit`, etc.) so they
can react without polling. Today there's no observability into store-level
config transitions from the refresh loop.
## Solution
Introduce a snapshot-based change listener. The metadata refresh materializes
an immutable `StoreConfigSnapshot` at the end of every successful
`updateCache`, diffs it against the previous snapshot, and fires registered
listeners only when the snapshot has changed by value-equality.
- New `StoreConfigSnapshot`: immutable POJO carrying `batchGetLimit` and
`externalStorageReadMode` (introduced in [venice-common][protocol] Java
accessors for storageMode + externalStorageReadMode + external-table
identity, the prerequisite PR). Value-based `equals`/`hashCode`/`toString`.
`null` `externalStorageReadMode` coerced to `VENICE_ONLY` for defensive
defaulting.
- New `StoreConfigChangeListener` functional interface:
`onStoreConfigChange(previous, current)`. `previous == null` on the first
refresh after client start.
- `StoreMetadata` gains `register/unregisterStoreConfigChangeListener` with
`default {}` no-op bodies.
- `AbstractStoreMetadata` owns the listener list (`CopyOnWriteArrayList`)
and exposes a protected `fireStoreConfigChange(prev, current)` that
rejects `null` current, no-ops on `prev.equals(current)`, otherwise
dispatches with per-listener `Throwable` catch + log.
- `RequestBasedMetadata` gains a `volatile StoreConfigSnapshot
lastStoreConfigSnapshot` and a private `buildStoreConfigSnapshot()` that
materializes the snapshot from the current cache. `updateCache` builds
the new snapshot, swaps `lastStoreConfigSnapshot`, then calls
`fireStoreConfigChange` after the version-switch fire so listeners
observe a fully committed cache.
- `externalStorageReadMode` is currently hardcoded to `VENICE_ONLY` in
`buildStoreConfigSnapshot()` because `MetadataResponseRecord` /
`StoreMetaValue` are still pinned at v2 / v43 and don't carry the field
over the wire. The TODO on `StoreConfigSnapshot` tracks the gap; once
the wire format catches up, the snapshot will start reflecting real
values and the callback will start firing for read-mode flips.
### Code changes
- [ ] Added new code behind a config. (no)
- [ ] Introduced new log lines. (one ERROR-level log on listener exception in
`AbstractStoreMetadata.fireStoreConfigChange`. Bounded by listener count,
fires only on bad listener — no rate limiting needed.)
### Concurrency-Specific Checks
- [x] No race conditions: listeners in `CopyOnWriteArrayList`; the snapshot
diff uses `volatile lastStoreConfigSnapshot` read-then-write inside
`updateCache` which is already `synchronized`.
- [x] Snapshot is immutable — no shared mutable state escapes through the
callback.
- [x] No blocking calls inside critical sections; listener iteration acquires
no further locks.
- [x] Thread-safe collection used (`CopyOnWriteArrayList`).
- [x] Validated proper exception handling — per-listener `try/catch (Throwable)`.
## How was this PR tested?
- [x] New unit tests added: `StoreConfigSnapshotTest` (5 cases: equality
identity, equality breaks on each field, null coercion, accessors
round-trip). `AbstractStoreMetadataTest` extended with 6 cases for
store-config-change (initial null→snapshot fire, no-op suppression on
equal snapshots, listener exception isolation, unregister stops fires,
null-listener reject, null-current reject).
- [x] Modified or extended existing tests: yes — `RequestBasedMetadataTest`
gains two wiring tests (listener fires on initial `start()` with
`prev=null`; no fire on no-change refresh).
- [x] Verified backward compatibility: yes — the two new `StoreMetadata`
methods are `default` no-ops; existing implementations compile and run
unchanged. Snapshot is built only inside `RequestBasedMetadata` which
is the only known production implementation.
## Does this PR introduce any user-facing or breaking changes?
- [x] No.
Depends on the prior PR in this stack:
`[venice-common][protocol] Java accessors for storageMode +
externalStorageReadMode + external-table identity`
(introduces `ExternalStorageReadMode` referenced by `StoreConfigSnapshot`).
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
da584a0 to
139f5d9
Compare
Comment on lines
+20
to
+21
| // TODO(external-storage): populate from metadata response once StoreMetaValue / MetadataResponseRecord are | ||
| // unpinned past v43 / v3 to include externalStorageReadMode on the wire. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem Statement
Sibling to the version-switch listener (#2818): Fast Client consumers also need a hook for store-level config changes (operator-driven flips to
externalStorageReadMode, future tweaks tobatchGetLimit, etc.) so they can react without polling. Today there's no observability into store-level config transitions from the refresh loop.Solution
Introduce a snapshot-based change listener. The metadata refresh materializes an immutable
StoreConfigSnapshotat the end of every successfulupdateCache, diffs it against the previous snapshot, and fires registered listeners only when the snapshot has changed by value-equality.New
StoreConfigSnapshot: immutable POJO carryingbatchGetLimitandexternalStorageReadMode(introduced in the prerequisite PR [venice-common][controller][compact] Activate StoreMetaValue v44 / AdminOperation v99 + Java accessors #2817). Value-basedequals/hashCode/toString.nullexternalStorageReadModecoerced toVENICE_ONLYfor defensive defaulting.New
StoreConfigChangeListenerfunctional interface:onStoreConfigChange(previous, current).previous == nullon the first refresh after client start.StoreMetadatagainsregister/unregisterStoreConfigChangeListenerwithdefault {}no-op bodies.AbstractStoreMetadataowns the listener list (CopyOnWriteArrayList) and exposes a protectedfireStoreConfigChange(prev, current)that rejectsnullcurrent, no-ops onprev.equals(current), otherwise dispatches with per-listenerThrowablecatch + log.RequestBasedMetadatagains avolatile StoreConfigSnapshot lastStoreConfigSnapshotand a privatebuildStoreConfigSnapshot()that materializes the snapshot from the current cache.updateCachebuilds the new snapshot, swapslastStoreConfigSnapshot, then callsfireStoreConfigChangeafter the version-switch fire so listeners observe a fully committed cache.externalStorageReadModeis currently hardcoded toVENICE_ONLYinbuildStoreConfigSnapshot()becauseMetadataResponseRecord/StoreMetaValueare still pinned at v2 / v43 and don't carry the field over the wire. The TODO onStoreConfigSnapshottracks the gap; once the wire format catches up, the snapshot will start reflecting real values and the callback will start firing for read-mode flips.Code changes
AbstractStoreMetadata.fireStoreConfigChange. Bounded by listener count, fires only on bad listener — no rate limiting needed.Concurrency-Specific Checks
CopyOnWriteArrayList; the snapshot diff usesvolatile lastStoreConfigSnapshotread-then-write insideupdateCachewhich is alreadysynchronized.CopyOnWriteArrayList).try/catch (Throwable).How was this PR tested?
StoreConfigSnapshotTest(5 cases: equality identity, equality breaks on each field, null coercion, accessors round-trip).AbstractStoreMetadataTestextended with 6 cases for store-config-change (initial null→snapshot fire, no-op suppression on equal snapshots, listener exception isolation, unregister stops fires, null-listener reject, null-current reject).RequestBasedMetadataTestgains two wiring tests (listener fires on initialstart()withprev=null; no fire on no-change refresh).StoreMetadatamethods aredefaultno-ops; existing implementations compile and run unchanged. Snapshot is built only insideRequestBasedMetadatawhich is the only known production implementation.Does this PR introduce any user-facing or breaking changes?
Top of a 3-PR stack:
[venice-common][protocol] Java accessors for storageMode + externalStorageReadMode + external-table identity([venice-common][controller][compact] Activate StoreMetaValue v44 / AdminOperation v99 + Java accessors #2817) — introducesExternalStorageReadModethat this PR references inStoreConfigSnapshot. Blocks this PR.[fast-client] Version-switch listener on StoreMetadata([fast-client] Version-switch listener on StoreMetadata #2818) — independent of this PR; shares the same touched files (StoreMetadata,AbstractStoreMetadata,RequestBasedMetadata, the two test classes) so the merge order between (2) and (3) determines the conflict resolution direction. This PR is currently stacked on top of (2).