Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 50 additions & 50 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type dispatcherStat struct {

resolvedTs atomic.Uint64
// the max ts of events which is not needed by this dispatcher
checkpointTs uint64
checkpointTs atomic.Uint64
// the difference between `subStat`, `pendingSubStat` and `removingSubStat`:
// 1) if there is no existing subscriptions which can be reused,
// or there is a existing subscription with exact span match,
Expand Down Expand Up @@ -498,8 +498,8 @@ func (e *eventStore) RegisterDispatcher(
stat := &dispatcherStat{
dispatcherID: dispatcherID,
tableSpan: dispatcherSpan,
checkpointTs: startTs,
}
stat.checkpointTs.Store(startTs)
stat.resolvedTs.Store(startTs)

wrappedNotifier := func(resolvedTs uint64, latestCommitTs uint64) {
Expand Down Expand Up @@ -710,7 +710,7 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
if !ok {
return
}
dispatcherStat.checkpointTs = checkpointTs
dispatcherStat.checkpointTs.Store(checkpointTs)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To ensure that the dispatcher's checkpoint timestamp only moves forward and to handle potential concurrent updates safely, it is better to use util.CompareAndMonotonicIncrease instead of a direct Store. This prevents a stale update from regressing the checkpoint, which could incorrectly affect the minimum checkpoint calculation for the shared subscription.

Suggested change
dispatcherStat.checkpointTs.Store(checkpointTs)
util.CompareAndMonotonicIncrease(&dispatcherStat.checkpointTs, checkpointTs)


updateSubStatCheckpoint := func(subStat *subscriptionStat) {
if subStat == nil {
Expand All @@ -730,8 +730,9 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
continue
}

if newCheckpointTs == 0 || dispatcherStat.checkpointTs < newCheckpointTs {
newCheckpointTs = dispatcherStat.checkpointTs
dispatcherCheckpointTs := dispatcherStat.checkpointTs.Load()
if newCheckpointTs == 0 || dispatcherCheckpointTs < newCheckpointTs {
newCheckpointTs = dispatcherCheckpointTs
}
}

Expand All @@ -744,44 +745,39 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
if newCheckpointTs == 0 {
return
}
oldCheckpointTs := subStat.checkpointTs.Load()
if newCheckpointTs == oldCheckpointTs {
return
}
if newCheckpointTs < oldCheckpointTs {
log.Panic("should not happen",
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("oldCheckpointTs", oldCheckpointTs))
}
// If there is no dml event after old checkpoint ts, then there is no data to be deleted.
// So we can skip adding gc item.
lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load()
if lastReceiveDMLTime > 0 {
oldCheckpointPhysicalTime := oracle.GetTimeFromTS(oldCheckpointTs)
if lastReceiveDMLTime >= oldCheckpointPhysicalTime.UnixMilli() {
e.gcManager.addGCItem(
subStat.dbIndex,
uint64(subStat.subID),
subStat.tableSpan.TableID,
oldCheckpointTs,
newCheckpointTs,
)
for {
oldCheckpointTs := subStat.checkpointTs.Load()
if newCheckpointTs == oldCheckpointTs {
return
}
}
e.subscriptionChangeCh.In() <- SubscriptionChange{
ChangeType: SubscriptionChangeTypeUpdate,
SubID: uint64(subStat.subID),
Span: subStat.tableSpan,
CheckpointTs: newCheckpointTs,
ResolvedTs: subStat.resolvedTs.Load(),
}
subStat.checkpointTs.Store(newCheckpointTs)
if log.GetLevel() <= zap.DebugLevel {
log.Debug("update checkpoint ts",
zap.Any("dispatcherID", dispatcherID),
zap.Uint64("subscriptionID", uint64(subStat.subID)),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("oldCheckpointTs", oldCheckpointTs))
if newCheckpointTs < oldCheckpointTs {
return
}
if !subStat.checkpointTs.CompareAndSwap(oldCheckpointTs, newCheckpointTs) {
continue
}
e.gcManager.addGCItem(
subStat.dbIndex,
uint64(subStat.subID),
subStat.tableSpan.TableID,
oldCheckpointTs,
newCheckpointTs,
)
e.subscriptionChangeCh.In() <- SubscriptionChange{
ChangeType: SubscriptionChangeTypeUpdate,
SubID: uint64(subStat.subID),
Span: subStat.tableSpan,
CheckpointTs: newCheckpointTs,
ResolvedTs: subStat.resolvedTs.Load(),
}
if log.GetLevel() <= zap.DebugLevel {
log.Debug("update checkpoint ts",
zap.Any("dispatcherID", dispatcherID),
zap.Uint64("subscriptionID", uint64(subStat.subID)),
zap.Uint64("newCheckpointTs", newCheckpointTs),
zap.Uint64("oldCheckpointTs", oldCheckpointTs))
}
return
}
}
updateSubStatCheckpoint(dispatcherStat.subStat)
Expand Down Expand Up @@ -1501,7 +1497,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) {
}
comparableKey := common.ToComparableKey(rawKV.Key)
if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 &&
bytes.Compare(comparableKey, iter.tableSpan.EndKey) <= 0 {
bytes.Compare(comparableKey, iter.tableSpan.EndKey) < 0 {
break
}
log.Debug("event store iter skip kv not in table span",
Expand Down Expand Up @@ -1645,17 +1641,21 @@ func (e *eventStore) uploadStatePeriodically(ctx context.Context) error {
log.Warn("cannot find subscription state", zap.Uint64("subscriptionID", change.SubID))
continue
}
if change.CheckpointTs < tableState.Subscriptions[targetIndex].CheckpointTs ||
change.ResolvedTs < tableState.Subscriptions[targetIndex].ResolvedTs {
log.Panic("should not happen",
subState := tableState.Subscriptions[targetIndex]
if change.CheckpointTs < subState.CheckpointTs || change.ResolvedTs < subState.ResolvedTs {
log.Warn("ignore stale subscription state update",
zap.Uint64("subscriptionID", change.SubID),
zap.Uint64("oldCheckpointTs", tableState.Subscriptions[targetIndex].CheckpointTs),
zap.Uint64("oldResolvedTs", tableState.Subscriptions[targetIndex].ResolvedTs),
zap.Uint64("oldCheckpointTs", subState.CheckpointTs),
zap.Uint64("oldResolvedTs", subState.ResolvedTs),
zap.Uint64("newCheckpointTs", change.CheckpointTs),
zap.Uint64("newResolvedTs", change.ResolvedTs))
}
tableState.Subscriptions[targetIndex].CheckpointTs = change.CheckpointTs
tableState.Subscriptions[targetIndex].ResolvedTs = change.ResolvedTs
if change.CheckpointTs > subState.CheckpointTs {
subState.CheckpointTs = change.CheckpointTs
}
if change.ResolvedTs > subState.ResolvedTs {
subState.ResolvedTs = change.ResolvedTs
}
default:
log.Panic("invalid subscription change type", zap.Int("changeType", int(change.ChangeType)))
}
Expand Down
4 changes: 2 additions & 2 deletions logservice/eventstore/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,8 +1227,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) {
var tableID int64 = 42
iteratorSpan := &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("keyB"),
EndKey: []byte("keyD"),
StartKey: common.ToComparableKey([]byte("keyB")),
EndKey: common.ToComparableKey([]byte("keyD")),
}

// This test now focuses on a single, more comprehensive scenario.
Expand Down