From 7f3a7b57430d8709820cc157d1ebc486f9271caf Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 29 Apr 2026 11:43:28 +0800 Subject: [PATCH 01/12] optimize maintainer checkpoint tracking --- maintainer/maintainer.go | 45 +++++- maintainer/replica/replication_span.go | 17 +- maintainer/span/checkpoint_ts_tracker.go | 153 ++++++++++++++++++ maintainer/span/checkpoint_ts_tracker_test.go | 100 ++++++++++++ maintainer/span/span_controller.go | 52 ++++-- maintainer/span/span_controller_test.go | 114 +++++++++++++ pkg/scheduler/replica/replication.go | 17 +- pkg/scheduler/replica/replication_group.go | 37 ++++- .../replica/replication_group_test.go | 108 +++++++++++++ 9 files changed, 601 insertions(+), 42 deletions(-) create mode 100644 maintainer/span/checkpoint_ts_tracker.go create mode 100644 maintainer/span/checkpoint_ts_tracker_test.go create mode 100644 pkg/scheduler/replica/replication_group_test.go diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 3e4e152312..80fb86fc46 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -305,12 +305,13 @@ func (m *Maintainer) HandleEvent(event *Event) bool { if duration > time.Second { // add a log for debug an occasional slow bootstrap problem if event.eventType == EventMessage { - log.Info("maintainer is too slow", + fields := []zap.Field{ zap.Stringer("changefeedID", m.changefeedID), zap.Int("eventType", event.eventType), zap.Duration("duration", duration), - zap.Any("Message", event.message), - ) + } + fields = append(fields, slowMessageFields(event.message)...) + log.Info("maintainer is too slow", fields...) } else { log.Info("maintainer is too slow", zap.Stringer("changefeedID", m.changefeedID), @@ -345,6 +346,44 @@ func (m *Maintainer) HandleEvent(event *Event) bool { return false } +func slowMessageFields(msg *messaging.TargetMessage) []zap.Field { + if msg == nil { + return []zap.Field{zap.String("message", "nil")} + } + fields := []zap.Field{ + zap.Stringer("from", msg.From), + zap.Stringer("to", msg.To), + zap.Stringer("messageType", msg.Type), + zap.String("topic", msg.Topic), + zap.Int("messageCount", len(msg.Message)), + zap.Uint64("sequence", msg.Sequence), + } + if msg.Type != messaging.TypeHeartBeatRequest || len(msg.Message) == 0 { + return fields + } + req, ok := msg.Message[0].(*heartbeatpb.HeartBeatRequest) + if !ok { + return fields + } + fields = append(fields, + zap.Int("statusCount", len(req.Statuses)), + zap.Bool("completeStatus", req.CompeleteStatus), + ) + if watermark := req.GetWatermark(); watermark != nil { + fields = append(fields, + zap.Uint64("heartbeatCheckpointTs", watermark.CheckpointTs), + zap.Uint64("heartbeatResolvedTs", watermark.ResolvedTs), + ) + } + if watermark := req.GetRedoWatermark(); watermark != nil { + fields = append(fields, + zap.Uint64("redoHeartbeatCheckpointTs", watermark.CheckpointTs), + zap.Uint64("redoHeartbeatResolvedTs", watermark.ResolvedTs), + ) + } + return fields +} + func (m *Maintainer) checkNodeChanged() { m.nodeChanged.Lock() defer m.nodeChanged.Unlock() diff --git a/maintainer/replica/replication_span.go b/maintainer/replica/replication_span.go index 17fc2b1a84..e35aa9ac40 100644 --- a/maintainer/replica/replication_span.go +++ b/maintainer/replica/replication_span.go @@ -173,13 +173,18 @@ func (r *SpanReplication) GetMode() int64 { // // The new status is only stored if its checkpointTs is greater than or equal to // the current status's checkpointTs. 
-func (r *SpanReplication) UpdateStatus(newStatus *heartbeatpb.TableSpanStatus) { - if newStatus != nil { - oldStatus := r.status.Load() - if newStatus.CheckpointTs >= oldStatus.CheckpointTs { - r.status.Store(newStatus) - } +// +// It returns true when the stored checkpointTs changes. +func (r *SpanReplication) UpdateStatus(newStatus *heartbeatpb.TableSpanStatus) bool { + if newStatus == nil { + return false + } + oldStatus := r.status.Load() + if newStatus.CheckpointTs < oldStatus.CheckpointTs { + return false } + r.status.Store(newStatus) + return newStatus.CheckpointTs != oldStatus.CheckpointTs } // ShouldRun always returns true. diff --git a/maintainer/span/checkpoint_ts_tracker.go b/maintainer/span/checkpoint_ts_tracker.go new file mode 100644 index 0000000000..e0151749bb --- /dev/null +++ b/maintainer/span/checkpoint_ts_tracker.go @@ -0,0 +1,153 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package span + +import ( + "container/heap" + + "github.com/pingcap/ticdc/pkg/common" +) + +type checkpointTsTracker struct { + // byID contains exactly the non-DDL spans that are absent or scheduling. + // The owning Controller must hold its mutex when accessing this tracker. + byID map[common.DispatcherID]uint64 + counts map[uint64]int + heap checkpointTsHeap +} + +func newCheckpointTsTracker() *checkpointTsTracker { + return &checkpointTsTracker{ + byID: make(map[common.DispatcherID]uint64), + counts: make(map[uint64]int), + heap: newCheckpointTsHeap(), + } +} + +func (t *checkpointTsTracker) addOrUpdate(id common.DispatcherID, checkpointTs uint64) { + if old, ok := t.byID[id]; ok { + if old == checkpointTs { + return + } + t.decrement(old) + } + t.byID[id] = checkpointTs + t.increment(checkpointTs) +} + +func (t *checkpointTsTracker) update(id common.DispatcherID, checkpointTs uint64) { + old, ok := t.byID[id] + if !ok || old == checkpointTs { + return + } + t.decrement(old) + t.byID[id] = checkpointTs + t.increment(checkpointTs) +} + +func (t *checkpointTsTracker) remove(id common.DispatcherID) { + old, ok := t.byID[id] + if !ok { + return + } + delete(t.byID, id) + t.decrement(old) + if len(t.byID) == 0 { + t.reset() + } +} + +func (t *checkpointTsTracker) min() (uint64, bool) { + if t.heap.Len() == 0 { + return 0, false + } + return t.heap.peek(), true +} + +func (t *checkpointTsTracker) increment(checkpointTs uint64) { + if t.counts[checkpointTs] > 0 { + t.counts[checkpointTs]++ + return + } + t.counts[checkpointTs] = 1 + heap.Push(&t.heap, checkpointTs) +} + +func (t *checkpointTsTracker) decrement(checkpointTs uint64) { + count := t.counts[checkpointTs] + if count <= 1 { + delete(t.counts, checkpointTs) + t.heap.remove(checkpointTs) + return + } + t.counts[checkpointTs] = count - 1 +} + +func (t *checkpointTsTracker) reset() { + t.byID = make(map[common.DispatcherID]uint64) + t.counts = make(map[uint64]int) + t.heap = newCheckpointTsHeap() +} + +type checkpointTsHeap struct { + values []uint64 + indexes map[uint64]int +} + +func newCheckpointTsHeap() checkpointTsHeap { + return checkpointTsHeap{ + indexes: 
make(map[uint64]int), + } +} + +func (h checkpointTsHeap) Len() int { + return len(h.values) +} + +func (h checkpointTsHeap) Less(i, j int) bool { + return h.values[i] < h.values[j] +} + +func (h checkpointTsHeap) Swap(i, j int) { + h.values[i], h.values[j] = h.values[j], h.values[i] + h.indexes[h.values[i]] = i + h.indexes[h.values[j]] = j +} + +func (h *checkpointTsHeap) Push(x any) { + checkpointTs := x.(uint64) + h.indexes[checkpointTs] = len(h.values) + h.values = append(h.values, checkpointTs) +} + +func (h *checkpointTsHeap) Pop() any { + n := len(h.values) + checkpointTs := h.values[n-1] + delete(h.indexes, checkpointTs) + h.values[n-1] = 0 + h.values = h.values[:n-1] + return checkpointTs +} + +func (h *checkpointTsHeap) peek() uint64 { + return h.values[0] +} + +func (h *checkpointTsHeap) remove(checkpointTs uint64) { + index, ok := h.indexes[checkpointTs] + if !ok { + return + } + heap.Remove(h, index) +} diff --git a/maintainer/span/checkpoint_ts_tracker_test.go b/maintainer/span/checkpoint_ts_tracker_test.go new file mode 100644 index 0000000000..0f2537ff27 --- /dev/null +++ b/maintainer/span/checkpoint_ts_tracker_test.go @@ -0,0 +1,100 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package span + +import ( + "testing" + + "github.com/pingcap/ticdc/pkg/common" +) + +func TestCheckpointTsTrackerMin(t *testing.T) { + t.Parallel() + + tracker := newCheckpointTsTracker() + id1 := common.NewDispatcherID() + id2 := common.NewDispatcherID() + id3 := common.NewDispatcherID() + + tracker.addOrUpdate(id1, 100) + tracker.addOrUpdate(id2, 80) + tracker.addOrUpdate(id3, 80) + + got, ok := tracker.min() + if !ok || got != 80 { + t.Fatalf("checkpointTsTracker.min() = %d, %v, want 80, true", got, ok) + } + + tracker.update(id2, 120) + got, ok = tracker.min() + if !ok || got != 80 { + t.Fatalf("checkpointTsTracker.min() after one duplicate update = %d, %v, want 80, true", got, ok) + } + + tracker.remove(id3) + got, ok = tracker.min() + if !ok || got != 100 { + t.Fatalf("checkpointTsTracker.min() after removing duplicate min = %d, %v, want 100, true", got, ok) + } + + tracker.remove(id1) + got, ok = tracker.min() + if !ok || got != 120 { + t.Fatalf("checkpointTsTracker.min() after removing current min = %d, %v, want 120, true", got, ok) + } + + tracker.remove(id2) + got, ok = tracker.min() + if ok || got != 0 { + t.Fatalf("checkpointTsTracker.min() after removing all = %d, %v, want 0, false", got, ok) + } +} + +func TestCheckpointTsTrackerIgnoresMissingUpdate(t *testing.T) { + t.Parallel() + + tracker := newCheckpointTsTracker() + id := common.NewDispatcherID() + tracker.update(id, 100) + tracker.remove(id) + + got, ok := tracker.min() + if ok || got != 0 { + t.Fatalf("checkpointTsTracker.min() after missing update = %d, %v, want 0, false", got, ok) + } +} + +func TestCheckpointTsTrackerRemovesStaleCheckpointTs(t *testing.T) { + t.Parallel() + + tracker := newCheckpointTsTracker() + blockingID := common.NewDispatcherID() + movingID := common.NewDispatcherID() + tracker.addOrUpdate(blockingID, 1) + 
tracker.addOrUpdate(movingID, 2) + + for checkpointTs := uint64(3); checkpointTs < 100; checkpointTs++ { + tracker.update(movingID, checkpointTs) + } + + if got := tracker.heap.Len(); got != 2 { + t.Fatalf("checkpointTsTracker heap size = %d, want 2", got) + } + + tracker.remove(blockingID) + got, ok := tracker.min() + if !ok || got != 99 { + t.Fatalf("checkpointTsTracker.min() after removing blocker = %d, %v, want 99, true", got, ok) + } +} diff --git a/maintainer/span/span_controller.go b/maintainer/span/span_controller.go index 45938cb9bc..0815e59657 100644 --- a/maintainer/span/span_controller.go +++ b/maintainer/span/span_controller.go @@ -57,7 +57,8 @@ type Controller struct { // so no need to schedule it ddlSpan *replica.SpanReplication - // mu protects concurrent access to [pkgreplica.ReplicationDB, ddlSpan, allTasks, schemaTasks, tableTasks] + // mu protects concurrent access to [pkgreplica.ReplicationDB, ddlSpan, allTasks, schemaTasks, tableTasks, + // nonReplicatingCheckpointTs] mu sync.RWMutex // ReplicationDB tracks the scheduling status of spans pkgreplica.ReplicationDB[common.DispatcherID, *replica.SpanReplication] @@ -67,6 +68,8 @@ type Controller struct { schemaTasks map[int64]map[common.DispatcherID]*replica.SpanReplication // tableTasks provides quick access to spans by table ID tableTasks map[int64]map[common.DispatcherID]*replica.SpanReplication + // nonReplicatingCheckpointTs tracks absent and scheduling spans so checkpoint calculation does not scan all spans. + nonReplicatingCheckpointTs *checkpointTsTracker // newGroupChecker creates a GroupChecker for validating span groups newGroupChecker func(groupID pkgreplica.GroupID) pkgreplica.GroupChecker[common.DispatcherID, *replica.SpanReplication] @@ -108,9 +111,10 @@ func NewController( keyspaceID: keyspaceID, maintainerCommittedCheckpointTs: atomic.NewUint64(ddlSpan.GetStatus().CheckpointTs), - schemaTasks: make(map[int64]map[common.DispatcherID]*replica.SpanReplication), - tableTasks: make(map[int64]map[common.DispatcherID]*replica.SpanReplication), - allTasks: make(map[common.DispatcherID]*replica.SpanReplication), + schemaTasks: make(map[int64]map[common.DispatcherID]*replica.SpanReplication), + tableTasks: make(map[int64]map[common.DispatcherID]*replica.SpanReplication), + allTasks: make(map[common.DispatcherID]*replica.SpanReplication), + nonReplicatingCheckpointTs: newCheckpointTsTracker(), } c.ReplicationDB = pkgreplica.NewReplicationDB(changefeedID.String(), c.doWithRLock, c.newGroupChecker) c.initializeDDLSpan(ddlSpan) @@ -229,15 +233,12 @@ func (c *Controller) AddNewSpans(schemaID int64, tableSpans []*heartbeatpb.Table } func (c *Controller) GetMinCheckpointTsForNonReplicatingSpans(minCheckpointTs uint64) uint64 { - for _, span := range c.GetAbsent() { - if span.GetStatus().CheckpointTs < minCheckpointTs { - minCheckpointTs = span.GetStatus().CheckpointTs - } - } - for _, span := range c.GetScheduling() { - if span.GetStatus().CheckpointTs < minCheckpointTs { - minCheckpointTs = span.GetStatus().CheckpointTs - } + c.mu.RLock() + defer c.mu.RUnlock() + + checkpointTs, ok := c.nonReplicatingCheckpointTs.min() + if ok && checkpointTs < minCheckpointTs { + return checkpointTs } return minCheckpointTs } @@ -352,10 +353,9 @@ func (c *Controller) UpdateSchemaID(tableID, newSchemaID int64) { // UpdateStatus updates the status of a span func (c *Controller) UpdateStatus(span *replica.SpanReplication, status *heartbeatpb.TableSpanStatus) { - span.UpdateStatus(status) - if span == c.ddlSpan { // ddl span don't need 
check by checker + span.UpdateStatus(status) return } // Note: a read lock is required inside the `GetGroupChecker` method. @@ -363,6 +363,9 @@ func (c *Controller) UpdateStatus(span *replica.SpanReplication, status *heartbe c.mu.Lock() defer c.mu.Unlock() + if span.UpdateStatus(status) { + c.nonReplicatingCheckpointTs.update(span.ID, span.GetStatus().CheckpointTs) + } checker.UpdateStatus(span) } @@ -388,6 +391,7 @@ func (c *Controller) AddReplicatingSpan(span *replica.SpanReplication) { c.allTasks[span.ID] = span c.addToSchemaAndTableMap(span) c.AddReplicatingWithoutLock(span) + c.untrackNonReplicatingSpan(span) } // MarkSpanAbsent marks span as absent @@ -395,6 +399,7 @@ func (c *Controller) MarkSpanAbsent(span *replica.SpanReplication) { c.mu.Lock() defer c.mu.Unlock() c.MarkAbsentWithoutLock(span) + c.trackNonReplicatingSpan(span) } // MarkSpanScheduling marks span as scheduling @@ -402,6 +407,7 @@ func (c *Controller) MarkSpanScheduling(span *replica.SpanReplication) { c.mu.Lock() defer c.mu.Unlock() c.MarkSchedulingWithoutLock(span) + c.trackNonReplicatingSpan(span) } // MarkSpanReplicating marks span as replicating @@ -409,6 +415,7 @@ func (c *Controller) MarkSpanReplicating(span *replica.SpanReplication) { c.mu.Lock() defer c.mu.Unlock() c.MarkReplicatingWithoutLock(span) + c.untrackNonReplicatingSpan(span) } // BindSpanToNode binds span to node @@ -416,6 +423,7 @@ func (c *Controller) BindSpanToNode(old, new node.ID, span *replica.SpanReplicat c.mu.Lock() defer c.mu.Unlock() c.BindReplicaToNodeWithoutLock(old, new, span) + c.trackNonReplicatingSpan(span) } // RemoveReplicatingSpan removes replicating span @@ -432,6 +440,7 @@ func (c *Controller) addAbsentReplicaSetWithoutLock(spans ...*replica.SpanReplic c.allTasks[span.ID] = span c.AddAbsentWithoutLock(span) c.addToSchemaAndTableMap(span) + c.trackNonReplicatingSpan(span) } } @@ -441,6 +450,7 @@ func (c *Controller) addSchedulingReplicaSetWithoutLock(span *replica.SpanReplic c.allTasks[span.ID] = span c.AddSchedulingReplicaWithoutLock(span, targetNodeID) c.addToSchemaAndTableMap(span) + c.trackNonReplicatingSpan(span) } // ReplaceReplicaSet replaces old replica sets with new spans and returns the newly created replicas. 
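The tracker consulted here behaves like a multiset with a cheap minimum: the
count map absorbs duplicate checkpointTs values while the heap stores each
distinct value exactly once. A minimal usage sketch of the API added in this
patch (dispatcher IDs invented for illustration):

	t := newCheckpointTsTracker()
	a, b := common.NewDispatcherID(), common.NewDispatcherID()
	t.addOrUpdate(a, 100)
	t.addOrUpdate(b, 100) // counts[100] == 2; the heap holds 100 once
	t.update(a, 120)      // counts[100] drops to 1, so min() still reports 100
	t.remove(b)           // counts[100] reaches 0 and 100 leaves the heap
	minTs, ok := t.min()  // minTs == 120, ok == true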
@@ -581,6 +591,7 @@ func (c *Controller) RemoveBySchemaID(schemaID int64) { func (c *Controller) removeSpanWithoutLock(spans ...*replica.SpanReplication) { for _, span := range spans { c.RemoveReplicaWithoutLock(span) + c.untrackNonReplicatingSpan(span) tableID := span.Span.TableID schemaID := span.GetSchemaID() @@ -596,6 +607,17 @@ func (c *Controller) removeSpanWithoutLock(spans ...*replica.SpanReplication) { } } +func (c *Controller) trackNonReplicatingSpan(span *replica.SpanReplication) { + if span == c.ddlSpan { + return + } + c.nonReplicatingCheckpointTs.addOrUpdate(span.ID, span.GetStatus().CheckpointTs) +} + +func (c *Controller) untrackNonReplicatingSpan(span *replica.SpanReplication) { + c.nonReplicatingCheckpointTs.remove(span.ID) +} + // addToSchemaAndTableMap adds the span to the schema and table map func (c *Controller) addToSchemaAndTableMap(span *replica.SpanReplication) { tableID := span.Span.TableID diff --git a/maintainer/span/span_controller_test.go b/maintainer/span/span_controller_test.go index 0e22c88039..e325443449 100644 --- a/maintainer/span/span_controller_test.go +++ b/maintainer/span/span_controller_test.go @@ -29,6 +29,120 @@ import ( "github.com/stretchr/testify/require" ) +func newControllerForCheckpointTsTrackerTest(t *testing.T) *Controller { + t.Helper() + + changefeedID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + ddlDispatcherID := common.NewDispatcherID() + ddlSpan := replica.NewWorkingSpanReplication(changefeedID, ddlDispatcherID, + common.DDLSpanSchemaID, + common.KeyspaceDDLSpan(common.DefaultKeyspaceID), &heartbeatpb.TableSpanStatus{ + ID: ddlDispatcherID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }, "node1", false) + appcontext.SetService(watcher.NodeManagerName, watcher.NewNodeManager(nil, nil)) + return NewController(changefeedID, ddlSpan, nil, nil, nil, common.DefaultKeyspaceID, common.DefaultMode) +} + +func newSpanReplicationForCheckpointTsTrackerTest( + controller *Controller, + schemaID int64, + tableID int64, + checkpointTs uint64, +) *replica.SpanReplication { + span := common.TableIDToComparableSpan(common.DefaultKeyspaceID, tableID) + tableSpan := &heartbeatpb.TableSpan{ + TableID: tableID, + StartKey: span.StartKey, + EndKey: span.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + } + return replica.NewSpanReplication( + controller.changefeedID, + common.NewDispatcherID(), + schemaID, + tableSpan, + checkpointTs, + common.DefaultMode, + false, + ) +} + +func TestControllerGetMinCheckpointTsForNonReplicatingSpans(t *testing.T) { + controller := newControllerForCheckpointTsTrackerTest(t) + span1 := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 100, 100) + span2 := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 101, 80) + controller.AddAbsentReplicaSet(span1, span2) + + require.Equal(t, uint64(80), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.MarkSpanScheduling(span2) + controller.UpdateStatus(span2, &heartbeatpb.TableSpanStatus{ + ID: span2.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 120, + }) + require.Equal(t, uint64(100), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.MarkSpanReplicating(span1) + require.Equal(t, uint64(120), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.MarkSpanReplicating(span2) + require.Equal(t, uint64(1000), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) +} + +func 
TestControllerCheckpointTsTrackerBindSpanToNode(t *testing.T) { + controller := newControllerForCheckpointTsTrackerTest(t) + span1 := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 100, 100) + span2 := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 101, 80) + controller.AddAbsentReplicaSet(span1, span2) + + controller.BindSpanToNode("", "node1", span2) + require.Equal(t, uint64(80), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.UpdateStatus(span2, &heartbeatpb.TableSpanStatus{ + ID: span2.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 120, + }) + require.Equal(t, uint64(100), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.MarkSpanReplicating(span2) + require.Equal(t, uint64(100), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.MarkSpanReplicating(span1) + require.Equal(t, uint64(1000), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) +} + +func TestControllerCheckpointTsTrackerReplaceAndRemove(t *testing.T) { + controller := newControllerForCheckpointTsTrackerTest(t) + oldSpan := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 100, 50) + controller.AddAbsentReplicaSet(oldSpan) + newSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 101) + newTableSpan := &heartbeatpb.TableSpan{ + TableID: 101, + StartKey: newSpan.StartKey, + EndKey: newSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + } + + newSpans, inScheduling := controller.ReplaceReplicaSet( + []*replica.SpanReplication{oldSpan}, + []*heartbeatpb.TableSpan{newTableSpan}, + 80, + nil, + ) + + require.False(t, inScheduling) + require.Len(t, newSpans, 1) + require.Nil(t, controller.GetTaskByID(oldSpan.ID)) + require.Equal(t, uint64(50), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.RemoveByTableIDs(101) + require.Equal(t, uint64(1000), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) +} + func TestNewController(t *testing.T) { cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) ddlDispatcherID := common.NewDispatcherID() diff --git a/pkg/scheduler/replica/replication.go b/pkg/scheduler/replica/replication.go index 0bcf8cd583..faba76fc6e 100644 --- a/pkg/scheduler/replica/replication.go +++ b/pkg/scheduler/replica/replication.go @@ -171,15 +171,10 @@ func (db *replicationDB[T, R]) GetAbsentSize() int { } func (db *replicationDB[T, R]) GetAbsentByGroup(id GroupID, batch int) []R { - buffer := make([]R, 0, batch) + var buffer []R db.withRLock(func() { g := db.mustGetGroup(id) - for _, stm := range g.GetAbsent() { - buffer = append(buffer, stm) - if len(buffer) >= batch { - break - } - } + buffer = g.GetAbsentBatch(batch) }) return buffer } @@ -249,9 +244,11 @@ func (db *replicationDB[T, R]) GetSchedulingWithoutLock() (ret []R) { func (db *replicationDB[T, R]) GetSchedulingSize() int { size := 0 - for _, g := range db.taskGroups { - size += g.GetSchedulingSize() - } + db.withRLock(func() { + for _, g := range db.taskGroups { + size += g.GetSchedulingSize() + } + }) return size } diff --git a/pkg/scheduler/replica/replication_group.go b/pkg/scheduler/replica/replication_group.go index afd9b1ea93..f0aee6519f 100644 --- a/pkg/scheduler/replica/replication_group.go +++ b/pkg/scheduler/replica/replication_group.go @@ -15,6 +15,7 @@ package replica import ( "sync" + "sync/atomic" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/node" @@ -224,6 +225,24 @@ func (g *replicationGroup[T, R]) GetAbsent() []R 
{ return res } +func (g *replicationGroup[T, R]) GetAbsentBatch(batch int) []R { + if batch <= 0 { + return nil + } + capacity := batch + if absentSize := g.absent.Len(); absentSize < capacity { + capacity = absentSize + } + res := make([]R, 0, capacity) + g.absent.Range(func(_ T, r R) bool { + if r.ShouldRun() { + res = append(res, r) + } + return len(res) < batch + }) + return res +} + func (g *replicationGroup[T, R]) GetSchedulingSize() int { return g.scheduling.Len() } @@ -268,6 +287,7 @@ func (g *replicationGroup[T, R]) IsReplicating(replica R) bool { type iMap[T ReplicationID, R Replication[T]] struct { inner sync.Map + size atomic.Int64 } func newIMap[T ReplicationID, R Replication[T]]() *iMap[T, R] { @@ -289,20 +309,21 @@ func (m *iMap[T, R]) Get(key T) (R, bool) { } func (m *iMap[T, R]) Set(key T, value R) { - m.inner.Store(key, value) + if _, loaded := m.inner.LoadOrStore(key, value); loaded { + m.inner.Store(key, value) + return + } + m.size.Add(1) } func (m *iMap[T, R]) Delete(key T) { - m.inner.Delete(key) + if _, loaded := m.inner.LoadAndDelete(key); loaded { + m.size.Add(-1) + } } func (m *iMap[T, R]) Len() int { - var count int - m.inner.Range(func(_, _ interface{}) bool { - count++ - return true - }) - return count + return int(m.size.Load()) } func (m *iMap[T, R]) Range(f func(T, R) bool) { diff --git a/pkg/scheduler/replica/replication_group_test.go b/pkg/scheduler/replica/replication_group_test.go new file mode 100644 index 0000000000..629972f369 --- /dev/null +++ b/pkg/scheduler/replica/replication_group_test.go @@ -0,0 +1,108 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package replica + +import ( + "fmt" + "sync/atomic" + "testing" + + "github.com/pingcap/ticdc/pkg/node" +) + +type testReplicationID string + +func (id testReplicationID) String() string { + return string(id) +} + +type testReplication struct { + id testReplicationID + groupID GroupID + nodeID node.ID + shouldRunCalls *atomic.Int64 +} + +func (r *testReplication) GetID() testReplicationID { + return r.id +} + +func (r *testReplication) GetGroupID() GroupID { + return r.groupID +} + +func (r *testReplication) GetNodeID() node.ID { + return r.nodeID +} + +func (r *testReplication) SetNodeID(nodeID node.ID) { + r.nodeID = nodeID +} + +func (r *testReplication) ShouldRun() bool { + if r.shouldRunCalls != nil { + r.shouldRunCalls.Add(1) + } + return true +} + +func TestIMapLenTracksOverwriteAndDelete(t *testing.T) { + t.Parallel() + + replicaMap := newIMap[testReplicationID, *testReplication]() + id := testReplicationID("a") + + replicaMap.Set(id, &testReplication{id: id}) + replicaMap.Set(id, &testReplication{id: id}) + if got := replicaMap.Len(); got != 1 { + t.Fatalf("iMap.Len() after overwrite = %d, want 1", got) + } + + replicaMap.Delete(testReplicationID("missing")) + if got := replicaMap.Len(); got != 1 { + t.Fatalf("iMap.Len() after deleting missing key = %d, want 1", got) + } + + replicaMap.Delete(id) + if got := replicaMap.Len(); got != 0 { + t.Fatalf("iMap.Len() after delete = %d, want 0", got) + } +} + +func TestGetAbsentByGroupStopsAtBatch(t *testing.T) { + t.Parallel() + + var shouldRunCalls atomic.Int64 + db := NewReplicationDB[testReplicationID, *testReplication]( + "test", + func(action func()) { action() }, + NewEmptyChecker[testReplicationID, *testReplication], + ) + for i := 0; i < 100; i++ { + id := testReplicationID(fmt.Sprintf("r%d", i)) + db.AddAbsentWithoutLock(&testReplication{ + id: id, + groupID: DefaultGroupID, + shouldRunCalls: &shouldRunCalls, + }) + } + + absent := db.GetAbsentByGroup(DefaultGroupID, 3) + if got := len(absent); got != 3 { + t.Fatalf("GetAbsentByGroup() returned %d tasks, want 3", got) + } + if got := shouldRunCalls.Load(); got != 3 { + t.Fatalf("GetAbsentByGroup() called ShouldRun %d times, want 3", got) + } +} From 2339054366c1790d34701ff11fcd81f4f09e3e36 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 29 Apr 2026 11:58:30 +0800 Subject: [PATCH 02/12] add checkpoint tracker coverage --- maintainer/maintainer_slow_log_test.go | 134 ++++++++++++++++++ maintainer/span/checkpoint_ts_tracker.go | 2 + maintainer/span/span_controller_test.go | 52 +++++++ .../replica/replication_group_test.go | 31 +++- 4 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 maintainer/maintainer_slow_log_test.go diff --git a/maintainer/maintainer_slow_log_test.go b/maintainer/maintainer_slow_log_test.go new file mode 100644 index 0000000000..2b3208c8fc --- /dev/null +++ b/maintainer/maintainer_slow_log_test.go @@ -0,0 +1,134 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package maintainer + +import ( + "testing" + + "github.com/pingcap/ticdc/heartbeatpb" + "github.com/pingcap/ticdc/pkg/messaging" + "github.com/pingcap/ticdc/pkg/node" + "go.uber.org/zap" +) + +func TestSlowMessageFields(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + message *messaging.TargetMessage + hasKeys []string + missKeys []string + }{ + { + name: "nil", + message: nil, + hasKeys: []string{ + "message", + }, + missKeys: []string{ + "messageType", + "statusCount", + }, + }, + { + name: "heartbeat", + message: &messaging.TargetMessage{ + From: node.ID("node1"), + To: node.ID("node2"), + Topic: messaging.MaintainerManagerTopic, + Type: messaging.TypeHeartBeatRequest, + Message: []messaging.IOTypeT{ + &heartbeatpb.HeartBeatRequest{ + Watermark: &heartbeatpb.Watermark{ + CheckpointTs: 10, + ResolvedTs: 20, + }, + RedoWatermark: &heartbeatpb.Watermark{ + CheckpointTs: 30, + ResolvedTs: 40, + }, + Statuses: []*heartbeatpb.TableSpanStatus{ + {CheckpointTs: 10}, + {CheckpointTs: 11}, + }, + CompeleteStatus: true, + }, + }, + Sequence: 1, + }, + hasKeys: []string{ + "from", + "to", + "messageType", + "topic", + "messageCount", + "sequence", + "statusCount", + "completeStatus", + "heartbeatCheckpointTs", + "heartbeatResolvedTs", + "redoHeartbeatCheckpointTs", + "redoHeartbeatResolvedTs", + }, + }, + { + name: "non-heartbeat", + message: &messaging.TargetMessage{ + From: node.ID("node1"), + To: node.ID("node2"), + Topic: messaging.MaintainerManagerTopic, + Type: messaging.TypeMaintainerCloseResponse, + Message: []messaging.IOTypeT{ + &heartbeatpb.MaintainerCloseResponse{}, + }, + }, + hasKeys: []string{ + "from", + "to", + "messageType", + "topic", + "messageCount", + }, + missKeys: []string{ + "statusCount", + "heartbeatCheckpointTs", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + keys := zapFieldKeys(slowMessageFields(tt.message)) + for _, key := range tt.hasKeys { + if _, ok := keys[key]; !ok { + t.Fatalf("slowMessageFields() missing key %q", key) + } + } + for _, key := range tt.missKeys { + if _, ok := keys[key]; ok { + t.Fatalf("slowMessageFields() unexpectedly has key %q", key) + } + } + }) + } +} + +func zapFieldKeys(fields []zap.Field) map[string]struct{} { + keys := make(map[string]struct{}, len(fields)) + for _, field := range fields { + keys[field.Key] = struct{}{} + } + return keys +} diff --git a/maintainer/span/checkpoint_ts_tracker.go b/maintainer/span/checkpoint_ts_tracker.go index e0151749bb..f84d663f06 100644 --- a/maintainer/span/checkpoint_ts_tracker.go +++ b/maintainer/span/checkpoint_ts_tracker.go @@ -64,6 +64,8 @@ func (t *checkpointTsTracker) remove(id common.DispatcherID) { delete(t.byID, id) t.decrement(old) if len(t.byID) == 0 { + // Release large maps after a bootstrap wave drains. A 1M-table changefeed + // can otherwise retain the tracker backing storage for its whole lifetime. 
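+		// Deleting keys does not shrink a Go map, so reset swaps in fresh empty maps.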
t.reset() } } diff --git a/maintainer/span/span_controller_test.go b/maintainer/span/span_controller_test.go index e325443449..827f175e12 100644 --- a/maintainer/span/span_controller_test.go +++ b/maintainer/span/span_controller_test.go @@ -115,6 +115,31 @@ func TestControllerCheckpointTsTrackerBindSpanToNode(t *testing.T) { require.Equal(t, uint64(1000), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) } +func TestControllerCheckpointTsTrackerDirectScheduling(t *testing.T) { + controller := newControllerForCheckpointTsTrackerTest(t) + span := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 100, 90) + controller.AddSchedulingReplicaSet(span, "node1") + + require.Equal(t, uint64(90), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.UpdateStatus(span, &heartbeatpb.TableSpanStatus{ + ID: span.ID.ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 80, + }) + require.Equal(t, uint64(90), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.UpdateStatus(controller.GetDDLDispatcher(), &heartbeatpb.TableSpanStatus{ + ID: controller.GetDDLDispatcherID().ToPB(), + ComponentStatus: heartbeatpb.ComponentState_Working, + CheckpointTs: 1, + }) + require.Equal(t, uint64(90), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.MarkSpanReplicating(span) + require.Equal(t, uint64(1000), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) +} + func TestControllerCheckpointTsTrackerReplaceAndRemove(t *testing.T) { controller := newControllerForCheckpointTsTrackerTest(t) oldSpan := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 100, 50) @@ -143,6 +168,33 @@ func TestControllerCheckpointTsTrackerReplaceAndRemove(t *testing.T) { require.Equal(t, uint64(1000), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) } +func TestControllerCheckpointTsTrackerReplaceIntoScheduling(t *testing.T) { + controller := newControllerForCheckpointTsTrackerTest(t) + oldSpan := newSpanReplicationForCheckpointTsTrackerTest(controller, 1, 100, 50) + controller.AddAbsentReplicaSet(oldSpan) + newSpan := common.TableIDToComparableSpan(common.DefaultKeyspaceID, 101) + newTableSpan := &heartbeatpb.TableSpan{ + TableID: 101, + StartKey: newSpan.StartKey, + EndKey: newSpan.EndKey, + KeyspaceID: common.DefaultKeyspaceID, + } + + newSpans, inScheduling := controller.ReplaceReplicaSet( + []*replica.SpanReplication{oldSpan}, + []*heartbeatpb.TableSpan{newTableSpan}, + 80, + []node.ID{"node1"}, + ) + + require.True(t, inScheduling) + require.Len(t, newSpans, 1) + require.Equal(t, uint64(50), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) + + controller.MarkSpanReplicating(newSpans[0]) + require.Equal(t, uint64(1000), controller.GetMinCheckpointTsForNonReplicatingSpans(1000)) +} + func TestNewController(t *testing.T) { cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) ddlDispatcherID := common.NewDispatcherID() diff --git a/pkg/scheduler/replica/replication_group_test.go b/pkg/scheduler/replica/replication_group_test.go index 629972f369..afdbf9f501 100644 --- a/pkg/scheduler/replica/replication_group_test.go +++ b/pkg/scheduler/replica/replication_group_test.go @@ -31,6 +31,7 @@ type testReplication struct { id testReplicationID groupID GroupID nodeID node.ID + shouldRun bool shouldRunCalls *atomic.Int64 } @@ -54,7 +55,7 @@ func (r *testReplication) ShouldRun() bool { if r.shouldRunCalls != nil { r.shouldRunCalls.Add(1) } - return true + return r.shouldRun } func 
TestIMapLenTracksOverwriteAndDelete(t *testing.T) { @@ -94,6 +95,7 @@ func TestGetAbsentByGroupStopsAtBatch(t *testing.T) { db.AddAbsentWithoutLock(&testReplication{ id: id, groupID: DefaultGroupID, + shouldRun: true, shouldRunCalls: &shouldRunCalls, }) } @@ -106,3 +108,30 @@ func TestGetAbsentByGroupStopsAtBatch(t *testing.T) { t.Fatalf("GetAbsentByGroup() called ShouldRun %d times, want 3", got) } } + +func TestGetAbsentByGroupSkipsNotRunnableTasks(t *testing.T) { + t.Parallel() + + var shouldRunCalls atomic.Int64 + db := NewReplicationDB[testReplicationID, *testReplication]( + "test", + func(action func()) { action() }, + NewEmptyChecker[testReplicationID, *testReplication], + ) + for i := 0; i < 100; i++ { + id := testReplicationID(fmt.Sprintf("r%d", i)) + db.AddAbsentWithoutLock(&testReplication{ + id: id, + groupID: DefaultGroupID, + shouldRunCalls: &shouldRunCalls, + }) + } + + absent := db.GetAbsentByGroup(DefaultGroupID, 3) + if got := len(absent); got != 0 { + t.Fatalf("GetAbsentByGroup() returned %d tasks, want 0", got) + } + if got := shouldRunCalls.Load(); got != 100 { + t.Fatalf("GetAbsentByGroup() called ShouldRun %d times, want 100", got) + } +} From 7cfdb8dc9966f42a3815a0b1db2974368d08e16d Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 29 Apr 2026 14:46:40 +0800 Subject: [PATCH 03/12] update --- maintainer/maintainer.go | 45 +-------- maintainer/maintainer_slow_log_test.go | 134 ------------------------- 2 files changed, 3 insertions(+), 176 deletions(-) delete mode 100644 maintainer/maintainer_slow_log_test.go diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 80fb86fc46..580286ff9e 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -305,13 +305,12 @@ func (m *Maintainer) HandleEvent(event *Event) bool { if duration > time.Second { // add a log for debug an occasional slow bootstrap problem if event.eventType == EventMessage { - fields := []zap.Field{ + log.Info("maintainer is too slow", zap.Stringer("changefeedID", m.changefeedID), zap.Int("eventType", event.eventType), zap.Duration("duration", duration), - } - fields = append(fields, slowMessageFields(event.message)...) - log.Info("maintainer is too slow", fields...) 
+ zap.Any("MessageType", event.message.Type), + ) } else { log.Info("maintainer is too slow", zap.Stringer("changefeedID", m.changefeedID), @@ -346,44 +345,6 @@ func (m *Maintainer) HandleEvent(event *Event) bool { return false } -func slowMessageFields(msg *messaging.TargetMessage) []zap.Field { - if msg == nil { - return []zap.Field{zap.String("message", "nil")} - } - fields := []zap.Field{ - zap.Stringer("from", msg.From), - zap.Stringer("to", msg.To), - zap.Stringer("messageType", msg.Type), - zap.String("topic", msg.Topic), - zap.Int("messageCount", len(msg.Message)), - zap.Uint64("sequence", msg.Sequence), - } - if msg.Type != messaging.TypeHeartBeatRequest || len(msg.Message) == 0 { - return fields - } - req, ok := msg.Message[0].(*heartbeatpb.HeartBeatRequest) - if !ok { - return fields - } - fields = append(fields, - zap.Int("statusCount", len(req.Statuses)), - zap.Bool("completeStatus", req.CompeleteStatus), - ) - if watermark := req.GetWatermark(); watermark != nil { - fields = append(fields, - zap.Uint64("heartbeatCheckpointTs", watermark.CheckpointTs), - zap.Uint64("heartbeatResolvedTs", watermark.ResolvedTs), - ) - } - if watermark := req.GetRedoWatermark(); watermark != nil { - fields = append(fields, - zap.Uint64("redoHeartbeatCheckpointTs", watermark.CheckpointTs), - zap.Uint64("redoHeartbeatResolvedTs", watermark.ResolvedTs), - ) - } - return fields -} - func (m *Maintainer) checkNodeChanged() { m.nodeChanged.Lock() defer m.nodeChanged.Unlock() diff --git a/maintainer/maintainer_slow_log_test.go b/maintainer/maintainer_slow_log_test.go deleted file mode 100644 index 2b3208c8fc..0000000000 --- a/maintainer/maintainer_slow_log_test.go +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package maintainer - -import ( - "testing" - - "github.com/pingcap/ticdc/heartbeatpb" - "github.com/pingcap/ticdc/pkg/messaging" - "github.com/pingcap/ticdc/pkg/node" - "go.uber.org/zap" -) - -func TestSlowMessageFields(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - message *messaging.TargetMessage - hasKeys []string - missKeys []string - }{ - { - name: "nil", - message: nil, - hasKeys: []string{ - "message", - }, - missKeys: []string{ - "messageType", - "statusCount", - }, - }, - { - name: "heartbeat", - message: &messaging.TargetMessage{ - From: node.ID("node1"), - To: node.ID("node2"), - Topic: messaging.MaintainerManagerTopic, - Type: messaging.TypeHeartBeatRequest, - Message: []messaging.IOTypeT{ - &heartbeatpb.HeartBeatRequest{ - Watermark: &heartbeatpb.Watermark{ - CheckpointTs: 10, - ResolvedTs: 20, - }, - RedoWatermark: &heartbeatpb.Watermark{ - CheckpointTs: 30, - ResolvedTs: 40, - }, - Statuses: []*heartbeatpb.TableSpanStatus{ - {CheckpointTs: 10}, - {CheckpointTs: 11}, - }, - CompeleteStatus: true, - }, - }, - Sequence: 1, - }, - hasKeys: []string{ - "from", - "to", - "messageType", - "topic", - "messageCount", - "sequence", - "statusCount", - "completeStatus", - "heartbeatCheckpointTs", - "heartbeatResolvedTs", - "redoHeartbeatCheckpointTs", - "redoHeartbeatResolvedTs", - }, - }, - { - name: "non-heartbeat", - message: &messaging.TargetMessage{ - From: node.ID("node1"), - To: node.ID("node2"), - Topic: messaging.MaintainerManagerTopic, - Type: messaging.TypeMaintainerCloseResponse, - Message: []messaging.IOTypeT{ - &heartbeatpb.MaintainerCloseResponse{}, - }, - }, - hasKeys: []string{ - "from", - "to", - "messageType", - "topic", - "messageCount", - }, - missKeys: []string{ - "statusCount", - "heartbeatCheckpointTs", - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - keys := zapFieldKeys(slowMessageFields(tt.message)) - for _, key := range tt.hasKeys { - if _, ok := keys[key]; !ok { - t.Fatalf("slowMessageFields() missing key %q", key) - } - } - for _, key := range tt.missKeys { - if _, ok := keys[key]; ok { - t.Fatalf("slowMessageFields() unexpectedly has key %q", key) - } - } - }) - } -} - -func zapFieldKeys(fields []zap.Field) map[string]struct{} { - keys := make(map[string]struct{}, len(fields)) - for _, field := range fields { - keys[field.Key] = struct{}{} - } - return keys -} From 002a79afbea9823b952c6a7d8de0cfdde876e7e5 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 29 Apr 2026 17:25:11 +0800 Subject: [PATCH 04/12] Throttle checkpoint calculation under operator backlog --- maintainer/maintainer.go | 39 ++++++++++++++++++++++++++++------- maintainer/maintainer_test.go | 17 +++++++++++++++ pkg/metrics/maintainer.go | 10 +++++++++ 3 files changed, 59 insertions(+), 7 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 580286ff9e..dfd02936cc 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -49,6 +49,11 @@ import ( const ( periodEventInterval = time.Millisecond * 100 periodRedoInterval = time.Second * 1 + + checkpointNormalInterval = periodEventInterval + checkpointSlowInterval = time.Second * 5 + checkpointSlowOperatorThreshold = 3000 + checkpointResumeOperatorThreshold = 1500 ) // Maintainer is response for handle changefeed replication tasks. 
Maintainer should: @@ -155,10 +160,11 @@ type Maintainer struct { resolvedTsLagGauge prometheus.Gauge eventChLenGauge prometheus.Gauge - scheduledTaskGauge prometheus.Gauge - spanCountGauge prometheus.Gauge - tableCountGauge prometheus.Gauge - handleEventDuration prometheus.Observer + scheduledTaskGauge prometheus.Gauge + spanCountGauge prometheus.Gauge + tableCountGauge prometheus.Gauge + handleEventDuration prometheus.Observer + checkpointCalcDuration prometheus.Observer redoScheduledTaskGauge prometheus.Gauge redoSpanCountGauge prometheus.Gauge @@ -226,6 +232,8 @@ func NewMaintainer(cfID common.ChangeFeedID, spanCountGauge: metrics.SpanCountGauge.WithLabelValues(keyspaceName, name, "default"), tableCountGauge: metrics.TableCountGauge.WithLabelValues(keyspaceName, name, "default"), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(keyspaceName, name), + checkpointCalcDuration: metrics.MaintainerCheckpointCalculateDuration.WithLabelValues( + keyspaceName, name), redoScheduledTaskGauge: metrics.ScheduleTaskGauge.WithLabelValues(keyspaceName, name, "redo"), redoSpanCountGauge: metrics.SpanCountGauge.WithLabelValues(keyspaceName, name, "redo"), @@ -464,6 +472,7 @@ func (m *Maintainer) cleanupMetrics() { metrics.MaintainerCheckpointTsGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerCheckpointTsLagGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerHandleEventDuration.DeleteLabelValues(keyspace, name) + metrics.MaintainerCheckpointCalculateDuration.DeleteLabelValues(keyspace, name) metrics.MaintainerEventChLenGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerResolvedTsGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerResolvedTsLagGauge.DeleteLabelValues(keyspace, name) @@ -649,25 +658,38 @@ func (m *Maintainer) handleRedoMetaTsMessage(ctx context.Context) { } } +func checkpointCalculateInterval(operatorSize int, current time.Duration) time.Duration { + if operatorSize > checkpointSlowOperatorThreshold { + return checkpointSlowInterval + } + if operatorSize < checkpointResumeOperatorThreshold { + return checkpointNormalInterval + } + return current +} + // calCheckpointTs will be a little expensive when there are a large number of operators or absent tasks // so we use a single goroutine to calculate the checkpointTs, instead of blocking event handling func (m *Maintainer) calCheckpointTs(ctx context.Context) { - ticker := time.NewTicker(periodEventInterval) - defer ticker.Stop() + interval := checkpointNormalInterval + timer := time.NewTimer(interval) + defer timer.Stop() for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-timer.C: if !m.initialized.Load() { log.Warn("can not advance checkpointTs since not bootstrapped", zap.Stringer("changefeedID", m.changefeedID), zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) + timer.Reset(interval) break } + start := time.Now() // first check the online/offline nodes // we need to check node changed before calculating checkpointTs // to avoid the case when a node is offline, the node's heartbeat is missing @@ -682,6 +704,9 @@ func (m *Maintainer) calCheckpointTs(ctx context.Context) { m.setWatermark(*newWatermark) m.updateMetrics() } + m.checkpointCalcDuration.Observe(time.Since(start).Seconds()) + interval = checkpointCalculateInterval(m.controller.operatorController.OperatorSize(), interval) + timer.Reset(interval) } } } diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 
1df5a11e28..e7585cea08 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -430,6 +430,17 @@ func TestMaintainerCalculateNewCheckpointTs(t *testing.T) { }) } +func TestCheckpointCalculateInterval(t *testing.T) { + require.Equal(t, checkpointSlowInterval, + checkpointCalculateInterval(checkpointSlowOperatorThreshold+1, checkpointNormalInterval)) + require.Equal(t, checkpointNormalInterval, + checkpointCalculateInterval(checkpointResumeOperatorThreshold-1, checkpointSlowInterval)) + require.Equal(t, checkpointSlowInterval, + checkpointCalculateInterval(checkpointResumeOperatorThreshold, checkpointSlowInterval)) + require.Equal(t, checkpointNormalInterval, + checkpointCalculateInterval(checkpointSlowOperatorThreshold, checkpointNormalInterval)) +} + func TestMaintainerCalCheckpointTsSkipsInvalidGlobalCheckpoint(t *testing.T) { m, selfNodeID := newMaintainerForCheckpointCalculationTest(t) m.initialized.Store(true) @@ -542,6 +553,9 @@ func newMaintainerForCheckpointCalculationTest(t testing.TB) (*Maintainer, node. resolvedTsLagGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "test_resolved_ts_lag", }), + checkpointCalcDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "test_checkpoint_calculate_duration", + }), } m.watermark.Watermark = heartbeatpb.NewMaxWatermark() return m, selfNode.ID @@ -603,6 +617,9 @@ func newMaintainerForRedoCheckpointCalculationTest(t testing.TB) (*Maintainer, n resolvedTsLagGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "test_redo_resolved_ts_lag", }), + checkpointCalcDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "test_redo_checkpoint_calculate_duration", + }), } m.watermark.Watermark = heartbeatpb.NewMaxWatermark() return m, selfNode.ID diff --git a/pkg/metrics/maintainer.go b/pkg/metrics/maintainer.go index b77706e4f4..0067329ba2 100644 --- a/pkg/metrics/maintainer.go +++ b/pkg/metrics/maintainer.go @@ -25,6 +25,15 @@ var ( Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18), }, []string{getKeyspaceLabel(), "changefeed"}) + MaintainerCheckpointCalculateDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "maintainer", + Name: "checkpoint_calculate_duration_seconds", + Help: "Bucketed histogram of maintainer checkpoint calculation time (s).", + Buckets: prometheus.ExponentialBuckets(0.001 /* 1 ms */, 2, 20), + }, []string{getKeyspaceLabel(), "changefeed"}) + MaintainerEventChLenGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -61,6 +70,7 @@ var ( func initMaintainerMetrics(registry *prometheus.Registry) { registry.MustRegister(MaintainerHandleEventDuration) + registry.MustRegister(MaintainerCheckpointCalculateDuration) registry.MustRegister(MaintainerEventChLenGauge) registry.MustRegister(OperatorCount) registry.MustRegister(OperatorDuration) From 66babac0cec5eeb1964cc4a2be82159c4ac5236a Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 29 Apr 2026 17:29:34 +0800 Subject: [PATCH 05/12] update --- maintainer/maintainer.go | 2 +- maintainer/maintainer_test.go | 11 ----------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index dfd02936cc..bb295d99af 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -53,7 +53,7 @@ const ( checkpointNormalInterval = periodEventInterval checkpointSlowInterval = time.Second * 5 checkpointSlowOperatorThreshold = 3000 - checkpointResumeOperatorThreshold = 1500 + 
checkpointResumeOperatorThreshold = 200 ) // Maintainer is response for handle changefeed replication tasks. Maintainer should: diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index e7585cea08..322f6b17cc 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -430,17 +430,6 @@ func TestMaintainerCalculateNewCheckpointTs(t *testing.T) { }) } -func TestCheckpointCalculateInterval(t *testing.T) { - require.Equal(t, checkpointSlowInterval, - checkpointCalculateInterval(checkpointSlowOperatorThreshold+1, checkpointNormalInterval)) - require.Equal(t, checkpointNormalInterval, - checkpointCalculateInterval(checkpointResumeOperatorThreshold-1, checkpointSlowInterval)) - require.Equal(t, checkpointSlowInterval, - checkpointCalculateInterval(checkpointResumeOperatorThreshold, checkpointSlowInterval)) - require.Equal(t, checkpointNormalInterval, - checkpointCalculateInterval(checkpointSlowOperatorThreshold, checkpointNormalInterval)) -} - func TestMaintainerCalCheckpointTsSkipsInvalidGlobalCheckpoint(t *testing.T) { m, selfNodeID := newMaintainerForCheckpointCalculationTest(t) m.initialized.Store(true) From 0f8d2785e0effe38a638bd50308a2a1f7361a580 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 29 Apr 2026 19:50:41 +0800 Subject: [PATCH 06/12] update --- maintainer/maintainer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index bb295d99af..9ea9ed9191 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -47,11 +47,11 @@ import ( ) const ( - periodEventInterval = time.Millisecond * 100 + periodEventInterval = time.Millisecond * 500 periodRedoInterval = time.Second * 1 - checkpointNormalInterval = periodEventInterval - checkpointSlowInterval = time.Second * 5 + checkpointNormalInterval = 200 * time.Millisecond + checkpointSlowInterval = time.Second * 30 checkpointSlowOperatorThreshold = 3000 checkpointResumeOperatorThreshold = 200 ) @@ -659,6 +659,11 @@ func (m *Maintainer) handleRedoMetaTsMessage(ctx context.Context) { } func checkpointCalculateInterval(operatorSize int, current time.Duration) time.Duration { + defer func() { + log.Info("checkpoint calculate interval", + zap.Int("operatorSize", operatorSize), + zap.Duration("currentInterval", current)) + }() if operatorSize > checkpointSlowOperatorThreshold { return checkpointSlowInterval } From f0fd45f255dd4e29efb3ec720b0477a7e96b3008 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 29 Apr 2026 20:23:13 +0800 Subject: [PATCH 07/12] update --- maintainer/maintainer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 9ea9ed9191..5f5e336713 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -710,7 +710,7 @@ func (m *Maintainer) calCheckpointTs(ctx context.Context) { m.updateMetrics() } m.checkpointCalcDuration.Observe(time.Since(start).Seconds()) - interval = checkpointCalculateInterval(m.controller.operatorController.OperatorSize(), interval) + interval = checkpointCalculateInterval(m.controller.spanController.GetAbsentSize(), interval) timer.Reset(interval) } } From 8e4aeb4c597aad8443cbbb359be465a17d8bf829 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 6 May 2026 12:07:07 +0800 Subject: [PATCH 08/12] maintainer: address checkpoint tracker review comments --- maintainer/span/checkpoint_ts_tracker_test.go | 40 ++++++++----------- 
pkg/scheduler/replica/replication_group.go | 6 +-- .../replica/replication_group_test.go | 29 ++++---------- 3 files changed, 26 insertions(+), 49 deletions(-) diff --git a/maintainer/span/checkpoint_ts_tracker_test.go b/maintainer/span/checkpoint_ts_tracker_test.go index 0f2537ff27..1eef2d669a 100644 --- a/maintainer/span/checkpoint_ts_tracker_test.go +++ b/maintainer/span/checkpoint_ts_tracker_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/pingcap/ticdc/pkg/common" + "github.com/stretchr/testify/require" ) func TestCheckpointTsTrackerMin(t *testing.T) { @@ -32,33 +33,28 @@ func TestCheckpointTsTrackerMin(t *testing.T) { tracker.addOrUpdate(id3, 80) got, ok := tracker.min() - if !ok || got != 80 { - t.Fatalf("checkpointTsTracker.min() = %d, %v, want 80, true", got, ok) - } + require.True(t, ok) + require.Equal(t, uint64(80), got) tracker.update(id2, 120) got, ok = tracker.min() - if !ok || got != 80 { - t.Fatalf("checkpointTsTracker.min() after one duplicate update = %d, %v, want 80, true", got, ok) - } + require.True(t, ok) + require.Equal(t, uint64(80), got) tracker.remove(id3) got, ok = tracker.min() - if !ok || got != 100 { - t.Fatalf("checkpointTsTracker.min() after removing duplicate min = %d, %v, want 100, true", got, ok) - } + require.True(t, ok) + require.Equal(t, uint64(100), got) tracker.remove(id1) got, ok = tracker.min() - if !ok || got != 120 { - t.Fatalf("checkpointTsTracker.min() after removing current min = %d, %v, want 120, true", got, ok) - } + require.True(t, ok) + require.Equal(t, uint64(120), got) tracker.remove(id2) got, ok = tracker.min() - if ok || got != 0 { - t.Fatalf("checkpointTsTracker.min() after removing all = %d, %v, want 0, false", got, ok) - } + require.False(t, ok) + require.Equal(t, uint64(0), got) } func TestCheckpointTsTrackerIgnoresMissingUpdate(t *testing.T) { @@ -70,9 +66,8 @@ func TestCheckpointTsTrackerIgnoresMissingUpdate(t *testing.T) { tracker.remove(id) got, ok := tracker.min() - if ok || got != 0 { - t.Fatalf("checkpointTsTracker.min() after missing update = %d, %v, want 0, false", got, ok) - } + require.False(t, ok) + require.Equal(t, uint64(0), got) } func TestCheckpointTsTrackerRemovesStaleCheckpointTs(t *testing.T) { @@ -88,13 +83,10 @@ func TestCheckpointTsTrackerRemovesStaleCheckpointTs(t *testing.T) { tracker.update(movingID, checkpointTs) } - if got := tracker.heap.Len(); got != 2 { - t.Fatalf("checkpointTsTracker heap size = %d, want 2", got) - } + require.Equal(t, 2, tracker.heap.Len()) tracker.remove(blockingID) got, ok := tracker.min() - if !ok || got != 99 { - t.Fatalf("checkpointTsTracker.min() after removing blocker = %d, %v, want 99, true", got, ok) - } + require.True(t, ok) + require.Equal(t, uint64(99), got) } diff --git a/pkg/scheduler/replica/replication_group.go b/pkg/scheduler/replica/replication_group.go index f0aee6519f..3f24c5e02d 100644 --- a/pkg/scheduler/replica/replication_group.go +++ b/pkg/scheduler/replica/replication_group.go @@ -309,11 +309,9 @@ func (m *iMap[T, R]) Get(key T) (R, bool) { } func (m *iMap[T, R]) Set(key T, value R) { - if _, loaded := m.inner.LoadOrStore(key, value); loaded { - m.inner.Store(key, value) - return + if _, loaded := m.inner.Swap(key, value); !loaded { + m.size.Add(1) } - m.size.Add(1) } func (m *iMap[T, R]) Delete(key T) { diff --git a/pkg/scheduler/replica/replication_group_test.go b/pkg/scheduler/replica/replication_group_test.go index afdbf9f501..63fc5df26f 100644 --- a/pkg/scheduler/replica/replication_group_test.go +++ 
b/pkg/scheduler/replica/replication_group_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/pingcap/ticdc/pkg/node" + "github.com/stretchr/testify/require" ) type testReplicationID string @@ -66,19 +67,13 @@ func TestIMapLenTracksOverwriteAndDelete(t *testing.T) { replicaMap.Set(id, &testReplication{id: id}) replicaMap.Set(id, &testReplication{id: id}) - if got := replicaMap.Len(); got != 1 { - t.Fatalf("iMap.Len() after overwrite = %d, want 1", got) - } + require.Equal(t, 1, replicaMap.Len()) replicaMap.Delete(testReplicationID("missing")) - if got := replicaMap.Len(); got != 1 { - t.Fatalf("iMap.Len() after deleting missing key = %d, want 1", got) - } + require.Equal(t, 1, replicaMap.Len()) replicaMap.Delete(id) - if got := replicaMap.Len(); got != 0 { - t.Fatalf("iMap.Len() after delete = %d, want 0", got) - } + require.Equal(t, 0, replicaMap.Len()) } func TestGetAbsentByGroupStopsAtBatch(t *testing.T) { @@ -101,12 +96,8 @@ func TestGetAbsentByGroupStopsAtBatch(t *testing.T) { } absent := db.GetAbsentByGroup(DefaultGroupID, 3) - if got := len(absent); got != 3 { - t.Fatalf("GetAbsentByGroup() returned %d tasks, want 3", got) - } - if got := shouldRunCalls.Load(); got != 3 { - t.Fatalf("GetAbsentByGroup() called ShouldRun %d times, want 3", got) - } + require.Len(t, absent, 3) + require.Equal(t, int64(3), shouldRunCalls.Load()) } func TestGetAbsentByGroupSkipsNotRunnableTasks(t *testing.T) { @@ -128,10 +119,6 @@ func TestGetAbsentByGroupSkipsNotRunnableTasks(t *testing.T) { } absent := db.GetAbsentByGroup(DefaultGroupID, 3) - if got := len(absent); got != 0 { - t.Fatalf("GetAbsentByGroup() returned %d tasks, want 0", got) - } - if got := shouldRunCalls.Load(); got != 100 { - t.Fatalf("GetAbsentByGroup() called ShouldRun %d times, want 100", got) - } + require.Len(t, absent, 0) + require.Equal(t, int64(100), shouldRunCalls.Load()) } From e573949787c4584d6baa15dababf7bd7bccf31bf Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 6 May 2026 16:14:13 +0800 Subject: [PATCH 09/12] maintainer: clarify checkpoint tracker naming --- maintainer/span/checkpoint_ts_tracker.go | 82 ++++++++++++------- maintainer/span/checkpoint_ts_tracker_test.go | 28 +++---- maintainer/span/span_controller.go | 6 +- 3 files changed, 68 insertions(+), 48 deletions(-) diff --git a/maintainer/span/checkpoint_ts_tracker.go b/maintainer/span/checkpoint_ts_tracker.go index f84d663f06..b6c41346fe 100644 --- a/maintainer/span/checkpoint_ts_tracker.go +++ b/maintainer/span/checkpoint_ts_tracker.go @@ -19,89 +19,109 @@ import ( "github.com/pingcap/ticdc/pkg/common" ) +// checkpointTsTracker maintains the minimum checkpointTs among non-DDL spans +// that are not replicating yet. The owning SpanController must hold its mutex +// when accessing this tracker. type checkpointTsTracker struct { - // byID contains exactly the non-DDL spans that are absent or scheduling. - // The owning Controller must hold its mutex when accessing this tracker. - byID map[common.DispatcherID]uint64 - counts map[uint64]int - heap checkpointTsHeap + // checkpointTsBySpanID contains exactly the non-DDL spans that are absent or + // scheduling. Replicating spans are removed because they no longer block the + // non-replicating minimum checkpoint. + checkpointTsBySpanID map[common.DispatcherID]uint64 + + // checkpointTsRefCounts tracks how many spans currently hold each checkpointTs. + // The heap stores each checkpointTs once, so the count keeps duplicate values + // from being removed too early. 
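+	// For example, when two spans are both tracked at checkpointTs 80, the heap
+	// holds a single 80 with checkpointTsRefCounts[80] == 2; untracking one of
+	// them only decrements the count, so min() still reports 80.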
+ checkpointTsRefCounts map[uint64]int + + // minCheckpointTsHeap stores unique checkpointTs values and gives O(1) access + // to the current minimum. Insertions and removals are O(log n). + minCheckpointTsHeap checkpointTsHeap } func newCheckpointTsTracker() *checkpointTsTracker { return &checkpointTsTracker{ - byID: make(map[common.DispatcherID]uint64), - counts: make(map[uint64]int), - heap: newCheckpointTsHeap(), + checkpointTsBySpanID: make(map[common.DispatcherID]uint64), + checkpointTsRefCounts: make(map[uint64]int), + minCheckpointTsHeap: newCheckpointTsHeap(), } } -func (t *checkpointTsTracker) addOrUpdate(id common.DispatcherID, checkpointTs uint64) { - if old, ok := t.byID[id]; ok { +// trackSpan records a span that has entered a non-replicating state. It also +// handles duplicate calls for the same span by replacing the old checkpointTs. +func (t *checkpointTsTracker) trackSpan(id common.DispatcherID, checkpointTs uint64) { + if old, ok := t.checkpointTsBySpanID[id]; ok { if old == checkpointTs { return } t.decrement(old) } - t.byID[id] = checkpointTs + t.checkpointTsBySpanID[id] = checkpointTs t.increment(checkpointTs) } -func (t *checkpointTsTracker) update(id common.DispatcherID, checkpointTs uint64) { - old, ok := t.byID[id] +// updateTrackedSpan updates checkpointTs only for spans that are already +// tracked. Missing spans are ignored because DDL or replicating spans are not +// part of the non-replicating minimum. +func (t *checkpointTsTracker) updateTrackedSpan(id common.DispatcherID, checkpointTs uint64) { + old, ok := t.checkpointTsBySpanID[id] if !ok || old == checkpointTs { return } t.decrement(old) - t.byID[id] = checkpointTs + t.checkpointTsBySpanID[id] = checkpointTs t.increment(checkpointTs) } -func (t *checkpointTsTracker) remove(id common.DispatcherID) { - old, ok := t.byID[id] +// untrackSpan removes a span after it becomes replicating or leaves the +// controller. Missing spans are ignored for the same reason as updateTrackedSpan. +func (t *checkpointTsTracker) untrackSpan(id common.DispatcherID) { + old, ok := t.checkpointTsBySpanID[id] if !ok { return } - delete(t.byID, id) + delete(t.checkpointTsBySpanID, id) t.decrement(old) - if len(t.byID) == 0 { + if len(t.checkpointTsBySpanID) == 0 { // Release large maps after a bootstrap wave drains. A 1M-table changefeed // can otherwise retain the tracker backing storage for its whole lifetime. t.reset() } } +// min returns the current minimum checkpointTs among tracked spans. 
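+// It is O(1): the heap root is always the smallest tracked value. ok is false
+// when no span is currently tracked.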
func (t *checkpointTsTracker) min() (uint64, bool) { - if t.heap.Len() == 0 { + if t.minCheckpointTsHeap.Len() == 0 { return 0, false } - return t.heap.peek(), true + return t.minCheckpointTsHeap.peek(), true } func (t *checkpointTsTracker) increment(checkpointTs uint64) { - if t.counts[checkpointTs] > 0 { - t.counts[checkpointTs]++ + if t.checkpointTsRefCounts[checkpointTs] > 0 { + t.checkpointTsRefCounts[checkpointTs]++ return } - t.counts[checkpointTs] = 1 - heap.Push(&t.heap, checkpointTs) + t.checkpointTsRefCounts[checkpointTs] = 1 + heap.Push(&t.minCheckpointTsHeap, checkpointTs) } func (t *checkpointTsTracker) decrement(checkpointTs uint64) { - count := t.counts[checkpointTs] + count := t.checkpointTsRefCounts[checkpointTs] if count <= 1 { - delete(t.counts, checkpointTs) - t.heap.remove(checkpointTs) + delete(t.checkpointTsRefCounts, checkpointTs) + t.minCheckpointTsHeap.remove(checkpointTs) return } - t.counts[checkpointTs] = count - 1 + t.checkpointTsRefCounts[checkpointTs] = count - 1 } func (t *checkpointTsTracker) reset() { - t.byID = make(map[common.DispatcherID]uint64) - t.counts = make(map[uint64]int) - t.heap = newCheckpointTsHeap() + t.checkpointTsBySpanID = make(map[common.DispatcherID]uint64) + t.checkpointTsRefCounts = make(map[uint64]int) + t.minCheckpointTsHeap = newCheckpointTsHeap() } +// checkpointTsHeap is a removable min-heap for unique checkpointTs values. type checkpointTsHeap struct { values []uint64 indexes map[uint64]int diff --git a/maintainer/span/checkpoint_ts_tracker_test.go b/maintainer/span/checkpoint_ts_tracker_test.go index 1eef2d669a..e71e14ef63 100644 --- a/maintainer/span/checkpoint_ts_tracker_test.go +++ b/maintainer/span/checkpoint_ts_tracker_test.go @@ -28,30 +28,30 @@ func TestCheckpointTsTrackerMin(t *testing.T) { id2 := common.NewDispatcherID() id3 := common.NewDispatcherID() - tracker.addOrUpdate(id1, 100) - tracker.addOrUpdate(id2, 80) - tracker.addOrUpdate(id3, 80) + tracker.trackSpan(id1, 100) + tracker.trackSpan(id2, 80) + tracker.trackSpan(id3, 80) got, ok := tracker.min() require.True(t, ok) require.Equal(t, uint64(80), got) - tracker.update(id2, 120) + tracker.updateTrackedSpan(id2, 120) got, ok = tracker.min() require.True(t, ok) require.Equal(t, uint64(80), got) - tracker.remove(id3) + tracker.untrackSpan(id3) got, ok = tracker.min() require.True(t, ok) require.Equal(t, uint64(100), got) - tracker.remove(id1) + tracker.untrackSpan(id1) got, ok = tracker.min() require.True(t, ok) require.Equal(t, uint64(120), got) - tracker.remove(id2) + tracker.untrackSpan(id2) got, ok = tracker.min() require.False(t, ok) require.Equal(t, uint64(0), got) @@ -62,8 +62,8 @@ func TestCheckpointTsTrackerIgnoresMissingUpdate(t *testing.T) { tracker := newCheckpointTsTracker() id := common.NewDispatcherID() - tracker.update(id, 100) - tracker.remove(id) + tracker.updateTrackedSpan(id, 100) + tracker.untrackSpan(id) got, ok := tracker.min() require.False(t, ok) @@ -76,16 +76,16 @@ func TestCheckpointTsTrackerRemovesStaleCheckpointTs(t *testing.T) { tracker := newCheckpointTsTracker() blockingID := common.NewDispatcherID() movingID := common.NewDispatcherID() - tracker.addOrUpdate(blockingID, 1) - tracker.addOrUpdate(movingID, 2) + tracker.trackSpan(blockingID, 1) + tracker.trackSpan(movingID, 2) for checkpointTs := uint64(3); checkpointTs < 100; checkpointTs++ { - tracker.update(movingID, checkpointTs) + tracker.updateTrackedSpan(movingID, checkpointTs) } - require.Equal(t, 2, tracker.heap.Len()) + require.Equal(t, 2, tracker.minCheckpointTsHeap.Len()) - 
tracker.remove(blockingID) + tracker.untrackSpan(blockingID) got, ok := tracker.min() require.True(t, ok) require.Equal(t, uint64(99), got) diff --git a/maintainer/span/span_controller.go b/maintainer/span/span_controller.go index 0815e59657..40fcea4d47 100644 --- a/maintainer/span/span_controller.go +++ b/maintainer/span/span_controller.go @@ -364,7 +364,7 @@ func (c *Controller) UpdateStatus(span *replica.SpanReplication, status *heartbe c.mu.Lock() defer c.mu.Unlock() if span.UpdateStatus(status) { - c.nonReplicatingCheckpointTs.update(span.ID, span.GetStatus().CheckpointTs) + c.nonReplicatingCheckpointTs.updateTrackedSpan(span.ID, span.GetStatus().CheckpointTs) } checker.UpdateStatus(span) } @@ -611,11 +611,11 @@ func (c *Controller) trackNonReplicatingSpan(span *replica.SpanReplication) { if span == c.ddlSpan { return } - c.nonReplicatingCheckpointTs.addOrUpdate(span.ID, span.GetStatus().CheckpointTs) + c.nonReplicatingCheckpointTs.trackSpan(span.ID, span.GetStatus().CheckpointTs) } func (c *Controller) untrackNonReplicatingSpan(span *replica.SpanReplication) { - c.nonReplicatingCheckpointTs.remove(span.ID) + c.nonReplicatingCheckpointTs.untrackSpan(span.ID) } // addToSchemaAndTableMap adds the span to the schema and table map From f0dd19b1a56aa0619f738c1abe8b36b35802725f Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 6 May 2026 16:17:48 +0800 Subject: [PATCH 10/12] update --- maintainer/span/span_controller.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/maintainer/span/span_controller.go b/maintainer/span/span_controller.go index 40fcea4d47..f5d4964c69 100644 --- a/maintainer/span/span_controller.go +++ b/maintainer/span/span_controller.go @@ -358,14 +358,15 @@ func (c *Controller) UpdateStatus(span *replica.SpanReplication, status *heartbe span.UpdateStatus(status) return } + if span.UpdateStatus(status) { + c.nonReplicatingCheckpointTs.updateTrackedSpan(span.ID, span.GetStatus().CheckpointTs) + } + // Note: a read lock is required inside the `GetGroupChecker` method. checker := c.GetGroupChecker(span.GetGroupID()) c.mu.Lock() defer c.mu.Unlock() - if span.UpdateStatus(status) { - c.nonReplicatingCheckpointTs.updateTrackedSpan(span.ID, span.GetStatus().CheckpointTs) - } checker.UpdateStatus(span) } From e15cd175697741da2608c0ff16bfc828cba76a6a Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 6 May 2026 16:27:16 +0800 Subject: [PATCH 11/12] update --- maintainer/maintainer.go | 46 ++++++------------------------ maintainer/maintainer_test.go | 6 ---- maintainer/span/span_controller.go | 7 ++--- 3 files changed, 11 insertions(+), 48 deletions(-) diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 5f5e336713..580286ff9e 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -47,13 +47,8 @@ import ( ) const ( - periodEventInterval = time.Millisecond * 500 + periodEventInterval = time.Millisecond * 100 periodRedoInterval = time.Second * 1 - - checkpointNormalInterval = 200 * time.Millisecond - checkpointSlowInterval = time.Second * 30 - checkpointSlowOperatorThreshold = 3000 - checkpointResumeOperatorThreshold = 200 ) // Maintainer is response for handle changefeed replication tasks. 
Maintainer should: @@ -160,11 +155,10 @@ type Maintainer struct { resolvedTsLagGauge prometheus.Gauge eventChLenGauge prometheus.Gauge - scheduledTaskGauge prometheus.Gauge - spanCountGauge prometheus.Gauge - tableCountGauge prometheus.Gauge - handleEventDuration prometheus.Observer - checkpointCalcDuration prometheus.Observer + scheduledTaskGauge prometheus.Gauge + spanCountGauge prometheus.Gauge + tableCountGauge prometheus.Gauge + handleEventDuration prometheus.Observer redoScheduledTaskGauge prometheus.Gauge redoSpanCountGauge prometheus.Gauge @@ -232,8 +226,6 @@ func NewMaintainer(cfID common.ChangeFeedID, spanCountGauge: metrics.SpanCountGauge.WithLabelValues(keyspaceName, name, "default"), tableCountGauge: metrics.TableCountGauge.WithLabelValues(keyspaceName, name, "default"), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(keyspaceName, name), - checkpointCalcDuration: metrics.MaintainerCheckpointCalculateDuration.WithLabelValues( - keyspaceName, name), redoScheduledTaskGauge: metrics.ScheduleTaskGauge.WithLabelValues(keyspaceName, name, "redo"), redoSpanCountGauge: metrics.SpanCountGauge.WithLabelValues(keyspaceName, name, "redo"), @@ -472,7 +464,6 @@ func (m *Maintainer) cleanupMetrics() { metrics.MaintainerCheckpointTsGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerCheckpointTsLagGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerHandleEventDuration.DeleteLabelValues(keyspace, name) - metrics.MaintainerCheckpointCalculateDuration.DeleteLabelValues(keyspace, name) metrics.MaintainerEventChLenGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerResolvedTsGauge.DeleteLabelValues(keyspace, name) metrics.MaintainerResolvedTsLagGauge.DeleteLabelValues(keyspace, name) @@ -658,43 +649,25 @@ func (m *Maintainer) handleRedoMetaTsMessage(ctx context.Context) { } } -func checkpointCalculateInterval(operatorSize int, current time.Duration) time.Duration { - defer func() { - log.Info("checkpoint calculate interval", - zap.Int("operatorSize", operatorSize), - zap.Duration("currentInterval", current)) - }() - if operatorSize > checkpointSlowOperatorThreshold { - return checkpointSlowInterval - } - if operatorSize < checkpointResumeOperatorThreshold { - return checkpointNormalInterval - } - return current -} - // calCheckpointTs will be a little expensive when there are a large number of operators or absent tasks // so we use a single goroutine to calculate the checkpointTs, instead of blocking event handling func (m *Maintainer) calCheckpointTs(ctx context.Context) { - interval := checkpointNormalInterval - timer := time.NewTimer(interval) - defer timer.Stop() + ticker := time.NewTicker(periodEventInterval) + defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-timer.C: + case <-ticker.C: if !m.initialized.Load() { log.Warn("can not advance checkpointTs since not bootstrapped", zap.Stringer("changefeedID", m.changefeedID), zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) - timer.Reset(interval) break } - start := time.Now() // first check the online/offline nodes // we need to check node changed before calculating checkpointTs // to avoid the case when a node is offline, the node's heartbeat is missing @@ -709,9 +682,6 @@ func (m *Maintainer) calCheckpointTs(ctx context.Context) { m.setWatermark(*newWatermark) m.updateMetrics() } - m.checkpointCalcDuration.Observe(time.Since(start).Seconds()) - interval = 
checkpointCalculateInterval(m.controller.spanController.GetAbsentSize(), interval) - timer.Reset(interval) } } } diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 322f6b17cc..1df5a11e28 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -542,9 +542,6 @@ func newMaintainerForCheckpointCalculationTest(t testing.TB) (*Maintainer, node. resolvedTsLagGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "test_resolved_ts_lag", }), - checkpointCalcDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "test_checkpoint_calculate_duration", - }), } m.watermark.Watermark = heartbeatpb.NewMaxWatermark() return m, selfNode.ID @@ -606,9 +603,6 @@ func newMaintainerForRedoCheckpointCalculationTest(t testing.TB) (*Maintainer, n resolvedTsLagGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "test_redo_resolved_ts_lag", }), - checkpointCalcDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "test_redo_checkpoint_calculate_duration", - }), } m.watermark.Watermark = heartbeatpb.NewMaxWatermark() return m, selfNode.ID diff --git a/maintainer/span/span_controller.go b/maintainer/span/span_controller.go index f5d4964c69..40fcea4d47 100644 --- a/maintainer/span/span_controller.go +++ b/maintainer/span/span_controller.go @@ -358,15 +358,14 @@ func (c *Controller) UpdateStatus(span *replica.SpanReplication, status *heartbe span.UpdateStatus(status) return } - if span.UpdateStatus(status) { - c.nonReplicatingCheckpointTs.updateTrackedSpan(span.ID, span.GetStatus().CheckpointTs) - } - // Note: a read lock is required inside the `GetGroupChecker` method. checker := c.GetGroupChecker(span.GetGroupID()) c.mu.Lock() defer c.mu.Unlock() + if span.UpdateStatus(status) { + c.nonReplicatingCheckpointTs.updateTrackedSpan(span.ID, span.GetStatus().CheckpointTs) + } checker.UpdateStatus(span) } From 1d19b7222c04f2342ddec31b6f696a1b9d171e9a Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Wed, 6 May 2026 16:59:46 +0800 Subject: [PATCH 12/12] update --- pkg/metrics/maintainer.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/pkg/metrics/maintainer.go b/pkg/metrics/maintainer.go index 0067329ba2..b77706e4f4 100644 --- a/pkg/metrics/maintainer.go +++ b/pkg/metrics/maintainer.go @@ -25,15 +25,6 @@ var ( Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18), }, []string{getKeyspaceLabel(), "changefeed"}) - MaintainerCheckpointCalculateDuration = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: "ticdc", - Subsystem: "maintainer", - Name: "checkpoint_calculate_duration_seconds", - Help: "Bucketed histogram of maintainer checkpoint calculation time (s).", - Buckets: prometheus.ExponentialBuckets(0.001 /* 1 ms */, 2, 20), - }, []string{getKeyspaceLabel(), "changefeed"}) - MaintainerEventChLenGauge = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "ticdc", @@ -70,7 +61,6 @@ var ( func initMaintainerMetrics(registry *prometheus.Registry) { registry.MustRegister(MaintainerHandleEventDuration) - registry.MustRegister(MaintainerCheckpointCalculateDuration) registry.MustRegister(MaintainerEventChLenGauge) registry.MustRegister(OperatorCount) registry.MustRegister(OperatorDuration)
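
As a usage sketch (not part of the patch series above): the tracker is driven by the span controller in three steps -- trackSpan when a span turns absent or scheduling, updateTrackedSpan on every accepted heartbeat, and untrackSpan once the span replicates. The helper below, exampleTrackerFlow, is hypothetical and exists only to make that contract concrete; common.NewDispatcherID is the same constructor the tests use.

    package span

    import (
    	"fmt"

    	"github.com/pingcap/ticdc/pkg/common"
    )

    // exampleTrackerFlow mirrors the controller's lifecycle calls: a span is
    // tracked while absent or scheduling, nudged forward by heartbeats, and
    // untracked once it starts replicating.
    func exampleTrackerFlow() {
    	tracker := newCheckpointTsTracker()

    	a := common.NewDispatcherID()
    	b := common.NewDispatcherID()
    	tracker.trackSpan(a, 80)  // absent span at checkpointTs 80
    	tracker.trackSpan(b, 100) // scheduling span at checkpointTs 100

    	tracker.updateTrackedSpan(a, 90) // a heartbeat advanced span a

    	ts, ok := tracker.min()
    	fmt.Println(ts, ok) // 90 true: span a still holds the minimum down

    	tracker.untrackSpan(a) // span a became replicating
    	ts, ok = tracker.min()
    	fmt.Println(ts, ok) // 100 true

    	tracker.untrackSpan(b) // last tracked span leaves; the maps are reset
    	ts, ok = tracker.min()
    	fmt.Println(ts, ok) // 0 false: no non-replicating span remains
    }

The ref-counted heap is what keeps this cheap at scale: each call is O(log n) in the number of distinct checkpointTs values, so a heartbeat wave over many tracked spans never degenerates into a full rescan to recompute the minimum.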