feat(spider-storage): Adds job recovery for restarted storage cache layer. #339

Open
sitaowang1998 wants to merge 18 commits into y-scope:storage-service-dev from sitaowang1998:job-recovery

Conversation

@sitaowang1998

@sitaowang1998 sitaowang1998 commented Jun 8, 2026

Collaborator

Description

This PR adds recovery of jobs in the Running, CommitReady, and CleanupReady states when the storage runtime restarts. All recoverable jobs are fetched from the database and loaded into the cache layer and the ready queue.
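
To make the recovery flow concrete, here is a minimal, std-only sketch. It is not the PR's code: `JobRow`, the `Succeeded` state, the `u64` IDs, and the idea of enqueueing job IDs are all assumptions for illustration (the real ready queue presumably holds tasks, and the real rows come from the database).

```rust
use std::collections::{HashMap, VecDeque};

/// Job states. Only the three recoverable states are named in the description;
/// `Succeeded` is a hypothetical terminal state added for illustration.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum JobState {
    Running,
    CommitReady,
    CleanupReady,
    Succeeded, // hypothetical
}

impl JobState {
    /// States the description lists as recoverable after a restart.
    fn is_recoverable(self) -> bool {
        matches!(self, Self::Running | Self::CommitReady | Self::CleanupReady)
    }
}

/// Hypothetical persisted row: a job ID and its last-known state.
struct JobRow {
    id: u64,
    state: JobState,
}

/// Rebuilds an in-memory cache and a ready queue from persisted rows,
/// skipping rows whose state is not recoverable.
fn recover(rows: Vec<JobRow>) -> (HashMap<u64, JobState>, VecDeque<u64>) {
    let mut cache = HashMap::new();
    let mut ready_queue = VecDeque::new();
    for row in rows.into_iter().filter(|r| r.state.is_recoverable()) {
        cache.insert(row.id, row.state);
        ready_queue.push_back(row.id);
    }
    (cache, ready_queue)
}
```

Calling `recover` with one `Running`, one `Succeeded`, and one `CleanupReady` row would cache and enqueue only the first and third.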

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Added tests that restart the storage runtime.
  • GitHub workflows pass.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added execution manager runtime for per-node task coordination and lifecycle management
    • Implemented task executor subprocess with framed binary communication protocol
    • Added job recovery from persistent storage on runtime startup
    • Introduced session tracking for improved request reliability
    • Added gRPC storage client for task instance management
  • Bug Fixes & Improvements

    • Updated ID system for improved performance and consistency
    • Enhanced workflow checks to validate generated protocol code integrity
    • Improved development setup with modular Ubuntu installation scripts
  • Testing

    • Added comprehensive integration test suite for execution manager runtime
    • Added integration tests for task executor subprocess communication
    • Added process pool recovery and timeout handling tests
    • Added job recovery validation tests
  • Chores

    • Updated Cargo workspace to include new components and test crates
    • Enhanced build tasks for protocol code generation and multi-threaded testing
    • Reorganized development dependency installation scripts

LinZhihao-723 and others added 15 commits May 24, 2026 18:46
…acker; Refactor integration tests to extract common helpers into `test-utils`. (y-scope#328)
…UIDv7 for database-generated IDs. (y-scope#337)

Co-authored-by: LinZhihao-723 <pleiades3190@gmail.com>
…ntsman, and wolf variants. (y-scope#336)

Co-authored-by: sitaowang1998 <sitaowang1998@outlook.com>
…ask instance management: (y-scope#333)

* Add `spider-proto` component for protobuf source files.
* Add `spider-proto-rust` component for the generated Rust code.
* Add tasks to build the generated Rust code.
* Add a GitHub workflow to verify the generated Rust code is up-to-date.
* Implement a gRPC-backed `StorageClient` for the execution manager.
Co-authored-by: LinZhihao-723 <pleiades3190@gmail.com>
@sitaowang1998 sitaowang1998 requested a review from a team as a code owner June 8, 2026 17:12
@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Contributor

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: f302817c-c0cc-4da6-95db-efee834af35f

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/spider-storage/src/state/runtime.rs (1)

119-133: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Cancel or abort the spawned pool task when recovery fails.

create_runtime spawns the task-instance pool before recovery. If recovery fails on Line 132, the function returns early and drops task_instance_pool_join_handle without shutdown, leaving the pool task running in the background.

Suggested fix
-    let (task_instance_pool_connector, task_instance_pool_join_handle) = create_task_instance_pool(
+    let (task_instance_pool_connector, mut task_instance_pool_join_handle) = create_task_instance_pool(
         ready_queue_sender.clone(),
         db.clone(),
         cancellation_token.clone(),
         task_instance_pool_config,
     )
     .map_err(CacheError::from)?;
-
-    let job_cache = recover_job_cache(
-        &db,
-        ready_queue_sender.clone(),
-        task_instance_pool_connector.clone(),
-    )
-    .await?;
+    let job_cache = match recover_job_cache(
+        &db,
+        ready_queue_sender.clone(),
+        task_instance_pool_connector.clone(),
+    )
+    .await
+    {
+        Ok(cache) => cache,
+        Err(e) => {
+            cancellation_token.cancel();
+            task_instance_pool_join_handle.abort();
+            return Err(e);
+        }
+    };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@components/spider-storage/src/state/runtime.rs` around lines 119 - 133,
create_runtime spawns the task-instance pool via create_task_instance_pool and
then calls recover_job_cache; if recovery fails you must shut down the pool
instead of returning with the join handle still running. On any early return
after calling create_task_instance_pool (e.g., when recover_job_cache returns
Err), call the cancellation mechanism used by the pool (e.g.,
cancellation_token.cancel()) and then ensure task_instance_pool_join_handle is
awaited or aborted (call join/await or abort on the join handle) before
returning the error; reference create_runtime, create_task_instance_pool,
recover_job_cache, task_instance_pool_join_handle and cancellation_token to
locate the fix.
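
The shutdown-on-failure pattern can also be sketched without the async runtime. The sketch below swaps the PR's cancellation token and join handle for an `AtomicBool` and a `std::thread::JoinHandle`; `spawn_pool`, `recover`, and the `should_fail` switch are hypothetical stand-ins, so treat it as an illustration of the control flow only.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Stand-in for the pool task: loops until the shared cancellation flag is set.
fn spawn_pool(cancelled: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::spawn(move || {
        while !cancelled.load(Ordering::Acquire) {
            thread::sleep(Duration::from_millis(1));
        }
    })
}

/// Hypothetical recovery step that can fail.
fn recover(should_fail: bool) -> Result<Vec<u64>, String> {
    if should_fail {
        Err("recovery failed".to_owned())
    } else {
        Ok(vec![1, 2, 3])
    }
}

/// Spawns the pool, then recovers. On failure, the pool is stopped
/// before the error is returned, so no background worker is leaked.
fn create_runtime(
    cancelled: Arc<AtomicBool>,
    should_fail: bool,
) -> Result<(Vec<u64>, JoinHandle<()>), String> {
    let pool = spawn_pool(Arc::clone(&cancelled));
    match recover(should_fail) {
        Ok(jobs) => Ok((jobs, pool)),
        Err(e) => {
            // Signal cancellation, then wait for the worker to exit.
            cancelled.store(true, Ordering::Release);
            let _ = pool.join();
            Err(e)
        }
    }
}
```

Joining after cancelling gives a clean shutdown; aborting the handle, as in the suggested fix, is the quicker alternative when the task cannot be trusted to observe the cancellation promptly.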
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In @.github/workflows/proto-generated-code-checks.yaml:
- Around line 11-23: Add a top-level permissions block and disable credential
persistence on the checkout step to reduce token exposure: add a top-level
permissions: block (e.g., permissions: contents: read) alongside the existing
concurrency block, and in the proto-code-committed job update the
actions/checkout step (the uses: "actions/checkout@...") to include with:
persist-credentials: false (and keep submodules: "recursive"); ensure the job
name proto-code-committed and concurrency block remain unchanged.

In `@components/spider-execution-manager/src/runtime.rs`:
- Around line 395-406: The code in Outcome::InTaskFailure (runtime.rs) treats
the msgpack-encoded error bytes as UTF-8; instead deserialize the bytes as the
documented ExecutorError (the type defined where process_pool.rs documents
msgpack encoding) using a msgpack deserializer (e.g.
rmp_serde::from_slice::<ExecutorError>(&error)), then format the resulting
ExecutorError (Display or Debug) into the Failure message; if deserialization
fails, fall back to a safe binary-to-string (or hex) fallback so the Failure
still contains useful info. Ensure you reference the ExecutorError type and add
the necessary use/imports and error-handling around the deserialization in the
Outcome::InTaskFailure branch.

In `@components/spider-proto/storage/storage.proto`:
- Line 3: The proto's declared package "storage" doesn't match Buf's
package-directory mapping, causing PACKAGE_DIRECTORY_MATCH failures; fix by
either updating the proto package declaration to the correct fully-qualified
package that corresponds to its directory layout or adjust the Buf
module/package-directory mapping so this proto's directory maps to the "storage"
package; specifically edit the "package storage;" line in the storage.proto (or
the Buf module config in buf.yaml/buf.work) so the package name and the
repository directory mapping align.

In `@components/spider-storage/src/cache/job.rs`:
- Around line 157-159: During recovery, defend against inconsistent rows where
state == JobState::CommitReady but job_outputs is None by adding an explicit
check before calling task_graph.restore_outputs: if recovered state is
CommitReady and job_outputs.is_none(), return an error (or otherwise reject the
row) instead of silently continuing; locate the recovery logic around the
variables job_outputs and the call task_graph.restore_outputs(outputs).await?
and add the guard that validates this combination (or convert it to a safe
fallback) so the code does not enqueue/complete the commit task via
succeed_commit_task_instance while leaving output slots unset.

In `@components/spider-storage/src/db/mariadb.rs`:
- Around line 397-409: The recovery currently calls sqlx::query_as(...,
SELECT_QUERY).fetch_all(&self.pool).await? which loads all rows (and blobs) into
memory and then re-materializes them later; change this to a paged/streamed
recovery: modify SELECT_QUERY (or use a variant) to include ORDER BY id LIMIT ?
and implement an async loop that fetches pages (using fetch/fetch_many or fetch
+ try_next) and processes each row as it arrives, advancing by the last-seen id
(or using offset) until no more rows remain; remove the fetch_all + intermediate
vector materialization and apply the same streamed/page-loop pattern to the
other recovery block referenced around the 411-440 region so blobs are handled
incrementally rather than all at once.
- Around line 427-430: During recovery, enforce the invariant that
JobState::CommitReady must have outputs: check after deserializing
serialized_job_outputs (the serialized_job_outputs -> job_outputs step) and if
the row state is JobState::CommitReady but serialized_job_outputs is None,
return a DbError (e.g. an invariant/validation error) instead of producing
job_outputs: None; update the recovery code path that builds job_outputs (the
map/transpose block) to explicitly validate the state and fail fast when
CommitReady lacks outputs so corrupted rows are caught at DB-recovery time.

In `@components/spider-task-executor/src/bin/spider_task_executor.rs`:
- Around line 73-77: The code currently joins the raw package string into
pkg_dir and can be exploited via path traversal (e.g., "../../...") before
calling manager.load; update the resolution in the block that uses manager.get,
pkg_dir, package and manager.load to validate/sanitize package first — either
reject any package containing path separators or ".." and restrict to a safe
filename pattern (alphanumeric, hyphen/underscore), or build the full path then
canonicalize it and assert the canonicalized path starts with the canonicalized
pkg_dir before calling manager.load; return an error if validation fails so no
unintended .so can be loaded.

In `@tests/huntsman/test-utils/src/executor.rs`:
- Around line 100-107: Wrap the unbounded awaits in recv(), try_recv(),
shutdown_clean(), and wait_for_exit() with tokio::time::timeout using a CI-safe
Duration so tests fail fast instead of hanging: replace direct awaits on
self.responses.next().await in recv() and try_recv() and on
self.child.wait().await in shutdown_clean() and wait_for_exit() with
timeout(duration, ...).await, handle the Err(timeout) case by panicking or
returning a clear error message specifying which method timed out (include the
method name like recv/try_recv/shutdown_clean/wait_for_exit in the message) and
unwrap/propagate the Ok(inner) result as before; keep existing
deserialization/error unwraps for non-timeout failures.

In `@tools/scripts/lib_install/ubuntu/install-dev-common.sh`:
- Line 29: Replace the unpinned bootstrap "curl -LsSf
https://astral.sh/uv/install.sh | sh" with a safer approach: either remove the
auto-install and document "uv" as a required pre-req for developers, or fetch a
specific uv release artifact (referencing the uv release version) and verify its
checksum/signature before installation (download pinned tarball/installer +
checksum file, verify, then run installer). Ensure the change targets the exact
command string "curl -LsSf https://astral.sh/uv/install.sh | sh" and update any
comments or README to reflect the new pre-req or pinned-install flow.
- Around line 6-9: The script enables strict modes with "set -e" and "set -u"
but misses pipefail, so failures in pipelines (e.g., the curl | sh pipeline that
installs "uv") can be masked; add "set -o pipefail" (or change to "set -euo
pipefail") alongside the existing "set -e" and "set -u" so the pipeline failure
causes the script to exit and the "uv" installation cannot be silently skipped.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 64bda151-1585-4e6b-b36b-5f2c6b52e139

📥 Commits

Reviewing files that changed from the base of the PR and between 8db54d7 and da87450.

⛔ Files ignored due to path filters (2)
  • Cargo.lock is excluded by !**/*.lock
  • components/spider-proto-rust/src/generated/storage.rs is excluded by !**/generated/**
📒 Files selected for processing (74)
  • .devcontainer/Dockerfile
  • .github/workflows/code-linting-checks.yaml
  • .github/workflows/proto-generated-code-checks.yaml
  • .github/workflows/tests.yaml
  • Cargo.toml
  • components/spider-core/Cargo.toml
  • components/spider-core/src/lib.rs
  • components/spider-core/src/session.rs
  • components/spider-core/src/types/id.rs
  • components/spider-execution-manager/Cargo.toml
  • components/spider-execution-manager/src/client.rs
  • components/spider-execution-manager/src/client/grpc/mod.rs
  • components/spider-execution-manager/src/client/grpc/storage.rs
  • components/spider-execution-manager/src/client/liveness.rs
  • components/spider-execution-manager/src/client/scheduler.rs
  • components/spider-execution-manager/src/client/storage.rs
  • components/spider-execution-manager/src/lib.rs
  • components/spider-execution-manager/src/liveness.rs
  • components/spider-execution-manager/src/process_pool.rs
  • components/spider-execution-manager/src/runtime.rs
  • components/spider-proto-rust/Cargo.toml
  • components/spider-proto-rust/build.rs
  • components/spider-proto-rust/src/id.rs
  • components/spider-proto-rust/src/lib.rs
  • components/spider-proto/storage/storage.proto
  • components/spider-storage/Cargo.toml
  • components/spider-storage/src/cache.rs
  • components/spider-storage/src/cache/job.rs
  • components/spider-storage/src/cache/sync.rs
  • components/spider-storage/src/cache/task.rs
  • components/spider-storage/src/db.rs
  • components/spider-storage/src/db/error.rs
  • components/spider-storage/src/db/mariadb.rs
  • components/spider-storage/src/db/protocol.rs
  • components/spider-storage/src/state/job_cache.rs
  • components/spider-storage/src/state/runtime.rs
  • components/spider-storage/src/state/service.rs
  • components/spider-storage/src/state/test_utils.rs
  • components/spider-storage/src/task_instance_pool.rs
  • components/spider-storage/tests/mariadb_infra.rs
  • components/spider-storage/tests/mariadb_test.rs
  • components/spider-storage/tests/recovery_test.rs
  • components/spider-storage/tests/scheduling_infra.rs
  • components/spider-storage/tests/test_spider_storage.rs
  • components/spider-task-executor/Cargo.toml
  • components/spider-task-executor/src/bin/spider_task_executor.rs
  • components/spider-task-executor/src/error.rs
  • components/spider-task-executor/src/lib.rs
  • components/spider-task-executor/src/manager.rs
  • components/spider-task-executor/src/protocol.rs
  • components/spider-tdl/src/task.rs
  • components/spider-tdl/src/task_context.rs
  • components/spider-tdl/tests/test_task_macro.rs
  • taskfiles/build.yaml
  • taskfiles/lint.yaml
  • taskfiles/test.yaml
  • tests/huntsman/em-runtime/Cargo.toml
  • tests/huntsman/em-runtime/src/lib.rs
  • tests/huntsman/em-runtime/tests/test_runtime.rs
  • tests/huntsman/integration-test-tasks/Cargo.toml
  • tests/huntsman/integration-test-tasks/src/lib.rs
  • tests/huntsman/task-executor/Cargo.toml
  • tests/huntsman/task-executor/src/lib.rs
  • tests/huntsman/task-executor/tests/overhead_instrument.rs
  • tests/huntsman/task-executor/tests/test_executor.rs
  • tests/huntsman/task-executor/tests/test_process_pool.rs
  • tests/huntsman/tdl-integration/tests/complex.rs
  • tests/huntsman/test-utils/Cargo.toml
  • tests/huntsman/test-utils/src/executor.rs
  • tests/huntsman/test-utils/src/lib.rs
  • tests/huntsman/test-utils/src/mock.rs
  • tools/scripts/lib_install/ubuntu/install-dev-common.sh
  • tools/scripts/lib_install/ubuntu/install-dev-huntsman.sh
  • tools/scripts/lib_install/ubuntu/install-dev-wolf.sh
💤 Files with no reviewable changes (1)
  • components/spider-storage/src/cache.rs

Comment thread .github/workflows/proto-generated-code-checks.yaml
Comment thread components/spider-execution-manager/src/runtime.rs
Comment thread components/spider-proto/storage/storage.proto
Comment thread components/spider-storage/src/cache/job.rs
Comment on lines +397 to +409
    let rows = sqlx::query_as::<
        _,
        (
            JobId,
            ResourceGroupId,
            JobState,
            String,
            Vec<u8>,
            Option<Vec<u8>>,
        ),
    >(SELECT_QUERY)
    .fetch_all(&self.pool)
    .await?;

Contributor

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Bound recovery reads to avoid startup memory blow-ups.

Line 397 fetches all recoverable rows (including blob payloads) into memory, then Line 411 materializes another vector. With a large in-flight backlog, restart recovery can spike memory usage and hurt availability. Please switch to paged/streamed recovery (e.g., an ORDER BY id LIMIT ? loop).

Also applies to: 411-440
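
A minimal sketch of the paged loop, assuming keyset pagination on an ordered `id` column. To keep it self-contained, the database is replaced by an in-memory slice: `Row`, `fetch_page`, and `recover_paged` are hypothetical names, and `fetch_page` only imitates a `WHERE id > ? ORDER BY id LIMIT ?` query rather than calling sqlx.

```rust
/// Hypothetical recoverable row: an ordered ID plus a blob payload.
#[derive(Clone, Debug, PartialEq)]
struct Row {
    id: u64,
    payload: Vec<u8>,
}

/// In-memory stand-in for `SELECT ... WHERE id > ? ORDER BY id LIMIT ?`.
/// `table` must already be sorted by `id`.
fn fetch_page(table: &[Row], after_id: Option<u64>, limit: usize) -> Vec<Row> {
    table
        .iter()
        .filter(|r| after_id.map_or(true, |last| r.id > last))
        .take(limit)
        .cloned()
        .collect()
}

/// Walks the table one page at a time, so at most `page_size` rows are
/// held in memory at once. Returns the number of rows processed.
fn recover_paged(table: &[Row], page_size: usize, mut process: impl FnMut(Row)) -> usize {
    assert!(page_size > 0, "page_size must be positive");
    let mut last_id = None;
    let mut total = 0;
    loop {
        let page = fetch_page(table, last_id, page_size);
        // An empty page means every row has been seen.
        let Some(last) = page.last() else { break };
        // Advance the cursor to the last-seen ID before consuming the page.
        last_id = Some(last.id);
        total += page.len();
        page.into_iter().for_each(&mut process);
    }
    total
}
```

Advancing by the last-seen ID rather than by an offset keeps each query cheap on an indexed column and stays correct if earlier rows change state between pages.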

Comment thread components/spider-storage/src/db/mariadb.rs
Comment on lines +73 to +77
    let pkg = if let Some(pkg) = manager.get(package) {
        pkg
    } else {
        let path = pkg_dir.join(package).join(format!("lib{package}.so"));
        manager.load(&path)?

Contributor

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Block path traversal in package resolution.

Line 76 joins tdl_context.package directly into a filesystem path. A crafted value like ../../tmp/evil can escape SPIDER_TDL_PACKAGE_DIR and load an unintended shared object.

Suggested patch
 use std::{
-    path::{Path, PathBuf},
+    path::{Component, Path, PathBuf},
     time::Instant,
 };
@@
 fn run_task(
@@
 ) -> Result<Vec<u8>, ExecutorError> {
+    let mut comps = Path::new(package).components();
+    let is_single_normal_component =
+        matches!(comps.next(), Some(Component::Normal(_))) && comps.next().is_none();
+    if !is_single_normal_component {
+        return Err(ExecutorError::InvalidLibrary(format!(
+            "invalid package name: {package}"
+        )));
+    }
+
     let pkg = if let Some(pkg) = manager.get(package) {
         pkg
     } else {
         let path = pkg_dir.join(package).join(format!("lib{package}.so"));
         manager.load(&path)?
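
For reference, the same check as a standalone, testable function. This is a slightly stricter variant of the suggested patch, not the PR's code: `is_valid_package_name` and `resolve_library` are hypothetical names, and errors are plain strings instead of `ExecutorError`. `Path::components` normalizes away trailing separators, so the sketch also requires the single component to equal the whole input, which rejects names such as `foo/`.

```rust
use std::path::{Component, Path, PathBuf};

/// Returns true only if `package` is exactly one normal path component:
/// no separators, no `.` or `..`, not absolute, and not empty.
fn is_valid_package_name(package: &str) -> bool {
    let mut comps = Path::new(package).components();
    let first_is_whole_name = matches!(
        comps.next(),
        // The component must equal the full input, which rules out
        // inputs like "foo/" whose trailing separator is normalized away.
        Some(Component::Normal(name)) if name.to_str() == Some(package)
    );
    first_is_whole_name && comps.next().is_none()
}

/// Builds `<pkg_dir>/<package>/lib<package>.so` after validating `package`.
fn resolve_library(pkg_dir: &Path, package: &str) -> Result<PathBuf, String> {
    if !is_valid_package_name(package) {
        return Err(format!("invalid package name: {package}"));
    }
    Ok(pkg_dir.join(package).join(format!("lib{package}.so")))
}
```

Validating the name up front avoids touching the filesystem; canonicalizing the final path and checking its prefix is the alternative when symlinks inside the package directory are also a concern.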

Comment thread tests/huntsman/test-utils/src/executor.rs
Comment thread tools/scripts/lib_install/ubuntu/install-dev-common.sh
Comment thread tools/scripts/lib_install/ubuntu/install-dev-common.sh
Comment thread components/spider-storage/src/cache/job.rs Outdated
Comment on lines +193 to +195
for (output_reader, output) in self.outputs.iter().zip(persisted_outputs) {
    *output_reader.writer().write().await = Some(output);
}

@LinZhihao-723 LinZhihao-723 Jun 9, 2026

Member

Have you read this change carefully (assume it's done by your coding agent)?
The purpose of having the reader/writer wrapper around the RW lock is to split read/write access. If we need to mutate the output, we should design a new API, instead of performing write through a "reader".

@sitaowang1998 sitaowang1998 Jun 9, 2026

Collaborator Author

This is actually the result of my fight with Codex. The original approach was to add a Vec<OutputWriter> directly to the TaskGraph. What do you propose for the new API?
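
For readers following the thread, here is one way the reader/writer split could look. It is essentially the `Vec<OutputWriter>` variant mentioned above, not an API either participant has agreed on. The sketch uses `std::sync::RwLock` instead of the async lock in the PR, and `output_slot`, `Outputs`, and `restore` are hypothetical names.

```rust
use std::sync::{Arc, RwLock};

/// Read-only handle to an output slot.
#[derive(Clone)]
struct OutputReader(Arc<RwLock<Option<Vec<u8>>>>);

/// Write-only handle to the same slot.
struct OutputWriter(Arc<RwLock<Option<Vec<u8>>>>);

/// Creates a slot and hands out its two halves.
fn output_slot() -> (OutputReader, OutputWriter) {
    let slot = Arc::new(RwLock::new(None));
    (OutputReader(Arc::clone(&slot)), OutputWriter(slot))
}

impl OutputReader {
    /// Returns a copy of the current value, if any.
    fn read(&self) -> Option<Vec<u8>> {
        self.0.read().expect("lock poisoned").clone()
    }
}

impl OutputWriter {
    /// Stores a value in the slot.
    fn write(&self, value: Vec<u8>) {
        *self.0.write().expect("lock poisoned") = Some(value);
    }
}

/// Hypothetical owner that keeps the writers, so recovery never has to
/// obtain write access through a reader.
struct Outputs {
    writers: Vec<OutputWriter>,
}

impl Outputs {
    /// Restores persisted outputs. A count mismatch is rejected instead of
    /// being silently truncated by `zip`.
    fn restore(&self, persisted: Vec<Vec<u8>>) -> Result<(), String> {
        if persisted.len() != self.writers.len() {
            return Err(format!(
                "expected {} outputs, got {}",
                self.writers.len(),
                persisted.len()
            ));
        }
        for (writer, output) in self.writers.iter().zip(persisted) {
            writer.write(output);
        }
        Ok(())
    }
}
```

The point of the shape is that mutation goes through an explicit `restore` entry point on the owner of the writers, while readers expose no write path at all.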

Comment thread components/spider-storage/src/db/protocol.rs
2 participants