Skip to content

[server][dvc] Invalidate inherited fast-RTS inputs after blob transfer and persist before logging#2773

Open
sushantmane wants to merge 11 commits into
linkedin:mainfrom
sushantmane:sumane/post-blob-clear-stale-fast-rts-inputs
Open

[server][dvc] Invalidate inherited fast-RTS inputs after blob transfer and persist before logging#2773
sushantmane wants to merge 11 commits into
linkedin:mainfrom
sushantmane:sumane/post-blob-clear-stale-fast-rts-inputs

Conversation

@sushantmane
Copy link
Copy Markdown
Contributor

@sushantmane sushantmane commented May 5, 2026

Note on enforce-lines-added: PartitionState schema bump (v22 → v23) is tightly coupled to the consumer code (OffsetRecord.clearInheritedDonorState capture + BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer fallback). Splitting into two PRs would land a schema field with no reader, then a reader with no field — neither half is independently verifiable. Adding VALIDATION_OVERRIDE keyword so this can be reviewed end-to-end. The Avro change itself is additive (new field with default: -1), backward-compatible.

VALIDATION_OVERRIDE: schema-and-java-changes-must-land-together-for-this-feature.

Summary

Relationship to the companion PR #2779

#2779 fixes what actually fired in production. The captured incident's RTS promotion went through LeaderFollowerStoreIngestionTask.reportIfCatchUpVersionTopicOffset, observable as the Reported CATCH_UP_BASE_TOPIC_OFFSET_LAG log entry. That path needs cache eviction + an optional leader-complete gate; #2779 adds both.

