[server][vpj][admin-tool] Plumb KME SchemaReader through consumer-side dictionary fetch and topic dump paths#2807
Conversation
…er on
consumer paths
VeniceWriter.sendDoLStamp routes through the producerAdapter directly with
EmptyPubSubMessageHeaders.SINGLETON, bypassing the getHeaders() helper that
attaches the VENICE_TRANSPORT_PROTOCOL_HEADER ('vtp') on a segment-start
record. A leader handover that lands a DoL stamp at offset 0 of a fresh
version topic therefore writes the first record with no headers, and a
forward-compat consumer whose jar predates the current KME protocol version
has no on-record schema to bootstrap with - it throws "Received Protocol
Version 'N' which is not supported by KafkaValueSerializer".
Fix: route sendDoLStamp through getHeaders(producerMetadata, false, null,
EmptyPubSubMessageHeaders.SINGLETON), same pattern sendHeartbeat already
uses. DoL stamps always carry segmentNumber=0 and messageSequenceNumber=0
(set in getDoLStampKME), so needVtpHeader is true and vtp is attached.
Defense in depth: even with the producer fixed, every consumer path that
opens its own KME deserializer can now resolve an unknown protocol version
via a SchemaReader, not just via vtp. Plumbed through 7 production
callsites:
KafkaInputUtils.getCompressor | newer.kme.schemas.*
broadcast
AbstractInputRecordProcessor.readDictionaryFromKafka
AbstractPartitionWriter (ZSTD branch)
SparkPubSubPartitionReaderFactory |
VTConsistencyCheckerJob.createConsumer |
AdminTool.getConsumer |
KmeSchemaReader.fromControllerClient
StoreIngestionTask.getNewStoreVersionState SoP-null branch
| reuses host's
kafkaMessageEnvelopeSchemaReader
VeniceChangelogConsumerImpl already wires a KME SchemaReader (PR linkedin#2177)
- no change. TopicMetadataFetcher never deserializes message bodies
- left as-is.
First-iteration strict mode: the new helpers throw IllegalStateException
when they can't reach a SchemaReader, so any callsite that hasn't actually
plumbed one is surfaced loudly during PR validation. A follow-up will
soften the throws to logged fallbacks once we have the full audit of
broken paths.
Refactor: KMESchemaReaderForKafkaInputFormat in venice-push-job moved to
venice-common as KmeSchemaReader so admin tool / server / controller can
construct one without taking a build dependency on venice-push-job. Adds
a static fromControllerClient(ControllerClient) factory that fetches the
KME schemas from the venice_system_store_kafka_message_envelope system
store.
Testing Done:
- New regression test: VeniceWriterUnitTest.testSendDoLStampCarriesVtpHeader
asserts the DoL stamp now carries the vtp header and the bytes are the
current KafkaMessageEnvelope schema JSON.
- All existing VeniceWriterUnitTest, KafkaInputUtilsTest,
StoreIngestionTaskTest.testGetNewStoreVersionState* and
PubSubMessageDeserializerTest tests pass.
- Compile-clean across internal:venice-common, clients:venice-push-job,
clients:venice-admin-tool, clients:da-vinci-client, services:venice-server
(main + test sources).
There was a problem hiding this comment.
Pull request overview
This PR fixes a forward-compatibility bootstrap gap where VeniceWriter.sendDoLStamp() could emit the first record on a fresh version topic with empty headers (missing vtp), and extends “defense-in-depth” by plumbing a KME-backed SchemaReader into additional consumer/deserializer construction paths so unknown KME protocol versions can be resolved even when vtp is absent.
Changes:
- Producer: route DoL-stamp produce through
getHeaders(...)so segment-startvtpis attached. - Consumers/tools: introduce
KmeSchemaReaderinvenice-commonand addPubSubMessageDeserializerfactories that accept aSchemaReader; update several VPJ/Spark/admin-tool/server ingestion call sites to use schema-aware deserializers. - Tests: add a regression unit test asserting DoL stamps carry
vtp.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java | Adds regression test ensuring DoL stamp carries vtp header. |
| internal/venice-common/src/main/java/com/linkedin/venice/writer/VeniceWriter.java | Fixes DoL-stamp send path to attach headers via getHeaders(...). |
| internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java | Moves/renames KME schema reader into venice-common and adds a controller-backed factory. |
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java | Adds schema-reader-backed factory methods for production-safe deserialization. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/SparkPubSubPartitionReaderFactory.java | Switches Spark PubSub consumption to a schema-aware deserializer. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/spark/consistency/VTConsistencyCheckerJob.java | Switches VT consistency checker consumer to a schema-aware deserializer. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractPartitionWriter.java | Wires schema-aware deserializer into ZSTD dictionary fetch path. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/task/datawriter/AbstractInputRecordProcessor.java | Wires schema-aware deserializer into dictionary fetch helper. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputUtils.java | Renames schema reader usage and adds buildSchemaAwareDeserializer(...) helper. |
| clients/venice-admin-tool/src/main/java/com/linkedin/venice/AdminTool.java | Switches admin-tool topic consumer to a controller-backed KME schema-aware deserializer. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskFactory.java | Plumbs a KME SchemaReader through the SIT factory builder. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java | Uses the plumbed schema reader for SoP-null dictionary fallback; adds strict null checks. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java | Passes the host’s KME schema reader into SIT builder. |
Comments suppressed due to low confidence (3)
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:56
- KmeSchemaReader(Map) claims to throw IllegalArgumentException when newerKmeSchemas is null, but it will currently NPE on newerKmeSchemas.entrySet(). Add an explicit null check (e.g., Objects.requireNonNull) so callers get a deterministic failure with a clear message.
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:54 - In this constructor, newerKmeSchemas are added into SchemaData before the resource-loaded schemas (see the subsequent addValueSchema loops). Since SchemaData.addValueSchema overwrites by id, any overlapping ids will end up using the resource schema, which contradicts the “merged on top”/runtime-precedence intent. Consider loading resource schemas first and then overlaying newerKmeSchemas (or explicitly preventing overwrites).
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:77 - fromControllerClient() assumes controllerClient is non-null and will NPE otherwise. Since this is a public factory that will be used across modules, add an explicit null check with a clear message (or document the precondition in the Javadoc).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…ks to logged fallbacks The first iteration threw IllegalStateException when a code path couldn't reach a KME SchemaReader, to surface every callsite that hadn't actually plumbed one. The CI run confirmed the gaps: integration-test VPJ jobs don't set SYSTEM_SCHEMA_READER_ENABLED, and several admin-tool entry points hit getConsumer before cluster discovery has wired controllerClient. Soften the three strict throws to logged fallbacks to the jar-only deserializer. The on-wire vtp header bootstrap still applies on the fallback path, so a forward-compat consumer can still resolve an unknown KME protocol version when the writer attaches vtp - which is now the guaranteed case for DoL stamps thanks to the sendDoLStamp fix in this PR. Behavior on each path when the SchemaReader can't be reached: KafkaInputUtils.buildSchemaAwareDeserializer log + createDefaultDeserializer AdminTool.buildAdminToolDeserializer log + createOptimizedDeserializer StoreIngestionTask SoP-null fallback log + createDefaultDeserializer createWithSchemaReader / createOptimizedWithSchemaReader still requireNonNull on the SchemaReader argument - that is an API contract, not a missing-plumbing concern. Testing Done: - VeniceWriterUnitTest.testSendDoLStampCarriesVtpHeader still passes. - Compile-clean across internal:venice-common, clients:venice-push-job, clients:venice-admin-tool, clients:da-vinci-client, services:venice-server (main + test sources).
buildSchemaAwareDeserializer + createWithSchemaReader Adds focused unit tests for the new code surface introduced by the DoL stamp vtp + KME SchemaReader plumbing PR: - KmeSchemaReaderTest (new): merges newer schemas with jar resources, fromControllerClient happy path + error response, getKey/Value/Latest, update-schema unsupported, close is no-op. - PubSubMessageDeserializerTest: createWithSchemaReader and createOptimizedWithSchemaReader both reject null SchemaReader via Objects.requireNonNull, and successfully construct when given a mocked SchemaReader. - KafkaInputUtilsTest: buildSchemaAwareDeserializer falls back to the jar-only default deserializer when SYSTEM_SCHEMA_READER_ENABLED is false (the default), and builds a SchemaReader-backed deserializer when the system flag is enabled and newer.kme.schemas.* entries are present in the job conf. Testing Done: - All new tests pass locally: - KmeSchemaReaderTest: 8/8 - PubSubMessageDeserializerTest: existing tests still pass + 4 new tests - KafkaInputUtilsTest: existing tests still pass + 2 new tests - VeniceWriterUnitTest.testSendDoLStampCarriesVtpHeader still passes. - Compile-clean across all touched modules.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (3)
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:61
- In
KmeSchemaReader(...), the merge order is reversed vs the class Javadoc:SchemaData.addValueSchemaoverwrites by ID, so addingnewerKmeSchemasfirst and resource schemas second means jar resources win on duplicate IDs. If the intent is "newer schemas override jar-bundled schemas", add resource schemas first, then overlaynewerKmeSchemas(or explicitly handle duplicates).
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:56 - The constructor Javadoc says it throws
IllegalArgumentExceptionwhennewerKmeSchemasis null, but the implementation doesn't validate it and will NPE onentrySet(). ConsiderObjects.requireNonNull(newerKmeSchemas, ...)to match the contract and fail fast with a clear message.
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:85 fromControllerClientiteratesresponse.getSchemas()without a null check.MultiSchemaResponse.schemascan be null if the controller returns a success response but omits the field, which would NPE here. Consider treating null as empty and/or throwing aVeniceExceptionwith a clear message.
…coverage The SoP-null dictionary fetch in StoreIngestionTask.getNewStoreVersionState gained an if/else branch over kafkaMessageEnvelopeSchemaReader (use it when plumbed; jar-only fallback + warn log when not). CI's diffCoverage flagged the branch as 0% covered. Refactor the branch into a tiny @VisibleForTesting helper method buildDictionaryFetchDeserializer() so the two paths can be exercised in isolation without standing up a real Kafka consumer. Adds two tests: - testBuildDictionaryFetchDeserializerWithSchemaReader: mocks SIT, injects a SchemaReader into the kafkaMessageEnvelopeSchemaReader field via reflection, asserts the helper returns a SchemaReader-backed deserializer. - testBuildDictionaryFetchDeserializerWithoutSchemaReaderFallsBack: same setup with kafkaMessageEnvelopeSchemaReader left null, asserts the helper returns a non-null deserializer (the jar-only fallback path). Testing Done: - Both new tests pass locally via SITWithPWiseWithoutBufferAfterLeaderTest. - Existing testGetNewStoreVersionStateFallsBackToInstanceFields and testGetNewStoreVersionStateWithZstdCompression unchanged.
…ce, log throwable Addresses inline review comments from Copilot on the KME SchemaReader plumbing: 1. KafkaInputUtils gains buildSchemaAwareOptimizedDeserializer() so hot read paths (Spark per-partition record readers) get the optimized value-serializer variant. The non-optimized buildSchemaAwareDeserializer stays for one-shot fetches (dictionary fetch in getCompressor / AbstractInputRecordProcessor / AbstractPartitionWriter). Both delegate to a shared doBuildSchemaAwareDeserializer(properties, optimized). 2. SparkPubSubPartitionReaderFactory now calls buildSchemaAwareOptimizedDeserializer, matching its previous use of createOptimizedDeserializer before this PR. No perf regression on the Spark hot path. 3. Updated SparkPubSubPartitionReaderFactory comment from "Strict mode: throws if..." to describe the actual graceful-fallback behavior. 4. Log-once-per-JVM gating on the fallback warnings via AtomicBoolean, so Spark executors that construct one deserializer per partition don't spam logs on older configs without SYSTEM_SCHEMA_READER_ENABLED. 5. Added a separate warn (also log-once) for the previously-silent case where SYSTEM_SCHEMA_READER_ENABLED=true but the newer.kme.schemas.* broadcast is empty - now the user knows the broadcast pipeline failed. 6. AdminTool.buildAdminToolDeserializer logs the exception as a Throwable (not e.toString()) on the schema-fetch failure path, so stack traces are preserved for debugging controller connectivity / auth issues. Testing Done: - All five KafkaInputUtilsTest tests pass (3 pre-existing + 2 new from earlier in this PR). - Compile-clean across venice-push-job and venice-admin-tool.
…adcast fallback Follow-up to 738f9ed: add tests for the new code paths so diffCoverage stays green: - testBuildSchemaAwareOptimizedDeserializerFallsBackWhenSystemSchemaReaderDisabl ed - testBuildSchemaAwareOptimizedDeserializerBuildsSchemaAwareWhenEnabled - testBuildSchemaAwareDeserializerFallsBackWhenSystemSchemaReaderEnabledButBroad castIsEmpty All 8 KafkaInputUtilsTest tests pass locally.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 17 out of 17 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (2)
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:86
- fromControllerClient iterates response.getSchemas() without a null check. MultiSchemaResponse.schemas can be null (it’s nullable in the model and some callers explicitly guard against null), which would trigger an NPE here. Consider treating null as an empty schema list (jar-only reader) or throwing a clear VeniceException indicating a malformed controller response.
internal/venice-common/src/main/java/com/linkedin/venice/schema/KmeSchemaReader.java:61 - KmeSchemaReader(Map) doesn’t enforce the documented null contract: passing null will currently NPE at newerKmeSchemas.entrySet() even though the Javadoc says IllegalArgumentException. Also, the merge order is opposite of the stated behavior (“newer schemas … merged on top of jar resources”): jar resource schemas are added after the runtime map and will overwrite any duplicate IDs from newerKmeSchemas. Add an explicit null guard (e.g., requireNonNull) and consider loading jar resources first, then applying runtime schemas so the precedence matches the documentation/intent.
Summary
Defense-in-depth follow-up to #2808 (DoL stamp
vtpheader fix). Plumbs a KMESchemaReaderthrough every production callsite that opens its ownPubSubMessageDeserializer, so a consumer that lands on a header-less control-message record can still resolve an unknown KME protocol version through a schema reader instead of throwingReceived Protocol Version 'N' which is not supported. Useful insurance against any future writer regression that drops thevtpheader.Depends on #2808. Once that lands, this branch will be rebased to drop the writer change (which is the first commit on this branch).
Consumer-side plumbing
The following production paths now use a
KafkaValueSerializerwhosesetSchemaReaderis wired with aKmeSchemaReader, so when the in-memory protocol-version cache misses andvtpis unavailable, the serializer fetches the unknown schema through the reader:KafkaInputUtils.getCompressor(ZSTD_WITH_DICT branch, fetches the SoP dictionary from the source VT)newer.kme.schemas.*broadcast on the VPJ job confAbstractInputRecordProcessor.readDictionaryFromKafkaAbstractPartitionWriterZSTD branchSparkPubSubPartitionReaderFactory.createReaderVTConsistencyCheckerJob.createConsumerAdminTool.getConsumerKmeSchemaReader.fromControllerClient(ControllerClient)fetches all KME schemas from thevenice_system_store_kafka_message_envelopesystem storeStoreIngestionTask.getNewStoreVersionState(SoP-null fallback path)kafkaMessageEnvelopeSchemaReader, newly plumbed throughStoreIngestionTaskFactory.Builder.setKafkaMessageEnvelopeSchemaReaderVeniceChangelogConsumerImplalready wires a KMESchemaReader(added in #2177) — no change.TopicMetadataFetcherconstructs a deserializer but never invokes the deserialize path (only uses position/topic-metadata APIs on the underlying consumer) — left as-is.Graceful fallback semantics
Where the plumbing exists but the upstream config wasn't actually broadcast, the helpers log a warning and fall back to the jar-only deserializer instead of throwing. The on-wire
vtpheader bootstrap still applies on the fallback path. Specific behaviors:KafkaInputUtils.buildSchemaAwareDeserializer(VeniceProperties): ifSYSTEM_SCHEMA_READER_ENABLEDis false (older VPJ job conf), logs a warning and returnsPubSubMessageDeserializer.createDefaultDeserializer().AdminTool.buildAdminToolDeserializer(): ifcontrollerClientis null (e.g., admin-message dump that runs before cluster discovery), logs and returnsPubSubMessageDeserializer.createOptimizedDeserializer(). If the controller fetch itself fails (network, ACL, etc.), the exception is caught and the same fallback is used with awarnlog.StoreIngestionTask.getNewStoreVersionStateSoP-null branch: if the host'skafkaMessageEnvelopeSchemaReaderwasn't plumbed throughStoreIngestionTaskFactory.Builder, logs and usescreateDefaultDeserializer().PubSubMessageDeserializer.createWithSchemaReaderandcreateOptimizedWithSchemaReaderthemselves doObjects.requireNonNull(schemaReader, …)— that's an API contract.Refactor:
KmeSchemaReadermoved to venice-commonKMESchemaReaderForKafkaInputFormatlived underclients/venice-push-job/.../hadoop/input/kafka. The class was a generalSchemaReaderover KME protocol-version schemas — its name was an artifact of where it lived, not what it did. Moved tointernal/venice-common/.../schema/KmeSchemaReaderso non-VPJ callers (admin tool, server, controller-side code) can construct one without a build dependency onvenice-push-job. Renamed toKmeSchemaReaderto follow Java naming conventions for acronyms.A new
KmeSchemaReader.fromControllerClient(ControllerClient)static factory pulls all KME schemas from thevenice_system_store_kafka_message_envelopesystem store viacontrollerClient.getAllValueSchema(…)and merges them with the schemas baked into the jar's resources. Used byAdminToolhere.Strict-to-graceful iteration
The first iteration of this PR used strict throws (
IllegalStateException) at the consumer-side fallback points to surface every callsite that hadn't plumbed aSchemaReader. CI confirmed the gaps exactly where expected: integration-test VPJ jobs don't setSYSTEM_SCHEMA_READER_ENABLED=trueby default, and various admin-tool entry points run before cluster discovery has wiredcontrollerClient. The follow-up commits soften those throws to logged fallbacks. TheObjects.requireNonNulloncreateWithSchemaReader/createOptimizedWithSchemaReaderremains — that's an API contract, not a missing-plumbing concern.Related
vtpheader fix this PR builds on. Merge that first.SchemaReaderinto the CDC changelog consumer.Testing Done
New unit tests (all pass locally):
KmeSchemaReaderTest(new file, 8 tests): exercises the constructor merge with empty and non-empty newer-schemas maps;getKeySchema,getValueSchema(id),getLatestValueSchema(Id); the update-schema methods that should throwVeniceUnsupportedOperationException;closeis a no-op;fromControllerClienthappy path with a mockedControllerClientreturning a populatedMultiSchemaResponse; andfromControllerClienterror path where the response carries an error string.PubSubMessageDeserializerTest(+4 tests):createWithSchemaReader(null)andcreateOptimizedWithSchemaReader(null)both throwNullPointerExceptionviarequireNonNull; both successfully construct a deserializer with a non-null mockedSchemaReaderand expose aKafkaValueSerializer.KafkaInputUtilsTest(+2 tests):buildSchemaAwareDeserializerfalls back to a jar-only deserializer whenSYSTEM_SCHEMA_READER_ENABLEDis unset; builds aSchemaReader-backed deserializer when the system flag is enabled andnewer.kme.schemas.*entries are present in the job conf.StoreIngestionTaskTest(+2 tests):buildDictionaryFetchDeserializerreturns aSchemaReader-backed deserializer when the host plumbedkafkaMessageEnvelopeSchemaReader; returns the jar-only fallback (with a warn log) when the SchemaReader wasn't wired.Existing tests confirmed still passing locally: all
VeniceWriterUnitTest,KafkaInputUtilsTest,StoreIngestionTaskTest.testGetNewStoreVersionState*,PubSubMessageDeserializerTest.Compile-clean across
internal:venice-common,clients:venice-push-job,clients:venice-admin-tool,clients:da-vinci-client,services:venice-server(main + test sources).