[changelog-consumer] Opt-in periodic offset commit to enable Kafka consumer-group lag monitoring#2789
Open
minhmo1620 wants to merge 6 commits into
…Kafka property overrides

VeniceChangelogConsumer can now commit its current positions back to the underlying pub-sub broker so external tools (e.g. xinfra consumer-group lag dashboards) can observe progress. This is monitoring-only: the caller's checkpoint state remains the source of truth for offsets; on restart the consumer re-seeks to the caller's checkpoint and the Kafka-side committed offset is ignored. A failed commit is non-fatal.

Wires VeniceChangelogConsumerImpl.internalPoll(long) to call a new commitOffsets() at most once per ChangelogClientConfig.consumerOffsetCommitIntervalMs (default 30s, set to 0 to disable). VeniceChangelogConsumerClientFactory gets a new overload accepting a Properties bag of per-call consumer overrides so callers can set e.g. pubsub.kafka.consumer.group.id deterministically per view job.

Context: enables Proteus to monitor Venice CDC Flink jobs via the same Kafka consumer-group regex pattern they use for every other Flink job. See VENG-12668.

Tests:
- ApacheKafkaConsumerAdapterTest: commitSync delegates + swallows
- VeniceChangelogConsumerImplTest: config default, commitOffsets delegation, periodic commit cadence, zero-interval escape hatch
- VeniceChangelogConsumerClientFactoryTest: per-call overrides land on the resolved per-store config and don't leak globally
Pull request overview
This PR adds an optional “monitoring-only” offset commit path for Venice changelog consumers so external Kafka consumer-group monitoring tools can observe lag, and adds factory overloads to allow per-call Kafka consumer property overrides (e.g., group.id) without mutating global config.
Changes:
- Add commitSync() to PubSubConsumerAdapter (default no-op) and implement it in ApacheKafkaConsumerAdapter by delegating to Kafka commitSync() while swallowing failures.
- Add commitOffsets() to VeniceChangelogConsumer (default no-op) and periodically invoke it from poll() in the changelog consumer implementation, gated by consumerOffsetCommitIntervalMs.
- Add ChangelogClientConfig.consumerOffsetCommitIntervalMs (default 30s; 0 disables) and add Properties override overloads in VeniceChangelogConsumerClientFactory with tests.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubConsumerAdapter.java | Adds a monitoring-only commitSync() default method to the pub-sub consumer abstraction. |
| internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java | Implements monitoring-only commits by delegating to Kafka commitSync() and swallowing errors. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumer.java | Adds a monitoring-only commitOffsets() default method to the changelog consumer API. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java | Periodically calls commitOffsets() from poll() based on the new interval config. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactory.java | Adds overloads to merge per-call consumer Properties into resolved per-store configs without leaking globally. |
| clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java | Introduces consumerOffsetCommitIntervalMs config with clone support. |
| internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapterTest.java | Adds unit tests for commitSync() delegation and exception swallowing. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImplTest.java | Adds unit tests for commit interval config, delegation, cadence, and interval=0 behavior. |
| clients/da-vinci-client/src/test/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactoryTest.java | Adds test verifying per-call consumer overrides merge without mutating global config. |
added 2 commits
May 12, 2026 13:45
group.id; preserve cadence after slow commit

Two fixes from the Copilot review on PR linkedin#2789:
1. ApacheKafkaConsumerAdapter.commitSync() now silently no-ops when group.id is not configured on the consumer. Without this, existing callers that didn't opt into broker-side monitoring saw a WARN every commit interval (default 30s) as Kafka threw IllegalStateException. For real broker failures, log the exception with stack (not just getMessage) so production issues are debuggable.
2. VeniceChangelogConsumerImpl.maybeCommitOffsets() now captures lastCommitTimeMs *after* the commit returns instead of before. Previously, a slow commit (e.g. broker hiccup) would cause the next poll to immediately re-commit because the timestamp didn't account for commit duration.

Tests:
- testCommitSyncShortCircuitsWhenNoGroupId: new
- testCommitSyncDelegatesToKafkaConsumerWhenGroupIdConfigured: renamed, now explicitly sets group.id in mocked consumer properties
- testCommitSyncSwallowsExceptions: updated to set group.id so the swallow path is actually exercised
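For readers who want the shape of fix 1 without opening the diff, here is a minimal sketch of a monitoring-only commit that skips silently when no group.id is configured and swallows broker failures. The class MonitoringCommitSketch, the BrokerClient interface, and the boolean return value are hypothetical stand-ins for illustration; this is not the actual ApacheKafkaConsumerAdapter code.

```java
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the adapter; not the Venice implementation. */
final class MonitoringCommitSketch {
  private static final Logger LOGGER = Logger.getLogger(MonitoringCommitSketch.class.getName());

  /** Hypothetical minimal view of the broker client: only the commit call matters here. */
  interface BrokerClient {
    void commitSync();
  }

  private final Properties consumerProperties;
  private final BrokerClient client;

  MonitoringCommitSketch(Properties consumerProperties, BrokerClient client) {
    this.consumerProperties = consumerProperties;
    this.client = client;
  }

  /** Returns true only if a commit was attempted and succeeded (return value is illustrative). */
  boolean commitSync() {
    String groupId = consumerProperties.getProperty("group.id");
    if (groupId == null || groupId.isEmpty()) {
      // Caller never opted in: no broker call and no log line.
      return false;
    }
    try {
      client.commitSync();
      return true;
    } catch (RuntimeException e) {
      // Monitoring-only path: log with the full stack trace, then swallow.
      LOGGER.log(Level.WARNING, "Monitoring-only offset commit failed", e);
      return false;
    }
  }
}
```

The caller's checkpoint remains the source of truth in this design, which is why a failed commit can be dropped without any retry.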
…: commits require two explicit opt-ins

Previously the interval defaulted to 30s. Even though ApacheKafkaConsumerAdapter.commitSync() short-circuits silently when no group.id is set, every poll cycle was still walking the maybeCommitOffsets path for callers who had no monitoring need.

Now: commits actually happen only when *both* are true: the caller sets a positive consumerOffsetCommitIntervalMs *and* a Kafka group.id on the consumer properties. No magic auto-enable, no implicit behavior tied to property presence. Default = 0 (off). Test assertion updated to match.
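The two-knob rule can be stated as a single predicate. The sketch below is an illustration only: in the PR the interval check and the group.id check live in different layers (the changelog consumer and the Kafka adapter), and the helper name commitsEnabled and the plain "group.id" key are assumptions made here for brevity.

```java
import java.util.Properties;

/** Hypothetical helper that collapses both opt-in checks into one place. */
final class CommitOptInSketch {
  private CommitOptInSketch() {
  }

  /** Commits are enabled only when the interval is positive AND a group.id is present. */
  static boolean commitsEnabled(long consumerOffsetCommitIntervalMs, Properties consumerProperties) {
    String groupId = consumerProperties.getProperty("group.id"); // assumed key name
    boolean hasGroupId = groupId != null && !groupId.trim().isEmpty();
    return consumerOffsetCommitIntervalMs > 0 && hasGroupId;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // Default interval of 0 and no group.id: nothing is enabled.
    System.out.println(commitsEnabled(0L, props));
    props.setProperty("group.id", "view-job-1");
    // Both knobs set: commits may happen.
    System.out.println(commitsEnabled(30_000L, props));
  }
}
```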
added 2 commits
May 12, 2026 14:39
…ry: callers configure group.id via the global ChangelogClientConfig

The Properties overload on VeniceChangelogConsumerClientFactory turned out to be redundant. The Flink-side consumer (flink-li-venice-connectors) sets its default group.id by injecting it into the global cfg.backendConfig Properties bag in OffspringFactory, which flows through to ChangelogClientConfig.consumerProperties unchanged. No per-call factory parameter needed.

Keeps everything else from the original PR intact: commitSync() on PubSubConsumerAdapter/ApacheKafkaConsumerAdapter, commitOffsets() on VeniceChangelogConsumer/Impl, the maybeCommitOffsets cadence in internalPoll, and the consumerOffsetCommitIntervalMs setter on ChangelogClientConfig.

Tests:
- testGetChangelogConsumerMergesConsumerOverrides: removed (no longer applicable)
- All commit-path tests still pass.
maybeCommitOffsets

Matches the existing pattern in the same file (lines 274, 398, 1285): the protected `time` field is reserved for cases that need a fake-clock injection point (e.g. version-swap timeout checks). The periodic-commit cadence doesn't need that abstraction: the tests rely on lastCommitTimeMs starting at 0 and any real clock easily exceeding any positive interval.
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerImpl.java:789
maybeCommitOffsets() uses System.currentTimeMillis() even though this class already has an injectable Time time abstraction (used elsewhere via time.getMilliseconds()). Using the Time field here would keep time handling consistent and make commit-cadence behavior easier to test deterministically (e.g., with TestMockTime).
private void maybeCommitOffsets() {
  if (consumerOffsetCommitIntervalMs <= 0) {
    return;
  }
  long now = System.currentTimeMillis();
  if (now - lastCommitTimeMs >= consumerOffsetCommitIntervalMs) {
    commitOffsets();
    // Re-read the clock after the commit returns so a slow commit does not shorten the next interval.
    lastCommitTimeMs = System.currentTimeMillis();
  }
}
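To make the trade-off in this comment concrete, the following sketch shows the same cadence logic with an injectable time source, which lets a test simulate a slow commit deterministically. It uses a plain LongSupplier rather than Venice's Time abstraction, and the class name CommitCadenceSketch and its boolean return value are hypothetical.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for the cadence check; not the VeniceChangelogConsumerImpl code. */
final class CommitCadenceSketch {
  private final long intervalMs;
  private final LongSupplier clock; // injectable time source, in milliseconds
  private final Runnable commit;    // stands in for commitOffsets()
  private long lastCommitTimeMs = 0L;

  CommitCadenceSketch(long intervalMs, LongSupplier clock, Runnable commit) {
    this.intervalMs = intervalMs;
    this.clock = clock;
    this.commit = commit;
  }

  /** Returns true if this call triggered a commit. */
  boolean maybeCommit() {
    if (intervalMs <= 0) {
      return false; // a non-positive interval disables the commit path
    }
    long now = clock.getAsLong();
    if (now - lastCommitTimeMs >= intervalMs) {
      commit.run();
      // Read the clock again: the interval is measured from when the commit returned.
      lastCommitTimeMs = clock.getAsLong();
      return true;
    }
    return false;
  }
}
```

With a fake clock, a test can advance time inside the commit callback and check that the next poll does not re-commit until a full interval has passed since the commit returned.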
…ire + adapter-level too

Two follow-ups from PR review on linkedin#2789:
1. ApacheKafkaConsumerAdapter.commitSync(): acquireLockWithTimeout() was called before the try block, so a lock-acquisition failure (PubSubOpTimeoutException, PubSubClientException from interrupt) would propagate out of monitoring-only commitSync. Moved the acquire inside the try; releaseLock() is already safe to call on a non-held lock.
2. VeniceChangelogConsumerImpl.commitOffsets(): added defensive try/catch as a second safety layer. If any PubSubConsumerAdapter implementation ever throws unexpectedly, the exception now gets logged and swallowed here instead of bubbling out of poll().

Tests:
- testCommitOffsetsSwallowsExceptions: new
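The first follow-up is a general pattern: acquire the lock inside the try so that a timeout or interrupt is handled by the same catch blocks as a broker failure, and release only if the lock is actually held. The sketch below illustrates it with a plain ReentrantLock; LockedCommitSketch and its members are hypothetical, and the real adapter uses its own acquireLockWithTimeout()/releaseLock() helpers rather than these calls.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in showing lock acquisition inside the try block. */
final class LockedCommitSketch {
  private final ReentrantLock consumerLock = new ReentrantLock();
  private final long lockTimeoutMs;
  private final Runnable brokerCommit; // stands in for the broker-side commit call

  LockedCommitSketch(long lockTimeoutMs, Runnable brokerCommit) {
    this.lockTimeoutMs = lockTimeoutMs;
    this.brokerCommit = brokerCommit;
  }

  /** Never throws: returns false on lock timeout, interrupt, or commit failure. */
  boolean commitSync() {
    try {
      // Acquire inside the try so acquisition failures are swallowed like commit failures.
      if (!consumerLock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS)) {
        return false;
      }
      brokerCommit.run();
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    } catch (RuntimeException e) {
      return false; // monitoring-only: a failed commit is non-fatal
    } finally {
      // Safe whether or not the lock was acquired.
      if (consumerLock.isHeldByCurrentThread()) {
        consumerLock.unlock();
      }
    }
  }

  boolean isLocked() {
    return consumerLock.isLocked();
  }
}
```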
Problem Statement
We want to leverage Kafka's existing consumer-group lag mechanism to detect laggy VeniceChangelogConsumer (DVRT) jobs. Kafka already computes lag for any consumer that commits to __consumer_offsets: the broker has the latest committed offset and the high-watermark, and exposes the difference through every standard consumer-group tool (kafka-consumer-groups.sh, Burrow, lag dashboards, regex-based alerts). Reusing this means we don't have to stand up a parallel lag-monitoring stack just for DVRT.

Today DVRT can't plug into this. It never writes to __consumer_offsets, so from the broker's point of view the consumer group doesn't exist, and every tool that reads broker-side lag returns empty.

Solution
Let the changelog consumer optionally publish its current position to __consumer_offsets purely so external tools can see it. Source-of-truth for offsets stays with the caller: Kafka-side commits are never read back on restart and a failed commit is swallowed.
- Add commitOffsets() to VeniceChangelogConsumer (default no-op; Kafka-backed impl delegates to the underlying consumer's commitSync()).
- Call it from poll() at most once per consumerOffsetCommitIntervalMs. The interval is measured after each commit returns so a slow commit can't trigger an immediate re-commit on the next poll.

Opting in is fully explicit and requires two knobs. Commits actually happen only when both:
- ChangelogClientConfig.consumerOffsetCommitIntervalMs > 0 (default 0 = off), and
- group.id is set on the consumer properties (caller configures this on the existing ChangelogClientConfig.consumerProperties bag).

If either is missing the path short-circuits silently: no Kafka call, no log noise. Existing callers see zero behavior change. No new factory overload or Properties parameter; callers wire both knobs through the existing ChangelogClientConfig API.

Code changes
- ChangelogClientConfig.consumerOffsetCommitIntervalMs (default 0 = off). To enable, the caller must additionally set group.id on the consumer properties.
- LOGGER.warn (with stack trace) for actual broker-side commit failures. No log noise when commits are disabled or when group.id is unset; the path short-circuits silently.

Concurrency-Specific Checks
- Existing locks (consumerLock, subscriptionLock): no new synchronization primitives.

How was this PR tested?
- commitSync() delegates to the underlying Kafka consumer when group.id is configured.
- commitSync() short-circuits silently when group.id is missing (no Kafka call, no log).
- commitSync() swallows real broker exceptions (logged once with stack).
- commitOffsets() delegates to the underlying pub-sub consumer.
- The default no-ops; the commit path is fully off by default.

Does this PR introduce any user-facing or breaking changes?