This PR (#2773) hardens an adjacent path — the fast-RTS code (checkFastReadyToServeForReplicacheckFastReadyToServeWithPreviousTimeLag). That path was not the trigger in the captured incident: PR #2715 already clears the previouslyReadyToServe gate flag in-memory after blob transfer, so today the fast-RTS path correctly bails out at the heartbeat-INVALID guard for a fresh follower (measureHybridHeartbeatTimestamp returns INVALID_MESSAGE_TIMESTAMP while consumedFromUpstream is false).

The risk this PR closes is the crash-restart window: between the moment blob-transfer persists the donor's OffsetRecord to disk and the moment the SIT-side post-blob clear runs, the donor's heartbeatTimestamp and lastCheckpointTimestamp are sitting on disk verbatim. If the JVM crashes in that window, restart re-reads the donor record and the (currentLag - previousLag) ≈ 0 math against the donor's clock would wrongly trigger fast-RTS. Today only the heartbeat-INVALID guard saves us; if that guard ever weakens (refactor, logic bug), the second guard (lastCheckpointTimestamp) is overwritten by now() during normal syncOffset and offers no protection. This PR arms both guards at every observable persistence point.

Both PRs are independent — they touch different methods, fix different classes of artifact, and can land in either order. #2779 is the load-bearing production fix; this PR is belt-and-suspenders for the persistence-window variant.

Builds on top of #2715. After that PR cleared the inherited previouslyReadyToServe gate flag on the post-blob-transfer reinitialized PCS, the OffsetRecord still carried the donor server's heartbeatTimestamp and lastCheckpointTimestamp. checkFastReadyToServeWithPreviousTimeLag would correctly bail out at the heartbeat-INVALID guard for a fresh follower today — measureHybridHeartbeatTimestamp returns INVALID_MESSAGE_TIMESTAMP while consumedFromUpstream is false — but the entire defense relies on that single guard.

If the heartbeat-INVALID early-return is ever bypassed (future refactor of the heartbeat measurement, logic bug, post-restart persistence of mismatched fields), the fast path computes (currentLag - previousLag) ≈ 0 against the donor server's clock and wrongly marks the replica RTS without any real catch-up. The replica is then served reads while reporting INVALID_HEARTBEAT_LAG (Long.MAX_VALUE) for the heartbeat-monitoring metric, which clamps into the metric's top histogram bucket and produces a sentinel artifact in any record-delay / heartbeat-delay SLI dashboard.

Bug call-chain

The fast-RTS path triggers via three different entry points into validateAndSubscribePartition. The post-blob path is the one that wrongly inherits donor state:

StoreIngestionTask.run()                                                 [SIT main loop]
  └─ LFStoreIngestionTask.checkLongRunningTaskState()
       │  // detects partitionConsumptionState.isBlobTransferInProgress() && blobFuture.isDone()
       └─ completeBlobTransferAndSubscribe(pcs)
            │  // adjustStoragePartitionWhenBlobTransferComplete commits SST files
            └─ validateAndSubscribePartition(null, freshPcs)
                 └─ checkConsumptionStateWhenStart(offsetRecord, newPcs)
                      └─ checkFastReadyToServeForReplica(newPcs)
                           │  // gate: hybrid && pcs.getReadyToServeInOffsetRecord()  ← inherited TRUE
                           └─ checkFastReadyToServeWithPreviousTimeLag(pcs)
                                │  previousMessageTimestamp    ← from donor's clock
                                │  previousCheckpointTimestamp ← from donor's clock
                                │  delta = (currentLag - previousLag) ≈ 0
                                └─ pcs.lagHasCaughtUp(); reportCompleted(pcs, true)
                                                                ↑
                                                replica marked ready_to_serve
                                                without any real catch-up

The donor's OffsetRecord lands on disk earlier than this code path ever runs, in P2PMetadataTransferHandler.updateStorePartitionMetadata:

P2PMetadataTransferHandler.handleResponse()                              [blob-transfer thread]
  └─ updateStorePartitionMetadata(storageMetadataService, transferredPartitionMetadata)
       │  OffsetRecord transferredOffsetRecord = new OffsetRecord(transferred bytes)
       └─ storageMetadataService.put(topic, partition, transferredOffsetRecord)  ← donor record persisted verbatim
            └─ StorageEngineMetadataService.put → storageEngine.putPartitionOffset

A JVM crash anywhere between the put above and the SIT-side post-blob clears = restart reads the donor's record and re-triggers fast-RTS via the same gate.

Fix

Close every observable persistence point of the donor's OffsetRecord:

  1. At the blob-transfer entry point (the disk window). P2PMetadataTransferHandler.updateStorePartitionMetadata calls OffsetRecord.clearInheritedDonorState(transferredOffsetRecord) immediately after deserializing the wire bytes, so storageMetadataService.put never sees donor-clock values. Crash-safe across the entire SIT-side post-blob sequence.

  2. In-memory at the SIT post-blob path (the same-cycle window). completePostTransferPSCUpdated calls OffsetRecord.clearInheritedDonorState(newPcs.getOffsetRecord()) on the record reloaded from storage, so the in-memory PCS used by the immediately-following checkFastReadyToServeForReplica sees cleared values regardless of future changes to the disk-side path.

  3. syncOffset(pcs, updateMetadataLag) overload so the post-blob persist path can preserve our cleared timestamps verbatim. The default syncOffset(pcs) keeps current behavior (updateMetadataLag=true); the post-blob path passes false to skip the updateOffsetLagInMetadata re-measurement that would otherwise overwrite lastCheckpointTimestamp with now() and weaken the second early-return guard.

  4. OffsetRecord.clearInheritedDonorState(OffsetRecord) is the single authoritative place to extend whenever a new field is added that is unsafe to inherit from a donor host. Today it clears previouslyReadyToServe, heartbeatTimestamp, lastCheckpointTimestamp, and offsetLag. OffsetRecord.PREVIOUSLY_READY_TO_SERVE_KEY is now a public constant; PartitionConsumptionState's private alias is kept for brevity and now references it.

Result: both early-return guards in checkFastReadyToServeWithPreviousTimeLag (heartbeat AND checkpoint) are armed for any post-blob replica at every observable persistence point. Even if a future change weakens one of them, the other still bails out.

Log evidence

Observed on a hybrid AA store (<store>) on a follower replica (<host>) that had just been assigned partitions via Helix LOAD_REBALANCE and bootstrapped via blob transfer. Names sanitized; the timing and structure are real.

05:20:51.92  Helix state-transition: <host>, partition <store>_v<N>_<P>
             received state transition from OFFLINE to STANDBY on session ...
05:20:51.92  HYBRID store current version replica <store>_v<N>-<P>
             initiating transition from OFFLINE to STANDBY ... ST_REBALANCE_TYPE=LOAD_REBALANCE
05:20:59.35  Successfully bootstrapped from blob transfer <store>_v<N>-<P>
             with files: [000390.sst, 000388.sst, ..., MANIFEST-..., CURRENT, OPTIONS-..., ...log]
05:20:59.36  Post-blob-transfer PCS reinitialized for replica: <store>_v<N>-<P> at position: ...
             PCS: PCS{replica=<store>_v<N>-<P>, hybrid=true,
                       latestProcessedVtPosition=Xc{...offset=6334237...},
                       offsetRecord=OffsetRecord{...,
                           heartbeatTimestamp=1777862715170,        ← from donor's clock
                           lastCheckpointTimestamp=1777862722903,   ← from donor's clock
                           ...},
                       leaderFollowerState=STANDBY,
                       leaderCompleteState=LEADER_NOT_COMPLETED,
                       lastLeaderCompleteStateUpdateInMs=0,
                       blobTransferPending=false}
05:20:59.37  Reported STARTED for replica: <store>_v<N>-<P>
05:20:59.37  [Heartbeat lag] replica: <store>_v<N>-<P> is lagging.
             Lag: [9223372036854775807] > Threshold [120000].          ← Long.MAX_VALUE
             Leader Complete State: {LEADER_NOT_COMPLETED}, Last update In Ms: {0}.
05:20:59.53  LeaderCompleteState for replica: <store>_v<N>-<P> changed from
             LEADER_NOT_COMPLETED to LEADER_COMPLETED                  ← ~150ms later, but already RTS

The Long.MAX_VALUE sample lands in the OTel histogram's top bucket and shows up as a P99 spike of ~8.8M ms (~2.4h) on Venice.Server.Ingestion.Replication.{Record,Heartbeat}.Delay — even with Venice.Replica.State=ready_to_serve AND Venice.Version.Role=current filters applied. The replica is structurally RTS via the fast path before it has consumed a single HB.

This change ensures the fast path cannot be entered post-blob regardless of which guard the bug hits.

Testing Done

  • Existing SITFastReadyToServeTest continues to pass — non-blob callers see no behavior change (default syncOffset(pcs) keeps updateMetadataLag=true).
  • Add unit coverage asserting that after P2PMetadataTransferHandler.updateStorePartitionMetadata, the OffsetRecord persisted to storageMetadataService has heartbeatTimestamp == -1, lastCheckpointTimestamp == -1, offsetLag == DEFAULT_OFFSET_LAG, and the previouslyReadyToServe previousStatuses entry is absent — proves the disk-side window is closed.
  • Add unit coverage asserting that completePostTransferPSCUpdated on a freshly reloaded PCS leaves the in-memory OffsetRecord with the same cleared fields after syncOffset(pcs, false) returns — proves the SIT-side window is closed.
  • Add unit coverage asserting checkFastReadyToServeWithPreviousTimeLag returns false when both heartbeatTimestamp == -1 and lastCheckpointTimestamp == -1, even with previouslyReadyToServe=true artificially set on the OffsetRecord — proves the second guard is armed.

Copilot AI review requested due to automatic review settings May 5, 2026 18:29
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR hardens the Da Vinci/Server post–blob-transfer reinitialization flow to prevent inherited (donor) lag metadata in OffsetRecord from incorrectly enabling the fast ready-to-serve (RTS) path after blob bootstrap, and ensures the cleared state is persisted before it is logged.

Changes:

  • In completePostTransferPSCUpdated, explicitly invalidate inherited heartbeatTimestamp, lastCheckpointTimestamp, and offsetLag, then persist the updated OffsetRecord before logging.
  • Add a syncOffset(PartitionConsumptionState pcs, boolean updateMetadataLag) overload to optionally skip updateOffsetLagInMetadata(...) so explicitly-cleared lag fields can be persisted verbatim.
  • Gate the lag-metadata refresh inside syncOffset(...) behind the new updateMetadataLag flag.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copilot AI review requested due to automatic review settings May 5, 2026 18:49
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comments suppressed due to low confidence (1)

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PMetadataTransferHandler.java:131

  • Clearing inherited donor state before persisting the transferred OffsetRecord is critical to avoid re-triggering fast RTS after a crash, but there’s no unit coverage validating that the stored OffsetRecord actually has the cleared heartbeatTimestamp/lastCheckpointTimestamp/offsetLag and previouslyReadyToServe removed. Please add a focused unit test for updateStorePartitionMetadata that captures the StorageMetadataService.put() argument and asserts the cleared values are persisted.
        Utils.getReplicaId(transferredPartitionMetadata.topicName, transferredPartitionMetadata.partitionId));
    OffsetRecord transferredOffsetRecord =
        new OffsetRecord(transferredPartitionMetadata.offsetRecord.array(), partitionStateSerializer, null);

    /*
     * Clear inherited donor state before persisting. If donor-clock fields reach disk verbatim, a
     * JVM crash between this put() and the post-blob sync on the SIT thread leaves a state where
     * restart reads donor values and re-triggers fast RTS via checkFastReadyToServeWithPreviousTimeLag.
     * Clearing at the entry point closes that crash window regardless of where the SIT thread is.
     */
    OffsetRecord.clearInheritedDonorState(transferredOffsetRecord);

    // 1. update the offset incremental push job information
    updateIncrementalPushInfoToStore(transferredOffsetRecord, transferredPartitionMetadata);
    // 2. update the offset record in storage service
    storageMetadataService
        .put(transferredPartitionMetadata.topicName, transferredPartitionMetadata.partitionId, transferredOffsetRecord);
    // 3. update the metadata SVS

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

