Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/bin/ampctl/src/cmd/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod inspect;
pub mod list;
pub mod prune;
pub mod resume;
pub mod rm;
pub mod stop;

Expand All @@ -23,6 +24,10 @@ pub enum Commands {
#[command(after_help = include_str!("job/stop__after_help.md"))]
Stop(stop::Args),

/// Resume a stopped job
#[command(after_help = include_str!("job/resume__after_help.md"))]
Resume(resume::Args),

/// Remove job(s) by identifier or status filter
#[command(alias = "remove")]
#[command(after_help = include_str!("job/rm__after_help.md"))]
Expand All @@ -39,6 +44,7 @@ pub async fn run(command: Commands) -> anyhow::Result<()> {
Commands::List(args) => list::run(args).await?,
Commands::Inspect(args) => inspect::run(args).await?,
Commands::Stop(args) => stop::run(args).await?,
Commands::Resume(args) => resume::run(args).await?,
Commands::Rm(args) => rm::run(args).await?,
Commands::Prune(args) => prune::run(args).await?,
}
Expand Down
98 changes: 98 additions & 0 deletions crates/bin/ampctl/src/cmd/job/resume.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Job resume command.
//!
//! Resumes a stopped job through the admin API by:
//! 1. Creating a client for the admin API
//! 2. Using the client's job resume method
//! 3. Displaying success message
//!
//! # Configuration
//!
//! - Admin URL: `--admin-url` flag or `AMP_ADMIN_URL` env var (default: `http://localhost:1610`)
//! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`)

use monitoring::logging;
use worker::job::JobId;

use crate::args::GlobalArgs;

/// Command-line arguments for the `jobs resume` command.
#[derive(Debug, clap::Args)]
pub struct Args {
#[command(flatten)]
pub global: GlobalArgs,

/// The job ID to resume
pub id: JobId,
}

/// Result of a job resume operation.
#[derive(serde::Serialize)]
struct ResumeResult {
job_id: JobId,
}

impl std::fmt::Display for ResumeResult {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
writeln!(
f,
"{} Job {} resume requested",
console::style("✓").green().bold(),
self.job_id
)
}
}

/// Resume a job by requesting it to resume via the admin API.
///
/// # Errors
///
/// Returns [`Error`] for API errors (400/404/409/500) or network failures.
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, job_id = %id))]
pub async fn run(Args { global, id }: Args) -> Result<(), Error> {
let client = global.build_client().map_err(Error::ClientBuildError)?;

tracing::debug!("Resuming job via admin API");

client.jobs().resume(&id).await.map_err(|err| {
tracing::error!(error = %err, error_source = logging::error_source(&err), "Failed to resume job");
match err {
crate::client::jobs::ResumeError::NotFound(_) => Error::JobNotFound { id },
_ => Error::ResumeJobError(err),
}
})?;
let result = ResumeResult { job_id: id };
global.print(&result).map_err(Error::JsonSerialization)?;

Ok(())
}

