From af9b69b5e82bc125d37351cbd1f106f71410b453 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 29 Apr 2026 18:19:48 +0800 Subject: [PATCH 01/13] logservice: improve event store Pebble table filtering --- logservice/eventstore/event_store.go | 24 ++++- logservice/eventstore/pebble.go | 13 ++- logservice/eventstore/pebble_test.go | 58 ++++++++++++ logservice/eventstore/table_properties.go | 108 ++++++++++++++++++++++ 4 files changed, 197 insertions(+), 6 deletions(-) create mode 100644 logservice/eventstore/table_properties.go diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 82954a1cb9..c12c46d296 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -204,6 +204,8 @@ type eventStore struct { subClient logpuller.SubscriptionClient dbs []*pebble.DB + pebbleCache *pebble.Cache + tableCache *pebble.TableCache chs []*chann.UnlimitedChannel[eventWithCallback, uint64] writeTaskPools []*writeTaskPool @@ -258,11 +260,14 @@ func New( log.Panic("fail to remove path", zap.String("path", dbPath), zap.Error(err)) } + dbs, pebbleCache, tableCache := createPebbleDBs(dbPath, dbCount) store := &eventStore{ pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock), subClient: subClient, - dbs: createPebbleDBs(dbPath, dbCount), + dbs: dbs, + pebbleCache: pebbleCache, + tableCache: tableCache, chs: make([]*chann.UnlimitedChannel[eventWithCallback, uint64], 0, dbCount), writeTaskPools: make([]*writeTaskPool, 0, dbCount), @@ -425,6 +430,16 @@ func (e *eventStore) Close(_ context.Context) error { log.Error("failed to close pebble db", zap.Error(err)) } } + if e.tableCache != nil { + if err := e.tableCache.Unref(); err != nil { + log.Error("failed to unref pebble table cache", zap.Error(err)) + } + e.tableCache = nil + } + if e.pebbleCache != nil { + e.pebbleCache.Unref() + e.pebbleCache = nil + } return nil } @@ -876,17 +891,22 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com // convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1) var start []byte + lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1) + lowerTs = dataRange.CommitTsStart } else { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) - // TODO: optimize read performance // it's impossible return error here iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, + TableFilter: newEventStoreTableFilter( + lowerTs, + dataRange.CommitTsEnd, + ), }) decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index 08d252070a..e72c4133b0 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -32,6 +32,7 @@ const ( cacheSize = 1 << 30 // 1GB memTableTotalSize = 1 << 30 // 1GB memTableSize = 64 << 20 // 64MB + maxOpenFilesPerDB = 10000 ) func newPebbleOptions(dbNum int) *pebble.Options { @@ -39,7 +40,7 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Disable WAL to decrease io DisableWAL: true, - MaxOpenFiles: 10000, + MaxOpenFiles: maxOpenFilesPerDB, MaxConcurrentCompactions: func() int { return 6 }, @@ -56,6 +57,9 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), + TablePropertyCollectors: []func() pebble.TablePropertyCollector{ + newEventStoreCRTsCollector, + }, } for i := 0; i < len(opts.Levels); i++ { @@ -74,9 +78,10 @@ func newPebbleOptions(dbNum int) *pebble.Options { return opts } -func createPebbleDBs(rootDir string, dbNum int) []*pebble.DB { +func createPebbleDBs(rootDir string, dbNum int) ([]*pebble.DB, *pebble.Cache, *pebble.TableCache) { cache := pebble.NewCache(cacheSize) - tableCache := pebble.NewTableCache(cache, dbNum, int(cache.MaxSize())) + tableCacheSize := dbNum * pebble.TableCacheSize(maxOpenFilesPerDB) + tableCache := pebble.NewTableCache(cache, dbNum, tableCacheSize) dbs := make([]*pebble.DB, dbNum) for i := 0; i < dbNum; i++ { id := strconv.Itoa(i + 1) @@ -113,7 +118,7 @@ func createPebbleDBs(rootDir string, dbNum int) []*pebble.DB { } dbs[i] = db } - return dbs + return dbs, cache, tableCache } type eventStoreWriteStallState struct { diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index c95bf3928d..0e80628926 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -143,3 +143,61 @@ func TestCompressionAndKeyOrder(t *testing.T) { require.Less(t, bytes.Compare(keyDelete, keyUpdate), 0, "Delete should come before Update") require.Less(t, bytes.Compare(keyUpdate, keyInsert), 0, "Update should come before Insert") } + +func TestEventStoreCRTsCollector(t *testing.T) { + t.Parallel() + + collector := newEventStoreCRTsCollector() + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 1, + CRTs: 10, + Key: []byte("key"), + } + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: EncodeKey(1, 1, event, CompressionNone), + Trailer: uint64(pebble.InternalKeyKindSet), + }, nil)) + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: EncodeKeyPrefix(1, 1, 100), + Trailer: uint64(pebble.InternalKeyKindRangeDelete), + }, EncodeKeyPrefix(1, 1, 200))) + + props := make(map[string]string) + require.NoError(t, collector.Finish(props)) + require.Equal(t, "10", props[eventStoreMinCRTsTableProperty]) + require.Equal(t, "200", props[eventStoreMaxCRTsTableProperty]) + + require.True(t, newEventStoreTableFilter(9, 10)(props)) + require.True(t, newEventStoreTableFilter(150, 150)(props)) + require.False(t, newEventStoreTableFilter(201, 300)(props)) + require.True(t, newEventStoreTableFilter(201, 300)(nil)) +} + +func TestEventStoreTableFilterKeepsRangeDeletionTables(t *testing.T) { + t.Parallel() + + db, err := pebble.Open(t.TempDir(), newPebbleOptions(1)) + require.NoError(t, err) + defer db.Close() + + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 1, + CRTs: 80, + Key: []byte("key"), + } + require.NoError(t, db.Set(EncodeKey(1, 1, event, CompressionNone), []byte("value"), pebble.NoSync)) + require.NoError(t, db.Flush()) + require.NoError(t, deleteDataRange(db, 1, 1, 0, 100)) + require.NoError(t, db.Flush()) + + iter, err := db.NewIter(&pebble.IterOptions{ + LowerBound: EncodeKeyPrefix(1, 1, 80), + UpperBound: EncodeKeyPrefix(1, 1, 81), + TableFilter: newEventStoreTableFilter(80, 80), + }) + require.NoError(t, err) + defer iter.Close() + require.False(t, iter.First()) +} diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go new file mode 100644 index 0000000000..e183fcbd16 --- /dev/null +++ b/logservice/eventstore/table_properties.go @@ -0,0 +1,108 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventstore + +import ( + "encoding/binary" + "strconv" + + "github.com/cockroachdb/pebble" +) + +const ( + eventStoreMinCRTsTableProperty = "event-store-min-crts" + eventStoreMaxCRTsTableProperty = "event-store-max-crts" + eventStoreCRTsCollectorName = "event-store-crts-collector" + + encodedKeyCRTsOffset = 16 + encodedKeyCRTsEnd = encodedKeyCRTsOffset + 8 +) + +type eventStoreCRTsCollector struct { + minTs uint64 + maxTs uint64 + hasTs bool +} + +func newEventStoreCRTsCollector() pebble.TablePropertyCollector { + return &eventStoreCRTsCollector{} +} + +func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, value []byte) error { + c.recordEncodedKey(key.UserKey) + if key.Kind() == pebble.InternalKeyKindRangeDelete { + c.recordEncodedKey(value) + } + return nil +} + +func (c *eventStoreCRTsCollector) Finish(userProps map[string]string) error { + if !c.hasTs { + return nil + } + userProps[eventStoreMinCRTsTableProperty] = strconv.FormatUint(c.minTs, 10) + userProps[eventStoreMaxCRTsTableProperty] = strconv.FormatUint(c.maxTs, 10) + return nil +} + +func (c *eventStoreCRTsCollector) Name() string { + return eventStoreCRTsCollectorName +} + +func (c *eventStoreCRTsCollector) recordEncodedKey(key []byte) { + crts, ok := decodeCRTsFromEncodedKey(key) + if !ok { + return + } + if !c.hasTs || crts < c.minTs { + c.minTs = crts + } + if !c.hasTs || crts > c.maxTs { + c.maxTs = crts + } + c.hasTs = true +} + +func decodeCRTsFromEncodedKey(key []byte) (uint64, bool) { + if len(key) < encodedKeyCRTsEnd { + return 0, false + } + return binary.BigEndian.Uint64(key[encodedKeyCRTsOffset:encodedKeyCRTsEnd]), true +} + +func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { + return func(userProps map[string]string) bool { + minTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMinCRTsTableProperty) + if !ok { + return true + } + maxTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMaxCRTsTableProperty) + if !ok { + return true + } + return maxTs >= lowerTs && minTs <= upperTs + } +} + +func parseEventStoreCRTsTableProperty(userProps map[string]string, key string) (uint64, bool) { + value, ok := userProps[key] + if !ok { + return 0, false + } + ts, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return 0, false + } + return ts, true +} From a959f579efe42db0690a5a7b1ecb447c74f57a0d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 29 Apr 2026 18:57:56 +0800 Subject: [PATCH 02/13] add some comment --- logservice/eventstore/event_store.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index c12c46d296..b1c8a848f6 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -889,7 +889,27 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com e.dispatcherMeta.Unlock() } - // convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1) + // dataRange fields: + // CommitTsStart and CommitTsEnd define the commit-ts scan window. + // LastScannedTxnStartTs records how far the previous scan progressed inside + // CommitTsStart. It is zero if there is no unfinished scan at CommitTsStart. + // + // Iterator key bounds: + // Pebble uses [LowerBound, UpperBound), so end is always encoded as + // CommitTsEnd+1. + // + // If LastScannedTxnStartTs is zero, scan commit ts in + // (CommitTsStart, CommitTsEnd], and use CommitTsStart+1 as LowerBound. + // + // If LastScannedTxnStartTs is non-zero, continue scanning commit ts + // CommitTsStart with start ts greater than LastScannedTxnStartTs, then scan + // later commit ts up to CommitTsEnd. + // + // Table filter bounds: + // lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose + // collected CRTs range does not overlap [lowerTs, CommitTsEnd]. Therefore + // lowerTs is CommitTsStart+1 in the first case, and CommitTsStart in the + // second case. var start []byte lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { From 8668f0dc051b06f6c400c2fd591f86f2baa7c236 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 29 Apr 2026 20:34:50 +0800 Subject: [PATCH 03/13] r --- logservice/eventstore/format.go | 32 +++++++++++++++++--- logservice/eventstore/pebble_test.go | 36 +++-------------------- logservice/eventstore/table_properties.go | 20 ++++--------- 3 files changed, 37 insertions(+), 51 deletions(-) diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index bb8cefe2b5..6c4815a97a 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -38,6 +38,20 @@ const ( CompressionZSTD ) +const ( + encodedKeyUint64Len = 8 + encodedKeyOrderLen = 2 + + encodedKeyUniqueIDOffset = 0 + encodedKeyTableIDOffset = encodedKeyUniqueIDOffset + encodedKeyUint64Len + encodedKeyCRTsOffset = encodedKeyTableIDOffset + encodedKeyUint64Len + encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len + encodedKeyStartTsOffset = encodedKeyCRTsEnd + encodedKeyStartTsEnd = encodedKeyStartTsOffset + encodedKeyUint64Len + encodedKeyMetasOffset = encodedKeyStartTsEnd + encodedKeyMetasEnd = encodedKeyMetasOffset + encodedKeyOrderLen +) + const ( // Bitmask for DML order and compression type. dmlOrderMask = 0xFF00 // DML order is stored in the high 8 bits for sorting. @@ -53,9 +67,9 @@ func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uin log.Panic("startTs should be at most one") } // uniqueID, tableID, CRTs. - keySize := 8 + 8 + 8 + keySize := encodedKeyCRTsEnd if len(startTs) > 0 { - keySize += 8 + keySize = encodedKeyStartTsEnd } buf := make([]byte, 0, keySize) uint64Buf := [8]byte{} @@ -78,7 +92,7 @@ func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uin func encodedKeyLen(event *common.RawKVEntry) int { // uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key - return 8 + 8 + 8 + 8 + 1 + 1 + len(event.Key) + return encodedKeyMetasEnd + len(event.Key) } // EncodeKeyTo appends an encoded event-store key to buf. @@ -116,10 +130,20 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres // DecodeKeyMetas decodes compression type and dml order from the key. func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) { - combinedOrder := binary.BigEndian.Uint16(key[32:34]) // The combined order is at offset 32 for 2 bytes. + combinedOrder := binary.BigEndian.Uint16(key[encodedKeyMetasOffset:encodedKeyMetasEnd]) return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask) } +// decodeCRTsFromEncodedKey decodes CRTs from an event-store key prefix. +// It works for both full event keys and DeleteRange boundary keys because both +// contain uniqueID, tableID, and CRTs as the first three fields. +func decodeCRTsFromEncodedKey(key []byte) (uint64, bool) { + if len(key) < encodedKeyCRTsEnd { + return 0, false + } + return binary.BigEndian.Uint64(key[encodedKeyCRTsOffset:encodedKeyCRTsEnd]), true +} + // getDMLOrder returns the order of the dml types: delete Date: Thu, 30 Apr 2026 16:41:00 +0800 Subject: [PATCH 04/13] f --- logservice/eventstore/pebble_test.go | 6 ++++++ logservice/eventstore/table_properties.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index 73bb6374bb..839fae621a 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -172,4 +172,10 @@ func TestEventStoreCRTsCollector(t *testing.T) { require.True(t, newEventStoreTableFilter(100, 100)(props)) require.False(t, newEventStoreTableFilter(101, 300)(props)) require.True(t, newEventStoreTableFilter(101, 300)(nil)) + + corruptedProps := map[string]string{ + eventStoreMinCRTsTableProperty: "300", + eventStoreMaxCRTsTableProperty: "100", + } + require.True(t, newEventStoreTableFilter(101, 200)(corruptedProps)) } diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index 2fa4334f1b..62d8076a2b 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -81,6 +81,9 @@ func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]st if !ok { return true } + if minTs > maxTs { + return true + } return maxTs >= lowerTs && minTs <= upperTs } } From a384cb12e4e9a90af0c7ee22b250b86fb9f5b9f6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 15:19:16 +0800 Subject: [PATCH 05/13] fix --- logservice/eventstore/event_store.go | 7 ++- logservice/eventstore/format.go | 62 +++++++++-------------- logservice/eventstore/pebble_test.go | 42 +++++++++++++-- logservice/eventstore/table_properties.go | 56 +++++++++++++------- pkg/metrics/event_store.go | 16 ++++++ 5 files changed, 122 insertions(+), 61 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index b1c8a848f6..1711b45439 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -913,7 +913,12 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com var start []byte lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { - start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1) + start = encodeScanLowerBound( + uint64(subStat.subID), + stat.tableSpan.TableID, + dataRange.CommitTsStart, + dataRange.LastScannedTxnStartTs+1, + ) lowerTs = dataRange.CommitTsStart } else { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index 6c4815a97a..627e6831b1 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -39,17 +39,11 @@ const ( ) const ( - encodedKeyUint64Len = 8 - encodedKeyOrderLen = 2 - - encodedKeyUniqueIDOffset = 0 - encodedKeyTableIDOffset = encodedKeyUniqueIDOffset + encodedKeyUint64Len - encodedKeyCRTsOffset = encodedKeyTableIDOffset + encodedKeyUint64Len - encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len - encodedKeyStartTsOffset = encodedKeyCRTsEnd - encodedKeyStartTsEnd = encodedKeyStartTsOffset + encodedKeyUint64Len - encodedKeyMetasOffset = encodedKeyStartTsEnd - encodedKeyMetasEnd = encodedKeyMetasOffset + encodedKeyOrderLen + encodedKeyUint64Len = 8 + encodedKeyCRTsOffset = 2 * encodedKeyUint64Len + encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len + encodedKeyMetasOffset = 4 * encodedKeyUint64Len + encodedKeyMetasEnd = encodedKeyMetasOffset + 2 ) const ( @@ -59,37 +53,27 @@ const ( dmlOrderShift = 8 ) -// EncodeKeyPrefix encodes uniqueID, tableID, CRTs and StartTs. -// StartTs is optional. -// The result should be a prefix of normal key. (TODO: add a unit test) -func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uint64) []byte { - if len(startTs) > 1 { - log.Panic("startTs should be at most one") - } - // uniqueID, tableID, CRTs. - keySize := encodedKeyCRTsEnd - if len(startTs) > 0 { - keySize = encodedKeyStartTsEnd - } - buf := make([]byte, 0, keySize) - uint64Buf := [8]byte{} - // uniqueID - binary.BigEndian.PutUint64(uint64Buf[:], uniqueID) - buf = append(buf, uint64Buf[:]...) - // tableID - binary.BigEndian.PutUint64(uint64Buf[:], uint64(tableID)) - buf = append(buf, uint64Buf[:]...) - // CRTs - binary.BigEndian.PutUint64(uint64Buf[:], CRTs) - buf = append(buf, uint64Buf[:]...) - if len(startTs) > 0 { - // startTs - binary.BigEndian.PutUint64(uint64Buf[:], startTs[0]) - buf = append(buf, uint64Buf[:]...) - } +// EncodeKeyPrefix encodes uniqueID, tableID, and txnCommitTs. +// The result is a prefix of a full event-store key. +func EncodeKeyPrefix(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { + buf := make([]byte, encodedKeyCRTsEnd) + encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) return buf } +func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, startTs uint64) []byte { + buf := make([]byte, encodedKeyMetasOffset) + encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) + binary.BigEndian.PutUint64(buf[encodedKeyCRTsEnd:encodedKeyMetasOffset], startTs) + return buf +} + +func encodeKeyPrefixTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) { + binary.BigEndian.PutUint64(buf[:encodedKeyUint64Len], uniqueID) + binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyCRTsOffset], uint64(tableID)) + binary.BigEndian.PutUint64(buf[encodedKeyCRTsOffset:encodedKeyCRTsEnd], txnCommitTs) +} + func encodedKeyLen(event *common.RawKVEntry) int { // uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key return encodedKeyMetasEnd + len(event.Key) diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index 839fae621a..bdefadf5f5 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "fmt" "os" + "strconv" "testing" "github.com/cockroachdb/pebble" @@ -144,6 +145,33 @@ func TestCompressionAndKeyOrder(t *testing.T) { require.Less(t, bytes.Compare(keyUpdate, keyInsert), 0, "Update should come before Insert") } +func TestEventStoreKeyBounds(t *testing.T) { + t.Parallel() + + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 20, + CRTs: 10, + Key: []byte("key"), + } + key := EncodeKey(1, 1, event, CompressionNone) + prefix := EncodeKeyPrefix(1, 1, event.CRTs) + require.Len(t, prefix, encodedKeyCRTsEnd) + require.True(t, bytes.HasPrefix(key, prefix)) + + lowerBound := encodeScanLowerBound(1, 1, event.CRTs, event.StartTs) + require.Len(t, lowerBound, encodedKeyMetasOffset) + require.True(t, bytes.HasPrefix(key, lowerBound)) + + previousEvent := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: event.StartTs - 1, + CRTs: event.CRTs, + Key: []byte("key"), + } + require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) +} + func TestEventStoreCRTsCollector(t *testing.T) { t.Parallel() @@ -154,19 +182,25 @@ func TestEventStoreCRTsCollector(t *testing.T) { CRTs: 10, Key: []byte("key"), } + eventKey := EncodeKey(1, 1, event, CompressionNone) + eventValue := []byte("value") require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: EncodeKey(1, 1, event, CompressionNone), + UserKey: eventKey, Trailer: uint64(pebble.InternalKeyKindSet), - }, nil)) + }, eventValue)) + deleteStart := EncodeKeyPrefix(1, 1, 100) + deleteEnd := EncodeKeyPrefix(1, 1, 200) require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: EncodeKeyPrefix(1, 1, 100), + UserKey: deleteStart, Trailer: uint64(pebble.InternalKeyKindRangeDelete), - }, EncodeKeyPrefix(1, 1, 200))) + }, deleteEnd)) props := make(map[string]string) require.NoError(t, collector.Finish(props)) require.Equal(t, "10", props[eventStoreMinCRTsTableProperty]) require.Equal(t, "100", props[eventStoreMaxCRTsTableProperty]) + require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), + props[eventStoreLogicalBytesProperty]) require.True(t, newEventStoreTableFilter(9, 10)(props)) require.True(t, newEventStoreTableFilter(100, 100)(props)) diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index 62d8076a2b..c7400d70c3 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -17,30 +17,34 @@ import ( "strconv" "github.com/cockroachdb/pebble" + "github.com/pingcap/ticdc/pkg/metrics" ) const ( eventStoreMinCRTsTableProperty = "event-store-min-crts" eventStoreMaxCRTsTableProperty = "event-store-max-crts" + eventStoreLogicalBytesProperty = "event-store-logical-bytes" eventStoreCRTsCollectorName = "event-store-crts-collector" ) type eventStoreCRTsCollector struct { - minTs uint64 - maxTs uint64 - hasTs bool + minTs uint64 + maxTs uint64 + logicalBytes uint64 + hasTs bool } func newEventStoreCRTsCollector() pebble.TablePropertyCollector { return &eventStoreCRTsCollector{} } -func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, _ []byte) error { +func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, value []byte) error { // Event store DeleteRange is GC-only: it removes data that should already be // below the future scan range. Do not widen table properties with the range // tombstone end key. For example, a cleanup tombstone [CRTs=100, CRTs=1000) // would make this cleanup-only SST overlap scans like [500,600]. c.recordEncodedKey(key.UserKey) + c.logicalBytes += uint64(len(key.UserKey) + len(value)) return nil } @@ -50,6 +54,7 @@ func (c *eventStoreCRTsCollector) Finish(userProps map[string]string) error { } userProps[eventStoreMinCRTsTableProperty] = strconv.FormatUint(c.minTs, 10) userProps[eventStoreMaxCRTsTableProperty] = strconv.FormatUint(c.maxTs, 10) + userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) return nil } @@ -73,22 +78,39 @@ func (c *eventStoreCRTsCollector) recordEncodedKey(key []byte) { func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { return func(userProps map[string]string) bool { - minTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMinCRTsTableProperty) - if !ok { - return true - } - maxTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMaxCRTsTableProperty) - if !ok { - return true - } - if minTs > maxTs { - return true - } - return maxTs >= lowerTs && minTs <= upperTs + shouldScan := eventStoreTableMayContainCRTs(userProps, lowerTs, upperTs) + recordEventStoreTableFilterMetrics(userProps, shouldScan) + return shouldScan } } -func parseEventStoreCRTsTableProperty(userProps map[string]string, key string) (uint64, bool) { +func eventStoreTableMayContainCRTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { + minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinCRTsTableProperty) + if !ok { + return true + } + maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxCRTsTableProperty) + if !ok { + return true + } + if minTs > maxTs { + return true + } + return maxTs >= lowerTs && minTs <= upperTs +} + +func recordEventStoreTableFilterMetrics(userProps map[string]string, shouldScan bool) { + result := "scanned" + if !shouldScan { + result = "skipped" + } + metrics.EventStoreTableFilterCount.WithLabelValues(result).Inc() + if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { + metrics.EventStoreTableFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) + } +} + +func parseEventStoreUint64TableProperty(userProps map[string]string, key string) (uint64, bool) { value, ok := userProps[key] if !ok { return 0, false diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index d934173f27..3e499b3d52 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,6 +81,20 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) + EventStoreTableFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "table_filter_count", + Help: "The number of SST table filter decisions by event store.", + }, []string{"result"}) + + EventStoreTableFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "table_filter_logical_bytes", + Help: "The estimated logical bytes in SSTs scanned or skipped by event store table filter.", + }, []string{"result"}) + EventStoreDeleteRangeCount = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ticdc", @@ -336,6 +350,8 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) + registry.MustRegister(EventStoreTableFilterCount) + registry.MustRegister(EventStoreTableFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist) From a6cdcbcbd39938f8a81032302ef5b001c729297c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 15:47:16 +0800 Subject: [PATCH 06/13] fix --- logservice/eventstore/event_store.go | 8 +- .../eventstore/event_store_bench_test.go | 4 +- logservice/eventstore/event_store_test.go | 8 +- logservice/eventstore/format.go | 75 ++++++------ logservice/eventstore/format_test.go | 70 ++++++++++++ logservice/eventstore/gc.go | 78 +++++++------ logservice/eventstore/gc_test.go | 108 ++++++++++-------- logservice/eventstore/pebble.go | 2 +- logservice/eventstore/pebble_test.go | 38 +++--- logservice/eventstore/table_properties.go | 58 +++++----- pkg/metrics/event_store.go | 16 +-- 11 files changed, 278 insertions(+), 187 deletions(-) create mode 100644 logservice/eventstore/format_test.go diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 1711b45439..434a717161 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -921,14 +921,14 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com ) lowerTs = dataRange.CommitTsStart } else { - start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) + start = EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } - end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) + end := EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) // it's impossible return error here iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, - TableFilter: newEventStoreTableFilter( + TableFilter: newEventStoreSSTFileFilter( lowerTs, dataRange.CommitTsEnd, ), @@ -1474,7 +1474,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { key := iter.innerIter.Key() value := iter.innerIter.Value() - _, compressionType := DecodeKeyMetas(key) + _, compressionType := DecodeKeyAttributes(key) var decodedValue []byte if compressionType == CompressionZSTD { var err error diff --git a/logservice/eventstore/event_store_bench_test.go b/logservice/eventstore/event_store_bench_test.go index e8926e92e5..a61c2ff221 100644 --- a/logservice/eventstore/event_store_bench_test.go +++ b/logservice/eventstore/event_store_bench_test.go @@ -181,8 +181,8 @@ func BenchmarkEventStoreIteratorNext(b *testing.B) { StartKey: []byte{}, EndKey: []byte{0xff}, } - lower := EncodeKeyPrefix(1, 1, 1) - upper := EncodeKeyPrefix(1, 1, 1<<63) + lower := EncodeTxnCommitTsBoundaryKey(1, 1, 1) + upper := EncodeTxnCommitTsBoundaryKey(1, 1, 1<<63) b.ReportAllocs() b.ResetTimer() diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index f2cddd8ff8..92ef174824 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -891,7 +891,7 @@ func TestWriteToEventStore(t *testing.T) { key := iter.Key() value := iter.Value() - _, compressionType := DecodeKeyMetas(key) + _, compressionType := DecodeKeyAttributes(key) var decodedValue []byte if compressionType == CompressionZSTD { @@ -965,7 +965,7 @@ func TestWriteToEventStoreZstdCompressionDisabled(t *testing.T) { count := 0 for iter.First(); iter.Valid(); iter.Next() { - _, compressionType := DecodeKeyMetas(iter.Key()) + _, compressionType := DecodeKeyAttributes(iter.Key()) require.Equal(t, CompressionNone, compressionType) readEntry := &common.RawKVEntry{} @@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { // Create iterator with a wider range to ensure it sees all keys, // so we can test the internal filtering logic. - start := EncodeKeyPrefix(subID, tableID, 0) - end := EncodeKeyPrefix(subID, tableID, 500) + start := EncodeTxnCommitTsBoundaryKey(subID, tableID, 0) + end := EncodeTxnCommitTsBoundaryKey(subID, tableID, 500) innerIter, err := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index 627e6831b1..31dc449da8 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -39,11 +39,11 @@ const ( ) const ( - encodedKeyUint64Len = 8 - encodedKeyCRTsOffset = 2 * encodedKeyUint64Len - encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len - encodedKeyMetasOffset = 4 * encodedKeyUint64Len - encodedKeyMetasEnd = encodedKeyMetasOffset + 2 + encodedKeyUint64Len = 8 + encodedKeyTxnCommitTsStart = 2 * encodedKeyUint64Len + encodedKeyTxnCommitTsEnd = encodedKeyTxnCommitTsStart + encodedKeyUint64Len + encodedKeyAttributesOffset = 4 * encodedKeyUint64Len + encodedKeyAttributesEnd = encodedKeyAttributesOffset + 2 ) const ( @@ -53,34 +53,33 @@ const ( dmlOrderShift = 8 ) -// EncodeKeyPrefix encodes uniqueID, tableID, and txnCommitTs. -// The result is a prefix of a full event-store key. -func EncodeKeyPrefix(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { - buf := make([]byte, encodedKeyCRTsEnd) - encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) +// EncodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs. +func EncodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { + buf := make([]byte, encodedKeyTxnCommitTsEnd) + encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) return buf } -func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, startTs uint64) []byte { - buf := make([]byte, encodedKeyMetasOffset) - encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) - binary.BigEndian.PutUint64(buf[encodedKeyCRTsEnd:encodedKeyMetasOffset], startTs) +func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, txnStartTs uint64) []byte { + buf := make([]byte, encodedKeyAttributesOffset) + encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) + binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsEnd:encodedKeyAttributesOffset], txnStartTs) return buf } -func encodeKeyPrefixTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) { +func encodeTxnCommitTsBoundaryKeyTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) { binary.BigEndian.PutUint64(buf[:encodedKeyUint64Len], uniqueID) - binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyCRTsOffset], uint64(tableID)) - binary.BigEndian.PutUint64(buf[encodedKeyCRTsOffset:encodedKeyCRTsEnd], txnCommitTs) + binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyTxnCommitTsStart], uint64(tableID)) + binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd], txnCommitTs) } func encodedKeyLen(event *common.RawKVEntry) int { - // uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key - return encodedKeyMetasEnd + len(event.Key) + // uniqueID, tableID, txnCommitTs, txnStartTs, Put/Delete, CompressionType, Key + return encodedKeyAttributesEnd + len(event.Key) } // EncodeKeyTo appends an encoded event-store key to buf. -// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key. +// Format: uniqueID, tableID, txnCommitTs, txnStartTs, delete/update/insert, Key. func EncodeKeyTo( buf []byte, uniqueID uint64, @@ -95,9 +94,9 @@ func EncodeKeyTo( buf = binary.BigEndian.AppendUint64(buf, uniqueID) // table ID buf = binary.BigEndian.AppendUint64(buf, uint64(tableID)) - // CRTs + // txn commit ts buf = binary.BigEndian.AppendUint64(buf, event.CRTs) - // startTs + // txn start ts buf = binary.BigEndian.AppendUint64(buf, event.StartTs) // Let Delete < Update < Insert dmlOrder := getDMLOrder(event) @@ -112,20 +111,20 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres return EncodeKeyTo(make([]byte, 0, encodedKeyLen(event)), uniqueID, tableID, event, compressionType) } -// DecodeKeyMetas decodes compression type and dml order from the key. -func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) { - combinedOrder := binary.BigEndian.Uint16(key[encodedKeyMetasOffset:encodedKeyMetasEnd]) +// DecodeKeyAttributes decodes compression type and dml order from the key. +func DecodeKeyAttributes(key []byte) (DMLOrder, CompressionType) { + combinedOrder := binary.BigEndian.Uint16(key[encodedKeyAttributesOffset:encodedKeyAttributesEnd]) return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask) } -// decodeCRTsFromEncodedKey decodes CRTs from an event-store key prefix. +// decodeTxnCommitTsFromEncodedKey decodes txnCommitTs from an event-store key boundary. // It works for both full event keys and DeleteRange boundary keys because both -// contain uniqueID, tableID, and CRTs as the first three fields. -func decodeCRTsFromEncodedKey(key []byte) (uint64, bool) { - if len(key) < encodedKeyCRTsEnd { +// contain uniqueID, tableID, and txnCommitTs as the first three fields. +func decodeTxnCommitTsFromEncodedKey(key []byte) (uint64, bool) { + if len(key) < encodedKeyTxnCommitTsEnd { return 0, false } - return binary.BigEndian.Uint64(key[encodedKeyCRTsOffset:encodedKeyCRTsEnd]), true + return binary.BigEndian.Uint64(key[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd]), true } // getDMLOrder returns the order of the dml types: delete pending.item.endTs { - pending.item.endTs = endTS + if endTxnCommitTs > pending.item.endTxnCommitTs { + pending.item.endTxnCommitTs = endTxnCommitTs } } @@ -163,7 +171,7 @@ func (d *gcManager) pendingDeleteRangeCount() int { func shouldFlushDeleteRange(pending *pendingDeleteRange, now time.Time, minRangeInterval, maxDelay time.Duration) bool { // addGCItem only records valid ranges, so this should not happen in normal flow. // Keep the guard as a defensive check against unexpected state or future refactors. - if pending == nil || pending.item.endTs <= pending.item.startTs { + if pending == nil || pending.item.endTxnCommitTs <= pending.item.startTxnCommitTs { return false } if maxDelay > 0 && now.Sub(pending.firstEnqueueTime) >= maxDelay { @@ -173,8 +181,8 @@ func shouldFlushDeleteRange(pending *pendingDeleteRange, now time.Time, minRange return true } - startPhysical := oracle.ExtractPhysical(pending.item.startTs) - endPhysical := oracle.ExtractPhysical(pending.item.endTs) + startPhysical := oracle.ExtractPhysical(pending.item.startTxnCommitTs) + endPhysical := oracle.ExtractPhysical(pending.item.endTxnCommitTs) if endPhysical <= startPhysical { return false } @@ -290,7 +298,7 @@ func (d *gcManager) run(ctx context.Context) error { func (d *gcManager) doGCJob(ranges []gcRangeItem) { for _, r := range ranges { db := d.dbs[r.dbIndex] - if err := d.deleteDataRange(db, r.uniqueKeyID, r.tableID, r.startTs, r.endTs); err != nil { + if err := d.deleteDataRange(db, r.uniqueKeyID, r.tableID, r.startTxnCommitTs, r.endTxnCommitTs); err != nil { log.Warn("gc manager failed to delete data range", zap.Error(err)) } } @@ -306,8 +314,8 @@ func (d *gcManager) updateCompactRanges(ranges []gcRangeItem) { state = &compactState{} d.compactRanges[key] = state } - if state.endTs < r.endTs { - state.endTs = r.endTs + if state.endTxnCommitTs < r.endTxnCommitTs { + state.endTxnCommitTs = r.endTxnCommitTs state.compacted = false } } @@ -318,7 +326,7 @@ func (d *gcManager) doCompaction() { d.mu.Lock() for key, state := range d.compactRanges { if !state.compacted { - toCompact[key] = state.endTs + toCompact[key] = state.endTxnCommitTs state.compacted = true } } @@ -326,14 +334,14 @@ func (d *gcManager) doCompaction() { startTime := time.Now() log.Info("gc manager compacting ranges", zap.Int("rangeCount", len(toCompact))) - for key, endTs := range toCompact { + for key, endTxnCommitTs := range toCompact { db := d.dbs[key.dbIndex] - if err := d.compactDataRange(db, key.uniqueKeyID, key.tableID, 0, endTs); err != nil { + if err := d.compactDataRange(db, key.uniqueKeyID, key.tableID, 0, endTxnCommitTs); err != nil { log.Warn("gc manager failed to compact data range", zap.Int("dbIndex", key.dbIndex), zap.Uint64("uniqueKeyID", key.uniqueKeyID), zap.Int64("tableID", key.tableID), - zap.Uint64("endTs", endTs), + zap.Uint64("endTxnCommitTs", endTxnCommitTs), zap.Error(err)) } } diff --git a/logservice/eventstore/gc_test.go b/logservice/eventstore/gc_test.go index 79aaed8ea8..3c55babeea 100644 --- a/logservice/eventstore/gc_test.go +++ b/logservice/eventstore/gc_test.go @@ -58,11 +58,21 @@ func (m *mockDB) getCompactCalls() [][]byte { func TestGCManager(t *testing.T) { mdb := &mockDB{} - deleteFn := func(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error { - return mdb.DeleteRange(EncodeKeyPrefix(uniqueKeyID, tableID, startTs), EncodeKeyPrefix(uniqueKeyID, tableID, endTs), nil) + deleteFn := func( + db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, + ) error { + return mdb.DeleteRange( + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + nil) } - compactFn := func(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error { - return mdb.Compact(EncodeKeyPrefix(uniqueKeyID, tableID, startTs), EncodeKeyPrefix(uniqueKeyID, tableID, endTs), false) + compactFn := func( + db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, + ) error { + return mdb.Compact( + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + false) } gcm := newGCManager([]*pebble.DB{nil}, deleteFn, compactFn) @@ -83,26 +93,26 @@ func TestGCManager(t *testing.T) { deleteCalls := mdb.getDeleteCalls() require.Len(t, deleteCalls, 4) // The order of delete ranges is not guaranteed because it iterates over a map. - if bytes.Equal(deleteCalls[0], EncodeKeyPrefix(1, 10, 100)) { - require.Equal(t, EncodeKeyPrefix(1, 10, 200), deleteCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 20, 300), deleteCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), deleteCalls[3]) + if bytes.Equal(deleteCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 100)) { + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[3]) } else { - require.Equal(t, EncodeKeyPrefix(1, 20, 300), deleteCalls[0]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), deleteCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 10, 100), deleteCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 10, 200), deleteCalls[3]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[0]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 100), deleteCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[3]) } // Check internal state for compaction gcm.mu.Lock() state1, ok := gcm.compactRanges[compactKey1] require.True(t, ok) - require.Equal(t, uint64(200), state1.endTs) + require.Equal(t, uint64(200), state1.endTxnCommitTs) require.False(t, state1.compacted) state2, ok := gcm.compactRanges[compactKey2] require.True(t, ok) - require.Equal(t, uint64(400), state2.endTs) + require.Equal(t, uint64(400), state2.endTxnCommitTs) require.False(t, state2.compacted) gcm.mu.Unlock() } @@ -112,15 +122,15 @@ func TestGCManager(t *testing.T) { compactCalls := mdb.getCompactCalls() require.Len(t, compactCalls, 4) // The order of compaction is not guaranteed because it iterates over a map. - if bytes.Equal(compactCalls[0], EncodeKeyPrefix(1, 10, 0)) { - require.Equal(t, EncodeKeyPrefix(1, 10, 200), compactCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 20, 0), compactCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), compactCalls[3]) + if bytes.Equal(compactCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 0)) { + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[3]) } else { - require.Equal(t, EncodeKeyPrefix(1, 20, 0), compactCalls[0]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), compactCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 10, 0), compactCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 10, 200), compactCalls[3]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[0]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 0), compactCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[3]) } // Verify internal state is now compacted gcm.mu.Lock() @@ -155,12 +165,12 @@ func TestGCManagerDelaysSmallDeleteRanges(t *testing.T) { gcm := newGCManager(nil, nil, nil) now := time.Unix(100, 0) - startTs := oracle.ComposeTS(1_000, 0) - midTs := oracle.ComposeTS(2_000, 0) - endTs := oracle.ComposeTS(3_000, 0) + startTxnCommitTs := oracle.ComposeTS(1_000, 0) + midTxnCommitTs := oracle.ComposeTS(2_000, 0) + endTxnCommitTs := oracle.ComposeTS(3_000, 0) - gcm.addGCItem(0, 1, 10, startTs, midTs) - gcm.addGCItem(0, 1, 10, midTs, endTs) + gcm.addGCItem(0, 1, 10, startTxnCommitTs, midTxnCommitTs) + gcm.addGCItem(0, 1, 10, midTxnCommitTs, endTxnCommitTs) compactKey := compactItemKey{dbIndex: 0, uniqueKeyID: 1, tableID: 10} gcm.mu.Lock() @@ -175,18 +185,18 @@ func TestGCManagerDelaysSmallDeleteRanges(t *testing.T) { gcm.mu.Lock() pending, ok = gcm.deleteRanges[compactKey] require.True(t, ok) - require.Equal(t, startTs, pending.item.startTs) - require.Equal(t, endTs, pending.item.endTs) + require.Equal(t, startTxnCommitTs, pending.item.startTxnCommitTs) + require.Equal(t, endTxnCommitTs, pending.item.endTxnCommitTs) gcm.mu.Unlock() ranges = gcm.fetchGCItems(now.Add(31*time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: startTs, - endTs: endTs, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: startTxnCommitTs, + endTxnCommitTs: endTxnCommitTs, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } @@ -195,10 +205,10 @@ func TestGCManagerFlushesLargeDeleteRangeImmediately(t *testing.T) { gcm := newGCManager(nil, nil, nil) now := time.Unix(100, 0) - startTs := oracle.ComposeTS(1_000, 0) - endTs := oracle.ComposeTS(1_000+6*60*1000, 0) + startTxnCommitTs := oracle.ComposeTS(1_000, 0) + endTxnCommitTs := oracle.ComposeTS(1_000+6*60*1000, 0) - gcm.addGCItem(0, 1, 10, startTs, endTs) + gcm.addGCItem(0, 1, 10, startTxnCommitTs, endTxnCommitTs) compactKey := compactItemKey{dbIndex: 0, uniqueKeyID: 1, tableID: 10} gcm.mu.Lock() @@ -210,11 +220,11 @@ func TestGCManagerFlushesLargeDeleteRangeImmediately(t *testing.T) { ranges := gcm.fetchGCItems(now.Add(time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: startTs, - endTs: endTs, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: startTxnCommitTs, + endTxnCommitTs: endTxnCommitTs, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } @@ -236,18 +246,18 @@ func TestGCManagerWidensDisjointDeleteRanges(t *testing.T) { pending, ok := gcm.deleteRanges[compactKey] require.True(t, ok) pending.firstEnqueueTime = now - require.Equal(t, firstStart, pending.item.startTs) - require.Equal(t, secondEnd, pending.item.endTs) + require.Equal(t, firstStart, pending.item.startTxnCommitTs) + require.Equal(t, secondEnd, pending.item.endTxnCommitTs) gcm.mu.Unlock() ranges := gcm.fetchGCItems(now.Add(31*time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: firstStart, - endTs: secondEnd, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: firstStart, + endTxnCommitTs: secondEnd, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index e72c4133b0..e63c0e714b 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -58,7 +58,7 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), TablePropertyCollectors: []func() pebble.TablePropertyCollector{ - newEventStoreCRTsCollector, + newEventStoreTxnCommitTsCollector, }, } diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index bdefadf5f5..45726a1768 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -122,12 +122,12 @@ func TestCompressionAndKeyOrder(t *testing.T) { Key: []byte("test-key"), } keyWithZstd := EncodeKey(1, 1, ev, CompressionZSTD) - dmlOrder, compressionType := DecodeKeyMetas(keyWithZstd) + dmlOrder, compressionType := DecodeKeyAttributes(keyWithZstd) require.Equal(t, DMLOrderInsert, dmlOrder) require.Equal(t, CompressionZSTD, compressionType) keyWithNone := EncodeKey(1, 1, ev, CompressionNone) - dmlOrder, compressionType = DecodeKeyMetas(keyWithNone) + dmlOrder, compressionType = DecodeKeyAttributes(keyWithNone) require.Equal(t, DMLOrderInsert, dmlOrder) require.Equal(t, CompressionNone, compressionType) @@ -155,12 +155,12 @@ func TestEventStoreKeyBounds(t *testing.T) { Key: []byte("key"), } key := EncodeKey(1, 1, event, CompressionNone) - prefix := EncodeKeyPrefix(1, 1, event.CRTs) - require.Len(t, prefix, encodedKeyCRTsEnd) - require.True(t, bytes.HasPrefix(key, prefix)) + commitTsBoundaryKey := EncodeTxnCommitTsBoundaryKey(1, 1, event.CRTs) + require.Len(t, commitTsBoundaryKey, encodedKeyTxnCommitTsEnd) + require.True(t, bytes.HasPrefix(key, commitTsBoundaryKey)) lowerBound := encodeScanLowerBound(1, 1, event.CRTs, event.StartTs) - require.Len(t, lowerBound, encodedKeyMetasOffset) + require.Len(t, lowerBound, encodedKeyAttributesOffset) require.True(t, bytes.HasPrefix(key, lowerBound)) previousEvent := &common.RawKVEntry{ @@ -172,10 +172,10 @@ func TestEventStoreKeyBounds(t *testing.T) { require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) } -func TestEventStoreCRTsCollector(t *testing.T) { +func TestEventStoreTxnCommitTsCollector(t *testing.T) { t.Parallel() - collector := newEventStoreCRTsCollector() + collector := newEventStoreTxnCommitTsCollector() event := &common.RawKVEntry{ OpType: common.OpTypePut, StartTs: 1, @@ -188,8 +188,8 @@ func TestEventStoreCRTsCollector(t *testing.T) { UserKey: eventKey, Trailer: uint64(pebble.InternalKeyKindSet), }, eventValue)) - deleteStart := EncodeKeyPrefix(1, 1, 100) - deleteEnd := EncodeKeyPrefix(1, 1, 200) + deleteStart := EncodeTxnCommitTsBoundaryKey(1, 1, 100) + deleteEnd := EncodeTxnCommitTsBoundaryKey(1, 1, 200) require.NoError(t, collector.Add(pebble.InternalKey{ UserKey: deleteStart, Trailer: uint64(pebble.InternalKeyKindRangeDelete), @@ -197,19 +197,19 @@ func TestEventStoreCRTsCollector(t *testing.T) { props := make(map[string]string) require.NoError(t, collector.Finish(props)) - require.Equal(t, "10", props[eventStoreMinCRTsTableProperty]) - require.Equal(t, "100", props[eventStoreMaxCRTsTableProperty]) + require.Equal(t, "10", props[eventStoreMinTxnCommitTsProperty]) + require.Equal(t, "100", props[eventStoreMaxTxnCommitTsProperty]) require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), props[eventStoreLogicalBytesProperty]) - require.True(t, newEventStoreTableFilter(9, 10)(props)) - require.True(t, newEventStoreTableFilter(100, 100)(props)) - require.False(t, newEventStoreTableFilter(101, 300)(props)) - require.True(t, newEventStoreTableFilter(101, 300)(nil)) + require.True(t, newEventStoreSSTFileFilter(9, 10)(props)) + require.True(t, newEventStoreSSTFileFilter(100, 100)(props)) + require.False(t, newEventStoreSSTFileFilter(101, 300)(props)) + require.True(t, newEventStoreSSTFileFilter(101, 300)(nil)) corruptedProps := map[string]string{ - eventStoreMinCRTsTableProperty: "300", - eventStoreMaxCRTsTableProperty: "100", + eventStoreMinTxnCommitTsProperty: "300", + eventStoreMaxTxnCommitTsProperty: "100", } - require.True(t, newEventStoreTableFilter(101, 200)(corruptedProps)) + require.True(t, newEventStoreSSTFileFilter(101, 200)(corruptedProps)) } diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index c7400d70c3..1961b85a51 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -21,75 +21,75 @@ import ( ) const ( - eventStoreMinCRTsTableProperty = "event-store-min-crts" - eventStoreMaxCRTsTableProperty = "event-store-max-crts" - eventStoreLogicalBytesProperty = "event-store-logical-bytes" - eventStoreCRTsCollectorName = "event-store-crts-collector" + eventStoreMinTxnCommitTsProperty = "event-store-min-txn-commit-ts" + eventStoreMaxTxnCommitTsProperty = "event-store-max-txn-commit-ts" + eventStoreLogicalBytesProperty = "event-store-logical-bytes" + eventStoreTxnCommitTsCollectorName = "event-store-txn-commit-ts-collector" ) -type eventStoreCRTsCollector struct { +type eventStoreTxnCommitTsCollector struct { minTs uint64 maxTs uint64 logicalBytes uint64 hasTs bool } -func newEventStoreCRTsCollector() pebble.TablePropertyCollector { - return &eventStoreCRTsCollector{} +func newEventStoreTxnCommitTsCollector() pebble.TablePropertyCollector { + return &eventStoreTxnCommitTsCollector{} } -func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, value []byte) error { +func (c *eventStoreTxnCommitTsCollector) Add(key pebble.InternalKey, value []byte) error { // Event store DeleteRange is GC-only: it removes data that should already be // below the future scan range. Do not widen table properties with the range - // tombstone end key. For example, a cleanup tombstone [CRTs=100, CRTs=1000) + // tombstone end key. For example, a cleanup tombstone [commit-ts=100, commit-ts=1000) // would make this cleanup-only SST overlap scans like [500,600]. c.recordEncodedKey(key.UserKey) c.logicalBytes += uint64(len(key.UserKey) + len(value)) return nil } -func (c *eventStoreCRTsCollector) Finish(userProps map[string]string) error { +func (c *eventStoreTxnCommitTsCollector) Finish(userProps map[string]string) error { if !c.hasTs { return nil } - userProps[eventStoreMinCRTsTableProperty] = strconv.FormatUint(c.minTs, 10) - userProps[eventStoreMaxCRTsTableProperty] = strconv.FormatUint(c.maxTs, 10) + userProps[eventStoreMinTxnCommitTsProperty] = strconv.FormatUint(c.minTs, 10) + userProps[eventStoreMaxTxnCommitTsProperty] = strconv.FormatUint(c.maxTs, 10) userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) return nil } -func (c *eventStoreCRTsCollector) Name() string { - return eventStoreCRTsCollectorName +func (c *eventStoreTxnCommitTsCollector) Name() string { + return eventStoreTxnCommitTsCollectorName } -func (c *eventStoreCRTsCollector) recordEncodedKey(key []byte) { - crts, ok := decodeCRTsFromEncodedKey(key) +func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { + txnCommitTs, ok := decodeTxnCommitTsFromEncodedKey(key) if !ok { return } - if !c.hasTs || crts < c.minTs { - c.minTs = crts + if !c.hasTs || txnCommitTs < c.minTs { + c.minTs = txnCommitTs } - if !c.hasTs || crts > c.maxTs { - c.maxTs = crts + if !c.hasTs || txnCommitTs > c.maxTs { + c.maxTs = txnCommitTs } c.hasTs = true } -func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { +func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { return func(userProps map[string]string) bool { - shouldScan := eventStoreTableMayContainCRTs(userProps, lowerTs, upperTs) - recordEventStoreTableFilterMetrics(userProps, shouldScan) + shouldScan := eventStoreSSTFileMayContainTxnCommitTs(userProps, lowerTs, upperTs) + recordEventStoreSSTFileFilterMetrics(userProps, shouldScan) return shouldScan } } -func eventStoreTableMayContainCRTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { - minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinCRTsTableProperty) +func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { + minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinTxnCommitTsProperty) if !ok { return true } - maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxCRTsTableProperty) + maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxTxnCommitTsProperty) if !ok { return true } @@ -99,14 +99,14 @@ func eventStoreTableMayContainCRTs(userProps map[string]string, lowerTs uint64, return maxTs >= lowerTs && minTs <= upperTs } -func recordEventStoreTableFilterMetrics(userProps map[string]string, shouldScan bool) { +func recordEventStoreSSTFileFilterMetrics(userProps map[string]string, shouldScan bool) { result := "scanned" if !shouldScan { result = "skipped" } - metrics.EventStoreTableFilterCount.WithLabelValues(result).Inc() + metrics.EventStoreSSTFileFilterCount.WithLabelValues(result).Inc() if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { - metrics.EventStoreTableFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) + metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) } } diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index 3e499b3d52..87ae2f4d02 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,18 +81,18 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) - EventStoreTableFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + EventStoreSSTFileFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "event_store", - Name: "table_filter_count", - Help: "The number of SST table filter decisions by event store.", + Name: "sst_file_filter_count", + Help: "The number of SST file filter decisions by event store.", }, []string{"result"}) - EventStoreTableFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + EventStoreSSTFileFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "event_store", - Name: "table_filter_logical_bytes", - Help: "The estimated logical bytes in SSTs scanned or skipped by event store table filter.", + Name: "sst_file_filter_logical_bytes", + Help: "The estimated logical bytes in SST files scanned or skipped by event store.", }, []string{"result"}) EventStoreDeleteRangeCount = prometheus.NewCounter( @@ -350,8 +350,8 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) - registry.MustRegister(EventStoreTableFilterCount) - registry.MustRegister(EventStoreTableFilterLogicalBytes) + registry.MustRegister(EventStoreSSTFileFilterCount) + registry.MustRegister(EventStoreSSTFileFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist) From 5770a5241c3d091e78f86726ada9d3e3986fac49 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 15:54:57 +0800 Subject: [PATCH 07/13] logservice: remove event store sst file filtering --- logservice/eventstore/event_store.go | 11 -- logservice/eventstore/pebble.go | 3 - logservice/eventstore/pebble_test.go | 43 -------- logservice/eventstore/table_properties.go | 123 ---------------------- pkg/metrics/event_store.go | 16 --- 5 files changed, 196 deletions(-) delete mode 100644 logservice/eventstore/table_properties.go diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 434a717161..687cf03dec 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -905,13 +905,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com // CommitTsStart with start ts greater than LastScannedTxnStartTs, then scan // later commit ts up to CommitTsEnd. // - // Table filter bounds: - // lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose - // collected CRTs range does not overlap [lowerTs, CommitTsEnd]. Therefore - // lowerTs is CommitTsStart+1 in the first case, and CommitTsStart in the - // second case. var start []byte - lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { start = encodeScanLowerBound( uint64(subStat.subID), @@ -919,7 +913,6 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1, ) - lowerTs = dataRange.CommitTsStart } else { start = EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } @@ -928,10 +921,6 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, - TableFilter: newEventStoreSSTFileFilter( - lowerTs, - dataRange.CommitTsEnd, - ), }) decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index e63c0e714b..f5f48bf71b 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -57,9 +57,6 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), - TablePropertyCollectors: []func() pebble.TablePropertyCollector{ - newEventStoreTxnCommitTsCollector, - }, } for i := 0; i < len(opts.Levels); i++ { diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index 45726a1768..b2ce6bd33a 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -18,7 +18,6 @@ import ( "encoding/binary" "fmt" "os" - "strconv" "testing" "github.com/cockroachdb/pebble" @@ -171,45 +170,3 @@ func TestEventStoreKeyBounds(t *testing.T) { } require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) } - -func TestEventStoreTxnCommitTsCollector(t *testing.T) { - t.Parallel() - - collector := newEventStoreTxnCommitTsCollector() - event := &common.RawKVEntry{ - OpType: common.OpTypePut, - StartTs: 1, - CRTs: 10, - Key: []byte("key"), - } - eventKey := EncodeKey(1, 1, event, CompressionNone) - eventValue := []byte("value") - require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: eventKey, - Trailer: uint64(pebble.InternalKeyKindSet), - }, eventValue)) - deleteStart := EncodeTxnCommitTsBoundaryKey(1, 1, 100) - deleteEnd := EncodeTxnCommitTsBoundaryKey(1, 1, 200) - require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: deleteStart, - Trailer: uint64(pebble.InternalKeyKindRangeDelete), - }, deleteEnd)) - - props := make(map[string]string) - require.NoError(t, collector.Finish(props)) - require.Equal(t, "10", props[eventStoreMinTxnCommitTsProperty]) - require.Equal(t, "100", props[eventStoreMaxTxnCommitTsProperty]) - require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), - props[eventStoreLogicalBytesProperty]) - - require.True(t, newEventStoreSSTFileFilter(9, 10)(props)) - require.True(t, newEventStoreSSTFileFilter(100, 100)(props)) - require.False(t, newEventStoreSSTFileFilter(101, 300)(props)) - require.True(t, newEventStoreSSTFileFilter(101, 300)(nil)) - - corruptedProps := map[string]string{ - eventStoreMinTxnCommitTsProperty: "300", - eventStoreMaxTxnCommitTsProperty: "100", - } - require.True(t, newEventStoreSSTFileFilter(101, 200)(corruptedProps)) -} diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go deleted file mode 100644 index 1961b85a51..0000000000 --- a/logservice/eventstore/table_properties.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2026 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package eventstore - -import ( - "strconv" - - "github.com/cockroachdb/pebble" - "github.com/pingcap/ticdc/pkg/metrics" -) - -const ( - eventStoreMinTxnCommitTsProperty = "event-store-min-txn-commit-ts" - eventStoreMaxTxnCommitTsProperty = "event-store-max-txn-commit-ts" - eventStoreLogicalBytesProperty = "event-store-logical-bytes" - eventStoreTxnCommitTsCollectorName = "event-store-txn-commit-ts-collector" -) - -type eventStoreTxnCommitTsCollector struct { - minTs uint64 - maxTs uint64 - logicalBytes uint64 - hasTs bool -} - -func newEventStoreTxnCommitTsCollector() pebble.TablePropertyCollector { - return &eventStoreTxnCommitTsCollector{} -} - -func (c *eventStoreTxnCommitTsCollector) Add(key pebble.InternalKey, value []byte) error { - // Event store DeleteRange is GC-only: it removes data that should already be - // below the future scan range. Do not widen table properties with the range - // tombstone end key. For example, a cleanup tombstone [commit-ts=100, commit-ts=1000) - // would make this cleanup-only SST overlap scans like [500,600]. - c.recordEncodedKey(key.UserKey) - c.logicalBytes += uint64(len(key.UserKey) + len(value)) - return nil -} - -func (c *eventStoreTxnCommitTsCollector) Finish(userProps map[string]string) error { - if !c.hasTs { - return nil - } - userProps[eventStoreMinTxnCommitTsProperty] = strconv.FormatUint(c.minTs, 10) - userProps[eventStoreMaxTxnCommitTsProperty] = strconv.FormatUint(c.maxTs, 10) - userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) - return nil -} - -func (c *eventStoreTxnCommitTsCollector) Name() string { - return eventStoreTxnCommitTsCollectorName -} - -func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { - txnCommitTs, ok := decodeTxnCommitTsFromEncodedKey(key) - if !ok { - return - } - if !c.hasTs || txnCommitTs < c.minTs { - c.minTs = txnCommitTs - } - if !c.hasTs || txnCommitTs > c.maxTs { - c.maxTs = txnCommitTs - } - c.hasTs = true -} - -func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { - return func(userProps map[string]string) bool { - shouldScan := eventStoreSSTFileMayContainTxnCommitTs(userProps, lowerTs, upperTs) - recordEventStoreSSTFileFilterMetrics(userProps, shouldScan) - return shouldScan - } -} - -func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { - minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinTxnCommitTsProperty) - if !ok { - return true - } - maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxTxnCommitTsProperty) - if !ok { - return true - } - if minTs > maxTs { - return true - } - return maxTs >= lowerTs && minTs <= upperTs -} - -func recordEventStoreSSTFileFilterMetrics(userProps map[string]string, shouldScan bool) { - result := "scanned" - if !shouldScan { - result = "skipped" - } - metrics.EventStoreSSTFileFilterCount.WithLabelValues(result).Inc() - if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { - metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) - } -} - -func parseEventStoreUint64TableProperty(userProps map[string]string, key string) (uint64, bool) { - value, ok := userProps[key] - if !ok { - return 0, false - } - ts, err := strconv.ParseUint(value, 10, 64) - if err != nil { - return 0, false - } - return ts, true -} diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index 87ae2f4d02..d934173f27 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,20 +81,6 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) - EventStoreSSTFileFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "event_store", - Name: "sst_file_filter_count", - Help: "The number of SST file filter decisions by event store.", - }, []string{"result"}) - - EventStoreSSTFileFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "event_store", - Name: "sst_file_filter_logical_bytes", - Help: "The estimated logical bytes in SST files scanned or skipped by event store.", - }, []string{"result"}) - EventStoreDeleteRangeCount = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ticdc", @@ -350,8 +336,6 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) - registry.MustRegister(EventStoreSSTFileFilterCount) - registry.MustRegister(EventStoreSSTFileFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist) From 85244a483866994c0914a562909c3568fbc0a7a4 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 16:04:23 +0800 Subject: [PATCH 08/13] fix --- logservice/eventstore/event_store.go | 4 +- .../eventstore/event_store_bench_test.go | 4 +- logservice/eventstore/event_store_test.go | 4 +- logservice/eventstore/format.go | 12 +++--- logservice/eventstore/format_test.go | 2 +- logservice/eventstore/gc_test.go | 40 +++++++++---------- logservice/eventstore/pebble_test.go | 2 +- 7 files changed, 34 insertions(+), 34 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 687cf03dec..757df15bb4 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -914,9 +914,9 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com dataRange.LastScannedTxnStartTs+1, ) } else { - start = EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) + start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } - end := EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) + end := encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) // it's impossible return error here iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, diff --git a/logservice/eventstore/event_store_bench_test.go b/logservice/eventstore/event_store_bench_test.go index a61c2ff221..c32a1ef884 100644 --- a/logservice/eventstore/event_store_bench_test.go +++ b/logservice/eventstore/event_store_bench_test.go @@ -181,8 +181,8 @@ func BenchmarkEventStoreIteratorNext(b *testing.B) { StartKey: []byte{}, EndKey: []byte{0xff}, } - lower := EncodeTxnCommitTsBoundaryKey(1, 1, 1) - upper := EncodeTxnCommitTsBoundaryKey(1, 1, 1<<63) + lower := encodeTxnCommitTsBoundaryKey(1, 1, 1) + upper := encodeTxnCommitTsBoundaryKey(1, 1, 1<<63) b.ReportAllocs() b.ResetTimer() diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index 92ef174824..eb92c90715 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { // Create iterator with a wider range to ensure it sees all keys, // so we can test the internal filtering logic. - start := EncodeTxnCommitTsBoundaryKey(subID, tableID, 0) - end := EncodeTxnCommitTsBoundaryKey(subID, tableID, 500) + start := encodeTxnCommitTsBoundaryKey(subID, tableID, 0) + end := encodeTxnCommitTsBoundaryKey(subID, tableID, 500) innerIter, err := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index 31dc449da8..47cd09eb74 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -53,8 +53,8 @@ const ( dmlOrderShift = 8 ) -// EncodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs. -func EncodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { +// encodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs. +func encodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { buf := make([]byte, encodedKeyTxnCommitTsEnd) encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) return buf @@ -140,8 +140,8 @@ func getDMLOrder(rowKV *common.RawKVEntry) DMLOrder { func deleteDataRange( db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { - start := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) - end := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) + start := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) + end := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) return db.DeleteRange(start, end, pebble.NoSync) } @@ -149,8 +149,8 @@ func deleteDataRange( func compactDataRange( db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { - start := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) - end := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) + start := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) + end := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) return db.Compact(start, end, false) } diff --git a/logservice/eventstore/format_test.go b/logservice/eventstore/format_test.go index 3e77b64182..4adf3a8b63 100644 --- a/logservice/eventstore/format_test.go +++ b/logservice/eventstore/format_test.go @@ -48,7 +48,7 @@ func TestEventStoreKeyFormatGolden(t *testing.T) { require.Equal(t, 34, encodedKeyAttributesEnd) require.Equal(t, expectedKey[:encodedKeyTxnCommitTsEnd], - EncodeTxnCommitTsBoundaryKey(uniqueID, tableID, txnCommitTs)) + encodeTxnCommitTsBoundaryKey(uniqueID, tableID, txnCommitTs)) require.Equal(t, expectedKey[:encodedKeyAttributesOffset], encodeScanLowerBound(uniqueID, tableID, txnCommitTs, txnStartTs)) diff --git a/logservice/eventstore/gc_test.go b/logservice/eventstore/gc_test.go index 3c55babeea..f17b9342a8 100644 --- a/logservice/eventstore/gc_test.go +++ b/logservice/eventstore/gc_test.go @@ -62,16 +62,16 @@ func TestGCManager(t *testing.T) { db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { return mdb.DeleteRange( - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), nil) } compactFn := func( db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { return mdb.Compact( - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), false) } gcm := newGCManager([]*pebble.DB{nil}, deleteFn, compactFn) @@ -93,15 +93,15 @@ func TestGCManager(t *testing.T) { deleteCalls := mdb.getDeleteCalls() require.Len(t, deleteCalls, 4) // The order of delete ranges is not guaranteed because it iterates over a map. - if bytes.Equal(deleteCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 100)) { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[3]) + if bytes.Equal(deleteCalls[0], encodeTxnCommitTsBoundaryKey(1, 10, 100)) { + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[3]) } else { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[0]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 100), deleteCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[3]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[0]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 100), deleteCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[3]) } // Check internal state for compaction @@ -122,15 +122,15 @@ func TestGCManager(t *testing.T) { compactCalls := mdb.getCompactCalls() require.Len(t, compactCalls, 4) // The order of compaction is not guaranteed because it iterates over a map. - if bytes.Equal(compactCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 0)) { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[3]) + if bytes.Equal(compactCalls[0], encodeTxnCommitTsBoundaryKey(1, 10, 0)) { + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[3]) } else { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[0]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 0), compactCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[3]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[0]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 0), compactCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[3]) } // Verify internal state is now compacted gcm.mu.Lock() diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index b2ce6bd33a..062c34d11e 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -154,7 +154,7 @@ func TestEventStoreKeyBounds(t *testing.T) { Key: []byte("key"), } key := EncodeKey(1, 1, event, CompressionNone) - commitTsBoundaryKey := EncodeTxnCommitTsBoundaryKey(1, 1, event.CRTs) + commitTsBoundaryKey := encodeTxnCommitTsBoundaryKey(1, 1, event.CRTs) require.Len(t, commitTsBoundaryKey, encodedKeyTxnCommitTsEnd) require.True(t, bytes.HasPrefix(key, commitTsBoundaryKey)) From 604fe9ee74fc629f3a1616080c3a96259cdeee82 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 16:22:23 +0800 Subject: [PATCH 09/13] fix scan iterator --- logservice/eventstore/event_store.go | 37 +++++++++++------ logservice/eventstore/event_store_test.go | 19 ++++++--- logservice/eventstore/gc.go | 8 ++-- pkg/eventservice/event_scanner.go | 42 +++++++++++++------ pkg/eventservice/event_scanner_test.go | 49 +++++++++++++++++++++++ pkg/eventservice/event_service_test.go | 9 +++-- 6 files changed, 125 insertions(+), 39 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 757df15bb4..10ebadf36d 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/klauspost/compress/zstd" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller" @@ -87,8 +88,8 @@ type EventStore interface { UpdateDispatcherCheckpointTs(dispatcherID common.DispatcherID, checkpointTs uint64) - // GetIterator return an iterator which scan the data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd] - GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator + // GetIterator returns an iterator which scans data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd]. + GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error) GetLogCoordinatorNodeID() node.ID } @@ -104,7 +105,8 @@ type EventIterator interface { Next() (*common.RawKVEntry, bool) // Close closes the iterator. - // It returns the number of events that are read from the iterator. + // It returns the number of events that are read from the iterator and any + // accumulated iterator error. Close() (eventCnt int64, err error) } @@ -789,9 +791,9 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( updateSubStatCheckpoint(dispatcherStat.removingSubStat) } -func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator { +func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error) { if e.closed.Load() { - return nil + return nil, nil } e.dispatcherMeta.RLock() @@ -799,7 +801,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com if !ok { log.Warn("fail to find dispatcher", zap.Stringer("dispatcherID", dispatcherID)) e.dispatcherMeta.RUnlock() - return nil + return nil, nil } tryGetDB := func(subStat *subscriptionStat, force bool) *pebble.DB { @@ -917,17 +919,26 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } end := encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) - // it's impossible return error here - iter, _ := db.NewIter(&pebble.IterOptions{ + iter, err := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, }) - decoder := e.decoderPool.Get().(*zstd.Decoder) + if err != nil { + return nil, errors.Trace(err) + } + startTime := time.Now() - // todo: what happens if iter.First() returns false? - _ = iter.First() + if ok := iter.First(); !ok { + if err := iter.Error(); err != nil { + if closeErr := iter.Close(); closeErr != nil { + return nil, errors.Trace(closeErr) + } + return nil, errors.Trace(err) + } + } metricEventStoreFirstReadDurationHistogram.Observe(time.Since(startTime).Seconds()) metrics.EventStoreScanRequestsCount.Inc() + decoder := e.decoderPool.Get().(*zstd.Decoder) needCheckSpan := true if stat.tableSpan.Equal(subStat.tableSpan) { @@ -945,7 +956,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com rowCount: 0, decoder: decoder, decoderPool: e.decoderPool, - } + }, nil } func (e *eventStore) GetLogCoordinatorNodeID() node.ID { @@ -1535,7 +1546,7 @@ func (iter *eventStoreIter) Close() (int64, error) { iter.decoderPool.Put(iter.decoder) iter.innerIter = nil metricEventStoreCloseReadDurationHistogram.Observe(time.Since(startTime).Seconds()) - return iter.rowCount, err + return iter.rowCount, errors.Trace(err) } func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error { diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index eb92c90715..33c328c278 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -104,6 +104,15 @@ func newEventStoreForTest(path string) (logpuller.SubscriptionClient, EventStore return subClient, store } +func requireEventIterator( + t testing.TB, store EventStore, dispatcherID common.DispatcherID, dataRange common.DataRange, +) EventIterator { + t.Helper() + iter, err := store.GetIterator(dispatcherID, dataRange) + require.NoError(t, err) + return iter +} + func setDataSharingForTest(t *testing.T, enable bool) func() { t.Helper() originalCfg := config.GetGlobalServerConfig().Clone() @@ -718,7 +727,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // case 1: dispatcher 2 use data from subStat 1 updateSubStatResolvedTs(1, 200) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ + iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -734,7 +743,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // case 2: subStat 2 is ready, dispatcher 2 read data from subStat 2 and stop listen subStat 1 updateSubStatResolvedTs(2, 200) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ + iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -762,7 +771,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // case 3: subStat 1 advance quicker than subStat 2, dispatcher 2 can still read data from subStat 1 updateSubStatResolvedTs(1, 220) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ + iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -796,7 +805,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // dispatcher 2 read data from subStat 2 and totally remove itself from the subsriber list of subStat 1 updateSubStatResolvedTs(2, 220) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ + iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -1135,7 +1144,7 @@ func TestEventStoreGetIteratorConcurrently(t *testing.T) { CommitTsStart: startTs, CommitTsEnd: lastCommitTs + 1, } - iter := store.GetIterator(dispatcherID, dataRange) + iter := requireEventIterator(t, store, dispatcherID, dataRange) require.NotNil(t, iter, "iterator should not be nil") var receivedEvents []*common.RawKVEntry diff --git a/logservice/eventstore/gc.go b/logservice/eventstore/gc.go index 5fb43adfde..8dd7429908 100644 --- a/logservice/eventstore/gc.go +++ b/logservice/eventstore/gc.go @@ -35,11 +35,9 @@ type ( ) type gcRangeItem struct { - dbIndex int - uniqueKeyID uint64 - tableID int64 - // TODO: startTxnCommitTs may be unnecessary now because delete ranges can - // start from 0, but it may be useful after table range splitting. + dbIndex int + uniqueKeyID uint64 + tableID int64 startTxnCommitTs uint64 endTxnCommitTs uint64 } diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index c4540d34be..e733cc750f 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -34,7 +34,7 @@ import ( // eventGetter is the interface for getting iterator of events // The implementation of eventGetter is eventstore.EventStore type eventGetter interface { - GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) eventstore.EventIterator + GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (eventstore.EventIterator, error) } // schemaGetter is the interface for getting schema info and ddl events @@ -65,7 +65,7 @@ type eventScanner struct { // newEventScanner creates a new EventScanner func newEventScanner( - eventStore eventstore.EventStore, + eventStore eventGetter, schemaStore schemastore.SchemaStore, mounter event.Mounter, mode int64, @@ -129,19 +129,33 @@ func (s *eventScanner) scan( } metrics.EventServiceGetDDLEventDuration.Observe(time.Since(start).Seconds()) - iter := s.eventGetter.GetIterator(dispatcherStat.info.GetID(), dataRange) + iter, err := s.eventGetter.GetIterator(dispatcherStat.info.GetID(), dataRange) + if err != nil { + return 0, nil, false, err + } if iter == nil { resolved := event.NewResolvedEvent(dataRange.CommitTsEnd, dispatcherStat.id, dispatcherStat.epoch) events = append(events, resolved) sess.appendEvents(events) return 0, sess.events, false, nil } - defer s.closeIterator(iter) // Execute event scanning and merging merger := newEventMerger(events) - interrupted, err := s.scanAndMergeEvents(sess, merger, iter) - return sess.eventBytes, sess.events, interrupted, err + interrupted, scanErr := s.scanAndMergeEvents(sess, merger, iter) + _, closeErr := s.closeIterator(iter) + if scanErr != nil { + if closeErr != nil { + log.Warn("event store iterator close returned error after scan error", + zap.Stringer("dispatcherID", dispatcherStat.info.GetID()), + zap.Error(closeErr)) + } + return sess.eventBytes, sess.events, interrupted, scanErr + } + if closeErr != nil { + return 0, nil, false, closeErr + } + return sess.eventBytes, sess.events, interrupted, nil } // fetchDDLEvents retrieves DDL events which finishedTs are within the range (start, end] @@ -172,14 +186,16 @@ func (s *eventScanner) fetchDDLEvents(stat *dispatcherStat, dataRange common.Dat return result, nil } -// closeIterator closes the event iterator and records metrics -func (s *eventScanner) closeIterator(iter eventstore.EventIterator) { - if iter != nil { - eventCount, _ := iter.Close() - if eventCount != 0 { - updateMetricEventStoreOutputKv(s.mode, float64(eventCount)) - } +// closeIterator closes the event iterator and records metrics. +func (s *eventScanner) closeIterator(iter eventstore.EventIterator) (int64, error) { + if iter == nil { + return 0, nil + } + eventCount, err := iter.Close() + if eventCount != 0 { + updateMetricEventStoreOutputKv(s.mode, float64(eventCount)) } + return eventCount, err } // scanAndMergeEvents performs the main scanning and merging logic diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 2f63654d57..7a76adeae9 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/logservice/eventstore" "github.com/pingcap/ticdc/logservice/schemastore" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" @@ -34,6 +35,17 @@ type mockMounter struct { event.Mounter } +type stubEventGetter struct { + iter eventstore.EventIterator + err error +} + +func (g *stubEventGetter) GetIterator( + dispatcherID common.DispatcherID, dataRange common.DataRange, +) (eventstore.EventIterator, error) { + return g.iter, g.err +} + func makeDispatcherReady(disp *dispatcherStat) { disp.setHandshaked() } @@ -46,6 +58,43 @@ func (m *mockMounter) DecodeToChunk(rawKV *common.RawKVEntry, tableInfo *common. } } +func TestEventScannerReturnsIteratorErrors(t *testing.T) { + disInfo := newMockDispatcherInfoForTest(t) + changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0) + disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) + makeDispatcherReady(disp) + + dataRange := common.DataRange{ + Span: disInfo.GetTableSpan(), + CommitTsStart: disInfo.GetStartTs(), + CommitTsEnd: disInfo.GetStartTs() + 1, + } + + getIterErr := errors.New("get iterator failed") + scanner := newEventScanner( + &stubEventGetter{err: getIterErr}, + NewMockSchemaStore(), + &mockMounter{}, + 0, + ) + _, events, interrupted, err := scanner.scan(context.Background(), disp, dataRange, scanLimit{}) + require.ErrorIs(t, err, getIterErr) + require.Nil(t, events) + require.False(t, interrupted) + + closeErr := errors.New("close iterator failed") + scanner = newEventScanner( + &stubEventGetter{iter: &mockEventIterator{closeErr: closeErr}}, + NewMockSchemaStore(), + &mockMounter{}, + 0, + ) + _, events, interrupted, err = scanner.scan(context.Background(), disp, dataRange, scanLimit{}) + require.ErrorIs(t, err, closeErr) + require.Nil(t, events) + require.False(t, interrupted) +} + func TestEventScanner(t *testing.T) { helper := event.NewEventTestHelper(t) defer helper.Close() diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index e8cb8acde0..e58508d10a 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -252,7 +252,9 @@ func (m *mockEventStore) UnregisterDispatcher(changefeedID common.ChangeFeedID, m.unregisterCount.Add(1) } -func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) eventstore.EventIterator { +func (m *mockEventStore) GetIterator( + dispatcherID common.DispatcherID, dataRange common.DataRange, +) (eventstore.EventIterator, error) { span, ok := m.dispatcherMap.Load(dispatcherID) if !ok { log.Panic("dispatcher not found", zap.Stringer("dispatcherID", dispatcherID)) @@ -277,7 +279,7 @@ func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange if len(entries) != 0 { iter = &mockEventIterator{events: entries} } - return iter + return iter, nil } func (m *mockEventStore) GetLogCoordinatorNodeID() node.ID { @@ -312,6 +314,7 @@ type mockEventIterator struct { prevStartTS uint64 prevCommitTS uint64 rowCount int + closeErr error } func (iter *mockEventIterator) Next() (*common.RawKVEntry, bool) { @@ -332,7 +335,7 @@ func (iter *mockEventIterator) Next() (*common.RawKVEntry, bool) { } func (m *mockEventIterator) Close() (int64, error) { - return 0, nil + return int64(m.rowCount), m.closeErr } var _ schemastore.SchemaStore = &mockSchemaStore{} From 4a8249e546bad070dbde13c6f5e17d3358133406 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 21:41:54 +0800 Subject: [PATCH 10/13] f --- logservice/eventstore/event_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 10ebadf36d..7a1e7808ad 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -931,7 +931,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com if ok := iter.First(); !ok { if err := iter.Error(); err != nil { if closeErr := iter.Close(); closeErr != nil { - return nil, errors.Trace(closeErr) + log.Warn("close iterator failed after first read error", zap.Error(closeErr)) } return nil, errors.Trace(err) } From a687c9a216c3aa14838b45a609cccc54e49ee0be Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 7 May 2026 16:27:18 +0800 Subject: [PATCH 11/13] f --- logservice/eventstore/event_store.go | 2 ++ pkg/eventservice/event_broker.go | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 77e597fd22..031005a700 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -935,6 +935,8 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com } return nil, errors.Trace(err) } + // Empty range. Return the iterator so the scanner can finish the scan + // through the normal Next and Close path. } metricEventStoreFirstReadDurationHistogram.Observe(time.Since(startTime).Seconds()) metrics.EventStoreScanRequestsCount.Inc() diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..ac8390e5c8 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -712,7 +712,6 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) { } if err != nil { - log.Error("scan events failed", zap.Stringer("changefeedID", task.changefeedStat.changefeedID), zap.Stringer("dispatcherID", task.id), zap.Int64("tableID", task.info.GetTableSpan().GetTableID()), From 80138bc7ad2b7ec4275c2eede98f3f3456dabc60 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 7 May 2026 16:46:32 +0800 Subject: [PATCH 12/13] address --- logservice/eventstore/event_store.go | 21 ++++--- logservice/eventstore/event_store_test.go | 68 +++++++++++++---------- 2 files changed, 52 insertions(+), 37 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 031005a700..4138cb3609 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -928,18 +928,21 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com } startTime := time.Now() - if ok := iter.First(); !ok { - if err := iter.Error(); err != nil { - if closeErr := iter.Close(); closeErr != nil { - log.Warn("close iterator failed after first read error", zap.Error(closeErr)) - } + hasFirstEvent := iter.First() + metricEventStoreFirstReadDurationHistogram.Observe(time.Since(startTime).Seconds()) + metrics.EventStoreScanRequestsCount.Inc() + if !hasFirstEvent { + // Empty range or first-read error. Close returns any accumulated + // iterator error while also releasing Pebble resources. + closeStartTime := time.Now() + err := iter.Close() + metricEventStoreCloseReadDurationHistogram.Observe(time.Since(closeStartTime).Seconds()) + if err != nil { return nil, errors.Trace(err) } - // Empty range. Return the iterator so the scanner can finish the scan - // through the normal Next and Close path. + return nil, nil } - metricEventStoreFirstReadDurationHistogram.Observe(time.Since(startTime).Seconds()) - metrics.EventStoreScanRequestsCount.Inc() + decoder := e.decoderPool.Get().(*zstd.Decoder) needCheckSpan := true diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index 33c328c278..5b3a8ac9ec 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -695,6 +695,22 @@ func TestEventStoreSwitchSubStat(t *testing.T) { require.NotNil(t, subStat) subStat.resolvedTs.Store(ts) } + getIterator := func() { + iter, err := store.GetIterator(dispatcherID2, common.DataRange{ + Span: &heartbeatpb.TableSpan{ + TableID: tableID, + StartKey: []byte("b"), + EndKey: []byte("h"), + }, + CommitTsStart: 100, + CommitTsEnd: 150, + }) + require.NoError(t, err) + if iter != nil { + _, err = iter.Close() + require.NoError(t, err) + } + } // ============ prepare two subscriptions ============ // add a dispatcher to create an subscription { @@ -727,33 +743,23 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // case 1: dispatcher 2 use data from subStat 1 updateSubStatResolvedTs(1, 200) { - iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ - Span: &heartbeatpb.TableSpan{ - TableID: tableID, - StartKey: []byte("b"), - EndKey: []byte("h"), - }, - CommitTsStart: 100, - CommitTsEnd: 150, - }) - iterImpl := iter.(*eventStoreIter) - require.True(t, iterImpl.needCheckSpan) + getIterator() + dispatcherStat := store.(*eventStore).dispatcherMeta.dispatcherStats[dispatcherID2] + require.NotNil(t, dispatcherStat) + require.Equal(t, logpuller.SubscriptionID(1), dispatcherStat.subStat.subID) + require.Equal(t, logpuller.SubscriptionID(2), dispatcherStat.pendingSubStat.subID) + require.Nil(t, dispatcherStat.removingSubStat) } // case 2: subStat 2 is ready, dispatcher 2 read data from subStat 2 and stop listen subStat 1 updateSubStatResolvedTs(2, 200) { - iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ - Span: &heartbeatpb.TableSpan{ - TableID: tableID, - StartKey: []byte("b"), - EndKey: []byte("h"), - }, - CommitTsStart: 100, - CommitTsEnd: 150, - }) - iterImpl := iter.(*eventStoreIter) - require.False(t, iterImpl.needCheckSpan) + getIterator() + dispatcherStat := store.(*eventStore).dispatcherMeta.dispatcherStats[dispatcherID2] + require.NotNil(t, dispatcherStat) + require.Equal(t, logpuller.SubscriptionID(2), dispatcherStat.subStat.subID) + require.Nil(t, dispatcherStat.pendingSubStat) + require.Equal(t, logpuller.SubscriptionID(1), dispatcherStat.removingSubStat.subID) } // check dispatcher 2 is no longer receive event from subStat 1 { @@ -771,7 +777,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // case 3: subStat 1 advance quicker than subStat 2, dispatcher 2 can still read data from subStat 1 updateSubStatResolvedTs(1, 220) { - iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ + iter, err := store.GetIterator(dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -780,8 +786,11 @@ func TestEventStoreSwitchSubStat(t *testing.T) { CommitTsStart: 100, CommitTsEnd: 220, }) - iterImpl := iter.(*eventStoreIter) - require.True(t, iterImpl.needCheckSpan) + require.NoError(t, err) + if iter != nil { + _, err = iter.Close() + require.NoError(t, err) + } } { subStats := store.(*eventStore).dispatcherMeta.tableStats[tableID] @@ -805,7 +814,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // dispatcher 2 read data from subStat 2 and totally remove itself from the subsriber list of subStat 1 updateSubStatResolvedTs(2, 220) { - iter := requireEventIterator(t, store, dispatcherID2, common.DataRange{ + iter, err := store.GetIterator(dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -814,8 +823,11 @@ func TestEventStoreSwitchSubStat(t *testing.T) { CommitTsStart: 100, CommitTsEnd: 220, }) - iterImpl := iter.(*eventStoreIter) - require.False(t, iterImpl.needCheckSpan) + require.NoError(t, err) + if iter != nil { + _, err = iter.Close() + require.NoError(t, err) + } } { subStats := store.(*eventStore).dispatcherMeta.tableStats[tableID] From 1d0ea0842aaeff71c5bf576aca21ea724c891aec Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 7 May 2026 16:56:56 +0800 Subject: [PATCH 13/13] small fix --- pkg/eventservice/event_scanner.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index e733cc750f..7b8c712e27 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -143,7 +143,7 @@ func (s *eventScanner) scan( // Execute event scanning and merging merger := newEventMerger(events) interrupted, scanErr := s.scanAndMergeEvents(sess, merger, iter) - _, closeErr := s.closeIterator(iter) + closeErr := s.closeIterator(iter) if scanErr != nil { if closeErr != nil { log.Warn("event store iterator close returned error after scan error", @@ -187,15 +187,15 @@ func (s *eventScanner) fetchDDLEvents(stat *dispatcherStat, dataRange common.Dat } // closeIterator closes the event iterator and records metrics. -func (s *eventScanner) closeIterator(iter eventstore.EventIterator) (int64, error) { +func (s *eventScanner) closeIterator(iter eventstore.EventIterator) error { if iter == nil { - return 0, nil + return nil } eventCount, err := iter.Close() if eventCount != 0 { updateMetricEventStoreOutputKv(s.mode, float64(eventCount)) } - return eventCount, err + return err } // scanAndMergeEvents performs the main scanning and merging logic