sushantmane added a commit to sushantmane/venice that referenced this pull request May 5, 2026
wrapper, add unit tests

Three review-driven changes, all in response to copilot-pull-request-reviewer
feedback on PR linkedin#2773:

1. clearInheritedDonorState no longer touches heartbeatTimestamp. Setting it
   to INVALID_MESSAGE_TIMESTAMP (-1) interacts badly with
   BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer, which
   computes the time-lag eligibility check as
   (now() - offsetRecord.getHeartbeatTimestamp()). With heartbeatTimestamp =
-1
   this always exceeds blobTransferDisabledTimeLagThresholdInMinutes and would
   re-trigger blob transfer on every restart until a real heartbeat is
   consumed and checkpointed. The fast-RTS guard on lastCheckpointTimestamp
   alone is sufficient to block the wrongful RTS path -- the donor's
   heartbeatTimestamp is left intact so eligibility checks see a recent value.

2. Replace the boolean-literal call site syncOffset(newPcs, false) with a
   named wrapper syncOffsetPreservingLagMetadata(pcs). The boolean overload
   is renamed to syncOffsetInternal(pcs, updateMetadataLag) and is no longer
   the public-facing API; both callers go through one of the two named
   methods so the metadata-lag handling is explicit at every call site.

3. Add unit tests in TestOffsetRecord covering:
     * clearPreviouslyReadyToServe removes the gate flag.
     * clearInheritedDonorState clears the gate flag, lastCheckpointTimestamp,
       and offsetLag, but leaves heartbeatTimestamp at the donor value.
     * clearInheritedDonorState is idempotent on a default OffsetRecord.

No behavior change for any non-blob caller.
Copilot AI review requested due to automatic review settings May 5, 2026 20:30
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

