From 181555f3ab6894fb884a15fa299c8756ab386c03 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 3 May 2026 14:41:39 +0800 Subject: [PATCH 1/3] eventstore: fix potential race --- logservice/eventstore/event_store.go | 119 ++++++++++++---------- logservice/eventstore/event_store_test.go | 4 +- 2 files changed, 65 insertions(+), 58 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 0246756b7b..f7d32720f0 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -14,7 +14,6 @@ package eventstore import ( - "bytes" "context" "fmt" "math" @@ -115,7 +114,7 @@ type dispatcherStat struct { resolvedTs atomic.Uint64 // the max ts of events which is not needed by this dispatcher - checkpointTs uint64 + checkpointTs atomic.Uint64 // the difference between `subStat`, `pendingSubStat` and `removingSubStat`: // 1) if there is no existing subscriptions which can be reused, // or there is a existing subscription with exact span match, @@ -498,8 +497,8 @@ func (e *eventStore) RegisterDispatcher( stat := &dispatcherStat{ dispatcherID: dispatcherID, tableSpan: dispatcherSpan, - checkpointTs: startTs, } + stat.checkpointTs.Store(startTs) stat.resolvedTs.Store(startTs) wrappedNotifier := func(resolvedTs uint64, latestCommitTs uint64) { @@ -513,8 +512,8 @@ func (e *eventStore) RegisterDispatcher( if subStats, ok := e.dispatcherMeta.tableStats[dispatcherSpan.TableID]; ok { for _, subStat := range subStats { // Check if this subStat's span contains the dispatcherSpan - if bytes.Compare(subStat.tableSpan.StartKey, dispatcherSpan.StartKey) <= 0 && - bytes.Compare(subStat.tableSpan.EndKey, dispatcherSpan.EndKey) >= 0 { + if common.StartCompare(subStat.tableSpan.StartKey, dispatcherSpan.StartKey) <= 0 && + common.EndCompare(dispatcherSpan.EndKey, subStat.tableSpan.EndKey) <= 0 { // For onlyReuse register request, we only consider initialized subStats if onlyReuse && !subStat.initialized.Load() { @@ -547,8 +546,8 @@ func (e *eventStore) RegisterDispatcher( // for example, if we have a dispatcher with span [b, c), // it is hard to determine whether [a, d) or [b, h) is bestMatch without some statistics. if bestMatch == nil || - (bytes.Compare(subStat.tableSpan.StartKey, bestMatch.tableSpan.StartKey) >= 0 && - bytes.Compare(subStat.tableSpan.EndKey, bestMatch.tableSpan.EndKey) <= 0) { + (common.StartCompare(subStat.tableSpan.StartKey, bestMatch.tableSpan.StartKey) >= 0 && + common.EndCompare(subStat.tableSpan.EndKey, bestMatch.tableSpan.EndKey) <= 0) { bestMatch = subStat } } @@ -710,7 +709,7 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( if !ok { return } - dispatcherStat.checkpointTs = checkpointTs + dispatcherStat.checkpointTs.Store(checkpointTs) updateSubStatCheckpoint := func(subStat *subscriptionStat) { if subStat == nil { @@ -730,8 +729,9 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( continue } - if newCheckpointTs == 0 || dispatcherStat.checkpointTs < newCheckpointTs { - newCheckpointTs = dispatcherStat.checkpointTs + dispatcherCheckpointTs := dispatcherStat.checkpointTs.Load() + if newCheckpointTs == 0 || dispatcherCheckpointTs < newCheckpointTs { + newCheckpointTs = dispatcherCheckpointTs } } @@ -744,44 +744,47 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( if newCheckpointTs == 0 { return } - oldCheckpointTs := subStat.checkpointTs.Load() - if newCheckpointTs == oldCheckpointTs { - return - } - if newCheckpointTs < oldCheckpointTs { - log.Panic("should not happen", - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("oldCheckpointTs", oldCheckpointTs)) - } - // If there is no dml event after old checkpoint ts, then there is no data to be deleted. - // So we can skip adding gc item. - lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load() - if lastReceiveDMLTime > 0 { - oldCheckpointPhysicalTime := oracle.GetTimeFromTS(oldCheckpointTs) - if lastReceiveDMLTime >= oldCheckpointPhysicalTime.UnixMilli() { - e.gcManager.addGCItem( - subStat.dbIndex, - uint64(subStat.subID), - subStat.tableSpan.TableID, - oldCheckpointTs, - newCheckpointTs, - ) + for { + oldCheckpointTs := subStat.checkpointTs.Load() + if newCheckpointTs == oldCheckpointTs { + return } - } - e.subscriptionChangeCh.In() <- SubscriptionChange{ - ChangeType: SubscriptionChangeTypeUpdate, - SubID: uint64(subStat.subID), - Span: subStat.tableSpan, - CheckpointTs: newCheckpointTs, - ResolvedTs: subStat.resolvedTs.Load(), - } - subStat.checkpointTs.Store(newCheckpointTs) - if log.GetLevel() <= zap.DebugLevel { - log.Debug("update checkpoint ts", - zap.Any("dispatcherID", dispatcherID), - zap.Uint64("subscriptionID", uint64(subStat.subID)), - zap.Uint64("newCheckpointTs", newCheckpointTs), - zap.Uint64("oldCheckpointTs", oldCheckpointTs)) + if newCheckpointTs < oldCheckpointTs { + return + } + if !subStat.checkpointTs.CompareAndSwap(oldCheckpointTs, newCheckpointTs) { + continue + } + // If there is no dml event after old checkpoint ts, then there is no data to be deleted. + // So we can skip adding gc item. + lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load() + if lastReceiveDMLTime > 0 { + oldCheckpointPhysicalTime := oracle.GetTimeFromTS(oldCheckpointTs) + if lastReceiveDMLTime >= oldCheckpointPhysicalTime.UnixMilli() { + e.gcManager.addGCItem( + subStat.dbIndex, + uint64(subStat.subID), + subStat.tableSpan.TableID, + oldCheckpointTs, + newCheckpointTs, + ) + } + } + e.subscriptionChangeCh.In() <- SubscriptionChange{ + ChangeType: SubscriptionChangeTypeUpdate, + SubID: uint64(subStat.subID), + Span: subStat.tableSpan, + CheckpointTs: newCheckpointTs, + ResolvedTs: subStat.resolvedTs.Load(), + } + if log.GetLevel() <= zap.DebugLevel { + log.Debug("update checkpoint ts", + zap.Any("dispatcherID", dispatcherID), + zap.Uint64("subscriptionID", uint64(subStat.subID)), + zap.Uint64("newCheckpointTs", newCheckpointTs), + zap.Uint64("oldCheckpointTs", oldCheckpointTs)) + } + return } } updateSubStatCheckpoint(dispatcherStat.subStat) @@ -1500,8 +1503,8 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { break } comparableKey := common.ToComparableKey(rawKV.Key) - if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 && - bytes.Compare(comparableKey, iter.tableSpan.EndKey) <= 0 { + if common.StartCompare(comparableKey, iter.tableSpan.StartKey) >= 0 && + common.EndCompare(comparableKey, iter.tableSpan.EndKey) < 0 { break } log.Debug("event store iter skip kv not in table span", @@ -1645,17 +1648,21 @@ func (e *eventStore) uploadStatePeriodically(ctx context.Context) error { log.Warn("cannot find subscription state", zap.Uint64("subscriptionID", change.SubID)) continue } - if change.CheckpointTs < tableState.Subscriptions[targetIndex].CheckpointTs || - change.ResolvedTs < tableState.Subscriptions[targetIndex].ResolvedTs { - log.Panic("should not happen", + subState := tableState.Subscriptions[targetIndex] + if change.CheckpointTs < subState.CheckpointTs || change.ResolvedTs < subState.ResolvedTs { + log.Warn("ignore stale subscription state update", zap.Uint64("subscriptionID", change.SubID), - zap.Uint64("oldCheckpointTs", tableState.Subscriptions[targetIndex].CheckpointTs), - zap.Uint64("oldResolvedTs", tableState.Subscriptions[targetIndex].ResolvedTs), + zap.Uint64("oldCheckpointTs", subState.CheckpointTs), + zap.Uint64("oldResolvedTs", subState.ResolvedTs), zap.Uint64("newCheckpointTs", change.CheckpointTs), zap.Uint64("newResolvedTs", change.ResolvedTs)) } - tableState.Subscriptions[targetIndex].CheckpointTs = change.CheckpointTs - tableState.Subscriptions[targetIndex].ResolvedTs = change.ResolvedTs + if change.CheckpointTs > subState.CheckpointTs { + subState.CheckpointTs = change.CheckpointTs + } + if change.ResolvedTs > subState.ResolvedTs { + subState.ResolvedTs = change.ResolvedTs + } default: log.Panic("invalid subscription change type", zap.Int("changeType", int(change.ChangeType))) } diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index eb92c90715..1ea842b934 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -1227,8 +1227,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { var tableID int64 = 42 iteratorSpan := &heartbeatpb.TableSpan{ TableID: tableID, - StartKey: []byte("keyB"), - EndKey: []byte("keyD"), + StartKey: common.ToComparableKey([]byte("keyB")), + EndKey: common.ToComparableKey([]byte("keyD")), } // This test now focuses on a single, more comprehensive scenario. From aa4f2023669c139d18c5cc52b45468cc05886b68 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 5 May 2026 21:53:28 +0800 Subject: [PATCH 2/3] fix --- logservice/eventstore/event_store.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index f7d32720f0..ceb17545fc 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -14,6 +14,7 @@ package eventstore import ( + "bytes" "context" "fmt" "math" @@ -512,8 +513,8 @@ func (e *eventStore) RegisterDispatcher( if subStats, ok := e.dispatcherMeta.tableStats[dispatcherSpan.TableID]; ok { for _, subStat := range subStats { // Check if this subStat's span contains the dispatcherSpan - if common.StartCompare(subStat.tableSpan.StartKey, dispatcherSpan.StartKey) <= 0 && - common.EndCompare(dispatcherSpan.EndKey, subStat.tableSpan.EndKey) <= 0 { + if bytes.Compare(subStat.tableSpan.StartKey, dispatcherSpan.StartKey) <= 0 && + bytes.Compare(dispatcherSpan.EndKey, subStat.tableSpan.EndKey) <= 0 { // For onlyReuse register request, we only consider initialized subStats if onlyReuse && !subStat.initialized.Load() { @@ -546,8 +547,8 @@ func (e *eventStore) RegisterDispatcher( // for example, if we have a dispatcher with span [b, c), // it is hard to determine whether [a, d) or [b, h) is bestMatch without some statistics. if bestMatch == nil || - (common.StartCompare(subStat.tableSpan.StartKey, bestMatch.tableSpan.StartKey) >= 0 && - common.EndCompare(subStat.tableSpan.EndKey, bestMatch.tableSpan.EndKey) <= 0) { + (bytes.Compare(subStat.tableSpan.StartKey, bestMatch.tableSpan.StartKey) >= 0 && + bytes.Compare(subStat.tableSpan.EndKey, bestMatch.tableSpan.EndKey) <= 0) { bestMatch = subStat } } @@ -1503,8 +1504,8 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { break } comparableKey := common.ToComparableKey(rawKV.Key) - if common.StartCompare(comparableKey, iter.tableSpan.StartKey) >= 0 && - common.EndCompare(comparableKey, iter.tableSpan.EndKey) < 0 { + if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 && + bytes.Compare(comparableKey, iter.tableSpan.EndKey) < 0 { break } log.Debug("event store iter skip kv not in table span", From 926bb423542a024ecbb33ce20a48e62e29ade5ab Mon Sep 17 00:00:00 2001 From: lidezhu Date: Tue, 5 May 2026 22:07:23 +0800 Subject: [PATCH 3/3] f --- logservice/eventstore/event_store.go | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index ceb17545fc..a6f20a21d2 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -514,7 +514,7 @@ func (e *eventStore) RegisterDispatcher( for _, subStat := range subStats { // Check if this subStat's span contains the dispatcherSpan if bytes.Compare(subStat.tableSpan.StartKey, dispatcherSpan.StartKey) <= 0 && - bytes.Compare(dispatcherSpan.EndKey, subStat.tableSpan.EndKey) <= 0 { + bytes.Compare(subStat.tableSpan.EndKey, dispatcherSpan.EndKey) >= 0 { // For onlyReuse register request, we only consider initialized subStats if onlyReuse && !subStat.initialized.Load() { @@ -756,21 +756,13 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( if !subStat.checkpointTs.CompareAndSwap(oldCheckpointTs, newCheckpointTs) { continue } - // If there is no dml event after old checkpoint ts, then there is no data to be deleted. - // So we can skip adding gc item. - lastReceiveDMLTime := subStat.lastReceiveDMLTime.Load() - if lastReceiveDMLTime > 0 { - oldCheckpointPhysicalTime := oracle.GetTimeFromTS(oldCheckpointTs) - if lastReceiveDMLTime >= oldCheckpointPhysicalTime.UnixMilli() { - e.gcManager.addGCItem( - subStat.dbIndex, - uint64(subStat.subID), - subStat.tableSpan.TableID, - oldCheckpointTs, - newCheckpointTs, - ) - } - } + e.gcManager.addGCItem( + subStat.dbIndex, + uint64(subStat.subID), + subStat.tableSpan.TableID, + oldCheckpointTs, + newCheckpointTs, + ) e.subscriptionChangeCh.In() <- SubscriptionChange{ ChangeType: SubscriptionChangeTypeUpdate, SubID: uint64(subStat.subID),