301 changes: 202 additions & 99 deletions downstreamadapter/eventcollector/dispatcher_session.go
@@ -19,6 +2,8 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/dispatcher"
"github.com/pingcap/ticdc/downstreamadapter/syncpoint"
"github.com/pingcap/ticdc/eventpb"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/node"
@@ -103,22 +105,41 @@ func (d *dispatcherConnState) clearRemoteCandidates() {
}

type dispatcherSession struct {
owner *dispatcherStat
// target provides dispatcher metadata and progress used in requests sent to EventService.
target dispatcher.DispatcherService
// localServerID identifies the collector side when talking to EventService.
localServerID node.ID
// sendMessage sends EventService requests generated by this session.
sendMessage func(*messaging.TargetMessage)
// nextResetEpoch advances the dispatcher's epoch and returns the new value.
nextResetEpoch func(resetTs uint64) uint64
// readyCallback is called when the local EventService is ready in the
// two-phase add flow. A non-nil callback defers committing the dispatcher
// until CommitAddDispatcher is called.
readyCallback func()

// connState tracks which EventService this session is currently talking to.
connState dispatcherConnState
}

func newDispatcherSession(owner *dispatcherStat, readyCallback func()) *dispatcherSession {
func newDispatcherSession(
target dispatcher.DispatcherService,
localServerID node.ID,
sendMessage func(*messaging.TargetMessage),
nextResetEpoch func(resetTs uint64) uint64,
readyCallback func(),
) *dispatcherSession {
return &dispatcherSession{
owner: owner,
readyCallback: readyCallback,
target: target,
localServerID: localServerID,
sendMessage: sendMessage,
nextResetEpoch: nextResetEpoch,
readyCallback: readyCallback,
}
}

func (s *dispatcherSession) clear() {
// TODO: this design is bad because we may receive stale heartbeat response,
// which make us call clear and register again. But the register may be ignore,
// which makes us call clear and register again. But the register may be ignored,
// so we will not receive any ready event.
s.connState.clear()
}
@@ -128,159 +149,241 @@ func (s *dispatcherSession) registerTo(serverID node.ID) {
// `onlyReuse` is used to control the register behavior at logservice side
// it should be set to `false` when register to a local event service,
// and set to `true` when register to a remote event service.
onlyReuse := serverID != s.owner.eventCollector.getLocalServerID()
onlyReuse := serverID != s.localServerID
msg := messaging.NewSingleTargetMessage(
serverID,
messaging.EventServiceTopic,
s.owner.newDispatcherRegisterRequest(s.owner.eventCollector.getLocalServerID().String(), onlyReuse),
s.newDispatcherRegisterRequest(s.localServerID.String(), onlyReuse),
)
s.owner.eventCollector.enqueueMessageForSend(msg)
s.sendMessage(msg)
}

// commitReady is used to notify the event service to start sending events.
func (s *dispatcherSession) commitReady(serverID node.ID) {
s.doReset(serverID, s.owner.getResetTs())
s.doReset(serverID, s.target.GetCheckpointTs())
}

// reset is used to reset the dispatcher to the specified commitTs,
// it will remove the dispatcher from the dynamic stream and add it back.
func (s *dispatcherSession) reset(serverID node.ID) {
s.doReset(serverID, s.owner.getResetTs())
s.doReset(serverID, s.target.GetCheckpointTs())
}

func (s *dispatcherSession) doReset(serverID node.ID, resetTs uint64) {
var epoch uint64
for {
currentState := s.owner.loadCurrentEpochState()
nextState := newDispatcherEpochState(currentState.epoch+1, 0, resetTs)
if s.owner.currentEpoch.CompareAndSwap(currentState, nextState) {
epoch = nextState.epoch
break
}
}
resetRequest := s.owner.newDispatcherResetRequest(
s.owner.eventCollector.getLocalServerID().String(),
epoch := s.nextResetEpoch(resetTs)
resetRequest := s.newDispatcherResetRequest(
s.localServerID.String(),
resetTs,
epoch,
)
msg := messaging.NewSingleTargetMessage(serverID, messaging.EventServiceTopic, resetRequest)
s.owner.eventCollector.enqueueMessageForSend(msg)
s.sendMessage(msg)
Comment on lines 161 to +180
⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Clamp reset ts to collector-observed progress, not raw sink checkpoint.

Lines 162 and 168 use s.target.GetCheckpointTs() directly. That breaks the safety invariant documented in dispatcherEpochState.maxEventTs: old in-flight events can advance the sink checkpoint after a reset, even when the collector has not accepted that progress in the new epoch yet. If another commitReady/reset happens in that window, this code will start the next epoch too far ahead and can permanently skip data.

Possible fix shape
type dispatcherSession struct {
    target dispatcher.DispatcherService
    localServerID node.ID
    sendMessage func(*messaging.TargetMessage)
+   getSafeResetTs func() uint64
    nextResetEpoch func(resetTs uint64) uint64
    readyCallback func()
}

-func newDispatcherSession(
+func newDispatcherSession(
    target dispatcher.DispatcherService,
    localServerID node.ID,
    sendMessage func(*messaging.TargetMessage),
+   getSafeResetTs func() uint64,
    nextResetEpoch func(resetTs uint64) uint64,
    readyCallback func(),
) *dispatcherSession {
    return &dispatcherSession{
        target:         target,
        localServerID:  localServerID,
        sendMessage:    sendMessage,
+       getSafeResetTs: getSafeResetTs,
        nextResetEpoch: nextResetEpoch,
        readyCallback:  readyCallback,
    }
}

func (s *dispatcherSession) commitReady(serverID node.ID) {
-   s.doReset(serverID, s.target.GetCheckpointTs())
+   s.doReset(serverID, s.getSafeResetTs())
}

func (s *dispatcherSession) reset(serverID node.ID) {
-   s.doReset(serverID, s.target.GetCheckpointTs())
+   s.doReset(serverID, s.getSafeResetTs())
}

Wire getSafeResetTs from dispatcherStat using the same capped progress used for EventService heartbeats.
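As a rough sketch of that wiring (not the actual call site): this assumes dispatcherStat still exposes the pre-refactor getResetTs helper that the removed s.owner.getResetTs() calls relied on, and that nextResetEpoch wraps the old CompareAndSwap epoch loop; the surrounding names are placeholders.

// Hypothetical call site in dispatcherStat; names and fields are illustrative.
stat.session = newDispatcherSession(
    stat.target,
    stat.eventCollector.getLocalServerID(),
    stat.eventCollector.enqueueMessageForSend,
    stat.getResetTs,     // capped, collector-observed progress (as before the refactor)
    stat.nextResetEpoch, // assumed wrapper around the old CompareAndSwap epoch loop
    readyCallback,
)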

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/dispatcher_session.go` around lines 160 -
179, commitReady and reset currently call doReset with
s.target.GetCheckpointTs(), which can advance reset epochs past
collector-observed progress and violate dispatcherEpochState.maxEventTs; change
both commitReady and reset to compute resetTs via the safe, capped progress (use
the dispatcher state helper, e.g. s.state.getSafeResetTs() or the equivalent
getSafeResetTs method wired from dispatcherStat used for EventService
heartbeats) and pass that value into doReset(serverID, resetTs), leaving doReset
unchanged; update imports/struct wiring if needed to expose getSafeResetTs on
s.state so commitReady/reset use the capped progress instead of
s.target.GetCheckpointTs().

log.Info("send reset dispatcher request to event service",
zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.owner.getDispatcherID()),
zap.Stringer("changefeedID", s.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.target.GetId()),
zap.Stringer("eventServiceID", serverID),
zap.Uint64("epoch", epoch),
zap.Uint64("resetTs", resetTs))
}

// remove is used to remove the dispatcher from the event service.
func (s *dispatcherSession) remove() {
s.removeFrom(s.owner.eventCollector.getLocalServerID())
s.removeFrom(s.localServerID)
eventServiceID := s.getEventServiceID()
if eventServiceID != "" && eventServiceID != s.owner.eventCollector.getLocalServerID() {
if eventServiceID != "" && eventServiceID != s.localServerID {
s.removeFrom(eventServiceID)
}
}

// removeFrom is used to remove the dispatcher from the specified event service.
func (s *dispatcherSession) removeFrom(serverID node.ID) {
log.Info("send remove dispatcher request to event service",
zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.owner.getDispatcherID()),
zap.Stringer("changefeedID", s.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.target.GetId()),
zap.Stringer("eventServiceID", serverID))
msg := messaging.NewSingleTargetMessage(
serverID,
messaging.EventServiceTopic,
s.owner.newDispatcherRemoveRequest(s.owner.eventCollector.getLocalServerID().String()),
s.newDispatcherRemoveRequest(s.localServerID.String()),
)
s.owner.eventCollector.enqueueMessageForSend(msg)
s.sendMessage(msg)
}

// "signalEvent" refers to the types of events that may modify the event service with which this dispatcher communicates.
// "signalEvent" includes TypeReadyEvent/TypeNotReusableEvent
func (s *dispatcherSession) handleSignalEvent(event dispatcher.DispatcherEvent) {
localServerID := s.owner.eventCollector.getLocalServerID()

switch event.GetType() {
case commonEvent.TypeReadyEvent:
// if the dispatcher has received ready signal from local event service,
// ignore all types of signal events.
if s.isCurrentEventService(localServerID) {
// If we receive a ready event from a remote service while connected to the local
// service, it implies a stale registration. Send a remove request to clean it up.
if event.From != nil && *event.From != localServerID {
s.removeFrom(*event.From)
}
return
}

// if the event is neither from local event service nor from the current event service, ignore it.
if *event.From != localServerID && !s.isCurrentEventService(*event.From) {
return
}

if *event.From == localServerID {
if s.readyCallback != nil {
// If readyCallback is set, this dispatcher is performing its initial
// registration with the local event service. Therefore, no deregistration
// from a previous service is necessary.
s.connState.setEventServiceID(localServerID)
s.connState.readyEventReceived.Store(true)
s.readyCallback()
return
}
// note: this must be the first ready event from local event service
oldEventServiceID := s.getEventServiceID()
if oldEventServiceID != "" {
s.removeFrom(oldEventServiceID)
}
log.Info("received ready signal from local event service, prepare to reset the dispatcher",
zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.owner.getDispatcherID()))

s.connState.setEventServiceID(localServerID)
s.connState.readyEventReceived.Store(true)
s.connState.clearRemoteCandidates()
s.commitReady(localServerID)
} else {
// note: this ready event must be from a remote event service which the dispatcher is trying to register to.
// TODO: if receive too much redudant ready events from remote service, we may need reset again?
if s.connState.readyEventReceived.Load() {
log.Info("received ready signal from the same server again, ignore it",
zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.owner.getDispatcherID()),
zap.Stringer("eventServiceID", *event.From))
return
}
log.Info("received ready signal from remote event service, prepare to reset the dispatcher",
zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.owner.getDispatcherID()),
zap.Stringer("eventServiceID", *event.From))
s.connState.readyEventReceived.Store(true)
s.commitReady(*event.From)
}
s.handleReadyEvent(event)
case commonEvent.TypeNotReusableEvent:
if *event.From == localServerID {
log.Panic("should not happen: local event service should not send not reusable event")
}
candidate := s.connState.getNextRemoteCandidate()
if candidate != "" {
s.registerTo(candidate)
}
s.handleNotReusableEvent(event)
default:
log.Panic("should not happen: unknown signal event type", zap.Int("eventType", event.GetType()))
}
}

func (s *dispatcherSession) handleReadyEvent(event dispatcher.DispatcherEvent) {
// if the dispatcher has received ready signal from local event service,
// ignore all types of signal events.
if s.isCurrentEventService(s.localServerID) {
// If we receive a ready event from a remote service while connected to the local
// service, it implies a stale registration. Send a remove request to clean it up.
if event.From != nil && *event.From != s.localServerID {
s.removeFrom(*event.From)
}
return
}

// if the event is neither from local event service nor from the current event service, ignore it.
if *event.From != s.localServerID && !s.isCurrentEventService(*event.From) {
return
}

if *event.From == s.localServerID {
s.handleLocalReadyEvent()
return
}
s.handleRemoteReadyEvent(*event.From)
}

func (s *dispatcherSession) handleLocalReadyEvent() {
if s.readyCallback != nil {
// A non-nil readyCallback means this dispatcher is in the prepare phase
// of a two-phase add. Record the local EventService as ready and let
// the caller decide when to commit the dispatcher.
s.connState.setEventServiceID(s.localServerID)
s.connState.readyEventReceived.Store(true)
s.readyCallback()
return
}

// note: this must be the first ready event from local event service
oldEventServiceID := s.getEventServiceID()
if oldEventServiceID != "" {
s.removeFrom(oldEventServiceID)
}
log.Info("received ready signal from local event service, prepare to reset the dispatcher",
zap.Stringer("changefeedID", s.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.target.GetId()))

s.connState.setEventServiceID(s.localServerID)
s.connState.readyEventReceived.Store(true)
s.connState.clearRemoteCandidates()
s.commitReady(s.localServerID)
}

func (s *dispatcherSession) handleRemoteReadyEvent(serverID node.ID) {
// note: this ready event must be from a remote event service which the dispatcher is trying to register to.
// TODO: if we receive too many redundant ready events from a remote service, we may need to reset again.
if s.connState.readyEventReceived.Load() {
log.Info("received ready signal from the same server again, ignore it",
zap.Stringer("changefeedID", s.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.target.GetId()),
zap.Stringer("eventServiceID", serverID))
return
}
log.Info("received ready signal from remote event service, prepare to reset the dispatcher",
zap.Stringer("changefeedID", s.target.GetChangefeedID()),
zap.Stringer("dispatcher", s.target.GetId()),
zap.Stringer("eventServiceID", serverID))
s.connState.readyEventReceived.Store(true)
s.commitReady(serverID)
}

func (s *dispatcherSession) handleNotReusableEvent(event dispatcher.DispatcherEvent) {
if *event.From == s.localServerID {
log.Panic("should not happen: local event service should not send not reusable event")
}
candidate := s.connState.getNextRemoteCandidate()
if candidate != "" {
s.registerTo(candidate)
}
Comment on lines +293 to +300
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Clear the failed remote binding when no replacement candidate exists.

If getNextRemoteCandidate() returns empty here, connState.eventServiceID still points at the rejected remote. After that, trySetRemoteCandidates() will refuse any later candidate list because the session still looks attached, so this dispatcher can get stuck until some unrelated clear path runs.

Minimal fix
func (s *dispatcherSession) handleNotReusableEvent(event dispatcher.DispatcherEvent) {
    if *event.From == s.localServerID {
        log.Panic("should not happen: local event service should not send not reusable event")
    }
    candidate := s.connState.getNextRemoteCandidate()
-   if candidate != "" {
-       s.registerTo(candidate)
-   }
+   if candidate == "" {
+       s.connState.setEventServiceID("")
+       return
+   }
+   s.registerTo(candidate)
}
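For context, a minimal sketch of the gate this comment describes, assuming trySetRemoteCandidates refuses new candidates while the connection still records an eventServiceID; field names and locking in the real dispatcherConnState may differ, so treat this as illustrative only.

// Illustrative only: while an eventServiceID is still recorded, candidate
// lists are refused, so the rejected remote binding must be cleared first.
func (d *dispatcherConnState) trySetRemoteCandidates(nodes []string) bool {
    if d.eventServiceID != "" || len(d.remoteCandidates) > 0 {
        return false // session still looks attached; refuse new candidates
    }
    d.remoteCandidates = append(d.remoteCandidates, nodes...)
    return true
}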
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@downstreamadapter/eventcollector/dispatcher_session.go` around lines 292 -
299, When handleNotReusableEvent in dispatcherSession observes no replacement
candidate (connState.getNextRemoteCandidate() returns empty), clear the failed
remote binding so the session no longer appears attached; specifically, if
candidate == "" then reset the remote binding (e.g., clear
connState.eventServiceID or call the connState clear/unbind helper) before
returning so trySetRemoteCandidates can accept new candidates later; keep the
existing registerTo(path) behavior when a candidate exists and optionally log
the clear action for debugging.

}

func (s *dispatcherSession) newDispatcherRegisterRequest(serverID string, onlyReuse bool) *messaging.DispatcherRequest {
startTs := s.target.GetStartTs()
syncPointInterval := s.target.GetSyncPointInterval()
return &messaging.DispatcherRequest{
DispatcherRequest: &eventpb.DispatcherRequest{
ChangefeedId: s.target.GetChangefeedID().ToPB(),
DispatcherId: s.target.GetId().ToPB(),
TableSpan: s.target.GetTableSpan(),
StartTs: startTs,
// ServerId is the id of the request sender.
ServerId: serverID,
ActionType: eventpb.ActionType_ACTION_TYPE_REGISTER,
FilterConfig: s.target.GetFilterConfig(),
EnableSyncPoint: s.target.EnableSyncPoint(),
SyncPointInterval: uint64(syncPointInterval.Seconds()),
SyncPointTs: syncpoint.CalculateStartSyncPointTs(startTs, syncPointInterval, s.target.GetSkipSyncpointAtStartTs()),
OnlyReuse: onlyReuse,
BdrMode: s.target.GetBDRMode(),
Mode: s.target.GetMode(),
Epoch: 0,
Timezone: s.target.GetTimezone(),
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
TxnAtomicity: string(s.target.GetTxnAtomicity()),
},
}
}

func (s *dispatcherSession) newDispatcherResetRequest(serverID string, resetTs uint64, epoch uint64) *messaging.DispatcherRequest {
syncPointInterval := s.target.GetSyncPointInterval()

// After reset during normal run time, the event collector can filter redundant sync points,
// so only the case where resetTs is the same as startTs needs special handling.
skipSyncpointSameAsResetTs := false
if resetTs == s.target.GetStartTs() {
skipSyncpointSameAsResetTs = s.target.GetSkipSyncpointAtStartTs()
}
return &messaging.DispatcherRequest{
DispatcherRequest: &eventpb.DispatcherRequest{
ChangefeedId: s.target.GetChangefeedID().ToPB(),
DispatcherId: s.target.GetId().ToPB(),
TableSpan: s.target.GetTableSpan(),
StartTs: resetTs,
// ServerId is the id of the request sender.
ServerId: serverID,
ActionType: eventpb.ActionType_ACTION_TYPE_RESET,
FilterConfig: s.target.GetFilterConfig(),
EnableSyncPoint: s.target.EnableSyncPoint(),
SyncPointInterval: uint64(syncPointInterval.Seconds()),
SyncPointTs: syncpoint.CalculateStartSyncPointTs(resetTs, syncPointInterval, skipSyncpointSameAsResetTs),
// OnlyReuse: false,
BdrMode: s.target.GetBDRMode(),
Mode: s.target.GetMode(),
Epoch: epoch,
Timezone: s.target.GetTimezone(),
Integrity: s.target.GetIntegrityConfig(),
OutputRawChangeEvent: s.target.IsOutputRawChangeEvent(),
},
}
}

func (s *dispatcherSession) newDispatcherRemoveRequest(serverID string) *messaging.DispatcherRequest {
return &messaging.DispatcherRequest{
DispatcherRequest: &eventpb.DispatcherRequest{
ChangefeedId: s.target.GetChangefeedID().ToPB(),
DispatcherId: s.target.GetId().ToPB(),
TableSpan: s.target.GetTableSpan(),
// ServerId is the id of the request sender.
ServerId: serverID,
ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE,
Mode: s.target.GetMode(),
},
}
}

func (s *dispatcherSession) setRemoteCandidates(nodes []string) {
if len(nodes) == 0 {
return
}
if s.connState.trySetRemoteCandidates(nodes) {
log.Info("set remote candidates",
zap.Stringer("changefeedID", s.owner.target.GetChangefeedID()),
zap.Stringer("dispatcherID", s.owner.getDispatcherID()),
zap.Int64("tableID", s.owner.target.GetTableSpan().TableID),
zap.Stringer("changefeedID", s.target.GetChangefeedID()),
zap.Stringer("dispatcherID", s.target.GetId()),
zap.Int64("tableID", s.target.GetTableSpan().TableID),
zap.Strings("nodes", nodes))
candidate := s.connState.getNextRemoteCandidate()
s.registerTo(candidate)