From af9b69b5e82bc125d37351cbd1f106f71410b453 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 29 Apr 2026 18:19:48 +0800 Subject: [PATCH 01/14] logservice: improve event store Pebble table filtering --- logservice/eventstore/event_store.go | 24 ++++- logservice/eventstore/pebble.go | 13 ++- logservice/eventstore/pebble_test.go | 58 ++++++++++++ logservice/eventstore/table_properties.go | 108 ++++++++++++++++++++++ 4 files changed, 197 insertions(+), 6 deletions(-) create mode 100644 logservice/eventstore/table_properties.go diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 82954a1cb9..c12c46d296 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -204,6 +204,8 @@ type eventStore struct { subClient logpuller.SubscriptionClient dbs []*pebble.DB + pebbleCache *pebble.Cache + tableCache *pebble.TableCache chs []*chann.UnlimitedChannel[eventWithCallback, uint64] writeTaskPools []*writeTaskPool @@ -258,11 +260,14 @@ func New( log.Panic("fail to remove path", zap.String("path", dbPath), zap.Error(err)) } + dbs, pebbleCache, tableCache := createPebbleDBs(dbPath, dbCount) store := &eventStore{ pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock), subClient: subClient, - dbs: createPebbleDBs(dbPath, dbCount), + dbs: dbs, + pebbleCache: pebbleCache, + tableCache: tableCache, chs: make([]*chann.UnlimitedChannel[eventWithCallback, uint64], 0, dbCount), writeTaskPools: make([]*writeTaskPool, 0, dbCount), @@ -425,6 +430,16 @@ func (e *eventStore) Close(_ context.Context) error { log.Error("failed to close pebble db", zap.Error(err)) } } + if e.tableCache != nil { + if err := e.tableCache.Unref(); err != nil { + log.Error("failed to unref pebble table cache", zap.Error(err)) + } + e.tableCache = nil + } + if e.pebbleCache != nil { + e.pebbleCache.Unref() + e.pebbleCache = nil + } return nil } @@ -876,17 +891,22 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com // convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1) var start []byte + lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1) + lowerTs = dataRange.CommitTsStart } else { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) - // TODO: optimize read performance // it's impossible return error here iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, + TableFilter: newEventStoreTableFilter( + lowerTs, + dataRange.CommitTsEnd, + ), }) decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index 08d252070a..e72c4133b0 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -32,6 +32,7 @@ const ( cacheSize = 1 << 30 // 1GB memTableTotalSize = 1 << 30 // 1GB memTableSize = 64 << 20 // 64MB + maxOpenFilesPerDB = 10000 ) func newPebbleOptions(dbNum int) *pebble.Options { @@ -39,7 +40,7 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Disable WAL to decrease io DisableWAL: true, - MaxOpenFiles: 10000, + MaxOpenFiles: maxOpenFilesPerDB, MaxConcurrentCompactions: func() int { return 6 }, @@ -56,6 +57,9 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), + TablePropertyCollectors: []func() pebble.TablePropertyCollector{ + newEventStoreCRTsCollector, + }, } for i := 0; i < len(opts.Levels); i++ { @@ -74,9 +78,10 @@ func newPebbleOptions(dbNum int) *pebble.Options { return opts } -func createPebbleDBs(rootDir string, dbNum int) []*pebble.DB { +func createPebbleDBs(rootDir string, dbNum int) ([]*pebble.DB, *pebble.Cache, *pebble.TableCache) { cache := pebble.NewCache(cacheSize) - tableCache := pebble.NewTableCache(cache, dbNum, int(cache.MaxSize())) + tableCacheSize := dbNum * pebble.TableCacheSize(maxOpenFilesPerDB) + tableCache := pebble.NewTableCache(cache, dbNum, tableCacheSize) dbs := make([]*pebble.DB, dbNum) for i := 0; i < dbNum; i++ { id := strconv.Itoa(i + 1) @@ -113,7 +118,7 @@ func createPebbleDBs(rootDir string, dbNum int) []*pebble.DB { } dbs[i] = db } - return dbs + return dbs, cache, tableCache } type eventStoreWriteStallState struct { diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index c95bf3928d..0e80628926 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -143,3 +143,61 @@ func TestCompressionAndKeyOrder(t *testing.T) { require.Less(t, bytes.Compare(keyDelete, keyUpdate), 0, "Delete should come before Update") require.Less(t, bytes.Compare(keyUpdate, keyInsert), 0, "Update should come before Insert") } + +func TestEventStoreCRTsCollector(t *testing.T) { + t.Parallel() + + collector := newEventStoreCRTsCollector() + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 1, + CRTs: 10, + Key: []byte("key"), + } + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: EncodeKey(1, 1, event, CompressionNone), + Trailer: uint64(pebble.InternalKeyKindSet), + }, nil)) + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: EncodeKeyPrefix(1, 1, 100), + Trailer: uint64(pebble.InternalKeyKindRangeDelete), + }, EncodeKeyPrefix(1, 1, 200))) + + props := make(map[string]string) + require.NoError(t, collector.Finish(props)) + require.Equal(t, "10", props[eventStoreMinCRTsTableProperty]) + require.Equal(t, "200", props[eventStoreMaxCRTsTableProperty]) + + require.True(t, newEventStoreTableFilter(9, 10)(props)) + require.True(t, newEventStoreTableFilter(150, 150)(props)) + require.False(t, newEventStoreTableFilter(201, 300)(props)) + require.True(t, newEventStoreTableFilter(201, 300)(nil)) +} + +func TestEventStoreTableFilterKeepsRangeDeletionTables(t *testing.T) { + t.Parallel() + + db, err := pebble.Open(t.TempDir(), newPebbleOptions(1)) + require.NoError(t, err) + defer db.Close() + + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 1, + CRTs: 80, + Key: []byte("key"), + } + require.NoError(t, db.Set(EncodeKey(1, 1, event, CompressionNone), []byte("value"), pebble.NoSync)) + require.NoError(t, db.Flush()) + require.NoError(t, deleteDataRange(db, 1, 1, 0, 100)) + require.NoError(t, db.Flush()) + + iter, err := db.NewIter(&pebble.IterOptions{ + LowerBound: EncodeKeyPrefix(1, 1, 80), + UpperBound: EncodeKeyPrefix(1, 1, 81), + TableFilter: newEventStoreTableFilter(80, 80), + }) + require.NoError(t, err) + defer iter.Close() + require.False(t, iter.First()) +} diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go new file mode 100644 index 0000000000..e183fcbd16 --- /dev/null +++ b/logservice/eventstore/table_properties.go @@ -0,0 +1,108 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventstore + +import ( + "encoding/binary" + "strconv" + + "github.com/cockroachdb/pebble" +) + +const ( + eventStoreMinCRTsTableProperty = "event-store-min-crts" + eventStoreMaxCRTsTableProperty = "event-store-max-crts" + eventStoreCRTsCollectorName = "event-store-crts-collector" + + encodedKeyCRTsOffset = 16 + encodedKeyCRTsEnd = encodedKeyCRTsOffset + 8 +) + +type eventStoreCRTsCollector struct { + minTs uint64 + maxTs uint64 + hasTs bool +} + +func newEventStoreCRTsCollector() pebble.TablePropertyCollector { + return &eventStoreCRTsCollector{} +} + +func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, value []byte) error { + c.recordEncodedKey(key.UserKey) + if key.Kind() == pebble.InternalKeyKindRangeDelete { + c.recordEncodedKey(value) + } + return nil +} + +func (c *eventStoreCRTsCollector) Finish(userProps map[string]string) error { + if !c.hasTs { + return nil + } + userProps[eventStoreMinCRTsTableProperty] = strconv.FormatUint(c.minTs, 10) + userProps[eventStoreMaxCRTsTableProperty] = strconv.FormatUint(c.maxTs, 10) + return nil +} + +func (c *eventStoreCRTsCollector) Name() string { + return eventStoreCRTsCollectorName +} + +func (c *eventStoreCRTsCollector) recordEncodedKey(key []byte) { + crts, ok := decodeCRTsFromEncodedKey(key) + if !ok { + return + } + if !c.hasTs || crts < c.minTs { + c.minTs = crts + } + if !c.hasTs || crts > c.maxTs { + c.maxTs = crts + } + c.hasTs = true +} + +func decodeCRTsFromEncodedKey(key []byte) (uint64, bool) { + if len(key) < encodedKeyCRTsEnd { + return 0, false + } + return binary.BigEndian.Uint64(key[encodedKeyCRTsOffset:encodedKeyCRTsEnd]), true +} + +func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { + return func(userProps map[string]string) bool { + minTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMinCRTsTableProperty) + if !ok { + return true + } + maxTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMaxCRTsTableProperty) + if !ok { + return true + } + return maxTs >= lowerTs && minTs <= upperTs + } +} + +func parseEventStoreCRTsTableProperty(userProps map[string]string, key string) (uint64, bool) { + value, ok := userProps[key] + if !ok { + return 0, false + } + ts, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return 0, false + } + return ts, true +} From a959f579efe42db0690a5a7b1ecb447c74f57a0d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 29 Apr 2026 18:57:56 +0800 Subject: [PATCH 02/14] add some comment --- logservice/eventstore/event_store.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index c12c46d296..b1c8a848f6 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -889,7 +889,27 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com e.dispatcherMeta.Unlock() } - // convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1) + // dataRange fields: + // CommitTsStart and CommitTsEnd define the commit-ts scan window. + // LastScannedTxnStartTs records how far the previous scan progressed inside + // CommitTsStart. It is zero if there is no unfinished scan at CommitTsStart. + // + // Iterator key bounds: + // Pebble uses [LowerBound, UpperBound), so end is always encoded as + // CommitTsEnd+1. + // + // If LastScannedTxnStartTs is zero, scan commit ts in + // (CommitTsStart, CommitTsEnd], and use CommitTsStart+1 as LowerBound. + // + // If LastScannedTxnStartTs is non-zero, continue scanning commit ts + // CommitTsStart with start ts greater than LastScannedTxnStartTs, then scan + // later commit ts up to CommitTsEnd. + // + // Table filter bounds: + // lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose + // collected CRTs range does not overlap [lowerTs, CommitTsEnd]. Therefore + // lowerTs is CommitTsStart+1 in the first case, and CommitTsStart in the + // second case. var start []byte lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { From 8668f0dc051b06f6c400c2fd591f86f2baa7c236 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Wed, 29 Apr 2026 20:34:50 +0800 Subject: [PATCH 03/14] r --- logservice/eventstore/format.go | 32 +++++++++++++++++--- logservice/eventstore/pebble_test.go | 36 +++-------------------- logservice/eventstore/table_properties.go | 20 ++++--------- 3 files changed, 37 insertions(+), 51 deletions(-) diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index bb8cefe2b5..6c4815a97a 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -38,6 +38,20 @@ const ( CompressionZSTD ) +const ( + encodedKeyUint64Len = 8 + encodedKeyOrderLen = 2 + + encodedKeyUniqueIDOffset = 0 + encodedKeyTableIDOffset = encodedKeyUniqueIDOffset + encodedKeyUint64Len + encodedKeyCRTsOffset = encodedKeyTableIDOffset + encodedKeyUint64Len + encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len + encodedKeyStartTsOffset = encodedKeyCRTsEnd + encodedKeyStartTsEnd = encodedKeyStartTsOffset + encodedKeyUint64Len + encodedKeyMetasOffset = encodedKeyStartTsEnd + encodedKeyMetasEnd = encodedKeyMetasOffset + encodedKeyOrderLen +) + const ( // Bitmask for DML order and compression type. dmlOrderMask = 0xFF00 // DML order is stored in the high 8 bits for sorting. @@ -53,9 +67,9 @@ func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uin log.Panic("startTs should be at most one") } // uniqueID, tableID, CRTs. - keySize := 8 + 8 + 8 + keySize := encodedKeyCRTsEnd if len(startTs) > 0 { - keySize += 8 + keySize = encodedKeyStartTsEnd } buf := make([]byte, 0, keySize) uint64Buf := [8]byte{} @@ -78,7 +92,7 @@ func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uin func encodedKeyLen(event *common.RawKVEntry) int { // uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key - return 8 + 8 + 8 + 8 + 1 + 1 + len(event.Key) + return encodedKeyMetasEnd + len(event.Key) } // EncodeKeyTo appends an encoded event-store key to buf. @@ -116,10 +130,20 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres // DecodeKeyMetas decodes compression type and dml order from the key. func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) { - combinedOrder := binary.BigEndian.Uint16(key[32:34]) // The combined order is at offset 32 for 2 bytes. + combinedOrder := binary.BigEndian.Uint16(key[encodedKeyMetasOffset:encodedKeyMetasEnd]) return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask) } +// decodeCRTsFromEncodedKey decodes CRTs from an event-store key prefix. +// It works for both full event keys and DeleteRange boundary keys because both +// contain uniqueID, tableID, and CRTs as the first three fields. +func decodeCRTsFromEncodedKey(key []byte) (uint64, bool) { + if len(key) < encodedKeyCRTsEnd { + return 0, false + } + return binary.BigEndian.Uint64(key[encodedKeyCRTsOffset:encodedKeyCRTsEnd]), true +} + // getDMLOrder returns the order of the dml types: delete Date: Thu, 30 Apr 2026 16:41:00 +0800 Subject: [PATCH 04/14] f --- logservice/eventstore/pebble_test.go | 6 ++++++ logservice/eventstore/table_properties.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index 73bb6374bb..839fae621a 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -172,4 +172,10 @@ func TestEventStoreCRTsCollector(t *testing.T) { require.True(t, newEventStoreTableFilter(100, 100)(props)) require.False(t, newEventStoreTableFilter(101, 300)(props)) require.True(t, newEventStoreTableFilter(101, 300)(nil)) + + corruptedProps := map[string]string{ + eventStoreMinCRTsTableProperty: "300", + eventStoreMaxCRTsTableProperty: "100", + } + require.True(t, newEventStoreTableFilter(101, 200)(corruptedProps)) } diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index 2fa4334f1b..62d8076a2b 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -81,6 +81,9 @@ func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]st if !ok { return true } + if minTs > maxTs { + return true + } return maxTs >= lowerTs && minTs <= upperTs } } From a384cb12e4e9a90af0c7ee22b250b86fb9f5b9f6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 15:19:16 +0800 Subject: [PATCH 05/14] fix --- logservice/eventstore/event_store.go | 7 ++- logservice/eventstore/format.go | 62 +++++++++-------------- logservice/eventstore/pebble_test.go | 42 +++++++++++++-- logservice/eventstore/table_properties.go | 56 +++++++++++++------- pkg/metrics/event_store.go | 16 ++++++ 5 files changed, 122 insertions(+), 61 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index b1c8a848f6..1711b45439 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -913,7 +913,12 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com var start []byte lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { - start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1) + start = encodeScanLowerBound( + uint64(subStat.subID), + stat.tableSpan.TableID, + dataRange.CommitTsStart, + dataRange.LastScannedTxnStartTs+1, + ) lowerTs = dataRange.CommitTsStart } else { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index 6c4815a97a..627e6831b1 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -39,17 +39,11 @@ const ( ) const ( - encodedKeyUint64Len = 8 - encodedKeyOrderLen = 2 - - encodedKeyUniqueIDOffset = 0 - encodedKeyTableIDOffset = encodedKeyUniqueIDOffset + encodedKeyUint64Len - encodedKeyCRTsOffset = encodedKeyTableIDOffset + encodedKeyUint64Len - encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len - encodedKeyStartTsOffset = encodedKeyCRTsEnd - encodedKeyStartTsEnd = encodedKeyStartTsOffset + encodedKeyUint64Len - encodedKeyMetasOffset = encodedKeyStartTsEnd - encodedKeyMetasEnd = encodedKeyMetasOffset + encodedKeyOrderLen + encodedKeyUint64Len = 8 + encodedKeyCRTsOffset = 2 * encodedKeyUint64Len + encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len + encodedKeyMetasOffset = 4 * encodedKeyUint64Len + encodedKeyMetasEnd = encodedKeyMetasOffset + 2 ) const ( @@ -59,37 +53,27 @@ const ( dmlOrderShift = 8 ) -// EncodeKeyPrefix encodes uniqueID, tableID, CRTs and StartTs. -// StartTs is optional. -// The result should be a prefix of normal key. (TODO: add a unit test) -func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uint64) []byte { - if len(startTs) > 1 { - log.Panic("startTs should be at most one") - } - // uniqueID, tableID, CRTs. - keySize := encodedKeyCRTsEnd - if len(startTs) > 0 { - keySize = encodedKeyStartTsEnd - } - buf := make([]byte, 0, keySize) - uint64Buf := [8]byte{} - // uniqueID - binary.BigEndian.PutUint64(uint64Buf[:], uniqueID) - buf = append(buf, uint64Buf[:]...) - // tableID - binary.BigEndian.PutUint64(uint64Buf[:], uint64(tableID)) - buf = append(buf, uint64Buf[:]...) - // CRTs - binary.BigEndian.PutUint64(uint64Buf[:], CRTs) - buf = append(buf, uint64Buf[:]...) - if len(startTs) > 0 { - // startTs - binary.BigEndian.PutUint64(uint64Buf[:], startTs[0]) - buf = append(buf, uint64Buf[:]...) - } +// EncodeKeyPrefix encodes uniqueID, tableID, and txnCommitTs. +// The result is a prefix of a full event-store key. +func EncodeKeyPrefix(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { + buf := make([]byte, encodedKeyCRTsEnd) + encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) return buf } +func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, startTs uint64) []byte { + buf := make([]byte, encodedKeyMetasOffset) + encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) + binary.BigEndian.PutUint64(buf[encodedKeyCRTsEnd:encodedKeyMetasOffset], startTs) + return buf +} + +func encodeKeyPrefixTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) { + binary.BigEndian.PutUint64(buf[:encodedKeyUint64Len], uniqueID) + binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyCRTsOffset], uint64(tableID)) + binary.BigEndian.PutUint64(buf[encodedKeyCRTsOffset:encodedKeyCRTsEnd], txnCommitTs) +} + func encodedKeyLen(event *common.RawKVEntry) int { // uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key return encodedKeyMetasEnd + len(event.Key) diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index 839fae621a..bdefadf5f5 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "fmt" "os" + "strconv" "testing" "github.com/cockroachdb/pebble" @@ -144,6 +145,33 @@ func TestCompressionAndKeyOrder(t *testing.T) { require.Less(t, bytes.Compare(keyUpdate, keyInsert), 0, "Update should come before Insert") } +func TestEventStoreKeyBounds(t *testing.T) { + t.Parallel() + + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 20, + CRTs: 10, + Key: []byte("key"), + } + key := EncodeKey(1, 1, event, CompressionNone) + prefix := EncodeKeyPrefix(1, 1, event.CRTs) + require.Len(t, prefix, encodedKeyCRTsEnd) + require.True(t, bytes.HasPrefix(key, prefix)) + + lowerBound := encodeScanLowerBound(1, 1, event.CRTs, event.StartTs) + require.Len(t, lowerBound, encodedKeyMetasOffset) + require.True(t, bytes.HasPrefix(key, lowerBound)) + + previousEvent := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: event.StartTs - 1, + CRTs: event.CRTs, + Key: []byte("key"), + } + require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) +} + func TestEventStoreCRTsCollector(t *testing.T) { t.Parallel() @@ -154,19 +182,25 @@ func TestEventStoreCRTsCollector(t *testing.T) { CRTs: 10, Key: []byte("key"), } + eventKey := EncodeKey(1, 1, event, CompressionNone) + eventValue := []byte("value") require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: EncodeKey(1, 1, event, CompressionNone), + UserKey: eventKey, Trailer: uint64(pebble.InternalKeyKindSet), - }, nil)) + }, eventValue)) + deleteStart := EncodeKeyPrefix(1, 1, 100) + deleteEnd := EncodeKeyPrefix(1, 1, 200) require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: EncodeKeyPrefix(1, 1, 100), + UserKey: deleteStart, Trailer: uint64(pebble.InternalKeyKindRangeDelete), - }, EncodeKeyPrefix(1, 1, 200))) + }, deleteEnd)) props := make(map[string]string) require.NoError(t, collector.Finish(props)) require.Equal(t, "10", props[eventStoreMinCRTsTableProperty]) require.Equal(t, "100", props[eventStoreMaxCRTsTableProperty]) + require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), + props[eventStoreLogicalBytesProperty]) require.True(t, newEventStoreTableFilter(9, 10)(props)) require.True(t, newEventStoreTableFilter(100, 100)(props)) diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index 62d8076a2b..c7400d70c3 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -17,30 +17,34 @@ import ( "strconv" "github.com/cockroachdb/pebble" + "github.com/pingcap/ticdc/pkg/metrics" ) const ( eventStoreMinCRTsTableProperty = "event-store-min-crts" eventStoreMaxCRTsTableProperty = "event-store-max-crts" + eventStoreLogicalBytesProperty = "event-store-logical-bytes" eventStoreCRTsCollectorName = "event-store-crts-collector" ) type eventStoreCRTsCollector struct { - minTs uint64 - maxTs uint64 - hasTs bool + minTs uint64 + maxTs uint64 + logicalBytes uint64 + hasTs bool } func newEventStoreCRTsCollector() pebble.TablePropertyCollector { return &eventStoreCRTsCollector{} } -func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, _ []byte) error { +func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, value []byte) error { // Event store DeleteRange is GC-only: it removes data that should already be // below the future scan range. Do not widen table properties with the range // tombstone end key. For example, a cleanup tombstone [CRTs=100, CRTs=1000) // would make this cleanup-only SST overlap scans like [500,600]. c.recordEncodedKey(key.UserKey) + c.logicalBytes += uint64(len(key.UserKey) + len(value)) return nil } @@ -50,6 +54,7 @@ func (c *eventStoreCRTsCollector) Finish(userProps map[string]string) error { } userProps[eventStoreMinCRTsTableProperty] = strconv.FormatUint(c.minTs, 10) userProps[eventStoreMaxCRTsTableProperty] = strconv.FormatUint(c.maxTs, 10) + userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) return nil } @@ -73,22 +78,39 @@ func (c *eventStoreCRTsCollector) recordEncodedKey(key []byte) { func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { return func(userProps map[string]string) bool { - minTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMinCRTsTableProperty) - if !ok { - return true - } - maxTs, ok := parseEventStoreCRTsTableProperty(userProps, eventStoreMaxCRTsTableProperty) - if !ok { - return true - } - if minTs > maxTs { - return true - } - return maxTs >= lowerTs && minTs <= upperTs + shouldScan := eventStoreTableMayContainCRTs(userProps, lowerTs, upperTs) + recordEventStoreTableFilterMetrics(userProps, shouldScan) + return shouldScan } } -func parseEventStoreCRTsTableProperty(userProps map[string]string, key string) (uint64, bool) { +func eventStoreTableMayContainCRTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { + minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinCRTsTableProperty) + if !ok { + return true + } + maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxCRTsTableProperty) + if !ok { + return true + } + if minTs > maxTs { + return true + } + return maxTs >= lowerTs && minTs <= upperTs +} + +func recordEventStoreTableFilterMetrics(userProps map[string]string, shouldScan bool) { + result := "scanned" + if !shouldScan { + result = "skipped" + } + metrics.EventStoreTableFilterCount.WithLabelValues(result).Inc() + if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { + metrics.EventStoreTableFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) + } +} + +func parseEventStoreUint64TableProperty(userProps map[string]string, key string) (uint64, bool) { value, ok := userProps[key] if !ok { return 0, false diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index d934173f27..3e499b3d52 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,6 +81,20 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) + EventStoreTableFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "table_filter_count", + Help: "The number of SST table filter decisions by event store.", + }, []string{"result"}) + + EventStoreTableFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "table_filter_logical_bytes", + Help: "The estimated logical bytes in SSTs scanned or skipped by event store table filter.", + }, []string{"result"}) + EventStoreDeleteRangeCount = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ticdc", @@ -336,6 +350,8 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) + registry.MustRegister(EventStoreTableFilterCount) + registry.MustRegister(EventStoreTableFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist) From a6cdcbcbd39938f8a81032302ef5b001c729297c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 15:47:16 +0800 Subject: [PATCH 06/14] fix --- logservice/eventstore/event_store.go | 8 +- .../eventstore/event_store_bench_test.go | 4 +- logservice/eventstore/event_store_test.go | 8 +- logservice/eventstore/format.go | 75 ++++++------ logservice/eventstore/format_test.go | 70 ++++++++++++ logservice/eventstore/gc.go | 78 +++++++------ logservice/eventstore/gc_test.go | 108 ++++++++++-------- logservice/eventstore/pebble.go | 2 +- logservice/eventstore/pebble_test.go | 38 +++--- logservice/eventstore/table_properties.go | 58 +++++----- pkg/metrics/event_store.go | 16 +-- 11 files changed, 278 insertions(+), 187 deletions(-) create mode 100644 logservice/eventstore/format_test.go diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 1711b45439..434a717161 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -921,14 +921,14 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com ) lowerTs = dataRange.CommitTsStart } else { - start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) + start = EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } - end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) + end := EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) // it's impossible return error here iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, - TableFilter: newEventStoreTableFilter( + TableFilter: newEventStoreSSTFileFilter( lowerTs, dataRange.CommitTsEnd, ), @@ -1474,7 +1474,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { key := iter.innerIter.Key() value := iter.innerIter.Value() - _, compressionType := DecodeKeyMetas(key) + _, compressionType := DecodeKeyAttributes(key) var decodedValue []byte if compressionType == CompressionZSTD { var err error diff --git a/logservice/eventstore/event_store_bench_test.go b/logservice/eventstore/event_store_bench_test.go index e8926e92e5..a61c2ff221 100644 --- a/logservice/eventstore/event_store_bench_test.go +++ b/logservice/eventstore/event_store_bench_test.go @@ -181,8 +181,8 @@ func BenchmarkEventStoreIteratorNext(b *testing.B) { StartKey: []byte{}, EndKey: []byte{0xff}, } - lower := EncodeKeyPrefix(1, 1, 1) - upper := EncodeKeyPrefix(1, 1, 1<<63) + lower := EncodeTxnCommitTsBoundaryKey(1, 1, 1) + upper := EncodeTxnCommitTsBoundaryKey(1, 1, 1<<63) b.ReportAllocs() b.ResetTimer() diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index f2cddd8ff8..92ef174824 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -891,7 +891,7 @@ func TestWriteToEventStore(t *testing.T) { key := iter.Key() value := iter.Value() - _, compressionType := DecodeKeyMetas(key) + _, compressionType := DecodeKeyAttributes(key) var decodedValue []byte if compressionType == CompressionZSTD { @@ -965,7 +965,7 @@ func TestWriteToEventStoreZstdCompressionDisabled(t *testing.T) { count := 0 for iter.First(); iter.Valid(); iter.Next() { - _, compressionType := DecodeKeyMetas(iter.Key()) + _, compressionType := DecodeKeyAttributes(iter.Key()) require.Equal(t, CompressionNone, compressionType) readEntry := &common.RawKVEntry{} @@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { // Create iterator with a wider range to ensure it sees all keys, // so we can test the internal filtering logic. - start := EncodeKeyPrefix(subID, tableID, 0) - end := EncodeKeyPrefix(subID, tableID, 500) + start := EncodeTxnCommitTsBoundaryKey(subID, tableID, 0) + end := EncodeTxnCommitTsBoundaryKey(subID, tableID, 500) innerIter, err := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index 627e6831b1..31dc449da8 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -39,11 +39,11 @@ const ( ) const ( - encodedKeyUint64Len = 8 - encodedKeyCRTsOffset = 2 * encodedKeyUint64Len - encodedKeyCRTsEnd = encodedKeyCRTsOffset + encodedKeyUint64Len - encodedKeyMetasOffset = 4 * encodedKeyUint64Len - encodedKeyMetasEnd = encodedKeyMetasOffset + 2 + encodedKeyUint64Len = 8 + encodedKeyTxnCommitTsStart = 2 * encodedKeyUint64Len + encodedKeyTxnCommitTsEnd = encodedKeyTxnCommitTsStart + encodedKeyUint64Len + encodedKeyAttributesOffset = 4 * encodedKeyUint64Len + encodedKeyAttributesEnd = encodedKeyAttributesOffset + 2 ) const ( @@ -53,34 +53,33 @@ const ( dmlOrderShift = 8 ) -// EncodeKeyPrefix encodes uniqueID, tableID, and txnCommitTs. -// The result is a prefix of a full event-store key. -func EncodeKeyPrefix(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { - buf := make([]byte, encodedKeyCRTsEnd) - encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) +// EncodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs. +func EncodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { + buf := make([]byte, encodedKeyTxnCommitTsEnd) + encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) return buf } -func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, startTs uint64) []byte { - buf := make([]byte, encodedKeyMetasOffset) - encodeKeyPrefixTo(buf, uniqueID, tableID, txnCommitTs) - binary.BigEndian.PutUint64(buf[encodedKeyCRTsEnd:encodedKeyMetasOffset], startTs) +func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, txnStartTs uint64) []byte { + buf := make([]byte, encodedKeyAttributesOffset) + encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) + binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsEnd:encodedKeyAttributesOffset], txnStartTs) return buf } -func encodeKeyPrefixTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) { +func encodeTxnCommitTsBoundaryKeyTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) { binary.BigEndian.PutUint64(buf[:encodedKeyUint64Len], uniqueID) - binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyCRTsOffset], uint64(tableID)) - binary.BigEndian.PutUint64(buf[encodedKeyCRTsOffset:encodedKeyCRTsEnd], txnCommitTs) + binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyTxnCommitTsStart], uint64(tableID)) + binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd], txnCommitTs) } func encodedKeyLen(event *common.RawKVEntry) int { - // uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key - return encodedKeyMetasEnd + len(event.Key) + // uniqueID, tableID, txnCommitTs, txnStartTs, Put/Delete, CompressionType, Key + return encodedKeyAttributesEnd + len(event.Key) } // EncodeKeyTo appends an encoded event-store key to buf. -// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key. +// Format: uniqueID, tableID, txnCommitTs, txnStartTs, delete/update/insert, Key. func EncodeKeyTo( buf []byte, uniqueID uint64, @@ -95,9 +94,9 @@ func EncodeKeyTo( buf = binary.BigEndian.AppendUint64(buf, uniqueID) // table ID buf = binary.BigEndian.AppendUint64(buf, uint64(tableID)) - // CRTs + // txn commit ts buf = binary.BigEndian.AppendUint64(buf, event.CRTs) - // startTs + // txn start ts buf = binary.BigEndian.AppendUint64(buf, event.StartTs) // Let Delete < Update < Insert dmlOrder := getDMLOrder(event) @@ -112,20 +111,20 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres return EncodeKeyTo(make([]byte, 0, encodedKeyLen(event)), uniqueID, tableID, event, compressionType) } -// DecodeKeyMetas decodes compression type and dml order from the key. -func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) { - combinedOrder := binary.BigEndian.Uint16(key[encodedKeyMetasOffset:encodedKeyMetasEnd]) +// DecodeKeyAttributes decodes compression type and dml order from the key. +func DecodeKeyAttributes(key []byte) (DMLOrder, CompressionType) { + combinedOrder := binary.BigEndian.Uint16(key[encodedKeyAttributesOffset:encodedKeyAttributesEnd]) return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask) } -// decodeCRTsFromEncodedKey decodes CRTs from an event-store key prefix. +// decodeTxnCommitTsFromEncodedKey decodes txnCommitTs from an event-store key boundary. // It works for both full event keys and DeleteRange boundary keys because both -// contain uniqueID, tableID, and CRTs as the first three fields. -func decodeCRTsFromEncodedKey(key []byte) (uint64, bool) { - if len(key) < encodedKeyCRTsEnd { +// contain uniqueID, tableID, and txnCommitTs as the first three fields. +func decodeTxnCommitTsFromEncodedKey(key []byte) (uint64, bool) { + if len(key) < encodedKeyTxnCommitTsEnd { return 0, false } - return binary.BigEndian.Uint64(key[encodedKeyCRTsOffset:encodedKeyCRTsEnd]), true + return binary.BigEndian.Uint64(key[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd]), true } // getDMLOrder returns the order of the dml types: delete pending.item.endTs { - pending.item.endTs = endTS + if endTxnCommitTs > pending.item.endTxnCommitTs { + pending.item.endTxnCommitTs = endTxnCommitTs } } @@ -163,7 +171,7 @@ func (d *gcManager) pendingDeleteRangeCount() int { func shouldFlushDeleteRange(pending *pendingDeleteRange, now time.Time, minRangeInterval, maxDelay time.Duration) bool { // addGCItem only records valid ranges, so this should not happen in normal flow. // Keep the guard as a defensive check against unexpected state or future refactors. - if pending == nil || pending.item.endTs <= pending.item.startTs { + if pending == nil || pending.item.endTxnCommitTs <= pending.item.startTxnCommitTs { return false } if maxDelay > 0 && now.Sub(pending.firstEnqueueTime) >= maxDelay { @@ -173,8 +181,8 @@ func shouldFlushDeleteRange(pending *pendingDeleteRange, now time.Time, minRange return true } - startPhysical := oracle.ExtractPhysical(pending.item.startTs) - endPhysical := oracle.ExtractPhysical(pending.item.endTs) + startPhysical := oracle.ExtractPhysical(pending.item.startTxnCommitTs) + endPhysical := oracle.ExtractPhysical(pending.item.endTxnCommitTs) if endPhysical <= startPhysical { return false } @@ -290,7 +298,7 @@ func (d *gcManager) run(ctx context.Context) error { func (d *gcManager) doGCJob(ranges []gcRangeItem) { for _, r := range ranges { db := d.dbs[r.dbIndex] - if err := d.deleteDataRange(db, r.uniqueKeyID, r.tableID, r.startTs, r.endTs); err != nil { + if err := d.deleteDataRange(db, r.uniqueKeyID, r.tableID, r.startTxnCommitTs, r.endTxnCommitTs); err != nil { log.Warn("gc manager failed to delete data range", zap.Error(err)) } } @@ -306,8 +314,8 @@ func (d *gcManager) updateCompactRanges(ranges []gcRangeItem) { state = &compactState{} d.compactRanges[key] = state } - if state.endTs < r.endTs { - state.endTs = r.endTs + if state.endTxnCommitTs < r.endTxnCommitTs { + state.endTxnCommitTs = r.endTxnCommitTs state.compacted = false } } @@ -318,7 +326,7 @@ func (d *gcManager) doCompaction() { d.mu.Lock() for key, state := range d.compactRanges { if !state.compacted { - toCompact[key] = state.endTs + toCompact[key] = state.endTxnCommitTs state.compacted = true } } @@ -326,14 +334,14 @@ func (d *gcManager) doCompaction() { startTime := time.Now() log.Info("gc manager compacting ranges", zap.Int("rangeCount", len(toCompact))) - for key, endTs := range toCompact { + for key, endTxnCommitTs := range toCompact { db := d.dbs[key.dbIndex] - if err := d.compactDataRange(db, key.uniqueKeyID, key.tableID, 0, endTs); err != nil { + if err := d.compactDataRange(db, key.uniqueKeyID, key.tableID, 0, endTxnCommitTs); err != nil { log.Warn("gc manager failed to compact data range", zap.Int("dbIndex", key.dbIndex), zap.Uint64("uniqueKeyID", key.uniqueKeyID), zap.Int64("tableID", key.tableID), - zap.Uint64("endTs", endTs), + zap.Uint64("endTxnCommitTs", endTxnCommitTs), zap.Error(err)) } } diff --git a/logservice/eventstore/gc_test.go b/logservice/eventstore/gc_test.go index 79aaed8ea8..3c55babeea 100644 --- a/logservice/eventstore/gc_test.go +++ b/logservice/eventstore/gc_test.go @@ -58,11 +58,21 @@ func (m *mockDB) getCompactCalls() [][]byte { func TestGCManager(t *testing.T) { mdb := &mockDB{} - deleteFn := func(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error { - return mdb.DeleteRange(EncodeKeyPrefix(uniqueKeyID, tableID, startTs), EncodeKeyPrefix(uniqueKeyID, tableID, endTs), nil) + deleteFn := func( + db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, + ) error { + return mdb.DeleteRange( + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + nil) } - compactFn := func(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error { - return mdb.Compact(EncodeKeyPrefix(uniqueKeyID, tableID, startTs), EncodeKeyPrefix(uniqueKeyID, tableID, endTs), false) + compactFn := func( + db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, + ) error { + return mdb.Compact( + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + false) } gcm := newGCManager([]*pebble.DB{nil}, deleteFn, compactFn) @@ -83,26 +93,26 @@ func TestGCManager(t *testing.T) { deleteCalls := mdb.getDeleteCalls() require.Len(t, deleteCalls, 4) // The order of delete ranges is not guaranteed because it iterates over a map. - if bytes.Equal(deleteCalls[0], EncodeKeyPrefix(1, 10, 100)) { - require.Equal(t, EncodeKeyPrefix(1, 10, 200), deleteCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 20, 300), deleteCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), deleteCalls[3]) + if bytes.Equal(deleteCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 100)) { + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[3]) } else { - require.Equal(t, EncodeKeyPrefix(1, 20, 300), deleteCalls[0]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), deleteCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 10, 100), deleteCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 10, 200), deleteCalls[3]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[0]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 100), deleteCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[3]) } // Check internal state for compaction gcm.mu.Lock() state1, ok := gcm.compactRanges[compactKey1] require.True(t, ok) - require.Equal(t, uint64(200), state1.endTs) + require.Equal(t, uint64(200), state1.endTxnCommitTs) require.False(t, state1.compacted) state2, ok := gcm.compactRanges[compactKey2] require.True(t, ok) - require.Equal(t, uint64(400), state2.endTs) + require.Equal(t, uint64(400), state2.endTxnCommitTs) require.False(t, state2.compacted) gcm.mu.Unlock() } @@ -112,15 +122,15 @@ func TestGCManager(t *testing.T) { compactCalls := mdb.getCompactCalls() require.Len(t, compactCalls, 4) // The order of compaction is not guaranteed because it iterates over a map. - if bytes.Equal(compactCalls[0], EncodeKeyPrefix(1, 10, 0)) { - require.Equal(t, EncodeKeyPrefix(1, 10, 200), compactCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 20, 0), compactCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), compactCalls[3]) + if bytes.Equal(compactCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 0)) { + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[3]) } else { - require.Equal(t, EncodeKeyPrefix(1, 20, 0), compactCalls[0]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), compactCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 10, 0), compactCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 10, 200), compactCalls[3]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[0]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[1]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 0), compactCalls[2]) + require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[3]) } // Verify internal state is now compacted gcm.mu.Lock() @@ -155,12 +165,12 @@ func TestGCManagerDelaysSmallDeleteRanges(t *testing.T) { gcm := newGCManager(nil, nil, nil) now := time.Unix(100, 0) - startTs := oracle.ComposeTS(1_000, 0) - midTs := oracle.ComposeTS(2_000, 0) - endTs := oracle.ComposeTS(3_000, 0) + startTxnCommitTs := oracle.ComposeTS(1_000, 0) + midTxnCommitTs := oracle.ComposeTS(2_000, 0) + endTxnCommitTs := oracle.ComposeTS(3_000, 0) - gcm.addGCItem(0, 1, 10, startTs, midTs) - gcm.addGCItem(0, 1, 10, midTs, endTs) + gcm.addGCItem(0, 1, 10, startTxnCommitTs, midTxnCommitTs) + gcm.addGCItem(0, 1, 10, midTxnCommitTs, endTxnCommitTs) compactKey := compactItemKey{dbIndex: 0, uniqueKeyID: 1, tableID: 10} gcm.mu.Lock() @@ -175,18 +185,18 @@ func TestGCManagerDelaysSmallDeleteRanges(t *testing.T) { gcm.mu.Lock() pending, ok = gcm.deleteRanges[compactKey] require.True(t, ok) - require.Equal(t, startTs, pending.item.startTs) - require.Equal(t, endTs, pending.item.endTs) + require.Equal(t, startTxnCommitTs, pending.item.startTxnCommitTs) + require.Equal(t, endTxnCommitTs, pending.item.endTxnCommitTs) gcm.mu.Unlock() ranges = gcm.fetchGCItems(now.Add(31*time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: startTs, - endTs: endTs, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: startTxnCommitTs, + endTxnCommitTs: endTxnCommitTs, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } @@ -195,10 +205,10 @@ func TestGCManagerFlushesLargeDeleteRangeImmediately(t *testing.T) { gcm := newGCManager(nil, nil, nil) now := time.Unix(100, 0) - startTs := oracle.ComposeTS(1_000, 0) - endTs := oracle.ComposeTS(1_000+6*60*1000, 0) + startTxnCommitTs := oracle.ComposeTS(1_000, 0) + endTxnCommitTs := oracle.ComposeTS(1_000+6*60*1000, 0) - gcm.addGCItem(0, 1, 10, startTs, endTs) + gcm.addGCItem(0, 1, 10, startTxnCommitTs, endTxnCommitTs) compactKey := compactItemKey{dbIndex: 0, uniqueKeyID: 1, tableID: 10} gcm.mu.Lock() @@ -210,11 +220,11 @@ func TestGCManagerFlushesLargeDeleteRangeImmediately(t *testing.T) { ranges := gcm.fetchGCItems(now.Add(time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: startTs, - endTs: endTs, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: startTxnCommitTs, + endTxnCommitTs: endTxnCommitTs, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } @@ -236,18 +246,18 @@ func TestGCManagerWidensDisjointDeleteRanges(t *testing.T) { pending, ok := gcm.deleteRanges[compactKey] require.True(t, ok) pending.firstEnqueueTime = now - require.Equal(t, firstStart, pending.item.startTs) - require.Equal(t, secondEnd, pending.item.endTs) + require.Equal(t, firstStart, pending.item.startTxnCommitTs) + require.Equal(t, secondEnd, pending.item.endTxnCommitTs) gcm.mu.Unlock() ranges := gcm.fetchGCItems(now.Add(31*time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: firstStart, - endTs: secondEnd, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: firstStart, + endTxnCommitTs: secondEnd, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index e72c4133b0..e63c0e714b 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -58,7 +58,7 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), TablePropertyCollectors: []func() pebble.TablePropertyCollector{ - newEventStoreCRTsCollector, + newEventStoreTxnCommitTsCollector, }, } diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index bdefadf5f5..45726a1768 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -122,12 +122,12 @@ func TestCompressionAndKeyOrder(t *testing.T) { Key: []byte("test-key"), } keyWithZstd := EncodeKey(1, 1, ev, CompressionZSTD) - dmlOrder, compressionType := DecodeKeyMetas(keyWithZstd) + dmlOrder, compressionType := DecodeKeyAttributes(keyWithZstd) require.Equal(t, DMLOrderInsert, dmlOrder) require.Equal(t, CompressionZSTD, compressionType) keyWithNone := EncodeKey(1, 1, ev, CompressionNone) - dmlOrder, compressionType = DecodeKeyMetas(keyWithNone) + dmlOrder, compressionType = DecodeKeyAttributes(keyWithNone) require.Equal(t, DMLOrderInsert, dmlOrder) require.Equal(t, CompressionNone, compressionType) @@ -155,12 +155,12 @@ func TestEventStoreKeyBounds(t *testing.T) { Key: []byte("key"), } key := EncodeKey(1, 1, event, CompressionNone) - prefix := EncodeKeyPrefix(1, 1, event.CRTs) - require.Len(t, prefix, encodedKeyCRTsEnd) - require.True(t, bytes.HasPrefix(key, prefix)) + commitTsBoundaryKey := EncodeTxnCommitTsBoundaryKey(1, 1, event.CRTs) + require.Len(t, commitTsBoundaryKey, encodedKeyTxnCommitTsEnd) + require.True(t, bytes.HasPrefix(key, commitTsBoundaryKey)) lowerBound := encodeScanLowerBound(1, 1, event.CRTs, event.StartTs) - require.Len(t, lowerBound, encodedKeyMetasOffset) + require.Len(t, lowerBound, encodedKeyAttributesOffset) require.True(t, bytes.HasPrefix(key, lowerBound)) previousEvent := &common.RawKVEntry{ @@ -172,10 +172,10 @@ func TestEventStoreKeyBounds(t *testing.T) { require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) } -func TestEventStoreCRTsCollector(t *testing.T) { +func TestEventStoreTxnCommitTsCollector(t *testing.T) { t.Parallel() - collector := newEventStoreCRTsCollector() + collector := newEventStoreTxnCommitTsCollector() event := &common.RawKVEntry{ OpType: common.OpTypePut, StartTs: 1, @@ -188,8 +188,8 @@ func TestEventStoreCRTsCollector(t *testing.T) { UserKey: eventKey, Trailer: uint64(pebble.InternalKeyKindSet), }, eventValue)) - deleteStart := EncodeKeyPrefix(1, 1, 100) - deleteEnd := EncodeKeyPrefix(1, 1, 200) + deleteStart := EncodeTxnCommitTsBoundaryKey(1, 1, 100) + deleteEnd := EncodeTxnCommitTsBoundaryKey(1, 1, 200) require.NoError(t, collector.Add(pebble.InternalKey{ UserKey: deleteStart, Trailer: uint64(pebble.InternalKeyKindRangeDelete), @@ -197,19 +197,19 @@ func TestEventStoreCRTsCollector(t *testing.T) { props := make(map[string]string) require.NoError(t, collector.Finish(props)) - require.Equal(t, "10", props[eventStoreMinCRTsTableProperty]) - require.Equal(t, "100", props[eventStoreMaxCRTsTableProperty]) + require.Equal(t, "10", props[eventStoreMinTxnCommitTsProperty]) + require.Equal(t, "100", props[eventStoreMaxTxnCommitTsProperty]) require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), props[eventStoreLogicalBytesProperty]) - require.True(t, newEventStoreTableFilter(9, 10)(props)) - require.True(t, newEventStoreTableFilter(100, 100)(props)) - require.False(t, newEventStoreTableFilter(101, 300)(props)) - require.True(t, newEventStoreTableFilter(101, 300)(nil)) + require.True(t, newEventStoreSSTFileFilter(9, 10)(props)) + require.True(t, newEventStoreSSTFileFilter(100, 100)(props)) + require.False(t, newEventStoreSSTFileFilter(101, 300)(props)) + require.True(t, newEventStoreSSTFileFilter(101, 300)(nil)) corruptedProps := map[string]string{ - eventStoreMinCRTsTableProperty: "300", - eventStoreMaxCRTsTableProperty: "100", + eventStoreMinTxnCommitTsProperty: "300", + eventStoreMaxTxnCommitTsProperty: "100", } - require.True(t, newEventStoreTableFilter(101, 200)(corruptedProps)) + require.True(t, newEventStoreSSTFileFilter(101, 200)(corruptedProps)) } diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index c7400d70c3..1961b85a51 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -21,75 +21,75 @@ import ( ) const ( - eventStoreMinCRTsTableProperty = "event-store-min-crts" - eventStoreMaxCRTsTableProperty = "event-store-max-crts" - eventStoreLogicalBytesProperty = "event-store-logical-bytes" - eventStoreCRTsCollectorName = "event-store-crts-collector" + eventStoreMinTxnCommitTsProperty = "event-store-min-txn-commit-ts" + eventStoreMaxTxnCommitTsProperty = "event-store-max-txn-commit-ts" + eventStoreLogicalBytesProperty = "event-store-logical-bytes" + eventStoreTxnCommitTsCollectorName = "event-store-txn-commit-ts-collector" ) -type eventStoreCRTsCollector struct { +type eventStoreTxnCommitTsCollector struct { minTs uint64 maxTs uint64 logicalBytes uint64 hasTs bool } -func newEventStoreCRTsCollector() pebble.TablePropertyCollector { - return &eventStoreCRTsCollector{} +func newEventStoreTxnCommitTsCollector() pebble.TablePropertyCollector { + return &eventStoreTxnCommitTsCollector{} } -func (c *eventStoreCRTsCollector) Add(key pebble.InternalKey, value []byte) error { +func (c *eventStoreTxnCommitTsCollector) Add(key pebble.InternalKey, value []byte) error { // Event store DeleteRange is GC-only: it removes data that should already be // below the future scan range. Do not widen table properties with the range - // tombstone end key. For example, a cleanup tombstone [CRTs=100, CRTs=1000) + // tombstone end key. For example, a cleanup tombstone [commit-ts=100, commit-ts=1000) // would make this cleanup-only SST overlap scans like [500,600]. c.recordEncodedKey(key.UserKey) c.logicalBytes += uint64(len(key.UserKey) + len(value)) return nil } -func (c *eventStoreCRTsCollector) Finish(userProps map[string]string) error { +func (c *eventStoreTxnCommitTsCollector) Finish(userProps map[string]string) error { if !c.hasTs { return nil } - userProps[eventStoreMinCRTsTableProperty] = strconv.FormatUint(c.minTs, 10) - userProps[eventStoreMaxCRTsTableProperty] = strconv.FormatUint(c.maxTs, 10) + userProps[eventStoreMinTxnCommitTsProperty] = strconv.FormatUint(c.minTs, 10) + userProps[eventStoreMaxTxnCommitTsProperty] = strconv.FormatUint(c.maxTs, 10) userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) return nil } -func (c *eventStoreCRTsCollector) Name() string { - return eventStoreCRTsCollectorName +func (c *eventStoreTxnCommitTsCollector) Name() string { + return eventStoreTxnCommitTsCollectorName } -func (c *eventStoreCRTsCollector) recordEncodedKey(key []byte) { - crts, ok := decodeCRTsFromEncodedKey(key) +func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { + txnCommitTs, ok := decodeTxnCommitTsFromEncodedKey(key) if !ok { return } - if !c.hasTs || crts < c.minTs { - c.minTs = crts + if !c.hasTs || txnCommitTs < c.minTs { + c.minTs = txnCommitTs } - if !c.hasTs || crts > c.maxTs { - c.maxTs = crts + if !c.hasTs || txnCommitTs > c.maxTs { + c.maxTs = txnCommitTs } c.hasTs = true } -func newEventStoreTableFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { +func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { return func(userProps map[string]string) bool { - shouldScan := eventStoreTableMayContainCRTs(userProps, lowerTs, upperTs) - recordEventStoreTableFilterMetrics(userProps, shouldScan) + shouldScan := eventStoreSSTFileMayContainTxnCommitTs(userProps, lowerTs, upperTs) + recordEventStoreSSTFileFilterMetrics(userProps, shouldScan) return shouldScan } } -func eventStoreTableMayContainCRTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { - minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinCRTsTableProperty) +func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { + minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinTxnCommitTsProperty) if !ok { return true } - maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxCRTsTableProperty) + maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxTxnCommitTsProperty) if !ok { return true } @@ -99,14 +99,14 @@ func eventStoreTableMayContainCRTs(userProps map[string]string, lowerTs uint64, return maxTs >= lowerTs && minTs <= upperTs } -func recordEventStoreTableFilterMetrics(userProps map[string]string, shouldScan bool) { +func recordEventStoreSSTFileFilterMetrics(userProps map[string]string, shouldScan bool) { result := "scanned" if !shouldScan { result = "skipped" } - metrics.EventStoreTableFilterCount.WithLabelValues(result).Inc() + metrics.EventStoreSSTFileFilterCount.WithLabelValues(result).Inc() if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { - metrics.EventStoreTableFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) + metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) } } diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index 3e499b3d52..87ae2f4d02 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,18 +81,18 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) - EventStoreTableFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + EventStoreSSTFileFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "event_store", - Name: "table_filter_count", - Help: "The number of SST table filter decisions by event store.", + Name: "sst_file_filter_count", + Help: "The number of SST file filter decisions by event store.", }, []string{"result"}) - EventStoreTableFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + EventStoreSSTFileFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "event_store", - Name: "table_filter_logical_bytes", - Help: "The estimated logical bytes in SSTs scanned or skipped by event store table filter.", + Name: "sst_file_filter_logical_bytes", + Help: "The estimated logical bytes in SST files scanned or skipped by event store.", }, []string{"result"}) EventStoreDeleteRangeCount = prometheus.NewCounter( @@ -350,8 +350,8 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) - registry.MustRegister(EventStoreTableFilterCount) - registry.MustRegister(EventStoreTableFilterLogicalBytes) + registry.MustRegister(EventStoreSSTFileFilterCount) + registry.MustRegister(EventStoreSSTFileFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist) From 5770a5241c3d091e78f86726ada9d3e3986fac49 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 15:54:57 +0800 Subject: [PATCH 07/14] logservice: remove event store sst file filtering --- logservice/eventstore/event_store.go | 11 -- logservice/eventstore/pebble.go | 3 - logservice/eventstore/pebble_test.go | 43 -------- logservice/eventstore/table_properties.go | 123 ---------------------- pkg/metrics/event_store.go | 16 --- 5 files changed, 196 deletions(-) delete mode 100644 logservice/eventstore/table_properties.go diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 434a717161..687cf03dec 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -905,13 +905,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com // CommitTsStart with start ts greater than LastScannedTxnStartTs, then scan // later commit ts up to CommitTsEnd. // - // Table filter bounds: - // lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose - // collected CRTs range does not overlap [lowerTs, CommitTsEnd]. Therefore - // lowerTs is CommitTsStart+1 in the first case, and CommitTsStart in the - // second case. var start []byte - lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { start = encodeScanLowerBound( uint64(subStat.subID), @@ -919,7 +913,6 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1, ) - lowerTs = dataRange.CommitTsStart } else { start = EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } @@ -928,10 +921,6 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, - TableFilter: newEventStoreSSTFileFilter( - lowerTs, - dataRange.CommitTsEnd, - ), }) decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index e63c0e714b..f5f48bf71b 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -57,9 +57,6 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), - TablePropertyCollectors: []func() pebble.TablePropertyCollector{ - newEventStoreTxnCommitTsCollector, - }, } for i := 0; i < len(opts.Levels); i++ { diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index 45726a1768..b2ce6bd33a 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -18,7 +18,6 @@ import ( "encoding/binary" "fmt" "os" - "strconv" "testing" "github.com/cockroachdb/pebble" @@ -171,45 +170,3 @@ func TestEventStoreKeyBounds(t *testing.T) { } require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) } - -func TestEventStoreTxnCommitTsCollector(t *testing.T) { - t.Parallel() - - collector := newEventStoreTxnCommitTsCollector() - event := &common.RawKVEntry{ - OpType: common.OpTypePut, - StartTs: 1, - CRTs: 10, - Key: []byte("key"), - } - eventKey := EncodeKey(1, 1, event, CompressionNone) - eventValue := []byte("value") - require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: eventKey, - Trailer: uint64(pebble.InternalKeyKindSet), - }, eventValue)) - deleteStart := EncodeTxnCommitTsBoundaryKey(1, 1, 100) - deleteEnd := EncodeTxnCommitTsBoundaryKey(1, 1, 200) - require.NoError(t, collector.Add(pebble.InternalKey{ - UserKey: deleteStart, - Trailer: uint64(pebble.InternalKeyKindRangeDelete), - }, deleteEnd)) - - props := make(map[string]string) - require.NoError(t, collector.Finish(props)) - require.Equal(t, "10", props[eventStoreMinTxnCommitTsProperty]) - require.Equal(t, "100", props[eventStoreMaxTxnCommitTsProperty]) - require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), - props[eventStoreLogicalBytesProperty]) - - require.True(t, newEventStoreSSTFileFilter(9, 10)(props)) - require.True(t, newEventStoreSSTFileFilter(100, 100)(props)) - require.False(t, newEventStoreSSTFileFilter(101, 300)(props)) - require.True(t, newEventStoreSSTFileFilter(101, 300)(nil)) - - corruptedProps := map[string]string{ - eventStoreMinTxnCommitTsProperty: "300", - eventStoreMaxTxnCommitTsProperty: "100", - } - require.True(t, newEventStoreSSTFileFilter(101, 200)(corruptedProps)) -} diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go deleted file mode 100644 index 1961b85a51..0000000000 --- a/logservice/eventstore/table_properties.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2026 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package eventstore - -import ( - "strconv" - - "github.com/cockroachdb/pebble" - "github.com/pingcap/ticdc/pkg/metrics" -) - -const ( - eventStoreMinTxnCommitTsProperty = "event-store-min-txn-commit-ts" - eventStoreMaxTxnCommitTsProperty = "event-store-max-txn-commit-ts" - eventStoreLogicalBytesProperty = "event-store-logical-bytes" - eventStoreTxnCommitTsCollectorName = "event-store-txn-commit-ts-collector" -) - -type eventStoreTxnCommitTsCollector struct { - minTs uint64 - maxTs uint64 - logicalBytes uint64 - hasTs bool -} - -func newEventStoreTxnCommitTsCollector() pebble.TablePropertyCollector { - return &eventStoreTxnCommitTsCollector{} -} - -func (c *eventStoreTxnCommitTsCollector) Add(key pebble.InternalKey, value []byte) error { - // Event store DeleteRange is GC-only: it removes data that should already be - // below the future scan range. Do not widen table properties with the range - // tombstone end key. For example, a cleanup tombstone [commit-ts=100, commit-ts=1000) - // would make this cleanup-only SST overlap scans like [500,600]. - c.recordEncodedKey(key.UserKey) - c.logicalBytes += uint64(len(key.UserKey) + len(value)) - return nil -} - -func (c *eventStoreTxnCommitTsCollector) Finish(userProps map[string]string) error { - if !c.hasTs { - return nil - } - userProps[eventStoreMinTxnCommitTsProperty] = strconv.FormatUint(c.minTs, 10) - userProps[eventStoreMaxTxnCommitTsProperty] = strconv.FormatUint(c.maxTs, 10) - userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) - return nil -} - -func (c *eventStoreTxnCommitTsCollector) Name() string { - return eventStoreTxnCommitTsCollectorName -} - -func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { - txnCommitTs, ok := decodeTxnCommitTsFromEncodedKey(key) - if !ok { - return - } - if !c.hasTs || txnCommitTs < c.minTs { - c.minTs = txnCommitTs - } - if !c.hasTs || txnCommitTs > c.maxTs { - c.maxTs = txnCommitTs - } - c.hasTs = true -} - -func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { - return func(userProps map[string]string) bool { - shouldScan := eventStoreSSTFileMayContainTxnCommitTs(userProps, lowerTs, upperTs) - recordEventStoreSSTFileFilterMetrics(userProps, shouldScan) - return shouldScan - } -} - -func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { - minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinTxnCommitTsProperty) - if !ok { - return true - } - maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxTxnCommitTsProperty) - if !ok { - return true - } - if minTs > maxTs { - return true - } - return maxTs >= lowerTs && minTs <= upperTs -} - -func recordEventStoreSSTFileFilterMetrics(userProps map[string]string, shouldScan bool) { - result := "scanned" - if !shouldScan { - result = "skipped" - } - metrics.EventStoreSSTFileFilterCount.WithLabelValues(result).Inc() - if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { - metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) - } -} - -func parseEventStoreUint64TableProperty(userProps map[string]string, key string) (uint64, bool) { - value, ok := userProps[key] - if !ok { - return 0, false - } - ts, err := strconv.ParseUint(value, 10, 64) - if err != nil { - return 0, false - } - return ts, true -} diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index 87ae2f4d02..d934173f27 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,20 +81,6 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) - EventStoreSSTFileFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "event_store", - Name: "sst_file_filter_count", - Help: "The number of SST file filter decisions by event store.", - }, []string{"result"}) - - EventStoreSSTFileFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "event_store", - Name: "sst_file_filter_logical_bytes", - Help: "The estimated logical bytes in SST files scanned or skipped by event store.", - }, []string{"result"}) - EventStoreDeleteRangeCount = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ticdc", @@ -350,8 +336,6 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) - registry.MustRegister(EventStoreSSTFileFilterCount) - registry.MustRegister(EventStoreSSTFileFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist) From 85244a483866994c0914a562909c3568fbc0a7a4 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 16:04:23 +0800 Subject: [PATCH 08/14] fix --- logservice/eventstore/event_store.go | 4 +- .../eventstore/event_store_bench_test.go | 4 +- logservice/eventstore/event_store_test.go | 4 +- logservice/eventstore/format.go | 12 +++--- logservice/eventstore/format_test.go | 2 +- logservice/eventstore/gc_test.go | 40 +++++++++---------- logservice/eventstore/pebble_test.go | 2 +- 7 files changed, 34 insertions(+), 34 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 687cf03dec..757df15bb4 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -914,9 +914,9 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com dataRange.LastScannedTxnStartTs+1, ) } else { - start = EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) + start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } - end := EncodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) + end := encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) // it's impossible return error here iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, diff --git a/logservice/eventstore/event_store_bench_test.go b/logservice/eventstore/event_store_bench_test.go index a61c2ff221..c32a1ef884 100644 --- a/logservice/eventstore/event_store_bench_test.go +++ b/logservice/eventstore/event_store_bench_test.go @@ -181,8 +181,8 @@ func BenchmarkEventStoreIteratorNext(b *testing.B) { StartKey: []byte{}, EndKey: []byte{0xff}, } - lower := EncodeTxnCommitTsBoundaryKey(1, 1, 1) - upper := EncodeTxnCommitTsBoundaryKey(1, 1, 1<<63) + lower := encodeTxnCommitTsBoundaryKey(1, 1, 1) + upper := encodeTxnCommitTsBoundaryKey(1, 1, 1<<63) b.ReportAllocs() b.ResetTimer() diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index 92ef174824..eb92c90715 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { // Create iterator with a wider range to ensure it sees all keys, // so we can test the internal filtering logic. - start := EncodeTxnCommitTsBoundaryKey(subID, tableID, 0) - end := EncodeTxnCommitTsBoundaryKey(subID, tableID, 500) + start := encodeTxnCommitTsBoundaryKey(subID, tableID, 0) + end := encodeTxnCommitTsBoundaryKey(subID, tableID, 500) innerIter, err := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index 31dc449da8..47cd09eb74 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -53,8 +53,8 @@ const ( dmlOrderShift = 8 ) -// EncodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs. -func EncodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { +// encodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs. +func encodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { buf := make([]byte, encodedKeyTxnCommitTsEnd) encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) return buf @@ -140,8 +140,8 @@ func getDMLOrder(rowKV *common.RawKVEntry) DMLOrder { func deleteDataRange( db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { - start := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) - end := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) + start := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) + end := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) return db.DeleteRange(start, end, pebble.NoSync) } @@ -149,8 +149,8 @@ func deleteDataRange( func compactDataRange( db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { - start := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) - end := EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) + start := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs) + end := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs) return db.Compact(start, end, false) } diff --git a/logservice/eventstore/format_test.go b/logservice/eventstore/format_test.go index 3e77b64182..4adf3a8b63 100644 --- a/logservice/eventstore/format_test.go +++ b/logservice/eventstore/format_test.go @@ -48,7 +48,7 @@ func TestEventStoreKeyFormatGolden(t *testing.T) { require.Equal(t, 34, encodedKeyAttributesEnd) require.Equal(t, expectedKey[:encodedKeyTxnCommitTsEnd], - EncodeTxnCommitTsBoundaryKey(uniqueID, tableID, txnCommitTs)) + encodeTxnCommitTsBoundaryKey(uniqueID, tableID, txnCommitTs)) require.Equal(t, expectedKey[:encodedKeyAttributesOffset], encodeScanLowerBound(uniqueID, tableID, txnCommitTs, txnStartTs)) diff --git a/logservice/eventstore/gc_test.go b/logservice/eventstore/gc_test.go index 3c55babeea..f17b9342a8 100644 --- a/logservice/eventstore/gc_test.go +++ b/logservice/eventstore/gc_test.go @@ -62,16 +62,16 @@ func TestGCManager(t *testing.T) { db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { return mdb.DeleteRange( - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), nil) } compactFn := func( db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, ) error { return mdb.Compact( - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), - EncodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), false) } gcm := newGCManager([]*pebble.DB{nil}, deleteFn, compactFn) @@ -93,15 +93,15 @@ func TestGCManager(t *testing.T) { deleteCalls := mdb.getDeleteCalls() require.Len(t, deleteCalls, 4) // The order of delete ranges is not guaranteed because it iterates over a map. - if bytes.Equal(deleteCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 100)) { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[3]) + if bytes.Equal(deleteCalls[0], encodeTxnCommitTsBoundaryKey(1, 10, 100)) { + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[3]) } else { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[0]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 100), deleteCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[3]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[0]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 100), deleteCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[3]) } // Check internal state for compaction @@ -122,15 +122,15 @@ func TestGCManager(t *testing.T) { compactCalls := mdb.getCompactCalls() require.Len(t, compactCalls, 4) // The order of compaction is not guaranteed because it iterates over a map. - if bytes.Equal(compactCalls[0], EncodeTxnCommitTsBoundaryKey(1, 10, 0)) { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[3]) + if bytes.Equal(compactCalls[0], encodeTxnCommitTsBoundaryKey(1, 10, 0)) { + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[3]) } else { - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[0]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[1]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 0), compactCalls[2]) - require.Equal(t, EncodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[3]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[0]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 0), compactCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[3]) } // Verify internal state is now compacted gcm.mu.Lock() diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index b2ce6bd33a..062c34d11e 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -154,7 +154,7 @@ func TestEventStoreKeyBounds(t *testing.T) { Key: []byte("key"), } key := EncodeKey(1, 1, event, CompressionNone) - commitTsBoundaryKey := EncodeTxnCommitTsBoundaryKey(1, 1, event.CRTs) + commitTsBoundaryKey := encodeTxnCommitTsBoundaryKey(1, 1, event.CRTs) require.Len(t, commitTsBoundaryKey, encodedKeyTxnCommitTsEnd) require.True(t, bytes.HasPrefix(key, commitTsBoundaryKey)) From d0123a14b31fd7142d92e8af009a11b57ad3b6c6 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 16:24:47 +0800 Subject: [PATCH 09/14] small fix --- logservice/eventstore/event_store.go | 2 +- logservice/eventstore/gc.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 757df15bb4..4605fab8f3 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -432,7 +432,7 @@ func (e *eventStore) Close(_ context.Context) error { } if e.tableCache != nil { if err := e.tableCache.Unref(); err != nil { - log.Error("failed to unref pebble table cache", zap.Error(err)) + log.Warn("failed to unref pebble table cache", zap.Error(err)) } e.tableCache = nil } diff --git a/logservice/eventstore/gc.go b/logservice/eventstore/gc.go index 5fb43adfde..8dd7429908 100644 --- a/logservice/eventstore/gc.go +++ b/logservice/eventstore/gc.go @@ -35,11 +35,9 @@ type ( ) type gcRangeItem struct { - dbIndex int - uniqueKeyID uint64 - tableID int64 - // TODO: startTxnCommitTs may be unnecessary now because delete ranges can - // start from 0, but it may be useful after table range splitting. + dbIndex int + uniqueKeyID uint64 + tableID int64 startTxnCommitTs uint64 endTxnCommitTs uint64 } From 84aba94b62f85f4c8321b8db6e03109903b58901 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 19:10:37 +0800 Subject: [PATCH 10/14] Revert "logservice: remove event store sst file filtering" This reverts commit 5770a5241c3d091e78f86726ada9d3e3986fac49. --- logservice/eventstore/event_store.go | 11 ++ logservice/eventstore/pebble.go | 3 + logservice/eventstore/pebble_test.go | 43 ++++++++ logservice/eventstore/table_properties.go | 123 ++++++++++++++++++++++ pkg/metrics/event_store.go | 16 +++ 5 files changed, 196 insertions(+) create mode 100644 logservice/eventstore/table_properties.go diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 4605fab8f3..3c42d6ccdd 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -905,7 +905,13 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com // CommitTsStart with start ts greater than LastScannedTxnStartTs, then scan // later commit ts up to CommitTsEnd. // + // Table filter bounds: + // lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose + // collected CRTs range does not overlap [lowerTs, CommitTsEnd]. Therefore + // lowerTs is CommitTsStart+1 in the first case, and CommitTsStart in the + // second case. var start []byte + lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { start = encodeScanLowerBound( uint64(subStat.subID), @@ -913,6 +919,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1, ) + lowerTs = dataRange.CommitTsStart } else { start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } @@ -921,6 +928,10 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, + TableFilter: newEventStoreSSTFileFilter( + lowerTs, + dataRange.CommitTsEnd, + ), }) decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index f5f48bf71b..e63c0e714b 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -57,6 +57,9 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), + TablePropertyCollectors: []func() pebble.TablePropertyCollector{ + newEventStoreTxnCommitTsCollector, + }, } for i := 0; i < len(opts.Levels); i++ { diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index 062c34d11e..ebee708ffc 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "fmt" "os" + "strconv" "testing" "github.com/cockroachdb/pebble" @@ -170,3 +171,45 @@ func TestEventStoreKeyBounds(t *testing.T) { } require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) } + +func TestEventStoreTxnCommitTsCollector(t *testing.T) { + t.Parallel() + + collector := newEventStoreTxnCommitTsCollector() + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 1, + CRTs: 10, + Key: []byte("key"), + } + eventKey := EncodeKey(1, 1, event, CompressionNone) + eventValue := []byte("value") + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: eventKey, + Trailer: uint64(pebble.InternalKeyKindSet), + }, eventValue)) + deleteStart := encodeTxnCommitTsBoundaryKey(1, 1, 100) + deleteEnd := encodeTxnCommitTsBoundaryKey(1, 1, 200) + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: deleteStart, + Trailer: uint64(pebble.InternalKeyKindRangeDelete), + }, deleteEnd)) + + props := make(map[string]string) + require.NoError(t, collector.Finish(props)) + require.Equal(t, "10", props[eventStoreMinTxnCommitTsProperty]) + require.Equal(t, "100", props[eventStoreMaxTxnCommitTsProperty]) + require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), + props[eventStoreLogicalBytesProperty]) + + require.True(t, newEventStoreSSTFileFilter(9, 10)(props)) + require.True(t, newEventStoreSSTFileFilter(100, 100)(props)) + require.False(t, newEventStoreSSTFileFilter(101, 300)(props)) + require.True(t, newEventStoreSSTFileFilter(101, 300)(nil)) + + corruptedProps := map[string]string{ + eventStoreMinTxnCommitTsProperty: "300", + eventStoreMaxTxnCommitTsProperty: "100", + } + require.True(t, newEventStoreSSTFileFilter(101, 200)(corruptedProps)) +} diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go new file mode 100644 index 0000000000..1961b85a51 --- /dev/null +++ b/logservice/eventstore/table_properties.go @@ -0,0 +1,123 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventstore + +import ( + "strconv" + + "github.com/cockroachdb/pebble" + "github.com/pingcap/ticdc/pkg/metrics" +) + +const ( + eventStoreMinTxnCommitTsProperty = "event-store-min-txn-commit-ts" + eventStoreMaxTxnCommitTsProperty = "event-store-max-txn-commit-ts" + eventStoreLogicalBytesProperty = "event-store-logical-bytes" + eventStoreTxnCommitTsCollectorName = "event-store-txn-commit-ts-collector" +) + +type eventStoreTxnCommitTsCollector struct { + minTs uint64 + maxTs uint64 + logicalBytes uint64 + hasTs bool +} + +func newEventStoreTxnCommitTsCollector() pebble.TablePropertyCollector { + return &eventStoreTxnCommitTsCollector{} +} + +func (c *eventStoreTxnCommitTsCollector) Add(key pebble.InternalKey, value []byte) error { + // Event store DeleteRange is GC-only: it removes data that should already be + // below the future scan range. Do not widen table properties with the range + // tombstone end key. For example, a cleanup tombstone [commit-ts=100, commit-ts=1000) + // would make this cleanup-only SST overlap scans like [500,600]. + c.recordEncodedKey(key.UserKey) + c.logicalBytes += uint64(len(key.UserKey) + len(value)) + return nil +} + +func (c *eventStoreTxnCommitTsCollector) Finish(userProps map[string]string) error { + if !c.hasTs { + return nil + } + userProps[eventStoreMinTxnCommitTsProperty] = strconv.FormatUint(c.minTs, 10) + userProps[eventStoreMaxTxnCommitTsProperty] = strconv.FormatUint(c.maxTs, 10) + userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) + return nil +} + +func (c *eventStoreTxnCommitTsCollector) Name() string { + return eventStoreTxnCommitTsCollectorName +} + +func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { + txnCommitTs, ok := decodeTxnCommitTsFromEncodedKey(key) + if !ok { + return + } + if !c.hasTs || txnCommitTs < c.minTs { + c.minTs = txnCommitTs + } + if !c.hasTs || txnCommitTs > c.maxTs { + c.maxTs = txnCommitTs + } + c.hasTs = true +} + +func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { + return func(userProps map[string]string) bool { + shouldScan := eventStoreSSTFileMayContainTxnCommitTs(userProps, lowerTs, upperTs) + recordEventStoreSSTFileFilterMetrics(userProps, shouldScan) + return shouldScan + } +} + +func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { + minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinTxnCommitTsProperty) + if !ok { + return true + } + maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxTxnCommitTsProperty) + if !ok { + return true + } + if minTs > maxTs { + return true + } + return maxTs >= lowerTs && minTs <= upperTs +} + +func recordEventStoreSSTFileFilterMetrics(userProps map[string]string, shouldScan bool) { + result := "scanned" + if !shouldScan { + result = "skipped" + } + metrics.EventStoreSSTFileFilterCount.WithLabelValues(result).Inc() + if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { + metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) + } +} + +func parseEventStoreUint64TableProperty(userProps map[string]string, key string) (uint64, bool) { + value, ok := userProps[key] + if !ok { + return 0, false + } + ts, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return 0, false + } + return ts, true +} diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index d934173f27..87ae2f4d02 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,6 +81,20 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) + EventStoreSSTFileFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "sst_file_filter_count", + Help: "The number of SST file filter decisions by event store.", + }, []string{"result"}) + + EventStoreSSTFileFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "sst_file_filter_logical_bytes", + Help: "The estimated logical bytes in SST files scanned or skipped by event store.", + }, []string{"result"}) + EventStoreDeleteRangeCount = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ticdc", @@ -336,6 +350,8 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) + registry.MustRegister(EventStoreSSTFileFilterCount) + registry.MustRegister(EventStoreSSTFileFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist) From 56d2a7ad6fbe68149374885d68ff265811e32288 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 20:37:57 +0800 Subject: [PATCH 11/14] add some comment --- logservice/eventstore/event_store.go | 6 +++--- logservice/eventstore/table_properties.go | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 3c42d6ccdd..0246756b7b 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -907,9 +907,9 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com // // Table filter bounds: // lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose - // collected CRTs range does not overlap [lowerTs, CommitTsEnd]. Therefore - // lowerTs is CommitTsStart+1 in the first case, and CommitTsStart in the - // second case. + // collected txn commit ts range does not overlap [lowerTs, CommitTsEnd]. + // Therefore lowerTs is CommitTsStart+1 in the first case, and CommitTsStart + // in the second case. var start []byte lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index 1961b85a51..397148c510 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -20,6 +20,11 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" ) +// Pebble table properties are per-SST metadata generated when Pebble writes an +// SST file. Event store stores the txn commit ts range of each SST here, and +// passes a TableFilter during scans so Pebble can skip SST files that cannot +// contain events in the requested commit-ts range. + const ( eventStoreMinTxnCommitTsProperty = "event-store-min-txn-commit-ts" eventStoreMaxTxnCommitTsProperty = "event-store-max-txn-commit-ts" From e200ea2468282dc9f7d3902cb09d949c835c83a2 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 20:53:11 +0800 Subject: [PATCH 12/14] small fix --- logservice/eventstore/table_properties.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index 397148c510..df273e3e88 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -72,13 +72,18 @@ func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { if !ok { return } - if !c.hasTs || txnCommitTs < c.minTs { + if !c.hasTs { + c.minTs = txnCommitTs + c.maxTs = txnCommitTs + c.hasTs = true + return + } + if txnCommitTs < c.minTs { c.minTs = txnCommitTs } - if !c.hasTs || txnCommitTs > c.maxTs { + if txnCommitTs > c.maxTs { c.maxTs = txnCommitTs } - c.hasTs = true } func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { @@ -99,8 +104,12 @@ func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs return true } if minTs > maxTs { + // Corrupted or incompatible properties should not make Pebble skip data. return true } + // Two inclusive ranges [minTs, maxTs] and [lowerTs, upperTs] overlap iff + // each range starts before or at the other range's end. Equal boundaries are + // included because commit-ts scan ranges are inclusive here. return maxTs >= lowerTs && minTs <= upperTs } From ca044fe4276b771fab3073399654523a4a0836b5 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 21:22:05 +0800 Subject: [PATCH 13/14] add some comment --- logservice/eventstore/pebble_test.go | 5 +++-- logservice/eventstore/table_properties.go | 15 +++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index ebee708ffc..501d48a1aa 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -198,13 +198,14 @@ func TestEventStoreTxnCommitTsCollector(t *testing.T) { props := make(map[string]string) require.NoError(t, collector.Finish(props)) require.Equal(t, "10", props[eventStoreMinTxnCommitTsProperty]) - require.Equal(t, "100", props[eventStoreMaxTxnCommitTsProperty]) + require.Equal(t, "200", props[eventStoreMaxTxnCommitTsProperty]) require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), props[eventStoreLogicalBytesProperty]) require.True(t, newEventStoreSSTFileFilter(9, 10)(props)) require.True(t, newEventStoreSSTFileFilter(100, 100)(props)) - require.False(t, newEventStoreSSTFileFilter(101, 300)(props)) + require.True(t, newEventStoreSSTFileFilter(101, 300)(props)) + require.False(t, newEventStoreSSTFileFilter(201, 300)(props)) require.True(t, newEventStoreSSTFileFilter(101, 300)(nil)) corruptedProps := map[string]string{ diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index df273e3e88..520f370b0d 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -44,11 +44,18 @@ func newEventStoreTxnCommitTsCollector() pebble.TablePropertyCollector { } func (c *eventStoreTxnCommitTsCollector) Add(key pebble.InternalKey, value []byte) error { - // Event store DeleteRange is GC-only: it removes data that should already be - // below the future scan range. Do not widen table properties with the range - // tombstone end key. For example, a cleanup tombstone [commit-ts=100, commit-ts=1000) - // would make this cleanup-only SST overlap scans like [500,600]. + // Range deletion handling: + // 1. Pebble passes the tombstone start key in key.UserKey and the exclusive + // end key in value. + // 2. Event store uses DeleteRange only for GC, so normal event scans should + // not encounter these tombstones. + // 3. Recording both boundaries keeps the SST property consistent with + // Pebble's [start, end) range deletion format. This affects only SST + // metadata and does not affect scan correctness or filtering precision. c.recordEncodedKey(key.UserKey) + if key.Kind() == pebble.InternalKeyKindRangeDelete { + c.recordEncodedKey(value) + } c.logicalBytes += uint64(len(key.UserKey) + len(value)) return nil } From dd16bb53ba829efbef7d444428bd67484fca61b9 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sat, 2 May 2026 21:30:51 +0800 Subject: [PATCH 14/14] f --- logservice/eventstore/table_properties.go | 33 ++++++++++++++++++----- pkg/metrics/event_store.go | 4 +-- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go index 520f370b0d..c354be145e 100644 --- a/logservice/eventstore/table_properties.go +++ b/logservice/eventstore/table_properties.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/pingcap/ticdc/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" ) // Pebble table properties are per-SST metadata generated when Pebble writes an @@ -94,9 +95,20 @@ func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { } func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { + scannedCount := metrics.EventStoreSSTFileFilterCount.WithLabelValues("scanned") + skippedCount := metrics.EventStoreSSTFileFilterCount.WithLabelValues("skipped") + scannedLogicalBytes := metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues("scanned") + skippedLogicalBytes := metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues("skipped") return func(userProps map[string]string) bool { shouldScan := eventStoreSSTFileMayContainTxnCommitTs(userProps, lowerTs, upperTs) - recordEventStoreSSTFileFilterMetrics(userProps, shouldScan) + recordEventStoreSSTFileFilterMetrics( + userProps, + shouldScan, + scannedCount, + skippedCount, + scannedLogicalBytes, + skippedLogicalBytes, + ) return shouldScan } } @@ -120,14 +132,23 @@ func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs return maxTs >= lowerTs && minTs <= upperTs } -func recordEventStoreSSTFileFilterMetrics(userProps map[string]string, shouldScan bool) { - result := "scanned" +func recordEventStoreSSTFileFilterMetrics( + userProps map[string]string, + shouldScan bool, + scannedCount prometheus.Counter, + skippedCount prometheus.Counter, + scannedLogicalBytes prometheus.Counter, + skippedLogicalBytes prometheus.Counter, +) { + count := scannedCount + logicalBytesCounter := scannedLogicalBytes if !shouldScan { - result = "skipped" + count = skippedCount + logicalBytesCounter = skippedLogicalBytes } - metrics.EventStoreSSTFileFilterCount.WithLabelValues(result).Inc() + count.Inc() if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { - metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues(result).Add(float64(logicalBytes)) + logicalBytesCounter.Add(float64(logicalBytes)) } } diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index 87ae2f4d02..ff272268d0 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -86,14 +86,14 @@ var ( Subsystem: "event_store", Name: "sst_file_filter_count", Help: "The number of SST file filter decisions by event store.", - }, []string{"result"}) + }, []string{"decision"}) EventStoreSSTFileFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "event_store", Name: "sst_file_filter_logical_bytes", Help: "The estimated logical bytes in SST files scanned or skipped by event store.", - }, []string{"result"}) + }, []string{"decision"}) EventStoreDeleteRangeCount = prometheus.NewCounter( prometheus.CounterOpts{