anchorTimestamp = offsetRecord.getDonorHeartbeatTimestampMs();
anchorSource = "donorHeartbeatTimestampMs";
}
if (anchorTimestamp > 0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected to skip blob transfer for a fresh replica where heartbeat timestamp and donor timestamp is -1? For a new replica, offset record will be initialized with default values -1 and it will also not pass the offset lag check for blob transfer

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — let me trace the path explicitly:

For a brand-new replica that has never been through ingestion, the empty OffsetRecord (constructed via getEmptyPartitionState) reports:

  • heartbeatTimestamp = -1L (INVALID_MESSAGE_TIMESTAMP)
  • donorHeartbeatTimestampMs = -1L (now explicitly initialized in 28925d5 — was 0 before, which is the bug you spotted: 0 would slip past my > 0 capture guard. Fixed.)
  • offsetLag = 0 (LOWEST_OFFSET_LAG)
  • checkpointedLocalVtPosition = EARLIEST

Walking the method:

  1. Time-lag check (lines 153–187): heartbeatTimestamp == INVALID → fall back to donorHeartbeatTimestampMs → also -1anchorTimestamp > 0 is false → fall through to offset-lag check.

  2. Offset-lag check (lines 188–190): offsetLag == 0 && EARLIEST.equals(checkpointedLocalVtPosition) is the explicit fresh-replica detector — and it returns true (yes, blob transfer is needed).

So the fall-through does land on the right answer for a fresh replica. The path you flagged works because the offset-lag layer has the existing fresh-replica branch. My time-lag layer adds a path before offset-lag for replicas that have ingested before but lost their own heartbeats (post-blob); it deliberately delegates to the offset-lag layer for the no-history case via the > 0 guard.

The donor-init-to--1 fix is in 28925d5 — that closes the latent 0-vs--1 ambiguity you implied. Pinned the contract via testFreshOffsetRecordHasDonorHeartbeatTimestampMsAtSentinel.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't get it, looks like for new or lagged replica this can't be the case. Can we get some concrete example?

}

