Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions crates/bin/ampctl/src/cmd/dataset/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
//! - End block: `--end-block` flag (optional) - "latest", block number, or negative offset
//! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`)

use admin_client::datasets::NodeSelector;
use datasets_common::reference::Reference;
use dump::EndBlock;
use worker::{job::JobId, node_id::NodeId};
use worker::job::JobId;

use crate::args::GlobalArgs;

Expand Down Expand Up @@ -62,8 +63,8 @@ pub struct Args {
/// If not specified, a worker will be selected randomly from available workers.
///
/// The worker must be active (has sent heartbeats recently) for the deployment to succeed.
#[arg(long, value_parser = clap::value_parser!(NodeId))]
pub worker_id: Option<NodeId>,
#[arg(long, value_parser = clap::value_parser!(NodeSelector))]
pub worker_id: Option<NodeSelector>,
}

/// Result of a dataset deployment operation.
Expand Down Expand Up @@ -125,7 +126,7 @@ async fn deploy_dataset(
dataset_ref: &Reference,
end_block: Option<EndBlock>,
parallelism: u16,
worker_id: Option<NodeId>,
worker_id: Option<NodeSelector>,
) -> Result<JobId, Error> {
let client = global.build_client().map_err(Error::ClientBuild)?;
let job_id = client
Expand Down
103 changes: 100 additions & 3 deletions crates/clients/admin/src/datasets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use datasets_common::{
use dump::EndBlock;
use monitoring::logging;
use serde_json::value::RawValue;
use worker::{job::JobId, node_id::NodeId};
use worker::job::JobId;

use super::{
Client,
Expand Down Expand Up @@ -242,7 +242,7 @@ impl<'a> DatasetsClient<'a> {
dataset_ref: &Reference,
end_block: Option<EndBlock>,
parallelism: u16,
worker_id: Option<NodeId>,
worker_id: Option<NodeSelector>,
) -> Result<JobId, DeployError> {
let namespace = dataset_ref.namespace();
let name = dataset_ref.name();
Expand Down Expand Up @@ -1105,7 +1105,7 @@ struct DeployRequest {
end_block: Option<EndBlock>,
parallelism: u16,
#[serde(skip_serializing_if = "Option::is_none")]
worker_id: Option<NodeId>,
worker_id: Option<NodeSelector>,
}

/// Input type for dataset registration manifest parameter.
Expand Down Expand Up @@ -1796,3 +1796,100 @@ pub enum ListJobsError {
#[error("unexpected response with status {status_code}")]
UnexpectedResponse { status_code: u16 },
}

use worker::node_id::{InvalidIdError, NodeId, validate_node_id};

/// A glob pattern for matching worker node IDs by prefix
///
/// Matches any node ID that starts with the pattern prefix.
/// Created by parsing a string ending with `*` (e.g., `"worker-eth-*"`).
///
/// The inner `String` stores only the prefix — the trailing `*` is stripped
/// on parse and re-appended on serialization.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeIdGlob(String);

impl serde::Serialize for NodeIdGlob {
    /// Serializes the glob back to its textual form, `<prefix>*` — the same
    /// string shape [`NodeIdGlob::from_str`] accepts.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // `collect_str` streams the formatted pattern directly into the
        // serializer, producing the same string output as
        // `format!("{}*", self.0).serialize(serializer)` without allocating
        // the intermediate `String`.
        serializer.collect_str(&format_args!("{}*", self.0))
    }
}

impl std::str::FromStr for NodeIdGlob {
type Err = InvalidGlobError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let Some(prefix) = s.strip_suffix('*') else {
return Err(InvalidGlobError(s.to_string()));
};
validate_node_id(prefix)?;

Ok(NodeIdGlob(prefix.to_string()))
}
}

/// Error returned when a glob pattern is invalid.
#[derive(Debug, thiserror::Error)]
#[error("glob pattern must end with '*', got '{0}'")]
pub struct InvalidGlobError(String);

impl From<InvalidIdError> for InvalidGlobError {
fn from(e: InvalidIdError) -> Self {
InvalidGlobError(e.to_string())
}
}

/// Selector for targeting worker nodes by exact ID or glob pattern
///
/// Used to specify which worker(s) should handle a job deployment.
/// Parsed from a string via [`std::str::FromStr`]: input ending in `*`
/// becomes [`NodeSelector::Glob`], anything else [`NodeSelector::Exact`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeSelector {
    /// Match a specific worker by exact node ID
    Exact(NodeId),
    /// Match workers whose IDs start with the glob prefix
    Glob(NodeIdGlob),
}

impl serde::Serialize for NodeSelector {
    /// Serializes the selector as a plain string: the node ID for
    /// `Exact`, or the `<prefix>*` pattern for `Glob`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Both variants are self-describing strings, so serialization is a
        // straight delegation to the inner value.
        match self {
            Self::Exact(id) => id.serialize(serializer),
            Self::Glob(pattern) => pattern.serialize(serializer),
        }
    }
}

