Skip to content
Draft

[wip] #4946

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,18 +876,16 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com

// convert the range before passing it to pebble: (startTs, endTs] is equivalent to [startTs + 1, endTs + 1)
var start []byte
lowerCRTs := dataRange.CommitTsStart
if dataRange.LastScannedTxnStartTs != 0 {
start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart, dataRange.LastScannedTxnStartTs+1)
} else {
start = EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1)
lowerCRTs = dataRange.CommitTsStart + 1
}
end := EncodeKeyPrefix(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1)
// TODO: optimize read performance
// it's impossible for an error to be returned here
iter, _ := db.NewIter(&pebble.IterOptions{
LowerBound: start,
UpperBound: end,
})
iter, _ := db.NewIter(newEventStoreIterOptions(start, end, lowerCRTs, dataRange.CommitTsEnd))
decoder := e.decoderPool.Get().(*zstd.Decoder)
startTime := time.Now()
// todo: what happens if iter.First() returns false?
Expand Down
17 changes: 16 additions & 1 deletion logservice/eventstore/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ const (
dmlOrderMask = 0xFF00 // DML order is stored in the high 8 bits for sorting.
compressionMask = 0x00FF // Compression type is stored in the low 8 bits.
dmlOrderShift = 8

uniqueIDSize = 8
tableIDSize = 8
commitTsSize = 8
startTsSize = 8
eventStoreCommitTsOffset = uniqueIDSize + tableIDSize
eventStoreStartTsOffset = eventStoreCommitTsOffset + commitTsSize
eventStoreKeyMetaOffset = eventStoreStartTsOffset + startTsSize
)