@Test
public void testShouldStartBlobTransferHybridStoreUsesDonorHeartbeatFallbackPostBlob() throws InterruptedException {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add an integration test to verify fix works E2E?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filing as a separate follow-up rather than holding this PR for it. Reasoning:

The crash-restart window this PR closes is hard to construct deterministically inside an integration test — you need a JVM kill at the precise instant between P2PMetadataTransferHandler.updateStorePartitionMetadata's storageMetadataService.put(...) and the SIT-side post-blob clear, then a restart that re-reads the donor record. Even with a coordinated injection point, the wall-clock delta between donor and recipient is ms-scale, so any non-determinism in test scheduling masks the regression.

What we can assert end-to-end without inducing a crash is the "normal" post-blob path:

  • After blob-transfer completes, the persisted OffsetRecord on disk has heartbeatTimestamp == -1, lastCheckpointTimestamp == -1, offsetLag == DEFAULT_OFFSET_LAG, and the previouslyReadyToServe map entry is absent.
  • The donor's heartbeat ts is preserved as donorHeartbeatTimestampMs.
  • The replica is not prematurely marked READY_TO_SERVE before consuming its own heartbeat.

That's what the unit tests in this PR (testClearInheritedDonorState, testClearInheritedDonorStateIsReentrantSafeOnAlreadyClearedRecord, testShouldStartBlobTransferHybridStoreUsesDonorHeartbeatFallbackPostBlob) already pin down.

For a real E2E test, the most valuable shape would be a multi-server cluster where one replica is bootstrapped via blob transfer from another, and we assert the post-blob OffsetRecord on disk has the cleared sentinels — which exercises both the P2P-thread path and the SIT-thread path. I'll open a follow-up jira / PR for that integration test once the schema bumps and the consumer code in this PR have been in production for a release cycle (so we know the schema is stable). Sound good?

Copilot AI review requested due to automatic review settings May 8, 2026 22:00
sushantmane added a commit to sushantmane/venice that referenced this pull request May 8, 2026
wrapper, add unit tests

Three review-driven changes, all in response to copilot-pull-request-reviewer
feedback on PR linkedin#2773:

1. clearInheritedDonorState no longer touches heartbeatTimestamp. Setting it
   to INVALID_MESSAGE_TIMESTAMP (-1) interacts badly with
   BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer, which
   computes the time-lag eligibility check as
   (now() - offsetRecord.getHeartbeatTimestamp()). With heartbeatTimestamp =
-1
   this always exceeds blobTransferDisabledTimeLagThresholdInMinutes and would
   re-trigger blob transfer on every restart until a real heartbeat is
   consumed and checkpointed. The fast-RTS guard on lastCheckpointTimestamp
   alone is sufficient to block the wrongful RTS path -- the donor's
   heartbeatTimestamp is left intact so eligibility checks see a recent value.

2. Replace the boolean-literal call site syncOffset(newPcs, false) with a
   named wrapper syncOffsetPreservingLagMetadata(pcs). The boolean overload
   is renamed to syncOffsetInternal(pcs, updateMetadataLag) and is no longer
   the public-facing API; both callers go through one of the two named
   methods so the metadata-lag handling is explicit at every call site.

3. Add unit tests in TestOffsetRecord covering:
     * clearPreviouslyReadyToServe removes the gate flag.
     * clearInheritedDonorState clears the gate flag, lastCheckpointTimestamp,
       and offsetLag, but leaves heartbeatTimestamp at the donor value.
     * clearInheritedDonorState is idempotent on a default OffsetRecord.

No behavior change for any non-blob caller.
@sushantmane sushantmane force-pushed the sumane/post-blob-clear-stale-fast-rts-inputs branch from c8a6db3 to 2a13639 Compare May 8, 2026 22:00
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 10 changed files in this pull request and generated 3 comments.

sushantmane added a commit to sushantmane/venice that referenced this pull request May 8, 2026
…ization + donor

sentinel init

Three changes driven by the post-rebase Copilot review:

1. clearPreviousStatusesEntry / getPreviousStatusesEntry now compare via
   toString() so they normalize across CharSequence subtypes. Avro
   deserialization yields Utf8 keys, but in-memory test/production code
   may put entries with String literals directly. Utf8.equals(String) is
   false, so a vanilla map.remove(Utf8) would miss a String-keyed entry —
   a real cross-type bug latent in the codebase (StoreIngestionTaskTest's
   testReadyToServePartitionValidateIngestionSuccessWithPriorState
   already mixes the two).

2. donorHeartbeatTimestampMs is explicitly initialized to -1L in
   OffsetRecord.getEmptyPartitionState. Java's default for `long` is 0;
   the documented sentinel (and Avro schema default applied during
   deserialization) is -1L = "not inherited from a donor". Without the
   explicit init, a fresh in-memory record exposed 0 — which the
   isReplicaLaggedAndNeedBlobTransfer fallback could observe as a "real"
   anchor and skew the elapsed-time math.

3. Two new tests in TestOffsetRecord:
   - testFreshOffsetRecordHasDonorHeartbeatTimestampMsAtSentinel pins
     down the sentinel-at-construction contract.
   - testClearPreviouslyReadyToServeRemovesEntryWrittenWithStringKey
     mirrors the StoreIngestionTaskTest String-keyed call shape and
     asserts both clear and getter resolve the entry regardless of which
     CharSequence subtype it was written with.

Updated testClearInheritedDonorStateIsIdempotent to expect -1L for the
donor field on a default record (was 0L).

Verified locally: TestOffsetRecord (full suite, 6 relevant tests pass);
SITWithPWiseAndBufferAfterLeaderTest.testReadyToServePartitionValidateIngestio
nSuccessWithPriorState
(the load-bearing String-key call site) passes for both AA_ON / AA_OFF.
Copilot AI review requested due to automatic review settings May 8, 2026 23:15
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 10 changed files in this pull request and generated no new comments.

sushantmane added 10 commits May 8, 2026 19:28
…r and

persist before logging

After PR linkedin#2715 cleared the previouslyReadyToServe gate flag inherited from the
blob-transfer source host, the OffsetRecord still carried the donor server's
heartbeatTimestamp and lastCheckpointTimestamp.
checkFastReadyToServeWithPreviousTimeLag
would correctly bail out at the heartbeat-INVALID guard for a fresh follower
(since
measureHybridHeartbeatTimestamp returns INVALID_MESSAGE_TIMESTAMP while
consumedFromUpstream is false), but if that single guard were ever bypassed
-- by a
future refactor of the heartbeat measurement, a logic bug, or post-restart
persistence
of mismatched fields -- the fast path would compute (currentLag -
previousLag) ~= 0
against the donor's clock and wrongly mark the replica RTS without any real
catch-up.

This change closes both early-return guards in
checkFastReadyToServeWithPreviousTimeLag
(heartbeat AND checkpoint) for any post-blob replica:

  * In completePostTransferPSCUpdated, explicitly set heartbeatTimestamp,
    lastCheckpointTimestamp, and offsetLag to INVALID_MESSAGE_TIMESTAMP /
    DEFAULT_OFFSET_LAG on the reinitialized PCS. These fields belong to the
    donor server's ingestion clock and must not be read as authoritative for
    this replica.

  * Add a syncOffset(pcs, updateMetadataLag) overload. The post-blob path
calls
    syncOffset(newPcs, false) to persist the cleared OffsetRecord verbatim, so
    a JVM bounce immediately after blob bootstrap (and before any HB has been
    consumed on the new ingestion path) cannot re-trigger fast RTS. The
default
    syncOffset(pcs) preserves existing behavior for all other callers.

  * The persist-then-log ordering also makes the "Post-blob-transfer PCS
    reinitialized" log reflect on-disk state, not just the in-memory snapshot,
    which simplifies post-incident investigations.

Belt-and-suspenders alongside the gate flag clear from linkedin#2715: even if the gate
is somehow set true again, the heartbeat AND checkpoint INVALID values
together
make checkFastReadyToServeWithPreviousTimeLag structurally bail out.

Testing Done
* Existing SITFastReadyToServeTest (unchanged behavior for non-blob callers).
* Manually verified the diff via git diff and confirmed the no-overwrite path
  through syncOffset(pcs, false) reaches storageMetadataService.put with the
  intended INVALID values intact.
No functional change. Project convention is to use /* */ block form for
multi-line explanatory comments; one-liner // is unchanged.
…point

Closes the crash window between when the donor's OffsetRecord lands on disk
and when the SIT thread runs the post-blob clears. Previously, the donor's
record was persisted by
P2PMetadataTransferHandler.updateStorePartitionMetadata
in full -- including previouslyReadyToServe, heartbeatTimestamp,
lastCheckpointTimestamp, and offsetLag -- and only later got cleared by
completePostTransferPSCUpdated on the SIT thread. A JVM crash anywhere in
between meant restart would read the donor's values and re-trigger fast-RTS
via checkFastReadyToServeWithPreviousTimeLag.

Sanitize the inherited record at the entry point, before any disk write:

  * P2PMetadataTransferHandler.updateStorePartitionMetadata clears
    previouslyReadyToServe and sets heartbeatTimestamp /
lastCheckpointTimestamp
    / offsetLag to INVALID_MESSAGE_TIMESTAMP / DEFAULT_OFFSET_LAG immediately
    after deserializing the transferred bytes, so storageMetadataService.put
    never sees donor-clock values.

  * OffsetRecord exposes PREVIOUSLY_READY_TO_SERVE_KEY as a public constant
    and a clearPreviouslyReadyToServe() convenience helper, so callers in
    other packages (here: blobtransfer.client) can clear the flag without
    depending on the davinci-internal constant.

  * PartitionConsumptionState's private alias now points to the public
    constant instead of duplicating the Utf8 literal; the unused Utf8 import
    is removed.

Together with the SIT-side clears in completePostTransferPSCUpdated and the
syncOffset(pcs, false) variant from earlier in this PR, both early-return
guards in checkFastReadyToServeWithPreviousTimeLag (heartbeat AND checkpoint)
are armed for any post-blob replica at every observable persistence point.

Testing Done
* Verified Utf8 import is no longer needed in PartitionConsumptionState; no
  other reference remains.
* Verified the public constant is reachable from the blobtransfer.client
  package without a circular dependency.
OffsetRecord.clearInheritedDonorState

Replace the inline 4-line clear blocks in P2PMetadataTransferHandler and
StoreIngestionTask.completePostTransferPSCUpdated with a single static
helper OffsetRecord.clearInheritedDonorState(record). The helper is the
authoritative spot to extend whenever a new field is added that is unsafe
to inherit verbatim from the donor host -- callers don't need to remember
the full list each time.

Static (rather than instance) because the call site reads more naturally
as "clear inherited state on this record" and avoids the awkward
`getOffsetRecord().sanitizeAfterBlobTransfer()` getter dance when the
OffsetRecord lives inside another object (PCS).

Both call sites now invoke it before any disk persistence:

  * P2PMetadataTransferHandler.updateStorePartitionMetadata clears the
    inherited record before storageMetadataService.put.
  * StoreIngestionTask.completePostTransferPSCUpdated clears the in-memory
    record reloaded from storage before syncOffset(pcs, false).

No behavior change vs. the prior commit; this is a pure refactor.
wrapper, add unit tests

Three review-driven changes, all in response to copilot-pull-request-reviewer
feedback on PR linkedin#2773:

1. clearInheritedDonorState no longer touches heartbeatTimestamp. Setting it
   to INVALID_MESSAGE_TIMESTAMP (-1) interacts badly with
   BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer, which
   computes the time-lag eligibility check as
   (now() - offsetRecord.getHeartbeatTimestamp()). With heartbeatTimestamp =
-1
   this always exceeds blobTransferDisabledTimeLagThresholdInMinutes and would
   re-trigger blob transfer on every restart until a real heartbeat is
   consumed and checkpointed. The fast-RTS guard on lastCheckpointTimestamp
   alone is sufficient to block the wrongful RTS path -- the donor's
   heartbeatTimestamp is left intact so eligibility checks see a recent value.

2. Replace the boolean-literal call site syncOffset(newPcs, false) with a
   named wrapper syncOffsetPreservingLagMetadata(pcs). The boolean overload
   is renamed to syncOffsetInternal(pcs, updateMetadataLag) and is no longer
   the public-facing API; both callers go through one of the two named
   methods so the metadata-lag handling is explicit at every call site.

3. Add unit tests in TestOffsetRecord covering:
     * clearPreviouslyReadyToServe removes the gate flag.
     * clearInheritedDonorState clears the gate flag, lastCheckpointTimestamp,
       and offsetLag, but leaves heartbeatTimestamp at the donor value.
     * clearInheritedDonorState is idempotent on a default OffsetRecord.

No behavior change for any non-blob caller.
blob-transfer eligibility

Treat the donor's heartbeatTimestamp as untrusted on this replica (it is the
root-cause
input to the fast-RTS misfire we are fixing) and absorb the consequence at
the only
reader that cared about it.

  * OffsetRecord.clearInheritedDonorState now also resets heartbeatTimestamp
