Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"components/spider-core",
"components/spider-derive",
"components/spider-execution-manager",
"components/spider-scheduler",
"components/spider-storage",
"components/spider-task-executor",
"components/spider-tdl",
Expand Down
47 changes: 14 additions & 33 deletions components/spider-core/src/types/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use serde::{Deserialize, Serialize};
use sqlx::{Database, encode::IsNull};
use uuid::Uuid;

use crate::task::TaskIndex;

/// A generic identifier type that wraps a UUID and a type marker.
///
/// # Type Parameters:
Expand Down Expand Up @@ -96,9 +98,18 @@ pub type UuidBytes = uuid::Bytes;
pub enum ResourceGroupIdMarker {}
pub type ResourceGroupId = Id<ResourceGroupIdMarker>;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskIdMarker {}
pub type TaskId = Id<TaskIdMarker>;
/// Identifier of a task inside a job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TaskId {
/// The index of the task in the job's task graph.
Index(TaskIndex),

/// The commit task.
Commit,

/// The cleanup task.
Cleanup,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum JobIdMarker {}
Expand Down Expand Up @@ -169,33 +180,3 @@ where
}

pub type SignedJobId = SignedId<JobIdMarker>;

pub type SignedTaskId = SignedId<TaskIdMarker>;

#[cfg(test)]
mod tests {
use std::any::TypeId;

use super::*;

#[test]
fn test_id_basic() {
let id = TaskId::new();
let underlying_uuid = id.as_uuid_ref().to_owned();
assert_eq!(id, TaskId::from(underlying_uuid));

assert_ne!(TypeId::of::<TaskId>(), TypeId::of::<JobId>());
}

#[test]
fn task_id_json_roundtrip() {
let id = TaskId::new();
let deserialized_id: TaskId = serde_json::from_str(
serde_json::to_string(&id)
.expect("JSON serialization failure")
.as_str(),
)
.expect("JSON deserialization failure");
assert_eq!(id, deserialized_id);
}
}
24 changes: 24 additions & 0 deletions components/spider-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "spider-scheduler"
version = "0.1.0"
edition = "2024"

[lib]
name = "spider_scheduler"
path = "src/lib.rs"

[dependencies]
async-channel = "2.3.1"
async-trait = "0.1.89"
serde = { version = "1.0.228", features = ["derive"] }
spider-core = { path = "../spider-core" }
thiserror = "2.0.18"
tokio = { version = "1.52.3", features = ["macros", "rt", "sync", "time"] }
tokio-util = "0.7.18"
tracing = { version = "0.1.41", default-features = false, features = ["std"] }

[dev-dependencies]
anyhow = "1.0.102"
dashmap = "6.1.0"
tokio = { version = "1.52.3", features = ["macros", "rt-multi-thread"] }
tokio-util = { version = "0.7.18", features = ["rt"] }
47 changes: 47 additions & 0 deletions components/spider-scheduler/src/core.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//! The abstract core of a Spider scheduler.

use async_trait::async_trait;

use crate::{
dispatch_queue::DispatchQueueSink,
error::SchedulerError,
storage_client::SchedulerStorageClient,
};

/// An abstracted core for a scheduling algorithm.
///
/// A core owns its decision loop: it polls the inbound queue through a [`SchedulerStorageClient`],
/// applies its algorithm (reading storage as needed for placement), and writes assignments to a
/// [`DispatchQueueSink`]. Modeling the algorithm as a trait lets different scheduling strategies
/// share the same runtime entry point.
#[async_trait]
pub trait SchedulerCore: Send {
/// The dispatch sink the core writes assignments to.
type Sink: DispatchQueueSink;

/// The storage client used by the core to poll and read for placement decisions.
type StorageClient: SchedulerStorageClient;

/// Runs the scheduling loop until `cancellation_token` is triggered.
///
/// The core polls the inbound queue through `storage_client`, applies its scheduling algorithm,
/// and writes assignments to `sink`, repeating until `cancellation_token` is fired, at which
/// point it returns.
///
/// # Parameters
///
/// * `storage_client` - The storage client used to poll the inbound queue and read state for
/// placement.
/// * `sink` - The dispatch sink that assignments are written to.
/// * `cancellation_token` - The token to signal the scheduling loop to stop.
///
/// # Errors
///
/// Returns a [`SchedulerError`] instance indicating an irrecoverable error.
async fn run(
self,
storage_client: Self::StorageClient,
sink: Self::Sink,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), SchedulerError>;
}
3 changes: 3 additions & 0 deletions components/spider-scheduler/src/core_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod round_robin;

pub use round_robin::*;
Loading
Loading