From 9fe5b48d0668cfc0cde60b9b8c79531e0df744d1 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 7 May 2026 17:33:59 +0800 Subject: [PATCH 1/4] eventcollector: decouple dispatcher session from dispatcher stat --- .../eventcollector/dispatcher_session.go | 298 ++++++++++++------ .../eventcollector/dispatcher_stat.go | 112 +++---- .../eventcollector/dispatcher_stat_test.go | 29 +- 3 files changed, 259 insertions(+), 180 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_session.go b/downstreamadapter/eventcollector/dispatcher_session.go index a4b9c70afa..917b4b62a8 100644 --- a/downstreamadapter/eventcollector/dispatcher_session.go +++ b/downstreamadapter/eventcollector/dispatcher_session.go @@ -19,6 +19,8 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" + "github.com/pingcap/ticdc/downstreamadapter/syncpoint" + "github.com/pingcap/ticdc/eventpb" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/messaging" "github.com/pingcap/ticdc/pkg/node" @@ -103,22 +105,40 @@ func (d *dispatcherConnState) clearRemoteCandidates() { } type dispatcherSession struct { - owner *dispatcherStat + // target provides dispatcher metadata and progress used in requests sent to EventService. + target dispatcher.DispatcherService + // localServerID identifies the collector side when talking to EventService. + localServerID node.ID + // sendMessage sends EventService requests generated by this session. + sendMessage func(*messaging.TargetMessage) + // nextResetEpoch advances the dispatcher's epoch and returns the new value. + nextResetEpoch func(resetTs uint64) uint64 + // readyCallback is only set during the initial local registration path. readyCallback func() + // connState tracks which EventService this session is currently talking to. connState dispatcherConnState } -func newDispatcherSession(owner *dispatcherStat, readyCallback func()) *dispatcherSession { +func newDispatcherSession( + target dispatcher.DispatcherService, + localServerID node.ID, + sendMessage func(*messaging.TargetMessage), + nextResetEpoch func(resetTs uint64) uint64, + readyCallback func(), +) *dispatcherSession { return &dispatcherSession{ - owner: owner, - readyCallback: readyCallback, + target: target, + localServerID: localServerID, + sendMessage: sendMessage, + nextResetEpoch: nextResetEpoch, + readyCallback: readyCallback, } } func (s *dispatcherSession) clear() { // TODO: this design is bad because we may receive stale heartbeat response, - // which make us call clear and register again. But the register may be ignore, + // which makes us call clear and register again. But the register may be ignored, // so we will not receive any ready event. s.connState.clear() } @@ -128,46 +148,38 @@ func (s *dispatcherSession) registerTo(serverID node.ID) { // `onlyReuse` is used to control the register behavior at logservice side // it should be set to `false` when register to a local event service, // and set to `true` when register to a remote event service. - onlyReuse := serverID != s.owner.eventCollector.getLocalServerID() + onlyReuse := serverID != s.localServerID msg := messaging.NewSingleTargetMessage( serverID, messaging.EventServiceTopic, - s.owner.newDispatcherRegisterRequest(s.owner.eventCollector.getLocalServerID().String(), onlyReuse), + s.newDispatcherRegisterRequest(s.localServerID.String(), onlyReuse), ) - s.owner.eventCollector.enqueueMessageForSend(msg) + s.sendMessage(msg) } // commitReady is used to notify the event service to start sending events. func (s *dispatcherSession) commitReady(serverID node.ID) { - s.doReset(serverID, s.owner.getResetTs()) + s.doReset(serverID, s.target.GetCheckpointTs()) } // reset is used to reset the dispatcher to the specified commitTs, // it will remove the dispatcher from the dynamic stream and add it back. func (s *dispatcherSession) reset(serverID node.ID) { - s.doReset(serverID, s.owner.getResetTs()) + s.doReset(serverID, s.target.GetCheckpointTs()) } func (s *dispatcherSession) doReset(serverID node.ID, resetTs uint64) { - var epoch uint64 - for { - currentState := s.owner.loadCurrentEpochState() - nextState := newDispatcherEpochState(currentState.epoch+1, 0, resetTs) - if s.owner.currentEpoch.CompareAndSwap(currentState, nextState) { - epoch = nextState.epoch - break - } - } - resetRequest := s.owner.newDispatcherResetRequest( - s.owner.eventCollector.getLocalServerID().String(), + epoch := s.nextResetEpoch(resetTs) + resetRequest := s.newDispatcherResetRequest( + s.localServerID.String(), resetTs, epoch, ) msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, resetRequest) - s.owner.eventCollector.enqueueMessageForSend(msg) + s.sendMessage(msg) log.Info("send reset dispatcher request to event service", - zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()), - zap.Stringer("dispatcher", s.owner.getDispatcherID()), + zap.Stringer("changefeedID", s.target.GetChangefeedID()), + zap.Stringer("dispatcher", s.target.GetId()), zap.Stringer("eventServiceID", serverID), zap.Uint64("epoch", epoch), zap.Uint64("resetTs", resetTs)) @@ -175,9 +187,9 @@ func (s *dispatcherSession) doReset(serverID node.ID, resetTs uint64) { // remove is used to remove the dispatcher from the event service. func (s *dispatcherSession) remove() { - s.removeFrom(s.owner.eventCollector.getLocalServerID()) + s.removeFrom(s.localServerID) eventServiceID := s.getEventServiceID() - if eventServiceID != "" && eventServiceID != s.owner.eventCollector.getLocalServerID() { + if eventServiceID != "" && eventServiceID != s.localServerID { s.removeFrom(eventServiceID) } } @@ -185,102 +197,192 @@ func (s *dispatcherSession) remove() { // removeFrom is used to remove the dispatcher from the specified event service. func (s *dispatcherSession) removeFrom(serverID node.ID) { log.Info("send remove dispatcher request to event service", - zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()), - zap.Stringer("dispatcher", s.owner.getDispatcherID()), + zap.Stringer("changefeedID", s.target.GetChangefeedID()), + zap.Stringer("dispatcher", s.target.GetId()), zap.Stringer("eventServiceID", serverID)) msg := messaging.NewSingleTargetMessage( serverID, messaging.EventServiceTopic, - s.owner.newDispatcherRemoveRequest(s.owner.eventCollector.getLocalServerID().String()), + s.newDispatcherRemoveRequest(s.localServerID.String()), ) - s.owner.eventCollector.enqueueMessageForSend(msg) + s.sendMessage(msg) } // "signalEvent" refers to the types of events that may modify the event service with which this dispatcher communicates. // "signalEvent" includes TypeReadyEvent/TypeNotReusableEvent func (s *dispatcherSession) handleSignalEvent(event dispatcher.DispatcherEvent) { - localServerID := s.owner.eventCollector.getLocalServerID() - switch event.GetType() { case commonEvent.TypeReadyEvent: - // if the dispatcher has received ready signal from local event service, - // ignore all types of signal events. - if s.isCurrentEventService(localServerID) { - // If we receive a ready event from a remote service while connected to the local - // service, it implies a stale registration. Send a remove request to clean it up. - if event.From != nil && *event.From != localServerID { - s.removeFrom(*event.From) - } - return - } - - // if the event is neither from local event service nor from the current event service, ignore it. - if *event.From != localServerID && !s.isCurrentEventService(*event.From) { - return - } - - if *event.From == localServerID { - if s.readyCallback != nil { - // If readyCallback is set, this dispatcher is performing its initial - // registration with the local event service. Therefore, no deregistration - // from a previous service is necessary. - s.connState.setEventServiceID(localServerID) - s.connState.readyEventReceived.Store(true) - s.readyCallback() - return - } - // note: this must be the first ready event from local event service - oldEventServiceID := s.getEventServiceID() - if oldEventServiceID != "" { - s.removeFrom(oldEventServiceID) - } - log.Info("received ready signal from local event service, prepare to reset the dispatcher", - zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()), - zap.Stringer("dispatcher", s.owner.getDispatcherID())) - - s.connState.setEventServiceID(localServerID) - s.connState.readyEventReceived.Store(true) - s.connState.clearRemoteCandidates() - s.commitReady(localServerID) - } else { - // note: this ready event must be from a remote event service which the dispatcher is trying to register to. - // TODO: if receive too much redudant ready events from remote service, we may need reset again? - if s.connState.readyEventReceived.Load() { - log.Info("received ready signal from the same server again, ignore it", - zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()), - zap.Stringer("dispatcher", s.owner.getDispatcherID()), - zap.Stringer("eventServiceID", *event.From)) - return - } - log.Info("received ready signal from remote event service, prepare to reset the dispatcher", - zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()), - zap.Stringer("dispatcher", s.owner.getDispatcherID()), - zap.Stringer("eventServiceID", *event.From)) - s.connState.readyEventReceived.Store(true) - s.commitReady(*event.From) - } + s.handleReadyEvent(event) case commonEvent.TypeNotReusableEvent: - if *event.From == localServerID { - log.Panic("should not happen: local event service should not send not reusable event") - } - candidate := s.connState.getNextRemoteCandidate() - if candidate != "" { - s.registerTo(candidate) - } + s.handleNotReusableEvent(event) default: log.Panic("should not happen: unknown signal event type", zap.Int("eventType", event.GetType())) } } +func (s *dispatcherSession) handleReadyEvent(event dispatcher.DispatcherEvent) { + // if the dispatcher has received ready signal from local event service, + // ignore all types of signal events. + if s.isCurrentEventService(s.localServerID) { + // If we receive a ready event from a remote service while connected to the local + // service, it implies a stale registration. Send a remove request to clean it up. + if event.From != nil && *event.From != s.localServerID { + s.removeFrom(*event.From) + } + return + } + + // if the event is neither from local event service nor from the current event service, ignore it. + if *event.From != s.localServerID && !s.isCurrentEventService(*event.From) { + return + } + + if *event.From == s.localServerID { + s.handleLocalReadyEvent() + return + } + s.handleRemoteReadyEvent(*event.From) +} + +func (s *dispatcherSession) handleLocalReadyEvent() { + if s.readyCallback != nil { + // If readyCallback is set, this dispatcher is performing its initial + // registration with the local event service. Therefore, no deregistration + // from a previous service is necessary. + s.connState.setEventServiceID(s.localServerID) + s.connState.readyEventReceived.Store(true) + s.readyCallback() + return + } + + // note: this must be the first ready event from local event service + oldEventServiceID := s.getEventServiceID() + if oldEventServiceID != "" { + s.removeFrom(oldEventServiceID) + } + log.Info("received ready signal from local event service, prepare to reset the dispatcher", + zap.Stringer("changefeedID", s.target.GetChangefeedID()), + zap.Stringer("dispatcher", s.target.GetId())) + + s.connState.setEventServiceID(s.localServerID) + s.connState.readyEventReceived.Store(true) + s.connState.clearRemoteCandidates() + s.commitReady(s.localServerID) +} + +func (s *dispatcherSession) handleRemoteReadyEvent(serverID node.ID) { + // note: this ready event must be from a remote event service which the dispatcher is trying to register to. + // TODO: if we receive too many redundant ready events from a remote service, we may need to reset again. + if s.connState.readyEventReceived.Load() { + log.Info("received ready signal from the same server again, ignore it", + zap.Stringer("changefeedID", s.target.GetChangefeedID()), + zap.Stringer("dispatcher", s.target.GetId()), + zap.Stringer("eventServiceID", serverID)) + return + } + log.Info("received ready signal from remote event service, prepare to reset the dispatcher", + zap.Stringer("changefeedID", s.target.GetChangefeedID()), + zap.Stringer("dispatcher", s.target.GetId()), + zap.Stringer("eventServiceID", serverID)) + s.connState.readyEventReceived.Store(true) + s.commitReady(serverID) +} + +func (s *dispatcherSession) handleNotReusableEvent(event dispatcher.DispatcherEvent) { + if *event.From == s.localServerID { + log.Panic("should not happen: local event service should not send not reusable event") + } + candidate := s.connState.getNextRemoteCandidate() + if candidate != "" { + s.registerTo(candidate) + } +} + +func (s *dispatcherSession) newDispatcherRegisterRequest(serverID string, onlyReuse bool) *messaging.DispatcherRequest { + startTs := s.target.GetStartTs() + syncPointInterval := s.target.GetSyncPointInterval() + return &messaging.DispatcherRequest{ + DispatcherRequest: &eventpb.DispatcherRequest{ + ChangefeedId: s.target.GetChangefeedID().ToPB(), + DispatcherId: s.target.GetId().ToPB(), + TableSpan: s.target.GetTableSpan(), + StartTs: startTs, + // ServerId is the id of the request sender. + ServerId: serverID, + ActionType: eventpb.ActionType_ACTION_TYPE_REGISTER, + FilterConfig: s.target.GetFilterConfig(), + EnableSyncPoint: s.target.EnableSyncPoint(), + SyncPointInterval: uint64(syncPointInterval.Seconds()), + SyncPointTs: syncpoint.CalculateStartSyncPointTs(startTs, syncPointInterval, s.target.GetSkipSyncpointAtStartTs()), + OnlyReuse: onlyReuse, + BdrMode: s.target.GetBDRMode(), + Mode: s.target.GetMode(), + Epoch: 0, + Timezone: s.target.GetTimezone(), + Integrity: s.target.GetIntegrityConfig(), + OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(), + TxnAtomicity: string(s.target.GetTxnAtomicity()), + }, + } +} + +func (s *dispatcherSession) newDispatcherResetRequest(serverID string, resetTs uint64, epoch uint64) *messaging.DispatcherRequest { + syncPointInterval := s.target.GetSyncPointInterval() + + // After reset during normal run time, the event collector can filter redundant sync points, + // so only the case where resetTs is the same as startTs needs special handling. + skipSyncpointSameAsResetTs := false + if resetTs == s.target.GetStartTs() { + skipSyncpointSameAsResetTs = s.target.GetSkipSyncpointAtStartTs() + } + return &messaging.DispatcherRequest{ + DispatcherRequest: &eventpb.DispatcherRequest{ + ChangefeedId: s.target.GetChangefeedID().ToPB(), + DispatcherId: s.target.GetId().ToPB(), + TableSpan: s.target.GetTableSpan(), + StartTs: resetTs, + // ServerId is the id of the request sender. + ServerId: serverID, + ActionType: eventpb.ActionType_ACTION_TYPE_RESET, + FilterConfig: s.target.GetFilterConfig(), + EnableSyncPoint: s.target.EnableSyncPoint(), + SyncPointInterval: uint64(syncPointInterval.Seconds()), + SyncPointTs: syncpoint.CalculateStartSyncPointTs(resetTs, syncPointInterval, skipSyncpointSameAsResetTs), + // OnlyReuse: false, + BdrMode: s.target.GetBDRMode(), + Mode: s.target.GetMode(), + Epoch: epoch, + Timezone: s.target.GetTimezone(), + Integrity: s.target.GetIntegrityConfig(), + OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(), + }, + } +} + +func (s *dispatcherSession) newDispatcherRemoveRequest(serverID string) *messaging.DispatcherRequest { + return &messaging.DispatcherRequest{ + DispatcherRequest: &eventpb.DispatcherRequest{ + ChangefeedId: s.target.GetChangefeedID().ToPB(), + DispatcherId: s.target.GetId().ToPB(), + TableSpan: s.target.GetTableSpan(), + // ServerId is the id of the request sender. + ServerId: serverID, + ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE, + Mode: s.target.GetMode(), + }, + } +} + func (s *dispatcherSession) setRemoteCandidates(nodes []string) { if len(nodes) == 0 { return } if s.connState.trySetRemoteCandidates(nodes) { log.Info("set remote candidates", - zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()), - zap.Stringer("dispatcherID", s.owner.getDispatcherID()), - zap.Int64("tableID", s.owner.target.GetTableSpan().TableID), + zap.Stringer("changefeedID", s.target.GetChangefeedID()), + zap.Stringer("dispatcherID", s.target.GetId()), + zap.Int64("tableID", s.target.GetTableSpan().TableID), zap.Strings("nodes", nodes)) candidate := s.connState.getNextRemoteCandidate() s.registerTo(candidate) diff --git a/downstreamadapter/eventcollector/dispatcher_stat.go b/downstreamadapter/eventcollector/dispatcher_stat.go index 4ec845debf..7fc73fc617 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat.go +++ b/downstreamadapter/eventcollector/dispatcher_stat.go @@ -18,8 +18,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/ticdc/downstreamadapter/dispatcher" - "github.com/pingcap/ticdc/downstreamadapter/syncpoint" - "github.com/pingcap/ticdc/eventpb" "github.com/pingcap/ticdc/pkg/common" commonEvent "github.com/pingcap/ticdc/pkg/common/event" "github.com/pingcap/ticdc/pkg/messaging" @@ -79,14 +77,41 @@ func newDispatcherStat( target dispatcher.DispatcherService, eventCollector *EventCollector, readyCallback func(), +) *dispatcherStat { + if eventCollector == nil { + log.Panic("event collector must not be nil when creating dispatcher stat", + zap.Stringer("changefeedID", target.GetChangefeedID()), + zap.Stringer("dispatcher", target.GetId())) + } + return newDispatcherStatInternal( + target, + eventCollector, + eventCollector.getLocalServerID(), + eventCollector.enqueueMessageForSend, + readyCallback, + ) +} + +func newDispatcherStatInternal( + target dispatcher.DispatcherService, + eventCollector *EventCollector, + localServerID node.ID, + sendMessage func(*messaging.TargetMessage), + readyCallback func(), ) *dispatcherStat { stat := &dispatcherStat{ target: target, eventCollector: eventCollector, } - stat.session = newDispatcherSession(stat, readyCallback) stat.currentEpoch.Store(newDispatcherEpochState(0, 0, target.GetStartTs())) stat.lastEventCommitTs.Store(target.GetStartTs()) + stat.session = newDispatcherSession( + target, + localServerID, + sendMessage, + stat.nextResetEpoch, + readyCallback, + ) return stat } @@ -128,10 +153,14 @@ func (d *dispatcherStat) doReset(serverID node.ID, resetTs uint64) { d.session.doReset(serverID, resetTs) } -// getResetTs is used to get the resetTs of the dispatcher. -// resetTs must be larger than the startTs, otherwise it will cause panic in eventStore. -func (d *dispatcherStat) getResetTs() uint64 { - return d.target.GetCheckpointTs() +func (d *dispatcherStat) nextResetEpoch(resetTs uint64) uint64 { + for { + currentState := d.loadCurrentEpochState() + nextState := newDispatcherEpochState(currentState.epoch+1, 0, resetTs) + if d.currentEpoch.CompareAndSwap(currentState, nextState) { + return nextState.epoch + } + } } // remove is used to remove the dispatcher from the event service. @@ -558,76 +587,13 @@ func (d *dispatcherStat) isReceivingDataEvent() bool { } func (d *dispatcherStat) newDispatcherRegisterRequest(serverId string, onlyReuse bool) *messaging.DispatcherRequest { - startTs := d.target.GetStartTs() - syncPointInterval := d.target.GetSyncPointInterval() - return &messaging.DispatcherRequest{ - DispatcherRequest: &eventpb.DispatcherRequest{ - ChangefeedId: d.target.GetChangefeedID().ToPB(), - DispatcherId: d.target.GetId().ToPB(), - TableSpan: d.target.GetTableSpan(), - StartTs: startTs, - // ServerId is the id of the request sender. - ServerId: serverId, - ActionType: eventpb.ActionType_ACTION_TYPE_REGISTER, - FilterConfig: d.target.GetFilterConfig(), - EnableSyncPoint: d.target.EnableSyncPoint(), - SyncPointInterval: uint64(syncPointInterval.Seconds()), - SyncPointTs: syncpoint.CalculateStartSyncPointTs(startTs, syncPointInterval, d.target.GetSkipSyncpointAtStartTs()), - OnlyReuse: onlyReuse, - BdrMode: d.target.GetBDRMode(), - Mode: d.target.GetMode(), - Epoch: 0, - Timezone: d.target.GetTimezone(), - Integrity: d.target.GetIntegrityConfig(), - OutputRawChangeEvent: d.target.IsOutputRawChangeEvent(), - TxnAtomicity: string(d.target.GetTxnAtomicity()), - }, - } + return d.session.newDispatcherRegisterRequest(serverId, onlyReuse) } func (d *dispatcherStat) newDispatcherResetRequest(serverId string, resetTs uint64, epoch uint64) *messaging.DispatcherRequest { - syncPointInterval := d.target.GetSyncPointInterval() - - // after reset during normal run time, we can filter reduduant syncpoint at event collector side - // so we just take care of the case that resetTs is same as startTs - skipSyncpointSameAsResetTs := false - if resetTs == d.target.GetStartTs() { - skipSyncpointSameAsResetTs = d.target.GetSkipSyncpointAtStartTs() - } - return &messaging.DispatcherRequest{ - DispatcherRequest: &eventpb.DispatcherRequest{ - ChangefeedId: d.target.GetChangefeedID().ToPB(), - DispatcherId: d.target.GetId().ToPB(), - TableSpan: d.target.GetTableSpan(), - StartTs: resetTs, - // ServerId is the id of the request sender. - ServerId: serverId, - ActionType: eventpb.ActionType_ACTION_TYPE_RESET, - FilterConfig: d.target.GetFilterConfig(), - EnableSyncPoint: d.target.EnableSyncPoint(), - SyncPointInterval: uint64(syncPointInterval.Seconds()), - SyncPointTs: syncpoint.CalculateStartSyncPointTs(resetTs, syncPointInterval, skipSyncpointSameAsResetTs), - // OnlyReuse: false, - BdrMode: d.target.GetBDRMode(), - Mode: d.target.GetMode(), - Epoch: epoch, - Timezone: d.target.GetTimezone(), - Integrity: d.target.GetIntegrityConfig(), - OutputRawChangeEvent: d.target.IsOutputRawChangeEvent(), - }, - } + return d.session.newDispatcherResetRequest(serverId, resetTs, epoch) } func (d *dispatcherStat) newDispatcherRemoveRequest(serverId string) *messaging.DispatcherRequest { - return &messaging.DispatcherRequest{ - DispatcherRequest: &eventpb.DispatcherRequest{ - ChangefeedId: d.target.GetChangefeedID().ToPB(), - DispatcherId: d.target.GetId().ToPB(), - TableSpan: d.target.GetTableSpan(), - // ServerId is the id of the request sender. - ServerId: serverId, - ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE, - Mode: d.target.GetMode(), - }, - } + return d.session.newDispatcherRemoveRequest(serverId) } diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index d84013b610..007b15cc17 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -59,6 +59,17 @@ func newMockDispatcher(id common.DispatcherID, startTs uint64) *mockDispatcher { } } +// newDispatcherStatForTest is for pure state tests that do not assert messages sent to EventService. +func newDispatcherStatForTest(target dispatcher.DispatcherService, readyCallback func()) *dispatcherStat { + return newDispatcherStatInternal( + target, + nil, + "", + func(*messaging.TargetMessage) {}, + readyCallback, + ) +} + func (m *mockDispatcher) GetStartTs() uint64 { return m.startTs } @@ -342,7 +353,7 @@ func TestVerifyEventSequence(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stat := newDispatcherStat(newMockDispatcher(common.NewDispatcherID(), 0), nil, nil) + stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) state := stat.loadCurrentEpochState() state.lastEventSeq.Store(tt.lastEventSeq) result := stat.verifyEventSequence(tt.event, state) @@ -492,7 +503,7 @@ func TestFilterAndUpdateEventByCommitTs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stat := newDispatcherStat(newMockDispatcher(common.NewDispatcherID(), 0), nil, nil) + stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) stat.lastEventCommitTs.Store(tt.lastEventCommitTs) stat.gotDDLOnTs.Store(tt.gotDDLOnTs) stat.gotSyncpointOnTS.Store(tt.gotSyncpointOnTS) @@ -512,7 +523,7 @@ func TestFilterAndUpdateEventByCommitTs(t *testing.T) { func TestUpdateCommitTsStateByEvents(t *testing.T) { t.Parallel() - stat := newDispatcherStat(newMockDispatcher(common.NewDispatcherID(), 0), nil, nil) + stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) stat.lastEventCommitTs.Store(100) stat.gotDDLOnTs.Store(true) stat.gotSyncpointOnTS.Store(true) @@ -778,7 +789,7 @@ func TestIsFromCurrentEpoch(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stat := newDispatcherStat(newMockDispatcher(common.NewDispatcherID(), 0), nil, nil) + stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) state := newDispatcherEpochState(tt.epoch, tt.lastEventSeq, stat.target.GetStartTs()) stat.currentEpoch.Store(state) result := stat.isFromCurrentEpoch(tt.event, state) @@ -1176,7 +1187,7 @@ func TestHandleSingleDataEventsUpdatesDDLStateAndDedupsSameTsDDL(t *testing.T) { } currentService := node.ID("service1") - stat := newDispatcherStat(mockDisp, nil, nil) + stat := newDispatcherStatForTest(mockDisp, nil) stat.lastEventCommitTs.Store(99) stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) stat.session.connState.setEventServiceID(currentService) @@ -1221,7 +1232,7 @@ func TestHandleSingleDataEventsUpdatesSyncPointStateAndDedupsSameTsSyncPoint(t * } currentService := node.ID("service1") - stat := newDispatcherStat(mockDisp, nil, nil) + stat := newDispatcherStatForTest(mockDisp, nil) stat.lastEventCommitTs.Store(199) stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) stat.session.connState.setEventServiceID(currentService) @@ -1350,7 +1361,7 @@ func TestHandleBatchDMLEvent(t *testing.T) { t.Parallel() mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) mockDisp.handleEvents = normalHandleEvents - stat := newDispatcherStat(mockDisp, nil, nil) + stat := newDispatcherStatForTest(mockDisp, nil) stat.lastEventCommitTs.Store(tt.lastCommitTs) stat.currentEpoch.Store(newDispatcherEpochState(tt.epoch, tt.lastSeq, stat.target.GetStartTs())) if tt.tableInfo != nil { @@ -1376,7 +1387,7 @@ func TestHandleBatchDataEventsDoesNotAdvanceCommitTsWhenNoValidEvents(t *testing return false } - stat := newDispatcherStat(mockDisp, nil, nil) + stat := newDispatcherStatForTest(mockDisp, nil) stat.lastEventCommitTs.Store(50) stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) @@ -1438,7 +1449,7 @@ func TestNewDispatcherResetRequest(t *testing.T) { t.Run(tc.name, func(t *testing.T) { mockDisp := newMockDispatcher(common.NewDispatcherID(), startTs) mockDisp.skipSyncpointAtStartTs = tc.skipSyncpointAtStartTs - stat := newDispatcherStat(mockDisp, nil, nil) + stat := newDispatcherStatForTest(mockDisp, nil) resetReq := stat.newDispatcherResetRequest("local", tc.resetTs, 1) require.Equal(t, tc.expectedSyncPointTs, resetReq.SyncPointTs) }) From 298ba37e7ba9c861916e86b8b882fa5f5a787454 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 7 May 2026 21:37:06 +0800 Subject: [PATCH 2/4] eventcollector: clarify state only test helper --- .../eventcollector/dispatcher_stat_test.go | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_stat_test.go b/downstreamadapter/eventcollector/dispatcher_stat_test.go index 007b15cc17..c657621bfa 100644 --- a/downstreamadapter/eventcollector/dispatcher_stat_test.go +++ b/downstreamadapter/eventcollector/dispatcher_stat_test.go @@ -59,14 +59,15 @@ func newMockDispatcher(id common.DispatcherID, startTs uint64) *mockDispatcher { } } -// newDispatcherStatForTest is for pure state tests that do not assert messages sent to EventService. -func newDispatcherStatForTest(target dispatcher.DispatcherService, readyCallback func()) *dispatcherStat { +// newDispatcherStatForStateTest is for tests that only exercise dispatcherStat state. +// Tests that assert EventService requests should create a real EventCollector instead. +func newDispatcherStatForStateTest(target dispatcher.DispatcherService) *dispatcherStat { return newDispatcherStatInternal( target, nil, "", func(*messaging.TargetMessage) {}, - readyCallback, + nil, ) } @@ -353,7 +354,7 @@ func TestVerifyEventSequence(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) + stat := newDispatcherStatForStateTest(newMockDispatcher(common.NewDispatcherID(), 0)) state := stat.loadCurrentEpochState() state.lastEventSeq.Store(tt.lastEventSeq) result := stat.verifyEventSequence(tt.event, state) @@ -503,7 +504,7 @@ func TestFilterAndUpdateEventByCommitTs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) + stat := newDispatcherStatForStateTest(newMockDispatcher(common.NewDispatcherID(), 0)) stat.lastEventCommitTs.Store(tt.lastEventCommitTs) stat.gotDDLOnTs.Store(tt.gotDDLOnTs) stat.gotSyncpointOnTS.Store(tt.gotSyncpointOnTS) @@ -523,7 +524,7 @@ func TestFilterAndUpdateEventByCommitTs(t *testing.T) { func TestUpdateCommitTsStateByEvents(t *testing.T) { t.Parallel() - stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) + stat := newDispatcherStatForStateTest(newMockDispatcher(common.NewDispatcherID(), 0)) stat.lastEventCommitTs.Store(100) stat.gotDDLOnTs.Store(true) stat.gotSyncpointOnTS.Store(true) @@ -789,7 +790,7 @@ func TestIsFromCurrentEpoch(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stat := newDispatcherStatForTest(newMockDispatcher(common.NewDispatcherID(), 0), nil) + stat := newDispatcherStatForStateTest(newMockDispatcher(common.NewDispatcherID(), 0)) state := newDispatcherEpochState(tt.epoch, tt.lastEventSeq, stat.target.GetStartTs()) stat.currentEpoch.Store(state) result := stat.isFromCurrentEpoch(tt.event, state) @@ -1187,7 +1188,7 @@ func TestHandleSingleDataEventsUpdatesDDLStateAndDedupsSameTsDDL(t *testing.T) { } currentService := node.ID("service1") - stat := newDispatcherStatForTest(mockDisp, nil) + stat := newDispatcherStatForStateTest(mockDisp) stat.lastEventCommitTs.Store(99) stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) stat.session.connState.setEventServiceID(currentService) @@ -1232,7 +1233,7 @@ func TestHandleSingleDataEventsUpdatesSyncPointStateAndDedupsSameTsSyncPoint(t * } currentService := node.ID("service1") - stat := newDispatcherStatForTest(mockDisp, nil) + stat := newDispatcherStatForStateTest(mockDisp) stat.lastEventCommitTs.Store(199) stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) stat.session.connState.setEventServiceID(currentService) @@ -1361,7 +1362,7 @@ func TestHandleBatchDMLEvent(t *testing.T) { t.Parallel() mockDisp := newMockDispatcher(common.NewDispatcherID(), 0) mockDisp.handleEvents = normalHandleEvents - stat := newDispatcherStatForTest(mockDisp, nil) + stat := newDispatcherStatForStateTest(mockDisp) stat.lastEventCommitTs.Store(tt.lastCommitTs) stat.currentEpoch.Store(newDispatcherEpochState(tt.epoch, tt.lastSeq, stat.target.GetStartTs())) if tt.tableInfo != nil { @@ -1387,7 +1388,7 @@ func TestHandleBatchDataEventsDoesNotAdvanceCommitTsWhenNoValidEvents(t *testing return false } - stat := newDispatcherStatForTest(mockDisp, nil) + stat := newDispatcherStatForStateTest(mockDisp) stat.lastEventCommitTs.Store(50) stat.currentEpoch.Store(newDispatcherEpochState(10, 1, stat.target.GetStartTs())) @@ -1449,7 +1450,7 @@ func TestNewDispatcherResetRequest(t *testing.T) { t.Run(tc.name, func(t *testing.T) { mockDisp := newMockDispatcher(common.NewDispatcherID(), startTs) mockDisp.skipSyncpointAtStartTs = tc.skipSyncpointAtStartTs - stat := newDispatcherStatForTest(mockDisp, nil) + stat := newDispatcherStatForStateTest(mockDisp) resetReq := stat.newDispatcherResetRequest("local", tc.resetTs, 1) require.Equal(t, tc.expectedSyncPointTs, resetReq.SyncPointTs) }) From c50921ab8c0b3dd01ab2cdb616b9591fcb3f5834 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 7 May 2026 22:27:18 +0800 Subject: [PATCH 3/4] eventcollector: clarify ready callback comments --- downstreamadapter/eventcollector/dispatcher_session.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/downstreamadapter/eventcollector/dispatcher_session.go b/downstreamadapter/eventcollector/dispatcher_session.go index 917b4b62a8..a35373dfdb 100644 --- a/downstreamadapter/eventcollector/dispatcher_session.go +++ b/downstreamadapter/eventcollector/dispatcher_session.go @@ -113,7 +113,9 @@ type dispatcherSession struct { sendMessage func(*messaging.TargetMessage) // nextResetEpoch advances the dispatcher's epoch and returns the new value. nextResetEpoch func(resetTs uint64) uint64 - // readyCallback is only set during the initial local registration path. + // readyCallback is called when the local EventService is ready in the + // two-phase add flow. A non-nil callback defers committing the dispatcher + // until CommitAddDispatcher is called. readyCallback func() // connState tracks which EventService this session is currently talking to. @@ -247,9 +249,9 @@ func (s *dispatcherSession) handleReadyEvent(event dispatcher.DispatcherEvent) { func (s *dispatcherSession) handleLocalReadyEvent() { if s.readyCallback != nil { - // If readyCallback is set, this dispatcher is performing its initial - // registration with the local event service. Therefore, no deregistration - // from a previous service is necessary. + // A non-nil readyCallback means this dispatcher is in the prepare phase + // of a two-phase add. Record the local EventService as ready and let + // the caller decide when to commit the dispatcher. s.connState.setEventServiceID(s.localServerID) s.connState.readyEventReceived.Store(true) s.readyCallback() From b54d22c9f9ccf1b78d8d0b057d2c2d77cc179b76 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Thu, 7 May 2026 22:29:46 +0800 Subject: [PATCH 4/4] f --- downstreamadapter/eventcollector/dispatcher_session.go | 1 - 1 file changed, 1 deletion(-) diff --git a/downstreamadapter/eventcollector/dispatcher_session.go b/downstreamadapter/eventcollector/dispatcher_session.go index a35373dfdb..bf5a1e416e 100644 --- a/downstreamadapter/eventcollector/dispatcher_session.go +++ b/downstreamadapter/eventcollector/dispatcher_session.go @@ -117,7 +117,6 @@ type dispatcherSession struct { // two-phase add flow. A non-nil callback defers committing the dispatcher // until CommitAddDispatcher is called. readyCallback func() - // connState tracks which EventService this session is currently talking to. connState dispatcherConnState }