= -1, so both
    early-return guards in checkFastReadyToServeWithPreviousTimeLag are armed
for any
    post-blob replica regardless of which guard a future change might weaken.

  * BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer skips the
time-lag
    branch when heartbeatTimestamp == INVALID_MESSAGE_TIMESTAMP and falls
through to the
    existing offset-lag fallback. With offsetLag = DEFAULT_OFFSET_LAG (-1,
also cleared by
    sanitize) and a real localVtPosition (SST data on disk from blob
bootstrap, not
    EARLIEST), the fallback correctly classifies this state as "caught up — no
blob
    transfer", preventing the spurious-retransfer regression that a naive
    (now - INVALID_MESSAGE_TIMESTAMP) computation would cause.

  * Brand-new replicas (offsetLag == 0 && localVtPosition == EARLIEST) still
take the
    blob-transfer path via the existing offset-lag short-circuit; no
regression there.

  * Once a real heartbeat is consumed and a regular syncOffset(pcs) runs (NOT
the
    syncOffsetPreservingLagMetadata variant used post-blob),
updateOffsetLagInMetadata
    overwrites heartbeatTimestamp with a real measured value and the time-lag
check
    resumes its normal behavior. The cleared INVALID state self-stabilizes.

Tests:
  * TestOffsetRecord asserts heartbeatTimestamp = -1 after
