Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions crates/common/src/db/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,14 @@ pub async fn retire_expired_cron_jobs(
/// Build the SQL command pg_cron runs on each tick for a CRON job.
///
/// Materializes a QUEUED execution by joining `jobs` with `endpoints`. The
/// `cron_ends_at` guard is critical: without it pg_cron keeps inserting
/// executions every tick forever, ignoring the job's end window. Table names are
/// resolved through `tbl(prefix, ..)` and schema-qualified because pg_cron runs
/// this command outside any scoped connection (no search_path).
/// `cron_starts_at`/`cron_ends_at` guards bound the active window: pg_cron is
/// registered with only the cron expression and starts ticking immediately, so
/// without the `cron_starts_at` guard a job with a future `starts_at` would fire
/// on the next matching tick instead of waiting; without the `cron_ends_at`
/// guard pg_cron keeps inserting executions every tick forever, ignoring the
/// job's end window. Table names are resolved through `tbl(prefix, ..)` and
/// schema-qualified because pg_cron runs this command outside any scoped
/// connection (no search_path).
fn build_cron_command(prefix: &str, schema_name: &str, job_id: &str) -> String {
let te = tbl(prefix, "executions");
let tj = tbl(prefix, "jobs");
Expand All @@ -297,6 +301,7 @@ fn build_cron_command(prefix: &str, schema_name: &str, job_id: &str) -> String {
FROM \"{schema}\".\"{tj}\" j \
JOIN \"{schema}\".\"{tend}\" e ON e.name = j.endpoint \
WHERE j.job_id = '{job_id}' AND j.status = 'ACTIVE' \
AND (j.cron_starts_at IS NULL OR j.cron_starts_at <= now()) \
AND (j.cron_ends_at IS NULL OR j.cron_ends_at > now()) \
ON CONFLICT (job_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING",
schema = schema_name,
Expand Down Expand Up @@ -411,6 +416,17 @@ mod tests {
);
}

#[test]
fn cron_command_enforces_starts_at_window() {
let cmd = build_cron_command("", "ws_acme", "job-123");
// pg_cron starts ticking as soon as the job is registered, so the
// command itself must hold back materialization until starts_at.
assert!(
cmd.contains("j.cron_starts_at IS NULL OR j.cron_starts_at <= now()"),
"pg_cron command must guard on cron_starts_at, got: {cmd}"
);
}

#[test]
fn cron_command_targets_correct_schema_and_job() {
let cmd = build_cron_command("", "ws_acme", "job-123");
Expand Down