feat(spider-scheduler): Add channel-based dispatch queue implementation.#332
feat(spider-scheduler): Add channel-based dispatch queue implementation.#332LinZhihao-723 wants to merge 8 commits into
Conversation
…/spider into scheduler-skeleton
WalkthroughThis pull request introduces the ChangesSpider-scheduler infrastructure and TaskId migration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@components/spider-scheduler/src/dispatch_queue.rs`:
- Around line 82-84: The queue must stamp session IDs at enqueue time instead of
inferring them at dequeue: modify the internal queue element type to store
(SessionId, TaskAssignment) and change DispatchQueueWriter::enqueue to read the
current session (from wherever bump_session_id updates it) and push the stamped
pair atomically; update DispatchQueue::dequeue to return the stamped pair (or
convert it to the existing return shape) rather than attaching the session on
receive. Alternatively, if you prefer serialization instead of changing the item
type, guard both DispatchQueueWriter::enqueue and
DispatchQueueWriter::bump_session_id with the same writer-side lock/shared mutex
so bump and enqueue cannot interleave; also ensure the cloneable
DispatchQueueWriter shares that lock (or remove Clone) so concurrent clones
cannot bypass the synchronization. Ensure corresponding consumer code (dequeue
handling) and any helpers that construct or consume TaskAssignment are updated
to the new (SessionId, TaskAssignment) shape.
- Around line 93-97: The writer currently holds a hidden clone of
assignment_receiver (DispatchQueueWriterInner / create_dispatch_queue),
preventing the channel from closing when all public DispatchQueueReader
instances are dropped so DispatchQueueWriter::enqueue never returns
DispatchQueueClosed; remove the stored clone from DispatchQueueWriterInner (do
not clone assignment_receiver into the writer in create_dispatch_queue) so only
real readers hold receiver handles, allowing async_channel::Sender::send to fail
and map to SchedulerError::DispatchQueueClosed when readers are gone.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 8c98dfdc-5b9c-4dfb-8017-d96d0b1b0205
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (19)
Cargo.tomlcomponents/spider-core/src/types/id.rscomponents/spider-scheduler/Cargo.tomlcomponents/spider-scheduler/src/core.rscomponents/spider-scheduler/src/dispatch_queue.rscomponents/spider-scheduler/src/error.rscomponents/spider-scheduler/src/lib.rscomponents/spider-scheduler/src/storage_client.rscomponents/spider-scheduler/src/types.rscomponents/spider-storage/src/cache.rscomponents/spider-storage/src/cache/job.rscomponents/spider-storage/src/task_instance_pool.rscomponents/spider-storage/tests/scheduling_infra.rscomponents/spider-tdl/src/task.rscomponents/spider-tdl/src/task_context.rscomponents/spider-tdl/tests/test_task_macro.rstests/huntsman/task-executor/src/lib.rstests/huntsman/task-executor/tests/test_process_pool.rstests/huntsman/tdl-integration/tests/complex.rs
💤 Files with no reviewable changes (1)
- components/spider-storage/src/cache.rs
| /// The current implementation assumes that `enqueue` and `bump_session_id` will not be called | ||
| /// concurrently: `bump_session_id` must be called before consequent `enqueue` calls to make session | ||
| /// ID consistent with the enqueued assignments. |
There was a problem hiding this comment.
Stamp the session at enqueue time instead of inferring it at dequeue time.
Right now correctness depends on the comment at Lines 82-84, but the public API does not enforce that constraint: DispatchQueueWriter is cloneable, enqueue does not synchronise with bump_session_id, and dequeue attaches whatever session is current when the item is received. A concurrent writer can therefore enqueue an assignment just before or during a bump and have it delivered under the wrong session, or drained unexpectedly.
Please make the queue carry (SessionId, TaskAssignment) entries, or otherwise serialise enqueue and bump_session_id behind the same writer-side lock so the session is fixed when the assignment enters the queue.
Also applies to: 92-112
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-scheduler/src/dispatch_queue.rs` around lines 82 - 84, The
queue must stamp session IDs at enqueue time instead of inferring them at
dequeue: modify the internal queue element type to store (SessionId,
TaskAssignment) and change DispatchQueueWriter::enqueue to read the current
session (from wherever bump_session_id updates it) and push the stamped pair
atomically; update DispatchQueue::dequeue to return the stamped pair (or convert
it to the existing return shape) rather than attaching the session on receive.
Alternatively, if you prefer serialization instead of changing the item type,
guard both DispatchQueueWriter::enqueue and DispatchQueueWriter::bump_session_id
with the same writer-side lock/shared mutex so bump and enqueue cannot
interleave; also ensure the cloneable DispatchQueueWriter shares that lock (or
remove Clone) so concurrent clones cannot bypass the synchronization. Ensure
corresponding consumer code (dequeue handling) and any helpers that construct or
consume TaskAssignment are updated to the new (SessionId, TaskAssignment) shape.
| self.inner | ||
| .assignment_sender | ||
| .send(assignment) | ||
| .await | ||
| .map_err(|_| SchedulerError::DispatchQueueClosed) |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In async-channel 2.3.x, if a Sender and an internally retained cloned Receiver both still exist, will Sender::send continue to treat the channel as open after all externally exposed Receiver handles are dropped? Please cite the async-channel documentation for the close semantics.
💡 Result:
Sender::send continues to treat the channel as open as long as there is at least one live (externally or internally retained) Receiver handle; the channel only becomes closed once all Receivers are dropped (or the Sender/Receiver is closed manually). [1][2] In async-channel, “When all Sender(s) or all Receiver(s) are dropped, the channel becomes closed.” [1] That means dropping externally exposed Receiver clones does not close the channel if another Receiver clone is still alive internally; Sender::send will only start returning an error once the channel is closed. [1][2] Source support for close semantics: - The crate-level docs state: “When all Senders or all Receiver`s are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent …” [1] - Sender::send docs state: “If the channel is closed, this method returns an error.” [2] Therefore, in your scenario (a retained cloned Receiver still exists), Sender::send will keep treating the channel as open until that retained Receiver (and any other Receiver handles) are dropped, or until the sender/receiver is closed manually. [1][2]
Citations:
- 1: https://docs.rs/crate/async-channel/2.3.0
- 2: https://docs.rs/async-channel/latest/async_channel/struct.Sender.html
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"
# Show relevant sections around 70-110 and 150-210
sed -n '60,120p' "$FILE"
echo "----"
sed -n '140,210p' "$FILE"
echo "----"
# Also locate assignment_receiver usages
rg -n "assignment_receiver|assignment_sender|DispatchQueueWriterInner|DispatchQueueReader|enqueue|send\\(" "$FILE"Repository: y-scope/spider
Length of output: 7460
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"
sed -n '60,120p' "$FILE"
echo "----"
sed -n '140,210p' "$FILE"
echo "----"
rg -n "assignment_receiver|assignment_sender|DispatchQueueWriterInner|DispatchQueueReader|enqueue|send\\(" "$FILE"Repository: y-scope/spider
Length of output: 7460
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"
nl -ba "$FILE" | sed -n '85,115p'
echo "----"
nl -ba "$FILE" | sed -n '155,205p'Repository: y-scope/spider
Length of output: 101
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"
echo "---- 85-125 ----"
cat -n "$FILE" | sed -n '85,125p'
echo "---- 155-205 ----"
cat -n "$FILE" | sed -n '155,205p'Repository: y-scope/spider
Length of output: 3516
Fix hidden retained receiver: it prevents enqueue from surfacing DispatchQueueClosed when readers are dropped.
DispatchQueueWriter::enqueue maps async_channel::Sender::send failures to SchedulerError::DispatchQueueClosed (lines 92-98), but create_dispatch_queue clones and stores assignment_receiver inside DispatchQueueWriterInner (lines 168-172, 187-191). With async-channel, the channel only becomes closed once all receiver handles are dropped, so dropping every public DispatchQueueReader leaves this internal receiver alive and keeps the channel open; send will then just wait/back-pressure once the bounded buffer fills instead of returning DispatchQueueClosed.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-scheduler/src/dispatch_queue.rs` around lines 93 - 97, The
writer currently holds a hidden clone of assignment_receiver
(DispatchQueueWriterInner / create_dispatch_queue), preventing the channel from
closing when all public DispatchQueueReader instances are dropped so
DispatchQueueWriter::enqueue never returns DispatchQueueClosed; remove the
stored clone from DispatchQueueWriterInner (do not clone assignment_receiver
into the writer in create_dispatch_queue) so only real readers hold receiver
handles, allowing async_channel::Sender::send to fail and map to
SchedulerError::DispatchQueueClosed when readers are gone.
Description
This PR depends on #330 and #331.
This PR implements an async-channel-backed dispatch queue. It also adds unit tests to cover its basic behaviors.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
Release Notes
New Features
Refactor