impl std::str::FromStr for NodeSelector {
    type Err = NodeSelectorParseError;

    /// Parses a selector string: a trailing `*` selects glob matching,
    /// otherwise the input must be a complete, valid node ID.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Guard clause: no wildcard means the whole string is an exact ID.
        if !s.ends_with('*') {
            return s
                .parse::<NodeId>()
                .map(Self::Exact)
                .map_err(NodeSelectorParseError::InvalidNodeId);
        }

        NodeIdGlob::from_str(s)
            .map(Self::Glob)
            .map_err(NodeSelectorParseError::InvalidGlob)
    }
}

/// Error returned when parsing a [`NodeSelector`] from a string fails.
#[derive(Debug, thiserror::Error)]
pub enum NodeSelectorParseError {
    /// The string is not a valid node ID
    ///
    /// This occurs when the input doesn't end with `*` and fails node ID validation.
    #[error("invalid node ID")]
    InvalidNodeId(#[source] InvalidIdError),

    /// The glob pattern prefix is invalid
    ///
    /// This occurs when the input ends with `*` but the prefix portion
    /// fails node ID validation rules.
    #[error("invalid glob pattern")]
    InvalidGlob(#[source] InvalidGlobError),
}
15 changes: 9 additions & 6 deletions crates/services/admin-api/src/handlers/datasets/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use axum::{
use dataset_store::DatasetKind;
use datasets_common::{name::Name, namespace::Namespace, reference::Reference, revision::Revision};
use monitoring::logging;
use worker::{job::JobId, node_id::NodeId};
use worker::job::JobId;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
scheduler::ScheduleJobError,
scheduler::{NodeSelector, ScheduleJobError},
};

/// Handler for the `POST /datasets/{namespace}/{name}/versions/{revision}/deploy` endpoint
Expand Down Expand Up @@ -218,15 +218,18 @@ pub struct DeployRequest {
#[serde(default = "default_parallelism")]
pub parallelism: u16,

/// Optional worker ID to assign the job to
/// Optional worker selector - either an exact worker ID or a glob pattern.
///
/// If specified, the job will be assigned to this specific worker.
/// If not specified, a worker will be selected randomly from available workers.
/// Examples:
/// - `"worker-node-0"` - assigns to the specific worker with ID "worker-node-0"
/// - `"worker-eth-*"` - randomly selects from workers matching the pattern
///
/// If not specified, a worker will be selected randomly from all available workers.
/// If a glob pattern is provided, one matching worker will be randomly selected.
/// The worker must be active (has sent heartbeats recently) for the deployment to succeed.
#[serde(default)]
#[cfg_attr(feature = "utoipa", schema(value_type = Option<String>))]
pub worker_id: Option<NodeId>,
pub worker_id: Option<NodeSelector>,
}

fn default_parallelism() -> u16 {
Expand Down
Loading