From 25e437f150cdf5c4c39069371dc93825ca1e9d6d Mon Sep 17 00:00:00 2001 From: "F." Date: Tue, 14 Apr 2026 21:36:40 +0200 Subject: [PATCH] feat(cron): add Shutdown for graceful drain and harden scheduler lifecycle Introduce Cron.Shutdown(ctx) as a companion to Stop: it stops future scheduling and waits for in-flight jobs to finish without cancelling their contexts. Stop retains its existing behaviour of cancelling job contexts before draining. Internally, replace the single (rootCtx, rootCancel, jobWaiter, loopDone) tuple with a runLifecycle struct that holds independent scheduler and job context pairs. This makes it straightforward for Stop and Shutdown to target different cancellation scopes and allows completed runs to be garbage-collected once all jobs have returned. Additional hardening: - @every rejects non-positive durations (zero and negative) with an explicit parse error; accepted sub-second intervals are still rounded up to one second. - Malformed TZ=/CRON_TZ= prefixes (empty location name or missing schedule body) now return parse errors instead of panicking. - WithLocation, WithParser, WithLogger, and WithClock treat nil as "keep the package default" rather than leaving the scheduler in an invalid state. Tests, examples, and documentation (README, MIGRATION, CHANGELOG, doc.go) are updated to cover all new behaviour. 
Co-Authored-By: Oz --- CHANGELOG.md | 10 ++ MIGRATION.md | 19 +++- README.md | 38 +++++++- cron.go | 247 +++++++++++++++++++++++++++++++++++------------- cron_test.go | 242 ++++++++++++++++++++++++++++++++++++++++++++--- doc.go | 35 ++++++- example_test.go | 36 +++++++ option.go | 55 +++++++---- option_test.go | 28 ++++++ parser.go | 49 +++++++--- parser_test.go | 6 ++ 11 files changed, 649 insertions(+), 116 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c96134..afbbab7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `EventHooks` struct with `OnJobStart` and `OnJobComplete` callbacks, set via `WithEventHooks` option. - `ErrorFunc` type and `WithOnError` option for error-only callbacks. +- `Shutdown(ctx context.Context) error` for graceful scheduler draining without + cancelling contexts already handed to running jobs. - `example_test.go` with runnable examples for pkg.go.dev. - `context.Context` threading throughout the public API: - `Job.Run(ctx context.Context) error` — jobs receive a cancellable context @@ -33,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `ErrAlreadyRunning` if a scheduler is already active. - `Stop(ctx context.Context) error` — cancels the scheduler, waits for in-flight jobs bounded by `ctx`. + - `Shutdown(ctx context.Context) error` — stops future scheduling and waits + for in-flight jobs without cancelling their contexts. - `Clock` interface (`Clock`, `Timer`) and `WithClock` option for deterministic testing without `time.Sleep`. - `ErrAlreadyRunning` sentinel error returned by `Run` when called twice. @@ -50,6 +54,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Entry.WrappedJob` is now unexported (`wrappedJob`). - `FuncJob` signature: `func(context.Context) error` (was `func()`). - Minimum Go version: **1.26**. 
+- `@every` rejects non-positive durations while still rounding accepted + sub-second intervals to one second. +- Malformed `TZ=` / `CRON_TZ=` prefixes now return parse errors instead of + panicking. +- Passing nil to `WithLocation`, `WithParser`, `WithLogger`, or `WithClock` + preserves the package defaults. ### Removed diff --git a/MIGRATION.md b/MIGRATION.md index 3993e67..1fc3595 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -75,6 +75,15 @@ Returns `cron.ErrAlreadyRunning` if a scheduler is already active. `Stop` cancels the scheduler and waits for in-flight jobs, bounded by `ctx`. It returns `ctx.Err()` if the deadline elapses before all jobs finish. +### Shutdown (graceful drain) + +If you want to stop scheduling new work without cancelling contexts already +handed to running jobs, use: + +```go +err := c.Shutdown(ctx) +``` + ## 4. Logger — `log/slog` replaces custom interface The custom `Logger` interface, `PrintfLogger`, and `VerbosePrintfLogger` are @@ -131,6 +140,13 @@ if errors.Is(err, cron.ErrPanic) { } ``` +`Recover` is not enabled by default. To preserve v3-style panic recovery, +install it explicitly: + +```go +cron.New(cron.WithChain(cron.Recover(logger))) +``` + ## 9. Removed symbols | Removed | Replacement | @@ -149,7 +165,8 @@ if errors.Is(err, cron.ErrPanic) { 1. Update import path to `github.com/hyp3rd/cron/v4`. 2. Add `context.Context` parameter and `error` return to all `Job` implementations and `FuncJob` / `AddFunc` closures. -3. Pass a `context.Context` to `Start`, `Run`, and `Stop`. +3. Pass a `context.Context` to `Start`, `Run`, and `Stop`, and use + `Shutdown(ctx)` when you need graceful draining instead of cancellation. 4. Replace `Logger`/`PrintfLogger`/`VerbosePrintfLogger` with `*slog.Logger`. 5. Rename `NewParser` calls to `NewSpecParser`. 6. Replace `Entry.WrappedJob` usage with `Entry.Job`. 
diff --git a/README.md b/README.md index 8d56ce4..bf5b8ed 100644 --- a/README.md +++ b/README.md @@ -43,10 +43,10 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() - c.Start(ctx) + c.Start(context.Background()) <-ctx.Done() - c.Stop(context.Background()) + c.Shutdown(context.Background()) } ``` @@ -54,14 +54,17 @@ func main() { - **Standard 5-field cron expressions** (minute, hour, dom, month, dow) plus optional seconds via `WithSeconds()`. -- **`context.Context` throughout** — `Start`, `Run`, `Stop`, and every `Job` - receive a context for cancellation and deadlines. +- **Context-aware lifecycle** — `Start`, `Run`, `Stop`, `Shutdown`, and every + `Job` participate in cancellation and deadlines. - **`log/slog` logging** — structured, leveled logging out of the box. Default level is `slog.LevelWarn` to keep the scheduler quiet. - **`Clock` interface** — inject a fake clock via `WithClock` for deterministic, zero-`time.Sleep` tests. - **Job wrappers** — `Recover`, `SkipIfStillRunning`, `DelayIfStillRunning`, `Timeout`, `MaxConcurrent`, `RetryOnError`, and custom `JobWrapper` chains. +- **Defensive configuration** — malformed `TZ=` / `CRON_TZ=` prefixes and + invalid `@every` intervals return parse errors; nil `With*` options keep + defaults. - **Named entries** — `AddNamedFunc` / `AddNamedJob` attach human-readable labels for logging and observability. - **Event hooks** — `WithEventHooks` for `OnJobStart` / `OnJobComplete` @@ -98,6 +101,10 @@ func main() { @every 1h30m ``` +`@every` accepts positive `time.ParseDuration` values. Durations smaller than a +second still round up to 1 second; `@every 0s` and negative durations are +rejected as configuration errors. 
+ ### Time zones ```go @@ -106,8 +113,26 @@ cron.New(cron.WithLocation(time.UTC)) c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", myJob) ``` +Malformed timezone prefixes such as `CRON_TZ=` or `CRON_TZ=UTC` without a +schedule body return parse errors instead of panicking. + +## Lifecycle + +Use `Start(ctx)` for a background scheduler and `Run(ctx)` for a blocking one. +The context passed to `Start` or `Run` is also the parent context for every job. + +- `Stop(ctx)` cancels the scheduler and cancels contexts already handed to + running jobs before waiting for them to return. +- `Shutdown(ctx)` stops future scheduling and waits for running jobs to finish + without cancelling their contexts. +- If the `Start` / `Run` context is cancelled directly, both the scheduler and + job contexts are cancelled. + ## Job wrappers / Chain +Panic recovery is available but not enabled by default. Install `Recover` +explicitly if you want panics turned into logged `ErrPanic` errors: + ```go c := cron.New(cron.WithChain( cron.Recover(logger), @@ -186,6 +211,11 @@ c := cron.New(cron.WithClock(fakeClock)) See `clock.go` for the interface definition. +## Option defaults + +Passing `nil` to `WithLocation`, `WithParser`, `WithLogger`, or `WithClock` +keeps the package default instead of leaving the scheduler in an invalid state. + ## Migration from robfig/cron/v3 See [MIGRATION.md](MIGRATION.md) for a step-by-step upgrade guide. 
diff --git a/cron.go b/cron.go index 251de37..7819f8b 100644 --- a/cron.go +++ b/cron.go @@ -31,14 +31,23 @@ type Cron struct { parser Parser clock Clock nextID EntryID - jobWaiter sync.WaitGroup - rootCtx context.Context //nolint:containedctx // stored to propagate cancellation to in-flight jobs - rootCancel context.CancelFunc - loopDone chan struct{} + currentRun *runLifecycle + runs []*runLifecycle onError ErrorFunc hooks EventHooks } +type runLifecycle struct { + schedulerCtx context.Context //nolint:containedctx // stored to coordinate graceful and forced shutdown across a scheduler run + schedulerCancel context.CancelFunc + jobCtx context.Context //nolint:containedctx // stored to propagate + // cancellation semantics to in-flight jobs for a scheduler run + jobCancel context.CancelFunc + loopDone chan struct{} + jobsDone chan struct{} + activeJobs int +} + // Parser turns a cron spec string into a [Schedule]. The default // implementation is [SpecParser]; callers can supply their own by passing // [WithParser]. @@ -48,8 +57,9 @@ type Parser interface { // Job is the unit of work scheduled by [Cron]. Implementations should honor // the provided context: when ctx is cancelled the job is expected to return -// promptly. A non-nil error is logged by the scheduler but does not stop -// future executions. +// promptly. Job contexts are cancelled when the parent [Cron.Start] / [Cron.Run] +// context is cancelled or when [Cron.Stop] is called. A non-nil error is logged +// by the scheduler but does not stop future executions. 
type Job interface { Run(ctx context.Context) error } @@ -195,7 +205,7 @@ func (c *Cron) Entries() []Entry { } snapCh := c.snapshot - loopDone := c.loopDone + loopDone := c.currentRun.loopDone c.runningMu.Unlock() replyChan := make(chan []Entry, 1) @@ -239,7 +249,7 @@ func (c *Cron) Remove(id EntryID) { } removeCh := c.remove - loopDone := c.loopDone + loopDone := c.currentRun.loopDone c.runningMu.Unlock() select { @@ -252,8 +262,8 @@ func (c *Cron) Remove(id EntryID) { } // Start launches the scheduler in its own goroutine bound to ctx. When ctx is -// cancelled the scheduler exits; any jobs already in flight are allowed to -// finish. Calling Start on an already-running scheduler is a no-op. +// cancelled the scheduler exits and running job contexts are cancelled. Calling +// Start on an already-running scheduler is a no-op. func (c *Cron) Start(ctx context.Context) { c.runningMu.Lock() defer c.runningMu.Unlock() @@ -262,12 +272,11 @@ func (c *Cron) Start(ctx context.Context) { return } - loopCtx := c.enterRunning(ctx) - loopDone := c.loopDone + run := c.enterRunning(ctx) go func() { - c.schedulerLoop(loopCtx) - c.markStopped(loopDone) + c.schedulerLoop(run) + c.markStopped(run) }() } @@ -283,50 +292,60 @@ func (c *Cron) Run(ctx context.Context) error { return ErrAlreadyRunning } - loopCtx := c.enterRunning(ctx) - loopDone := c.loopDone + run := c.enterRunning(ctx) c.runningMu.Unlock() - c.schedulerLoop(loopCtx) - c.markStopped(loopDone) + c.schedulerLoop(run) + c.markStopped(run) return nil } -// Stop cancels the running scheduler and waits for in-flight jobs to finish, -// bounded by the provided context. It returns ctx.Err() if the context is -// cancelled before all jobs complete. Calling Stop on a scheduler that is not -// running is a no-op and returns nil. +// Stop cancels the running scheduler and the contexts already handed to +// in-flight jobs, then waits for those jobs to finish, bounded by the provided +// context. 
It returns ctx.Err() if the context is cancelled before all jobs +// complete. Calling Stop on a scheduler that is not running is a no-op and +// returns nil. func (c *Cron) Stop(ctx context.Context) error { + return c.stopRuns(ctx, "stop", true) +} + +// Shutdown stops the scheduler and waits for in-flight jobs to finish without +// cancelling their contexts. The wait is bounded by the provided context. It +// returns ctx.Err() if the context is cancelled before all jobs complete. +func (c *Cron) Shutdown(ctx context.Context) error { + return c.stopRuns(ctx, "shutdown", false) +} + +func (c *Cron) stopRuns(ctx context.Context, op string, cancelJobs bool) error { c.runningMu.Lock() - if c.rootCancel != nil { - c.rootCancel() - } + c.cleanupCompletedRunsLocked() - loopDone := c.loopDone - c.runningMu.Unlock() + runs := append([]*runLifecycle(nil), c.runs...) + for _, run := range runs { + run.schedulerCancel() - if loopDone != nil { - select { - case <-loopDone: - case <-ctx.Done(): - return fmt.Errorf("cron: stop: %w", ctx.Err()) + if cancelJobs { + run.jobCancel() } } + c.runningMu.Unlock() - done := make(chan struct{}) - - go func() { - c.jobWaiter.Wait() - close(done) - }() + for _, run := range runs { + err := waitForRunChannel(ctx, run.loopDone) + if err != nil { + return fmt.Errorf("cron: %s: %w", op, err) + } + } - select { - case <-done: - return nil - case <-ctx.Done(): - return fmt.Errorf("cron: stop: %w", ctx.Err()) + for _, run := range runs { + err := c.waitForRunJobs(ctx, run) + if err != nil { + return fmt.Errorf("cron: %s: %w", op, err) + } } + + return nil } func (c *Cron) parseAndSchedule(name, spec string, cmd Job) (EntryID, error) { @@ -359,7 +378,7 @@ func (c *Cron) scheduleEntry(name string, sched Schedule, cmd Job) EntryID { } addCh := c.add - loopDone := c.loopDone + loopDone := c.currentRun.loopDone c.runningMu.Unlock() select { @@ -374,26 +393,35 @@ func (c *Cron) scheduleEntry(name string, sched Schedule, cmd Job) EntryID { return 
entry.ID } -// enterRunning must be called with runningMu held. It sets up the root context -// derived from the caller's ctx and marks the scheduler as running. -func (c *Cron) enterRunning(ctx context.Context) context.Context { +// enterRunning must be called with runningMu held. It sets up independent +// scheduler and job contexts derived from the caller's ctx and marks the +// scheduler as running. +func (c *Cron) enterRunning(ctx context.Context) *runLifecycle { + c.cleanupCompletedRunsLocked() + + run := &runLifecycle{ + loopDone: make(chan struct{}), + } + run.schedulerCtx, run.schedulerCancel = context.WithCancel(ctx) //nolint:gosec // G118: cancellation is retained for Stop/Shutdown + run.jobCtx, run.jobCancel = context.WithCancel(ctx) //nolint:gosec // G118: cancellation is retained for Stop/Shutdown + c.running.Store(true) - c.rootCtx, c.rootCancel = context.WithCancel(ctx) //nolint:gosec // G118: rootCancel is stored and called by Stop - c.loopDone = make(chan struct{}) + c.currentRun = run + c.runs = append(c.runs, run) - return c.rootCtx + return run } // markStopped clears the running flag once the scheduler loop has exited. It // must not acquire runningMu: Schedule/Remove/Entries may hold it while // waiting on loopDone. -func (c *Cron) markStopped(loopDone chan struct{}) { +func (c *Cron) markStopped(run *runLifecycle) { c.running.Store(false) - close(loopDone) + close(run.loopDone) } // schedulerLoop runs the scheduler until ctx is cancelled. 
-func (c *Cron) schedulerLoop(ctx context.Context) { +func (c *Cron) schedulerLoop(run *runLifecycle) { c.logger.Info("start") now := c.initializeEntries() @@ -403,7 +431,7 @@ func (c *Cron) schedulerLoop(ctx context.Context) { var shouldStop bool - now, shouldStop = c.processSchedulerEvent(ctx, now) + now, shouldStop = c.processSchedulerEvent(run, now) if shouldStop { return } @@ -456,19 +484,19 @@ func (c *Cron) initializeEntries() time.Time { return now } -func (c *Cron) processSchedulerEvent(ctx context.Context, now time.Time) (time.Time, bool) { +func (c *Cron) processSchedulerEvent(run *runLifecycle, now time.Time) (time.Time, bool) { timer := c.newSchedulerTimer(now) defer timer.Stop() for { select { case firedAt := <-timer.C(): - return c.handleTimerFired(ctx, firedAt), false + return c.handleTimerFired(run, firedAt), false case newEntry := <-c.add: return c.handleEntryAdded(newEntry), false case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() - case <-ctx.Done(): + case <-run.schedulerCtx.Done(): c.logger.Info("stop") return now, true @@ -488,21 +516,21 @@ func (c *Cron) newSchedulerTimer(now time.Time) Timer { return c.clock.NewTimer(c.entries[0].Next.Sub(now)) } -func (c *Cron) handleTimerFired(ctx context.Context, firedAt time.Time) time.Time { +func (c *Cron) handleTimerFired(run *runLifecycle, firedAt time.Time) time.Time { now := firedAt.In(c.location) c.logger.Info("wake", "now", now) - c.runDueEntries(ctx, now) + c.runDueEntries(run, now) return now } -func (c *Cron) runDueEntries(ctx context.Context, now time.Time) { +func (c *Cron) runDueEntries(run *runLifecycle, now time.Time) { for _, entry := range c.entries { if entry.Next.After(now) || entry.Next.IsZero() { break } - c.startJob(ctx, entry) + c.startJob(run, entry) entry.Prev = entry.Next entry.Next = entry.Schedule.Next(now) c.logger.Info("run", "now", now, "entry", entry.ID, "name", entry.Name, "next", entry.Next) @@ -530,10 +558,37 @@ func (c *Cron) handleEntryRemoved(id 
EntryID) time.Time { // configured [EventHooks] and [ErrorFunc]. Non-nil errors are logged at Warn // level and do not affect future executions. Hook panics are recovered and // logged so that observability callbacks cannot crash the scheduler. -func (c *Cron) startJob(ctx context.Context, entry *Entry) { - c.jobWaiter.Go(func() { - c.executeJob(ctx, entry) - }) +func (c *Cron) startJob(run *runLifecycle, entry *Entry) { + c.runningMu.Lock() + if run.activeJobs == 0 { + run.jobsDone = make(chan struct{}) + } + + run.activeJobs++ + c.runningMu.Unlock() + + go func() { + defer c.finishJob(run) + + c.executeJob(run.jobCtx, entry) + }() +} + +func (c *Cron) finishJob(run *runLifecycle) { + c.runningMu.Lock() + defer c.runningMu.Unlock() + + if run.activeJobs == 0 { + return + } + + run.activeJobs-- + if run.activeJobs == 0 && run.jobsDone != nil { + close(run.jobsDone) + run.jobsDone = nil + } + + c.cleanupCompletedRunsLocked() } func (c *Cron) executeJob(ctx context.Context, entry *Entry) { @@ -593,3 +648,65 @@ func (c *Cron) entrySnapshot() []Entry { func (c *Cron) removeEntry(id EntryID) { c.entries = slices.DeleteFunc(c.entries, func(e *Entry) bool { return e.ID == id }) } + +func (c *Cron) waitForRunJobs(ctx context.Context, run *runLifecycle) error { + c.runningMu.Lock() + if run.activeJobs == 0 { + c.cleanupCompletedRunsLocked() + c.runningMu.Unlock() + + return nil + } + + jobsDone := run.jobsDone + c.runningMu.Unlock() + + return waitForRunChannel(ctx, jobsDone) +} + +func (c *Cron) cleanupCompletedRunsLocked() { + if len(c.runs) == 0 { + return + } + + filtered := c.runs[:0] + for _, run := range c.runs { + if run.activeJobs == 0 && isClosed(run.loopDone) { + if c.currentRun == run { + c.currentRun = nil + } + + continue + } + + filtered = append(filtered, run) + } + + c.runs = filtered +} + +func waitForRunChannel(ctx context.Context, done <-chan struct{}) error { + if done == nil { + return nil + } + + select { + case <-done: + return nil + case 
<-ctx.Done(): + return fmt.Errorf("context done: %w", ctx.Err()) + } +} + +func isClosed(done <-chan struct{}) bool { + if done == nil { + return false + } + + select { + case <-done: + return true + default: + return false + } +} diff --git a/cron_test.go b/cron_test.go index 6db24a3..6fb069c 100644 --- a/cron_test.go +++ b/cron_test.go @@ -117,6 +117,16 @@ func newFakeWithSeconds() (*Cron, *fakeClock) { return New(WithParser(testParserWithSeconds()), WithChain(), WithClock(fc)), fc } +func newQuietFakeWithSeconds() (*Cron, *fakeClock) { + fc := newFakeClock(baseTime) + + return New( + WithParser(testParserWithSeconds()), + WithClock(fc), + WithLogger(DiscardLogger()), + ), fc +} + func TestFuncPanicRecovery(t *testing.T) { t.Parallel() @@ -216,6 +226,33 @@ func TestStopCausesJobsToNotRun(t *testing.T) { } } +func TestShutdownCausesJobsToNotRun(t *testing.T) { + t.Parallel() + + var calls atomic.Int64 + + cron, fc := newFakeWithSeconds() + cron.Start(context.Background()) + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Fatalf("shutdown: %v", err) + } + + mustAddFunc(t, cron, everySecondSpec, func(_ context.Context) error { + calls.Add(1) + + return nil + }) + + fc.Advance(2 * time.Second) + time.Sleep(10 * time.Millisecond) + + if c := calls.Load(); c != 0 { + t.Fatalf("expected no job runs after shutdown, got %d", c) + } +} + // Add a job, start cron, expect it runs. 
func TestAddBeforeRunning(t *testing.T) { t.Parallel() @@ -516,6 +553,115 @@ func TestStopWithoutStart(t *testing.T) { } } +func TestShutdownWithoutStart(t *testing.T) { + t.Parallel() + + cron := New() + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } +} + +func TestStopCancelsRunningJobContext(t *testing.T) { + t.Parallel() + + started := make(chan struct{}, 1) + canceled := make(chan struct{}, 1) + + cron, fc := newQuietFakeWithSeconds() + + mustAddFunc(t, cron, everySecondSpec, func(ctx context.Context) error { + signalJobStarted(started) + <-ctx.Done() + signalJobStarted(canceled) + + return ctx.Err() + }) + + cron.Start(context.Background()) + fc.BlockUntilTimers(1) + fc.Advance(1 * time.Second) + waitForJobStarted(t, started) + + err := cron.Stop(context.Background()) + if err != nil { + t.Fatalf("stop: %v", err) + } + + waitForJobStarted(t, canceled) +} + +func TestShutdownLetsRunningJobsFinish(t *testing.T) { + t.Parallel() + + started := make(chan struct{}, 1) + finished := make(chan struct{}, 1) + allowFinish := make(chan struct{}) + + var ( + calls atomic.Int64 + canceled atomic.Bool + ) + + cron, fc := newQuietFakeWithSeconds() + + mustAddFunc(t, cron, everySecondSpec, func(ctx context.Context) error { + calls.Add(1) + signalJobStarted(started) + + select { + case <-allowFinish: + signalJobStarted(finished) + + return nil + case <-ctx.Done(): + canceled.Store(true) + signalJobStarted(finished) + + return ctx.Err() + } + }) + + cron.Start(context.Background()) + fc.BlockUntilTimers(1) + fc.Advance(1 * time.Second) + waitForJobStarted(t, started) + + shutdownDone := make(chan error, 1) + + go func() { + shutdownDone <- cron.Shutdown(context.Background()) + }() + + expectErrorChannelPending(t, shutdownDone, 20*time.Millisecond, "expected Shutdown to wait for the running job") + + close(allowFinish) + + err := waitForErrorResult(t, shutdownDone, awaitTimeout, "shutdown") + if err != nil { 
+ t.Fatalf("shutdown: %v", err) + } + + waitForJobStarted(t, finished) + + if canceled.Load() { + t.Fatal("expected Shutdown not to cancel the running job context") + } + + if calls.Load() != 1 { + t.Fatalf("expected exactly one run before shutdown, got %d", calls.Load()) + } + + fc.Advance(2 * time.Second) + time.Sleep(10 * time.Millisecond) + + if calls.Load() != 1 { + t.Fatalf("expected no future runs after shutdown, got %d", calls.Load()) + } +} + type testJob struct { wg *sync.WaitGroup name string @@ -811,6 +957,14 @@ func TestStopAndWait(t *testing.T) { t.Run("a couple fast jobs and a slow job added, waits for slow job", testStopAndWaitSlowJob) } +func TestShutdownAndWait(t *testing.T) { + t.Parallel() + + t.Run("nothing running, returns immediately", testShutdownAndWaitNothingRunning) + t.Run("repeated calls to Shutdown", testShutdownAndWaitRepeatedCalls) + t.Run("a slow job blocks until it completes", testShutdownAndWaitSlowJob) +} + func TestMultiThreadedStartAndStop(t *testing.T) { t.Parallel() @@ -860,6 +1014,18 @@ func testStopAndWaitNothingRunning(t *testing.T) { } } +func testShutdownAndWaitNothingRunning(t *testing.T) { + t.Parallel() + + cron, _ := newFakeWithSeconds() + cron.Start(context.Background()) + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } +} + func testStopAndWaitRepeatedCalls(t *testing.T) { t.Parallel() @@ -876,6 +1042,22 @@ func testStopAndWaitRepeatedCalls(t *testing.T) { } } +func testShutdownAndWaitRepeatedCalls(t *testing.T) { + t.Parallel() + + cron, _ := newFakeWithSeconds() + cron.Start(context.Background()) + + _ = cron.Shutdown(context.Background()) //nolint:errcheck // first shutdown + + time.Sleep(time.Millisecond) + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } +} + func testStopAndWaitFastJobs(t *testing.T) { t.Parallel() @@ -902,38 +1084,74 @@ func testStopAndWaitFastJobs(t 
*testing.T) { func testStopAndWaitSlowJob(t *testing.T) { t.Parallel() + runSlowJobWaitTest(t, "Stop", func(cronInstance *Cron, ctx context.Context) error { + return cronInstance.Stop(ctx) + }) +} + +func testShutdownAndWaitSlowJob(t *testing.T) { + t.Parallel() + + runSlowJobWaitTest(t, "Shutdown", func(cronInstance *Cron, ctx context.Context) error { + return cronInstance.Shutdown(ctx) + }) +} + +func runSlowJobWaitTest(t *testing.T, operation string, halt func(*Cron, context.Context) error) { + t.Helper() + slowJobStarted := make(chan struct{}, 1) - cron := newWithSeconds() - mustAddFunc(t, cron, everySecondWithSeconds, noop) - cron.Start(context.Background()) - mustAddFunc(t, cron, everySecondWithSeconds, func(_ context.Context) error { + cronInstance := newWithSeconds() + mustAddFunc(t, cronInstance, everySecondWithSeconds, noop) + cronInstance.Start(context.Background()) + mustAddFunc(t, cronInstance, everySecondWithSeconds, func(_ context.Context) error { signalJobStarted(slowJobStarted) time.Sleep(slowStopJobDelay) return nil }) - mustAddFunc(t, cron, everySecondWithSeconds, noop) + mustAddFunc(t, cronInstance, everySecondWithSeconds, noop) waitForJobStarted(t, slowJobStarted) - // A short deadline should trip because the slow job is still running. shortCtx, cancelShort := context.WithTimeout(context.Background(), waitForStopCheck) - - err := cron.Stop(shortCtx) + err := halt(cronInstance, shortCtx) cancelShort() if err == nil { - t.Error("expected Stop to time out while slow job was running") + t.Errorf("expected %s to time out while slow job was running", operation) } - // A longer deadline should succeed once the slow job wraps up. 
longCtx, cancelLong := context.WithTimeout(context.Background(), waitForStopCompletion) defer cancelLong() - err = cron.Stop(longCtx) + err = halt(cronInstance, longCtx) if err != nil { - t.Errorf("expected Stop to succeed, got %v", err) + t.Errorf("expected %s to succeed, got %v", operation, err) + } +} + +func expectErrorChannelPending(t *testing.T, done <-chan error, timeout time.Duration, message string) { + t.Helper() + + select { + case err := <-done: + t.Fatalf("%s, got %v", message, err) + case <-time.After(timeout): + } +} + +func waitForErrorResult(t *testing.T, done <-chan error, timeout time.Duration, label string) error { + t.Helper() + + select { + case err := <-done: + return err + case <-time.After(timeout): + t.Fatalf("%s did not complete in time", label) + + return nil } } diff --git a/doc.go b/doc.go index 27dda91..0912e38 100644 --- a/doc.go +++ b/doc.go @@ -16,8 +16,9 @@ It requires Go 1.26 or later. # Usage Callers may register Funcs to be invoked on a given schedule. Cron will run -them in their own goroutines. All jobs receive a [context.Context] that is -cancelled when the scheduler is stopped. +them in their own goroutines. All jobs receive a [context.Context] derived from +the [Cron.Start] / [Cron.Run] context. That job context is cancelled when the +parent context is cancelled or when [Cron.Stop] is called. c := cron.New() c.AddFunc("30 * * * *", func(ctx context.Context) error { @@ -41,7 +42,20 @@ cancelled when the scheduler is stopped. // Inspect the cron job entries' next and previous run times. inspect(c.Entries()) .. - c.Stop(ctx) // Stop the scheduler and wait for in-flight jobs. + c.Stop(ctx) // Cancel running jobs, then wait. + c.Shutdown(ctx) // Graceful drain without cancelling running jobs. + +# Lifecycle + +Cron exposes two shutdown modes: + + - [Cron.Stop] cancels the scheduler and the contexts already handed to running + jobs, then waits for those jobs to return. 
+ - [Cron.Shutdown] stops future scheduling and waits for running jobs to + finish without cancelling their contexts. + +If the context passed to [Cron.Start] or [Cron.Run] is cancelled directly, both +the scheduler and job contexts are cancelled. # CRON Expression Format @@ -138,6 +152,9 @@ where "duration" is a string accepted by time.ParseDuration For example, "@every 1h30m10s" would indicate a schedule that activates after 1 hour, 30 minutes, 10 seconds, and then every interval after that. +Intervals must be greater than zero. Sub-second intervals are accepted and +rounded up to one second. + Note: The interval does not take the job runtime into account. For example, if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, it will have only 2 minutes of idle time between each run. @@ -169,6 +186,8 @@ For example: The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility. +Malformed timezone prefixes return parse errors rather than panicking. + Be aware that jobs scheduled during daylight-savings leap-ahead transitions will not be run! @@ -178,11 +197,14 @@ A Cron runner may be configured with a chain of job wrappers to add cross-cutting functionality to all submitted jobs. For example, they may be used to achieve the following effects: - - Recover any panics from jobs (activated by default) + - Recover panics from jobs by installing [Recover] - Delay a job's execution if the previous run hasn't completed yet - Skip a job's execution if the previous run hasn't completed yet - Log each job's invocations +Panic recovery is not enabled by default. To preserve scheduler availability +when jobs may panic, install [Recover] explicitly via [WithChain] or [NewChain]. 
+ Install wrappers for all jobs added to a cron using the [WithChain] option: cron.New(cron.WithChain( @@ -255,6 +277,11 @@ custom [*slog.Logger] via [WithLogger] to control log level and destination: slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}), ))) +# Option defaults + +Passing nil to [WithLocation], [WithParser], [WithLogger], or [WithClock] keeps +the package default instead of leaving the scheduler in an invalid state. + # Implementation Cron entries are stored in an array, sorted by their next activation time. Cron diff --git a/example_test.go b/example_test.go index 50f8826..9ebe658 100644 --- a/example_test.go +++ b/example_test.go @@ -67,6 +67,28 @@ func ExampleCron_AddNamedFunc() { // heartbeat } +func ExampleCron_Shutdown() { + cronInstance := cron.New(cron.WithSeconds()) + done := make(chan struct{}) + + cronInstance.AddFunc(exampleEverySecond, func(_ context.Context) error { + fmt.Println("tick") + close(done) + + return nil + }) + + cronInstance.Start(context.Background()) + <-done + + cronInstance.Shutdown(context.Background()) + fmt.Println("graceful stop") + + // Output: + // tick + // graceful stop +} + func ExampleNextN() { sched, _ := cron.ParseStandard(exampleEveryHour) @@ -116,6 +138,20 @@ func ExampleTimeout() { // done } +func ExampleRecover() { + job := cron.NewChain(cron.Recover(cron.DiscardLogger())).Then( + cron.FuncJob(func(_ context.Context) error { + panic("boom") + }), + ) + + err := job.Run(context.Background()) + fmt.Println(errors.Is(err, cron.ErrPanic)) + + // Output: + // true +} + func ExampleMaxConcurrent() { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) diff --git a/option.go b/option.go index 874c861..f6cc6ef 100644 --- a/option.go +++ b/option.go @@ -8,10 +8,17 @@ import ( // Option represents a modification to the default behavior of a Cron. type Option func(*Cron) -// WithLocation overrides the timezone of the cron instance. 
+// WithLocation overrides the timezone of the cron instance. Passing nil keeps +// the default [time.Local]. func WithLocation(loc *time.Location) Option { - return func(c *Cron) { - c.location = loc + return func(cronInstance *Cron) { + if loc == nil { + cronInstance.location = time.Local + + return + } + + cronInstance.location = loc } } @@ -23,33 +30,47 @@ func WithSeconds() Option { )) } -// WithParser overrides the parser used for interpreting job schedules. +// WithParser overrides the parser used for interpreting job schedules. Passing +// nil keeps the default parser. func WithParser(p Parser) Option { - return func(c *Cron) { - c.parser = p + return func(cronInstance *Cron) { + if p == nil { + return + } + + cronInstance.parser = p } } // WithChain specifies Job wrappers to apply to all jobs added to this cron. // Refer to the Chain* functions in this package for provided wrappers. func WithChain(wrappers ...JobWrapper) Option { - return func(c *Cron) { - c.chain = NewChain(wrappers...) + return func(cronInstance *Cron) { + cronInstance.chain = NewChain(wrappers...) } } -// WithLogger uses the provided logger. +// WithLogger uses the provided logger. Passing nil keeps the default logger. func WithLogger(logger *slog.Logger) Option { - return func(c *Cron) { - c.logger = logger + return func(cronInstance *Cron) { + if logger == nil { + return + } + + cronInstance.logger = logger } } // WithClock overrides the clock used by the cron instance. It is intended // primarily for tests that want to drive the scheduler deterministically. +// Passing nil keeps the default system clock. func WithClock(clock Clock) Option { - return func(c *Cron) { - c.clock = clock + return func(cronInstance *Cron) { + if clock == nil { + return + } + + cronInstance.clock = clock } } @@ -60,8 +81,8 @@ type ErrorFunc func(id EntryID, name string, err error) // WithOnError registers a callback invoked whenever a job returns a non-nil // error. 
The callback receives the entry's ID, name, and the error. func WithOnError(fn ErrorFunc) Option { - return func(c *Cron) { - c.onError = fn + return func(cronInstance *Cron) { + cronInstance.onError = fn } } @@ -78,7 +99,7 @@ type EventHooks struct { // WithEventHooks registers lifecycle callbacks for job execution events. func WithEventHooks(hooks EventHooks) Option { - return func(c *Cron) { - c.hooks = hooks + return func(cronInstance *Cron) { + cronInstance.hooks = hooks } } diff --git a/option_test.go b/option_test.go index e0538ee..266d736 100644 --- a/option_test.go +++ b/option_test.go @@ -57,3 +57,31 @@ func TestWithLoggerCapturesSchedulerEvents(t *testing.T) { t.Error("expected to see some actions, got:", out) } } + +func TestNilOptionsPreserveDefaults(t *testing.T) { + t.Parallel() + + cronInstance := New( + WithLocation(nil), + WithParser(nil), + WithLogger(nil), + WithClock(nil), + ) + + if cronInstance.location != time.Local { + t.Errorf("expected default location, got %v", cronInstance.location) + } + + if cronInstance.logger == nil { + t.Fatal("expected default logger") + } + + if cronInstance.clock == nil { + t.Fatal("expected default clock") + } + + _, err := cronInstance.parser.Parse("* * * * *") + if err != nil { + t.Fatalf("expected default parser to remain active: %v", err) + } +} diff --git a/parser.go b/parser.go index d2591f4..5ec8fc2 100644 --- a/parser.go +++ b/parser.go @@ -23,6 +23,9 @@ var ( errRangeStepMustBePositive = errors.New("step of range should be a positive number") errNegativeNumber = errors.New("negative number not allowed") errUnrecognizedDescriptor = errors.New("unrecognized descriptor") + errMissingLocationName = errors.New("missing timezone location") + errMissingLocationSpec = errors.New("missing schedule after timezone prefix") + errNonPositiveInterval = errors.New("interval must be greater than zero") ) const ( @@ -316,15 +319,28 @@ func extractLocation(spec string) (string, *time.Location, error) { return spec, loc, 
nil } - i := strings.Index(spec, " ") - eq := strings.Index(spec, "=") + fields := strings.Fields(spec) + if len(fields) == 0 { + return "", nil, errEmptySpec + } + + eq := strings.Index(fields[0], "=") + + locationName := fields[0][eq+1:] + if locationName == "" { + return "", nil, fmt.Errorf("%w: %s", errMissingLocationName, spec) + } - loc, err := time.LoadLocation(spec[eq+1 : i]) + if len(fields) == 1 { + return "", nil, fmt.Errorf("%w: %s", errMissingLocationSpec, spec) + } + + loc, err := time.LoadLocation(locationName) if err != nil { - return "", nil, fmt.Errorf("provided bad location %s: %w", spec[eq+1:i], err) + return "", nil, fmt.Errorf("provided bad location %s: %w", locationName, err) } - return strings.TrimSpace(spec[i:]), loc, nil + return strings.Join(fields[1:], " "), loc, nil } func parseScheduleFields(fields []string, loc *time.Location) (*SpecSchedule, error) { @@ -593,15 +609,22 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { }, nil } - const every = "@every " - if strings.HasPrefix(descriptor, every) { - duration, err := time.ParseDuration(descriptor[len(every):]) - if err != nil { - return nil, fmt.Errorf("failed to parse duration %s: %w", descriptor, err) - } - - return Every(duration), nil + if strings.HasPrefix(descriptor, "@every ") { + return parseEveryDescriptor(descriptor) } return nil, fmt.Errorf(wrappedErrorWithValueFormat, errUnrecognizedDescriptor, descriptor) } + +func parseEveryDescriptor(descriptor string) (Schedule, error) { + duration, err := time.ParseDuration(descriptor[len("@every "):]) + if err != nil { + return nil, fmt.Errorf("failed to parse duration %s: %w", descriptor, err) + } + + if duration <= 0 { + return nil, fmt.Errorf("%w: %s", errNonPositiveInterval, descriptor) + } + + return Every(duration), nil +} diff --git a/parser_test.go b/parser_test.go index 2f8c603..21c1f0f 100644 --- a/parser_test.go +++ b/parser_test.go @@ -173,7 +173,12 @@ func TestParseScheduleErrors(t 
*testing.T) { tests := []struct{ expr, err string }{ {"* 5 j * * *", failedToParseIntText}, {"@every Xm", "failed to parse duration"}, + {"@every 0s", "interval must be greater than zero"}, + {"@every -5m", "interval must be greater than zero"}, {"@unrecognized", "unrecognized descriptor"}, + {"CRON_TZ=UTC", "missing schedule after timezone prefix"}, + {"CRON_TZ=", "missing timezone location"}, + {"TZ=", "missing timezone location"}, {"* * * *", "expected 5 to 6 fields"}, {"", "empty spec string"}, } @@ -208,6 +213,7 @@ func TestParseSchedule(t *testing.T) { {NewStandardParser(), "CRON_TZ=UTC 5 * * * *", every5min(time.UTC)}, {parserWithSeconds, "CRON_TZ=Asia/Tokyo 0 5 * * * *", every5min(tokyo)}, {parserWithSeconds, "@every 5m", ConstantDelaySchedule{fiveMinuteDelay}}, + {parserWithSeconds, "@every 500ms", ConstantDelaySchedule{time.Second}}, {parserWithSeconds, "@midnight", midnight(time.Local)}, {parserWithSeconds, "TZ=UTC @midnight", midnight(time.UTC)}, {parserWithSeconds, "TZ=Asia/Tokyo @midnight", midnight(tokyo)},