Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"components/spider-core",
"components/spider-derive",
"components/spider-execution-manager",
"components/spider-scheduler",
"components/spider-storage",
"components/spider-task-executor",
"components/spider-tdl",
Expand Down
14 changes: 14 additions & 0 deletions components/spider-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "spider-scheduler"
version = "0.1.0"
edition = "2024"

[lib]
name = "spider_scheduler"
path = "src/lib.rs"

[dependencies]
async-trait = "0.1.89"
spider-core = { path = "../spider-core" }
thiserror = "2.0.18"
tokio-util = "0.7.18"
47 changes: 47 additions & 0 deletions components/spider-scheduler/src/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! The abstract core of a Spider scheduler.

use async_trait::async_trait;

use crate::{
dispatch_queue::DispatchQueueSink,
error::SchedulerError,
storage_client::SchedulerStorageClient,
};

/// An abstracted core for a scheduling algorithm.
///
/// A core owns its decision loop: it polls the inbound queue through a [`SchedulerStorageClient`],
/// applies its algorithm (reading storage as needed for placement), and writes assignments to a
/// [`DispatchQueueSink`]. Modeling the algorithm as a trait lets different scheduling strategies
/// share the same runtime entry point.
#[async_trait]
pub trait SchedulerCore: Send {
/// The storage client used by the core to poll and read for placement decisions.
type StorageClient: SchedulerStorageClient;

/// The dispatch sink the core writes assignments to.
type Sink: DispatchQueueSink;

/// Runs the scheduling loop until `cancellation_token` is triggered.
///
/// The core polls the inbound queue through `storage_client`, applies its scheduling algorithm,
/// and writes assignments to `sink`, repeating until `cancellation_token` is fired, at which
/// point it returns.
///
/// # Parameters
///
/// * `storage_client` - The storage client used to poll the inbound queue and read state for
/// placement.
/// * `sink` - The dispatch sink that assignments are written to.
/// * `cancellation_token` - The token to signal the scheduling loop to stop.
///
/// # Errors
///
/// Returns a [`SchedulerError`] instance indicating an irrecoverable error.
async fn run(
&mut self,
storage_client: Self::StorageClient,
sink: Self::Sink,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), SchedulerError>;
}
74 changes: 74 additions & 0 deletions components/spider-scheduler/src/dispatch_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! The dispatching queue that decouples the scheduler core's placement decisions from the
//! execution-manager-facing service.

use std::time::Duration;

use async_trait::async_trait;
use spider_core::types::id::SessionId;

use crate::{error::SchedulerError, types::TaskAssignment};

/// The writer side of the dispatching queue used by the scheduler core.
#[async_trait]
pub trait DispatchQueueSink: Send + Sync + Clone {
/// Enqueues a task assignment for execution managers to consume.
///
/// # Parameters
///
/// * `assignment` - The task assignment to enqueue.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`SchedulerError::DispatchQueueClosed`] if the dispatching queue is closed.
async fn enqueue(&self, assignment: TaskAssignment) -> Result<(), SchedulerError>;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have a batched enqueue method for better performance.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current planned implementation won't benefit from a batch operation:

  • The dispatch queue is implemented using async channel, meaning that all enqueue operations will be serialized.
  • The scheduler decision maker pops assignments from the queue one by one; a batch operation means we need to construct/destruct vector on top of the popped results, which introduces unnecessary overhead.


/// Bumps the session ID and invalidates all queued task assignments.
///
/// # Parameters
///
/// * `new_session_id` - The new session ID. Must be greater than the current session ID.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`SchedulerError::DispatchQueueClosed`] if the dispatching queue is closed.
/// * [`SchedulerError::InvalidSessionId`] if the new session ID is not greater than the current
/// session ID.
async fn bump_session_id(&self, new_session_id: SessionId) -> Result<(), SchedulerError>;

/// # Returns
///
/// The current size of the dispatch queue.
fn size(&self) -> usize;
}

