Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ tools/bin
tools/include
tools/workload/bin

.design
.issue
.vscode
.idea
Expand Down
8 changes: 7 additions & 1 deletion downstreamadapter/dispatcher/basic_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,14 @@ func (d *BasicDispatcher) handleEvents(dispatcherEvents []DispatcherEvent, wakeC
log.Panic("sync point event should only be singly handled",
zap.Stringer("dispatcherID", d.id))
}
block = true
syncPoint := event.(*commonEvent.SyncPointEvent)
if d.ShouldSkipSyncPoint(syncPoint.GetCommitTs()) {
log.Debug("dispatcher skip sync point event by control window",
zap.Stringer("dispatcher", d.id),
zap.Uint64("commitTs", syncPoint.GetCommitTs()))
continue
}
block = true
log.Info("dispatcher receive sync point event",
zap.Stringer("dispatcher", d.id),
zap.Uint64("commitTs", syncPoint.GetCommitTs()),
Expand Down
26 changes: 25 additions & 1 deletion downstreamadapter/dispatcher/basic_dispatcher_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type SharedInfo struct {
// the config of filter
filterConfig *eventpb.FilterConfig
// if syncPointInfo is not nil, means enable Sync Point feature,
syncPointConfig *syncpoint.SyncPointConfig
syncPointConfig *syncpoint.SyncPointConfig
syncPointControl atomic.Pointer[common.SyncPointControl]

// The atomicity level of a transaction.
txnAtomicity config.AtomicityLevel
Expand Down Expand Up @@ -121,6 +122,8 @@ func NewSharedInfo(
} else {
sharedInfo.txnAtomicity = config.DefaultAtomicityLevel()
}
disabled := common.NewDisabledSyncPointControl()
sharedInfo.syncPointControl.Store(&disabled)
return sharedInfo
}

Expand Down Expand Up @@ -223,6 +226,10 @@ func (d *BasicDispatcher) GetSyncPointInterval() time.Duration {
return time.Duration(0)
}

func (d *BasicDispatcher) ShouldSkipSyncPoint(commitTs uint64) bool {
return d.sharedInfo.ShouldSkipSyncPoint(commitTs)
}

func (d *BasicDispatcher) GetTableSpan() *heartbeatpb.TableSpan {
return d.tableSpan
}
Expand Down Expand Up @@ -258,6 +265,23 @@ func (s *SharedInfo) IsOutputRawChangeEvent() bool {
return s.outputRawChangeEvent
}

func (s *SharedInfo) SetSyncPointControl(control common.SyncPointControl) {
clone := control.Clone()
s.syncPointControl.Store(&clone)
}

func (s *SharedInfo) GetSyncPointControl() common.SyncPointControl {
control := s.syncPointControl.Load()
if control == nil {
return common.NewDisabledSyncPointControl()
}
return control.Clone()
}

func (s *SharedInfo) ShouldSkipSyncPoint(commitTs uint64) bool {
return s.GetSyncPointControl().Contains(commitTs)
}

func (s *SharedInfo) GetStatusesChan() chan TableSpanStatusWithSeq {
return s.statusesChan
}
Expand Down
41 changes: 37 additions & 4 deletions downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ type DispatcherManager struct {

// Shared info for all dispatchers
sharedInfo *dispatcher.SharedInfo
// appliedSyncPointControl is the current syncpoint skip control installed on this dispatcher manager.
appliedSyncPointControl atomic.Pointer[common.SyncPointControl]

metricTableTriggerEventDispatcherCount prometheus.Gauge
metricEventDispatcherCount prometheus.Gauge
Expand All @@ -182,6 +184,7 @@ func NewDispatcherManager(
startTs uint64,
maintainerID node.ID,
newChangefeed bool,
syncPointControl *heartbeatpb.SyncPointControl,
) (*DispatcherManager, error) {
failpoint.Inject("NewDispatcherManagerDelay", nil)

Expand Down Expand Up @@ -275,6 +278,7 @@ func NewDispatcherManager(
make(chan *heartbeatpb.TableSpanBlockStatus, 1024*1024),
make(chan error, 1),
)
manager.SetSyncPointControl(syncPointControl)

// Register Event Dispatcher Manager in HeartBeatCollector,
// which is responsible for communication with the maintainer.
Expand Down Expand Up @@ -738,10 +742,11 @@ func (e *DispatcherManager) collectComponentStatusWhenChanged(ctx context.Contex
// Returns a HeartBeatRequest containing the aggregated information.
func (e *DispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatus bool) *heartbeatpb.HeartBeatRequest {
message := heartbeatpb.HeartBeatRequest{
ChangefeedID: e.changefeedID.ToPB(),
CompeleteStatus: needCompleteStatus,
Watermark: heartbeatpb.NewMaxWatermark(),
RedoWatermark: heartbeatpb.NewMaxWatermark(),
ChangefeedID: e.changefeedID.ToPB(),
CompeleteStatus: needCompleteStatus,
Watermark: heartbeatpb.NewMaxWatermark(),
RedoWatermark: heartbeatpb.NewMaxWatermark(),
SyncPointControl: e.GetSyncPointControl().ToPB(),
}

toCleanMap := make([]*cleanMap, 0)
Expand Down Expand Up @@ -813,6 +818,34 @@ func (e *DispatcherManager) aggregateDispatcherHeartbeats(needCompleteStatus boo
return &message
}

func (e *DispatcherManager) SetSyncPointControl(control *heartbeatpb.SyncPointControl) {
newControl := common.NewSyncPointControlFromPB(control)
current := e.appliedSyncPointControl.Load()
if current != nil && current.Epoch > newControl.Epoch {
return
}
if current != nil && current.Epoch == newControl.Epoch && !current.Equal(newControl) {
log.Warn("ignore syncpoint control with same epoch but different window",
zap.Stringer("changefeedID", e.changefeedID),
zap.Uint64("epoch", newControl.Epoch),
zap.Uint64("currentSkipStartTs", current.SkipStartTs),
zap.Uint64("currentSkipEndTs", current.SkipEndTs),
zap.Uint64("newSkipStartTs", newControl.SkipStartTs),
zap.Uint64("newSkipEndTs", newControl.SkipEndTs))
return
}
e.sharedInfo.SetSyncPointControl(newControl)
e.appliedSyncPointControl.Store(&newControl)
}

func (e *DispatcherManager) GetSyncPointControl() common.SyncPointControl {
control := e.appliedSyncPointControl.Load()
if control == nil {
return common.NewDisabledSyncPointControl()
}
return control.Clone()
}

func (e *DispatcherManager) MergeDispatcher(dispatcherIDs []common.DispatcherID, mergedDispatcherID common.DispatcherID, mode int64) *MergeCheckTask {
if common.IsRedoMode(mode) {
return e.mergeRedoDispatcher(dispatcherIDs, mergedDispatcherID)
Expand Down
34 changes: 34 additions & 0 deletions downstreamadapter/dispatchermanager/dispatcher_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,40 @@ func TestCollectComponentStatusWhenChangedWatermarkSeqNoFallback(t *testing.T) {
require.Equal(t, uint64(200), req.Request.RedoWatermark.Seq)
}

func TestAggregateDispatcherHeartbeatsCarriesSyncPointControl(t *testing.T) {
manager := createTestManager(t)
manager.SetSyncPointControl(&heartbeatpb.SyncPointControl{
Epoch: 3,
SkipStartTs: 100,
SkipEndTs: 200,
})

msg := manager.aggregateDispatcherHeartbeats(false)
require.NotNil(t, msg.SyncPointControl)
require.Equal(t, uint64(3), msg.SyncPointControl.Epoch)
require.Equal(t, uint64(100), msg.SyncPointControl.SkipStartTs)
require.Equal(t, uint64(200), msg.SyncPointControl.SkipEndTs)
}

func TestSetSyncPointControlIgnoreSameEpochDifferentWindow(t *testing.T) {
manager := createTestManager(t)
manager.SetSyncPointControl(&heartbeatpb.SyncPointControl{
Epoch: 3,
SkipStartTs: 100,
SkipEndTs: 200,
})
manager.SetSyncPointControl(&heartbeatpb.SyncPointControl{
Epoch: 3,
SkipStartTs: 150,
SkipEndTs: 250,
})

control := manager.GetSyncPointControl()
require.Equal(t, uint64(3), control.Epoch)
require.Equal(t, uint64(100), control.SkipStartTs)
require.Equal(t, uint64(200), control.SkipEndTs)
}

func TestMergeDispatcherNormal(t *testing.T) {
manager := createTestManager(t)

Expand Down
16 changes: 16 additions & 0 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type HeartBeatCollector struct {
heartBeatResponseDynamicStream dynstream.DynamicStream[int, common.GID, HeartBeatResponse, *DispatcherManager, *HeartBeatResponseHandler]
schedulerDispatcherRequestDynamicStream dynstream.DynamicStream[int, common.GID, SchedulerDispatcherRequest, *DispatcherManager, *SchedulerDispatcherRequestHandler]
checkpointTsMessageDynamicStream dynstream.DynamicStream[int, common.GID, CheckpointTsMessage, *DispatcherManager, *CheckpointTsMessageHandler]
syncPointControlMessageDynamicStream dynstream.DynamicStream[int, common.GID, SyncPointControlMessage, *DispatcherManager, *SyncPointControlMessageHandler]
redoResolvedTsForwardMessageDynamicStream dynstream.DynamicStream[int, common.GID, RedoResolvedTsForwardMessage, *DispatcherManager, *RedoResolvedTsForwardMessageHandler]
redoMetaMessageDynamicStream dynstream.DynamicStream[int, common.GID, RedoMetaMessage, *DispatcherManager, *RedoMetaMessageHandler]
mergeDispatcherRequestDynamicStream dynstream.DynamicStream[int, common.GID, MergeDispatcherRequest, *DispatcherManager, *MergeDispatcherRequestHandler]
Expand All @@ -75,6 +76,7 @@ func NewHeartBeatCollector(serverId node.ID) *HeartBeatCollector {
heartBeatResponseDynamicStream: newHeartBeatResponseDynamicStream(dStatusDS),
schedulerDispatcherRequestDynamicStream: newSchedulerDispatcherRequestDynamicStream(),
checkpointTsMessageDynamicStream: newCheckpointTsMessageDynamicStream(),
syncPointControlMessageDynamicStream: newSyncPointControlMessageDynamicStream(),
redoResolvedTsForwardMessageDynamicStream: newRedoResolvedTsForwardMessageDynamicStream(),
redoMetaMessageDynamicStream: newRedoMetaMessageDynamicStream(),
mergeDispatcherRequestDynamicStream: newMergeDispatcherRequestDynamicStream(),
Expand Down Expand Up @@ -124,6 +126,10 @@ func (c *HeartBeatCollector) RegisterDispatcherManager(m *DispatcherManager) err
if err != nil {
return errors.Trace(err)
}
err = c.syncPointControlMessageDynamicStream.AddPath(m.changefeedID.Id, m)
if err != nil {
return errors.Trace(err)
}
err = c.mergeDispatcherRequestDynamicStream.AddPath(m.changefeedID.Id, m)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -166,6 +172,10 @@ func (c *HeartBeatCollector) RemoveDispatcherManager(id common.ChangeFeedID) err
if err != nil {
return errors.Trace(err)
}
err = c.syncPointControlMessageDynamicStream.RemovePath(id.Id)
if err != nil {
return errors.Trace(err)
}
err = c.mergeDispatcherRequestDynamicStream.RemovePath(id.Id)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -275,6 +285,11 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
c.checkpointTsMessageDynamicStream.Push(
common.NewChangefeedGIDFromPB(checkpointTsMessage.ChangefeedID),
NewCheckpointTsMessage(checkpointTsMessage))
case messaging.TypeSyncPointControlMessage:
syncPointControlMessage := msg.Message[0].(*heartbeatpb.SyncPointControlMessage)
c.syncPointControlMessageDynamicStream.Push(
common.NewChangefeedGIDFromPB(syncPointControlMessage.ChangefeedID),
NewSyncPointControlMessage(syncPointControlMessage))
case messaging.TypeRedoResolvedTsForwardMessage:
redoMessage := msg.Message[0].(*heartbeatpb.RedoResolvedTsForwardMessage)
c.redoResolvedTsForwardMessageDynamicStream.Push(
Expand Down Expand Up @@ -306,6 +321,7 @@ func (c *HeartBeatCollector) Close() {
c.isClosed.Store(true)

c.checkpointTsMessageDynamicStream.Close()
c.syncPointControlMessageDynamicStream.Close()
c.redoResolvedTsForwardMessageDynamicStream.Close()
c.redoMetaMessageDynamicStream.Close()
c.heartBeatResponseDynamicStream.Close()
Expand Down
57 changes: 57 additions & 0 deletions downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,63 @@ func (h *CheckpointTsMessageHandler) OnDrop(event CheckpointTsMessage) interface
return nil
}

func newSyncPointControlMessageDynamicStream() dynstream.DynamicStream[int, common.GID, SyncPointControlMessage, *DispatcherManager, *SyncPointControlMessageHandler] {
ds := dynstream.NewParallelDynamicStream("syncpoint-control",
&SyncPointControlMessageHandler{})
ds.Start()
return ds
}

type SyncPointControlMessage struct {
*heartbeatpb.SyncPointControlMessage
}

func NewSyncPointControlMessage(msg *heartbeatpb.SyncPointControlMessage) SyncPointControlMessage {
return SyncPointControlMessage{msg}
}

type SyncPointControlMessageHandler struct{}

func (h *SyncPointControlMessageHandler) Path(msg SyncPointControlMessage) common.GID {
return common.NewChangefeedGIDFromPB(msg.ChangefeedID)
}

func (h *SyncPointControlMessageHandler) Handle(dispatcherManager *DispatcherManager, messages ...SyncPointControlMessage) bool {
if len(messages) != 1 {
panic("invalid message count")
}
dispatcherManager.SetSyncPointControl(messages[0].Control)
return false
}
Comment on lines +653 to +659
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The Handle method for SyncPointControlMessageHandler should not panic if the number of messages is not exactly one. The dynstream framework can batch multiple events for the same path into a single Handle call if they are queued. Panicking in production code due to unexpected batching is dangerous. Instead, you should process all messages in the batch or at least the most recent one (the last one in the slice) to ensure the dispatcher manager stays in sync with the maintainer's desired state.

func (h *SyncPointControlMessageHandler) Handle(dispatcherManager *DispatcherManager, messages ...SyncPointControlMessage) bool {
	if len(messages) > 0 {
		dispatcherManager.SetSyncPointControl(messages[len(messages)-1].Control)
	}
	return false
}


func (h *SyncPointControlMessageHandler) GetSize(event SyncPointControlMessage) int {
return 0
}

func (h *SyncPointControlMessageHandler) IsPaused(event SyncPointControlMessage) bool {
return false
}

func (h *SyncPointControlMessageHandler) GetArea(path common.GID, dest *DispatcherManager) int {
return 0
}

func (h *SyncPointControlMessageHandler) GetMetricLabel(dest *DispatcherManager) string {
return dest.changefeedID.String()
}

func (h *SyncPointControlMessageHandler) GetTimestamp(event SyncPointControlMessage) dynstream.Timestamp {
return 0
}

func (h *SyncPointControlMessageHandler) GetType(event SyncPointControlMessage) dynstream.EventType {
return dynstream.DefaultEventType
}

func (h *SyncPointControlMessageHandler) OnDrop(event SyncPointControlMessage) interface{} {
return nil
}

// RedoResolvedTsForwardMessageDynamicStream is responsible for push RedoResolvedTsForwardMessage to the corresponding table trigger event dispatcher.
func newRedoResolvedTsForwardMessageDynamicStream() dynstream.DynamicStream[int, common.GID, RedoResolvedTsForwardMessage, *DispatcherManager, *RedoResolvedTsForwardMessageHandler] {
ds := dynstream.NewParallelDynamicStream("redo-resolved-ts",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
req.StartTs,
from,
req.IsNewChangefeed,
req.SyncPointControl,
)
// Fast return the error to maintainer.
if err != nil {
Expand Down Expand Up @@ -203,6 +204,9 @@ func (m *DispatcherOrchestrator) handleBootstrapRequest(
m.mutex.Unlock()
metrics.DispatcherManagerGauge.WithLabelValues(cfId.Keyspace(), cfId.Name()).Inc()
} else {
if req.SyncPointControl != nil {
manager.SetSyncPointControl(req.SyncPointControl)
}
// Check and potentially add a table trigger event dispatcher.
// This is necessary during maintainer node migration, as the existing
// dispatcher manager on the new node may not have a table trigger
Expand Down Expand Up @@ -368,8 +372,9 @@ func createBootstrapResponse(
startTs, redoStartTs uint64,
) *heartbeatpb.MaintainerBootstrapResponse {
response := &heartbeatpb.MaintainerBootstrapResponse{
ChangefeedID: changefeedID,
Spans: make([]*heartbeatpb.BootstrapTableSpan, 0, manager.GetDispatcherMap().Len()),
ChangefeedID: changefeedID,
Spans: make([]*heartbeatpb.BootstrapTableSpan, 0, manager.GetDispatcherMap().Len()),
SyncPointControl: manager.GetSyncPointControl().ToPB(),
}

// table trigger event dispatcher startTs
Expand Down
Loading
Loading