Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 172 additions & 22 deletions logservice/eventstore/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package eventstore

import (
"context"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -95,30 +96,116 @@ func (d *gcManager) fetchAllGCItems() []gcRangeItem {
}

func (d *gcManager) run(ctx context.Context) error {
deleteTicker := time.NewTicker(50 * time.Millisecond)
defer deleteTicker.Stop()
compactTicker := time.NewTicker(10 * time.Minute)
defer compactTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-deleteTicker.C:
ranges := d.fetchAllGCItems()
if len(ranges) == 0 {
continue
var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()

deleteTicker := time.NewTicker(50 * time.Millisecond)
defer deleteTicker.Stop()

const deleteInfoLogInterval = 10 * time.Minute
lastInfoLog := time.Now()
windowStart := lastInfoLog
var windowBatchCount int
var windowRangeCount int
var windowTotalDuration time.Duration
var windowMaxBatchDuration time.Duration

logDeleteRangeStats := func(now time.Time) {
avgRangesPerBatch := 0.0
if windowBatchCount > 0 {
avgRangesPerBatch = float64(windowRangeCount) / float64(windowBatchCount)
}
avgBatchDuration := time.Duration(0)
if windowBatchCount > 0 {
avgBatchDuration = windowTotalDuration / time.Duration(windowBatchCount)
}
log.Debug("gc manager deleting ranges", zap.Int("rangeCount", len(ranges)))
d.doGCJob(ranges)
d.updateCompactRanges(ranges)
metrics.EventStoreDeleteRangeCount.Add(float64(len(ranges)))
case <-compactTicker.C:
// it seems pebble doesn't compact cold range(no data write),
// so we do a manual compaction periodically.
d.doCompaction()

log.Info("gc manager delete range progress",
zap.Duration("interval", now.Sub(windowStart)),
zap.Int("batchCount", windowBatchCount),
zap.Int("deletedRangeCount", windowRangeCount),
zap.Float64("avgRangesPerBatch", avgRangesPerBatch),
zap.Duration("avgBatchDuration", avgBatchDuration),
zap.Duration("maxBatchDuration", windowMaxBatchDuration))

lastInfoLog = now
windowStart = now
windowBatchCount = 0
windowRangeCount = 0
windowTotalDuration = 0
windowMaxBatchDuration = 0
}
Comment thread
lidezhu marked this conversation as resolved.
}

for {
select {
case <-ctx.Done():
return
case <-deleteTicker.C:
ranges := d.fetchAllGCItems()
if len(ranges) == 0 {
if time.Since(lastInfoLog) >= deleteInfoLogInterval {
logDeleteRangeStats(time.Now())
}
continue
}

metrics.EventStoreDeleteRangeFetchedCount.Add(float64(len(ranges)))

originalRangeCount := len(ranges)
ranges, mergedCount := mergeDeleteRanges(ranges)
if mergedCount > 0 {
log.Debug("gc manager coalesced delete ranges",
zap.Int("fetchedRangeCount", originalRangeCount),
zap.Int("deleteOpCount", len(ranges)))
}

startTime := time.Now()
d.doGCJob(ranges)
d.updateCompactRanges(ranges)
metrics.EventStoreDeleteRangeCount.Add(float64(len(ranges)))

duration := time.Since(startTime)
windowBatchCount++
windowRangeCount += len(ranges)
windowTotalDuration += duration
if duration > windowMaxBatchDuration {
windowMaxBatchDuration = duration
}
log.Debug("gc manager deleted ranges",
zap.Int("batchRangeCount", len(ranges)),
zap.Duration("duration", duration))

if time.Since(lastInfoLog) >= deleteInfoLogInterval {
logDeleteRangeStats(time.Now())
}
}
}
}()

go func() {
defer wg.Done()

compactTicker := time.NewTicker(10 * time.Minute)
defer compactTicker.Stop()

for {
select {
case <-ctx.Done():
return
case <-compactTicker.C:
// it seems pebble doesn't compact cold range(no data write),
// so we do a manual compaction periodically.
d.doCompaction()
}
}
}()

<-ctx.Done()
wg.Wait()
return nil
}

func (d *gcManager) doGCJob(ranges []gcRangeItem) {
Expand All @@ -130,6 +217,65 @@ func (d *gcManager) doGCJob(ranges []gcRangeItem) {
}
}

