diff --git a/crates/common/src/db/jobs.rs b/crates/common/src/db/jobs.rs index cb61ad7..5f05149 100644 --- a/crates/common/src/db/jobs.rs +++ b/crates/common/src/db/jobs.rs @@ -279,10 +279,14 @@ pub async fn retire_expired_cron_jobs( /// Build the SQL command pg_cron runs on each tick for a CRON job. /// /// Materializes a QUEUED execution by joining `jobs` with `endpoints`. The -/// `cron_ends_at` guard is critical: without it pg_cron keeps inserting -/// executions every tick forever, ignoring the job's end window. Table names are -/// resolved through `tbl(prefix, ..)` and schema-qualified because pg_cron runs -/// this command outside any scoped connection (no search_path). +/// `cron_starts_at`/`cron_ends_at` guards bound the active window: pg_cron is +/// registered with only the cron expression and starts ticking immediately, so +/// without the `cron_starts_at` guard a job with a future `starts_at` would fire +/// on the next matching tick instead of waiting; without the `cron_ends_at` +/// guard pg_cron keeps inserting executions every tick forever, ignoring the +/// job's end window. Table names are resolved through `tbl(prefix, ..)` and +/// schema-qualified because pg_cron runs this command outside any scoped +/// connection (no search_path). fn build_cron_command(prefix: &str, schema_name: &str, job_id: &str) -> String { let te = tbl(prefix, "executions"); let tj = tbl(prefix, "jobs"); @@ -297,6 +301,7 @@ fn build_cron_command(prefix: &str, schema_name: &str, job_id: &str) -> String { FROM \"{schema}\".\"{tj}\" j \ JOIN \"{schema}\".\"{tend}\" e ON e.name = j.endpoint \ WHERE j.job_id = '{job_id}' AND j.status = 'ACTIVE' \ + AND (j.cron_starts_at IS NULL OR j.cron_starts_at <= now()) \ AND (j.cron_ends_at IS NULL OR j.cron_ends_at > now()) \ ON CONFLICT (job_id, idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING", schema = schema_name, @@ -411,6 +416,17 @@ mod tests { ); } + #[test] + fn cron_command_enforces_starts_at_window() { + let cmd = build_cron_command("", "ws_acme", "job-123"); + // pg_cron starts ticking as soon as the job is registered, so the + // command itself must hold back materialization until starts_at. + assert!( + cmd.contains("j.cron_starts_at IS NULL OR j.cron_starts_at <= now()"), + "pg_cron command must guard on cron_starts_at, got: {cmd}" + ); + } + #[test] fn cron_command_targets_correct_schema_and_job() { let cmd = build_cron_command("", "ws_acme", "job-123");