Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6e812cc
feat(spider-task-executor): Add executor binary with bincode wire pro…
LinZhihao-723 May 24, 2026
86c7bea
feat(spider-execution-manager): Add single-process supervisor pool fo…
LinZhihao-723 May 26, 2026
27091f0
feat(spider-execution-manager): Add scheduler, storage, and liveness …
LinZhihao-723 May 29, 2026
85a5130
feat(spider-execution-manager): Add liveness actor with session ID tr…
LinZhihao-723 Jun 2, 2026
100129e
refactor(huntsman): Unify `TaskId` by replacing `spider-core`'s defin…
LinZhihao-723 Jun 5, 2026
d95057f
refactor(spider-huntsman): Use auto-incrementing u64 IDs instead of U…
sitaowang1998 Jun 6, 2026
e030af8
feat(spider-execution-manager): Add the runtime that drives the main …
LinZhihao-723 Jun 7, 2026
0683275
build: Split the Ubuntu dev-dependency install script into common, hu…
LinZhihao-723 Jun 7, 2026
6e6727d
Merge branch 'main' into merge-main
sitaowang1998 Jun 7, 2026
e39e319
feat(huntsman): Add protobuf scaffolding and gRPC StorageClient for t…
sitaowang1998 Jun 7, 2026
a069c77
Merge branch 'main' into merge-main
sitaowang1998 Jun 7, 2026
29bbb6f
Merge remote-tracking branch 'upstream/storage-service-dev' into merg…
sitaowang1998 Jun 8, 2026
8015294
Add get recoverable jobs in db
sitaowang1998 Jun 8, 2026
a667a79
Add runtime recovery
sitaowang1998 Jun 8, 2026
da87450
Add unit tests
sitaowang1998 Jun 8, 2026
e3db65f
Address comment
sitaowang1998 Jun 8, 2026
5776c7d
Merge branch 'storage-service-dev' into job-recovery
sitaowang1998 Jun 8, 2026
022223f
Use RecoverableJob
sitaowang1998 Jun 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 89 additions & 1 deletion components/spider-storage/src/cache/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
job_submission::ValidatedJobSubmission,
task::TaskGraph,
},
db::InternalJobOrchestration,
db::{InternalJobOrchestration, RecoverableJob},
ready_queue::ReadyQueueSender,
task_instance_pool::{TaskInstanceMetadata, TaskInstancePoolConnector},
};
Expand Down Expand Up @@ -93,6 +93,94 @@ impl<
})
}

