Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cockroachdb/pebble"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/logpuller"
Expand Down Expand Up @@ -87,8 +88,8 @@ type EventStore interface {

UpdateDispatcherCheckpointTs(dispatcherID common.DispatcherID, checkpointTs uint64)

// GetIterator return an iterator which scan the data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd]
GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator
// GetIterator returns an iterator which scans data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd].
GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error)

GetLogCoordinatorNodeID() node.ID
}
Expand All @@ -104,7 +105,8 @@ type EventIterator interface {
Next() (*common.RawKVEntry, bool)

// Close closes the iterator.
// It returns the number of events that are read from the iterator.
// It returns the number of events that are read from the iterator and any
// accumulated iterator error.
Close() (eventCnt int64, err error)
}

Expand Down Expand Up @@ -789,17 +791,17 @@ func (e *eventStore) UpdateDispatcherCheckpointTs(
updateSubStatCheckpoint(dispatcherStat.removingSubStat)
}

func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator {
func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error) {
if e.closed.Load() {
return nil
return nil, nil
}

e.dispatcherMeta.RLock()
stat, ok := e.dispatcherMeta.dispatcherStats[dispatcherID]
if !ok {
log.Warn("fail to find dispatcher", zap.Stringer("dispatcherID", dispatcherID))
e.dispatcherMeta.RUnlock()
return nil
return nil, nil
}

tryGetDB := func(subStat *subscriptionStat, force bool) *pebble.DB {
Expand Down Expand Up @@ -917,17 +919,31 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1)
}
end := encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1)
// it's impossible return error here
iter, _ := db.NewIter(&pebble.IterOptions{
iter, err := db.NewIter(&pebble.IterOptions{
LowerBound: start,
UpperBound: end,
})
decoder := e.decoderPool.Get().(*zstd.Decoder)
if err != nil {
return nil, errors.Trace(err)
}

startTime := time.Now()
// todo: what happens if iter.First() returns false?
_ = iter.First()
hasFirstEvent := iter.First()
metricEventStoreFirstReadDurationHistogram.Observe(time.Since(startTime).Seconds())
metrics.EventStoreScanRequestsCount.Inc()
if !hasFirstEvent {
// Empty range or first-read error. Close returns any accumulated
// iterator error while also releasing Pebble resources.
closeStartTime := time.Now()
err := iter.Close()
metricEventStoreCloseReadDurationHistogram.Observe(time.Since(closeStartTime).Seconds())
if err != nil {
return nil, errors.Trace(err)
}
return nil, nil
}

decoder := e.decoderPool.Get().(*zstd.Decoder)

needCheckSpan := true
if stat.tableSpan.Equal(subStat.tableSpan) {
Expand All @@ -945,7 +961,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
rowCount: 0,
decoder: decoder,
decoderPool: e.decoderPool,
}
}, nil
}

