Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rust/crates/scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ debug = true
opencue-proto = { path = "../opencue-proto" }

# External Dependencies
actix = "0.13"
chrono = "0.4.38"
futures = { workspace = true }
scc = "3.1"
Expand Down
640 changes: 526 additions & 114 deletions rust/crates/scheduler/src/cluster.rs

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions rust/crates/scheduler/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ pub struct QueueConfig {
pub hostname_tags_chunk_size: usize,
pub host_candidate_attempts_per_layer: usize,
pub empty_job_cycles_before_quiting: Option<usize>,
/// Delay before an empty cluster (no eligible jobs) becomes eligible for dispatch again.
/// Used by the priority-queue cluster feed to back off without blocking neighbors.
#[serde(with = "humantime_serde")]
pub cluster_empty_back_off: Duration,
/// When `true`, the cluster priority queue biases toward clusters that produced more
/// jobs in the most recent dispatch (productivity bias). When `false`, all clusters
/// compete strictly on `next_eligible_at` with a stable tiebreaker.
pub cluster_productivity_bias: bool,
pub mem_reserved_min: ByteSize,
#[serde(with = "humantime_serde")]
pub subscription_recalculation_interval: Duration,
Expand All @@ -108,6 +116,8 @@ impl Default for QueueConfig {
hostname_tags_chunk_size: 300,
host_candidate_attempts_per_layer: 10,
empty_job_cycles_before_quiting: None,
cluster_empty_back_off: Duration::from_secs(3),
cluster_productivity_bias: true,
mem_reserved_min: ByteSize::mib(250),
subscription_recalculation_interval: Duration::from_secs(3),
resource_recalculation_interval: Duration::from_secs(10),
Expand Down Expand Up @@ -222,6 +232,41 @@ pub struct HostCacheConfig {
#[serde(with = "humantime_serde")]
pub host_staleness_threshold: Duration,
pub update_stat_on_book: bool,
/// Safety net for leaked host reservations. The reservation system relies
/// on explicit `check_in` / `Invalidate` calls; this TTL only fires when a
/// task crashes or otherwise drops a checked-out host without releasing it.
/// Should be larger than the longest plausible dispatch (slow RQD, retries).
#[serde(with = "humantime_serde")]
pub host_reservation_safety_ttl: Duration,
/// Circuit-breaker configuration for host-cache DB queries.
pub db_circuit_breaker: DbCircuitBreakerConfig,
}

/// Circuit-breaker tuning for host-cache database queries.
///
/// On consecutive failures the breaker opens for an exponentially growing
/// window (`base_backoff` → `max_backoff`), short-circuiting further queries
/// without hitting the DB. Once `failure_threshold` consecutive failures
/// accumulate, the scheduler exits with status 1 so the orchestrator restarts
/// it cleanly rather than letting it limp along.
#[derive(Debug, Deserialize, Clone)]
#[serde(default)]
pub struct DbCircuitBreakerConfig {
    // Consecutive-failure count at which the scheduler gives up and exits
    // with status 1 (see struct docs above).
    pub failure_threshold: u32,
    // Initial open-window duration; doubles per failure up to `max_backoff`
    // per the exponential policy described above. Parsed from humantime
    // strings (e.g. "500ms") in config files.
    #[serde(with = "humantime_serde")]
    pub base_backoff: Duration,
    // Upper bound the exponential backoff window is clamped to.
    #[serde(with = "humantime_serde")]
    pub max_backoff: Duration,
}

impl Default for DbCircuitBreakerConfig {
fn default() -> Self {
Self {
failure_threshold: 10,
base_backoff: Duration::from_millis(500),
max_backoff: Duration::from_secs(30),
}
}
}

impl Default for HostCacheConfig {
Expand All @@ -236,6 +281,8 @@ impl Default for HostCacheConfig {
concurrent_fetch_permit: 4,
host_staleness_threshold: Duration::from_secs(2 * 60), // 2 minutes
update_stat_on_book: false,
host_reservation_safety_ttl: Duration::from_secs(5 * 60), // 5 minutes
db_circuit_breaker: DbCircuitBreakerConfig::default(),
}
}
}
Expand Down
40 changes: 13 additions & 27 deletions rust/crates/scheduler/src/dao/host_dao.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,44 +295,30 @@ impl HostDao {
/// * `Ok(true)` - Lock successfully acquired
/// * `Ok(false)` - Lock already held by another process
/// * `Err(miette::Error)` - Database operation failed
pub async fn lock(
    &self,
    transaction: &mut Transaction<'_, Postgres>,
    host_id: &Uuid,
) -> Result<bool> {
    trace!("Locking {}", host_id);
    // pg_try_advisory_lock returns immediately: true when this session now
    // holds the lock, false when another session already does.
    let attempt = sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock(hashtext($1))")
        .bind(host_id.to_string())
        .fetch_one(&mut **transaction)
        .await;
    attempt
        .into_diagnostic()
        .wrap_err("Failed to acquire advisory lock")
}

/// Releases an advisory lock on a host after dispatch completion.
/// Acquires a transaction-scoped advisory lock on a host.
///
/// Releases the PostgreSQL advisory lock that was acquired during
/// the dispatch process, allowing other dispatchers to access the host.
///
/// # Arguments
/// * `host_id` - The UUID of the host to unlock
/// Uses `pg_try_advisory_xact_lock`, which auto-releases on COMMIT or
/// ROLLBACK. This is the preferred lock variant for short critical
/// sections inside the per-proc transaction — callers don't need to issue
/// an explicit unlock and the lock cannot leak if the connection is
/// returned to the pool while still session-locked.
///
/// # Returns
/// * `Ok(true)` - Lock successfully released
/// * `Ok(false)` - Lock was not held by this process
/// * `Err(miette::Error)` - Database operation failed
pub async fn unlock(
/// * `Ok(true)` — lock acquired (and will release at tx end)
/// * `Ok(false)` — another transaction holds the lock; caller should skip
/// * `Err(_)` — database error
pub async fn try_xact_lock(
&self,
transaction: &mut Transaction<'_, Postgres>,
host_id: &Uuid,
) -> Result<bool> {
trace!("Unlocking {}", host_id);
sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock(hashtext($1))")
trace!("Acquiring xact advisory lock for {}", host_id);
sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_xact_lock(hashtext($1))")
.bind(host_id.to_string())
.fetch_one(&mut **transaction)
.await
.into_diagnostic()
.wrap_err("Failed to release advisory lock")
.wrap_err("Failed to acquire transaction-scoped advisory lock")
}

/// Updates a host's available resource counts after frame dispatch.
Expand Down
48 changes: 48 additions & 0 deletions rust/crates/scheduler/src/dao/layer_dao.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,23 @@ ORDER BY
lf.int_layer_order
"#;

/// Guard returned by [`LayerDao::try_lock_layer`] holding an open transaction
/// that has a row-level lock on the layer. Drop / commit the guard to release
/// the lock so other schedulers can pick the layer up.
pub struct LayerLockGuard {
    // Open transaction holding the `FOR UPDATE` row lock. The lock lives
    // exactly as long as this transaction: committing (via `release`) or
    // dropping (implicit rollback) ends it.
    transaction: sqlx::Transaction<'static, Postgres>,
}

impl LayerLockGuard {
/// Releases the lock by committing the underlying empty transaction.
/// Dropping the guard without calling this also releases (via implicit
/// rollback) but that path logs an extra rollback failure on some
/// sqlx versions; prefer this explicit release on the happy path.
pub async fn release(self) -> Result<(), sqlx::Error> {
self.transaction.commit().await
}
}

impl LayerDao {
/// Creates a new LayerDao from database configuration.
///
Expand Down Expand Up @@ -339,6 +356,37 @@ impl LayerDao {
Ok(self.group_layers_and_frames(combined_models))
}

/// Attempts to acquire a row-level lock on a layer via
/// `SELECT … FOR UPDATE SKIP LOCKED`.
///
/// Deduplicates dispatch attempts across concurrent scheduler processes /
/// replicas: when another scheduler already holds the row's lock this
/// returns `Ok(None)` immediately (no waiting) and the caller should skip
/// the layer. (`None` is also what a nonexistent `pk_layer` produces —
/// `SKIP LOCKED` cannot distinguish the two cases.)
///
/// The lock is scoped to the returned [`LayerLockGuard`]'s transaction and
/// is released when the guard is committed or dropped.
pub async fn try_lock_layer(
    &self,
    layer_id: Uuid,
) -> Result<Option<LayerLockGuard>, sqlx::Error> {
    let mut tx = self.connection_pool.begin().await?;
    let row: Option<String> = sqlx::query_scalar(
        "SELECT pk_layer FROM layer WHERE pk_layer = $1 FOR UPDATE SKIP LOCKED",
    )
    .bind(layer_id.to_string())
    .fetch_optional(&mut *tx)
    .await?;

    match row {
        Some(_) => Ok(Some(LayerLockGuard { transaction: tx })),
        None => {
            // Row skipped (locked elsewhere) or absent — return the
            // connection to the pool; rollback errors are best-effort.
            let _ = tx.rollback().await;
            Ok(None)
        }
    }
}

/// Groups flat query results into layers with their associated frames.
///
/// Transforms the denormalized query results into a structured hierarchy
Expand Down
Loading
Loading