clearInheritedDonorState (was
    asserting "intact at donor's value" in the previous commit; reverted to
match the new
    contract).
  * LeaderFollowerStoreIngestionTaskTest adds

testShouldStartBlobTransferHybridStoreInvalidHeartbeatFallsThroughToOffsetLag
asserting
    that an INVALID heartbeatTimestamp + cleared offsetLag + real
localVtPosition → no blob
    transfer (guard against the spurious-retransfer regression).

This supersedes the "keep heartbeatTimestamp intact" approach from the
previous commit.
… for

post-blob freshness

Bumps PartitionState schema to v23, adding donorHeartbeatTimestampMs as a
freshness anchor
that explicitly carries the donor's wall-clock for blob-transfer eligibility.
This separates
the donor-clock value (used for "is the data fresh?") from heartbeatTimestamp
(which is now
strictly the THIS-server-observed HB ts, used by fast-RTS lag computation).
Both fast-RTS
guards in checkFastReadyToServeWithPreviousTimeLag remain armed for any
post-blob replica.

Mechanics:

  * PartitionState.avsc v23 adds donorHeartbeatTimestampMs (long, default -1).
    AvroProtocolDefinition.PARTITION_STATE bumps to currentProtocolVersion =
23.

  * OffsetRecord exposes getDonorHeartbeatTimestampMs /
setDonorHeartbeatTimestampMs
    wrappers, and clearInheritedDonorState now captures the inherited
heartbeatTimestamp
    into the donor field BEFORE clearing it. The capture guard is "> 0" so:
      * fresh records (heartbeatTimestamp = 0, the Java long default) → no ca
pture
      * already-cleared records (heartbeatTimestamp = -1) → no capture (r
e-entrant safe)
      * donor records (heartbeatTimestamp = real wall-clock ms) → capture an
d clear
    Re-running on the in-memory record reloaded from disk by the SIT-side
path is
    therefore a no-op for the donor field — the value persisted by the ea
rlier P2P-side
    call is preserved.

  * BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer falls back
from
    heartbeatTimestamp to donorHeartbeatTimestampMs when the former is
INVALID. The
    threshold comparison uses whichever anchor is available, with
anchorSource logged
    explicitly. If neither is set (truly fresh replica with no inherited
donor state),
    falls through to the existing offset-lag check.

Why this is better than the previous "fall through to offset-lag" approach:
  * Preserves time-lag-based blob-transfer eligibility post-blob — the op
erator's
    threshold tuning (blobTransferDisabledTimeLagThresholdInMinutes) remains
effective.
  * Trust separation is explicit in the schema. Future readers can tell apart
"this
    replica observed an HB at this time" from "the donor observed an HB at
this time".
  * Once the replica catches up and a regular syncOffset overwrites
heartbeatTimestamp
    with a real measured value, the donor field is naturally ignored — el
igibility
    prefers the THIS-server value.

Tests:
  * TestOffsetRecord.testClearInheritedDonorState now asserts donor's HB ts
is captured
    into donorHeartbeatTimestampMs while the standard heartbeatTimestamp is
cleared to -1.
  *
TestOffsetRecord.testClearInheritedDonorStateIsReentrantSafeOnAlreadyClearedRe
cord
    asserts that a second invocation does not overwrite the captured donor
value.
  *
LeaderFollowerStoreIngestionTaskTest.testShouldStartBlobTransferHybridStoreUse
sDonor-
    HeartbeatFallbackPostBlob verifies the eligibility fallback path on the
post-blob
    state: heartbeatTimestamp = INVALID + donorHeartbeatTimestampMs = recent
→ no blob
    transfer.
  *
LeaderFollowerStoreIngestionTaskTest.testShouldStartBlobTransferHybridStoreNoH
eart-
    beatAnchorFallsThroughToOffsetLag covers the pathological case where
neither anchor
    is set.

Verified locally: :internal:venice-common:test --tests TestOffsetRecord and
:clients:da-vinci-client:test --tests testShouldStartBlobTransfer* all pass.

Supersedes the heartbeatTimestamp-cleared-and-fall-through-to-offset-lag
approach from
the previous commit. Both early-return guards in fast-RTS lag computation
remain armed,
AND time-lag-based blob-transfer eligibility is preserved.
Main landed PR linkedin#2776 (PartitionState v23 with batchPushRecordCount, pinned
to v22 until prc record-count consumer code lands), so my donor-clock
freshness field can no longer claim v23. Move it to a new v24 that carries
v23 + donorHeartbeatTimestampMs together.

Bumps the PartitionState avro override from v22 → v24 so consumer code in
this PR can read/write the new field. v23's batchPushRecordCount is
exposed as a side effect, but no consumer reads/writes it yet — existing
data sees default 0 and behaves unchanged. The prc record-count team's
follow-up PR will replace this override (or remove it once v24 is the
real latest) when their consumer code lands.
…ization + donor

sentinel init

Three changes driven by the post-rebase Copilot review:

1. clearPreviousStatusesEntry / getPreviousStatusesEntry now compare via
   toString() so they normalize across CharSequence subtypes. Avro
   deserialization yields Utf8 keys, but in-memory test/production code
   may put entries with String literals directly. Utf8.equals(String) is
   false, so a vanilla map.remove(Utf8) would miss a String-keyed entry —
   a real cross-type bug latent in the codebase (StoreIngestionTaskTest's
   testReadyToServePartitionValidateIngestionSuccessWithPriorState
   already mixes the two).

