59 changes: 52 additions & 7 deletions logservice/eventstore/event_store.go
@@ -204,6 +204,8 @@ type eventStore struct {
subClient logpuller.SubscriptionClient

dbs []*pebble.DB
pebbleCache *pebble.Cache
tableCache *pebble.TableCache
chs []*chann.UnlimitedChannel[eventWithCallback, uint64]
writeTaskPools []*writeTaskPool

@@ -258,11 +260,14 @@ func New(
log.Panic("fail to remove path", zap.String("path", dbPath), zap.Error(err))
}

dbs, pebbleCache, tableCache := createPebbleDBs(dbPath, dbCount)
store := &eventStore{
pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock),
subClient: subClient,

dbs: createPebbleDBs(dbPath, dbCount),
dbs: dbs,
pebbleCache: pebbleCache,
tableCache: tableCache,
chs: make([]*chann.UnlimitedChannel[eventWithCallback, uint64], 0, dbCount),
writeTaskPools: make([]*writeTaskPool, 0, dbCount),
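
The diff doesn't include createPebbleDBs itself, so the following is a minimal sketch of what it is assumed to do after this change: build one shared block cache and one shared table cache, open dbCount pebble instances against them, and return all three so Close can release the references later. The cache size, shard count, path scheme, and error handling are illustrative placeholders, not the actual implementation (assumed imports: pebble, filepath, strconv, runtime).

	func createPebbleDBsSketch(dir string, dbCount int) ([]*pebble.DB, *pebble.Cache, *pebble.TableCache) {
		// Shared block cache; the 1 GiB size is a placeholder.
		blockCache := pebble.NewCache(1 << 30)
		// Shared table cache on top of the block cache; it takes its own
		// reference on blockCache, which is why Close unrefs it first.
		tableCache := pebble.NewTableCache(blockCache, runtime.GOMAXPROCS(0), 4096)
		dbs := make([]*pebble.DB, 0, dbCount)
		for i := 0; i < dbCount; i++ {
			db, err := pebble.Open(filepath.Join(dir, strconv.Itoa(i)), &pebble.Options{
				Cache:      blockCache,
				TableCache: tableCache,
			})
			if err != nil {
				log.Panic("fail to open pebble db", zap.Error(err))
			}
			dbs = append(dbs, db)
		}
		return dbs, blockCache, tableCache
	}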

@@ -425,6 +430,16 @@ func (e *eventStore) Close(_ context.Context) error {
log.Error("failed to close pebble db", zap.Error(err))
}
}
if e.tableCache != nil {
if err := e.tableCache.Unref(); err != nil {
log.Warn("failed to unref pebble table cache", zap.Error(err))
}
e.tableCache = nil
}
if e.pebbleCache != nil {
e.pebbleCache.Unref()
e.pebbleCache = nil
}

return nil
}
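
Release order matters in the teardown above; a condensed restatement as comments (the reference-counting rationale is an assumption about pebble's shared-cache semantics, not stated in the PR):

	// 1. Close every pebble.DB first, dropping the DBs' references to both caches.
	// 2. Unref the table cache next; it holds its own reference to the block cache.
	// 3. Unref the block cache last, allowing pebble to free the shared memory.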
@@ -874,19 +889,49 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
e.dispatcherMeta.Unlock()
}

// convert range before pass it to pebble: (startTs, endTs] is equal to [startTs + 1, endTs + 1)
// dataRange fields:
// CommitTsStart and CommitTsEnd define the commit-ts scan window.
// LastScannedTxnStartTs records how far the previous scan progressed inside
// CommitTsStart. It is zero if there is no unfinished scan at CommitTsStart.
//
// Iterator key bounds:
// Pebble uses [LowerBound, UpperBound), so end is always encoded as
// CommitTsEnd+1.
//
// If LastScannedTxnStartTs is zero, scan commit ts in
// (CommitTsStart, CommitTsEnd], and use CommitTsStart+1 as LowerBound.
//
// If LastScannedTxnStartTs is non-zero, continue scanning commit ts
// CommitTsStart with start ts greater than LastScannedTxnStartTs, then scan
// later commit ts up to CommitTsEnd.
//
// Table filter bounds:
// lowerTs is the commit-ts lower bound for TableFilter. It skips SSTs whose
// collected txn commit ts range does not overlap [lowerTs, CommitTsEnd].
// Therefore lowerTs is CommitTsStart+1 in the first case, and CommitTsStart
// in the second case.
var start []byte
lowerTs := dataRange.CommitTsStart + 1
if dataRange.LastScannedTxnStartTs != 0 {
start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1)
start = encodeScanLowerBound(
uint64(subStat.subID),
stat.tableSpan.TableID,
dataRange.CommitTsStart,
dataRange.LastScannedTxnStartTs+1,
)
lowerTs = dataRange.CommitTsStart
} else {
start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1)
start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1)
}
end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1)
// TODO: optimize read performance
end := encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1)
// it's impossible to return an error here
iter, _ := db.NewIter(&pebble.IterOptions{
LowerBound: start,
UpperBound: end,
TableFilter: newEventStoreSSTFileFilter(
lowerTs,
dataRange.CommitTsEnd,
),
})
Comment on lines 928 to 935

Severity: high

The error returned by db.NewIter is ignored. If an error occurs (e.g., due to invalid bounds if CommitTsStart > CommitTsEnd), iter will be nil, which will cause a panic when iter.First() is called on line 939. It is recommended to handle the error and return early to avoid a potential crash.

Suggested change
-	iter, _ := db.NewIter(&pebble.IterOptions{
-		LowerBound: start,
-		UpperBound: end,
-		TableFilter: newEventStoreSSTFileFilter(
-			lowerTs,
-			dataRange.CommitTsEnd,
-		),
-	})
+	iter, err := db.NewIter(&pebble.IterOptions{
+		LowerBound: start,
+		UpperBound: end,
+		TableFilter: newEventStoreSSTFileFilter(
+			lowerTs,
+			dataRange.CommitTsEnd,
+		),
+	})
+	if err != nil {
+		log.Error("failed to create pebble iterator",
+			zap.Stringer("dispatcherID", dispatcherID),
+			zap.Error(err))
+		return nil
+	}

decoder := e.decoderPool.Get().(*zstd.Decoder)
startTime := time.Now()
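
To make the two cases above concrete, here is a toy illustration (not project code) of the bounds this block produces, reusing the helpers from format.go; IDs and timestamps are arbitrary. For context, pebble's IterOptions.TableFilter is a func(userProps map[string]string) bool, and newEventStoreSSTFileFilter presumably returns such a function that checks the collected commit-ts range against [lowerTs, CommitTsEnd].

	const (
		subID   = uint64(7) // hypothetical subscription ID
		tableID = int64(42) // hypothetical table ID
	)
	// Scan window: commit ts in (100, 200], i.e. CommitTsStart=100, CommitTsEnd=200.

	// Case 1: LastScannedTxnStartTs == 0. Scanning starts at commit ts 101,
	// and the table filter's lower bound is also 101.
	startFresh := encodeTxnCommitTsBoundaryKey(subID, tableID, 100+1)

	// Case 2: LastScannedTxnStartTs == 55. Resume inside commit ts 100 at txn
	// start ts > 55; the filter's lower bound stays at 100 so SSTs holding the
	// rest of that commit ts are not skipped.
	startResume := encodeScanLowerBound(subID, tableID, 100, 55+1)

	// UpperBound is exclusive, so CommitTsEnd+1 keeps commit ts 200 in range.
	end := encodeTxnCommitTsBoundaryKey(subID, tableID, 200+1)
	_, _, _ = startFresh, startResume, end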
@@ -1429,7 +1474,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) {
key := iter.innerIter.Key()
value := iter.innerIter.Value()

_, compressionType := DecodeKeyMetas(key)
_, compressionType := DecodeKeyAttributes(key)
var decodedValue []byte
if compressionType == CompressionZSTD {
var err error
4 changes: 2 additions & 2 deletions logservice/eventstore/event_store_bench_test.go
@@ -181,8 +181,8 @@ func BenchmarkEventStoreIteratorNext(b *testing.B) {
StartKey: []byte{},
EndKey: []byte{0xff},
}
lower := EncodeKeyPrefix(1, 1, 1)
upper := EncodeKeyPrefix(1, 1, 1<<63)
lower := encodeTxnCommitTsBoundaryKey(1, 1, 1)
upper := encodeTxnCommitTsBoundaryKey(1, 1, 1<<63)

b.ReportAllocs()
b.ResetTimer()
8 changes: 4 additions & 4 deletions logservice/eventstore/event_store_test.go
@@ -891,7 +891,7 @@ func TestWriteToEventStore(t *testing.T) {
key := iter.Key()
value := iter.Value()

_, compressionType := DecodeKeyMetas(key)
_, compressionType := DecodeKeyAttributes(key)

var decodedValue []byte
if compressionType == CompressionZSTD {
@@ -965,7 +965,7 @@ func TestWriteToEventStoreZstdCompressionDisabled(t *testing.T) {

count := 0
for iter.First(); iter.Valid(); iter.Next() {
_, compressionType := DecodeKeyMetas(iter.Key())
_, compressionType := DecodeKeyAttributes(iter.Key())
require.Equal(t, CompressionNone, compressionType)

readEntry := &common.RawKVEntry{}
@@ -1248,8 +1248,8 @@ func TestEventStoreIter_NextWithFiltering(t *testing.T) {

// Create iterator with a wider range to ensure it sees all keys,
// so we can test the internal filtering logic.
start := EncodeKeyPrefix(subID, tableID, 0)
end := EncodeKeyPrefix(subID, tableID, 500)
start := encodeTxnCommitTsBoundaryKey(subID, tableID, 0)
end := encodeTxnCommitTsBoundaryKey(subID, tableID, 500)
innerIter, err := db.NewIter(&pebble.IterOptions{
LowerBound: start,
UpperBound: end,
95 changes: 53 additions & 42 deletions logservice/eventstore/format.go
@@ -38,51 +38,48 @@ const (
CompressionZSTD
)

const (
encodedKeyUint64Len = 8
encodedKeyTxnCommitTsStart = 2 * encodedKeyUint64Len
encodedKeyTxnCommitTsEnd = encodedKeyTxnCommitTsStart + encodedKeyUint64Len
encodedKeyAttributesOffset = 4 * encodedKeyUint64Len
encodedKeyAttributesEnd = encodedKeyAttributesOffset + 2
)

const (
// Bitmask for DML order and compression type.
dmlOrderMask = 0xFF00 // DML order is stored in the high 8 bits for sorting.
compressionMask = 0x00FF // Compression type is stored in the low 8 bits.
dmlOrderShift = 8
)
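
Spelled out, the byte layout these constants describe (an editorial summary derived from the encoder and decoder below, not text from the PR):

	// Encoded key layout, offsets in bytes:
	//   [ 0,  8)  uniqueID     (big-endian uint64)
	//   [ 8, 16)  tableID      (big-endian uint64)
	//   [16, 24)  txnCommitTs  (big-endian uint64)
	//   [24, 32)  txnStartTs   (big-endian uint64)
	//   [32, 34)  attributes   (uint16: high byte = DML order, low byte = compression type)
	//   [34,  n)  raw event key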

// EncodeKeyPrefix encodes uniqueID, tableID, CRTs and StartTs.
// StartTs is optional.
// The result should be a prefix of normal key. (TODO: add a unit test)
func EncodeKeyPrefix(uniqueID uint64, tableID int64, CRTs uint64, startTs ...uint64) []byte {
if len(startTs) > 1 {
log.Panic("startTs should be at most one")
}
// uniqueID, tableID, CRTs.
keySize := 8 + 8 + 8
if len(startTs) > 0 {
keySize += 8
}
buf := make([]byte, 0, keySize)
uint64Buf := [8]byte{}
// uniqueID
binary.BigEndian.PutUint64(uint64Buf[:], uniqueID)
buf = append(buf, uint64Buf[:]...)
// tableID
binary.BigEndian.PutUint64(uint64Buf[:], uint64(tableID))
buf = append(buf, uint64Buf[:]...)
// CRTs
binary.BigEndian.PutUint64(uint64Buf[:], CRTs)
buf = append(buf, uint64Buf[:]...)
if len(startTs) > 0 {
// startTs
binary.BigEndian.PutUint64(uint64Buf[:], startTs[0])
buf = append(buf, uint64Buf[:]...)
}
// encodeTxnCommitTsBoundaryKey encodes the event-store key boundary up to txnCommitTs.
func encodeTxnCommitTsBoundaryKey(uniqueID uint64, tableID int64, txnCommitTs uint64) []byte {
buf := make([]byte, encodedKeyTxnCommitTsEnd)
encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs)
return buf
}

func encodeScanLowerBound(uniqueID uint64, tableID int64, txnCommitTs uint64, txnStartTs uint64) []byte {
buf := make([]byte, encodedKeyAttributesOffset)
encodeTxnCommitTsBoundaryKeyTo(buf, uniqueID, tableID, txnCommitTs)
binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsEnd:encodedKeyAttributesOffset], txnStartTs)
return buf
}

func encodeTxnCommitTsBoundaryKeyTo(buf []byte, uniqueID uint64, tableID int64, txnCommitTs uint64) {
binary.BigEndian.PutUint64(buf[:encodedKeyUint64Len], uniqueID)
binary.BigEndian.PutUint64(buf[encodedKeyUint64Len:encodedKeyTxnCommitTsStart], uint64(tableID))
binary.BigEndian.PutUint64(buf[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd], txnCommitTs)
}

func encodedKeyLen(event *common.RawKVEntry) int {
// uniqueID, tableID, CRTs, startTs, Put/Delete, CompressionType, Key
return 8 + 8 + 8 + 8 + 1 + 1 + len(event.Key)
// uniqueID, tableID, txnCommitTs, txnStartTs, Put/Delete, CompressionType, Key
return encodedKeyAttributesEnd + len(event.Key)
}

// EncodeKeyTo appends an encoded event-store key to buf.
// Format: uniqueID, tableID, CRTs, startTs, delete/update/insert, Key.
// Format: uniqueID, tableID, txnCommitTs, txnStartTs, delete/update/insert, Key.
func EncodeKeyTo(
buf []byte,
uniqueID uint64,
@@ -97,9 +94,9 @@ func EncodeKeyTo(
buf = binary.BigEndian.AppendUint64(buf, uniqueID)
// table ID
buf = binary.BigEndian.AppendUint64(buf, uint64(tableID))
// CRTs
// txn commit ts
buf = binary.BigEndian.AppendUint64(buf, event.CRTs)
// startTs
// txn start ts
buf = binary.BigEndian.AppendUint64(buf, event.StartTs)
// Let Delete < Update < Insert
dmlOrder := getDMLOrder(event)
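
The diff elides the rest of EncodeKeyTo at this point, but DecodeKeyAttributes below implies how the two attribute bytes are packed; a sketch under that assumption, not the verbatim source:

	// Pack DML order into the high byte and compression type into the low
	// byte, so keys for the same txn sort Delete < Update < Insert.
	combined := uint16(dmlOrder)<<dmlOrderShift | uint16(compressionType)
	buf = binary.BigEndian.AppendUint16(buf, combined)
	// Finally, append the raw event key.
	buf = append(buf, event.Key...)
	return buf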
@@ -114,12 +111,22 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres
return EncodeKeyTo(make([]byte, 0, encodedKeyLen(event)), uniqueID, tableID, event, compressionType)
}

// DecodeKeyMetas decodes compression type and dml order from the key.
func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) {
combinedOrder := binary.BigEndian.Uint16(key[32:34]) // The combined order is at offset 32 for 2 bytes.
// DecodeKeyAttributes decodes compression type and dml order from the key.
func DecodeKeyAttributes(key []byte) (DMLOrder, CompressionType) {
combinedOrder := binary.BigEndian.Uint16(key[encodedKeyAttributesOffset:encodedKeyAttributesEnd])
return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask)
}

// decodeTxnCommitTsFromEncodedKey decodes txnCommitTs from an event-store key boundary.
// It works for both full event keys and DeleteRange boundary keys because both
// contain uniqueID, tableID, and txnCommitTs as the first three fields.
func decodeTxnCommitTsFromEncodedKey(key []byte) (uint64, bool) {
if len(key) < encodedKeyTxnCommitTsEnd {
return 0, false
}
return binary.BigEndian.Uint64(key[encodedKeyTxnCommitTsStart:encodedKeyTxnCommitTsEnd]), true
}
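
A quick usage sketch (IDs and timestamps arbitrary): the helper recovers the commit ts both from a full event key and from a bare boundary key, and rejects anything shorter than 24 bytes.

	event := &common.RawKVEntry{OpType: common.OpTypePut, CRTs: 300, StartTs: 299, Key: []byte("k")}
	fullKey := EncodeKey(7, 42, event, CompressionNone)
	boundary := encodeTxnCommitTsBoundaryKey(7, 42, 300)
	ts1, ok1 := decodeTxnCommitTsFromEncodedKey(fullKey)    // 300, true
	ts2, ok2 := decodeTxnCommitTsFromEncodedKey(boundary)   // 300, true
	_, ok3 := decodeTxnCommitTsFromEncodedKey(boundary[:8]) // 0, false: too short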

// getDMLOrder returns the order of the dml types: delete<update<insert
func getDMLOrder(rowKV *common.RawKVEntry) DMLOrder {
if rowKV.OpType == common.OpTypeDelete {
@@ -130,16 +137,20 @@ func getDMLOrder(rowKV *common.RawKVEntry) DMLOrder {
return DMLOrderInsert
}

func deleteDataRange(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error {
start := EncodeKeyPrefix(uniqueKeyID, tableID, startTs)
end := EncodeKeyPrefix(uniqueKeyID, tableID, endTs)
func deleteDataRange(
db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64,
) error {
start := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs)
end := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs)

return db.DeleteRange(start, end, pebble.NoSync)
}

func compactDataRange(db *pebble.DB, uniqueKeyID uint64, tableID int64, startTs uint64, endTs uint64) error {
start := EncodeKeyPrefix(uniqueKeyID, tableID, startTs)
end := EncodeKeyPrefix(uniqueKeyID, tableID, endTs)
func compactDataRange(
db *pebble.DB, uniqueKeyID uint64, tableID int64, startTxnCommitTs uint64, endTxnCommitTs uint64,
) error {
start := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, startTxnCommitTs)
end := encodeTxnCommitTsBoundaryKey(uniqueKeyID, tableID, endTxnCommitTs)

return db.Compact(start, end, false)
}
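
A hypothetical caller (names illustrative, not from the PR): garbage collection below a commit-ts watermark would pair the two helpers, deleting the range first and then compacting it to reclaim the tombstoned space.

	// gcTs is the commit-ts watermark below which data is no longer needed.
	if err := deleteDataRange(db, uint64(subID), tableID, 0, gcTs); err != nil {
		return err
	}
	return compactDataRange(db, uint64(subID), tableID, 0, gcTs)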
70 changes: 70 additions & 0 deletions logservice/eventstore/format_test.go
@@ -0,0 +1,70 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package eventstore

import (
"encoding/hex"
"testing"

"github.com/pingcap/ticdc/pkg/common"
"github.com/stretchr/testify/require"
)

func TestEventStoreKeyFormatGolden(t *testing.T) {
t.Parallel()

const (
uniqueID = uint64(0x0102030405060708)
tableID = int64(0x1112131415161718)
txnCommitTs = uint64(0x2122232425262728)
txnStartTs = uint64(0x3132333435363738)
)
event := &common.RawKVEntry{
OpType: common.OpTypePut,
CRTs: txnCommitTs,
StartTs: txnStartTs,
Key: []byte{0x41, 0x42},
}

key := EncodeKey(uniqueID, tableID, event, CompressionZSTD)
expectedKey := mustDecodeHex(t, "010203040506070811121314151617182122232425262728313233343536373803014142")
require.Equal(t, expectedKey, key)
require.Equal(t, len(expectedKey), encodedKeyLen(event))

require.Equal(t, 16, encodedKeyTxnCommitTsStart)
require.Equal(t, 24, encodedKeyTxnCommitTsEnd)
require.Equal(t, 32, encodedKeyAttributesOffset)
require.Equal(t, 34, encodedKeyAttributesEnd)

require.Equal(t, expectedKey[:encodedKeyTxnCommitTsEnd],
encodeTxnCommitTsBoundaryKey(uniqueID, tableID, txnCommitTs))
require.Equal(t, expectedKey[:encodedKeyAttributesOffset],
encodeScanLowerBound(uniqueID, tableID, txnCommitTs, txnStartTs))

decodedTxnCommitTs, ok := decodeTxnCommitTsFromEncodedKey(key)
require.True(t, ok)
require.Equal(t, txnCommitTs, decodedTxnCommitTs)

dmlOrder, compressionType := DecodeKeyAttributes(key)
require.Equal(t, DMLOrderInsert, dmlOrder)
require.Equal(t, CompressionZSTD, compressionType)
}

func mustDecodeHex(t *testing.T, s string) []byte {
t.Helper()

b, err := hex.DecodeString(s)
require.NoError(t, err)
return b
}