[rust-scheduler] Replace round-robin ClusterFeed with priority-queue scheduling #2317
Fixes AcademySoftwareFoundation#2315.

The ClusterFeed previously iterated clusters with a fixed-step round-robin and applied lap-based 10ms/100ms/5s sleeps between rounds. Every cluster received equal air-time regardless of work, and a sleeping cluster added delay to neighboring clusters.

Replace the Vec<Cluster> + AtomicUsize index with a BinaryHeap<Scheduled> ordered by (next_eligible_at asc, last_dispatched_jobs desc); a minimal sketch of this ordering follows the change list below. The feed now pops the highest-priority cluster, sleeps until eligible (preemptible via Notify), emits, and waits for a Done message that re-inserts the cluster with updated stats. Lap-based sleep tiers and the parallel sleep_map are removed; eligibility now lives on each heap entry.

Cluster feed changes (cluster.rs):
- Added Scheduled { cluster, next_eligible_at, last_dispatched_jobs } with Ord prioritizing earliest eligibility, then busiest cluster on ties (productivity bias).
- ClusterFeed now holds Arc<Mutex<BinaryHeap<Scheduled>>> + Notify.
- FeedMessage::Sleep / Stop() collapsed into FeedMessage::Done { cluster, processed_jobs, sleep } + unit-variant Stop.
- stream() dispatch loop now pops, sleeps with preemption, and emits.
- Control loop re-inserts on Done and notifies the dispatch loop.
- Added ClusterFeed::load_from_clusters shim for tests.

Consumer (pipeline/entrypoint.rs):
- Always sends FeedMessage::Done with processed_jobs and optional back-off.
- Preserves should_stop / empty_job_cycles logic.

Config (config/mod.rs):
- Added cluster_empty_back_off: Duration (default 3s), replacing the hardcoded entrypoint back-off.
- Added cluster_productivity_bias: bool (default true), toggling busy-cluster tie-breaking.

Observability (metrics/mod.rs):
- Added scheduler_cluster_polls_total {show_id, facility_id} CounterVec.
- Added scheduler_cluster_rounds_total Counter (mirrors CLUSTER_ROUNDS).
- Added scheduler_cluster_last_dispatched_jobs {show_id, facility_id} Gauge, updated from the Done handler to expose the productivity-bias signal.

Tests:
- Added 6 unit tests covering Scheduled ordering, sleep-back-off timing, the Done round-trip, and a 500ms mixed-workload proportionality test asserting busy clusters receive >2x the polls of idle ones.
- Updated tests/util.rs for the two new QueueConfig fields.
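As a concrete illustration of that ordering, here is a minimal, self-contained sketch; the String cluster id and the exact field layout are stand-ins for the real types in cluster.rs, not the PR's code:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

// Derived Ord compares fields in declaration order: Reverse<Instant>
// makes the earliest next_eligible_at the greatest (BinaryHeap is a
// max-heap), and ties fall through to the busiest cluster.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct Scheduled {
    next_eligible_at: Reverse<Instant>,
    last_dispatched_jobs: usize,
    cluster: String,
}

fn main() {
    let now = Instant::now();
    let mut heap = BinaryHeap::new();
    heap.push(Scheduled {
        next_eligible_at: Reverse(now + Duration::from_secs(3)), // backing off
        last_dispatched_jobs: 0,
        cluster: "idle".into(),
    });
    heap.push(Scheduled {
        next_eligible_at: Reverse(now), // eligible immediately
        last_dispatched_jobs: 12,
        cluster: "busy".into(),
    });
    // The immediately-eligible, recently-busy cluster is popped first.
    assert_eq!(heap.pop().unwrap().cluster, "busy");
}
```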
📝 Walkthrough

The scheduler's core dispatch mechanism is redesigned from round-robin iteration with a sleep-map to a priority-queue architecture.

Changes: Priority-Queue Cluster Scheduler
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Dispatch as Dispatch Task
    participant Heap as BinaryHeap
    participant Control as Control Task
    participant Sender as Cluster Consumer
    participant RecvMsg as FeedMessage Receiver
    Dispatch->>Heap: Pop best Scheduled (by next_eligible_at, then productivity bias)
    Dispatch->>Dispatch: Sleep until next_eligible_at with Notify preemption
    Dispatch->>Sender: Send selected Cluster
    Dispatch->>Dispatch: Loop or stop
    Control->>RecvMsg: Recv FeedMessage
    alt FeedMessage::Done
        Control->>Control: Compute next_eligible_at from optional sleep duration
        Control->>Control: Record processed_jobs via metrics
        Control->>Heap: Insert updated Scheduled with new state
        Control->>Dispatch: Notify preemption to wake early
    else FeedMessage::Stop
        Control->>Control: Set stop_flag
    end
```
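The "sleep until next_eligible_at with Notify preemption" step above boils down to a select between a timer and the feed's Notify. A minimal sketch, assuming the control task calls notify_one() whenever it re-inserts a cluster:

```rust
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::time::{sleep_until, Instant};

// Wait out a cluster's back-off, but wake early if the control task
// signals that the heap changed and a re-inserted cluster may now
// outrank the one being held.
async fn wait_until_eligible(next_eligible_at: Instant, notify: Arc<Notify>) {
    tokio::select! {
        _ = sleep_until(next_eligible_at) => {}
        _ = notify.notified() => {}
    }
}
```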
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rust/crates/scheduler/src/cluster.rs`:
- Around line 539-583: The control loop currently only sets stop_flag_ctrl and
notifies on receiving FeedMessage::Stop, but it does not handle the case where
control_receiver.recv().await returns None (all senders dropped), allowing the
dispatcher to spin; update the async loop that awaits
control_receiver.recv().await to detect the None (channel closed) case and in
that branch call stop_flag_ctrl.store(true, Ordering::Relaxed) and
notify_ctrl.notify_one(), then break the loop; keep the existing handling for
FeedMessage::Done (pushing Scheduled into queue_ctrl) and FeedMessage::Stop
unchanged.
- Around line 98-123: The bug is that Ord::cmp can return Equal while
PartialEq::eq considers last_dispatched_jobs, breaking the Ord/Eq contract; fix
by making cmp consider the exact same fields used in eq (next_eligible_at,
last_dispatched_jobs, cluster) in some deterministic order so Ordering::Equal
implies equality. Concretely, change the logic in Scheduled::cmp to always
compare next_eligible_at first, then compare last_dispatched_jobs, then cluster;
if CONFIG.queue.cluster_productivity_bias should increase priority of
last_dispatched_jobs, implement that by moving the last_dispatched_jobs
comparison earlier (but still include it regardless), otherwise keep it as the
second comparison — do not skip comparing last_dispatched_jobs when bias is
false. Ensure the final tie-breaker remains self.cluster.cmp(&other.cluster).
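For the first comment (the unhandled None on the control channel), the suggested loop shape could look like the sketch below; the trimmed-down FeedMessage and the exact names mirror the comment but are assumptions made to keep the example self-contained:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{mpsc, Notify};

enum FeedMessage {
    Done { processed_jobs: usize }, // payload trimmed for the sketch
    Stop,
}

async fn control_loop(
    mut control_receiver: mpsc::Receiver<FeedMessage>,
    stop_flag_ctrl: Arc<AtomicBool>,
    notify_ctrl: Arc<Notify>,
) {
    loop {
        match control_receiver.recv().await {
            Some(FeedMessage::Done { processed_jobs }) => {
                // Existing handling: push the updated Scheduled back
                // into the heap, then wake the dispatcher.
                let _ = processed_jobs;
            }
            // recv() returning None means every sender was dropped
            // (e.g. the consumer panicked); treat it like Stop so the
            // dispatcher cannot spin on an empty queue.
            Some(FeedMessage::Stop) | None => {
                stop_flag_ctrl.store(true, Ordering::Relaxed);
                notify_ctrl.notify_one();
                break;
            }
        }
    }
}
```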
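For the second comment (the Ord/Eq contract), a hedged sketch of a cmp that always compares every field PartialEq looks at, whichever way the bias toggle points; PRODUCTIVITY_BIAS stands in for CONFIG.queue.cluster_productivity_bias:

```rust
use std::cmp::Ordering;
use std::time::Instant;

const PRODUCTIVITY_BIAS: bool = true; // stand-in for the config flag

#[derive(PartialEq, Eq)]
struct Scheduled {
    cluster: String,
    next_eligible_at: Instant,
    last_dispatched_jobs: usize,
}

impl Ord for Scheduled {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reversed so the earliest eligibility is the max-heap winner.
        let by_time = other.next_eligible_at.cmp(&self.next_eligible_at);
        let by_jobs = self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs);
        let by_cluster = self.cluster.cmp(&other.cluster);
        // The toggle only reorders tie-breakers; all three fields are
        // always compared, so Ordering::Equal implies self == other.
        if PRODUCTIVITY_BIAS {
            by_time.then(by_jobs).then(by_cluster)
        } else {
            by_time.then(by_cluster).then(by_jobs)
        }
    }
}

impl PartialOrd for Scheduled {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
```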
In `@rust/crates/scheduler/src/pipeline/entrypoint.rs`:
- Around line 105-108: The Err(err) branch currently returns (0, true) which
sets should_stop and triggers FeedMessage::Stop; change this to avoid halting
the whole scheduler by not setting should_stop on a single fetch error: log the
error with cluster.id via error!("Failed to fetch job for cluster {}: {}",
cluster.id, err), return (0, false) to skip this cluster for now, and either
implement simple retry logic in the fetch path or add a per-cluster error
counter (e.g. in the cluster state used by the loop) and only set
should_stop/emit FeedMessage::Stop when that counter exceeds a threshold; update
the handling around the fetch call (the code that expects (count, should_stop))
and ensure tests cover transient failures.
- Around line 91-101: The shared atomic counter cycles_without_jobs is
read-after-write by separate tasks, causing TOCTOU races in the stop
calculation; replace the current fetch_add/store/load pattern inside the block
that checks CONFIG.queue.empty_job_cycles_before_quiting with an atomic update
that yields a consistent new value (e.g., use fetch_add(previous) and compute
new_count = prev + 1 when processed == 0, and use swap(0) or fetch_update to
reset to 0 when processed != 0) and base stop on that single atomic result
rather than a separate load; update uses of cycles_without_jobs, processed, and
the stop determination in the same branch (the code around the stop variable and
the for_each_concurrent section) and consider using Ordering::SeqCst for the
operations to ensure strict ordering across threads.
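A hedged sketch of the per-cluster error-budget idea from the first entrypoint comment; ClusterState, consecutive_fetch_errors, and MAX_CONSECUTIVE_FETCH_ERRORS are hypothetical names invented for the example, and the real code would log via the error! macro rather than eprintln!:

```rust
const MAX_CONSECUTIVE_FETCH_ERRORS: u32 = 5; // hypothetical threshold

struct ClusterState {
    id: String,
    consecutive_fetch_errors: u32,
}

// Returns (processed_count, should_stop) like the surrounding loop.
fn on_fetch_error(state: &mut ClusterState, err: &dyn std::error::Error) -> (usize, bool) {
    eprintln!("Failed to fetch job for cluster {}: {}", state.id, err);
    state.consecutive_fetch_errors += 1;
    // Skip the cluster for this cycle; only stop the whole scheduler
    // once the error budget is exhausted.
    (0, state.consecutive_fetch_errors >= MAX_CONSECUTIVE_FETCH_ERRORS)
}
```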
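For the second entrypoint comment, the race-free counter update collapses into one atomic read-modify-write per cycle. In this sketch, update_stop is an illustrative helper and limit stands in for CONFIG.queue.empty_job_cycles_before_quiting:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

fn update_stop(cycles_without_jobs: &AtomicUsize, processed: usize, limit: usize) -> bool {
    let new_count = if processed == 0 {
        // fetch_add returns the previous value, so prev + 1 is the
        // count this task just produced: one RMW, no separate load.
        cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1
    } else {
        // swap(0) resets the counter atomically.
        cycles_without_jobs.swap(0, Ordering::SeqCst);
        0
    };
    new_count >= limit
}
```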
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 49579147-8cbd-4442-b499-869b9539a741
📒 Files selected for processing (5)
- rust/crates/scheduler/src/cluster.rs
- rust/crates/scheduler/src/config/mod.rs
- rust/crates/scheduler/src/metrics/mod.rs
- rust/crates/scheduler/src/pipeline/entrypoint.rs
- rust/crates/scheduler/tests/util.rs
Review feedback addressed:
- Scheduled::cmp now always compares last_dispatched_jobs as a final tiebreaker, so Ord agrees with PartialEq regardless of the cluster_productivity_bias toggle (required by BinaryHeap).
- The control loop now sets stop_flag and notifies the dispatcher when the control channel closes (e.g. after a consumer panic), preventing the dispatch loop from spinning on the empty-queue wake cycle.
- Added an ord_and_eq_stay_consistent_for_differing_jobs regression test.
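A sketch of what that regression test might assert, reusing the simplified Scheduled from the sketch near the top of the page (illustrative, not the PR's actual test):

```rust
#[test]
fn ord_and_eq_stay_consistent_for_differing_jobs() {
    let now = std::time::Instant::now();
    let mk = |jobs: usize| Scheduled {
        next_eligible_at: std::cmp::Reverse(now),
        last_dispatched_jobs: jobs,
        cluster: "c".into(),
    };
    let (a, b) = (mk(1), mk(2));
    // If cmp() returned Equal while eq() returned false, BinaryHeap's
    // invariants would be violated.
    assert_ne!(a.cmp(&b), std::cmp::Ordering::Equal);
    assert!(a != b);
}
```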
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rust/crates/scheduler/src/pipeline/entrypoint.rs (1)
71-123: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win: Count fetch failures toward the empty-cycle stop threshold.
This branch now treats query errors as empty only for back-off, not for empty_job_cycles_before_quiting: Err(err) returns (0, false), and the stop counter is only updated in the Ok(jobs) path. If the DAO is down for every cluster, the scheduler will never trip the configured safety net and can loop forever.

♻️ Minimal fix
```diff
-        let (processed_count, should_stop) = match jobs {
+        let processed_count = match jobs {
             Ok(jobs) => {
                 metrics::increment_jobs_queried(jobs.len());
                 let processed_jobs = AtomicUsize::new(0);
                 stream::iter(jobs)
@@
-                let processed = processed_jobs.load(Ordering::Relaxed);
-
-                let stop = match CONFIG.queue.empty_job_cycles_before_quiting {
-                    Some(limit) => {
-                        // Combine the update and read into a single atomic
-                        // operation so concurrent clusters can't race between
-                        // a fetch_add/store and a separate load. SeqCst gives
-                        // us a strict total order across all clusters.
-                        let new_count = if processed == 0 {
-                            cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1
-                        } else {
-                            cycles_without_jobs.swap(0, Ordering::SeqCst);
-                            0
-                        };
-                        new_count >= limit
-                    }
-                    None => false,
-                };
-
-                (processed, stop)
+                processed_jobs.load(Ordering::Relaxed)
             }
             Err(err) => {
                 // Don't halt the entire scheduler for a single cluster's
                 // fetch error (network blip, transient DB load). Treat
                 // the cluster as empty for this cycle so it gets the
@@
                 error!(
                     "Failed to fetch jobs for cluster {}: {}",
                     cluster, err
                 );
-                (0, false)
+                0
             }
         };
+
+        let should_stop = match CONFIG.queue.empty_job_cycles_before_quiting {
+            Some(limit) => {
+                let new_count = if processed_count == 0 {
+                    cycles_without_jobs.fetch_add(1, Ordering::SeqCst) + 1
+                } else {
+                    cycles_without_jobs.swap(0, Ordering::SeqCst);
+                    0
+                };
+                new_count >= limit
+            }
+            None => false,
+        };
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rust/crates/scheduler/src/pipeline/entrypoint.rs` around lines 71 - 123, The Err branch currently returns (0, false) so fetch failures don't advance the empty-cycle safety counter; update the Err(err) handling to update cycles_without_jobs and compute the same stop boolean logic used in the Ok(jobs) path (i.e., increment cycles_without_jobs when treated-as-empty, reset it to 0 when not, and compare against CONFIG.queue.empty_job_cycles_before_quiting) and then return (0, stop) instead of (0, false); reference cycles_without_jobs, CONFIG.queue.empty_job_cycles_before_quiting, and the stop/limit logic in this function to mirror the behavior used when processed == 0.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2bd1a413-b156-4185-a997-14bb0ce83c7f
📒 Files selected for processing (2)
- rust/crates/scheduler/src/cluster.rs
- rust/crates/scheduler/src/pipeline/entrypoint.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- rust/crates/scheduler/src/cluster.rs
Hi @DiegoTavares

This PR is ready for review, but kept in draft because there are other changes from @DiegoTavares in progress that conflict with this PR.