Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 31 additions & 31 deletions crates/api/src/handlers/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ pub async fn create(
AppError::InvalidCron("No upcoming run for this cron schedule".into())
})?;

let mut conn =
kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name)
let mut tx =
kronos_common::db::scoped::scoped_transaction(&state.pool, &ws.0.schema_name)
.await
.map_err(AppError::from)?;

let job = db::jobs::create_cron(
&mut *conn,
&mut *tx,
&body.endpoint,
&ep.endpoint_type,
body.input.as_ref(),
Expand All @@ -202,17 +202,19 @@ pub async fn create(
)
.await?;

// Register with pg_cron for automatic execution materialization
if let Err(e) = db::jobs::register_pg_cron(
&state.pool,
// Register with pg_cron for automatic execution materialization.
// Run it on the same tx as the row write so the persisted job and
// the pg_cron schedule commit (or roll back) together. If pg_cron is
// unhealthy the whole create fails and the client can retry.
db::jobs::register_pg_cron_conn(
&mut *tx,
&ws.0.schema_name,
&job.job_id,
cron_expr.as_str(),
)
.await
{
tracing::error!(job_id = %job.job_id, "Failed to register pg_cron job: {}", e);
}
.await?;

tx.commit().await.map_err(AppError::from)?;

metrics::counter!(m::JOBS_CREATED_TOTAL,
"trigger_type" => "CRON",
Expand Down Expand Up @@ -353,22 +355,19 @@ pub async fn update(

let created = db::jobs::retire_and_replace(&mut *tx, &job_id, &new_job).await?;

tx.commit().await.map_err(AppError::from)?;

// Unschedule old pg_cron job and register new one
if let Err(e) = db::jobs::unregister_pg_cron(&state.pool, &ws.0.schema_name, &job_id).await {
tracing::error!(job_id = %job_id, "Failed to unregister old pg_cron job: {}", e);
}
if let Err(e) = db::jobs::register_pg_cron(
&state.pool,
// Unschedule the old pg_cron job and register the new one on the same tx as
// the version flip, so the retire/replace and the schedule swap commit (or
// roll back) atomically.
db::jobs::unregister_pg_cron_conn(&mut *tx, &ws.0.schema_name, &job_id).await?;
db::jobs::register_pg_cron_conn(
&mut *tx,
&ws.0.schema_name,
&created.job_id,
cron_expr.as_str(),
)
.await
{
tracing::error!(job_id = %created.job_id, "Failed to register new pg_cron job: {}", e);
}
.await?;

tx.commit().await.map_err(AppError::from)?;

Ok(HttpResponse::Ok().json(serde_json::json!({ "data": {
"job_id": created.job_id,
Expand All @@ -392,11 +391,11 @@ pub async fn cancel(
ws: Workspace,
path: web::Path<String>,
) -> Result<HttpResponse, AppError> {
let mut conn = kronos_common::db::scoped::scoped_connection(&state.pool, &ws.0.schema_name)
let mut tx = kronos_common::db::scoped::scoped_transaction(&state.pool, &ws.0.schema_name)
.await
.map_err(AppError::from)?;
let job_id = path.into_inner();
let job = db::jobs::get(&mut *conn, &job_id)
let job = db::jobs::get(&mut *tx, &job_id)
.await?
.ok_or_else(|| AppError::JobNotFound(job_id.clone()))?;

Expand All @@ -405,21 +404,22 @@ pub async fn cancel(
}

if job.trigger_type != "CRON" {
db::executions::cancel_pending_for_job(&mut *conn, &job_id).await?;
db::executions::cancel_pending_for_job(&mut *tx, &job_id).await?;
}

let cancelled = db::jobs::cancel(&mut *conn, &job_id)
let cancelled = db::jobs::cancel(&mut *tx, &job_id)
.await?
.ok_or_else(|| AppError::Conflict("Job could not be cancelled".into()))?;

// Unregister from pg_cron if this was a CRON job
// Unregister from pg_cron if this was a CRON job. Run it on the same tx as
// the status flip so the cancel and the pg_cron unschedule commit (or roll
// back) atomically — no more RETIRED rows left with a live pg_cron entry.
if job.trigger_type == "CRON" {
if let Err(e) = db::jobs::unregister_pg_cron(&state.pool, &ws.0.schema_name, &job_id).await
{
tracing::error!(job_id = %job_id, "Failed to unregister pg_cron job: {}", e);
}
db::jobs::unregister_pg_cron_conn(&mut *tx, &ws.0.schema_name, &job_id).await?;
}

tx.commit().await.map_err(AppError::from)?;

Ok(HttpResponse::Ok().json(serde_json::json!({ "data": job_summary(&cancelled) })))
}

Expand Down
41 changes: 33 additions & 8 deletions crates/common/src/db/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,14 @@ pub async fn advance_cron_tick(
Ok(result.rows_affected() > 0)
}

/// Register a CRON job with pg_cron. The pg_cron job will directly INSERT
/// execution rows when the cron schedule fires.
pub async fn register_pg_cron(
pool: &PgPool,
/// Register a CRON job with pg_cron on an existing connection or transaction.
/// Running this on the same tx that writes the `jobs` row keeps the persisted
/// row and the pg_cron schedule atomic: if either fails, both roll back.
///
/// The pg_cron job will directly INSERT execution rows when the cron schedule
/// fires.
pub async fn register_pg_cron_conn(
conn: &mut PgConnection,
schema_name: &str,
job_id: &str,
cron_expression: &str,
Expand Down Expand Up @@ -291,24 +295,45 @@ pub async fn register_pg_cron(
.bind(&cron_job_name)
.bind(cron_expression)
.bind(&command)
.execute(pool)
.execute(&mut *conn)
.await?;

Ok(())
}

/// Unregister a CRON job from pg_cron.
pub async fn unregister_pg_cron(
/// Register a CRON job with pg_cron using a connection acquired from the pool.
pub async fn register_pg_cron(
pool: &PgPool,
schema_name: &str,
job_id: &str,
cron_expression: &str,
) -> Result<(), sqlx::Error> {
let mut conn = pool.acquire().await?;
register_pg_cron_conn(&mut conn, schema_name, job_id, cron_expression).await
}

/// Unregister a CRON job from pg_cron on an existing connection or transaction.
pub async fn unregister_pg_cron_conn(
conn: &mut PgConnection,
schema_name: &str,
job_id: &str,
) -> Result<(), sqlx::Error> {
let cron_job_name = format!("kronos_{}_{}", schema_name, job_id);

sqlx::query("SELECT cron.unschedule($1)")
.bind(&cron_job_name)
.execute(pool)
.execute(&mut *conn)
.await?;

Ok(())
}
Comment on lines +315 to 329

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

pg_cron unschedule function behavior when job name not found

💡 Result:

When you call the pg_cron function cron.unschedule(job_name text) and the specified job name cannot be found for the current user, the function raises a PostgreSQL error [1][2]. Specifically, the implementation of cron_unschedule_named in the pg_cron source code performs a lookup in the cron.job table filtering by both the job name and the current user's identity [2]. If no tuple is found matching these criteria, the function executes an ereport with the error message: "could not find valid entry for job '[job_name]'" [2]. This behavior means that: - You will receive an error rather than a boolean false return value [1][2]. - If you are logged in as a different user than the one who created the job, the function will fail with this "not found" error even if the job name exists in the table, because it cannot locate a matching entry for your current user [3][1]. - To avoid this error, you must ensure the job name exists and is owned by the current database role executing the command [1][2]. Alternatively, you can use the cron.unschedule(jobid bigint) function if you have the job ID, as it may have different permission handling characteristics [3][1].

Citations:


Make unregister_pg_cron_conn idempotent for missing pg_cron jobs

cron.unschedule($1) with the job name raises a PostgreSQL error (e.g., “could not find valid entry for job …”) when no matching entry exists for the current user, so unregister_pg_cron_conn (crates/common/src/db/jobs.rs, lines 315-329) can return Err(sqlx::Error) instead of succeeding. Adjust to either pre-check cron.job for that name/ownership (and only then call cron.unschedule), or catch/translate the specific “not found” error to Ok(()) (or use cron.unschedule(jobid bigint) if you can supply the job id).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/common/src/db/jobs.rs` around lines 315 - 329, The
unregister_pg_cron_conn function can return an error if cron.unschedule($1) is
called for a non-existent job; update unregister_pg_cron_conn to be idempotent
by either 1) querying cron.job for a row matching the generated cron_job_name
(and ownership) and only calling cron.unschedule when found, or 2) executing
cron.unschedule($1) inside a match that maps the specific “could not find valid
entry for job” Postgres/sqlx error to Ok(()) while propagating other errors;
locate the cron_job_name construction and the sqlx::query("SELECT
cron.unschedule($1)") call to implement one of these two fixes.


/// Unregister a CRON job from pg_cron using a connection acquired from the pool.
pub async fn unregister_pg_cron(
pool: &PgPool,
schema_name: &str,
job_id: &str,
) -> Result<(), sqlx::Error> {
let mut conn = pool.acquire().await?;
unregister_pg_cron_conn(&mut conn, schema_name, job_id).await
}