From 1a865d8209ae68763b4dcea1a92389eda99bb75c Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 29 Apr 2026 03:16:14 +0000 Subject: [PATCH 1/2] init Signed-off-by: wk989898 --- logservice/eventstore/event_store.go | 12 ++-- logservice/eventstore/format.go | 17 +++++- logservice/eventstore/pebble.go | 84 ++++++++++++++++++++++++++++ logservice/eventstore/pebble_test.go | 82 +++++++++++++++++++++++++++ 4 files changed, 188 insertions(+), 7 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 82954a1cb9..0a65e26d74 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -876,18 +876,18 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com // convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1) var start []byte + lowerCRTs := dataRange.CommitTsStart if dataRange.LastScannedTxnStartTs != 0 { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1) } else { start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) + lowerCRTs = dataRange.CommitTsStart + 1 } end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) - // TODO: optimize read performance - // it's impossible return error here - iter, _ := db.NewIter(&pebble.IterOptions{ - LowerBound: start, - UpperBound: end, - }) + iter, err := db.NewIter(newEventStoreIterOptions(start, end, lowerCRTs, dataRange.CommitTsEnd)) + if err != nil { + log.Panic("new event store iterator failed", zap.Error(err)) + } decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() // todo: what happens if iter.First() returns false? diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index bb8cefe2b5..51939a7e97 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -43,6 +43,14 @@ const ( dmlOrderMask = 0xFF00 // DML order is stored in the high 8 bits for sorting. compressionMask = 0x00FF // Compression type is stored in the low 8 bits. dmlOrderShift = 8 + + uniqueIDSize = 8 + tableIDSize = 8 + commitTsSize = 8 + startTsSize = 8 + eventStoreCommitTsOffset = uniqueIDSize + tableIDSize + eventStoreStartTsOffset = eventStoreCommitTsOffset + commitTsSize + eventStoreKeyMetaOffset = eventStoreStartTsOffset + startTsSize ) // EncodeKeyPrefix encodes uniqueID, tableID, CRTs and StartTs. @@ -116,10 +124,17 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres // DecodeKeyMetas decodes compression type and dml order from the key. func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) { - combinedOrder := binary.BigEndian.Uint16(key[32:34]) // The combined order is at offset 32 for 2 bytes. + combinedOrder := binary.BigEndian.Uint16(key[eventStoreKeyMetaOffset : eventStoreKeyMetaOffset+2]) return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask) } +func decodeCRTsFromKey(key []byte) (uint64, bool) { + if len(key) < eventStoreStartTsOffset { + return 0, false + } + return binary.BigEndian.Uint64(key[eventStoreCommitTsOffset:eventStoreStartTsOffset]), true +} + // getDMLOrder returns the order of the dml types: delete t.maxTs { + t.maxTs = crts + } + if crts < t.minTs { + t.minTs = crts + } + t.hasKey = true + return nil +} + +func (t *tableCRTsCollector) Finish(userProps map[string]string) error { + if !t.hasKey { + return nil + } + userProps[minTableCRTsLabel] = strconv.FormatUint(t.minTs, 10) + userProps[maxTableCRTsLabel] = strconv.FormatUint(t.maxTs, 10) + return nil +} + +func (t *tableCRTsCollector) Name() string { + return tableCRTsCollectorName +} + +func newEventStoreIterOptions( + lowerBound []byte, + upperBound []byte, + lowerCRTs uint64, + upperCRTs uint64, +) *pebble.IterOptions { + return &pebble.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + TableFilter: func(userProps map[string]string) bool { + tableMinCRTs, tableMaxCRTs, ok := parseTableCRTs(userProps) + if !ok { + return true + } + return tableMaxCRTs >= lowerCRTs && tableMinCRTs <= upperCRTs + }, + UseL6Filters: true, + } +} + +func parseTableCRTs(userProps map[string]string) (uint64, uint64, bool) { + minCRTs, ok := userProps[minTableCRTsLabel] + if !ok { + return 0, 0, false + } + maxCRTs, ok := userProps[maxTableCRTsLabel] + if !ok { + return 0, 0, false + } + tableMinCRTs, err := strconv.ParseUint(minCRTs, 10, 64) + if err != nil { + return 0, 0, false + } + tableMaxCRTs, err := strconv.ParseUint(maxCRTs, 10, 64) + if err != nil { + return 0, 0, false + } + return tableMinCRTs, tableMaxCRTs, true +} + func newPebbleOptions(dbNum int) *pebble.Options { opts := &pebble.Options{ // Disable WAL to decrease io @@ -56,6 +134,12 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), + + TablePropertyCollectors: []func() pebble.TablePropertyCollector{ + func() pebble.TablePropertyCollector { + return &tableCRTsCollector{minTs: math.MaxUint64} + }, + }, } for i := 0; i < len(opts.Levels); i++ { diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index c95bf3928d..0f73268847 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -17,6 +17,7 @@ import ( "bytes" "encoding/binary" "fmt" + "math" "os" "testing" @@ -25,6 +26,87 @@ import ( "github.com/stretchr/testify/require" ) +func TestEventStoreTableCRTsCollector(t *testing.T) { + t.Parallel() + + collector := &tableCRTsCollector{minTs: math.MaxUint64} + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: EncodeKey(1, 1, &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 10, + CRTs: 20, + Key: []byte("a"), + }, CompressionNone), + }, nil)) + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: EncodeKey(1, 1, &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 30, + CRTs: 40, + Key: []byte("b"), + }, CompressionNone), + }, nil)) + + userProps := make(map[string]string) + require.NoError(t, collector.Finish(userProps)) + require.Equal(t, "20", userProps[minTableCRTsLabel]) + require.Equal(t, "40", userProps[maxTableCRTsLabel]) +} + +func TestEventStoreIteratorWithTableFilter(t *testing.T) { + t.Parallel() + + opts := newPebbleOptions(1) + opts.DisableAutomaticCompactions = true + db, err := pebble.Open(t.TempDir(), opts) + require.NoError(t, err) + defer db.Close() + + writeKeys := func(maxTableID int64, crts uint64) { + for tableID := int64(1); tableID <= maxTableID; tableID++ { + key := EncodeKey(1, tableID, &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 0, + CRTs: crts, + Key: []byte{byte(tableID)}, + }, CompressionNone) + require.NoError(t, db.Set(key, []byte{'x'}, pebble.NoSync)) + } + require.NoError(t, db.Flush()) + } + + writeKeys(7, 1) + writeKeys(9, 3) + + for _, tc := range []struct { + lowerCRTs uint64 + upperCRTs uint64 + expected int + }{ + {lowerCRTs: 0, upperCRTs: 1, expected: 7}, + {lowerCRTs: 1, upperCRTs: 2, expected: 7}, + {lowerCRTs: 2, upperCRTs: 3, expected: 9}, + {lowerCRTs: 3, upperCRTs: 4, expected: 9}, + {lowerCRTs: 0, upperCRTs: 10, expected: 16}, + {lowerCRTs: 10, upperCRTs: 20, expected: 0}, + } { + t.Run(fmt.Sprintf("%d-%d", tc.lowerCRTs, tc.upperCRTs), func(t *testing.T) { + count := 0 + for tableID := int64(0); tableID <= 9; tableID++ { + start := EncodeKeyPrefix(1, tableID, tc.lowerCRTs) + end := EncodeKeyPrefix(1, tableID, tc.upperCRTs+1) + iter, err := db.NewIter(newEventStoreIterOptions(start, end, tc.lowerCRTs, tc.upperCRTs)) + require.NoError(t, err) + for iter.First(); iter.Valid(); iter.Next() { + count++ + } + require.NoError(t, iter.Close()) + } + require.Equal(t, tc.expected, count) + }) + } +} + func TestWriteAndReadRawKVEntry(t *testing.T) { dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) os.RemoveAll(dbPath) From 7a18eca2fda5f31459416104a2e4faac75a05f50 Mon Sep 17 00:00:00 2001 From: wk989898 Date: Wed, 29 Apr 2026 03:17:42 +0000 Subject: [PATCH 2/2] update Signed-off-by: wk989898 --- logservice/eventstore/event_store.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 0a65e26d74..e0a61d2450 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -884,10 +884,8 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com lowerCRTs = dataRange.CommitTsStart + 1 } end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) - iter, err := db.NewIter(newEventStoreIterOptions(start, end, lowerCRTs, dataRange.CommitTsEnd)) - if err != nil { - log.Panic("new event store iterator failed", zap.Error(err)) - } + // it's impossible return error here + iter, _ := db.NewIter(newEventStoreIterOptions(start, end, lowerCRTs, dataRange.CommitTsEnd)) decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() // todo: what happens if iter.First() returns false?