/// The reader side of the dispatching queue, drained by the execution-manager-facing service.
#[async_trait]
pub trait DispatchQueueSource: Send + Sync + Clone {
/// Dequeues the next task assignment for an execution manager to execute.
///
/// # Parameters
///
/// * `wait_time` - The maximum amount of time to wait for a task assignment.
///
/// # Returns
///
/// `None` if no task assignment is available within the specified wait time, or a tuple
/// containing:
///
/// * The storage session associated with the assignment.
/// * The next task assignment ready to execute.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`SchedulerError::DispatchQueueClosed`] if the dispatching queue is closed.
async fn dequeue(
&self,
wait_time: Duration,
) -> Result<Option<(SessionId, TaskAssignment)>, SchedulerError>;
}
31 changes: 31 additions & 0 deletions components/spider-scheduler/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//! The error types used in this crate.

use spider_core::types::id::{JobId, SessionId};

/// Errors returned by [`crate::storage_client::SchedulerStorageClient`] operations.
#[derive(Debug, thiserror::Error)]
pub enum StorageClientError {
/// The inbound queue is closed and can no longer yield ready entries.
#[error("inbound queue is closed")]
InboundClosed,

/// No job with the requested identifier exists.
#[error("job not found: {0:?}")]
JobNotFound(JobId),
}

