diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 0246756b7b..a6f20a21d2 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -115,7 +115,7 @@ type dispatcherStat struct { resolvedTs atomic.Uint64 // the max ts of events which is not needed by this dispatcher - checkpointTs uint64 + checkpointTs atomic.Uint64 // the difference between `subStat`, `pendingSubStat` and `removingSubStat`: // 1) if there is no existing subscriptions which can be reused, // or there is a existing subscription with exact span match, @@ -498,8 +498,8 @@ func (e *eventStore) RegisterDispatcher( stat := &dispatcherStat{ dispatcherID: dispatcherID, tableSpan: dispatcherSpan, - checkpointTs: startTs, } + stat.checkpointTs.Store(startTs) stat.resolvedTs.Store(startTs) wrappedNotifier := func(resolvedTs uint64, latestCommitTs uint64) { @@ -710,7 +710,7 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( if !ok { return } - dispatcherStat.checkpointTs = checkpointTs + dispatcherStat.checkpointTs.Store(checkpointTs) updateSubStatCheckpoint := func(subStat *subscriptionStat) { if subStat == nil { @@ -730,8 +730,9 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( continue } - if newCheckpointTs == 0 || dispatcherStat.checkpointTs < newCheckpointTs { - newCheckpointTs = dispatcherStat.checkpointTs + dispatcherCheckpointTs := dispatcherStat.checkpointTs.Load() + if newCheckpointTs == 0 || dispatcherCheckpointTs < newCheckpointTs { + newCheckpointTs = dispatcherCheckpointTs } } @@ -744,44 +745,39 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( if newCheckpointTs == 0 { return } - oldCheckpointTs := subStat.checkpointTs.Load() - if newCheckpointTs == oldCheckpointTs { - return - } - if newCheckpointTs < oldCheckpointTs { - log.Panic("should not happen", - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("oldCheckpointTs", oldCheckpointTs)) - } - // If there is no dml event after old checkpoint ts, then there is no data to be deleted. - // So we can skip adding gc item. - lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load() - if lastReceiveDMLTime > 0 { - oldCheckpointPhysicalTime := oracle.GetTimeFromTS(oldCheckpointTs) - if lastReceiveDMLTime >= oldCheckpointPhysicalTime.UnixMilli() { - e.gcManager.addGCItem( - subStat.dbIndex, - uint64(subStat.subID), - subStat.tableSpan.TableID, - oldCheckpointTs, - newCheckpointTs, - ) + for { + oldCheckpointTs := subStat.checkpointTs.Load() + if newCheckpointTs == oldCheckpointTs { + return } - } - e.subscriptionChangeCh.In() <- SubscriptionChange{ - ChangeType: SubscriptionChangeTypeUpdate, - SubID: uint64(subStat.subID), - Span: subStat.tableSpan, - CheckpointTs: newCheckpointTs, - ResolvedTs: subStat.resolvedTs.Load(), - } - subStat.checkpointTs.Store(newCheckpointTs) - if log.GetLevel() <= zap.DebugLevel { - log.Debug("update checkpoint ts", - zap.Any("dispatcherID", dispatcherID), - zap.Uint64("subscriptionID", uint64(subStat.subID)), - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("oldCheckpointTs", oldCheckpointTs)) + if newCheckpointTs < oldCheckpointTs { + return + } + if !subStat.checkpointTs.CompareAndSwap(oldCheckpointTs, newCheckpointTs) { + continue + } + e.gcManager.addGCItem( + subStat.dbIndex, + uint64(subStat.subID), + subStat.tableSpan.TableID, + oldCheckpointTs, + newCheckpointTs, + ) + e.subscriptionChangeCh.In() <- SubscriptionChange{ + ChangeType: SubscriptionChangeTypeUpdate, + SubID: uint64(subStat.subID), + Span: subStat.tableSpan, + CheckpointTs: newCheckpointTs, + ResolvedTs: subStat.resolvedTs.Load(), + } + if log.GetLevel() <= zap.DebugLevel { + log.Debug("update checkpoint ts", + zap.Any("dispatcherID", dispatcherID), + zap.Uint64("subscriptionID", uint64(subStat.subID)), + zap.Uint64("newCheckpointTs", newCheckpointTs), + zap.Uint64("oldCheckpointTs", oldCheckpointTs)) + } + return } } updateSubStatCheckpoint(dispatcherStat.subStat) @@ -1501,7 +1497,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { } comparableKey := common.ToComparableKey(rawKV.Key) if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 && - bytes.Compare(comparableKey, iter.tableSpan.EndKey) <= 0 { + bytes.Compare(comparableKey, iter.tableSpan.EndKey) < 0 { break } log.Debug("event store iter skip kv not in table span", @@ -1645,17 +1641,21 @@ func (e *eventStore) uploadStatePeriodically(ctx context.Context) error { log.Warn("cannot find subscription state", zap.Uint64("subscriptionID", change.SubID)) continue } - if change.CheckpointTs < tableState.Subscriptions[targetIndex].CheckpointTs || - change.ResolvedTs < tableState.Subscriptions[targetIndex].ResolvedTs { - log.Panic("should not happen", + subState := tableState.Subscriptions[targetIndex] + if change.CheckpointTs < subState.CheckpointTs || change.ResolvedTs < subState.ResolvedTs { + log.Warn("ignore stale subscription state update", zap.Uint64("subscriptionID", change.SubID), - zap.Uint64("oldCheckpointTs", tableState.Subscriptions[targetIndex].CheckpointTs), - zap.Uint64("oldResolvedTs", tableState.Subscriptions[targetIndex].ResolvedTs), + zap.Uint64("oldCheckpointTs", subState.CheckpointTs), + zap.Uint64("oldResolvedTs", subState.ResolvedTs), zap.Uint64("newCheckpointTs", change.CheckpointTs), zap.Uint64("newResolvedTs", change.ResolvedTs)) } - tableState.Subscriptions[targetIndex].CheckpointTs = change.CheckpointTs - tableState.Subscriptions[targetIndex].ResolvedTs = change.ResolvedTs + if change.CheckpointTs > subState.CheckpointTs { + subState.CheckpointTs = change.CheckpointTs + } + if change.ResolvedTs > subState.ResolvedTs { + subState.ResolvedTs = change.ResolvedTs + } default: log.Panic("invalid subscription change type", zap.Int("changeType", int(change.ChangeType))) } diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index eb92c90715..1ea842b934 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -1227,8 +1227,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { var tableID int64 = 42 iteratorSpan := &heartbeatpb.TableSpan{ TableID: tableID, - StartKey: []byte("keyB"), - EndKey: []byte("keyD"), + StartKey: common.ToComparableKey([]byte("keyB")), + EndKey: common.ToComparableKey([]byte("keyD")), } // This test now focuses on a single, more comprehensive scenario.