server: honor scheduler concurrency for coordinator#4832
Conversation
Use the validated server scheduler config when constructing the coordinator so maintainer scheduling concurrency respects max-task-concurrency instead of a hard-coded value.
|
Skipping CI for Draft Pull Request. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds persisted-changefeed retrieval and makes ResumeChangefeed return the backend’s resumed ChangeFeedInfo; Controller clones returned in-memory info and applies backend-returned info on resume. API resume handler re-fetches persisted info. Elector captures SchedulerConfig and passes scheduler settings into coordinator creation. ChangesChangefeed resume & Controller defensive copy
Coordinator scheduler configurability
Sequence Diagram(s)sequenceDiagram
participant API as ResumeChangefeed handler
participant Coordinator
participant Controller
participant EtcdBackend
API->>Coordinator: GetChangefeed(ctx, id)
Coordinator->>Controller: GetChangefeed(ctx, id)
Controller-->>Coordinator: cloned in-memory ChangeFeedInfo
API->>Coordinator: Resume request (id, newCheckpointTs)
Coordinator->>Controller: ResumeChangefeed(ctx, id, newCheckpointTs)
Controller->>EtcdBackend: ResumeChangefeed(ctx, id, newCheckpointTs)
EtcdBackend-->>Controller: (*config.ChangeFeedInfo, error)
Controller->>Controller: clone resumedInfo and update in-memory state
Controller-->>Coordinator: resume result
Coordinator-->>API: final resume response / error
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request replaces hardcoded values for coordinator task concurrency and balance interval with dynamic settings retrieved from the global server configuration. It introduces a helper function coordinatorSchedulerSettings and includes a corresponding unit test to ensure the configuration is correctly applied. I have no feedback to provide.
|
/test all |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: hongyunyan, lidezhu The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
[LGTM Timeline notifier]Timeline:
|
|
/retest |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
coordinator/controller_test.go (1)
395-417: ⚡ Quick winStrengthen this concurrency test to avoid scheduler-dependent false negatives.
Line 409 snapshots
storedInfoonce, and both goroutines start unsynchronized; the read loop can finish before mutations begin, so an aliasing regression may slip through.Suggested refinement
var ( wg sync.WaitGroup storedChanged bool ) + start := make(chan struct{}) wg.Add(2) go func() { defer wg.Done() + <-start for i := 0; i < 1000; i++ { ret.SinkURI = "kafka://127.0.0.1:9092" ret.TargetTs = uint64(i + 1) } }() go func() { defer wg.Done() - storedInfo := changefeedDB.GetByID(cfID).GetInfo() + <-start for i := 0; i < 1000; i++ { + storedInfo := changefeedDB.GetByID(cfID).GetInfo() if storedInfo.SinkURI != "mysql://127.0.0.1:3306" || storedInfo.TargetTs != 0 { storedChanged = true return } } }() + close(start) wg.Wait()As per coding guidelines, "Prefer focused deterministic tests; see docs/agents/testing.md before adding or changing tests."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@coordinator/controller_test.go` around lines 395 - 417, The current concurrency test reads storedInfo once before the mutation loop, which lets the reader finish before writers run and yields flaky/scheduler-dependent behavior; change the reader to sample changefeedDB.GetByID(cfID).GetInfo() inside its loop (or use a simple start barrier such as a sync.WaitGroup/channel so both goroutines begin simultaneously) and then check for SinkURI/TargetTs changes on each iteration (update the storedChanged flag when a change is observed); reference the reader call changefeedDB.GetByID(cfID).GetInfo(), the writer updates to ret.SinkURI/ret.TargetTs, and the existing wg sync to coordinate starts.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@coordinator/controller_test.go`:
- Around line 395-417: The current concurrency test reads storedInfo once before
the mutation loop, which lets the reader finish before writers run and yields
flaky/scheduler-dependent behavior; change the reader to sample
changefeedDB.GetByID(cfID).GetInfo() inside its loop (or use a simple start
barrier such as a sync.WaitGroup/channel so both goroutines begin
simultaneously) and then check for SinkURI/TargetTs changes on each iteration
(update the storedChanged flag when a change is observed); reference the reader
call changefeedDB.GetByID(cfID).GetInfo(), the writer updates to
ret.SinkURI/ret.TargetTs, and the existing wg sync to coordinate starts.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 17782c56-d583-4a96-a3f3-02d6c2dcf96e
📒 Files selected for processing (5)
coordinator/controller.gocoordinator/controller_test.goserver/module_election.goserver/module_election_test.goserver/server.go
🚧 Files skipped from review as they are similar to previous changes (1)
- server/module_election_test.go
|
/test all |
|
In response to a cherrypick label: new pull request created to branch |
What problem does this PR solve?
Issue Number: close #4831
What is changed and how it works?
The coordinator was constructed with hard-coded scheduling settings:
10000time.MinuteThis bypassed
Debug.Scheduler.MaxTaskConcurrencyandDebug.Scheduler.CheckBalanceInterval. During bulk changefeed creation, the basic scheduler could therefore schedule a very large number of absent changefeeds at once, causing many maintainer bootstraps to run concurrently.This PR reads the coordinator scheduler settings from the validated global server config before constructing the coordinator. With the default config, maintainer scheduling concurrency returns to
10, which reduces creation-time memory and CPU spikes by throttling concurrent bootstrap work.Check List
Tests
go test ./serverQuestions
Will it cause performance regression or break compatibility?
It should reduce CPU and memory spikes during bulk changefeed creation by honoring the existing scheduler concurrency config. It may make very large bulk creation finish more gradually compared with the previous hard-coded
10000concurrency, but that behavior matches the intended configurable scheduler limit and can be tuned via server config.Do you need to update user documentation, design documentation or monitoring documentation?
No. This PR makes existing scheduler config effective for coordinator scheduling.
Release note
Summary by CodeRabbit
Chores
Bug Fixes / Behavior Changes
Tests