/// Errors returned by the scheduler runtime and its components.
#[derive(Debug, thiserror::Error)]
pub enum SchedulerError {
/// Forwarded from the storage client.
#[error(transparent)]
Storage(#[from] StorageClientError),

/// The dispatching queue is closed and can no longer accept assignments.
#[error("dispatching queue is closed")]
DispatchQueueClosed,

/// The session ID is invalid.
#[error("invalid session ID: {0:?}")]
InvalidSessionId(SessionId),
}
46 changes: 46 additions & 0 deletions components/spider-scheduler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//! Trait and type abstractions for the Spider scheduler.
//!
//! The scheduler is the serial decision maker that turns ready tasks discovered by the storage
//! layer into assignments for execution managers. It owns placement and ordering policy, not
//! dependency resolution: storage decides *what* is ready, and the scheduler decides *in what
//! order* and *with what throttling* ready tasks are offered to the fleet.
//!
//! The crate defines three trait seams wired into a single pipeline — a storage client that polls
//! the ready queue, a core that makes serial decisions, and a dispatching queue that fans those
//! decisions out to execution managers:
//!
//! ```text
//! storage ── authoritative ready queue (owned by the storage layer, not this crate)
//! │
//! │ poll_ready / poll_commit_ready / poll_cleanup_ready (SchedulerStorageClient)
//! ▼
//! ┌───────────────────┐
//! │ SchedulerCore │ serial loop: poll → decide → enqueue
//! └───────────────────┘
//! │
//! │ enqueue (DispatchQueueSink — writer side)
//! ▼
//! ┌───────────────────┐
//! │ dispatch queue │ bounded SPMC; a full queue back-pressures the core
//! └───────────────────┘
//! │
//! │ dequeue (DispatchQueueSource — reader side)
//! ▼
//! ┌───────────────────┐
//! │ scheduler service │ ──▶ execution managers (concurrent fan-out)
//! └───────────────────┘
//! ```

pub mod core;
pub mod dispatch_queue;
pub mod error;
pub mod storage_client;
pub mod types;

pub use crate::{
core::SchedulerCore,
dispatch_queue::{DispatchQueueSink, DispatchQueueSource},
error::{SchedulerError, StorageClientError},
storage_client::SchedulerStorageClient,
types::{InboundEntry, TaskAssignment},
};
114 changes: 114 additions & 0 deletions components/spider-scheduler/src/storage_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//! The scheduler's view of the storage layer, abstracting inbound polling and placement-time reads.

use std::time::Duration;

use async_trait::async_trait;
use spider_core::{
job::JobState,
types::id::{JobId, SessionId},
};

use crate::{error::StorageClientError, types::InboundEntry};

/// The scheduler's view of the storage layer.
///
/// Abstracts the storage-owned inbound queue and the read-only queries a scheduling algorithm
/// needs to make placement decisions. Modeled as a trait so the scheduler runtime can be driven by
/// a real storage client in production or a mock in tests.
#[async_trait]
pub trait SchedulerStorageClient: Send + Sync + Clone {
/// Polls the regular-task lane of the storage-owned inbound queue for ready tasks.
///
/// # Parameters
///
/// * `max_items` - The maximum number of entries to return from a single poll.
/// * `wait` - The maximum duration to block waiting for ready entries on the storage side.
///
/// # Returns
///
/// A tuple on success, containing:
///
/// * The storage session the poll was served under.
/// * The ready regular tasks drained from the lane.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`StorageClientError::InboundClosed`] if the regular-task lane is closed and can no longer
/// yield entries.
async fn poll_ready(
&self,
max_items: usize,
wait: Duration,
) -> Result<(SessionId, Vec<InboundEntry>), StorageClientError>;

/// Polls the commit-task lane of the storage-owned inbound queue for ready tasks.
///
/// # Parameters
///
/// * `max_items` - The maximum number of entries to return from a single poll.
/// * `wait` - The maximum duration to block waiting for ready entries on the storage side.
///
/// # Returns
///
/// A tuple on success, containing:
///
/// * The storage session the poll was served under.
/// * The ready commit tasks drained from the lane.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`StorageClientError::InboundClosed`] if the commit-task lane is closed and can no longer
/// yield entries.
async fn poll_commit_ready(
&self,
max_items: usize,
wait: Duration,
) -> Result<(SessionId, Vec<InboundEntry>), StorageClientError>;

/// Polls the cleanup-task lane of the storage-owned inbound queue for ready tasks.
///
/// # Parameters
///
/// * `max_items` - The maximum number of entries to return from a single poll.
/// * `wait` - The maximum duration to block waiting for ready entries on the storage side.
///
/// # Returns
///
/// A tuple on success, containing:
///
/// * The storage session the poll was served under.
/// * The ready cleanup tasks drained from the lane.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`StorageClientError::InboundClosed`] if the cleanup-task lane is closed and can no longer
/// yield entries.
async fn poll_cleanup_ready(
&self,
max_items: usize,
wait: Duration,
) -> Result<(SessionId, Vec<InboundEntry>), StorageClientError>;

/// Reads the current state of a job.
///
/// # Parameters
///
/// * `job_id` - The identifier of the job to query.
///
/// # Returns
///
/// The job's current [`JobState`] on success.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`StorageClientError::JobNotFound`] if no job with the given identifier exists.
async fn job_state(&self, job_id: JobId) -> Result<JobState, StorageClientError>;
}
33 changes: 33 additions & 0 deletions components/spider-scheduler/src/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//! The data types the scheduler exchanges with the storage layer and execution managers.

use spider_core::types::id::{JobId, ResourceGroupId, TaskId};

/// A ready task drained from the storage-owned inbound queue.
///
/// The storage client flattens storage's three ready lanes (regular, commit, and cleanup tasks)
/// into this uniform entry, resolving each to its [`TaskId`] so the scheduler core can treat every
/// ready task identically.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InboundEntry {
/// The resource group that owns the job.
pub resource_group_id: ResourceGroupId,

/// The job the task belongs to.
pub job_id: JobId,

/// The ready task.
pub task_id: TaskId,
}

/// A task placement decision written by the scheduler core to the dispatching queue.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TaskAssignment {
/// The resource group that owns the job.
pub resource_group_id: ResourceGroupId,

/// The job the task belongs to.
pub job_id: JobId,

/// The task to dispatch.
pub task_id: TaskId,
}
Loading