// EncodeKeyPrefix encodes uniqueID, tableID, CRTs and StartTs.
Expand Down Expand Up @@ -116,10 +124,17 @@ func EncodeKey(uniqueID uint64, tableID int64, event *common.RawKVEntry, compres

// DecodeKeyMetas decodes compression type and dml order from the key.
func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) {
combinedOrder := binary.BigEndian.Uint16(key[32:34]) // The combined order is at offset 32 for 2 bytes.
combinedOrder := binary.BigEndian.Uint16(key[eventStoreKeyMetaOffset : eventStoreKeyMetaOffset+2])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The DecodeKeyMetas function is exported and performs a slice access key[eventStoreKeyMetaOffset : eventStoreKeyMetaOffset+2] without checking the length of the input key. While internal callers might guarantee the length, as a public function it should be defensive against shorter keys to avoid panics.

func DecodeKeyMetas(key []byte) (DMLOrder, CompressionType) {
	if len(key) < eventStoreKeyMetaOffset+2 {
		return DMLOrderDelete, CompressionNone
	}
	combinedOrder := binary.BigEndian.Uint16(key[eventStoreKeyMetaOffset : eventStoreKeyMetaOffset+2])

return DMLOrder((combinedOrder & dmlOrderMask) >> dmlOrderShift), CompressionType(combinedOrder & compressionMask)
}

func decodeCRTsFromKey(key []byte) (uint64, bool) {
if len(key) < eventStoreStartTsOffset {
return 0, false
}
return binary.BigEndian.Uint64(key[eventStoreCommitTsOffset:eventStoreStartTsOffset]), true
}

// getDMLOrder returns the order of the dml types: delete<update<insert
func getDMLOrder(rowKV *common.RawKVEntry) DMLOrder {
if rowKV.OpType == common.OpTypeDelete {
Expand Down
84 changes: 84 additions & 0 deletions logservice/eventstore/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,86 @@ const (
cacheSize = 1 << 30 // 1GB
memTableTotalSize = 1 << 30 // 1GB
memTableSize = 64 << 20 // 64MB

minTableCRTsLabel = "minCRTs"
maxTableCRTsLabel = "maxCRTs"
tableCRTsCollectorName = "table-crts-collector"
Comment on lines +36 to +38
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The property labels used here are quite generic (minCRTs, maxCRTs). To avoid potential collisions with other table property collectors that might be added in the future, it is recommended to prefix these labels with the collector name.

Suggested change
minTableCRTsLabel = "minCRTs"
maxTableCRTsLabel = "maxCRTs"
tableCRTsCollectorName = "table-crts-collector"
minTableCRTsLabel = "table-crts-collector.minCRTs"
maxTableCRTsLabel = "table-crts-collector.maxCRTs"
tableCRTsCollectorName = "table-crts-collector"

)

// tableCRTsCollector tracks the smallest and largest commit timestamp (CRTs)
// among the keys passed to Add, and writes them into the user-property map
// handed to Finish.
type tableCRTsCollector struct {
// minTs is the smallest CRTs seen so far. The constructors visible in this
// file seed it with math.MaxUint64.
minTs uint64
// maxTs is the largest CRTs seen so far.
maxTs uint64
// hasKey is set once Add has decoded a CRTs from at least one key; Finish
// writes no properties while it is false.
hasKey bool
}

// Add folds the commit timestamp (CRTs) encoded in key.UserKey into the
// collector's running minimum and maximum.
//
// Keys from which decodeCRTsFromKey cannot extract a CRTs are ignored.
// value is not inspected. Add always returns nil.
func (t *tableCRTsCollector) Add(key pebble.InternalKey, value []byte) error {
	crts, ok := decodeCRTsFromKey(key.UserKey)
	if !ok {
		return nil
	}
	if !t.hasKey {
		// First decodable key: seed both bounds from it, so the result does
		// not depend on minTs having been pre-initialized to math.MaxUint64.
		t.minTs, t.maxTs = crts, crts
		t.hasKey = true
		return nil
	}
	if crts > t.maxTs {
		t.maxTs = crts
	}
	if crts < t.minTs {
		t.minTs = crts
	}
	return nil
}

// Finish stores the collected CRTs bounds in userProps as base-10 strings
// under minTableCRTsLabel and maxTableCRTsLabel.
// When no key has been recorded, userProps is left untouched.
// Finish always returns nil.
func (t *tableCRTsCollector) Finish(userProps map[string]string) error {
	if t.hasKey {
		lowest := strconv.FormatUint(t.minTs, 10)
		highest := strconv.FormatUint(t.maxTs, 10)
		userProps[minTableCRTsLabel] = lowest
		userProps[maxTableCRTsLabel] = highest
	}
	return nil
}

// Name returns the identifier of this collector, tableCRTsCollectorName.
func (t *tableCRTsCollector) Name() string {
return tableCRTsCollectorName
}

// newEventStoreIterOptions returns iterator options that pass lowerBound and
// upperBound through as the key bounds and install a table filter based on
// the CRTs properties recorded for each table.
//
// A table is kept when its recorded [min, max] CRTs range overlaps the
// inclusive range [lowerCRTs, upperCRTs], or when its properties are missing
// or cannot be parsed. UseL6Filters is always enabled.
func newEventStoreIterOptions(
	lowerBound []byte,
	upperBound []byte,
	lowerCRTs uint64,
	upperCRTs uint64,
) *pebble.IterOptions {
	keepTable := func(userProps map[string]string) bool {
		minCRTs, maxCRTs, ok := parseTableCRTs(userProps)
		if !ok {
			// No usable CRTs properties: the table cannot be ruled out.
			return true
		}
		// Drop the table only when its range lies entirely outside the
		// requested one.
		if maxCRTs < lowerCRTs || minCRTs > upperCRTs {
			return false
		}
		return true
	}

	opts := &pebble.IterOptions{
		LowerBound:   lowerBound,
		UpperBound:   upperBound,
		TableFilter:  keepTable,
		UseL6Filters: true,
	}
	return opts
}

// parseTableCRTs reads the minimum and maximum CRTs from userProps, looking
// them up under minTableCRTsLabel and maxTableCRTsLabel and parsing them as
// base-10 unsigned 64-bit integers.
// It returns (min, max, true) on success and (0, 0, false) when either
// property is absent or fails to parse.
func parseTableCRTs(userProps map[string]string) (uint64, uint64, bool) {
	// read fetches a single property and parses it.
	read := func(label string) (uint64, bool) {
		raw, found := userProps[label]
		if !found {
			return 0, false
		}
		parsed, err := strconv.ParseUint(raw, 10, 64)
		return parsed, err == nil
	}

	lowest, ok := read(minTableCRTsLabel)
	if !ok {
		return 0, 0, false
	}
	highest, ok := read(maxTableCRTsLabel)
	if !ok {
		return 0, 0, false
	}
	return lowest, highest, true
}

func newPebbleOptions(dbNum int) *pebble.Options {
opts := &pebble.Options{
// Disable WAL to decrease io
Expand All @@ -56,6 +134,12 @@ func newPebbleOptions(dbNum int) *pebble.Options {

// Configure options to optimize read/write performance
Levels: make([]pebble.LevelOptions, 7),

TablePropertyCollectors: []func() pebble.TablePropertyCollector{
func() pebble.TablePropertyCollector {
return &tableCRTsCollector{minTs: math.MaxUint64}
},
},
}

for i := 0; i < len(opts.Levels); i++ {
Expand Down
82 changes: 82 additions & 0 deletions logservice/eventstore/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"math"
"os"
"testing"

Expand All @@ -25,6 +26,87 @@ import (
"github.com/stretchr/testify/require"
)

func TestEventStoreTableCRTsCollector(t *testing.T) {
t.Parallel()

collector := &tableCRTsCollector{minTs: math.MaxUint64}
require.NoError(t, collector.Add(pebble.InternalKey{
UserKey: EncodeKey(1, 1, &common.RawKVEntry{
OpType: common.OpTypePut,
StartTs: 10,
CRTs: 20,
Key: []byte("a"),
}, CompressionNone),
}, nil))
require.NoError(t, collector.Add(pebble.InternalKey{
UserKey: EncodeKey(1, 1, &common.RawKVEntry{
OpType: common.OpTypePut,
StartTs: 30,
CRTs: 40,
Key: []byte("b"),
}, CompressionNone),
}, nil))

userProps := make(map[string]string)
require.NoError(t, collector.Finish(userProps))
require.Equal(t, "20", userProps[minTableCRTsLabel])
require.Equal(t, "40", userProps[maxTableCRTsLabel])
}

// TestEventStoreIteratorWithTableFilter writes two flushed batches of keys
// (CRTs 1 for tables 1-7, CRTs 3 for tables 1-9) and checks how many keys are
// visible through iterators built with newEventStoreIterOptions for several
// CRTs ranges.
func TestEventStoreIteratorWithTableFilter(t *testing.T) {
	t.Parallel()

	opts := newPebbleOptions(1)
	// NOTE(review): presumably compactions are disabled so the two flushed
	// batches are not merged before the iterators run — confirm.
	opts.DisableAutomaticCompactions = true
	db, err := pebble.Open(t.TempDir(), opts)
	require.NoError(t, err)
	defer db.Close()

	// flushBatch stores one key per table ID in [1, maxTableID] at the given
	// CRTs and then flushes the database.
	flushBatch := func(maxTableID int64, crts uint64) {
		for tableID := int64(1); tableID <= maxTableID; tableID++ {
			entry := &common.RawKVEntry{
				OpType:  common.OpTypePut,
				StartTs: 0,
				CRTs:    crts,
				Key:     []byte{byte(tableID)},
			}
			encoded := EncodeKey(1, tableID, entry, CompressionNone)
			require.NoError(t, db.Set(encoded, []byte{'x'}, pebble.NoSync))
		}
		require.NoError(t, db.Flush())
	}
	flushBatch(7, 1)
	flushBatch(9, 3)

	// countKeys sums the keys seen across table IDs 0..9 for the given
	// inclusive CRTs range.
	countKeys := func(t *testing.T, lowerCRTs, upperCRTs uint64) int {
		total := 0
		for tableID := int64(0); tableID <= 9; tableID++ {
			start := EncodeKeyPrefix(1, tableID, lowerCRTs)
			end := EncodeKeyPrefix(1, tableID, upperCRTs+1)
			iter, err := db.NewIter(newEventStoreIterOptions(start, end, lowerCRTs, upperCRTs))
			require.NoError(t, err)
			for iter.First(); iter.Valid(); iter.Next() {
				total++
			}
			require.NoError(t, iter.Close())
		}
		return total
	}

	cases := []struct {
		lowerCRTs uint64
		upperCRTs uint64
		expected  int
	}{
		{lowerCRTs: 0, upperCRTs: 1, expected: 7},
		{lowerCRTs: 1, upperCRTs: 2, expected: 7},
		{lowerCRTs: 2, upperCRTs: 3, expected: 9},
		{lowerCRTs: 3, upperCRTs: 4, expected: 9},
		{lowerCRTs: 0, upperCRTs: 10, expected: 16},
		{lowerCRTs: 10, upperCRTs: 20, expected: 0},
	}
	for _, tc := range cases {
		t.Run(fmt.Sprintf("%d-%d", tc.lowerCRTs, tc.upperCRTs), func(t *testing.T) {
			got := countKeys(t, tc.lowerCRTs, tc.upperCRTs)
			require.Equal(t, tc.expected, got)
		})
	}
}

func TestWriteAndReadRawKVEntry(t *testing.T) {
dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name())
os.RemoveAll(dbPath)
Expand Down
Loading