Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pkg/mediorum/crudr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,43 @@ func (p *PeerClient) doSweep(ctx context.Context) error {
return fmt.Errorf("bad status: %d", resp.StatusCode)
}

// Retention-gap signal: the peer has dropped ops below our cursor and
// is advertising the lowest ulid it still has. Surface the gap, then
// explicitly advance our cursor to that floor. Without this branch,
// the old code path would silently skip the gap (Topic 7) because the
// "ulid > after" query already returns ops above the floor and the
// client would treat the response as a normal sweep result.
//
// Validate the advertised ulid before trusting it: a hostile or
// misconfigured peer that emits a forged future ulid could otherwise
// permanently silence this sweep stream by jumping our cursor past
// every legitimate op. Require the ulid to parse cleanly and decode
// to a time at or before the local wall clock (with a generous skew
// window).
gapMinULID := resp.Header.Get(HeaderAvailableMin)
if resp.Header.Get(HeaderRetentionGap) == "true" && gapMinULID != "" && gapMinULID > lastUlid && isValidGapULID(gapMinULID) {
p.logger.Warn("retention gap detected: peer cursor below peer's available history; advancing cursor across gap",
"peer", host,
"local_cursor", lastUlid,
"peer_available_min_ulid", gapMinULID)
// Persist the advanced cursor before applying any ops so a
// crash mid-apply doesn't leave us stuck below the gap.
// Only count the gap-advance toward the operator metric if the
// cursor actually got persisted; otherwise the counter would
// over-report relative to durable state.
upsertClause := clause.OnConflict{UpdateAll: true}
if dbErr := p.crudr.DB.Clauses(upsertClause).Create(&Cursor{Host: host, LastULID: gapMinULID}).Error; dbErr != nil {
p.logger.Error("failed to advance cursor across retention gap", "err", dbErr)
} else {
lastUlid = gapMinULID
MarkSweepGapAdvance()
}
} else if resp.Header.Get(HeaderRetentionGap) == "true" && gapMinULID != "" && !isValidGapULID(gapMinULID) {
p.logger.Warn("ignoring retention-gap header with invalid ulid",
"peer", host,
"advertised_ulid", gapMinULID)
}

var ops []*Op
dec := json.NewDecoder(resp.Body)
err = dec.Decode(&ops)
Expand Down
Loading