diff --git a/crates/api/src/handlers/jobs.rs b/crates/api/src/handlers/jobs.rs index 5589a75..b245a08 100644 --- a/crates/api/src/handlers/jobs.rs +++ b/crates/api/src/handlers/jobs.rs @@ -184,13 +184,13 @@ pub async fn create( AppError::InvalidCron("No upcoming run for this cron schedule".into()) })?; - let mut conn = - kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name) + let mut tx = + kronos_common::db::scoped::scoped_transaction(&state.pool, &ws.0.schema_name) .await .map_err(AppError::from)?; let job = db::jobs::create_cron( - &mut *conn, + &mut *tx, &body.endpoint, &ep.endpoint_type, body.input.as_ref(), @@ -202,17 +202,19 @@ pub async fn create( ) .await?; - // Register with pg_cron for automatic execution materialization - if let Err(e) = db::jobs::register_pg_cron( - &state.pool, + // Register with pg_cron for automatic execution materialization. + // Run it on the same tx as the row write so the persisted job and + // the pg_cron schedule commit (or roll back) together. If pg_cron is + // unhealthy the whole create fails and the client can retry. + db::jobs::register_pg_cron_conn( + &mut *tx, &ws.0.schema_name, &job.job_id, cron_expr.as_str(), ) - .await - { - tracing::error!(job_id = %job.job_id, "Failed to register pg_cron job: {}", e); - } + .await?; + + tx.commit().await.map_err(AppError::from)?; metrics::counter!(m::JOBS_CREATED_TOTAL, "trigger_type" => "CRON", @@ -353,22 +355,19 @@ pub async fn update( let created = db::jobs::retire_and_replace(&mut *tx, &job_id, &new_job).await?; - tx.commit().await.map_err(AppError::from)?; - - // Unschedule old pg_cron job and register new one - if let Err(e) = db::jobs::unregister_pg_cron(&state.pool, &ws.0.schema_name, &job_id).await { - tracing::error!(job_id = %job_id, "Failed to unregister old pg_cron job: {}", e); - } - if let Err(e) = db::jobs::register_pg_cron( - &state.pool, + // Unschedule the old pg_cron job and register the new one on the same tx as + // the version flip, so the retire/replace and the schedule swap commit (or + // roll back) atomically. + db::jobs::unregister_pg_cron_conn(&mut *tx, &ws.0.schema_name, &job_id).await?; + db::jobs::register_pg_cron_conn( + &mut *tx, &ws.0.schema_name, &created.job_id, cron_expr.as_str(), ) - .await - { - tracing::error!(job_id = %created.job_id, "Failed to register new pg_cron job: {}", e); - } + .await?; + + tx.commit().await.map_err(AppError::from)?; Ok(HttpResponse::Ok().json(serde_json::json!({ "data": { "job_id": created.job_id, @@ -392,11 +391,11 @@ pub async fn cancel( ws: Workspace, path: web::Path, ) -> Result { - let mut conn = kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name) + let mut tx = kronos_common::db::scoped::scoped_transaction(&state.pool, &ws.0.schema_name) .await .map_err(AppError::from)?; let job_id = path.into_inner(); - let job = db::jobs::get(&mut *conn, &job_id) + let job = db::jobs::get(&mut *tx, &job_id) .await? .ok_or_else(|| AppError::JobNotFound(job_id.clone()))?; @@ -405,21 +404,22 @@ pub async fn cancel( } if job.trigger_type != "CRON" { - db::executions::cancel_pending_for_job(&mut *conn, &job_id).await?; + db::executions::cancel_pending_for_job(&mut *tx, &job_id).await?; } - let cancelled = db::jobs::cancel(&mut *conn, &job_id) + let cancelled = db::jobs::cancel(&mut *tx, &job_id) .await? .ok_or_else(|| AppError::Conflict("Job could not be cancelled".into()))?; - // Unregister from pg_cron if this was a CRON job + // Unregister from pg_cron if this was a CRON job. Run it on the same tx as + // the status flip so the cancel and the pg_cron unschedule commit (or roll + // back) atomically — no more RETIRED rows left with a live pg_cron entry. if job.trigger_type == "CRON" { - if let Err(e) = db::jobs::unregister_pg_cron(&state.pool, &ws.0.schema_name, &job_id).await - { - tracing::error!(job_id = %job_id, "Failed to unregister pg_cron job: {}", e); - } + db::jobs::unregister_pg_cron_conn(&mut *tx, &ws.0.schema_name, &job_id).await?; } + tx.commit().await.map_err(AppError::from)?; + Ok(HttpResponse::Ok().json(serde_json::json!({ "data": job_summary(&cancelled) }))) } diff --git a/crates/common/src/db/jobs.rs b/crates/common/src/db/jobs.rs index 5137013..90ee65a 100644 --- a/crates/common/src/db/jobs.rs +++ b/crates/common/src/db/jobs.rs @@ -260,10 +260,14 @@ pub async fn advance_cron_tick( Ok(result.rows_affected() > 0) } -/// Register a CRON job with pg_cron. The pg_cron job will directly INSERT -/// execution rows when the cron schedule fires. -pub async fn register_pg_cron( - pool: &PgPool, +/// Register a CRON job with pg_cron on an existing connection or transaction. +/// Running this on the same tx that writes the `jobs` row keeps the persisted +/// row and the pg_cron schedule atomic: if either fails, both roll back. +/// +/// The pg_cron job will directly INSERT execution rows when the cron schedule +/// fires. +pub async fn register_pg_cron_conn( + conn: &mut PgConnection, schema_name: &str, job_id: &str, cron_expression: &str, @@ -291,24 +295,45 @@ pub async fn register_pg_cron( .bind(&cron_job_name) .bind(cron_expression) .bind(&command) - .execute(pool) + .execute(&mut *conn) .await?; Ok(()) } -/// Unregister a CRON job from pg_cron. -pub async fn unregister_pg_cron( +/// Register a CRON job with pg_cron using a connection acquired from the pool. +pub async fn register_pg_cron( pool: &PgPool, schema_name: &str, job_id: &str, + cron_expression: &str, +) -> Result<(), sqlx::Error> { + let mut conn = pool.acquire().await?; + register_pg_cron_conn(&mut conn, schema_name, job_id, cron_expression).await +} + +/// Unregister a CRON job from pg_cron on an existing connection or transaction. +pub async fn unregister_pg_cron_conn( + conn: &mut PgConnection, + schema_name: &str, + job_id: &str, ) -> Result<(), sqlx::Error> { let cron_job_name = format!("kronos_{}_{}", schema_name, job_id); sqlx::query("SELECT cron.unschedule($1)") .bind(&cron_job_name) - .execute(pool) + .execute(&mut *conn) .await?; Ok(()) } + +/// Unregister a CRON job from pg_cron using a connection acquired from the pool. +pub async fn unregister_pg_cron( + pool: &PgPool, + schema_name: &str, + job_id: &str, +) -> Result<(), sqlx::Error> { + let mut conn = pool.acquire().await?; + unregister_pg_cron_conn(&mut conn, schema_name, job_id).await +}