[server][da-vinci] Leader handover: consume the graceful-EOS marker to skip the 5-minute wait #2794

Open
sushantmane wants to merge 19 commits into linkedin:main from sushantmane:sumane/leader-handover-2-consume-side

Conversation

@sushantmane
Contributor

@sushantmane sushantmane commented May 13, 2026

Summary

Stacked on top of #2793 (the emit-side PR), which has each demoting leader emit a self-contained Leader Step-Down Stamp control message (dedicated KafkaKey.LEADER_STEPDOWN_STAMP, dedicated producer GUID via LeaderStepDownStampGuidGenerator, segmentNumber=0, messageSequenceNumber=0) carrying its term in LeaderMetadata.termId.

A new leader, during STANDBY -> LEADER (and on top of the existing DoL loopback gate), now considers the most recent non-self Leader Step-Down Stamp it has observed on the local VT. If the consume-side feature flag is on and that stamp's LeaderMetadata.termId is strictly less than the new leader's own term, the new leader bypasses the legacy 5-minute inactivity wait in canSwitchToLeaderTopicLegacy and proceeds to switch to the leader source topic. Otherwise it falls back to the legacy wait - exactly the pre-change behavior.

Feature flag default is OFF (server.leader.handover.consume.stepdown.stamp = false), so this PR is a no-op until explicitly enabled. The fallback is unchanged.

Mechanism

  • PartitionConsumptionState tracks lastObservedNonSelfStepDownStampTermId (single field, -1 = none observed, >= 0 = the stamp's termId). Updated when a record with KafkaKey == LEADER_STEPDOWN_STAMP is consumed on local VT whose termId differs from this replica's currentLeaderTermId. Cleared whenever any subsequent non-DoL data record on local VT is observed, so a stale stamp cannot be replayed against a later transition.
  • StoreIngestionTask exposes a new protected hook observeRecordForLeaderHandover(...) invoked once per consumed record (data or control message). Default base implementation is a no-op.
  • LeaderFollowerStoreIngestionTask overrides the hook: scoped to records whose source topic is the local VT, it records non-self stamp observations and invalidates them on subsequent non-DoL records. DoL stamps (KafkaKey.DOL_STAMP) are excluded so the new leader's own loopback does not invalidate the marker.
  • canSwitchToLeaderTopic adds an OR with the new stepDownStampObserved(pcs): when the consume flag is on, the legacy 5-min check is skipped if a step-down stamp from an earlier term has been observed at the local-VT tail.
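
The fast-path decision described in the bullets above can be sketched in isolation. This is a minimal model under stated assumptions, not the PR's code: the class `HandoverStateSketch` and the methods `onStepDownStamp` and `onNonDolRecord` are hypothetical names, and only the field names, the -1 sentinel, and the "strictly less than" comparison come from the description.

```java
// Hypothetical, simplified model of the bookkeeping described above.
// It is not the actual PartitionConsumptionState or ingestion-task code.
final class HandoverStateSketch {
  static final long NO_TERM = -1L; // -1 = no stamp observed / no term recorded

  private long currentLeaderTermId = NO_TERM;
  private long lastObservedNonSelfStepDownStampTermId = NO_TERM;

  void setCurrentLeaderTermId(long termId) {
    currentLeaderTermId = termId;
  }

  // Called when a LEADER_STEPDOWN_STAMP is consumed from the local VT.
  void onStepDownStamp(long stampTermId) {
    if (stampTermId != currentLeaderTermId) { // a replica's own stamp does not qualify
      lastObservedNonSelfStepDownStampTermId = stampTermId;
    }
  }

  // Called when a later non-DoL record is consumed from the local VT,
  // so a stale stamp cannot be reused for a later transition.
  void onNonDolRecord() {
    lastObservedNonSelfStepDownStampTermId = NO_TERM;
  }

  // True only when the fast path may be taken; false means "use the legacy wait".
  boolean stepDownStampObserved(boolean consumeFlagEnabled) {
    if (!consumeFlagEnabled) {
      return false;
    }
    long observed = lastObservedNonSelfStepDownStampTermId;
    return observed >= 0 && currentLeaderTermId >= 0 && observed < currentLeaderTermId;
  }
}
```

In this sketch every case other than "flag on, stamp present, stamp term strictly earlier than mine" returns false, which corresponds to falling back to the legacy wait.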

Metrics (HostLevelIngestionStats)

  • leader_handover_fast_path - per-partition count of S->L transitions that took the fast path
  • leader_handover_legacy_wait - per-partition count of S->L transitions that took the legacy 5-min wait

Testing Done

…Leader -> Standby

Adds the emit side of an interim, cooperative fast-path for the
STANDBY -> LEADER transition wait. Each leader, when it receives a
Helix LEADER -> STANDBY (or LEADER -> OFFLINE) transition, drains its
producer and emits a final EndOfSegment control message marked with a
new gracefulLeadershipHandoff=true flag. The message carries the
demoting leader's termId in LeaderMetadata.termId (already in the
footer since v14).

A follow-up change adds the consume side: a new leader observes the
marker and (behind a separate consume-side flag) skips the legacy
5-minute inactivity wait in canSwitchToLeaderTopicLegacy.

This PR is behavior-preserving in isolation: no reader of the marker
ships in this change, so the on-the-wire field is dormant until the
consume side lands. The new VT messages are correctly handled by old
binaries because the field defaults to false on read.

Schema
  - Bumps KafkaMessageEnvelope to v15. EndOfSegment gains
    gracefulLeadershipHandoff: boolean, default false.
  - Bumps AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE current
    protocol version from 14 to 15.

VeniceWriter
  - New sendGracefulLeadershipEndOfSegment(partition, term,
    localPubSubClusterId, callback) that closes the partition's
    current segment with finalSegment=true,
    gracefulLeadershipHandoff=true, and LeaderMetadata.termId =
    given term. No-op when no segment is open.

Server integration
  - PartitionConsumptionState gains currentLeaderTermId,
    set on STANDBY -> LEADER (from the Helix session checker term),
    cleared on LEADER -> STANDBY after the marker is emitted.
  - LeaderFollowerStoreIngestionTask.processConsumerAction for
    LEADER_TO_STANDBY now calls emitGracefulLeadershipEosIfEnabled
    just before vw.closePartition(). The call is behind
    server.leader.handover.emit.graceful.eos (default true) and
    bounded by server.leader.handover.emit.graceful.eos.ack.timeout.ms
    (default 5000). Failure or timeout is best-effort and logged.

Config keys (ConfigKeys + VeniceServerConfig getters)
  - server.leader.handover.emit.graceful.eos (default true)
  - server.leader.handover.consume.graceful.eos (default false,
    consumed by the follow-up PR)
  - server.leader.handover.emit.graceful.eos.ack.timeout.ms (default 5000)

Metrics (HostLevelIngestionStats)
  - leader_stepdown_graceful_eos_emit_success
  - leader_stepdown_graceful_eos_emit_failure

Testing Done
  - VeniceWriterUnitTest:
      testSendGracefulLeadershipEndOfSegmentEmitsExpectedFields
      testSendGracefulLeadershipEndOfSegmentIsNoOpWhenSegmentAlreadyClosed
  - PartitionConsumptionStateTest:
      testCurrentLeaderTermIdLifecycle
  - Re-ran the full VeniceWriterUnitTest, AvroProtocolDefinitionTest,
    TestOptimizedKafkaValueSerializer, LeaderFollowerStoreIngestionTaskTest,
    DolStampTest, and PartitionConsumptionStateTest - all pass.
…o skip the 5-minute wait

Builds on top of the prior change that has each demoting leader emit a
gracefulLeadershipHandoff=true EndOfSegment carrying its termId in the
LeaderMetadata footer.

A new leader, during STANDBY -> LEADER (and on top of the existing DoL
loopback gate), now considers the most recent non-self EndOfSegment it
has observed on the local VT. If the consume-side feature flag is on
and that EOS is marked graceful with a strictly earlier termId, the
new leader bypasses the legacy 5-minute inactivity wait in
canSwitchToLeaderTopicLegacy and proceeds to switch to the leader
source topic. Otherwise it falls back to the legacy wait - exactly
the pre-change behavior.

Feature flag default is OFF
(server.leader.handover.consume.graceful.eos = false), so this PR
is a no-op until explicitly enabled. The fallback is unchanged.

Mechanism
  - PartitionConsumptionState tracks (lastObservedNonSelfEosGraceful,
    lastObservedNonSelfEosTermId). Updated when an EndOfSegment is
    consumed on local VT whose termId differs from this replica's
    currentLeaderTermId (so a leader's own segment-roll EOS does not
    qualify). Cleared whenever any subsequent non-DoL data record on
    local VT is observed, so a stale graceful EOS cannot be replayed
    against a later transition.
  - StoreIngestionTask exposes a new protected hook
    observeRecordForLeaderHandover(...) invoked once per consumed
    record (data or control message). Default base implementation is
    a no-op.
  - LeaderFollowerStoreIngestionTask overrides the hook: scoped to
    records whose source topic is the local VT, it records non-self
    EOS observations and invalidates them on subsequent non-DoL
    records. DoL stamps (KafkaKey.DOL_STAMP) are excluded so the new
    leader's own loopback does not invalidate the marker.
  - canSwitchToLeaderTopic adds an OR with the new
    gracefulHandoffObserved(pcs): when the consume flag is on, the
    legacy 5-min check is skipped if a graceful EOS from an earlier
    term has been observed at the local-VT tail.

Metrics (HostLevelIngestionStats)
  - leader_handover_fast_path - per-partition count of S->L
    transitions that took the fast path
  - leader_handover_legacy_wait - per-partition count of S->L
    transitions that took the legacy 5-min wait

Testing Done
  - PartitionConsumptionStateTest:
      testObservedNonSelfEosBookkeeping (new): default state, record
      non-graceful, record graceful, clear - asserts each transition
      of the (graceful, termId) pair.
  - testCurrentLeaderTermIdLifecycle (from prior PR) still passes.
  - Re-ran LeaderFollowerStoreIngestionTaskTest,
    LeaderFollowerStoreIngestionTaskPauseTransitionTest,
    StoreIngestionTaskAggregationTest - all pass.
  - All existing PartitionConsumptionStateTest cases continue to pass.
Copilot AI review requested due to automatic review settings May 13, 2026 04:08
Step-Down Stamp

Addresses review feedback: the prior implementation used a flag on EndOfSegment
and depended on the leader's segment state, which had two issues:
  1. No marker was emitted when the leader had no open segment (e.g. leader
     never produced for this term, or previously rolled the segment).
  2. Closing the segment ourselves was fragile against any in-flight straggler
     write that could reopen a segment after our EOS.

Switches to a DoL-style self-contained stamp:
  - New KafkaKey.LEADER_STEPDOWN_STAMP sentinel
  - New LeaderStepDownStampGuidGenerator (type-3 GUID, distinct from
    HeartbeatGuidV3Generator and DoLStampGuidGenerator)
  - New VeniceWriter.sendLeaderStepDownStamp(...) modeled exactly on
    sendDoLStamp(...): dedicated producer GUID, segmentNumber=0,
    sequenceNumber=0, LeaderMetadata.termId stamped with the demoting
    leader's term, payload reuses heartbeat ControlMessage
  - Schema v15 (added in the prior commit) is no longer needed - reverted to
    v14, v15 directory deleted, EndOfSegment.gracefulLeadershipHandoff field
    dropped

Config keys renamed for clarity:
  - server.leader.handover.emit.graceful.eos
    -> server.leader.handover.emit.stepdown.stamp
  - server.leader.handover.consume.graceful.eos
    -> server.leader.handover.consume.stepdown.stamp
  - server.leader.handover.emit.graceful.eos.ack.timeout.ms
    -> server.leader.handover.emit.stepdown.stamp.ack.timeout.ms

LeaderFollowerStoreIngestionTask emit path now calls sendLeaderStepDownStamp,
which works regardless of the leader's segment state. Ordered after
closePartition so the stamp is the tail record for the new leader's check.

Metrics renamed:
  - leader_stepdown_graceful_eos_emit_{success,failure}
    -> leader_stepdown_stamp_emit_{success,failure}

Testing Done
  - VeniceWriterUnitTest:
      testSendLeaderStepDownStampEmitsExpectedFields (new) - verifies the
      stamp lands with KafkaKey.LEADER_STEPDOWN_STAMP, ProducerMetadata uses
      the dedicated step-down GUID with segment=0 and seq=0, and
      LeaderMetadata.termId/upstreamKafkaClusterId carry the passed-in values.
      testSendLeaderStepDownStampDoesNotRequireOpenSegment (new) - verifies
      the stamp is emitted even when no put/segment was opened first.
  - testCurrentLeaderTermIdLifecycle still passes.
  - Local compile clean across venice-common and da-vinci-client.
Copilot AI left a comment

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

…nterrupt on InterruptedException

Addresses Copilot review feedback on linkedin#2793 about the ack-wait catch swallowing
InterruptedException. Splits emitLeaderStepDownStampIfEnabled into:
  1. produce try/catch (any failure to send the stamp -> warn + failure metric)
  2. ack-wait try/catch with explicit InterruptedException, TimeoutException, and
     ExecutionException arms.

On InterruptedException we now restore the interrupt status via
Thread.currentThread().interrupt() so the Helix state-transition / shutdown
caller can observe the interrupt promptly. The stamp may or may not have landed;
the new leader simply falls back to the legacy 5-minute wait if it does not see
it - safety unchanged.
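
A minimal sketch of the ack-wait half of that split, assuming the ack is exposed as a java.util.concurrent.Future. The class `AckWaitSketch`, the `Outcome` enum, and `awaitAck` are hypothetical names for illustration; the real helper logs and bumps metrics rather than returning a value.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a bounded ack wait with one catch arm per failure mode.
final class AckWaitSketch {
  enum Outcome { ACKED, INTERRUPTED, TIMED_OUT, FAILED }

  static Outcome awaitAck(Future<?> ack, long timeoutMs) {
    try {
      ack.get(timeoutMs, TimeUnit.MILLISECONDS);
      return Outcome.ACKED;
    } catch (InterruptedException e) {
      // Restore the interrupt status so the caller can observe it.
      Thread.currentThread().interrupt();
      return Outcome.INTERRUPTED;
    } catch (TimeoutException e) {
      // The stamp may or may not have landed; the caller proceeds best-effort.
      return Outcome.TIMED_OUT;
    } catch (ExecutionException e) {
      // The produce call completed exceptionally.
      return Outcome.FAILED;
    }
  }
}
```

Every non-ACKED outcome is treated as best-effort in this sketch: the caller continues with demotion, and the new leader would fall back to the legacy wait if the stamp never landed.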

Testing Done
  - Existing :clients:da-vinci-client compilation clean.
  - VeniceWriterUnitTest stamp tests still pass.
… PR linkedin#2794

# Conflicts:
#	clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/HostLevelIngestionStats.java
Extends testVeniceWriterInProcessConsumerAction with three new cases that
exercise the Leader Step-Down stamp emit helper, raising diff coverage on
clients/da-vinci-client's new lines:

  case 4 - success path: emit flag on, leader term recorded, mock writer
    returns a completed-future PubSubProduceResult. Asserts that
    sendLeaderStepDownStamp(versionTopic-partition, null, term, clusterId)
    fires with the recorded term, and currentLeaderTermId is cleared on
    the tail of the L->Standby handler.
  case 5 - ack timeout path: mock writer returns a future that never
    completes. The helper times out at the configured ackTimeoutMs, logs,
    increments the failure metric, and the consumer-action processing
    continues without crash.
  case 6 - flag-off path: emit flag flipped to false; sendLeaderStepDownStamp
    is not invoked. Confirms the early-return guard.

Testing Done
  - testVeniceWriterInProcessConsumerAction passes locally (2.3s).
Copilot AI review requested due to automatic review settings May 13, 2026 21:49

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 6 comments.

@sushantmane added the needs-reviewer label (Looking for a reviewer to pick this up) on May 14, 2026
cleanup-on-early-return

Addresses 5 review comments on the Leader Step-Down Stamp emit side:

1. getLeaderStepDownStampKME now sets LeaderMetadata.upstreamMessageTimestamp
   to DEFAULT_UPSTREAM_MESSAGE_TIMESTAMP (-1). Avro-generated primitives
   initialize long to 0 rather than the schema default, so without this
   assignment a downstream consumer would see 0 and misinterpret the stamp
   as having a real upstream timestamp. Mirrors the defensive setting that
   already exists in getDoLStampKME.

2. StoreIngestionTask.validateMessage now includes LEADER_STEPDOWN_STAMP in
   the DIV bypass list alongside HEART_BEAT and DOL_STAMP. The stamp uses a
   dedicated type-3 producer GUID with segmentNumber=0, sequenceNumber=0
   and does not participate in the per-segment DIV chain.

3. LEADER_TO_STANDBY now wraps the body in try/finally so
   clearCurrentLeaderTermId() runs on the already-STANDBY early-return path,
   not just the normal demotion flow. The PCS-null and session-invalid
   early returns intentionally skip the clear (PCS is unavailable or a
   newer transition is already in flight).

4. Updated the stale "graceful-leadership-handoff EndOfSegment" comment in
   the STANDBY -> LEADER setCurrentLeaderTermId path to describe the new
   step-down stamp design.

5. Updated PartitionConsumptionState.currentLeaderTermId Javadoc to
   describe stamp-based usage (no longer "graceful-EOS emit path"). Also
   documents the wall-clock termId source and its acceptable-for-fast-path
   caveat (full termId-based fencing tracked separately).
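
Item 3 relies on a plain Java property: a `return` inside a `try` body still runs the `finally` block, while a `return` before the `try` does not. The sketch below illustrates that shape only; `DemotionSketch`, its fields, and the boolean parameters are hypothetical stand-ins, not the actual handler.

```java
// Hypothetical stand-in for the LEADER_TO_STANDBY handler shape described in item 3.
final class DemotionSketch {
  static final long NO_TERM = -1L;

  long currentLeaderTermId = NO_TERM;
  int closePartitionCalls = 0;

  void handleLeaderToStandby(boolean sessionValid, boolean alreadyStandby) {
    if (!sessionValid) {
      return; // before the try: the clear is intentionally skipped
    }
    try {
      if (alreadyStandby) {
        return; // early return inside the try: the finally block still runs
      }
      closePartitionCalls++; // stands in for the normal demotion work
    } finally {
      currentLeaderTermId = NO_TERM; // mirrors clearCurrentLeaderTermId()
    }
  }
}
```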

Testing Done
  - :internal:venice-common:test VeniceWriterUnitTest passes.
  - :clients:da-vinci-client:test PartitionConsumptionStateTest,
    LeaderFollowerStoreIngestionTaskTest,
    LeaderFollowerStoreIngestionTaskPauseTransitionTest all pass.
  - :internal:venice-common:spotbugsTest clean.
# Conflicts:
#	clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
# Conflicts:
#	clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
#	internal/venice-common/src/test/java/com/linkedin/venice/writer/VeniceWriterUnitTest.java
…#2794

# Conflicts:
#	clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
#	clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/PartitionConsumptionStateTest.java
Copilot AI review requested due to automatic review settings May 14, 2026 21:33

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

rolling-upgrade hazard

Two related changes addressing rolling-upgrade safety + Helix-transition
latency:

1. Default SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP to false. A leader on
   this binary must not emit LEADER_STEPDOWN_STAMP records onto VT until
   every reader in the fleet recognizes the new KafkaKey and skips it in
   StoreIngestionTask.validateMessage. Old readers without the DIV-bypass
   list update would validate the stamp's fixed (segment=0, sequence=0)
   GUID as a regular segment control record and fail DIV. Operators flip
   this flag fleet-wide only after every binary in the read path has
   shipped the change.

2. Lower the default SERVER_LEADER_HANDOVER_EMIT_STEPDOWN_STAMP_ACK_TIMEOUT_MS
   from 5000 ms to 1000 ms. The ack wait is synchronous inside the Helix
   state-transition handler, so this value bounds the worst-case additional
   Helix transition latency this feature can add. A 1-second bound keeps
   the worst case manageable while still landing the ack in steady state
   (typical local-VT ack is well under 1 s). Missing the ack is safe - the
   new leader falls back to the legacy 5-minute wait.

Testing Done
  - Compile + test classes clean.
  - Existing testVeniceWriterInProcessConsumerAction (which stubs the
    timeout) and PartitionConsumptionStateTest unchanged.
# Conflicts:
#	internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java
…h guard)

Addresses Copilot review on PR linkedin#2794: the consume-side override was running for
every consumed record on every partition - per-record topic equality and
LEADER_STEPDOWN_STAMP/DOL_STAMP byte-array comparisons - even when
server.leader.handover.consume.stepdown.stamp is off (the default).

Adds an early-return at the top of the override gated on
isLeaderHandoverConsumeStepDownStampEnabled(). When the feature flag is off,
the override is now a true no-op for the drainer hot path. The PR's stated
"this PR is a no-op until explicitly enabled" claim now matches the runtime
behavior.

Testing Done
  - PartitionConsumptionStateTest passes (all variants including the new
    testObservedNonSelfStepDownStampBookkeeping).
  - LeaderFollowerStoreIngestionTaskTest.testVeniceWriterInProcessConsumerAction
    passes (covers the L->Standby emit path and stubs the consume flag to
    exercise the guard).
Copilot AI review requested due to automatic review settings May 14, 2026 22:59

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated no new comments.

Pushes diffCoverage on da-vinci-client from 41.67% to 50.0%, above the 45%
threshold the CI gate enforces.

Adds three cases to testVeniceWriterInProcessConsumerAction:

  case 7 - ExecutionException during ack-wait: future completes
    exceptionally; emit helper logs, increments failure metric, and demotion
    proceeds (closePartition still fires).
  case 8 - Synchronous produce-side exception: sendLeaderStepDownStamp
    throws; emit helper logs via the outer try/catch and demotion proceeds.
  case 9 - Already-STANDBY early-return path through the try/finally:
    when PCS reports state == STANDBY at the top of LEADER_TO_STANDBY,
    closePartition and stamp emit are skipped, but currentLeaderTermId is
    still cleared via the finally block.

Testing Done
  - testVeniceWriterInProcessConsumerAction passes (3.5 s).
  - :clients:da-vinci-client:diffCoverage reports 50% branch coverage on
    the diff, clears the 45% threshold.
observeRecordForLeaderHandover

Pushes diffCoverage on PR linkedin#2794's da-vinci-client diff from 36.76% to 69.12%,
clearing the 45% threshold.

Driving the public processConsumerRecord -> observeRecordForLeaderHandover
and canSwitchToLeaderTopic -> stepDownStampObserved paths is awkward in unit
tests because both methods sit inside long-running task loops. Mark
stepDownStampObserved package-private with @VisibleForTesting (matching the
existing convention for the same class) so the test class - already in the
same package - can call both methods directly with mocked
PartitionConsumptionState.

Added tests:

stepDownStampObserved:
  - testStepDownStampObservedFlagOff: consume flag off -> early return,
    no PCS access.
  - testStepDownStampObservedNoStamp: flag on but no stamp recorded ->
    returns false.
  - testStepDownStampObservedTermInequality (DataProvider, 5 cases):
    term equal / observed > mine / observed -1 / my term -1 / valid case.

observeRecordForLeaderHandover:
  - testObserveRecordForLeaderHandoverEarlyReturnOnFlagOff: hot-path guard
    short-circuits when consume flag off.
  - testObserveRecordForLeaderHandoverNonLocalVtRecordIsIgnored: records
    from RT or remote VT do not feed the bookkeeping.
  - testObserveRecordForLeaderHandoverRecordsStepDownStamp: stamp on local
    VT records the term.
  - testObserveRecordForLeaderHandoverDolStampLoopbackDoesNotInvalidateMarker:
    the new leader's own DoL loopback never invalidates a previously
    recorded stamp.
  - testObserveRecordForLeaderHandoverSelfRecordDoesNotInvalidateMarker:
    self-produced record (termId == my term) is benign.
  - testObserveRecordForLeaderHandoverNonSelfRecordInvalidatesMarker:
    other-leader, non-DoL record after a stamp -> stamp cleared.
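
The record classification these tests exercise can be sketched as a small decision function. This is an assumption-laden reading of the test names above, not the real override: `ObserveHookSketch`, the `Kind` enum, and the `observe` signature are hypothetical, and the real code compares KafkaKey bytes and topic identities rather than taking booleans.

```java
// Hypothetical model of the per-record classification implied by the test names.
final class ObserveHookSketch {
  enum Kind { STEPDOWN_STAMP, DOL_STAMP, OTHER }

  static final long NO_TERM = -1L;

  private final boolean consumeFlagEnabled;
  private final long myTermId;
  long observedStampTermId = NO_TERM;

  ObserveHookSketch(boolean consumeFlagEnabled, long myTermId) {
    this.consumeFlagEnabled = consumeFlagEnabled;
    this.myTermId = myTermId;
  }

  void observe(boolean fromLocalVt, Kind kind, long recordTermId) {
    if (!consumeFlagEnabled) {
      return; // hot-path guard: flag off means no bookkeeping at all
    }
    if (!fromLocalVt) {
      return; // RT or remote-VT records do not feed the bookkeeping
    }
    if (kind == Kind.DOL_STAMP) {
      return; // the new leader's own DoL loopback never invalidates the marker
    }
    if (recordTermId == myTermId) {
      return; // self-produced record: benign
    }
    if (kind == Kind.STEPDOWN_STAMP) {
      observedStampTermId = recordTermId; // record the non-self stamp
    } else {
      observedStampTermId = NO_TERM; // later non-self, non-DoL record: stamp is stale
    }
  }
}
```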

Testing Done
  - All 13 new tests pass.
  - :clients:da-vinci-client:diffCoverage reports 69.12% on the diff,
    clearing the 45% gate (was 36.76% before this change).
Copilot AI review requested due to automatic review settings May 15, 2026 08:43

Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 12 changed files in this pull request and generated no new comments.

…read time

Addresses Copilot review comment: a misconfigured negative ack-timeout would
make CompletableFuture.get(long, TimeUnit) return immediately or, on some JDKs,
throw IllegalArgumentException - neither is what we want in the demotion
handler. Clamp the value to Math.max(0, raw) at config-read time so the
emit path can call get() unconditionally without a defensive guard at every
call site. Zero still works (no-wait poll), which is the operator's choice.
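
The clamp itself is a one-liner. The sketch below shows it as a standalone helper; `AckTimeoutConfigSketch` and `clampAckTimeoutMs` are hypothetical names, and the real change applies the same Math.max at the point where the server config reads the property.

```java
// Hypothetical helper showing the config-read-time clamp described above.
final class AckTimeoutConfigSketch {
  // Negative values are treated as 0 (no-wait poll); non-negative values pass through.
  static long clampAckTimeoutMs(long rawMs) {
    return Math.max(0L, rawMs);
  }
}
```

Clamping once at read time keeps the emit path free of per-call guards, at the cost of silently accepting a misconfigured negative value as zero.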

Testing Done
  - testVeniceWriterInProcessConsumerAction passes (3.1 s).
  - No behavior change for the default 1000 ms value.
Labels

needs-reviewer Looking for a reviewer to pick this up
