feat(spider-scheduler): Add round-robin scheduler core implementation.#334
feat(spider-scheduler): Add round-robin scheduler core implementation.#334LinZhihao-723 wants to merge 18 commits into
Conversation
…/spider into scheduler-skeleton
WalkthroughThis PR introduces the Spider scheduler component into the workspace while refactoring the ChangesTaskId Type Refactor
Spider Scheduler Implementation
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (1)
components/spider-scheduler/src/core_impl/round_robin/implementation.rs (1)
309-309: ⚡ Quick winReduce per-tick log level to debug/trace.
These are hot-path logs and currently emit every tick at
info, which can be noisy and expensive under small tick intervals. Consider downgrading todebug(or sampling).Also applies to: 824-828
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/spider-scheduler/src/core_impl/round_robin/implementation.rs` at line 309, Change the hot-path tracing::info!("Starting scheduling tick."); to a lower-volume level (tracing::debug! or tracing::trace!) and do the same for the other similar per-tick info logs (the other tracing::info! calls that log scheduling tick/start messages). If you want finer control, wrap the call with a sampling or feature-gated conditional (e.g., tracing::debug!(target: "scheduler", ...) or use tracing::span/Level::TRACE) so frequent ticks don’t spam logs; update the same log invocations that emit per-tick messages to use the chosen lower level.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/spider-scheduler/src/core_impl/round_robin/implementation.rs`:
- Around line 241-242: The finalizing_job_queue currently stores timestamps as
SystemTime which makes elapsed checks fallible; change the queue element type
from (JobId, SystemTime) to (JobId, Instant) and replace uses of
SystemTime::now() / SystemTime::elapsed() with Instant::now() and
Instant::elapsed() so expiry checks are monotonic and non-fallible; update all
code paths that push into finalizing_job_queue and that check/compare expiry
(the retirement/expiry logic that inspects finalizing_job_queue and computes
elapsed) to use Instant-based semantics and adjust any test/timeout constants if
needed.
- Around line 552-555: The buffered_tasks dedup uses inbound_entry.task_id
directly, but downstream logic normalizes finalizing-lane IDs to
TaskId::Commit/TaskId::Cleanup; update the dedupe key by canonicalizing
inbound_entry.task_id for finalizing lanes before inserting/checking
buffered_tasks (i.e., map any equivalent/alias finalizing IDs to the canonical
TaskId::Commit or TaskId::Cleanup via a small match/normalize function) so
buffered entries match what dispatch/removal expects and cannot survive
indefinitely; apply the same canonicalization in both locations that currently
insert/check buffered_tasks (the spots referencing
buffered_tasks.insert((inbound_entry.job_id, inbound_entry.task_id))).
In `@components/spider-scheduler/src/dispatch_queue.rs`:
- Around line 82-84: The comment notes enqueue and bump_session_id must not run
concurrently but the code doesn't enforce it; modify DispatchQueue so both
enqueue (the send path using cloned writers) and bump_session_id (the
draining/rewind path) acquire the same lock before operating: move session_id
and the sender/receiver drain logic under a shared Mutex (or use a single RwLock
with exclusive write for bump_session_id and read for enqueue) so enqueue cannot
send while bump_session_id is draining; update functions enqueue and
bump_session_id to lock that mutex before accessing session_id or performing
send/drain to prevent dropped assignments.
---
Nitpick comments:
In `@components/spider-scheduler/src/core_impl/round_robin/implementation.rs`:
- Line 309: Change the hot-path tracing::info!("Starting scheduling tick."); to
a lower-volume level (tracing::debug! or tracing::trace!) and do the same for
the other similar per-tick info logs (the other tracing::info! calls that log
scheduling tick/start messages). If you want finer control, wrap the call with a
sampling or feature-gated conditional (e.g., tracing::debug!(target:
"scheduler", ...) or use tracing::span/Level::TRACE) so frequent ticks don’t
spam logs; update the same log invocations that emit per-tick messages to use
the chosen lower level.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 7aa4dcab-6b24-416b-8681-416cee8217ab
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (23)
Cargo.tomlcomponents/spider-core/src/types/id.rscomponents/spider-scheduler/Cargo.tomlcomponents/spider-scheduler/src/core.rscomponents/spider-scheduler/src/core_impl/mod.rscomponents/spider-scheduler/src/core_impl/round_robin/implementation.rscomponents/spider-scheduler/src/core_impl/round_robin/mod.rscomponents/spider-scheduler/src/core_impl/round_robin/tests.rscomponents/spider-scheduler/src/dispatch_queue.rscomponents/spider-scheduler/src/error.rscomponents/spider-scheduler/src/lib.rscomponents/spider-scheduler/src/storage_client.rscomponents/spider-scheduler/src/types.rscomponents/spider-storage/src/cache.rscomponents/spider-storage/src/cache/job.rscomponents/spider-storage/src/task_instance_pool.rscomponents/spider-storage/tests/scheduling_infra.rscomponents/spider-tdl/src/task.rscomponents/spider-tdl/src/task_context.rscomponents/spider-tdl/tests/test_task_macro.rstests/huntsman/task-executor/src/lib.rstests/huntsman/task-executor/tests/test_process_pool.rstests/huntsman/tdl-integration/tests/complex.rs
💤 Files with no reviewable changes (1)
- components/spider-storage/src/cache.rs
| pub(super) finalizing_job_queue: VecDeque<(JobId, SystemTime)>, | ||
|
|
There was a problem hiding this comment.
Use monotonic time for finalizing-job expiry.
SystemTime::elapsed() is fallible on clock rollback. That propagates as an error and can terminate the scheduler loop. Use Instant for expiry tracking so retirement is monotonic and non-fallible.
Proposed fix
use std::{
collections::{HashMap, HashSet, VecDeque},
- time::{Duration, SystemTime},
+ time::{Duration, Instant},
};
...
- pub(super) finalizing_job_queue: VecDeque<(JobId, SystemTime)>,
+ pub(super) finalizing_job_queue: VecDeque<(JobId, Instant)>,
...
fn mark_job_finalizing(&mut self, job_id: JobId) {
if self.finalizing_jobs.insert(job_id) {
self.finalizing_job_queue
- .push_back((job_id, SystemTime::now()));
+ .push_back((job_id, Instant::now()));
}
}
...
- fn retire_expired_finalizing_jobs(&mut self) -> Result<(), SchedulerError> {
+ fn retire_expired_finalizing_jobs(&mut self) -> Result<(), SchedulerError> {
const EXPIRATION_TIME: Duration = Duration::from_hours(6);
while let Some((job_id, insertion_time)) = self.finalizing_job_queue.front() {
- if insertion_time.elapsed()? > EXPIRATION_TIME {
+ if insertion_time.elapsed() > EXPIRATION_TIME {
...
} else {
break;
}
}
Ok(())
}Also applies to: 456-457, 471-475
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-scheduler/src/core_impl/round_robin/implementation.rs`
around lines 241 - 242, The finalizing_job_queue currently stores timestamps as
SystemTime which makes elapsed checks fallible; change the queue element type
from (JobId, SystemTime) to (JobId, Instant) and replace uses of
SystemTime::now() / SystemTime::elapsed() with Instant::now() and
Instant::elapsed() so expiry checks are monotonic and non-fallible; update all
code paths that push into finalizing_job_queue and that check/compare expiry
(the retirement/expiry logic that inspects finalizing_job_queue and computes
elapsed) to use Instant-based semantics and adjust any test/timeout constants if
needed.
Description
This PR depends on #332.
This PR implements a round-robin scheduling algorithm.
The scheduler operates in discrete, interval-paced ticks. Each tick consumes the results of an asynchronous inbound-queue poll (all three lanes are polled concurrently per round, throttled by the remaining buffer capacities), loads new tasks into the internal buffers, and then makes scheduling decisions until the dispatch queue reaches capacity. Key behaviors:
tracinginstrumentation across the scheduling flow (startup/shutdown, session bumps, job state transitions, finalizing-gate changes, and decision-loop summaries).The module is laid out as
round_robin/{mod.rs, implementation.rs, tests.rs}, with internals exposed to the sibling test module viapub(super)and onlyRoundRobinConfig/RoundRobinCoreexported.Checklist
breaking change.
Validation performed
Summary by CodeRabbit
Release Notes
New Features
Refactor