
[rust-scheduler] Performance and Architectural Optimizations for Cue-Scheduler#2320

Draft
ramonfigueiredo wants to merge 13 commits into AcademySoftwareFoundation:master from ramonfigueiredo:rust-scheduler-perf-and-architecture

Conversation

@ramonfigueiredo
Collaborator

@ramonfigueiredo ramonfigueiredo commented May 14, 2026


Implements all seven targeted improvements proposed in #2318 across the Rust scheduler crate. Each item is its own commit so the history is bisectable and individual changes can be reverted independently if needed.

  1. Shard HostCache::hosts_index lock
  • The bucket directory was a single RwLock<BTreeMap<CoreKey, MemoryBTree>> that every check_out / check_in serialized through.
  • Wrap each per-CoreKey bucket in its own Arc<Mutex> so disjoint core sizes proceed in parallel.
  • The outer RwLock now only guards the (rarely changing) bucket directory and is read on every operation; check_in fast-paths and only upgrades to outer write when a brand-new CoreKey appears.
  • Added a concurrent-check-ins regression test.
  2. Replace LayerPermitService with FOR UPDATE SKIP LOCKED
  • The in-memory LayerPermitService Actix actor could only deduplicate within a single scheduler process: multi-replica deployments raced.
  • Replace it with a per-layer row lock acquired via SELECT pk_layer FROM layer WHERE pk_layer = $1 FOR UPDATE SKIP LOCKED held in a transaction (LayerLockGuard) for the duration of layer processing.
  • Multi-replica coordination now lives in Postgres directly.
  • LayerPermitService and its message types are deleted.
  • The matcher's concurrency_semaphore is halved (pool_size / 2 - 1) because each concurrent layer now holds two connections (lock tx + per-proc tx).
  3. Remove redundant outer transaction in dispatcher
  • Handler previously opened an outer transaction purely to scope a session-level host advisory lock around dispatch, holding a pool connection across every per-frame transaction, gRPC call, and compensation step.
  • Switch the host lock from session-level pg_try_advisory_lock to transaction-scoped pg_try_advisory_xact_lock and acquire it inside each per-proc transaction.
  • The lock now auto-releases on COMMIT/ROLLBACK, hold time is bounded to the small DB-only critical section, and no connection is held during gRPC.
  • The outer transaction is gone; host_dao::lock and host_dao::unlock are deleted.
  4. Make reservation TTL a safety net only
  • The 10s reservation TTL silently let reservations expire during slow dispatches (>10s RQD calls, retries): a second layer could then grab the same host.
  • The eager check_in / Invalidate release path already exists; the timer was meant only to catch leaked reservations.
  • Raise the TTL to 5 min via new HostCacheConfig::host_reservation_safety_ttl (humantime-serde) and document it as a leak-recovery net.
  5. Drop Actix actor layer
  • HostCacheService and RqdDispatcherService were Actix actors whose Handler impls did nothing but await an async block: no actor state, no message ordering, no supervision.
  • Refactor both to plain Clone structs with regular async methods:
  • HostCacheService exposes check_out / check_in_payload / cache_ratio directly.
  • Periodic refresh and cleanup loops live in spawn_background_tasks (tokio::spawn).
  • RqdDispatcherService exposes dispatch_layer.
  • OnceCell singletons hold the service value directly (cheap Clone) instead of Addr<...>.
  • Matcher call sites become direct .method().await: no more mailbox round-trips, heap-allocated message envelopes, or "Actor is unresponsive" expects.
  • main.rs drops actix::System; runtime.block_on(async_main()) is used directly.
  • The actix dependency is removed from Cargo.toml.

Marker message structs (CheckOut, CheckIn, CacheRatio, DispatchLayerMessage) are deleted or trimmed to data-only types kept for parameter / response grouping.

  6. Host-cache DB-failure circuit breaker
  • matcher.rs previously panicked when HostCacheService failed to query the DB: a single transient hiccup rolled back every in-flight dispatch.
  • Add a DbCircuitBreaker wrapping fetch_group_data:
  • On success: reset the failure counter and close the breaker.
  • On failure: bump the counter and open the breaker for an exponentially growing window (base 500ms, capped at 30s).
  • Concurrent callers that hit an open breaker short-circuit without retrying the DB.
  • After CONFIG.host_cache.db_circuit_breaker.failure_threshold (10) consecutive failures the host cache logs and process::exit(1)s: the orchestration layer handles the restart, no panic backtrace.
  • matcher.rs now warns and skips the layer on FailedToQueryHostCache; other clusters keep dispatching.
  • Added 3 unit tests for the breaker state machine.
  7. Wire dead metrics to Prometheus
  • HOSTS_ATTEMPTED and WASTED_ATTEMPTS were AtomicUsize counters in pipeline/matcher.rs that were incremented on every host-candidate attempt and wasted job cycle but never exposed.
  • Add scheduler_hosts_attempted_total and scheduler_wasted_attempts_total Counters in metrics/mod.rs and bump them alongside the atomics.

Review feedback addressed

  • cluster.rs: dedupe filtered clusters via HashSet before seeding the priority heap (filter_clusters can fold distinct inputs onto the same identity); make the dispatch publish path preemptible (sender.reserve() raced against the stop notify) and only bump CLUSTER_ROUNDS / cluster_polls_total once a Permit is held.
  • metrics: cache per-cluster Counter / Gauge handles in two scc::HashMap<(Uuid, Uuid), _> statics so Uuid::to_string() allocations happen at most once per (show, facility) pair, not on every dispatch loop pop.
  • metrics: new counter: scheduler_layers_skipped_by_lock_total exposes how often a layer is skipped because another scheduler holds its row lock. Expected and healthy in multi-replica deployments, distinct from real waste.
  • metrics: tightened semantics: scheduler_wasted_attempts_total no longer counts jobs whose layers were all peer-locked. It now strictly means "job processed zero layers AND none were skipped by a peer lock", i.e. a real no-host-candidate miss. Existing alerts may need re-baselining.
  • entrypoint.rs: fold fetch-error cycles into the same cycles_without_jobs atomic the success path uses so the empty_job_cycles_before_quiting safety net actually trips when the DB is persistently unhealthy (previously the Err arm returned (0, false) and never incremented the counter, defeating the comment that promised the safety net).
  • tests/stress_tests.rs: run on Tokio's multi-thread runtime (worker_threads = 8) so the new contention paths (sharded HostCache lock, pg_try_advisory_xact_lock races, SKIP-LOCKED layer locking) are actually exercised — #[tokio::test] defaults to current-thread, which made the contention tests effective no-ops.
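The per-cluster handle caching above can be sketched with std types only. The real code uses `scc::HashMap` statics holding prometheus Counter/Gauge handles keyed by `(Uuid, Uuid)`; the `Key` type, label format, and `handle_for` name here are illustrative stand-ins.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, OnceLock};

// Stand-in for the (show Uuid, facility Uuid) pair.
type Key = (u64, u64);

struct Handle {
    label: String,    // formatted once per key, then reused
    count: AtomicU64, // stand-in for a prometheus Counter handle
}

static HANDLES: OnceLock<Mutex<HashMap<Key, Arc<Handle>>>> = OnceLock::new();

/// Returns the cached handle for a key, formatting labels at most once.
fn handle_for(key: Key) -> Arc<Handle> {
    let map = HANDLES.get_or_init(|| Mutex::new(HashMap::new()));
    map.lock()
        .unwrap()
        .entry(key)
        .or_insert_with(|| {
            Arc::new(Handle {
                // The expensive part (Uuid::to_string in the real code)
                // runs only on first access for this (show, facility) pair.
                label: format!("show={}, facility={}", key.0, key.1),
                count: AtomicU64::new(0),
            })
        })
        .clone()
}

fn main() {
    let h1 = handle_for((1, 2));
    let h2 = handle_for((1, 2));
    assert!(Arc::ptr_eq(&h1, &h2)); // same cached handle, no reformatting
    h1.count.fetch_add(1, Ordering::Relaxed);
    assert_eq!(h2.count.load(Ordering::Relaxed), 1);
    println!("{}", h1.label); // prints "show=1, facility=2"
}
```

The hot dispatch loop then only pays a map lookup per pop instead of two allocations.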

…scheduling

Fixes AcademySoftwareFoundation#2315.

The ClusterFeed previously iterated clusters with a fixed-step round-robin and applied lap-based 10ms/100ms/5s sleeps between rounds. Every cluster received equal air-time regardless of work, and a sleeping cluster added delay to neighboring clusters.

Replace the Vec<Cluster> + AtomicUsize index with a BinaryHeap<Scheduled> ordered by (next_eligible_at asc, last_dispatched_jobs desc). The feed now pops the highest-priority cluster, sleeps until eligible (preemptible via Notify), emits, and waits for a Done message that re-inserts the cluster with updated stats. Lap-based sleep tiers and the parallel sleep_map are removed; eligibility now lives on each heap entry.

Cluster feed changes (cluster.rs):
- Added Scheduled { cluster, next_eligible_at, last_dispatched_jobs } with Ord prioritizing earliest eligibility, then busiest cluster on ties (productivity bias).
- ClusterFeed now holds Arc<Mutex<BinaryHeap<Scheduled>>> + Notify.
- FeedMessage::Sleep / Stop() collapsed into FeedMessage::Done { cluster, processed_jobs, sleep } + unit-variant Stop.
- stream() dispatch loop now pops, sleeps with preemption, and emits.
- Control loop re-inserts on Done and notifies the dispatch loop.
- Added ClusterFeed::load_from_clusters shim for tests.

Consumer (pipeline/entrypoint.rs):
- Always sends FeedMessage::Done with processed_jobs and optional back-off.
- Preserves should_stop / empty_job_cycles logic.

Config (config/mod.rs):
- Added cluster_empty_back_off: Duration (default 3s), replacing hardcoded entrypoint back-off.
- Added cluster_productivity_bias: bool (default true), toggling busy-cluster tie-breaking.

Observability (metrics/mod.rs):
- Added scheduler_cluster_polls_total {show_id, facility_id} CounterVec.
- Added scheduler_cluster_rounds_total Counter (mirrors CLUSTER_ROUNDS).
- Added scheduler_cluster_last_dispatched_jobs {show_id, facility_id} Gauge, updated from the Done handler to expose the productivity-bias signal.

Tests:
- Added 6 unit tests covering Scheduled ordering, sleep-back-off timing, Done round-trip, and a 500ms mixed-workload proportionality test asserting busy clusters receive >2x the polls of idle ones.
- Updated tests/util.rs for the two new QueueConfig fields.
- Scheduled::cmp now always compares last_dispatched_jobs as a final tiebreaker so Ord agrees with PartialEq regardless of the cluster_productivity_bias toggle. Required by BinaryHeap.
- Control loop now sets stop_flag and notifies the dispatcher when the control channel closes (e.g. consumer panic), preventing the dispatch loop from spinning on the empty-queue wake cycle.
- Added ord_and_eq_stay_consistent_for_differing_jobs regression test.
Addresses issue AcademySoftwareFoundation#2318 (item 7).
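A minimal std-only sketch of the heap ordering described above (the struct name mirrors the PR's Scheduled, but the fields and helper are illustrative): `BinaryHeap` is a max-heap, so the time comparison is reversed to pop the earliest-eligible cluster first, and `last_dispatched_jobs` always participates in the comparison so Ord stays consistent with Eq, as the regression test requires.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::time::{Duration, Instant};

#[derive(Debug)]
struct Scheduled {
    cluster: &'static str,
    next_eligible_at: Instant,
    last_dispatched_jobs: u64,
}

impl Ord for Scheduled {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap pops the maximum, so reverse the time comparison:
        // the earliest next_eligible_at ranks highest. Ties prefer the
        // busier cluster (productivity bias); the jobs field is always
        // compared so Ord agrees with the Eq impl below.
        other
            .next_eligible_at
            .cmp(&self.next_eligible_at)
            .then_with(|| self.last_dispatched_jobs.cmp(&other.last_dispatched_jobs))
    }
}

impl PartialOrd for Scheduled {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Scheduled {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for Scheduled {}

fn pop_order(mut heap: BinaryHeap<Scheduled>) -> Vec<&'static str> {
    std::iter::from_fn(move || heap.pop()).map(|s| s.cluster).collect()
}

fn main() {
    let now = Instant::now();
    let heap = BinaryHeap::from(vec![
        Scheduled { cluster: "idle", next_eligible_at: now + Duration::from_millis(50), last_dispatched_jobs: 1 },
        Scheduled { cluster: "quiet", next_eligible_at: now, last_dispatched_jobs: 2 },
        Scheduled { cluster: "busy", next_eligible_at: now, last_dispatched_jobs: 9 },
    ]);
    // Eligible-now clusters come first, busiest breaking the tie.
    assert_eq!(pop_order(heap), ["busy", "quiet", "idle"]);
    println!("heap ordering ok");
}
```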

These atomic counters in pipeline/matcher.rs were incremented on every
host candidate selection and every wasted job cycle, but never exposed
through the /metrics endpoint, so they had no operational value.

Add scheduler_hosts_attempted_total and scheduler_wasted_attempts_total
Counters in metrics/mod.rs and bump them alongside the existing atomics.
HOSTS_ATTEMPTED / WASTED_ATTEMPTS are kept for source-level compatibility
with smoke tests that read them directly; CLUSTER_ROUNDS was already
wired to Prometheus in AcademySoftwareFoundation#2315.
Addresses issue AcademySoftwareFoundation#2318 (item 4).

The 10s reservation TTL in HostCacheService is fragile: dispatches that
take >10s (slow RQD, retries) silently let the reservation expire,
opening the door for a second layer to grab the same host. The eager
release path via check_in / Invalidate already exists; the timer was
only meant to catch leaked reservations.

Raise the TTL to 5 min via new HostCacheConfig::host_reservation_safety_ttl
(humantime-serde configurable) and document its intent as a leak-recovery
net rather than primary lifecycle. Explicit release on check_in /
Invalidate remains unchanged.
Addresses issue AcademySoftwareFoundation#2318 (item 6).
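As a hedged illustration only: assuming the scheduler reads a TOML config and the field lives under a `[host_cache]` section (the section name and file layout are assumptions, not confirmed by the PR), a humantime-style value would look like:

```toml
[host_cache]
# Safety net for leaked reservations only; eager release still happens
# via check_in / Invalidate. Parsed by humantime-serde ("5m", "300s", ...).
host_reservation_safety_ttl = "5m"
```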

matcher.rs previously called panic! when HostCacheService failed to
query the database. A single transient hiccup (statement timeout,
network blip, replica failover) brought the whole scheduler down and
rolled back every in-flight dispatch.

Add a DbCircuitBreaker in host_cache/actor.rs that wraps the DB query
in fetch_group_data:
- On success: reset the failure counter and close the breaker.
- On failure: bump the counter and open the breaker for an
  exponentially growing window (base 500ms, capped at 30s).
- Concurrent callers that hit an open breaker short-circuit without
  retrying the DB.
- After CONFIG.host_cache.db_circuit_breaker.failure_threshold (10)
  consecutive failures the host cache logs and process::exit(1)s — the
  orchestration layer handles the restart, no panic backtrace.

matcher.rs now logs the FailedToQueryHostCache arm at warn and lets
the layer skip; other clusters keep dispatching. Added 3 unit tests
for the breaker state machine.
Addresses issue AcademySoftwareFoundation#2318 (item 3).
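A std-only sketch of the breaker's state machine. Field names, the `record_*` signatures, and the escalation return value are assumptions; the real breaker lives in host_cache/actor.rs and reads its threshold from config.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

struct BreakerState {
    consecutive_failures: u32,
    open_until: Option<Instant>,
}

/// Simplified breaker: consecutive failures open it for an exponentially
/// growing window (base 500 ms, capped at 30 s); any success closes it.
struct DbCircuitBreaker {
    state: Mutex<BreakerState>,
    base: Duration,
    cap: Duration,
}

impl DbCircuitBreaker {
    fn new() -> Self {
        Self {
            state: Mutex::new(BreakerState { consecutive_failures: 0, open_until: None }),
            base: Duration::from_millis(500),
            cap: Duration::from_secs(30),
        }
    }

    /// True while callers should short-circuit instead of querying the DB.
    fn is_open(&self, now: Instant) -> bool {
        matches!(self.state.lock().unwrap().open_until, Some(t) if now < t)
    }

    fn record_success(&self) {
        let mut s = self.state.lock().unwrap();
        s.consecutive_failures = 0;
        s.open_until = None;
    }

    /// Returns the consecutive-failure count so the caller can escalate
    /// (the PR exits the process after a configured threshold).
    fn record_failure(&self, now: Instant) -> u32 {
        let mut s = self.state.lock().unwrap();
        s.consecutive_failures += 1;
        // 500 ms, 1 s, 2 s, ... doubling per failure, capped at 30 s.
        let exp = (s.consecutive_failures - 1).min(16);
        let window = self.base.saturating_mul(1u32 << exp).min(self.cap);
        s.open_until = Some(now + window);
        s.consecutive_failures
    }
}

fn main() {
    let breaker = DbCircuitBreaker::new();
    let now = Instant::now();
    assert!(!breaker.is_open(now));
    assert_eq!(breaker.record_failure(now), 1); // opens for 500 ms
    assert!(breaker.is_open(now));
    assert!(!breaker.is_open(now + Duration::from_millis(501)));
    assert_eq!(breaker.record_failure(now), 2); // window doubles to 1 s
    assert!(breaker.is_open(now + Duration::from_millis(900)));
    breaker.record_success(); // closes and resets the counter
    assert!(!breaker.is_open(now));
    println!("circuit breaker ok");
}
```

Passing `now` in explicitly keeps the state machine deterministic to test; the production code would use `Instant::now()` at the call site.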

Handler<DispatchLayerMessage> previously opened an outer transaction
purely to scope a session-level host advisory lock around dispatch.
The outer tx did no DB writes of its own and held a pool connection
across every per-frame transaction, gRPC call, and compensation step
of the entire layer dispatch.

- Switch the host lock from session-level pg_try_advisory_lock to
  transaction-scoped pg_try_advisory_xact_lock and acquire it inside
  each per-proc transaction in dispatch_virtual_proc. The lock now
  auto-releases on COMMIT/ROLLBACK, hold time is bounded to the small
  DB-only critical section, and no connection is held during gRPC.
- Remove the outer transaction from Handler<DispatchLayerMessage>.
- Drop the host_dao::lock / host_dao::unlock methods (no callers).
- dispatch() no longer takes a Transaction parameter.

Behavioral change: dispatch locking is now per-frame rather than
per-layer. Two schedulers may interleave frames on the same host;
each frame still has exclusive lock-protected DB updates (and the
frame int_version optimistic check still gates frame ownership), so
correctness is preserved while parallelism improves.
Addresses issue AcademySoftwareFoundation#2318 (item 1).

The hosts_index was a single RwLock<BTreeMap<CoreKey, MemoryBTree>>
that every check_out / check_in acquired. check_out took the outer
read lock to scan and then re-acquired the outer write lock to remove
on a hit; check_in always took the outer write lock. All operations
across all core buckets serialized through this one lock.

Wrap each per-CoreKey bucket in its own Arc<Mutex<MemoryBTree>>:
- The outer RwLock now guards only the (rarely changing) bucket
  directory and is read on every operation.
- check_out collects bucket Arcs under a brief outer read, then scans
  each bucket independently with its own Mutex.
- check_in fast-paths into the outer read lock and an existing bucket
  Mutex; only the rare new-CoreKey case upgrades to outer write.

Disjoint core sizes now proceed in parallel, which is the common case
under mixed workloads. Same-bucket contention still exists but is
bounded to short critical sections instead of the entire dispatch.

Added `concurrent_check_ins_on_disjoint_buckets` test exercising the
sharded paths. All 19 host_cache::cache unit tests pass.
Addresses issue AcademySoftwareFoundation#2318 (item 2).
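The sharded directory shape can be sketched with std types; a `BTreeSet<String>` of host names stands in for the real MemoryBTree and `CoreKey` is simplified to a `u32`.

```rust
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex, RwLock};

type CoreKey = u32;
// A BTreeSet of host names stands in for the real MemoryBTree.
type Bucket = Arc<Mutex<BTreeSet<String>>>;

/// Outer RwLock guards only the rarely-changing bucket directory;
/// each CoreKey bucket has its own Mutex, so disjoint core sizes
/// proceed in parallel.
struct ShardedIndex {
    hosts_index: RwLock<BTreeMap<CoreKey, Bucket>>,
}

impl ShardedIndex {
    fn new() -> Self {
        Self { hosts_index: RwLock::new(BTreeMap::new()) }
    }

    /// Fast path: outer read lock plus the bucket's own Mutex. Only a
    /// brand-new CoreKey upgrades to the outer write lock, double-checked
    /// because another thread may have created the bucket meanwhile.
    fn check_in(&self, key: CoreKey, host: String) {
        if let Some(bucket) = self.hosts_index.read().unwrap().get(&key).cloned() {
            bucket.lock().unwrap().insert(host);
            return;
        }
        let mut dir = self.hosts_index.write().unwrap();
        let bucket = dir
            .entry(key)
            .or_insert_with(|| Arc::new(Mutex::new(BTreeSet::new())))
            .clone();
        drop(dir); // release the directory before touching the bucket
        bucket.lock().unwrap().insert(host);
    }

    /// Snapshot the bucket Arc under a brief outer read, then scan and
    /// remove under the bucket's own Mutex.
    fn check_out(&self, key: CoreKey) -> Option<String> {
        let bucket = self.hosts_index.read().unwrap().get(&key).cloned()?;
        let mut b = bucket.lock().unwrap();
        let host = b.iter().next().cloned()?;
        b.remove(&host);
        Some(host)
    }
}

fn main() {
    let idx = Arc::new(ShardedIndex::new());
    // Disjoint CoreKeys: each thread works in its own bucket in parallel.
    let threads: Vec<_> = (0u32..4)
        .map(|key| {
            let idx = Arc::clone(&idx);
            std::thread::spawn(move || {
                for i in 0..100 {
                    idx.check_in(key, format!("host-{key}-{i}"));
                }
            })
        })
        .collect();
    for t in threads {
        t.join().unwrap();
    }
    assert!(idx.check_out(2).is_some());
    assert!(idx.check_out(99).is_none());
    println!("sharded check-in/check-out ok");
}
```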

The in-memory LayerPermitService was an Actix actor that issued
time-limited permits keyed on layer ID to dedupe concurrent layer
dispatches. The permit could only deduplicate within a single
scheduler process, so multi-replica deployments would happily race
each other on the same layer.

Replace it with a per-layer row lock acquired via
`SELECT pk_layer FROM layer WHERE pk_layer = $1 FOR UPDATE SKIP LOCKED`
held in a transaction for the duration of layer processing:
- Multi-replica coordination is now handled by Postgres directly.
- LayerLockGuard's transaction is committed (or implicitly rolled back)
  when processing finishes, releasing the row lock.
- LayerPermitService and its Actix message types are deleted entirely.

Each concurrent layer dispatch now holds two DB connections (the lock
transaction plus the per-proc transaction), so the matcher's
concurrency_semaphore is halved from (pool_size - 1) to
(pool_size / 2 - 1) — at minimum 1 — to keep us inside the pool budget.
Addresses issue AcademySoftwareFoundation#2318 (item 5).
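An in-process analog of the acquire-or-skip behavior, using `Mutex::try_lock` in place of the Postgres row lock. The real LayerLockGuard holds a transaction; this only illustrates the shape: a peer that loses the race skips instead of blocking, and dropping the guard (the analog of committing the lock transaction) releases the lock.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

/// One Mutex per layer stands in for the per-layer Postgres row lock;
/// a MutexGuard stands in for the LayerLockGuard transaction.
struct LayerLocks {
    locks: HashMap<&'static str, Mutex<()>>,
}

impl LayerLocks {
    /// Acquire-or-skip: returns a guard on success, None when a peer
    /// already holds the lock (the SKIP LOCKED behavior).
    fn try_lock_layer(&self, layer: &str) -> Option<MutexGuard<'_, ()>> {
        self.locks.get(layer)?.try_lock().ok()
    }
}

fn main() {
    let mut locks = HashMap::new();
    locks.insert("layer-a", Mutex::new(()));
    let svc = LayerLocks { locks };

    let guard = svc.try_lock_layer("layer-a").expect("first replica acquires");
    // A peer replica trying the same layer skips instead of blocking.
    assert!(svc.try_lock_layer("layer-a").is_none());
    drop(guard); // analog of committing the lock transaction
    assert!(svc.try_lock_layer("layer-a").is_some());
    println!("acquire-or-skip ok");
}
```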

HostCacheService and RqdDispatcherService were Actix actors whose
Handler impls did nothing but await an async block — no actor-private
state, no message ordering, no supervision. Every call paid for an
extra mailbox round-trip, a heap-allocated message envelope, and a
ResponseActFuture boxing per send.

Refactor both to plain `#[derive(Clone)]` structs (already Arc-wrapped
internally) with regular async methods:
- HostCacheService now exposes `check_out`, `check_in_payload`, and
  `cache_ratio` as `pub` async / sync methods. The periodic refresh
  and cleanup loops live in `spawn_background_tasks`, started by the
  singleton initializer.
- RqdDispatcherService exposes `dispatch_layer` as a pub async method.
- The OnceCell singletons now hold the service value directly (cheap
  Clone) instead of an `Addr<...>`.

Matcher and main.rs:
- MatchingService fields lose their `Addr<...>` wrapper.
- Matcher call sites become direct `.method(...).await` calls; the
  `Actor is unresponsive` expects are gone since no mailbox exists.
- main.rs no longer constructs an actix::System; it just block_on's the
  Tokio runtime directly. The redundant `System::current().stop()` is
  removed.

Cargo.toml:
- Remove the `actix = "0.13"` dependency.

Messages modules retain the data types that are still useful as
parameter / response grouping (`CheckedOutHost`, `CheckInPayload`,
`CacheRatioResponse`, `DispatchLayerMessage`, `DispatchResult`); the
now-redundant `CheckOut`, `CheckIn`, and `CacheRatio` marker structs
are deleted along with their actix `Message` / `MessageResponse`
derives.

stress_tests.rs swaps `#[actix::test]` for `#[tokio::test]` so the
(pre-existing) feature-gated smoke tests at least target the current
toolchain.
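The actor-to-service shape can be sketched with `std::sync::OnceLock` standing in for the OnceCell singleton. Names and fields here are illustrative, and the method is sync to keep the sketch free of an executor dependency; the real service methods are async.

```rust
use std::sync::{Arc, OnceLock};

// Cheap-to-Clone service: all state sits behind one Arc.
#[derive(Clone)]
struct HostCacheService {
    inner: Arc<Inner>,
}

struct Inner {
    refresh_interval_secs: u64,
}

impl HostCacheService {
    // The real methods (check_out, check_in_payload, ...) are async;
    // a sync accessor keeps this sketch std-only.
    fn refresh_interval_secs(&self) -> u64 {
        self.inner.refresh_interval_secs
    }
}

// The singleton holds the service value directly, not an actor Addr.
static HOST_CACHE: OnceLock<HostCacheService> = OnceLock::new();

fn host_cache() -> HostCacheService {
    HOST_CACHE
        .get_or_init(|| HostCacheService {
            inner: Arc::new(Inner { refresh_interval_secs: 30 }),
        })
        .clone()
}

fn main() {
    let a = host_cache();
    let b = host_cache();
    // Clones share the same Arc-backed state; calls are direct method
    // calls with no mailbox round-trip or message envelope.
    assert!(Arc::ptr_eq(&a.inner, &b.inner));
    assert_eq!(b.refresh_interval_secs(), 30);
    println!("singleton service ok");
}
```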
@ramonfigueiredo ramonfigueiredo self-assigned this May 14, 2026
@coderabbitai
Contributor

coderabbitai Bot commented May 14, 2026

Important

Review skipped

Draft detected.


⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 22550ab4-57c9-45ad-b860-cb5ef7d0e374

📝 Walkthrough

Replaces Actix actors with direct service calls across the scheduler crate, adds a priority-queue ClusterFeed with productivity/backoff, shards HostCache into per-core buckets, adds a DB circuit breaker, and converts advisory/row locks to transaction-scoped locking with RAII guards. Configuration and metrics were extended for new behavior.

Changes

Actix removal and service refactor

| Layer / File(s) | Summary |
| --- | --- |
| Dependency and runtime bootstrap<br>`rust/crates/scheduler/Cargo.toml`, `rust/crates/scheduler/src/main.rs`, `rust/crates/scheduler/tests/stress_tests.rs` | Removes actix dependency; runs async_main() directly on Tokio runtime; test harness switched from #[actix::test] to #[tokio::test(...)]. |
| HostCache actor → service<br>`rust/crates/scheduler/src/host_cache/actor.rs`, `rust/crates/scheduler/src/host_cache/messages.rs`, `rust/crates/scheduler/src/host_cache/mod.rs` | Removes Actix message/handler derives and actor impls; HostCacheService becomes a direct service with spawn_background_tasks(), check_in_payload() and pub async fn check_out(...); singleton accessor returns HostCacheService (cloned) instead of Addr. |
| Dispatcher actor → service<br>`rust/crates/scheduler/src/pipeline/dispatcher/actor.rs`, `rust/crates/scheduler/src/pipeline/dispatcher/messages.rs`, `rust/crates/scheduler/src/pipeline/dispatcher/mod.rs` | Removes actor/Handler impls and message derives; adds pub async fn dispatch_layer(...) entrypoint; singleton returns RqdDispatcherService directly instead of actor Addr. |
| Matcher wiring<br>`rust/crates/scheduler/src/pipeline/matcher.rs` | MatchingService fields changed to concrete HostCacheService and RqdDispatcherService; replaces actor send/await calls with direct async method calls; removes layer_permit usage and related initialization. |

Cluster scheduling and metrics

| Layer / File(s) | Summary |
| --- | --- |
| Priority-queue scheduler core<br>`rust/crates/scheduler/src/cluster.rs`, `rust/crates/scheduler/src/config/mod.rs` | Adds private Scheduled with custom Ord/Eq; ClusterFeed reworked to queue: `Arc<Mutex<BinaryHeap<Scheduled>>>`, stop_flag, and Notify; FeedMessage becomes Stop |
| Dispatch & control loops<br>`rust/crates/scheduler/src/cluster.rs` | ClusterFeed::stream spawns dispatch loop (pop, sleep until eligibility, Notify preemption, send Cluster) and a control loop (process Done, compute next_eligible_at, update metrics, reinsert, handle Stop/channel close). |
| Job backoff and reporting<br>`rust/crates/scheduler/src/pipeline/entrypoint.rs`, `rust/crates/scheduler/tests/util.rs` | Job fetch now returns (processed_count, should_stop); empty cycles set sleep = Some(CONFIG.queue.cluster_empty_back_off) and use cycles_without_jobs atomic; FeedMessage::Done always reported; tests set 3s backoff and enable productivity bias. |
| Metrics<br>`rust/crates/scheduler/src/metrics/mod.rs` | Registers CLUSTER_POLLS_TOTAL, CLUSTER_ROUNDS_TOTAL, CLUSTER_LAST_DISPATCHED_JOBS and counters for host attempts/wasted attempts/layers-skipped; adds helper update functions and cached label handles. |
| Config knobs<br>`rust/crates/scheduler/src/config/mod.rs` | Adds QueueConfig.cluster_empty_back_off: Duration and cluster_productivity_bias: bool with defaults. |

Host cache, locking and concurrency

| Layer / File(s) | Summary |
| --- | --- |
| Sharded host-cache buckets<br>`rust/crates/scheduler/src/host_cache/cache.rs` | hosts_index becomes `RwLock<BTreeMap<CoreKey, MemoryBucket>>` where `MemoryBucket = Arc<Mutex<MemoryBTree>>`; check_in uses double-checked locking to create buckets; remove_host snapshots bucket Arcs then locks per-bucket for scan/remove. |
| DB circuit breaker<br>`rust/crates/scheduler/src/host_cache/actor.rs`, `rust/crates/scheduler/src/config/mod.rs` | Adds DbCircuitBreaker with failure counting, capped exponential backoff, and escalation; fetch_group_data records successes/failures and short-circuits when breaker open; HostCacheConfig includes db_circuit_breaker defaults. |
| Background tasks & reservation TTL<br>`rust/crates/scheduler/src/host_cache/actor.rs`, `rust/crates/scheduler/src/config/mod.rs` | Adds HostCacheService::spawn_background_tasks() for periodic refresh/prune; host-reservation expiration uses configurable host_reservation_safety_ttl. |
| Transaction-scoped advisory & row locking<br>`rust/crates/scheduler/src/dao/host_dao.rs`, `rust/crates/scheduler/src/dao/layer_dao.rs`, `rust/crates/scheduler/src/pipeline/dispatcher/actor.rs` | Replaces session-scoped advisory lock API with try_xact_lock using pg_try_advisory_xact_lock; adds LayerLockGuard RAII wrapper from `SELECT ... FOR UPDATE SKIP LOCKED` via LayerDao::try_lock_layer; dispatcher acquires transaction-scoped host lock in dispatch_virtual_proc and returns FailedToStartOnDb on lock failure. |
| Permit removal → DB-driven concurrency<br>`rust/crates/scheduler/src/pipeline/layer_permit.rs` (removed), `rust/crates/scheduler/src/pipeline/matcher.rs` | Removes LayerPermitService actor; matcher now uses try_lock_layer to gate per-layer processing and records skipped layers/wasted-attempt accounting accordingly; host-cache DB failures are downgraded from panic! to warn+skip. |

Sequence Diagram

sequenceDiagram
    participant Runtime as Tokio<br/>Runtime
    participant Entrypoint as Scheduler<br/>Entrypoint
    participant ClusterFeed as ClusterFeed<br/>(Priority Queue)
    participant Matcher as MatchingService
    participant HostCache as HostCacheService
    participant Dispatcher as RqdDispatcher<br/>Service
    participant DAO as Database<br/>(Locks & Queries)

    Runtime->>Entrypoint: run()
    Entrypoint->>ClusterFeed: stream(sender)
    Note over ClusterFeed: Spawns dispatch & control loops
    
    loop Cluster Selection
        ClusterFeed->>ClusterFeed: Pop highest-priority<br/>from BinaryHeap
        ClusterFeed->>ClusterFeed: Sleep until next_eligible_at<br/>or Notify preemption
        ClusterFeed->>Entrypoint: Send selected Cluster
    end

    loop Job Processing per Cluster
        Entrypoint->>DAO: Query pending jobs
        alt Jobs found
            Entrypoint->>Matcher: process_layer(layer)
            Matcher->>DAO: try_lock_layer(layer_id)<br/>SELECT...FOR UPDATE<br/>SKIP LOCKED
            alt Lock acquired
                Matcher->>HostCache: check_out(cluster, filter)
                HostCache->>DAO: Query hosts (guarded<br/>by circuit breaker)
                alt Circuit breaker closed
                    DAO-->>HostCache: Host candidates
                    HostCache-->>Matcher: CheckedOutHost
                else Circuit breaker open
                    HostCache-->>Matcher: Error (skip layer)
                end
                
                alt Host checkout succeeded
                    Matcher->>Dispatcher: dispatch_layer(msg)
                    Dispatcher->>DAO: try_xact_lock(host_id)<br/>pg_try_advisory_xact_lock
                    alt Lock acquired
                        Dispatcher->>DAO: Dispatch operations
                        DAO-->>Dispatcher: Result
                        Dispatcher-->>Matcher: DispatchResult
                    else Lock failed
                        Dispatcher-->>Matcher: FailedToStartOnDb
                    end
                    
                    alt Dispatch success
                        Matcher->>HostCache: check_in_payload(Host)
                    else Dispatch error
                        Matcher->>HostCache: check_in_payload(Invalidate)
                    end
                end
            else Layer locked elsewhere
                Matcher->>Matcher: Skip layer<br/>debug log
            end
            Matcher->>DAO: release() lock
            Entrypoint->>ClusterFeed: Done {<br/>processed_jobs: count,<br/>sleep: Option<Duration><br/>}
        else No jobs or error
            Entrypoint->>ClusterFeed: Done {<br/>processed_jobs: 0,<br/>sleep: cluster_empty_back_off<br/>}
        end
    end

    loop Control Loop
        ClusterFeed->>ClusterFeed: Receive FeedMessage::Done
        ClusterFeed->>ClusterFeed: Compute next_eligible_at<br/>(immediate or backoff)
        ClusterFeed->>ClusterFeed: Update productivity metrics
        ClusterFeed->>ClusterFeed: Re-insert into heap
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Suggested reviewers

  • lithorus
  • DiegoTavares

Poem

🐰 I hopped through code with nimble feet,
Actors gone — the services meet.
Queues now sort by need and pace,
Buckets keep their guarded place.
The scheduler hums, all tidy and neat.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title directly and clearly describes the main architectural and performance changes across the entire changeset, including Actix removal, caching optimizations, and circuit breaker additions. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |


@ramonfigueiredo ramonfigueiredo marked this pull request as ready for review May 14, 2026 06:51
Contributor

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 6

🧹 Nitpick comments (3)
rust/crates/scheduler/src/pipeline/dispatcher/actor.rs (1)

264-293: ⚡ Quick win

HostLock is expected contention — match it alongside the other "info" cases.

DispatchError::HostLock(_) (now emitted from the new try_xact_lock path at lines 438–443) falls into the catch-all _ arm and gets logged at warn!. In a multi-scheduler / multi-frame-per-layer setup, losing the host advisory lock to a peer is the expected fast-fail path — the same rationale the existing comment uses for FailedToUpdateResources / FailedToCreateProc. Logging it at warn! will flood logs in healthy steady state.

♻️ Suggested fix
                             match &err {
                                 DispatchError::FailedToUpdateResources(_)
-                                | DispatchError::FailedToCreateProc { .. } => {
+                                | DispatchError::FailedToCreateProc { .. }
+                                | DispatchError::HostLock(_) => {
                                     // Resource contention during DB updates is expected in
                                     // multi-scheduler environments.
                                     info!(
                                         "({dispatch_id}) Failed to start frame {} on Db. {}",
                                         frame_str, err
                                     );
                                 }
rust/crates/scheduler/src/pipeline/matcher.rs (1)

269-296: 💤 Low value

Stale comment about "actor messaging" 'static requirement.

Since the dispatcher is now a direct async call (no Actix send), the closure no longer needs a 'static lifetime for actor messaging. The cloning is still useful (the closure outlives layer once it's moved into DispatchLayerMessage later), but the comment is misleading.

📝 Suggested wording
             // Clone only the minimal data needed for the validation closure
-            // These are needed because the closure must have 'static lifetime for actor messaging
+            // These are needed because `layer` itself is moved into the dispatcher below.
             let layer_id = layer.id;
rust/crates/scheduler/src/host_cache/actor.rs (1)

539-557: 🏗️ Heavy lift

Supervisor requirement is already documented; graceful shutdown remains a design trade-off.

The escalation path permanently abandons in-flight transactions, the semaphore permit, and concurrent work without signaling downstream components. This is intentional by design (see code comment: "orchestration layer handles the restart"), and the deployment documentation (docs/_docs/getting-started/deploying-scheduler.md lines 535–539) explicitly requires process supervisors (systemd, supervisord, Kubernetes) for scheduler deployment.

However, two points remain unaddressed:

  1. No configuration toggle: The code hard-exits on breaker escalation with no option for non-supervised environments (e.g., local development, CI). A config flag (exit_on_circuit_open: bool, defaulting to true) would allow graceful error propagation in dev/test settings.
  2. Graceful shutdown not implemented: Exiting without signaling the cluster feed or matcher leaves in-flight work orphaned. For example, a frame already launched on RQD waits for the dispatcher's compensation path, which never runs. The host report reconciliation will eventually clean up, but this is indirect recovery.

The supervisor assumption is sound and documented, so deployment risk is mitigated. The graceful shutdown gap is a design trade-off: acceptable if you accept the eventual-consistency recovery model, but worth documenting in the operator runbook to set expectations.
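The suggested toggle from point 1 could be sketched as below. This is a hypothetical, std-only illustration: `Config`, `EscalationAction`, and `on_breaker_escalation` are invented names, not the crate's API, and the real actor would call `std::process::exit(1)` in the supervised branch.

```rust
// Hypothetical sketch of the suggested exit_on_circuit_open toggle.
// All names here are illustrative stand-ins for the HostCache actor's types.

#[derive(Debug, PartialEq)]
enum EscalationAction {
    ExitProcess,    // supervised deployments: let systemd/Kubernetes restart us
    PropagateError, // dev/CI: return the error so callers can clean up
}

struct Config {
    exit_on_circuit_open: bool, // suggested default: true
}

fn on_breaker_escalation(cfg: &Config) -> EscalationAction {
    if cfg.exit_on_circuit_open {
        // In the real actor this branch is where std::process::exit(1) lives.
        EscalationAction::ExitProcess
    } else {
        // Graceful path: log the escalation and propagate the error so
        // semaphore permits are released and downstream components are told.
        EscalationAction::PropagateError
    }
}
```

With the flag defaulting to true, supervised deployments keep the current hard-exit behavior while local development can opt into error propagation.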

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rust/crates/scheduler/src/host_cache/actor.rs` around lines 539-557, add a
configurable toggle to avoid hard-exit on circuit-breaker escalation: introduce
an exit_on_circuit_open: bool (default true) on the HostCache/actor
configuration/struct and, in the Err branch where
self.db_circuit.record_failure() returns true, branch on that flag instead of
unconditionally calling std::process::exit(1); when exit_on_circuit_open is
false, log the escalation and return a propagated error (e.g., return
Err(err).into_diagnostic()) so callers can perform graceful shutdown/cleanup
(preserving semaphore permits and signaling downstream components) rather than
forcing a process exit. Ensure you update the code paths that reference the
escalation behavior (the block with self.db_circuit.record_failure(), the error!
message, and the std::process::exit call) to consult the new flag.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rust/crates/scheduler/src/cluster.rs`:
- Around lines 486-489: The shutdown check currently only happens before and
after popping a ready cluster, so if stop_flag_dispatch is set after popping the
cluster the task can still await sender.send(...) and will have already
incremented CLUSTER_ROUNDS and scheduler_cluster_polls_total; change the flow in
the dispatch loop (the block that pops ready clusters and then calls
sender.send(...)) to re-check stop_flag_dispatch immediately before incrementing
metrics and before awaiting sender.send, and if set skip incrementing
CLUSTER_ROUNDS/scheduler_cluster_polls_total and skip the send (break
out/return); ensure the same pre-send stop_flag_dispatch guard is added in the
code paths around sender.send referenced in the cluster dispatch logic
(including the section around the existing lines 525-538) so shutdown can
preempt the publish path.
- Around lines 236-244: The heap seeding pushes potentially-duplicate logical
clusters because filter_clusters(...) can normalize different inputs into the
same Cluster; before pushing into the BinaryHeap, deduplicate the filtered
`clusters` collection (the variable used to build Scheduled entries) so each
logical Cluster is only enqueued once. Implement deduplication by a stable key
(e.g., cluster.id or a composite key of identifying fields after ignore-tag
stripping) and apply the same fix in both places where you construct the
heap/Scheduled entries (the blocks creating `heap: BinaryHeap` and pushing
`Scheduled { cluster, next_eligible_at, ... }`, also present around the 285-294
region). Ensure you still preserve ordering semantics and use the deduplicated
items to create Scheduled instances.
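A minimal sketch of the stable-key deduplication, assuming simplified stand-ins for `Cluster` and `Scheduled` (the real types carry more fields such as `next_eligible_at`):

```rust
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashSet};

#[derive(PartialEq, Eq)]
struct Cluster {
    id: u64,
    priority: i32,
}

#[derive(PartialEq, Eq)]
struct Scheduled {
    cluster: Cluster,
}

// Order heap entries by priority (stand-in for the real ordering semantics).
impl Ord for Scheduled {
    fn cmp(&self, other: &Self) -> Ordering {
        self.cluster.priority.cmp(&other.cluster.priority)
    }
}
impl PartialOrd for Scheduled {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn seed_heap(clusters: Vec<Cluster>) -> BinaryHeap<Scheduled> {
    let mut seen = HashSet::new();
    clusters
        .into_iter()
        // filter_clusters(...) can fold distinct inputs onto the same logical
        // cluster, so only the first occurrence of each id is enqueued.
        .filter(|c| seen.insert(c.id))
        .map(|cluster| Scheduled { cluster })
        .collect()
}
```

The same `HashSet` guard applies in both places where `Scheduled` entries are built.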

In `@rust/crates/scheduler/src/metrics/mod.rs`:
- Around lines 214-230: increment_cluster_polls and
set_cluster_last_dispatched_jobs allocate UUID->String on every call, adding
heap pressure in the hot dispatch/control loops; change their signatures to
accept precomputed label strings (e.g., show_label: &str, facility_label: &str)
or a small struct of &str labels computed once at cluster creation, and use
those label slices in the CLUSTER_POLLS_TOTAL.with_label_values(...) and
CLUSTER_LAST_DISPATCHED_JOBS.with_label_values(...) calls instead of calling
to_string() on the uuids so no per-invocation heap allocation occurs; update
callers in cluster.rs (where increment_cluster_polls and
set_cluster_last_dispatched_jobs are called) to pass the precomputed labels.
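The allocate-once label pattern can be sketched as follows. This uses `u128` as a stand-in for `Uuid` and a plain `HashMap` where the actual fix uses `scc::HashMap` statics; `LabelCache` is an invented name:

```rust
use std::collections::HashMap;

// Cache the stringified (show, facility) labels so the allocation happens at
// most once per pair instead of on every metric update in the hot loop.
struct LabelCache {
    labels: HashMap<(u128, u128), (String, String)>,
}

impl LabelCache {
    fn new() -> Self {
        Self { labels: HashMap::new() }
    }

    // Returns &str label slices suitable for with_label_values(...); the
    // to_string()-equivalent formatting runs only on the first call per pair.
    fn get(&mut self, show_id: u128, facility_id: u128) -> (&str, &str) {
        let (s, f) = self
            .labels
            .entry((show_id, facility_id))
            .or_insert_with(|| (format!("{show_id:x}"), format!("{facility_id:x}")));
        (s.as_str(), f.as_str())
    }
}
```

Callers in the dispatch loop would then pass the cached slices to `CLUSTER_POLLS_TOTAL.with_label_values(...)` instead of freshly allocated strings.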

In `@rust/crates/scheduler/src/pipeline/entrypoint.rs`:
- Around lines 110-122: The Err arm in entrypoint.rs currently returns (0, false)
so cycles_without_jobs is not incremented on fetch errors, preventing the
empty_job_cycles_before_quiting safety net from ever triggering; update the Err
branch to treat a fetch error as an empty cycle by returning the same
incremented counter used in the Ok arm (i.e., increment cycles_without_jobs and
return (cycles_without_jobs + 1, false) or otherwise update the shared atomic
counter you use for empty-cycle accounting), and adjust the log/comment
accordingly so the comment about the safety net is accurate.
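The corrected accounting can be sketched as a pure function; `next_cycle_state` and its `(counter, had_jobs)` return shape are simplified stand-ins for the entrypoint's actual state handling:

```rust
// Fold fetch errors into the same empty-cycle counter the success path uses,
// so the empty_job_cycles_before_quiting safety net can trip when the DB is
// persistently unhealthy.
fn next_cycle_state(
    fetch_result: Result<usize, String>, // Ok(job_count) or Err(db error)
    cycles_without_jobs: u32,
) -> (u32, bool) {
    match fetch_result {
        Ok(0) => (cycles_without_jobs + 1, false), // empty cycle
        Ok(_) => (0, true),                        // jobs found: reset counter
        // Previously this arm returned (0, false), silently resetting the
        // safety net on every fetch error; treat it as an empty cycle instead.
        Err(_) => (cycles_without_jobs + 1, false),
    }
}
```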

In `@rust/crates/scheduler/src/pipeline/matcher.rs`:
- Around lines 139-171: The WASTED_ATTEMPTS metric is being inflated by
lock-contention skips: update the loop around try_lock_layer/Ok(None) and
processed_layers so you track skips separately (e.g., a local
skipped_due_to_lock counter incremented in the Ok(None) branch) and only
increment WASTED_ATTEMPTS (and call metrics::increment_wasted_attempts) when
processed_layers == 0 AND skipped_due_to_lock == 0 (i.e., no layers were
processed and none were skipped due to peer locks); touch the try_lock_layer
handling, the processed_layers usage, and the WASTED_ATTEMPTS increment site so
the metric reflects true wasted work rather than expected contention.
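The gating logic reduces to the sketch below, with plain atomics standing in for the Prometheus counters in metrics/mod.rs:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Stand-ins for the real Prometheus counters.
static WASTED_ATTEMPTS: AtomicU64 = AtomicU64::new(0);
static LAYERS_SKIPPED_BY_LOCK: AtomicU64 = AtomicU64::new(0);

fn record_cycle(processed_layers: u64, skipped_due_to_lock: u64) {
    LAYERS_SKIPPED_BY_LOCK.fetch_add(skipped_due_to_lock, Ordering::Relaxed);
    // A peer replica holding the layer lock is expected contention, not wasted
    // work: only count a wasted attempt when nothing was processed AND nothing
    // was skipped because of a peer lock.
    if processed_layers == 0 && skipped_due_to_lock == 0 {
        WASTED_ATTEMPTS.fetch_add(1, Ordering::Relaxed);
    }
}
```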

In `@rust/crates/scheduler/tests/stress_tests.rs`:
- Line 71: The test is using Tokio's default single-threaded runtime via
#[tokio::test], weakening contention coverage; change the test attribute to use
the multi-thread runtime by replacing #[tokio::test] with #[tokio::test(flavor =
"multi_thread")] (optionally adding worker_threads = <n> if you want to control
thread count) so the stress_tests in rust/crates/scheduler/tests/stress_tests.rs
run on Tokio's multi-threaded runtime and exercise locking/contention code
paths.

---

Nitpick comments:
In `@rust/crates/scheduler/src/host_cache/actor.rs`:
- Around lines 539-557: Add a configurable toggle to avoid hard-exit on
circuit-breaker escalation: introduce an exit_on_circuit_open: bool (default
true) on the HostCache/actor configuration/struct and, in the Err branch where
self.db_circuit.record_failure() returns true, branch on that flag instead of
unconditionally calling std::process::exit(1); when exit_on_circuit_open is
false, log the escalation and return a propagated error (e.g., return
Err(err).into_diagnostic()) so callers can perform graceful shutdown/cleanup
(preserving semaphore permits and signaling downstream components) rather than
forcing a process exit. Ensure you update the code paths that reference the
escalation behavior (the block with self.db_circuit.record_failure(), the error!
message, and the std::process::exit call) to consult the new flag.

In `@rust/crates/scheduler/src/pipeline/dispatcher/actor.rs`:
- Around lines 264-293: The HostLock error is an expected contention and should
be treated like the other "info" cases; in the match inside the
DispatchVirtualProcError::FailedToStartOnDb arm, add DispatchError::HostLock(_)
alongside DispatchError::FailedToUpdateResources(_) and
DispatchError::FailedToCreateProc { .. } so it uses info! instead of falling
into the catch-all warn! branch (symbols:
DispatchVirtualProcError::FailedToStartOnDb, DispatchError::HostLock,
DispatchError::FailedToUpdateResources, DispatchError::FailedToCreateProc,
last_error, try_xact_lock).
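The routing change amounts to extending one or-pattern in the match; the sketch below mirrors the variant names from dispatcher/actor.rs but with simplified bodies, and `log_level` is an invented helper that returns the level instead of actually logging:

```rust
// Variant names mirror DispatchError in dispatcher/actor.rs; bodies are
// simplified stand-ins for illustration.
enum DispatchError {
    HostLock(String),
    FailedToUpdateResources(String),
    FailedToCreateProc { reason: String },
    Other(String),
}

fn log_level(err: &DispatchError) -> &'static str {
    match err {
        // Expected contention / recoverable conditions: log at info, since a
        // host lock held by a peer replica is normal multi-replica behavior.
        DispatchError::HostLock(_)
        | DispatchError::FailedToUpdateResources(_)
        | DispatchError::FailedToCreateProc { .. } => "info",
        // Everything else is genuinely unexpected: keep the warn.
        _ => "warn",
    }
}
```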

In `@rust/crates/scheduler/src/pipeline/matcher.rs`:
- Around lines 269-296: Update the misleading comment above the cloned variables
(layer_id, show_id, cores_requested, resource_accounting_service, os) to remove
any reference to Actix/actor messaging and a `'static` requirement; instead
state that these values are cloned because the validation closure is moved out
of the local scope (e.g., into DispatchLayerMessage) and therefore must own or
outlive the closure, and clarify that cloning is for lifetime/ownership reasons
used by the host_service.check_out call and Self::validate_match invocation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3663bac1-db53-4b40-a294-0aac243583ac

📥 Commits

Reviewing files that changed from the base of the PR and between be93550 and 505b55c.

📒 Files selected for processing (20)
  • rust/crates/scheduler/Cargo.toml
  • rust/crates/scheduler/src/cluster.rs
  • rust/crates/scheduler/src/config/mod.rs
  • rust/crates/scheduler/src/dao/host_dao.rs
  • rust/crates/scheduler/src/dao/layer_dao.rs
  • rust/crates/scheduler/src/host_cache/actor.rs
  • rust/crates/scheduler/src/host_cache/cache.rs
  • rust/crates/scheduler/src/host_cache/messages.rs
  • rust/crates/scheduler/src/host_cache/mod.rs
  • rust/crates/scheduler/src/main.rs
  • rust/crates/scheduler/src/metrics/mod.rs
  • rust/crates/scheduler/src/pipeline/dispatcher/actor.rs
  • rust/crates/scheduler/src/pipeline/dispatcher/messages.rs
  • rust/crates/scheduler/src/pipeline/dispatcher/mod.rs
  • rust/crates/scheduler/src/pipeline/entrypoint.rs
  • rust/crates/scheduler/src/pipeline/layer_permit.rs
  • rust/crates/scheduler/src/pipeline/matcher.rs
  • rust/crates/scheduler/src/pipeline/mod.rs
  • rust/crates/scheduler/tests/stress_tests.rs
  • rust/crates/scheduler/tests/util.rs
💤 Files with no reviewable changes (3)
  • rust/crates/scheduler/src/pipeline/mod.rs
  • rust/crates/scheduler/src/pipeline/layer_permit.rs
  • rust/crates/scheduler/Cargo.toml

Apply CodeRabbit fixes to the cue-scheduler optimizations:

- cluster.rs: dedupe filtered clusters via HashSet before seeding the priority heap (filter_clusters can fold distinct inputs onto the same identity); make the dispatch publish path preemptible by racing sender.reserve() against the stop notify, and only bump CLUSTER_ROUNDS / cluster_polls_total once a Permit is in hand.

- metrics: cache per-cluster Counter / Gauge handles in two scc::HashMap<(Uuid, Uuid), _> statics so Uuid::to_string() allocations happen at most once per (show, facility) pair instead of on every dispatch. Add scheduler_layers_skipped_by_lock_total and tighten the wasted_attempts description.

- entrypoint.rs: fold fetch-error cycles into the same cycles_without_jobs atomic the success path uses so the empty_job_cycles_before_quiting safety net actually trips when the DB is persistently unhealthy.

- matcher.rs: track peer-lock skips in a separate atomic and only fire WASTED_ATTEMPTS when no layer was processed and none were skipped by a peer lock. Prevents the metric from being dominated by expected multi-replica contention.

- tests/stress_tests.rs: run the harness on Tokio's multi-thread runtime (worker_threads = 8) so the new contention paths are actually exercised.
@ramonfigueiredo
Collaborator Author

Note: This PR was created on top of the PR:

@ramonfigueiredo ramonfigueiredo marked this pull request as draft May 14, 2026 17:35
@ramonfigueiredo ramonfigueiredo marked this pull request as ready for review May 14, 2026 17:39
@ramonfigueiredo
Collaborator Author

This PR is ready for review, but kept in draft because there are other changes from @DiegoTavares in progress that conflict with this PR.

@ramonfigueiredo ramonfigueiredo marked this pull request as draft May 14, 2026 17:43