maintainer: optimize maintainer checkpoint tracking #4947
hongyunyan wants to merge 13 commits into pingcap:master
📝 Walkthrough
This PR adds an internal heap-backed checkpoint timestamp tracker and integrates it into the span Controller to avoid full-span scans, changes SpanReplication.UpdateStatus to return whether checkpointTs changed, adds unit tests for the tracker and controller behaviors, introduces atomic size tracking and batch retrieval in scheduler replication groups, and tweaks one slow-path log to include MessageType.
Changes
- Minor Logging Enhancement
- Checkpoint Timestamp Tracking Infrastructure & Integration
- Scheduler Replica Optimization
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Code Review
This pull request optimizes the calculation of the minimum checkpoint timestamp for non-replicating spans by introducing a heap-based checkpointTsTracker, replacing a linear scan. It also improves the efficiency of the iMap structure in the scheduler by using an atomic counter for size tracking and enhances logging for slow maintainer events with detailed message fields. Review feedback suggests removing an unnecessary state reset in the tracker's removal logic and refining the atomic counter update in the iMap.Set method to avoid redundant operations.
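The heap-based approach summarized in this review can be illustrated with a minimal sketch. It is not the PR's `checkpointTsTracker`: the type `minTracker`, the string span IDs, and the method names `add`, `update`, and `remove` are assumptions made for illustration. The idea is that each entry remembers its heap index, so an update or removal costs O(log n) and reading the minimum costs O(1), instead of scanning every span.

```go
package main

import (
	"container/heap"
	"fmt"
)

// entry is one tracked span (hypothetical shape, not the PR's type).
type entry struct {
	spanID       string
	checkpointTs uint64
	index        int // current position in the heap, kept in sync by Swap and Push
}

// entryHeap is a min-heap ordered by checkpointTs.
type entryHeap []*entry

func (h entryHeap) Len() int           { return len(h) }
func (h entryHeap) Less(i, j int) bool { return h[i].checkpointTs < h[j].checkpointTs }
func (h entryHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i
	h[j].index = j
}

func (h *entryHeap) Push(x any) {
	e := x.(*entry)
	e.index = len(*h)
	*h = append(*h, e)
}

func (h *entryHeap) Pop() any {
	old := *h
	n := len(old)
	e := old[n-1]
	old[n-1] = nil // drop the reference so the entry can be collected
	*h = old[:n-1]
	return e
}

// minTracker keeps the minimum checkpointTs of all tracked spans available in O(1).
type minTracker struct {
	heap    entryHeap
	entries map[string]*entry
}

func newMinTracker() *minTracker {
	return &minTracker{entries: make(map[string]*entry)}
}

// add starts tracking a span, or updates it if it is already tracked.
func (t *minTracker) add(spanID string, ts uint64) {
	if _, ok := t.entries[spanID]; ok {
		t.update(spanID, ts)
		return
	}
	e := &entry{spanID: spanID, checkpointTs: ts}
	t.entries[spanID] = e
	heap.Push(&t.heap, e)
}

// update changes the checkpointTs of a tracked span; unknown spans are ignored.
func (t *minTracker) update(spanID string, ts uint64) {
	e, ok := t.entries[spanID]
	if !ok {
		return
	}
	e.checkpointTs = ts
	heap.Fix(&t.heap, e.index)
}

// remove stops tracking a span; unknown spans are ignored.
func (t *minTracker) remove(spanID string) {
	e, ok := t.entries[spanID]
	if !ok {
		return
	}
	heap.Remove(&t.heap, e.index)
	delete(t.entries, spanID)
}

// min returns the smallest tracked checkpointTs, or false when nothing is tracked.
func (t *minTracker) min() (uint64, bool) {
	if len(t.heap) == 0 {
		return 0, false
	}
	return t.heap[0].checkpointTs, true
}

func main() {
	tracker := newMinTracker()
	tracker.add("a", 100)
	tracker.add("b", 80)
	tracker.add("c", 120)
	fmt.Println(tracker.min()) // 80 true

	tracker.update("b", 150) // the old minimum moves forward
	fmt.Println(tracker.min()) // 100 true

	tracker.remove("a")
	fmt.Println(tracker.min()) // 120 true
}
```

Storing the heap index on each entry is what makes in-place updates possible; without it, every update would need a search through the heap.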
|
/test all |
f0fd45f to 7cfdb8d
|
@coderabbitai review |
✅ Actions performed
Review triggered.
|
🧹 Nitpick comments (3)
maintainer/span/checkpoint_ts_tracker_test.go (1)
16-100: ⚡ Quick win
Use `require` assertions in this new test file.
These tests are deterministic, but the new cases still use manual `if`/`t.Fatalf` checks instead of the repo’s standard assertion style.
As per coding guidelines, "Unit tests should use `*_test.go` file naming convention and favor deterministic tests using `testify/require`."
♻️ Example cleanup

     import (
         "testing"
         "github.com/pingcap/ticdc/pkg/common"
    +    "github.com/stretchr/testify/require"
     )
    @@
    -    got, ok := tracker.min()
    -    if !ok || got != 80 {
    -        t.Fatalf("checkpointTsTracker.min() = %d, %v, want 80, true", got, ok)
    -    }
    +    got, ok := tracker.min()
    +    require.True(t, ok)
    +    require.Equal(t, uint64(80), got)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@maintainer/span/checkpoint_ts_tracker_test.go` around lines 16 - 100, Replace manual t.Fatalf checks with testify/require assertions in the three tests (TestCheckpointTsTrackerMin, TestCheckpointTsTrackerIgnoresMissingUpdate, TestCheckpointTsTrackerRemovesStaleCheckpointTs): add an import for "github.com/stretchr/testify/require" and use functions like require.True/False, require.Equal, and require.NoError (as appropriate) to assert expected values and booleans instead of the current if/.../t.Fatalf patterns, keeping the same expectation messages/values but leveraging require for concise, standard assertions.
pkg/scheduler/replica/replication_group_test.go (1)
61-137: ⚡ Quick win
Use `testify/require` instead of raw `t.Fatalf` assertions
All three tests use bare `t.Fatalf` for assertions. Per coding guidelines, tests should favor `testify/require` for assertions.
♻️ Example migration for `TestIMapLenTracksOverwriteAndDelete`

     import (
         "fmt"
         "sync/atomic"
         "testing"
         "github.com/pingcap/ticdc/pkg/node"
    +    "github.com/stretchr/testify/require"
     )
     func TestIMapLenTracksOverwriteAndDelete(t *testing.T) {
         t.Parallel()
         replicaMap := newIMap[testReplicationID, *testReplication]()
         id := testReplicationID("a")
         replicaMap.Set(id, &testReplication{id: id})
         replicaMap.Set(id, &testReplication{id: id})
    -    if got := replicaMap.Len(); got != 1 {
    -        t.Fatalf("iMap.Len() after overwrite = %d, want 1", got)
    -    }
    +    require.Equal(t, 1, replicaMap.Len(), "iMap.Len() after overwrite")
         replicaMap.Delete(testReplicationID("missing"))
    -    if got := replicaMap.Len(); got != 1 {
    -        t.Fatalf("iMap.Len() after deleting missing key = %d, want 1", got)
    -    }
    +    require.Equal(t, 1, replicaMap.Len(), "iMap.Len() after deleting missing key")
         replicaMap.Delete(id)
    -    if got := replicaMap.Len(); got != 0 {
    -        t.Fatalf("iMap.Len() after delete = %d, want 0", got)
    -    }
    +    require.Equal(t, 0, replicaMap.Len(), "iMap.Len() after delete")
     }

As per coding guidelines: "Unit tests should use `*_test.go` file naming convention and favor deterministic tests using `testify/require`."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/scheduler/replica/replication_group_test.go` around lines 61 - 137, Replace raw t.Fatalf assertions in TestIMapLenTracksOverwriteAndDelete, TestGetAbsentByGroupStopsAtBatch, and TestGetAbsentByGroupSkipsNotRunnableTasks with testify/require assertions: import "github.com/stretchr/testify/require" and use require.Equal/require.Len (e.g., require.Equal(t, 1, replicaMap.Len()) or require.Len(t, absent, 3)) for value/length checks and require.Equal or require.EqualValues for atomic counts (e.g., require.Equal(t, int64(3), shouldRunCalls.Load())). Ensure the test signatures remain the same and remove the t.Fatalf calls.
pkg/scheduler/replica/replication_group.go (1)
311-317: ⚡ Quick win
`iMap.Set` can be simplified using `sync.Map.Swap`
The current `LoadOrStore` + conditional `Store` pattern performs two map writes when updating an existing key. `sync.Map.Swap` atomically stores the value and returns whether the key existed, collapsing this into a single operation:
♻️ Proposed refactoring

     func (m *iMap[T, R]) Set(key T, value R) {
    -    if _, loaded := m.inner.LoadOrStore(key, value); loaded {
    -        m.inner.Store(key, value)
    -        return
    -    }
    -    m.size.Add(1)
    +    if _, loaded := m.inner.Swap(key, value); !loaded {
    +        m.size.Add(1)
    +    }
     }

The module targets Go 1.25.8, which exceeds the 1.20 minimum required for `Swap`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/scheduler/replica/replication_group.go` around lines 311 - 317, The Set method in iMap currently uses LoadOrStore then Store causing two writes on updates; replace that logic by calling m.inner.Swap(key, value) to atomically set the value and get whether the key existed, then only call m.size.Add(1) when Swap reports the key did not previously exist (loaded == false). Update the implementation in iMap.Set to use m.inner.Swap(key, value) and remove the extra Store/LoadOrStore branch so size is incremented exactly when a new key is inserted.
📒 Files selected for processing (9)
- maintainer/maintainer.go
- maintainer/replica/replication_span.go
- maintainer/span/checkpoint_ts_tracker.go
- maintainer/span/checkpoint_ts_tracker_test.go
- maintainer/span/span_controller.go
- maintainer/span/span_controller_test.go
- pkg/scheduler/replica/replication.go
- pkg/scheduler/replica/replication_group.go
- pkg/scheduler/replica/replication_group_test.go
|
@coderabbitai review |
✅ Actions performed
Review triggered.
|
Background
Issue Number: ref #4951
During a 1M-table bootstrap test, maintainer checkpoint calculation became increasingly expensive because every 100ms tick scanned all absent and scheduling spans to compute the minimum checkpointTs. The same path also held back dispatcher scheduling progress as the number of non-replicating spans grew.
This PR focuses on replacing the repeated scans with incremental state tracking and small scheduler-side query optimizations. It does not change the checkpoint calculation rules themselves.
Motivation
The normal checkpointTs update frequency should remain unchanged because it controls externally visible checkpoint freshness. During large bootstrap waves, checkpoint calculation should avoid repeatedly scanning all absent and scheduling spans while thousands of spans are still being added.
This PR removes the repeated O(N) scans from non-replicating span checkpoint calculation and reduces scheduler batch-query overhead during operator-heavy bootstrap.
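The scheduler-side part of this motivation, cutting batch-query overhead, can be pictured with a small sketch. It assumes a plain slice of hypothetical `task` values and a function named `takeBatch`; the real replication group code is structured differently. The point is only that iteration stops as soon as the requested batch is full, so the cost depends on the batch size rather than on the total number of absent spans.

```go
package main

import "fmt"

// task is a hypothetical schedulable unit; runnable stands in for whatever
// check decides that a span may be scheduled right now.
type task struct {
	id       int
	runnable bool
}

// takeBatch returns up to batch runnable tasks and the number of tasks it
// had to inspect. It stops iterating as soon as the batch is full.
func takeBatch(tasks []task, batch int) (picked []task, inspected int) {
	if batch <= 0 {
		return nil, 0
	}
	picked = make([]task, 0, batch)
	for _, t := range tasks {
		inspected++
		if !t.runnable {
			continue // skipped tasks do not count toward the batch
		}
		picked = append(picked, t)
		if len(picked) == batch {
			break
		}
	}
	return picked, inspected
}

func main() {
	tasks := make([]task, 0, 10)
	for i := 0; i < 10; i++ {
		tasks = append(tasks, task{id: i, runnable: i != 1})
	}

	picked, inspected := takeBatch(tasks, 3)
	fmt.Println(len(picked), inspected) // 3 picked after inspecting 4 of 10 tasks
}
```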
Summary of Changes
`GetAbsentByGroup` scanning to the requested batch size.
Validation
- `make fmt`
- `git fetch --prune upstream master && git diff --check upstream/master...HEAD`
- `git diff --check`
- `go test -count=1 ./maintainer/span`
- `go test -count=1 ./pkg/scheduler/replica`
- `go test --tags=intest -count=1 ./downstreamadapter/sink/redo -run TestRedoSinkInProcessor`
Release Note
Summary by CodeRabbit
Bug Fixes
Refactor
Tests