/// Errors for job resume operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failed to build client
#[error("failed to build admin API client")]
ClientBuildError(#[source] crate::args::BuildClientError),

/// Job not found
///
/// This occurs when the job ID is valid but no job
/// record exists with that ID in the metadata database.
#[error("job not found: {id}")]
JobNotFound { id: JobId },

/// Error resuming job via admin API
///
/// This occurs when the resume request fails due to:
/// - Invalid job ID format
/// - Network or connection errors
/// - Metadata database errors
///
/// Note: The resume operation is idempotent - resuming a job that's already
/// in a Scheduled or Running state returns success.
#[error("failed to resume job")]
ResumeJobError(#[source] crate::client::jobs::ResumeError),

/// Failed to serialize result to JSON
#[error("failed to serialize result to JSON")]
JsonSerialization(#[source] serde_json::Error),
}
13 changes: 13 additions & 0 deletions crates/bin/ampctl/src/cmd/job/resume__after_help.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
## Examples

Resume a job:
$ ampctl job resume 12345

With custom admin URL:
$ ampctl job resume 12345 --admin-url http://prod-server:1610

## Notes

Resuming a job is a request, not immediate start. The worker node will
attempt to finish current operations cleanly. Check job status with
`ampctl job inspect <id>` to confirm it resumed.
130 changes: 130 additions & 0 deletions crates/clients/admin/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ fn job_stop(id: &JobId) -> String {
format!("jobs/{id}/stop")
}

/// Build URL path for resuming a job.
///
/// PUT `/jobs/{id}/resume`
fn job_resume(id: &JobId) -> String {
format!("jobs/{id}/resume")
}

/// Build URL path for deleting a job by ID.
///
/// DELETE `/jobs/{id}`
Expand Down Expand Up @@ -229,6 +236,88 @@ impl<'a> JobsClient<'a> {
}
}

/// Resume a stopped job by ID.
///
/// PUTs to `/jobs/{id}/resume` endpoint.
///
/// This operation is idempotent - resuming a job that's already in a scheduled or running state returns success (200).
///
/// # Errors
///
/// Returns [`StopError`] for network errors, API errors (400/404/500),
/// or unexpected responses.
#[tracing::instrument(skip(self), fields(job_id = %id))]
pub async fn resume(&self, id: &JobId) -> Result<(), ResumeError> {
let url = self
.client
.base_url()
.join(&job_resume(id))
.expect("valid URL");

tracing::debug!("Sending PUT request to resume job");

let response = self
.client
.http()
.put(url.as_str())
.send()
.await
.map_err(|err| ResumeError::Network {
url: url.to_string(),
source: err,
})?;

let status = response.status();
tracing::debug!(status = %status, "Received API response");

match status.as_u16() {
200 => {
tracing::debug!("Job stop request processed successfully");
Ok(())
}
400 | 404 | 500 => {
let text = response.text().await.map_err(|err| {
tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to read error response");
ResumeError::UnexpectedResponse {
status: status.as_u16(),
message: format!("Failed to read error response: {}", err),
}
})?;

let error_response: ErrorResponse = serde_json::from_str(&text).map_err(|err| {
tracing::error!(status = %status, error = %err, error_source = logging::error_source(&err), "Failed to parse error response");
ResumeError::UnexpectedResponse {
status: status.as_u16(),
message: text.clone(),
}
})?;

match error_response.error_code.as_str() {
"INVALID_JOB_ID" => Err(ResumeError::InvalidJobId(error_response.into())),
"JOB_NOT_FOUND" => Err(ResumeError::NotFound(error_response.into())),
"STOP_JOB_ERROR" => Err(ResumeError::ResumeJobError(error_response.into())),
"UNEXPECTED_STATE_CONFLICT" => {
Err(ResumeError::UnexpectedStateConflict(error_response.into()))
}
_ => Err(ResumeError::UnexpectedResponse {
status: status.as_u16(),
message: text,
}),
}
}
_ => {
let text = response
.text()
.await
.unwrap_or_else(|_| String::from("Failed to read response body"));
Err(ResumeError::UnexpectedResponse {
status: status.as_u16(),
message: text,
})
}
}
}

/// Delete a job by ID.
///
/// DELETEs to `/jobs/{id}` endpoint.
Expand Down Expand Up @@ -681,6 +770,47 @@ pub enum StopError {
UnexpectedResponse { status: u16, message: String },
}

/// Errors that can occur when resuming a job.
#[derive(Debug, thiserror::Error)]
pub enum ResumeError {
/// The job ID in the URL path is invalid (400, INVALID_JOB_ID)
///
/// This occurs when the ID cannot be parsed as a valid JobId.
#[error("invalid job ID")]
InvalidJobId(#[source] ApiError),

/// Job not found (404, JOB_NOT_FOUND)
///
/// This occurs when the job ID is valid but no job
/// record exists with that ID in the metadata database.
#[error("job not found")]
NotFound(#[source] ApiError),

/// Database error during stop operation (500, STOP_JOB_ERROR)
///
/// This occurs when:
/// - Database connection fails or is lost during the transaction
/// - Transaction conflicts or deadlocks occur
/// - Database constraint violations are encountered
#[error("failed to stop job")]
ResumeJobError(#[source] ApiError),

/// Unexpected state conflict during stop operation (500, UNEXPECTED_STATE_CONFLICT)
///
/// This indicates an internal inconsistency in the state machine.
/// It should not occur under normal operation and indicates a bug.
#[error("unexpected state conflict")]
UnexpectedStateConflict(#[source] ApiError),

/// Network or connection error
#[error("network error connecting to {url}")]
Network { url: String, source: reqwest::Error },

/// Unexpected response from API
#[error("unexpected response (status {status}): {message}")]
UnexpectedResponse { status: u16, message: String },
}

/// Errors that can occur when deleting a job by ID.
#[derive(Debug, thiserror::Error)]
pub enum DeleteByIdError {
Expand Down
38 changes: 38 additions & 0 deletions crates/core/metadata-db/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,44 @@ where
}
}

/// Update job status to Scheduled
///
/// This function will only update the job status if it's currently in a valid state
/// to be resumed (Stopped). If the job is already scheduled, this is
/// considered success (idempotent behavior). If the job is in a running or scheduled state, this returns a conflict error.
///
/// Returns an error if the job doesn't exist, is in a terminal state, or if there's a database error.
///
/// **Note:** This function does not send notifications. The caller is responsible for
/// calling `send_job_notification` after successful status update if worker notification
/// is required.
#[tracing::instrument(skip(exe), err)]
pub async fn request_resume<'c, E>(
exe: E,
job_id: impl Into<JobId> + std::fmt::Debug,
) -> Result<(), Error>
where
E: Executor<'c>,
{
// Try to update job status
match sql::update_status_if_any_state(
exe,
job_id.into(),
&[JobStatus::Stopped],
JobStatus::Scheduled,
)
.await
{
Ok(()) => Ok(()),
// Check if the job is already scheduled (idempotent behavior)
Err(JobStatusUpdateError::StateConflict {
actual: JobStatus::Scheduled,
..
}) => Ok(()),
Err(err) => Err(Error::JobStatusUpdate(err)),
}
}

/// List jobs with cursor-based pagination support, optionally filtered by status
///
/// Uses cursor-based pagination where `last_job_id` is the ID of the last job
Expand Down
1 change: 1 addition & 0 deletions crates/services/admin-api/src/handlers/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub mod delete_by_id;
pub mod get_all;
pub mod get_by_id;
pub mod job_info;
pub mod resume;
pub mod stop;
Loading