// mergeDeleteRanges merges delete ranges for the same (dbIndex, uniqueKeyID, tableID) when they are
// contiguous or overlapping. It is used as a best-effort mitigation for rare cases where the delete
// goroutine is blocked for a long time and ranges accumulate.
// mergeDeleteRanges coalesces delete ranges that share the same
// (dbIndex, uniqueKeyID, tableID) key and are contiguous or overlapping in
// the ts dimension. It is a best-effort mitigation for the rare case where
// the delete goroutine stalls long enough for many ranges to pile up.
// It returns the (possibly shortened, possibly reordered in place) slice and
// the number of ranges that were merged away.
func mergeDeleteRanges(ranges []gcRangeItem) ([]gcRangeItem, int) {
	if len(ranges) < 2 {
		return ranges, 0
	}

	// Fast path: in the common case every (dbIndex, uniqueKeyID, tableID)
	// key appears at most once, so nothing can merge and the sort is skipped.
	keys := make(map[compactItemKey]struct{}, len(ranges))
	duplicated := false
	for i := range ranges {
		k := compactItemKey{dbIndex: ranges[i].dbIndex, uniqueKeyID: ranges[i].uniqueKeyID, tableID: ranges[i].tableID}
		if _, dup := keys[k]; dup {
			duplicated = true
			break
		}
		keys[k] = struct{}{}
	}
	if !duplicated {
		return ranges, 0
	}

	// Order by key, then by interval, so mergeable ranges become adjacent.
	sort.Slice(ranges, func(a, b int) bool {
		x, y := ranges[a], ranges[b]
		switch {
		case x.dbIndex != y.dbIndex:
			return x.dbIndex < y.dbIndex
		case x.uniqueKeyID != y.uniqueKeyID:
			return x.uniqueKeyID < y.uniqueKeyID
		case x.tableID != y.tableID:
			return x.tableID < y.tableID
		case x.startTs != y.startTs:
			return x.startTs < y.startTs
		default:
			return x.endTs < y.endTs
		}
	})

	// Single pass that rewrites the slice in place; the write index always
	// trails the read index, so reusing the backing array is safe.
	before := len(ranges)
	merged := ranges[:0]
	pending := ranges[0]
	for _, next := range ranges[1:] {
		sameKey := pending.dbIndex == next.dbIndex &&
			pending.uniqueKeyID == next.uniqueKeyID &&
			pending.tableID == next.tableID
		if sameKey && next.startTs <= pending.endTs {
			// next extends (or is contained in) the pending interval.
			if next.endTs > pending.endTs {
				pending.endTs = next.endTs
			}
			continue
		}
		merged = append(merged, pending)
		pending = next
	}
	merged = append(merged, pending)
	return merged, before - len(merged)
}

func (d *gcManager) updateCompactRanges(ranges []gcRangeItem) {
d.mu.Lock()
defer d.mu.Unlock()
Expand Down Expand Up @@ -158,6 +304,7 @@ func (d *gcManager) doCompaction() {
}
d.mu.Unlock()

startTime := time.Now()
log.Info("gc manager compacting ranges", zap.Int("rangeCount", len(toCompact)))
for key, endTs := range toCompact {
db := d.dbs[key.dbIndex]
Expand All @@ -170,4 +317,7 @@ func (d *gcManager) doCompaction() {
zap.Error(err))
}
}
log.Info("gc manager compacting ranges done",
zap.Int("rangeCount", len(toCompact)),
zap.Duration("duration", time.Since(startTime)))
}
74 changes: 74 additions & 0 deletions logservice/eventstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,77 @@ func TestGCManager(t *testing.T) {
require.Len(t, compactCalls, 6)
}
}