/// Recovers a job control block from persistent database state.
///
/// This constructor does not mutate the database. It rebuilds enough cache state to resume
/// scheduling:
///
/// * [`JobState::Running`] jobs enqueue their initially-ready regular tasks.
/// * [`JobState::CommitReady`] jobs enqueue the commit task.
/// * [`JobState::CleanupReady`] jobs enqueue the cleanup task.
///
/// The incomplete-task counter is initialized to `0` for [`JobState::CommitReady`] jobs and to
/// the total number of tasks in the submission for the other two states. Persisted outputs are
/// restored into the task graph whenever they are present, regardless of state.
///
/// # Returns
///
/// The recovered [`SharedJobControlBlock`] on success.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`InternalError::UnexpectedJobState`] if `state` is not recoverable.
/// * [`InternalError::TaskGraphCorrupted`] if a commit-ready job has no persisted outputs.
/// * Forwards [`TaskGraph::create`]'s return values on failure.
/// * Forwards [`TaskGraph::restore_outputs`]'s return values on failure.
/// * Forwards [`SharedJobControlBlock::resend_ready_tasks`]'s return values on failure.
pub async fn recover(
recoverable_job: RecoverableJob,
ready_queue_sender: ReadyQueueSenderType,
db_connector: DbConnectorType,
task_instance_pool_connector: TaskInstancePoolConnectorType,
) -> Result<Self, CacheError> {
let RecoverableJob {
id,
resource_group_id,
state,
job_submission,
job_outputs,
} = recoverable_job;
// Only these three states can be rebuilt; any other state is rejected before any work is done.
if !matches!(
state,
JobState::Running | JobState::CommitReady | JobState::CleanupReady
) {
// NOTE(review): `expected` names only `Running` although `CommitReady` and `CleanupReady`
// are accepted as well — confirm whether the error type can express the full set.
return Err(UnexpectedJobState {
current: state,
expected: JobState::Running,
}
.into());
}

// The task count is read here because `job_submission` is moved into `TaskGraph::create` below.
let num_tasks = job_submission.task_graph().get_num_tasks();
let mut task_graph = TaskGraph::create(job_submission).await?;
// NOTE(review): this check does not use the task graph, yet it runs only after the graph has
// been built — confirm whether it could be performed earlier.
if matches!(state, JobState::CommitReady) && job_outputs.is_none() {
return Err(InternalError::TaskGraphCorrupted(
"commit-ready job has no persisted outputs".to_owned(),
)
.into());
}
// Outputs are restored whenever they were persisted, not only for commit-ready jobs.
if let Some(outputs) = job_outputs {
task_graph.restore_outputs(outputs).await?;
}
Comment thread
sitaowang1998 marked this conversation as resolved.
// Commit-ready jobs start with zero incomplete tasks; `Running` and `CleanupReady` jobs start
// from the full task count.
let num_incomplete_tasks = if matches!(state, JobState::CommitReady) {
0
} else {
num_tasks
};

// NOTE(review): presumably marks every unfinished task as cancelled so that only the cleanup
// task gets scheduled — confirm against `TaskGraph::cancel_non_terminal`.
if matches!(state, JobState::CleanupReady) {
task_graph.cancel_non_terminal().await;
}

let job_execution_state = JobExecutionState {
state,
task_graph,
num_incomplete_tasks: AtomicUsize::new(num_incomplete_tasks),
ready_queue_sender,
db_connector,
task_instance_pool_connector,
};
let recovered = Self {
inner: Arc::new(JobControlBlock {
id,
owner_id: resource_group_id,
job_execution_state: JobExecutionStateHandle {
inner: tokio::sync::RwLock::new(job_execution_state),
},
}),
};
// The control block is fully built before its ready tasks are resent; if resending fails, the
// error is returned and the freshly built block is dropped.
recovered.resend_ready_tasks().await?;
Ok(recovered)
}

