diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 82954a1cb9..0246756b7b 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -204,6 +204,8 @@ type eventStore struct { subClient logpuller.SubscriptionClient dbs []*pebble.DB + pebbleCache *pebble.Cache + tableCache *pebble.TableCache chs []*chann.UnlimitedChannel[eventWithCallback, uint64] writeTaskPools []*writeTaskPool @@ -258,11 +260,14 @@ func New( log.Panic("fail to remove path", zap.String("path", dbPath), zap.Error(err)) } + dbs, pebbleCache, tableCache := createPebbleDBs(dbPath, dbCount) store := &eventStore{ pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock), subClient: subClient, - dbs: createPebbleDBs(dbPath, dbCount), + dbs: dbs, + pebbleCache: pebbleCache, + tableCache: tableCache, chs: make([]*chann.UnlimitedChannel[eventWithCallback, uint64], 0, dbCount), writeTaskPools: make([]*writeTaskPool, 0, dbCount), @@ -425,6 +430,16 @@ func (e *eventStore) Close(_ context.Context) error { log.Error("failed to close pebble db", zap.Error(err)) } } + if e.tableCache != nil { + if err := e.tableCache.Unref(); err != nil { + log.Warn("failed to unref pebble table cache", zap.Error(err)) + } + e.tableCache = nil + } + if e.pebbleCache != nil { + e.pebbleCache.Unref() + e.pebbleCache = nil + } return nil } @@ -874,19 +889,49 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com e.dispatcherMeta.Unlock() } - // convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1) + // dataRange fields: + // CommitTsStart and CommitTsEnd define the commit-ts scan window. + // LastScannedTxnStartTs records how far the previous scan progressed inside + // CommitTsStart. It is zero if there is no unfinished scan at CommitTsStart. + // + // Iterator key bounds: + // Pebble uses [LowerBound, UpperBound), so end is always encoded as + // CommitTsEnd+1. + // + // If LastScannedTxnStartTs is zero, scan commit ts in + // (CommitTsStart, CommitTsEnd], and use CommitTsStart+1 as LowerBound. + // + // If LastScannedTxnStartTs is non-zero, continue scanning commit ts + // CommitTsStart with start ts greater than LastScannedTxnStartTs, then scan + // later commit ts up to CommitTsEnd. + // + // Table filter bounds: + // lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose + // collected txn commit ts range does not overlap [lowerTs, CommitTsEnd]. + // Therefore lowerTs is CommitTsStart+1 in the first case, and CommitTsStart + // in the second case. var start []byte + lowerTs := dataRange.CommitTsStart + 1 if dataRange.LastScannedTxnStartTs != 0 { - start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1) + start = encodeScanLowerBound( + uint64(subStat.subID), + stat.tableSpan.TableID, + dataRange.CommitTsStart, + dataRange.LastScannedTxnStartTs+1, + ) + lowerTs = dataRange.CommitTsStart } else { - start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) + start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } - end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) - // TODO: optimize read performance + end := encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) // it's impossible return error here iter, _ := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, + TableFilter: newEventStoreSSTFileFilter( + lowerTs, + dataRange.CommitTsEnd, + ), }) decoder := e.decoderPool.Get().(*zstd.Decoder) startTime := time.Now() @@ -1429,7 +1474,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { key := iter.innerIter.Key() value := iter.innerIter.Value() - _, compressionType := DecodeKeyMetas(key) + _, compressionType := DecodeKeyAttributes(key) var decodedValue []byte if compressionType == CompressionZSTD { var err error diff --git a/logservice/eventstore/event_store_bench_test.go b/logservice/eventstore/event_store_bench_test.go index e8926e92e5..c32a1ef884 100644 --- a/logservice/eventstore/event_store_bench_test.go +++ b/logservice/eventstore/event_store_bench_test.go @@ -181,8 +181,8 @@ func BenchmarkEventStoreIteratorNext(b *testing.B) { StartKey: []byte{}, EndKey: []byte{0xff}, } - lower := EncodeKeyPrefix(1, 1, 1) - upper := EncodeKeyPrefix(1, 1, 1<<63) + lower := encodeTxnCommitTsBoundaryKey(1, 1, 1) + upper := encodeTxnCommitTsBoundaryKey(1, 1, 1<<63) b.ReportAllocs() b.ResetTimer() diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index f2cddd8ff8..eb92c90715 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -891,7 +891,7 @@ func TestWriteToEventStore(t *testing.T) { key := iter.Key() value := iter.Value() - _, compressionType := DecodeKeyMetas(key) + _, compressionType := DecodeKeyAttributes(key) var decodedValue []byte if compressionType == CompressionZSTD { @@ -965,7 +965,7 @@ func TestWriteToEventStoreZstdCompressionDisabled(t *testing.T) { count := 0 for iter.First(); iter.Valid(); iter.Next() { - _, compressionType := DecodeKeyMetas(iter.Key()) + _, compressionType := DecodeKeyAttributes(iter.Key()) require.Equal(t, CompressionNone, compressionType) readEntry := &common.RawKVEntry{} @@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) { // Create iterator with a wider range to ensure it sees all keys, // so we can test the internal filtering logic. - start := EncodeKeyPrefix(subID, tableID, 0) - end := EncodeKeyPrefix(subID, tableID, 500) + start := encodeTxnCommitTsBoundaryKey(subID, tableID, 0) + end := encodeTxnCommitTsBoundaryKey(subID, tableID, 500) innerIter, err := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, diff --git a/logservice/eventstore/format.go b/logservice/eventstore/format.go index bb8cefe2b5..47cd09eb74 100644 --- a/logservice/eventstore/format.go +++ b/logservice/eventstore/format.go @@ -38,6 +38,14 @@ const ( CompressionZSTD ) +const ( + encodedKeyUint64Len = 8 + encodedKeyTxnCommitTsStart = 2 * encodedKeyUint64Len + encodedKeyTxnCommitTsEnd = encodedKeyTxnCommitTsStart + encodedKeyUint64Len + encodedKeyAttributesOffset = 4 * encodedKeyUint64Len + encodedKeyAttributesEnd = encodedKeyAttributesOffset + 2 +) + const ( // Bitmask for DML order and compression type. dmlOrderMask = 0xFF00 // DML order is stored in the high 8 bits for sorting. @@ -45,44 +53,33 @@ const ( dmlOrderShift = 8 ) -// EncodeKeyPrefix encodes uniqueID, tableID, CRTs and StartTs. -// StartTs is optional. -// The result should be a prefix of normal key. (TODO: add a unit test) -func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uint64) []byte { - if len(startTs) > 1 { - log.Panic("startTs should be at most one") - } - // uniqueID, tableID, CRTs. - keySize := 8 + 8 + 8 - if len(startTs) > 0 { - keySize += 8 - } - buf := make([]byte, 0, keySize) - uint64Buf := [8]byte{} - // uniqueID - binary.BigEndian.PutUint64(uint64Buf[:], uniqueID) - buf = append(buf, uint64Buf[:]...) - // tableID - binary.BigEndian.PutUint64(uint64Buf[:], uint64(tableID)) - buf = append(buf, uint64Buf[:]...) - // CRTs - binary.BigEndian.PutUint64(uint64Buf[:], CRTs) - buf = append(buf, uint64Buf[:]...) - if len(startTs) > 0 { - // startTs - binary.BigEndian.PutUint64(uint64Buf[:], startTs[0]) - buf = append(buf, uint64Buf[:]...) - } +// encodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs. +func encodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte { + buf := make([]byte, encodedKeyTxnCommitTsEnd) + encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) + return buf +} + +func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, txnStartTs uint64) []byte { + buf := make([]byte, encodedKeyAttributesOffset) + encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs) + binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsEnd:encodedKeyAttributesOffset], txnStartTs) return buf } +func encodeTxnCommitTsBoundaryKeyTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) { + binary.BigEndian.PutUint64(buf[:encodedKeyUint64Len], uniqueID) + binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyTxnCommitTsStart], uint64(tableID)) + binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd], txnCommitTs) +} + func encodedKeyLen(event *common.RawKVEntry) int { - // uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key - return 8 + 8 + 8 + 8 + 1 + 1 + len(event.Key) + // uniqueID, tableID, txnCommitTs, txnStartTs, Put/Delete, CompressionType, Key + return encodedKeyAttributesEnd + len(event.Key) } // EncodeKeyTo appends an encoded event-store key to buf. -// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key. +// Format: uniqueID, tableID, txnCommitTs, txnStartTs, delete/update/insert, Key. func EncodeKeyTo( buf []byte, uniqueID uint64, @@ -97,9 +94,9 @@ func EncodeKeyTo( buf = binary.BigEndian.AppendUint64(buf, uniqueID) // table ID buf = binary.BigEndian.AppendUint64(buf, uint64(tableID)) - // CRTs + // txn commit ts buf = binary.BigEndian.AppendUint64(buf, event.CRTs) - // startTs + // txn start ts buf = binary.BigEndian.AppendUint64(buf, event.StartTs) // Let Delete < Update < Insert dmlOrder := getDMLOrder(event) @@ -114,12 +111,22 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres return EncodeKeyTo(make([]byte, 0, encodedKeyLen(event)), uniqueID, tableID, event, compressionType) } -// DecodeKeyMetas decodes compression type and dml order from the key. -func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) { - combinedOrder := binary.BigEndian.Uint16(key[32:34]) // The combined order is at offset 32 for 2 bytes. +// DecodeKeyAttributes decodes compression type and dml order from the key. +func DecodeKeyAttributes(key []byte) (DMLOrder, CompressionType) { + combinedOrder := binary.BigEndian.Uint16(key[encodedKeyAttributesOffset:encodedKeyAttributesEnd]) return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask) } +// decodeTxnCommitTsFromEncodedKey decodes txnCommitTs from an event-store key boundary. +// It works for both full event keys and DeleteRange boundary keys because both +// contain uniqueID, tableID, and txnCommitTs as the first three fields. +func decodeTxnCommitTsFromEncodedKey(key []byte) (uint64, bool) { + if len(key) < encodedKeyTxnCommitTsEnd { + return 0, false + } + return binary.BigEndian.Uint64(key[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd]), true +} + // getDMLOrder returns the order of the dml types: delete pending.item.endTs { - pending.item.endTs = endTS + if endTxnCommitTs > pending.item.endTxnCommitTs { + pending.item.endTxnCommitTs = endTxnCommitTs } } @@ -163,7 +169,7 @@ func (d *gcManager) pendingDeleteRangeCount() int { func shouldFlushDeleteRange(pending *pendingDeleteRange, now time.Time, minRangeInterval, maxDelay time.Duration) bool { // addGCItem only records valid ranges, so this should not happen in normal flow. // Keep the guard as a defensive check against unexpected state or future refactors. - if pending == nil || pending.item.endTs <= pending.item.startTs { + if pending == nil || pending.item.endTxnCommitTs <= pending.item.startTxnCommitTs { return false } if maxDelay > 0 && now.Sub(pending.firstEnqueueTime) >= maxDelay { @@ -173,8 +179,8 @@ func shouldFlushDeleteRange(pending *pendingDeleteRange, now time.Time, minRange return true } - startPhysical := oracle.ExtractPhysical(pending.item.startTs) - endPhysical := oracle.ExtractPhysical(pending.item.endTs) + startPhysical := oracle.ExtractPhysical(pending.item.startTxnCommitTs) + endPhysical := oracle.ExtractPhysical(pending.item.endTxnCommitTs) if endPhysical <= startPhysical { return false } @@ -290,7 +296,7 @@ func (d *gcManager) run(ctx context.Context) error { func (d *gcManager) doGCJob(ranges []gcRangeItem) { for _, r := range ranges { db := d.dbs[r.dbIndex] - if err := d.deleteDataRange(db, r.uniqueKeyID, r.tableID, r.startTs, r.endTs); err != nil { + if err := d.deleteDataRange(db, r.uniqueKeyID, r.tableID, r.startTxnCommitTs, r.endTxnCommitTs); err != nil { log.Warn("gc manager failed to delete data range", zap.Error(err)) } } @@ -306,8 +312,8 @@ func (d *gcManager) updateCompactRanges(ranges []gcRangeItem) { state = &compactState{} d.compactRanges[key] = state } - if state.endTs < r.endTs { - state.endTs = r.endTs + if state.endTxnCommitTs < r.endTxnCommitTs { + state.endTxnCommitTs = r.endTxnCommitTs state.compacted = false } } @@ -318,7 +324,7 @@ func (d *gcManager) doCompaction() { d.mu.Lock() for key, state := range d.compactRanges { if !state.compacted { - toCompact[key] = state.endTs + toCompact[key] = state.endTxnCommitTs state.compacted = true } } @@ -326,14 +332,14 @@ func (d *gcManager) doCompaction() { startTime := time.Now() log.Info("gc manager compacting ranges", zap.Int("rangeCount", len(toCompact))) - for key, endTs := range toCompact { + for key, endTxnCommitTs := range toCompact { db := d.dbs[key.dbIndex] - if err := d.compactDataRange(db, key.uniqueKeyID, key.tableID, 0, endTs); err != nil { + if err := d.compactDataRange(db, key.uniqueKeyID, key.tableID, 0, endTxnCommitTs); err != nil { log.Warn("gc manager failed to compact data range", zap.Int("dbIndex", key.dbIndex), zap.Uint64("uniqueKeyID", key.uniqueKeyID), zap.Int64("tableID", key.tableID), - zap.Uint64("endTs", endTs), + zap.Uint64("endTxnCommitTs", endTxnCommitTs), zap.Error(err)) } } diff --git a/logservice/eventstore/gc_test.go b/logservice/eventstore/gc_test.go index 79aaed8ea8..f17b9342a8 100644 --- a/logservice/eventstore/gc_test.go +++ b/logservice/eventstore/gc_test.go @@ -58,11 +58,21 @@ func (m *mockDB) getCompactCalls() [][]byte { func TestGCManager(t *testing.T) { mdb := &mockDB{} - deleteFn := func(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error { - return mdb.DeleteRange(EncodeKeyPrefix(uniqueKeyID, tableID, startTs), EncodeKeyPrefix(uniqueKeyID, tableID, endTs), nil) + deleteFn := func( + db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, + ) error { + return mdb.DeleteRange( + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + nil) } - compactFn := func(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error { - return mdb.Compact(EncodeKeyPrefix(uniqueKeyID, tableID, startTs), EncodeKeyPrefix(uniqueKeyID, tableID, endTs), false) + compactFn := func( + db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64, + ) error { + return mdb.Compact( + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs), + encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs), + false) } gcm := newGCManager([]*pebble.DB{nil}, deleteFn, compactFn) @@ -83,26 +93,26 @@ func TestGCManager(t *testing.T) { deleteCalls := mdb.getDeleteCalls() require.Len(t, deleteCalls, 4) // The order of delete ranges is not guaranteed because it iterates over a map. - if bytes.Equal(deleteCalls[0], EncodeKeyPrefix(1, 10, 100)) { - require.Equal(t, EncodeKeyPrefix(1, 10, 200), deleteCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 20, 300), deleteCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), deleteCalls[3]) + if bytes.Equal(deleteCalls[0], encodeTxnCommitTsBoundaryKey(1, 10, 100)) { + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[3]) } else { - require.Equal(t, EncodeKeyPrefix(1, 20, 300), deleteCalls[0]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), deleteCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 10, 100), deleteCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 10, 200), deleteCalls[3]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 300), deleteCalls[0]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), deleteCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 100), deleteCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), deleteCalls[3]) } // Check internal state for compaction gcm.mu.Lock() state1, ok := gcm.compactRanges[compactKey1] require.True(t, ok) - require.Equal(t, uint64(200), state1.endTs) + require.Equal(t, uint64(200), state1.endTxnCommitTs) require.False(t, state1.compacted) state2, ok := gcm.compactRanges[compactKey2] require.True(t, ok) - require.Equal(t, uint64(400), state2.endTs) + require.Equal(t, uint64(400), state2.endTxnCommitTs) require.False(t, state2.compacted) gcm.mu.Unlock() } @@ -112,15 +122,15 @@ func TestGCManager(t *testing.T) { compactCalls := mdb.getCompactCalls() require.Len(t, compactCalls, 4) // The order of compaction is not guaranteed because it iterates over a map. - if bytes.Equal(compactCalls[0], EncodeKeyPrefix(1, 10, 0)) { - require.Equal(t, EncodeKeyPrefix(1, 10, 200), compactCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 20, 0), compactCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), compactCalls[3]) + if bytes.Equal(compactCalls[0], encodeTxnCommitTsBoundaryKey(1, 10, 0)) { + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[3]) } else { - require.Equal(t, EncodeKeyPrefix(1, 20, 0), compactCalls[0]) - require.Equal(t, EncodeKeyPrefix(1, 20, 400), compactCalls[1]) - require.Equal(t, EncodeKeyPrefix(1, 10, 0), compactCalls[2]) - require.Equal(t, EncodeKeyPrefix(1, 10, 200), compactCalls[3]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 0), compactCalls[0]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 20, 400), compactCalls[1]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 0), compactCalls[2]) + require.Equal(t, encodeTxnCommitTsBoundaryKey(1, 10, 200), compactCalls[3]) } // Verify internal state is now compacted gcm.mu.Lock() @@ -155,12 +165,12 @@ func TestGCManagerDelaysSmallDeleteRanges(t *testing.T) { gcm := newGCManager(nil, nil, nil) now := time.Unix(100, 0) - startTs := oracle.ComposeTS(1_000, 0) - midTs := oracle.ComposeTS(2_000, 0) - endTs := oracle.ComposeTS(3_000, 0) + startTxnCommitTs := oracle.ComposeTS(1_000, 0) + midTxnCommitTs := oracle.ComposeTS(2_000, 0) + endTxnCommitTs := oracle.ComposeTS(3_000, 0) - gcm.addGCItem(0, 1, 10, startTs, midTs) - gcm.addGCItem(0, 1, 10, midTs, endTs) + gcm.addGCItem(0, 1, 10, startTxnCommitTs, midTxnCommitTs) + gcm.addGCItem(0, 1, 10, midTxnCommitTs, endTxnCommitTs) compactKey := compactItemKey{dbIndex: 0, uniqueKeyID: 1, tableID: 10} gcm.mu.Lock() @@ -175,18 +185,18 @@ func TestGCManagerDelaysSmallDeleteRanges(t *testing.T) { gcm.mu.Lock() pending, ok = gcm.deleteRanges[compactKey] require.True(t, ok) - require.Equal(t, startTs, pending.item.startTs) - require.Equal(t, endTs, pending.item.endTs) + require.Equal(t, startTxnCommitTs, pending.item.startTxnCommitTs) + require.Equal(t, endTxnCommitTs, pending.item.endTxnCommitTs) gcm.mu.Unlock() ranges = gcm.fetchGCItems(now.Add(31*time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: startTs, - endTs: endTs, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: startTxnCommitTs, + endTxnCommitTs: endTxnCommitTs, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } @@ -195,10 +205,10 @@ func TestGCManagerFlushesLargeDeleteRangeImmediately(t *testing.T) { gcm := newGCManager(nil, nil, nil) now := time.Unix(100, 0) - startTs := oracle.ComposeTS(1_000, 0) - endTs := oracle.ComposeTS(1_000+6*60*1000, 0) + startTxnCommitTs := oracle.ComposeTS(1_000, 0) + endTxnCommitTs := oracle.ComposeTS(1_000+6*60*1000, 0) - gcm.addGCItem(0, 1, 10, startTs, endTs) + gcm.addGCItem(0, 1, 10, startTxnCommitTs, endTxnCommitTs) compactKey := compactItemKey{dbIndex: 0, uniqueKeyID: 1, tableID: 10} gcm.mu.Lock() @@ -210,11 +220,11 @@ func TestGCManagerFlushesLargeDeleteRangeImmediately(t *testing.T) { ranges := gcm.fetchGCItems(now.Add(time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: startTs, - endTs: endTs, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: startTxnCommitTs, + endTxnCommitTs: endTxnCommitTs, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } @@ -236,18 +246,18 @@ func TestGCManagerWidensDisjointDeleteRanges(t *testing.T) { pending, ok := gcm.deleteRanges[compactKey] require.True(t, ok) pending.firstEnqueueTime = now - require.Equal(t, firstStart, pending.item.startTs) - require.Equal(t, secondEnd, pending.item.endTs) + require.Equal(t, firstStart, pending.item.startTxnCommitTs) + require.Equal(t, secondEnd, pending.item.endTxnCommitTs) gcm.mu.Unlock() ranges := gcm.fetchGCItems(now.Add(31*time.Minute), 5*time.Minute, 30*time.Minute) require.Len(t, ranges, 1) require.Equal(t, gcRangeItem{ - dbIndex: 0, - uniqueKeyID: 1, - tableID: 10, - startTs: firstStart, - endTs: secondEnd, + dbIndex: 0, + uniqueKeyID: 1, + tableID: 10, + startTxnCommitTs: firstStart, + endTxnCommitTs: secondEnd, }, ranges[0]) require.Equal(t, 0, gcm.pendingDeleteRangeCount()) } diff --git a/logservice/eventstore/pebble.go b/logservice/eventstore/pebble.go index 08d252070a..e63c0e714b 100644 --- a/logservice/eventstore/pebble.go +++ b/logservice/eventstore/pebble.go @@ -32,6 +32,7 @@ const ( cacheSize = 1 << 30 // 1GB memTableTotalSize = 1 << 30 // 1GB memTableSize = 64 << 20 // 64MB + maxOpenFilesPerDB = 10000 ) func newPebbleOptions(dbNum int) *pebble.Options { @@ -39,7 +40,7 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Disable WAL to decrease io DisableWAL: true, - MaxOpenFiles: 10000, + MaxOpenFiles: maxOpenFilesPerDB, MaxConcurrentCompactions: func() int { return 6 }, @@ -56,6 +57,9 @@ func newPebbleOptions(dbNum int) *pebble.Options { // Configure options to optimize read/write performance Levels: make([]pebble.LevelOptions, 7), + TablePropertyCollectors: []func() pebble.TablePropertyCollector{ + newEventStoreTxnCommitTsCollector, + }, } for i := 0; i < len(opts.Levels); i++ { @@ -74,9 +78,10 @@ func newPebbleOptions(dbNum int) *pebble.Options { return opts } -func createPebbleDBs(rootDir string, dbNum int) []*pebble.DB { +func createPebbleDBs(rootDir string, dbNum int) ([]*pebble.DB, *pebble.Cache, *pebble.TableCache) { cache := pebble.NewCache(cacheSize) - tableCache := pebble.NewTableCache(cache, dbNum, int(cache.MaxSize())) + tableCacheSize := dbNum * pebble.TableCacheSize(maxOpenFilesPerDB) + tableCache := pebble.NewTableCache(cache, dbNum, tableCacheSize) dbs := make([]*pebble.DB, dbNum) for i := 0; i < dbNum; i++ { id := strconv.Itoa(i + 1) @@ -113,7 +118,7 @@ func createPebbleDBs(rootDir string, dbNum int) []*pebble.DB { } dbs[i] = db } - return dbs + return dbs, cache, tableCache } type eventStoreWriteStallState struct { diff --git a/logservice/eventstore/pebble_test.go b/logservice/eventstore/pebble_test.go index c95bf3928d..501d48a1aa 100644 --- a/logservice/eventstore/pebble_test.go +++ b/logservice/eventstore/pebble_test.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "fmt" "os" + "strconv" "testing" "github.com/cockroachdb/pebble" @@ -121,12 +122,12 @@ func TestCompressionAndKeyOrder(t *testing.T) { Key: []byte("test-key"), } keyWithZstd := EncodeKey(1, 1, ev, CompressionZSTD) - dmlOrder, compressionType := DecodeKeyMetas(keyWithZstd) + dmlOrder, compressionType := DecodeKeyAttributes(keyWithZstd) require.Equal(t, DMLOrderInsert, dmlOrder) require.Equal(t, CompressionZSTD, compressionType) keyWithNone := EncodeKey(1, 1, ev, CompressionNone) - dmlOrder, compressionType = DecodeKeyMetas(keyWithNone) + dmlOrder, compressionType = DecodeKeyAttributes(keyWithNone) require.Equal(t, DMLOrderInsert, dmlOrder) require.Equal(t, CompressionNone, compressionType) @@ -143,3 +144,73 @@ func TestCompressionAndKeyOrder(t *testing.T) { require.Less(t, bytes.Compare(keyDelete, keyUpdate), 0, "Delete should come before Update") require.Less(t, bytes.Compare(keyUpdate, keyInsert), 0, "Update should come before Insert") } + +func TestEventStoreKeyBounds(t *testing.T) { + t.Parallel() + + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 20, + CRTs: 10, + Key: []byte("key"), + } + key := EncodeKey(1, 1, event, CompressionNone) + commitTsBoundaryKey := encodeTxnCommitTsBoundaryKey(1, 1, event.CRTs) + require.Len(t, commitTsBoundaryKey, encodedKeyTxnCommitTsEnd) + require.True(t, bytes.HasPrefix(key, commitTsBoundaryKey)) + + lowerBound := encodeScanLowerBound(1, 1, event.CRTs, event.StartTs) + require.Len(t, lowerBound, encodedKeyAttributesOffset) + require.True(t, bytes.HasPrefix(key, lowerBound)) + + previousEvent := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: event.StartTs - 1, + CRTs: event.CRTs, + Key: []byte("key"), + } + require.Less(t, bytes.Compare(EncodeKey(1, 1, previousEvent, CompressionNone), lowerBound), 0) +} + +func TestEventStoreTxnCommitTsCollector(t *testing.T) { + t.Parallel() + + collector := newEventStoreTxnCommitTsCollector() + event := &common.RawKVEntry{ + OpType: common.OpTypePut, + StartTs: 1, + CRTs: 10, + Key: []byte("key"), + } + eventKey := EncodeKey(1, 1, event, CompressionNone) + eventValue := []byte("value") + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: eventKey, + Trailer: uint64(pebble.InternalKeyKindSet), + }, eventValue)) + deleteStart := encodeTxnCommitTsBoundaryKey(1, 1, 100) + deleteEnd := encodeTxnCommitTsBoundaryKey(1, 1, 200) + require.NoError(t, collector.Add(pebble.InternalKey{ + UserKey: deleteStart, + Trailer: uint64(pebble.InternalKeyKindRangeDelete), + }, deleteEnd)) + + props := make(map[string]string) + require.NoError(t, collector.Finish(props)) + require.Equal(t, "10", props[eventStoreMinTxnCommitTsProperty]) + require.Equal(t, "200", props[eventStoreMaxTxnCommitTsProperty]) + require.Equal(t, strconv.Itoa(len(eventKey)+len(eventValue)+len(deleteStart)+len(deleteEnd)), + props[eventStoreLogicalBytesProperty]) + + require.True(t, newEventStoreSSTFileFilter(9, 10)(props)) + require.True(t, newEventStoreSSTFileFilter(100, 100)(props)) + require.True(t, newEventStoreSSTFileFilter(101, 300)(props)) + require.False(t, newEventStoreSSTFileFilter(201, 300)(props)) + require.True(t, newEventStoreSSTFileFilter(101, 300)(nil)) + + corruptedProps := map[string]string{ + eventStoreMinTxnCommitTsProperty: "300", + eventStoreMaxTxnCommitTsProperty: "100", + } + require.True(t, newEventStoreSSTFileFilter(101, 200)(corruptedProps)) +} diff --git a/logservice/eventstore/table_properties.go b/logservice/eventstore/table_properties.go new file mode 100644 index 0000000000..c354be145e --- /dev/null +++ b/logservice/eventstore/table_properties.go @@ -0,0 +1,165 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventstore + +import ( + "strconv" + + "github.com/cockroachdb/pebble" + "github.com/pingcap/ticdc/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +// Pebble table properties are per-SST metadata generated when Pebble writes an +// SST file. Event store stores the txn commit ts range of each SST here, and +// passes a TableFilter during scans so Pebble can skip SST files that cannot +// contain events in the requested commit-ts range. + +const ( + eventStoreMinTxnCommitTsProperty = "event-store-min-txn-commit-ts" + eventStoreMaxTxnCommitTsProperty = "event-store-max-txn-commit-ts" + eventStoreLogicalBytesProperty = "event-store-logical-bytes" + eventStoreTxnCommitTsCollectorName = "event-store-txn-commit-ts-collector" +) + +type eventStoreTxnCommitTsCollector struct { + minTs uint64 + maxTs uint64 + logicalBytes uint64 + hasTs bool +} + +func newEventStoreTxnCommitTsCollector() pebble.TablePropertyCollector { + return &eventStoreTxnCommitTsCollector{} +} + +func (c *eventStoreTxnCommitTsCollector) Add(key pebble.InternalKey, value []byte) error { + // Range deletion handling: + // 1. Pebble passes the tombstone start key in key.UserKey and the exclusive + // end key in value. + // 2. Event store uses DeleteRange only for GC, so normal event scans should + // not encounter these tombstones. + // 3. Recording both boundaries keeps the SST property consistent with + // Pebble's [start, end) range deletion format. This affects only SST + // metadata and does not affect scan correctness or filtering precision. + c.recordEncodedKey(key.UserKey) + if key.Kind() == pebble.InternalKeyKindRangeDelete { + c.recordEncodedKey(value) + } + c.logicalBytes += uint64(len(key.UserKey) + len(value)) + return nil +} + +func (c *eventStoreTxnCommitTsCollector) Finish(userProps map[string]string) error { + if !c.hasTs { + return nil + } + userProps[eventStoreMinTxnCommitTsProperty] = strconv.FormatUint(c.minTs, 10) + userProps[eventStoreMaxTxnCommitTsProperty] = strconv.FormatUint(c.maxTs, 10) + userProps[eventStoreLogicalBytesProperty] = strconv.FormatUint(c.logicalBytes, 10) + return nil +} + +func (c *eventStoreTxnCommitTsCollector) Name() string { + return eventStoreTxnCommitTsCollectorName +} + +func (c *eventStoreTxnCommitTsCollector) recordEncodedKey(key []byte) { + txnCommitTs, ok := decodeTxnCommitTsFromEncodedKey(key) + if !ok { + return + } + if !c.hasTs { + c.minTs = txnCommitTs + c.maxTs = txnCommitTs + c.hasTs = true + return + } + if txnCommitTs < c.minTs { + c.minTs = txnCommitTs + } + if txnCommitTs > c.maxTs { + c.maxTs = txnCommitTs + } +} + +func newEventStoreSSTFileFilter(lowerTs uint64, upperTs uint64) func(map[string]string) bool { + scannedCount := metrics.EventStoreSSTFileFilterCount.WithLabelValues("scanned") + skippedCount := metrics.EventStoreSSTFileFilterCount.WithLabelValues("skipped") + scannedLogicalBytes := metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues("scanned") + skippedLogicalBytes := metrics.EventStoreSSTFileFilterLogicalBytes.WithLabelValues("skipped") + return func(userProps map[string]string) bool { + shouldScan := eventStoreSSTFileMayContainTxnCommitTs(userProps, lowerTs, upperTs) + recordEventStoreSSTFileFilterMetrics( + userProps, + shouldScan, + scannedCount, + skippedCount, + scannedLogicalBytes, + skippedLogicalBytes, + ) + return shouldScan + } +} + +func eventStoreSSTFileMayContainTxnCommitTs(userProps map[string]string, lowerTs uint64, upperTs uint64) bool { + minTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMinTxnCommitTsProperty) + if !ok { + return true + } + maxTs, ok := parseEventStoreUint64TableProperty(userProps, eventStoreMaxTxnCommitTsProperty) + if !ok { + return true + } + if minTs > maxTs { + // Corrupted or incompatible properties should not make Pebble skip data. + return true + } + // Two inclusive ranges [minTs, maxTs] and [lowerTs, upperTs] overlap iff + // each range starts before or at the other range's end. Equal boundaries are + // included because commit-ts scan ranges are inclusive here. + return maxTs >= lowerTs && minTs <= upperTs +} + +func recordEventStoreSSTFileFilterMetrics( + userProps map[string]string, + shouldScan bool, + scannedCount prometheus.Counter, + skippedCount prometheus.Counter, + scannedLogicalBytes prometheus.Counter, + skippedLogicalBytes prometheus.Counter, +) { + count := scannedCount + logicalBytesCounter := scannedLogicalBytes + if !shouldScan { + count = skippedCount + logicalBytesCounter = skippedLogicalBytes + } + count.Inc() + if logicalBytes, ok := parseEventStoreUint64TableProperty(userProps, eventStoreLogicalBytesProperty); ok { + logicalBytesCounter.Add(float64(logicalBytes)) + } +} + +func parseEventStoreUint64TableProperty(userProps map[string]string, key string) (uint64, bool) { + value, ok := userProps[key] + if !ok { + return 0, false + } + ts, err := strconv.ParseUint(value, 10, 64) + if err != nil { + return 0, false + } + return ts, true +} diff --git a/pkg/metrics/event_store.go b/pkg/metrics/event_store.go index d934173f27..ff272268d0 100644 --- a/pkg/metrics/event_store.go +++ b/pkg/metrics/event_store.go @@ -81,6 +81,20 @@ var ( Help: "The number of bytes scanned by event store.", }, []string{"type"}) + EventStoreSSTFileFilterCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "sst_file_filter_count", + Help: "The number of SST file filter decisions by event store.", + }, []string{"decision"}) + + EventStoreSSTFileFilterLogicalBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "event_store", + Name: "sst_file_filter_logical_bytes", + Help: "The estimated logical bytes in SST files scanned or skipped by event store.", + }, []string{"decision"}) + EventStoreDeleteRangeCount = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: "ticdc", @@ -336,6 +350,8 @@ func initEventStoreMetrics(registry *prometheus.Registry) { registry.MustRegister(EventStoreWriteDurationHistogram) registry.MustRegister(EventStoreScanRequestsCount) registry.MustRegister(EventStoreScanBytes) + registry.MustRegister(EventStoreSSTFileFilterCount) + registry.MustRegister(EventStoreSSTFileFilterLogicalBytes) registry.MustRegister(EventStoreDeleteRangeCount) registry.MustRegister(EventStoreDeleteRangeFetchedCount) registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist)