diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 4605fab8f3..4138cb3609 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/klauspost/compress/zstd" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/logpuller" @@ -87,8 +88,8 @@ type EventStore interface { UpdateDispatcherCheckpointTs(dispatcherID common.DispatcherID, checkpointTs uint64) - // GetIterator return an iterator which scan the data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd] - GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator + // GetIterator returns an iterator which scans data in ts range (dataRange.CommitTsStart, dataRange.CommitTsEnd]. + GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error) GetLogCoordinatorNodeID() node.ID } @@ -104,7 +105,8 @@ type EventIterator interface { Next() (*common.RawKVEntry, bool) // Close closes the iterator. - // It returns the number of events that are read from the iterator. + // It returns the number of events that are read from the iterator and any + // accumulated iterator error. Close() (eventCnt int64, err error) } @@ -789,9 +791,9 @@ func (e *eventStore) UpdateDispatcherCheckpointTs( updateSubStatCheckpoint(dispatcherStat.removingSubStat) } -func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) EventIterator { +func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error) { if e.closed.Load() { - return nil + return nil, nil } e.dispatcherMeta.RLock() @@ -799,7 +801,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com if !ok { log.Warn("fail to find dispatcher", zap.Stringer("dispatcherID", dispatcherID)) e.dispatcherMeta.RUnlock() - return nil + return nil, nil } tryGetDB := func(subStat *subscriptionStat, force bool) *pebble.DB { @@ -917,17 +919,31 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com start = encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsStart+1) } end := encodeTxnCommitTsBoundaryKey(uint64(subStat.subID), stat.tableSpan.TableID, dataRange.CommitTsEnd+1) - // it's impossible return error here - iter, _ := db.NewIter(&pebble.IterOptions{ + iter, err := db.NewIter(&pebble.IterOptions{ LowerBound: start, UpperBound: end, }) - decoder := e.decoderPool.Get().(*zstd.Decoder) + if err != nil { + return nil, errors.Trace(err) + } + startTime := time.Now() - // todo: what happens if iter.First() returns false? - _ = iter.First() + hasFirstEvent := iter.First() metricEventStoreFirstReadDurationHistogram.Observe(time.Since(startTime).Seconds()) metrics.EventStoreScanRequestsCount.Inc() + if !hasFirstEvent { + // Empty range or first-read error. Close returns any accumulated + // iterator error while also releasing Pebble resources. + closeStartTime := time.Now() + err := iter.Close() + metricEventStoreCloseReadDurationHistogram.Observe(time.Since(closeStartTime).Seconds()) + if err != nil { + return nil, errors.Trace(err) + } + return nil, nil + } + + decoder := e.decoderPool.Get().(*zstd.Decoder) needCheckSpan := true if stat.tableSpan.Equal(subStat.tableSpan) { @@ -945,7 +961,7 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com rowCount: 0, decoder: decoder, decoderPool: e.decoderPool, - } + }, nil } func (e *eventStore) GetLogCoordinatorNodeID() node.ID { @@ -1535,7 +1551,7 @@ func (iter *eventStoreIter) Close() (int64, error) { iter.decoderPool.Put(iter.decoder) iter.innerIter = nil metricEventStoreCloseReadDurationHistogram.Observe(time.Since(startTime).Seconds()) - return iter.rowCount, err + return iter.rowCount, errors.Trace(err) } func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error { diff --git a/logservice/eventstore/event_store_test.go b/logservice/eventstore/event_store_test.go index eb92c90715..5b3a8ac9ec 100644 --- a/logservice/eventstore/event_store_test.go +++ b/logservice/eventstore/event_store_test.go @@ -104,6 +104,15 @@ func newEventStoreForTest(path string) (logpuller.SubscriptionClient, EventStore return subClient, store } +func requireEventIterator( + t testing.TB, store EventStore, dispatcherID common.DispatcherID, dataRange common.DataRange, +) EventIterator { + t.Helper() + iter, err := store.GetIterator(dispatcherID, dataRange) + require.NoError(t, err) + return iter +} + func setDataSharingForTest(t *testing.T, enable bool) func() { t.Helper() originalCfg := config.GetGlobalServerConfig().Clone() @@ -686,6 +695,22 @@ func TestEventStoreSwitchSubStat(t *testing.T) { require.NotNil(t, subStat) subStat.resolvedTs.Store(ts) } + getIterator := func() { + iter, err := store.GetIterator(dispatcherID2, common.DataRange{ + Span: &heartbeatpb.TableSpan{ + TableID: tableID, + StartKey: []byte("b"), + EndKey: []byte("h"), + }, + CommitTsStart: 100, + CommitTsEnd: 150, + }) + require.NoError(t, err) + if iter != nil { + _, err = iter.Close() + require.NoError(t, err) + } + } // ============ prepare two subscriptions ============ // add a dispatcher to create an subscription { @@ -718,33 +743,23 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // case 1: dispatcher 2 use data from subStat 1 updateSubStatResolvedTs(1, 200) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ - Span: &heartbeatpb.TableSpan{ - TableID: tableID, - StartKey: []byte("b"), - EndKey: []byte("h"), - }, - CommitTsStart: 100, - CommitTsEnd: 150, - }) - iterImpl := iter.(*eventStoreIter) - require.True(t, iterImpl.needCheckSpan) + getIterator() + dispatcherStat := store.(*eventStore).dispatcherMeta.dispatcherStats[dispatcherID2] + require.NotNil(t, dispatcherStat) + require.Equal(t, logpuller.SubscriptionID(1), dispatcherStat.subStat.subID) + require.Equal(t, logpuller.SubscriptionID(2), dispatcherStat.pendingSubStat.subID) + require.Nil(t, dispatcherStat.removingSubStat) } // case 2: subStat 2 is ready, dispatcher 2 read data from subStat 2 and stop listen subStat 1 updateSubStatResolvedTs(2, 200) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ - Span: &heartbeatpb.TableSpan{ - TableID: tableID, - StartKey: []byte("b"), - EndKey: []byte("h"), - }, - CommitTsStart: 100, - CommitTsEnd: 150, - }) - iterImpl := iter.(*eventStoreIter) - require.False(t, iterImpl.needCheckSpan) + getIterator() + dispatcherStat := store.(*eventStore).dispatcherMeta.dispatcherStats[dispatcherID2] + require.NotNil(t, dispatcherStat) + require.Equal(t, logpuller.SubscriptionID(2), dispatcherStat.subStat.subID) + require.Nil(t, dispatcherStat.pendingSubStat) + require.Equal(t, logpuller.SubscriptionID(1), dispatcherStat.removingSubStat.subID) } // check dispatcher 2 is no longer receive event from subStat 1 { @@ -762,7 +777,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // case 3: subStat 1 advance quicker than subStat 2, dispatcher 2 can still read data from subStat 1 updateSubStatResolvedTs(1, 220) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ + iter, err := store.GetIterator(dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -771,8 +786,11 @@ func TestEventStoreSwitchSubStat(t *testing.T) { CommitTsStart: 100, CommitTsEnd: 220, }) - iterImpl := iter.(*eventStoreIter) - require.True(t, iterImpl.needCheckSpan) + require.NoError(t, err) + if iter != nil { + _, err = iter.Close() + require.NoError(t, err) + } } { subStats := store.(*eventStore).dispatcherMeta.tableStats[tableID] @@ -796,7 +814,7 @@ func TestEventStoreSwitchSubStat(t *testing.T) { // dispatcher 2 read data from subStat 2 and totally remove itself from the subsriber list of subStat 1 updateSubStatResolvedTs(2, 220) { - iter := store.GetIterator(dispatcherID2, common.DataRange{ + iter, err := store.GetIterator(dispatcherID2, common.DataRange{ Span: &heartbeatpb.TableSpan{ TableID: tableID, StartKey: []byte("b"), @@ -805,8 +823,11 @@ func TestEventStoreSwitchSubStat(t *testing.T) { CommitTsStart: 100, CommitTsEnd: 220, }) - iterImpl := iter.(*eventStoreIter) - require.False(t, iterImpl.needCheckSpan) + require.NoError(t, err) + if iter != nil { + _, err = iter.Close() + require.NoError(t, err) + } } { subStats := store.(*eventStore).dispatcherMeta.tableStats[tableID] @@ -1135,7 +1156,7 @@ func TestEventStoreGetIteratorConcurrently(t *testing.T) { CommitTsStart: startTs, CommitTsEnd: lastCommitTs + 1, } - iter := store.GetIterator(dispatcherID, dataRange) + iter := requireEventIterator(t, store, dispatcherID, dataRange) require.NotNil(t, iter, "iterator should not be nil") var receivedEvents []*common.RawKVEntry diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go index 21bdb93641..ac8390e5c8 100644 --- a/pkg/eventservice/event_broker.go +++ b/pkg/eventservice/event_broker.go @@ -712,7 +712,6 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) { } if err != nil { - log.Error("scan events failed", zap.Stringer("changefeedID", task.changefeedStat.changefeedID), zap.Stringer("dispatcherID", task.id), zap.Int64("tableID", task.info.GetTableSpan().GetTableID()), diff --git a/pkg/eventservice/event_scanner.go b/pkg/eventservice/event_scanner.go index c4540d34be..7b8c712e27 100644 --- a/pkg/eventservice/event_scanner.go +++ b/pkg/eventservice/event_scanner.go @@ -34,7 +34,7 @@ import ( // eventGetter is the interface for getting iterator of events // The implementation of eventGetter is eventstore.EventStore type eventGetter interface { - GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) eventstore.EventIterator + GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (eventstore.EventIterator, error) } // schemaGetter is the interface for getting schema info and ddl events @@ -65,7 +65,7 @@ type eventScanner struct { // newEventScanner creates a new EventScanner func newEventScanner( - eventStore eventstore.EventStore, + eventStore eventGetter, schemaStore schemastore.SchemaStore, mounter event.Mounter, mode int64, @@ -129,19 +129,33 @@ func (s *eventScanner) scan( } metrics.EventServiceGetDDLEventDuration.Observe(time.Since(start).Seconds()) - iter := s.eventGetter.GetIterator(dispatcherStat.info.GetID(), dataRange) + iter, err := s.eventGetter.GetIterator(dispatcherStat.info.GetID(), dataRange) + if err != nil { + return 0, nil, false, err + } if iter == nil { resolved := event.NewResolvedEvent(dataRange.CommitTsEnd, dispatcherStat.id, dispatcherStat.epoch) events = append(events, resolved) sess.appendEvents(events) return 0, sess.events, false, nil } - defer s.closeIterator(iter) // Execute event scanning and merging merger := newEventMerger(events) - interrupted, err := s.scanAndMergeEvents(sess, merger, iter) - return sess.eventBytes, sess.events, interrupted, err + interrupted, scanErr := s.scanAndMergeEvents(sess, merger, iter) + closeErr := s.closeIterator(iter) + if scanErr != nil { + if closeErr != nil { + log.Warn("event store iterator close returned error after scan error", + zap.Stringer("dispatcherID", dispatcherStat.info.GetID()), + zap.Error(closeErr)) + } + return sess.eventBytes, sess.events, interrupted, scanErr + } + if closeErr != nil { + return 0, nil, false, closeErr + } + return sess.eventBytes, sess.events, interrupted, nil } // fetchDDLEvents retrieves DDL events which finishedTs are within the range (start, end] @@ -172,14 +186,16 @@ func (s *eventScanner) fetchDDLEvents(stat *dispatcherStat, dataRange common.Dat return result, nil } -// closeIterator closes the event iterator and records metrics -func (s *eventScanner) closeIterator(iter eventstore.EventIterator) { - if iter != nil { - eventCount, _ := iter.Close() - if eventCount != 0 { - updateMetricEventStoreOutputKv(s.mode, float64(eventCount)) - } +// closeIterator closes the event iterator and records metrics. +func (s *eventScanner) closeIterator(iter eventstore.EventIterator) error { + if iter == nil { + return nil } + eventCount, err := iter.Close() + if eventCount != 0 { + updateMetricEventStoreOutputKv(s.mode, float64(eventCount)) + } + return err } // scanAndMergeEvents performs the main scanning and merging logic diff --git a/pkg/eventservice/event_scanner_test.go b/pkg/eventservice/event_scanner_test.go index 2f63654d57..7a76adeae9 100644 --- a/pkg/eventservice/event_scanner_test.go +++ b/pkg/eventservice/event_scanner_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/logservice/eventstore" "github.com/pingcap/ticdc/logservice/schemastore" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/common/event" @@ -34,6 +35,17 @@ type mockMounter struct { event.Mounter } +type stubEventGetter struct { + iter eventstore.EventIterator + err error +} + +func (g *stubEventGetter) GetIterator( + dispatcherID common.DispatcherID, dataRange common.DataRange, +) (eventstore.EventIterator, error) { + return g.iter, g.err +} + func makeDispatcherReady(disp *dispatcherStat) { disp.setHandshaked() } @@ -46,6 +58,43 @@ func (m *mockMounter) DecodeToChunk(rawKV *common.RawKVEntry, tableInfo *common. } } +func TestEventScannerReturnsIteratorErrors(t *testing.T) { + disInfo := newMockDispatcherInfoForTest(t) + changefeedStatus := newChangefeedStatus(disInfo.GetChangefeedID(), 0) + disp := newDispatcherStat(disInfo, 1, 1, nil, changefeedStatus) + makeDispatcherReady(disp) + + dataRange := common.DataRange{ + Span: disInfo.GetTableSpan(), + CommitTsStart: disInfo.GetStartTs(), + CommitTsEnd: disInfo.GetStartTs() + 1, + } + + getIterErr := errors.New("get iterator failed") + scanner := newEventScanner( + &stubEventGetter{err: getIterErr}, + NewMockSchemaStore(), + &mockMounter{}, + 0, + ) + _, events, interrupted, err := scanner.scan(context.Background(), disp, dataRange, scanLimit{}) + require.ErrorIs(t, err, getIterErr) + require.Nil(t, events) + require.False(t, interrupted) + + closeErr := errors.New("close iterator failed") + scanner = newEventScanner( + &stubEventGetter{iter: &mockEventIterator{closeErr: closeErr}}, + NewMockSchemaStore(), + &mockMounter{}, + 0, + ) + _, events, interrupted, err = scanner.scan(context.Background(), disp, dataRange, scanLimit{}) + require.ErrorIs(t, err, closeErr) + require.Nil(t, events) + require.False(t, interrupted) +} + func TestEventScanner(t *testing.T) { helper := event.NewEventTestHelper(t) defer helper.Close() diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go index e8cb8acde0..e58508d10a 100644 --- a/pkg/eventservice/event_service_test.go +++ b/pkg/eventservice/event_service_test.go @@ -252,7 +252,9 @@ func (m *mockEventStore) UnregisterDispatcher(changefeedID common.ChangeFeedID, m.unregisterCount.Add(1) } -func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) eventstore.EventIterator { +func (m *mockEventStore) GetIterator( + dispatcherID common.DispatcherID, dataRange common.DataRange, +) (eventstore.EventIterator, error) { span, ok := m.dispatcherMap.Load(dispatcherID) if !ok { log.Panic("dispatcher not found", zap.Stringer("dispatcherID", dispatcherID)) @@ -277,7 +279,7 @@ func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange if len(entries) != 0 { iter = &mockEventIterator{events: entries} } - return iter + return iter, nil } func (m *mockEventStore) GetLogCoordinatorNodeID() node.ID { @@ -312,6 +314,7 @@ type mockEventIterator struct { prevStartTS uint64 prevCommitTS uint64 rowCount int + closeErr error } func (iter *mockEventIterator) Next() (*common.RawKVEntry, bool) { @@ -332,7 +335,7 @@ func (iter *mockEventIterator) Next() (*common.RawKVEntry, bool) { } func (m *mockEventIterator) Close() (int64, error) { - return 0, nil + return int64(m.rowCount), m.closeErr } var _ schemastore.SchemaStore = &mockSchemaStore{}