func (e *eventStore) GetLogCoordinatorNodeID() node.ID {
Expand Down Expand Up @@ -1535,7 +1551,7 @@ func (iter *eventStoreIter) Close() (int64, error) {
iter.decoderPool.Put(iter.decoder)
iter.innerIter = nil
metricEventStoreCloseReadDurationHistogram.Observe(time.Since(startTime).Seconds())
return iter.rowCount, err
return iter.rowCount, errors.Trace(err)
}

func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error {
Expand Down
79 changes: 50 additions & 29 deletions logservice/eventstore/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ func newEventStoreForTest(path string) (logpuller.SubscriptionClient, EventStore
return subClient, store
}

func requireEventIterator(
t testing.TB, store EventStore, dispatcherID common.DispatcherID, dataRange common.DataRange,
) EventIterator {
t.Helper()
iter, err := store.GetIterator(dispatcherID, dataRange)
require.NoError(t, err)
return iter
}

func setDataSharingForTest(t *testing.T, enable bool) func() {
t.Helper()
originalCfg := config.GetGlobalServerConfig().Clone()
Expand Down Expand Up @@ -686,6 +695,22 @@ func TestEventStoreSwitchSubStat(t *testing.T) {
require.NotNil(t, subStat)
subStat.resolvedTs.Store(ts)
}
getIterator := func() {
iter, err := store.GetIterator(dispatcherID2, common.DataRange{
Span: &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("b"),
EndKey: []byte("h"),
},
CommitTsStart: 100,
CommitTsEnd: 150,
})
require.NoError(t, err)
if iter != nil {
_, err = iter.Close()
require.NoError(t, err)
}
}
// ============ prepare two subscriptions ============
// add a dispatcher to create an subscription
{
Expand Down Expand Up @@ -718,33 +743,23 @@ func TestEventStoreSwitchSubStat(t *testing.T) {
// case 1: dispatcher 2 use data from subStat 1
updateSubStatResolvedTs(1, 200)
{
iter := store.GetIterator(dispatcherID2, common.DataRange{
Span: &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("b"),
EndKey: []byte("h"),
},
CommitTsStart: 100,
CommitTsEnd: 150,
})
iterImpl := iter.(*eventStoreIter)
require.True(t, iterImpl.needCheckSpan)
getIterator()
dispatcherStat := store.(*eventStore).dispatcherMeta.dispatcherStats[dispatcherID2]
require.NotNil(t, dispatcherStat)
require.Equal(t, logpuller.SubscriptionID(1), dispatcherStat.subStat.subID)
require.Equal(t, logpuller.SubscriptionID(2), dispatcherStat.pendingSubStat.subID)
require.Nil(t, dispatcherStat.removingSubStat)
}

// case 2: subStat 2 is ready, dispatcher 2 read data from subStat 2 and stop listen subStat 1
updateSubStatResolvedTs(2, 200)
{
iter := store.GetIterator(dispatcherID2, common.DataRange{
Span: &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("b"),
EndKey: []byte("h"),
},
CommitTsStart: 100,
CommitTsEnd: 150,
})
iterImpl := iter.(*eventStoreIter)
require.False(t, iterImpl.needCheckSpan)
getIterator()
dispatcherStat := store.(*eventStore).dispatcherMeta.dispatcherStats[dispatcherID2]
require.NotNil(t, dispatcherStat)
require.Equal(t, logpuller.SubscriptionID(2), dispatcherStat.subStat.subID)
require.Nil(t, dispatcherStat.pendingSubStat)
require.Equal(t, logpuller.SubscriptionID(1), dispatcherStat.removingSubStat.subID)
}
// check dispatcher 2 is no longer receive event from subStat 1
{
Expand All @@ -762,7 +777,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) {
// case 3: subStat 1 advance quicker than subStat 2, dispatcher 2 can still read data from subStat 1
updateSubStatResolvedTs(1, 220)
{
iter := store.GetIterator(dispatcherID2, common.DataRange{
iter, err := store.GetIterator(dispatcherID2, common.DataRange{
Span: &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("b"),
Expand All @@ -771,8 +786,11 @@ func TestEventStoreSwitchSubStat(t *testing.T) {
CommitTsStart: 100,
CommitTsEnd: 220,
})
iterImpl := iter.(*eventStoreIter)
require.True(t, iterImpl.needCheckSpan)
require.NoError(t, err)
if iter != nil {
_, err = iter.Close()
require.NoError(t, err)
}
}
{
subStats := store.(*eventStore).dispatcherMeta.tableStats[tableID]
Expand All @@ -796,7 +814,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) {
// dispatcher 2 read data from subStat 2 and totally remove itself from the subsriber list of subStat 1
updateSubStatResolvedTs(2, 220)
{
iter := store.GetIterator(dispatcherID2, common.DataRange{
iter, err := store.GetIterator(dispatcherID2, common.DataRange{
Span: &heartbeatpb.TableSpan{
TableID: tableID,
StartKey: []byte("b"),
Expand All @@ -805,8 +823,11 @@ func TestEventStoreSwitchSubStat(t *testing.T) {
CommitTsStart: 100,
CommitTsEnd: 220,
})
iterImpl := iter.(*eventStoreIter)
require.False(t, iterImpl.needCheckSpan)
require.NoError(t, err)
if iter != nil {
_, err = iter.Close()
require.NoError(t, err)
}
}
{
subStats := store.(*eventStore).dispatcherMeta.tableStats[tableID]
Expand Down Expand Up @@ -1135,7 +1156,7 @@ func TestEventStoreGetIteratorConcurrently(t *testing.T) {
CommitTsStart: startTs,
CommitTsEnd: lastCommitTs + 1,
}
iter := store.GetIterator(dispatcherID, dataRange)
iter := requireEventIterator(t, store, dispatcherID, dataRange)
require.NotNil(t, iter, "iterator should not be nil")

var receivedEvents []*common.RawKVEntry
Expand Down
1 change: 0 additions & 1 deletion pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,6 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
}

if err != nil {

log.Error("scan events failed",
zap.Stringer("changefeedID", task.changefeedStat.changefeedID),
zap.Stringer("dispatcherID", task.id), zap.Int64("tableID", task.info.GetTableSpan().GetTableID()),
Expand Down
42 changes: 29 additions & 13 deletions pkg/eventservice/event_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
// eventGetter is the interface for getting iterator of events
// The implementation of eventGetter is eventstore.EventStore
type eventGetter interface {
GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) eventstore.EventIterator
GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (eventstore.EventIterator, error)
}

// schemaGetter is the interface for getting schema info and ddl events
Expand Down Expand Up @@ -65,7 +65,7 @@ type eventScanner struct {

// newEventScanner creates a new EventScanner
func newEventScanner(
eventStore eventstore.EventStore,
eventStore eventGetter,
schemaStore schemastore.SchemaStore,
mounter event.Mounter,
mode int64,
Expand Down Expand Up @@ -129,19 +129,33 @@ func (s *eventScanner) scan(
}
metrics.EventServiceGetDDLEventDuration.Observe(time.Since(start).Seconds())

iter := s.eventGetter.GetIterator(dispatcherStat.info.GetID(), dataRange)
iter, err := s.eventGetter.GetIterator(dispatcherStat.info.GetID(), dataRange)
if err != nil {
return 0, nil, false, err
}
if iter == nil {
resolved := event.NewResolvedEvent(dataRange.CommitTsEnd, dispatcherStat.id, dispatcherStat.epoch)
events = append(events, resolved)
sess.appendEvents(events)
return 0, sess.events, false, nil
}
defer s.closeIterator(iter)

// Execute event scanning and merging
merger := newEventMerger(events)
interrupted, err := s.scanAndMergeEvents(sess, merger, iter)
return sess.eventBytes, sess.events, interrupted, err
interrupted, scanErr := s.scanAndMergeEvents(sess, merger, iter)
closeErr := s.closeIterator(iter)
if scanErr != nil {
if closeErr != nil {
log.Warn("event store iterator close returned error after scan error",
zap.Stringer("dispatcherID", dispatcherStat.info.GetID()),
zap.Error(closeErr))
}
return sess.eventBytes, sess.events, interrupted, scanErr
}
if closeErr != nil {
return 0, nil, false, closeErr
}
return sess.eventBytes, sess.events, interrupted, nil
}

// fetchDDLEvents retrieves DDL events which finishedTs are within the range (start, end]
Expand Down Expand Up @@ -172,14 +186,16 @@ func (s *eventScanner) fetchDDLEvents(stat *dispatcherStat, dataRange common.Dat
return result, nil
}

// closeIterator closes the event iterator and records metrics
func (s *eventScanner) closeIterator(iter eventstore.EventIterator) {
if iter != nil {
eventCount, _ := iter.Close()
if eventCount != 0 {
updateMetricEventStoreOutputKv(s.mode, float64(eventCount))
}
// closeIterator closes the event iterator and records metrics.
func (s *eventScanner) closeIterator(iter eventstore.EventIterator) error {
if iter == nil {
return nil
}
eventCount, err := iter.Close()
if eventCount != 0 {
updateMetricEventStoreOutputKv(s.mode, float64(eventCount))
}
return err
}

// scanAndMergeEvents performs the main scanning and merging logic
Expand Down
Loading
Loading