/// Returns the job ID.
#[must_use]
pub fn id(&self) -> JobId {
Expand Down
7 changes: 7 additions & 0 deletions components/spider-storage/src/cache/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ impl<Type: Send + Sync> Reader<Type> {
Self { inner }
}

/// Creates a [`Writer`] from a clone of this reader's inner handle.
///
/// NOTE(review): this lets any holder of a `Reader` inside the crate obtain write access, which
/// weakens the read/write split that the two wrapper types provide — consider a dedicated API
/// for the call sites that need to mutate.
///
/// # Returns
///
/// A writer for the same shared data.
pub(crate) fn writer(&self) -> Writer<Type> {
Writer::new(self.inner.clone())
}

/// # Returns
///
/// A guard that allows read access to the shared data. The guard will be released when it goes
Expand Down
24 changes: 24 additions & 0 deletions components/spider-storage/src/cache/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,30 @@ impl TaskGraph {
&self.outputs
}

/// Restores graph outputs from persisted job outputs.
///
/// Persisted outputs are matched to graph outputs by position: the i-th persisted output is
/// written into the i-th graph output slot, replacing the slot's content with `Some(output)`.
/// The length check runs before any slot is written, so a mismatch leaves every slot untouched.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`InternalError::TaskOutputsLengthMismatch`] if the number of persisted outputs does not
/// match the number of graph outputs.
pub async fn restore_outputs(
&self,
persisted_outputs: Vec<TaskOutput>,
) -> Result<(), InternalError> {
// Reject a count mismatch up front so that no slot is modified on failure.
if persisted_outputs.len() != self.outputs.len() {
// Reported as (number of graph outputs, number of persisted outputs).
return Err(InternalError::TaskOutputsLengthMismatch(
self.outputs.len(),
persisted_outputs.len(),
));
}
// Lengths are equal at this point, so `zip` pairs every slot with exactly one output.
for (output_reader, output) in self.outputs.iter().zip(persisted_outputs) {
// Write access is obtained through a writer derived from the slot's reader handle.
*output_reader.writer().write().await = Some(output);
}
Comment on lines +193 to +195

@LinZhihao-723 LinZhihao-723 Jun 9, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you read this change carefully (assume it's done by your coding agent)?
The purpose of having the reader/writer wrapper around the RW lock is to split read/write access. If we need to mutate the output, we should design a new API, instead of performing write through a "reader".

@sitaowang1998 sitaowang1998 Jun 9, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually after my fight with codex. The original is to directly add a Vec<OutputWriter> to the TaskGraph. What do you propose for the new API?

Ok(())
}

#[must_use]
pub const fn has_commit_task(&self) -> bool {
self.commit_task.is_some()
Expand Down
1 change: 1 addition & 0 deletions components/spider-storage/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub use protocol::{
ExecutionManagerLivenessManagement,
ExternalJobOrchestration,
InternalJobOrchestration,
RecoverableJob,
ResourceGroupManagement,
SessionManagement,
};
9 changes: 9 additions & 0 deletions components/spider-storage/src/db/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub enum DbError {
#[error("Task graph serialization failure: {0}")]
TaskGraphSerializationFailure(#[source] Box<dyn std::error::Error + Send + Sync>),

#[error("Task graph deserialization failure: {0}")]
TaskGraphDeserializationFailure(#[source] Box<dyn std::error::Error + Send + Sync>),

#[error("Value serialization failure: {0}")]
ValueSerializationFailure(#[source] Box<dyn std::error::Error + Send + Sync>),

Expand All @@ -57,6 +60,12 @@ impl DbError {
Self::TaskGraphSerializationFailure(Box::new(e))
}

/// Wraps a task graph deserialization error into
/// [`DbError::TaskGraphDeserializationFailure`].
///
/// # Returns
///
/// The failure variant holding `e`, boxed, as its source error.
pub fn task_graph_de<DeserializationError>(e: DeserializationError) -> Self
where
    DeserializationError: std::error::Error + Send + Sync + 'static,
{
    // Erase the concrete error type so the variant can carry any deserializer's error.
    let boxed_source: Box<dyn std::error::Error + Send + Sync> = Box::new(e);
    Self::TaskGraphDeserializationFailure(boxed_source)
}

pub fn value_ser<SerializationError: serde::ser::Error + Send + Sync + 'static>(
e: SerializationError,
) -> Self {
Expand Down
61 changes: 60 additions & 1 deletion components/spider-storage/src/db/mariadb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use const_format::formatcp;
use secrecy::ExposeSecret;
use spider_core::{
job::JobState,
task::TaskGraph,
types::{
id::{ExecutionManagerId, JobId, ResourceGroupId, SessionId},
io::TaskOutput,
io::{TaskInput, TaskOutput},
},
};
use spider_derive::MySqlEnum;
Expand All @@ -22,6 +23,7 @@ use crate::{
ExecutionManagerLivenessManagement,
ExternalJobOrchestration,
InternalJobOrchestration,
RecoverableJob,
ResourceGroupManagement,
SessionManagement,
error::ExpectedStates,
Expand Down Expand Up @@ -380,6 +382,63 @@ impl InternalJobOrchestration for MariaDbStorageConnector {
tx.commit().await?;
Ok(deleted_job_ids)
}

/// Loads every job whose persisted state is `Running`, `CommitReady`, or `CleanupReady` and
/// decodes each row into a [`RecoverableJob`].
async fn get_recoverable_jobs(&self) -> Result<Vec<RecoverableJob>, DbError> {
// The table name and the three recoverable state names are spliced into the query text at
// compile time; the query takes no runtime parameters.
const SELECT_QUERY: &str = formatcp!(
"SELECT `id`, `resource_group_id`, `state`, `serialized_task_graph`, \
`serialized_job_inputs`, `serialized_job_outputs` FROM `{table}` WHERE `state` IN \
('{running_state}','{commit_ready_state}','{cleanup_ready_state}');",
table = JOBS_TABLE_NAME,
running_state = JobState::Running.as_str(),
commit_ready_state = JobState::CommitReady.as_str(),
cleanup_ready_state = JobState::CleanupReady.as_str(),
);

// The tuple fields follow the SELECT list order: id, resource group, state, task graph (JSON
// text), job inputs (bytes), job outputs (nullable bytes).
// `fetch_all` buffers every matching row before any decoding starts.
let rows = sqlx::query_as::<
_,
(
JobId,
ResourceGroupId,
JobState,
String,
Vec<u8>,
Option<Vec<u8>>,
),
>(SELECT_QUERY)
.fetch_all(&self.pool)
.await?;
Comment on lines +397 to +409

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Bound recovery reads to avoid startup memory blow-ups.

Line 397 fetches all recoverable rows (including blob payloads) into memory, then Line 411 materializes another vector. Under a large in-flight backlog, restart recovery can spike memory and fail availability. Please switch to paged/streamed recovery (e.g., ORDER BY id LIMIT ? loop).

Also applies to: 411-440

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/spider-storage/src/db/mariadb.rs` around lines 397 - 409, The
recovery currently calls sqlx::query_as(...,
SELECT_QUERY).fetch_all(&self.pool).await? which loads all rows (and blobs) into
memory and then re-materializes them later; change this to a paged/streamed
recovery: modify SELECT_QUERY (or use a variant) to include ORDER BY id LIMIT ?
and implement an async loop that fetches pages (using fetch/fetch_many or fetch
+ try_next) and processes each row as it arrives, advancing by the last-seen id
(or using offset) until no more rows remain; remove the fetch_all + intermediate
vector materialization and apply the same streamed/page-loop pattern to the
other recovery block referenced around the 411-440 region so blobs are handled
incrementally rather than all at once.


rows.into_iter()
.map(
|(
id,
resource_group_id,
state,
serialized_task_graph,
serialized_job_inputs,
serialized_job_outputs,
)| {
// The task graph is stored as JSON; inputs and outputs are decoded with `rmp_serde`.
let task_graph = TaskGraph::from_json(&serialized_task_graph)
.map_err(DbError::task_graph_de)?;
let job_inputs: Vec<TaskInput> =
rmp_serde::from_slice(&serialized_job_inputs).map_err(DbError::value_de)?;
// A graph/inputs pair that fails validation is reported as corrupted DB state.
let job_submission = ValidatedJobSubmission::create(task_graph, job_inputs)
.map_err(|e| DbError::CorruptedDbState(e.to_string()))?;
// `transpose` turns `Option<Result<_, _>>` into `Result<Option<_>, _>`, so a NULL outputs
// column stays `None` while a decoding failure is still propagated.
let job_outputs = serialized_job_outputs
.map(|outputs| rmp_serde::from_slice(&outputs).map_err(DbError::value_de))
.transpose()?;

Comment thread
sitaowang1998 marked this conversation as resolved.
Ok(RecoverableJob {
id,
resource_group_id,
state,
job_submission,
job_outputs,
})
},
)
// Collecting into `Result` stops at the first row that fails to decode, so a single bad row
// makes the whole call return an error.
.collect()
}
}

#[async_trait]
Expand Down
33 changes: 33 additions & 0 deletions components/spider-storage/src/db/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@ use spider_core::{

use crate::{cache::job_submission::ValidatedJobSubmission, db::error::DbError};

/// A job persisted in the database that should be rebuilt in the storage cache on startup.
///
/// Only jobs that have already started execution are recoverable. [`JobState::Ready`] jobs remain
/// database-only until a client starts them.
pub struct RecoverableJob {
/// The persisted job ID.
pub id: JobId,
/// The owning resource group.
pub resource_group_id: ResourceGroupId,
/// The source-of-truth database state.
pub state: JobState,
/// The original job submission, rebuilt from the persisted task graph and job inputs.
pub job_submission: ValidatedJobSubmission,
/// The committed job outputs, if the job has reached the commit phase.
///
/// `None` means no outputs were persisted for the job. Cache recovery rejects a
/// [`JobState::CommitReady`] job whose outputs are `None`.
pub job_outputs: Option<Vec<TaskOutput>>,
}
Comment thread
sitaowang1998 marked this conversation as resolved.

/// The database storage interface. A database storage must implement the following traits:
///
/// * [`ExternalJobOrchestration`]
Expand Down Expand Up @@ -244,6 +261,22 @@ pub trait InternalJobOrchestration: Clone + Send + Sync {
&self,
expire_after_sec: u64,
) -> Result<Vec<JobId>, DbError>;

/// Gets all jobs that should be recovered into the cache.
///
/// # Returns
///
/// All persisted jobs in [`JobState::Running`], [`JobState::CommitReady`], or
/// [`JobState::CleanupReady`] on success.
///
/// # Errors
///
/// Returns an error if:
///
/// * [`DbError::TaskGraphDeserializationFailure`] if a persisted task graph is invalid.
/// * [`DbError::ValueDeserializationFailure`] if persisted inputs or outputs are invalid.
/// * [`DbError::CorruptedDbState`] if a persisted task graph and its inputs cannot be combined
///   into a valid job submission.
/// * Forwards [`sqlx::error::Error`] on DB operation failure.
async fn get_recoverable_jobs(&self) -> Result<Vec<RecoverableJob>, DbError>;
}

/// Defines the storage interface for resource group management in the database.
Expand Down
60 changes: 57 additions & 3 deletions components/spider-storage/src/state/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::{
cache::error::{CacheError, InternalError},
cache::{
error::{CacheError, InternalError},
job::SharedJobControlBlock,
},
config::DatabaseConfig,
db::{DbStorage, MariaDbStorageConnector, SessionManagement},
ready_queue::{ReadyQueueConfig, ReadyQueueSender, ReadyQueueSenderHandle, create_ready_queue},
Expand Down Expand Up @@ -121,11 +124,16 @@ pub async fn create_runtime(
)
.map_err(CacheError::from)?;

// TODO: Recover jobs from the database.
let job_cache = recover_job_cache(
&db,
ready_queue_sender.clone(),
task_instance_pool_connector.clone(),
)
.await?;
let service_state = ServiceState::new(
db,
session_id,
JobCache::new(),
job_cache,
ready_queue_sender,
ready_queue_receiver,
task_instance_pool_connector,
Expand All @@ -144,6 +152,52 @@ pub async fn create_runtime(

const STOP_BACKGROUND_TASKS_TIMEOUT_SEC: u64 = 30;

/// Rebuilds the in-memory job cache from the jobs the database reports as recoverable.
///
/// Jobs are processed one at a time in the order the database returns them: each is turned into
/// a job control block, inserted into the cache, and then logged. The first failure aborts the
/// recovery and is returned to the caller.
///
/// # Returns
///
/// A [`JobCache`] containing all recoverable jobs on success.
///
/// # Errors
///
/// Returns an error if:
///
/// * Forwards [`DbStorage::get_recoverable_jobs`]'s return values on failure.
/// * Forwards [`SharedJobControlBlock::recover`]'s return values on failure.
/// * Forwards [`JobCache::insert`]'s return values on failure.
async fn recover_job_cache<ReadyQueueSenderType, DbConnectorType, TaskInstancePoolConnectorType>(
    db: &DbConnectorType,
    ready_queue_sender: ReadyQueueSenderType,
    task_instance_pool_connector: TaskInstancePoolConnectorType,
) -> Result<
    JobCache<ReadyQueueSenderType, DbConnectorType, TaskInstancePoolConnectorType>,
    StorageServerError,
>
where
    ReadyQueueSenderType: ReadyQueueSender,
    DbConnectorType: DbStorage,
    TaskInstancePoolConnectorType: TaskInstancePoolConnector,
{
    let recovered_cache = JobCache::new();
    let persisted_jobs = db.get_recoverable_jobs().await?;
    for persisted_job in persisted_jobs {
        // Copy out the fields needed for logging before the job is moved into `recover`.
        let (job_id, job_state) = (persisted_job.id, persisted_job.state);
        let control_block = SharedJobControlBlock::recover(
            persisted_job,
            ready_queue_sender.clone(),
            db.clone(),
            task_instance_pool_connector.clone(),
        )
        .await?;
        recovered_cache.insert(control_block).await?;
        tracing::info!(
            job_id = ?job_id,
            job_state = ?job_state,
            "Job recovered into cache.",
        );
    }
    Ok(recovered_cache)
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down
Loading
Loading