func TestMergeDeleteRanges(t *testing.T) {
	input := []gcRangeItem{
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 200, endTs: 300},
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 100, endTs: 200},
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 300, endTs: 400},
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 350, endTs: 450}, // overlap
		{dbIndex: 0, uniqueKeyID: 1, tableID: 20, startTs: 100, endTs: 200}, // different table
		{dbIndex: 1, uniqueKeyID: 1, tableID: 10, startTs: 100, endTs: 200}, // different db
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 500, endTs: 600}, // gap, should not merge
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 600, endTs: 700}, // contiguous with previous
	}

	merged, mergedCount := mergeDeleteRanges(input)
	require.Greater(t, mergedCount, 0)

	// Expect:
	// - (100, 450] for (0,1,10) due to contiguous and overlap
	// - (500, 700] for (0,1,10) due to contiguous (500,600] + (600,700]
	// - plus the two unrelated keys
	require.Len(t, merged, 4)

	// hasRange reports whether merged contains an item equal to want on all
	// five identifying fields.
	hasRange := func(want gcRangeItem) bool {
		for _, r := range merged {
			if r.dbIndex == want.dbIndex && r.uniqueKeyID == want.uniqueKeyID &&
				r.tableID == want.tableID && r.startTs == want.startTs && r.endTs == want.endTs {
				return true
			}
		}
		return false
	}

	require.True(t, hasRange(gcRangeItem{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 100, endTs: 450}))
	require.True(t, hasRange(gcRangeItem{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 500, endTs: 700}))
	require.True(t, hasRange(gcRangeItem{dbIndex: 0, uniqueKeyID: 1, tableID: 20, startTs: 100, endTs: 200}))
	require.True(t, hasRange(gcRangeItem{dbIndex: 1, uniqueKeyID: 1, tableID: 10, startTs: 100, endTs: 200}))
}

func TestMergeDeleteRangesNoOverlapNoChange(t *testing.T) {
	// Ranges share a key but leave a gap between them; mergeDeleteRanges must
	// keep every interval intact and merge nothing.
	input := []gcRangeItem{
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 100, endTs: 200},
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 300, endTs: 400},
		{dbIndex: 0, uniqueKeyID: 1, tableID: 20, startTs: 100, endTs: 200},
	}
	snapshot := append([]gcRangeItem(nil), input...)

	got, mergedCount := mergeDeleteRanges(input)
	require.Equal(t, 0, mergedCount)
	// Order may change (the slice can be sorted in place), but the set of
	// intervals must be identical.
	require.ElementsMatch(t, snapshot, got)
}

func TestMergeDeleteRangesFastPathNoDuplicateKey(t *testing.T) {
	// Every (dbIndex, uniqueKeyID, tableID) key is unique, so the fast path
	// must return early without sorting — order is preserved exactly.
	input := []gcRangeItem{
		{dbIndex: 0, uniqueKeyID: 1, tableID: 10, startTs: 100, endTs: 200},
		{dbIndex: 0, uniqueKeyID: 1, tableID: 20, startTs: 300, endTs: 400},
		{dbIndex: 0, uniqueKeyID: 2, tableID: 10, startTs: 100, endTs: 200},
		{dbIndex: 1, uniqueKeyID: 1, tableID: 10, startTs: 100, endTs: 200},
	}
	snapshot := append([]gcRangeItem(nil), input...)

	got, mergedCount := mergeDeleteRanges(input)
	require.Equal(t, 0, mergedCount)
	require.Equal(t, snapshot, got)
	// The input slice itself must also be untouched.
	require.Equal(t, snapshot, input)
}
11 changes: 10 additions & 1 deletion pkg/metrics/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,15 @@ var (
Namespace: "ticdc",
Subsystem: "event_store",
Name: "delete_range_count",
Help: "The number of delete range received by event store.",
Help: "The number of delete range operations executed by event store gc manager (after coalescing).",
})

EventStoreDeleteRangeFetchedCount = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "event_store",
Name: "delete_range_fetched_count",
Help: "The number of delete range items fetched by event store gc manager (before coalescing).",
})

EventStoreSubscriptionResolvedTsLagHist = prometheus.NewHistogram(
Expand Down Expand Up @@ -218,6 +226,7 @@ func initEventStoreMetrics(registry *prometheus.Registry) {
registry.MustRegister(EventStoreScanRequestsCount)
registry.MustRegister(EventStoreScanBytes)
registry.MustRegister(EventStoreDeleteRangeCount)
registry.MustRegister(EventStoreDeleteRangeFetchedCount)
registry.MustRegister(EventStoreSubscriptionResolvedTsLagHist)
registry.MustRegister(EventStoreOnDiskDataSizeGauge)
registry.MustRegister(EventStoreInMemoryDataSizeGauge)
Expand Down