From 998dc724105558ec0bb845479e0515b1549e97a3 Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Tue, 23 Dec 2025 21:10:55 +0530 Subject: [PATCH] feat(worker): add glob pattern support for worker node job affinity --- crates/bin/ampctl/src/cmd/dataset/deploy.rs | 9 +- crates/clients/admin/src/datasets.rs | 103 ++++++++- .../admin-api/src/handlers/datasets/deploy.rs | 15 +- crates/services/admin-api/src/scheduler.rs | 207 +++++++++++++++++- crates/services/controller/src/scheduler.rs | 29 ++- crates/services/worker/src/node_id.rs | 6 +- docs/openapi-specs/admin.spec.json | 2 +- tests/src/testlib/fixtures/ampctl.rs | 4 +- 8 files changed, 347 insertions(+), 28 deletions(-) diff --git a/crates/bin/ampctl/src/cmd/dataset/deploy.rs b/crates/bin/ampctl/src/cmd/dataset/deploy.rs index 14e1b68bb..f2cd0afe3 100644 --- a/crates/bin/ampctl/src/cmd/dataset/deploy.rs +++ b/crates/bin/ampctl/src/cmd/dataset/deploy.rs @@ -15,9 +15,10 @@ //! - End block: `--end-block` flag (optional) - "latest", block number, or negative offset //! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`) +use admin_client::datasets::NodeSelector; use datasets_common::reference::Reference; use dump::EndBlock; -use worker::{job::JobId, node_id::NodeId}; +use worker::job::JobId; use crate::args::GlobalArgs; @@ -62,8 +63,8 @@ pub struct Args { /// If not specified, a worker will be selected randomly from available workers. /// /// The worker must be active (has sent heartbeats recently) for the deployment to succeed. - #[arg(long, value_parser = clap::value_parser!(NodeId))] - pub worker_id: Option, + #[arg(long, value_parser = clap::value_parser!(NodeSelector))] + pub worker_id: Option, } /// Result of a dataset deployment operation. @@ -125,7 +126,7 @@ async fn deploy_dataset( dataset_ref: &Reference, end_block: Option, parallelism: u16, - worker_id: Option, + worker_id: Option, ) -> Result { let client = global.build_client().map_err(Error::ClientBuild)?; let job_id = client diff --git a/crates/clients/admin/src/datasets.rs b/crates/clients/admin/src/datasets.rs index 33cbeee9d..6514c4db7 100644 --- a/crates/clients/admin/src/datasets.rs +++ b/crates/clients/admin/src/datasets.rs @@ -9,7 +9,7 @@ use datasets_common::{ use dump::EndBlock; use monitoring::logging; use serde_json::value::RawValue; -use worker::{job::JobId, node_id::NodeId}; +use worker::job::JobId; use super::{ Client, @@ -242,7 +242,7 @@ impl<'a> DatasetsClient<'a> { dataset_ref: &Reference, end_block: Option, parallelism: u16, - worker_id: Option, + worker_id: Option, ) -> Result { let namespace = dataset_ref.namespace(); let name = dataset_ref.name(); @@ -1105,7 +1105,7 @@ struct DeployRequest { end_block: Option, parallelism: u16, #[serde(skip_serializing_if = "Option::is_none")] - worker_id: Option, + worker_id: Option, } /// Input type for dataset registration manifest parameter. @@ -1796,3 +1796,100 @@ pub enum ListJobsError { #[error("unexpected response with status {status_code}")] UnexpectedResponse { status_code: u16 }, } + +use worker::node_id::{InvalidIdError, NodeId, validate_node_id}; + +/// A glob pattern for matching worker node IDs by prefix +/// +/// Matches any node ID that starts with the pattern prefix. +/// Created by parsing a string ending with `*` (e.g., `"worker-eth-*"`). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NodeIdGlob(String); + +impl serde::Serialize for NodeIdGlob { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + format!("{}*", self.0).serialize(serializer) + } +} + +impl std::str::FromStr for NodeIdGlob { + type Err = InvalidGlobError; + + fn from_str(s: &str) -> Result { + let Some(prefix) = s.strip_suffix('*') else { + return Err(InvalidGlobError(s.to_string())); + }; + validate_node_id(prefix)?; + + Ok(NodeIdGlob(prefix.to_string())) + } +} + +/// Error returned when a glob pattern is invalid. +#[derive(Debug, thiserror::Error)] +#[error("glob pattern must end with '*', got '{0}'")] +pub struct InvalidGlobError(String); + +impl From for InvalidGlobError { + fn from(e: InvalidIdError) -> Self { + InvalidGlobError(e.to_string()) + } +} + +/// Selector for targeting worker nodes by exact ID or glob pattern +/// +/// Used to specify which worker(s) should handle a job deployment. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NodeSelector { + /// Match a specific worker by exact node ID + Exact(NodeId), + /// Match workers whose IDs start with the glob prefix + Glob(NodeIdGlob), +} + +impl serde::Serialize for NodeSelector { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + NodeSelector::Exact(node_id) => node_id.serialize(serializer), + NodeSelector::Glob(glob) => glob.serialize(serializer), + } + } +} + +impl std::str::FromStr for NodeSelector { + type Err = NodeSelectorParseError; + + fn from_str(s: &str) -> Result { + if s.ends_with('*') { + let glob = NodeIdGlob::from_str(s).map_err(NodeSelectorParseError::InvalidGlob)?; + Ok(NodeSelector::Glob(glob)) + } else { + let node_id = s + .parse::() + .map_err(NodeSelectorParseError::InvalidNodeId)?; + Ok(NodeSelector::Exact(node_id)) + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum NodeSelectorParseError { + /// The string is not a valid node ID + /// + /// This occurs when the input doesn't end with `*` and fails node ID validation. + #[error("invalid node ID")] + InvalidNodeId(#[source] InvalidIdError), + + /// The glob pattern prefix is invalid + /// + /// This occurs when the input ends with `*` but the prefix portion + /// fails node ID validation rules. + #[error("invalid glob pattern")] + InvalidGlob(#[source] InvalidGlobError), +} diff --git a/crates/services/admin-api/src/handlers/datasets/deploy.rs b/crates/services/admin-api/src/handlers/datasets/deploy.rs index ed91e25ea..7ac76a2ee 100644 --- a/crates/services/admin-api/src/handlers/datasets/deploy.rs +++ b/crates/services/admin-api/src/handlers/datasets/deploy.rs @@ -11,12 +11,12 @@ use axum::{ use dataset_store::DatasetKind; use datasets_common::{name::Name, namespace::Namespace, reference::Reference, revision::Revision}; use monitoring::logging; -use worker::{job::JobId, node_id::NodeId}; +use worker::job::JobId; use crate::{ ctx::Ctx, handlers::error::{ErrorResponse, IntoErrorResponse}, - scheduler::ScheduleJobError, + scheduler::{NodeSelector, ScheduleJobError}, }; /// Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/deploy` endpoint @@ -218,15 +218,18 @@ pub struct DeployRequest { #[serde(default = "default_parallelism")] pub parallelism: u16, - /// Optional worker ID to assign the job to + /// Optional worker selector - either an exact worker ID or a glob pattern. /// - /// If specified, the job will be assigned to this specific worker. - /// If not specified, a worker will be selected randomly from available workers. + /// Examples: + /// - `"worker-node-0"` - assigns to the specific worker with ID "worker-node-0" + /// - `"worker-eth-*"` - randomly selects from workers matching the pattern /// + /// If not specified, a worker will be selected randomly from all available workers. + /// If a glob pattern is provided, one matching worker will be randomly selected /// The worker must be active (has sent heartbeats recently) for the deployment to succeed. #[serde(default)] #[cfg_attr(feature = "utoipa", schema(value_type = Option))] - pub worker_id: Option, + pub worker_id: Option, } fn default_parallelism() -> u16 { diff --git a/crates/services/admin-api/src/scheduler.rs b/crates/services/admin-api/src/scheduler.rs index b2367306c..511d05330 100644 --- a/crates/services/admin-api/src/scheduler.rs +++ b/crates/services/admin-api/src/scheduler.rs @@ -32,7 +32,7 @@ use dump::EndBlock; use metadata_db::Worker; use worker::{ job::{Job, JobId, JobStatus}, - node_id::NodeId, + node_id::{InvalidIdError, NodeId, validate_node_id}, }; /// Combined trait for scheduler functionality @@ -58,7 +58,7 @@ pub trait SchedulerJobs: Send + Sync { dataset_kind: DatasetKind, end_block: EndBlock, max_writers: u16, - worker_id: Option, + worker_id: Option, ) -> Result; /// Stop a running job @@ -167,6 +167,15 @@ pub enum ScheduleJobError { /// - Connection is lost during notification #[error("failed to notify worker: {0}")] NotifyWorker(#[source] metadata_db::Error), + + /// No active workers match the glob pattern + /// + /// This occurs when: + /// - No workers are registered in the system + /// - No workers are active (have sent heartbeats recently) + /// - No workers match the glob pattern + #[error("no active workers match pattern '{0}'")] + NoMatchingWorkers(NodeIdGlob), } /// Errors that can occur when stopping a job @@ -332,3 +341,197 @@ pub struct ListWorkersError(#[source] pub metadata_db::Error); #[derive(Debug, thiserror::Error)] #[error("metadata database error")] pub struct GetWorkerError(#[source] pub metadata_db::Error); + +/// A glob pattern for matching worker node IDs by prefix +/// +/// Matches any node ID that starts with the pattern prefix. +/// Created by parsing a string ending with `*` (e.g., `"worker-eth-*"`). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NodeIdGlob(String); + +impl serde::Serialize for NodeIdGlob { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + format!("{}*", self.0).serialize(serializer) + } +} + +impl std::str::FromStr for NodeIdGlob { + type Err = InvalidGlobError; + + fn from_str(s: &str) -> Result { + // If the string ends with '*', remove it and validate the prefix + let Some(prefix) = s.strip_suffix('*') else { + return Err(InvalidGlobError(s.to_string())); + }; + validate_node_id(prefix)?; + + Ok(NodeIdGlob(prefix.to_string())) + } +} + +impl std::fmt::Display for NodeIdGlob { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}*", self.0) + } +} + +impl NodeIdGlob { + /// Returns true if the given string starts with this glob's prefix + pub fn matches_str(&self, s: &str) -> bool { + s.starts_with(&self.0) + } +} + +/// Error returned when a glob pattern is invalid. +#[derive(Debug, thiserror::Error)] +#[error("glob pattern must end with '*', got '{0}'")] +pub struct InvalidGlobError(String); + +impl From for InvalidGlobError { + fn from(e: InvalidIdError) -> Self { + InvalidGlobError(e.to_string()) + } +} + +/// Selector for targeting worker nodes by exact ID or glob pattern +/// +/// Used to specify which worker(s) should handle a job deployment. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NodeSelector { + /// Match a specific worker by exact node ID + Exact(NodeId), + /// Match workers whose IDs start with the glob prefix + Glob(NodeIdGlob), +} + +impl serde::Serialize for NodeSelector { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + NodeSelector::Exact(node_id) => node_id.serialize(serializer), + NodeSelector::Glob(glob) => glob.serialize(serializer), + } + } +} + +impl<'de> serde::Deserialize<'de> for NodeSelector { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let val = String::deserialize(deserializer)?; + val.parse().map_err(serde::de::Error::custom) + } +} + +impl std::str::FromStr for NodeSelector { + type Err = NodeSelectorParseError; + + // If the string ends with '*', parse it as a glob pattern + // Otherwise, parse it as a node ID + fn from_str(s: &str) -> Result { + if s.ends_with('*') { + let glob = NodeIdGlob::from_str(s).map_err(NodeSelectorParseError::InvalidGlob)?; + Ok(NodeSelector::Glob(glob)) + } else { + let node_id = s + .parse::() + .map_err(NodeSelectorParseError::InvalidNodeId)?; + Ok(NodeSelector::Exact(node_id)) + } + } +} + +/// Errors that occur when parsing a node selector string +#[derive(Debug, thiserror::Error)] +pub enum NodeSelectorParseError { + /// The string is not a valid node ID + /// + /// This occurs when the input doesn't end with `*` and fails node ID validation. + #[error("invalid node ID")] + InvalidNodeId(#[source] InvalidIdError), + + /// The glob pattern prefix is invalid + /// + /// This occurs when the input ends with `*` but the prefix portion + /// fails node ID validation rules. + #[error("invalid glob pattern: '{0}'")] + InvalidGlob(#[source] InvalidGlobError), +} + +#[cfg(test)] +mod tests { + use super::*; + + mod glob_or_node_id { + use super::*; + + #[test] + fn parse_exact_node_id() { + let result: NodeSelector = "worker-01".parse().unwrap(); + assert!(matches!(result, NodeSelector::Exact(_))); + + if let NodeSelector::Exact(node_id) = result { + assert_eq!(node_id.as_str(), "worker-01"); + } + } + + #[test] + fn parse_glob_with_asterisk() { + let result: NodeSelector = "worker-*".parse().unwrap(); + assert!(matches!(result, NodeSelector::Glob(_))); + } + + #[test] + fn parse_invalid_glob_pattern() { + let result: Result = "[invalid".parse(); + assert!(result.is_err()); + } + + #[test] + fn parse_invalid_node_id() { + // Starts with number - invalid as NodeId, no glob chars + let result: Result = "123-worker".parse(); + assert!(result.is_err()); + } + + #[test] + fn glob_matches_node_id() { + let glob: NodeSelector = "worker-raw-eth-*".parse().unwrap(); + + if let NodeSelector::Glob(ref g) = glob { + assert!(g.matches_str("worker-raw-eth-mainnet")); + assert!(g.matches_str("worker-raw-eth-l2s")); + assert!(!g.matches_str("worker-other")); + assert!(!g.matches_str("worker-raw-eth")); // No trailing chars + } else { + panic!("Expected Glob variant"); + } + } + + #[test] + fn glob_equality() { + let glob1: NodeSelector = "worker-*".parse().unwrap(); + let glob2: NodeSelector = "worker-*".parse().unwrap(); + + assert_eq!(glob1, glob2); + } + + #[test] + fn deserialize_exact() { + let result: NodeSelector = serde_json::from_str(r#""worker-01""#).unwrap(); + assert!(matches!(result, NodeSelector::Exact(_))); + } + + #[test] + fn deserialize_glob() { + let result: NodeSelector = serde_json::from_str(r#""worker-*""#).unwrap(); + assert!(matches!(result, NodeSelector::Glob(_))); + } + } +} diff --git a/crates/services/controller/src/scheduler.rs b/crates/services/controller/src/scheduler.rs index cf634d181..bd80c1636 100644 --- a/crates/services/controller/src/scheduler.rs +++ b/crates/services/controller/src/scheduler.rs @@ -28,8 +28,8 @@ use std::time::Duration; use admin_api::scheduler::{ DeleteJobError, DeleteJobsByStatusError, GetJobError, GetWorkerError, ListJobsByDatasetError, - ListJobsError, ListWorkersError, ScheduleJobError, SchedulerJobs, SchedulerWorkers, - StopJobError, + ListJobsError, ListWorkersError, NodeSelector, ScheduleJobError, SchedulerJobs, + SchedulerWorkers, StopJobError, }; use async_trait::async_trait; use dataset_store::DatasetKind; @@ -69,14 +69,14 @@ impl Scheduler { /// Schedule a dataset synchronization job /// /// Checks for existing scheduled or running jobs to avoid duplicates, selects an available - /// worker node (or uses the specified worker_id), and registers the job in the metadata database. + /// worker node (either randomly, by exact worker_id, or by matching a glob pattern) and registers the job in the metadata database. async fn schedule_dataset_sync_job_impl( &self, end_block: EndBlock, max_writers: u16, hash_reference: HashReference, dataset_kind: DatasetKind, - worker_id: Option, + worker_id: Option, ) -> Result { // Avoid re-scheduling jobs in a scheduled or running state. let existing_jobs = @@ -100,16 +100,31 @@ impl Scheduler { .await .map_err(ScheduleJobError::ListActiveWorkers)?; + // If a specific worker_id is provided, use it. + // If a glob pattern is provided, choose a random worker node that matches the pattern. + // If no worker_id or glob pattern is provided, choose a random worker node from the list of active workers. let node_id = match worker_id { - Some(worker_id) => { + Some(NodeSelector::Exact(worker_id)) => { let worker_id_ref = metadata_db::WorkerNodeId::from(&worker_id); if !candidates.contains(&worker_id_ref) { return Err(ScheduleJobError::WorkerNotAvailable(worker_id)); } worker_id_ref.to_owned() } + Some(NodeSelector::Glob(pattern)) => { + let matching: Vec<_> = candidates + .iter() + .filter(|c| pattern.matches_str(c.as_str())) + .collect(); + if matching.is_empty() { + return Err(ScheduleJobError::NoMatchingWorkers(pattern)); + } + let Some(node_id) = matching.choose(&mut rand::rng()).cloned() else { + return Err(ScheduleJobError::NoMatchingWorkers(pattern)); + }; + node_id.to_owned() + } None => { - // Randomly select from active workers let Some(node_id) = candidates.choose(&mut rand::rng()).cloned() else { return Err(ScheduleJobError::NoWorkersAvailable); }; @@ -265,7 +280,7 @@ impl SchedulerJobs for Scheduler { dataset_kind: DatasetKind, end_block: EndBlock, max_writers: u16, - worker_id: Option, + worker_id: Option, ) -> Result { self.schedule_dataset_sync_job_impl( end_block, diff --git a/crates/services/worker/src/node_id.rs b/crates/services/worker/src/node_id.rs index 5ffacdd02..303a14786 100644 --- a/crates/services/worker/src/node_id.rs +++ b/crates/services/worker/src/node_id.rs @@ -119,7 +119,7 @@ impl<'de> serde::Deserialize<'de> for NodeId { /// - Must start with a letter /// - Can only contain alphanumeric characters, underscores, hyphens, and dots /// - Must not be empty -fn validate_node_id(id: &str) -> Result<(), InvalidIdError> { +pub fn validate_node_id(id: &str) -> Result<(), InvalidIdError> { if id.is_empty() { return Err(InvalidIdError { id: id.to_string(), @@ -175,7 +175,7 @@ impl<'a> From<&'a NodeId> for metadata_db::WorkerNodeId<'a> { #[derive(Debug, thiserror::Error)] #[error("Invalid worker ID '{id}': {reason}")] pub struct InvalidIdError { - id: String, + pub id: String, #[source] - reason: Box, + pub reason: Box, } diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index 54616c1f0..27b9f5c24 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -1781,7 +1781,7 @@ "string", "null" ], - "description": "Optional worker ID to assign the job to\n\nIf specified, the job will be assigned to this specific worker.\nIf not specified, a worker will be selected randomly from available workers.\n\nThe worker must be active (has sent heartbeats recently) for the deployment to succeed." + "description": "Optional worker selector - either an exact worker ID or a glob pattern.\n\nExamples:\n- `\"worker-node-0\"` - assigns to the specific worker with ID \"worker-node-0\"\n- `\"worker-eth-*\"` - randomly selects from workers matching the pattern\n\nIf not specified, a worker will be selected randomly from all available workers.\nIf a glob pattern is provided, one matching worker will be randomly selected\nThe worker must be active (has sent heartbeats recently) for the deployment to succeed." } } }, diff --git a/tests/src/testlib/fixtures/ampctl.rs b/tests/src/testlib/fixtures/ampctl.rs index 9fa585474..838fb021e 100644 --- a/tests/src/testlib/fixtures/ampctl.rs +++ b/tests/src/testlib/fixtures/ampctl.rs @@ -4,12 +4,12 @@ //! and provider configurations with the Admin API in test environments. It uses //! the ampctl admin API client for direct programmatic access. +use ampctl::client::datasets::NodeSelector; use common::BoxError; use datasets_common::{hash::Hash, reference::Reference}; use dump::EndBlock; use serde_json::value::RawValue; use url::Url; -use worker::node_id::NodeId; /// ampctl fixture for registering dataset manifests and provider configurations. /// @@ -184,7 +184,7 @@ impl Ampctl { dataset_ref: &str, end_block: Option, parallelism: Option, - worker_id: Option, + worker_id: Option, ) -> Result { let reference: Reference = dataset_ref.parse().map_err(|err| { format!(