2. donorHeartbeatTimestampMs is explicitly initialized to -1L in
   OffsetRecord.getEmptyPartitionState. Java's default for `long` is 0;
   the documented sentinel (and Avro schema default applied during
   deserialization) is -1L = "not inherited from a donor". Without the
   explicit init, a fresh in-memory record exposed 0 — which the
   isReplicaLaggedAndNeedBlobTransfer fallback could observe as a "real"
   anchor and skew the elapsed-time math.

3. Two new tests in TestOffsetRecord:
   - testFreshOffsetRecordHasDonorHeartbeatTimestampMsAtSentinel pins
     down the sentinel-at-construction contract.
   - testClearPreviouslyReadyToServeRemovesEntryWrittenWithStringKey
     mirrors the StoreIngestionTaskTest String-keyed call shape and
     asserts both clear and getter resolve the entry regardless of which
     CharSequence subtype it was written with.

Updated testClearInheritedDonorStateIsIdempotent to expect -1L for the
donor field on a default record (was 0L).

Verified locally: TestOffsetRecord (full suite, 6 relevant tests pass);
SITWithPWiseAndBufferAfterLeaderTest.testReadyToServePartitionValidateIngestio
nSuccessWithPriorState
(the load-bearing String-key call site) passes for both AA_ON / AA_OFF.
…in-place

Earlier I bumped to v24 to avoid colliding with PR linkedin#2776's v23
(batchPushRecordCount), but v23 has no production data — PR linkedin#2776 pinned
the avro override at v22 because its consumer code wasn't ready, so v23
has only ever existed as a schema file. That makes it safe to extend
v23 in-place with one more additive field instead of carrying an unused
intermediate version.

Result:
  v22 (active wire format today)
   └─ v23 = batchPushRecordCount (PR linkedin#2776) + donorHeartbeatTimestampMs (thi
s PR)

Avro override bumps from v22 → v23. AvroProtocolDefinition.PARTITION_STATE
goes from (24, 24) to (24, 23). v24 directory deleted.

The prc record-count team's follow-up PR no longer needs to coordinate a
v24/v23 swap — they just remove the override (or land their consumer
code on v23 alongside this PR's). batchPushRecordCount is now part of
the active schema; default 0 means replicas behave unchanged until the
prc consumer code reads/writes it.

Verified: TestOffsetRecord (all pass), LeaderFollowerStoreIngestionTaskTest
blob-transfer suite (all 7 pass), reportIfCatchUpVersionTopicOffset and
testReadyToServePartitionValidateIngestionSuccessWithPriorState (pass for
AA_ON / AA_OFF). Generated PartitionState class carries both fields.
The previous commit pinned PartitionState to v23 in versionOverrides,
but the override only matters when forcing a different version than the
real on-disk maximum. Since v23 is now the natural latest version and the
consumer code in this PR is ready to use it, the override is a no-op:
compileAvro picks v23 by version-number ordering with or without the
override entry.

Drop the entry; leave a one-line note pointing future readers at the
non-override path.
Copilot AI review requested due to automatic review settings May 9, 2026 03:34
@sushantmane sushantmane force-pushed the sumane/post-blob-clear-stale-fast-rts-inputs branch from b0600e4 to de3323a Compare May 9, 2026 03:34
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 10 changed files in this pull request and generated 1 comment.

Comment on lines +185 to +190
* <p>Specifically:
* <ul>
* <li>If {@code heartbeatTimestamp} is a real value (not {@code -1L}), copy it into
* {@code donorHeartbeatTimestampMs} before clearing. This preserves the wall-clock
* freshness signal so {@code BlobTransferIngestionHelper.isReplicaLaggedAndNeedBlobTransfer}
* can use it as a fallback to avoid spurious blob retransfers on restart.</li>
"doc": "Number of data records (PUT/DELETE, excluding chunk fragments) ingested in this partition before End-of-Push. Used for end-to-end record count verification against the producer-side count carried on the EOP message's 'prc' PubSub header. Persisted across server restarts.",
"type": "long",
"default": 0
},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to reuse the same v23 before anyone use it, let's still separate a PR with [schema] with this schema change only to keep up with existing case to avoid any confusion. Also this PR let's add one [compact] tag for oncall and any AI bot triage awareness, I hope it can detect it and ask for registration of PS

* either key type resolves to the same logical entry.
*/
String target = key.toString();
for (Map.Entry<CharSequence, CharSequence> entry: partitionState.getPreviousStatuses().entrySet()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like correct but over-engineered code. Do we really need it?
Also if existing one is persisted in utf8 and new one write string in java, there is no guarantee that which one will get returned.

anchorTimestamp = offsetRecord.getDonorHeartbeatTimestampMs();
anchorSource = "donorHeartbeatTimestampMs";
}
if (anchorTimestamp > 0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't get it, looks like for new or lagged replica this can't be the case. Can we get some concrete example?

* (downstream of {@code venice-common}).
*/
public static void clearInheritedDonorState(OffsetRecord record) {
long inheritedHeartbeatTimestamp = record.getHeartbeatTimestamp();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a general question - if the intention is to avoid the crash restart, I think we can just evict the existing thing and force sync metadata(which your PR already do it) without adding a new field. wdyt?

@sushantmane sushantmane added the requested-changes Reviewer requested changes label May 14, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

requested-changes Reviewer requested changes

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants