From f65e1c1f608e6707312218b25794b158f1444a28 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 6 Jun 2026 13:34:16 +0000 Subject: [PATCH] fix(api): make public CRON job paths transactional with pg_cron The public CRON-job handlers wrote the jobs row, then called register/unregister_pg_cron against a fresh pool connection after the row write had committed, with the pg_cron error logged and swallowed. The API returned 2xx even when the pg_cron op failed, leaving the persisted jobs row out of sync with what pg_cron actually scheduled. Add connection-scoped variants register_pg_cron_conn / unregister_pg_cron_conn that run on the caller's connection or transaction, and have the pool-based variants delegate to them. Rework the three affected handler sites so the row write and the pg_cron op share one transaction and commit (or roll back) atomically, propagating pg_cron failures as AppError instead of swallowing them: - create (CRON): switch from scoped_connection to scoped_transaction; register on the tx and commit only after it succeeds. - update: move unregister + register inside the retire_and_replace tx. - cancel: wrap the cancel + unregister in a transaction. This is a deliberate API contract change: POST/PATCH/DELETE on CRON jobs can now fail with 5xx when pg_cron is unhealthy, and the client can retry against a consistent state. Closes #32 https://claude.ai/code/session_012cTjd515ScehwT6Dvasexj --- crates/api/src/handlers/jobs.rs | 62 ++++++++++++++++----------------- crates/common/src/db/jobs.rs | 41 +++++++++++++++++----- 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/crates/api/src/handlers/jobs.rs b/crates/api/src/handlers/jobs.rs index 5589a75..b245a08 100644 --- a/crates/api/src/handlers/jobs.rs +++ b/crates/api/src/handlers/jobs.rs @@ -184,13 +184,13 @@ pub async fn create( AppError::InvalidCron("No upcoming run for this cron schedule".into()) })?; - let mut conn = - kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name) + let mut tx = + kronos_common::db::scoped::scoped_transaction(&state.pool, &ws.0.schema_name) .await .map_err(AppError::from)?; let job = db::jobs::create_cron( - &mut *conn, + &mut *tx, &body.endpoint, &ep.endpoint_type, body.input.as_ref(), @@ -202,17 +202,19 @@ pub async fn create( ) .await?; - // Register with pg_cron for automatic execution materialization - if let Err(e) = db::jobs::register_pg_cron( - &state.pool, + // Register with pg_cron for automatic execution materialization. + // Run it on the same tx as the row write so the persisted job and + // the pg_cron schedule commit (or roll back) together. If pg_cron is + // unhealthy the whole create fails and the client can retry. + db::jobs::register_pg_cron_conn( + &mut *tx, &ws.0.schema_name, &job.job_id, cron_expr.as_str(), ) - .await - { - tracing::error!(job_id = %job.job_id, "Failed to register pg_cron job: {}", e); - } + .await?; + + tx.commit().await.map_err(AppError::from)?; metrics::counter!(m::JOBS_CREATED_TOTAL, "trigger_type" => "CRON", @@ -353,22 +355,19 @@ pub async fn update( let created = db::jobs::retire_and_replace(&mut *tx, &job_id, &new_job).await?; - tx.commit().await.map_err(AppError::from)?; - - // Unschedule old pg_cron job and register new one - if let Err(e) = db::jobs::unregister_pg_cron(&state.pool, &ws.0.schema_name, &job_id).await { - tracing::error!(job_id = %job_id, "Failed to unregister old pg_cron job: {}", e); - } - if let Err(e) = db::jobs::register_pg_cron( - &state.pool, + // Unschedule the old pg_cron job and register the new one on the same tx as + // the version flip, so the retire/replace and the schedule swap commit (or + // roll back) atomically. + db::jobs::unregister_pg_cron_conn(&mut *tx, &ws.0.schema_name, &job_id).await?; + db::jobs::register_pg_cron_conn( + &mut *tx, &ws.0.schema_name, &created.job_id, cron_expr.as_str(), ) - .await - { - tracing::error!(job_id = %created.job_id, "Failed to register new pg_cron job: {}", e); - } + .await?; + + tx.commit().await.map_err(AppError::from)?; Ok(HttpResponse::Ok().json(serde_json::json!({ "data": { "job_id": created.job_id, @@ -392,11 +391,11 @@ pub async fn cancel( ws: Workspace, path: web::Path, ) -> Result { - let mut conn = kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name) + let mut tx = kronos_common::db::scoped::scoped_transaction(&state.pool, &ws.0.schema_name) .await .map_err(AppError::from)?; let job_id = path.into_inner(); - let job = db::jobs::get(&mut *conn, &job_id) + let job = db::jobs::get(&mut *tx, &job_id) .await? .ok_or_else(|| AppError::JobNotFound(job_id.clone()))?; @@ -405,21 +404,22 @@ pub async fn cancel( } if job.trigger_type != "CRON" { - db::executions::cancel_pending_for_job(&mut *conn, &job_id).await?; + db::executions::cancel_pending_for_job(&mut *tx, &job_id).await?; } - let cancelled = db::jobs::cancel(&mut *conn, &job_id) + let cancelled = db::jobs::cancel(&mut *tx, &job_id) .await? .ok_or_else(|| AppError::Conflict("Job could not be cancelled".into()))?; - // Unregister from pg_cron if this was a CRON job + // Unregister from pg_cron if this was a CRON job. Run it on the same tx as + // the status flip so the cancel and the pg_cron unschedule commit (or roll + // back) atomically — no more RETIRED rows left with a live pg_cron entry. if job.trigger_type == "CRON" { - if let Err(e) = db::jobs::unregister_pg_cron(&state.pool, &ws.0.schema_name, &job_id).await - { - tracing::error!(job_id = %job_id, "Failed to unregister pg_cron job: {}", e); - } + db::jobs::unregister_pg_cron_conn(&mut *tx, &ws.0.schema_name, &job_id).await?; } + tx.commit().await.map_err(AppError::from)?; + Ok(HttpResponse::Ok().json(serde_json::json!({ "data": job_summary(&cancelled) }))) } diff --git a/crates/common/src/db/jobs.rs b/crates/common/src/db/jobs.rs index 5137013..90ee65a 100644 --- a/crates/common/src/db/jobs.rs +++ b/crates/common/src/db/jobs.rs @@ -260,10 +260,14 @@ pub async fn advance_cron_tick( Ok(result.rows_affected() > 0) } -/// Register a CRON job with pg_cron. The pg_cron job will directly INSERT -/// execution rows when the cron schedule fires. -pub async fn register_pg_cron( - pool: &PgPool, +/// Register a CRON job with pg_cron on an existing connection or transaction. +/// Running this on the same tx that writes the `jobs` row keeps the persisted +/// row and the pg_cron schedule atomic: if either fails, both roll back. +/// +/// The pg_cron job will directly INSERT execution rows when the cron schedule +/// fires. +pub async fn register_pg_cron_conn( + conn: &mut PgConnection, schema_name: &str, job_id: &str, cron_expression: &str, @@ -291,24 +295,45 @@ pub async fn register_pg_cron( .bind(&cron_job_name) .bind(cron_expression) .bind(&command) - .execute(pool) + .execute(&mut *conn) .await?; Ok(()) } -/// Unregister a CRON job from pg_cron. -pub async fn unregister_pg_cron( +/// Register a CRON job with pg_cron using a connection acquired from the pool. +pub async fn register_pg_cron( pool: &PgPool, schema_name: &str, job_id: &str, + cron_expression: &str, +) -> Result<(), sqlx::Error> { + let mut conn = pool.acquire().await?; + register_pg_cron_conn(&mut conn, schema_name, job_id, cron_expression).await +} + +/// Unregister a CRON job from pg_cron on an existing connection or transaction. +pub async fn unregister_pg_cron_conn( + conn: &mut PgConnection, + schema_name: &str, + job_id: &str, ) -> Result<(), sqlx::Error> { let cron_job_name = format!("kronos_{}_{}", schema_name, job_id); sqlx::query("SELECT cron.unschedule($1)") .bind(&cron_job_name) - .execute(pool) + .execute(&mut *conn) .await?; Ok(()) } + +/// Unregister a CRON job from pg_cron using a connection acquired from the pool. +pub async fn unregister_pg_cron( + pool: &PgPool, + schema_name: &str, + job_id: &str, +) -> Result<(), sqlx::Error> { + let mut conn = pool.acquire().await?; + unregister_pg_cron_conn(&mut conn, schema_name, job_id).await +}