diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c96134..afbbab7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `EventHooks` struct with `OnJobStart` and `OnJobComplete` callbacks, set via `WithEventHooks` option. - `ErrorFunc` type and `WithOnError` option for error-only callbacks. +- `Shutdown(ctx context.Context) error` for graceful scheduler draining without + cancelling contexts already handed to running jobs. - `example_test.go` with runnable examples for pkg.go.dev. - `context.Context` threading throughout the public API: - `Job.Run(ctx context.Context) error` — jobs receive a cancellable context @@ -33,6 +35,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 `ErrAlreadyRunning` if a scheduler is already active. - `Stop(ctx context.Context) error` — cancels the scheduler, waits for in-flight jobs bounded by `ctx`. + - `Shutdown(ctx context.Context) error` — stops future scheduling and waits + for in-flight jobs without cancelling their contexts. - `Clock` interface (`Clock`, `Timer`) and `WithClock` option for deterministic testing without `time.Sleep`. - `ErrAlreadyRunning` sentinel error returned by `Run` when called twice. @@ -50,6 +54,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Entry.WrappedJob` is now unexported (`wrappedJob`). - `FuncJob` signature: `func(context.Context) error` (was `func()`). - Minimum Go version: **1.26**. +- `@every` rejects non-positive durations while still rounding accepted + sub-second intervals to one second. +- Malformed `TZ=` / `CRON_TZ=` prefixes now return parse errors instead of + panicking. +- Passing nil to `WithLocation`, `WithParser`, `WithLogger`, or `WithClock` + preserves the package defaults. ### Removed diff --git a/MIGRATION.md b/MIGRATION.md index 3993e67..1fc3595 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -75,6 +75,15 @@ Returns `cron.ErrAlreadyRunning` if a scheduler is already active. `Stop` cancels the scheduler and waits for in-flight jobs, bounded by `ctx`. It returns `ctx.Err()` if the deadline elapses before all jobs finish. +### Shutdown (graceful drain) + +If you want to stop scheduling new work without cancelling contexts already +handed to running jobs, use: + +```go +err := c.Shutdown(ctx) +``` + ## 4. Logger — `log/slog` replaces custom interface The custom `Logger` interface, `PrintfLogger`, and `VerbosePrintfLogger` are @@ -131,6 +140,13 @@ if errors.Is(err, cron.ErrPanic) { } ``` +`Recover` is not enabled by default. To preserve v3-style panic recovery, +install it explicitly: + +```go +cron.New(cron.WithChain(cron.Recover(logger))) +``` + ## 9. Removed symbols | Removed | Replacement | @@ -149,7 +165,8 @@ if errors.Is(err, cron.ErrPanic) { 1. Update import path to `github.com/hyp3rd/cron/v4`. 2. Add `context.Context` parameter and `error` return to all `Job` implementations and `FuncJob` / `AddFunc` closures. -3. Pass a `context.Context` to `Start`, `Run`, and `Stop`. +3. Pass a `context.Context` to `Start`, `Run`, and `Stop`, and use + `Shutdown(ctx)` when you need graceful draining instead of cancellation. 4. Replace `Logger`/`PrintfLogger`/`VerbosePrintfLogger` with `*slog.Logger`. 5. Rename `NewParser` calls to `NewSpecParser`. 6. Replace `Entry.WrappedJob` usage with `Entry.Job`. diff --git a/README.md b/README.md index 8d56ce4..bf5b8ed 100644 --- a/README.md +++ b/README.md @@ -43,10 +43,10 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() - c.Start(ctx) + c.Start(context.Background()) <-ctx.Done() - c.Stop(context.Background()) + c.Shutdown(context.Background()) } ``` @@ -54,14 +54,17 @@ func main() { - **Standard 5-field cron expressions** (minute, hour, dom, month, dow) plus optional seconds via `WithSeconds()`. -- **`context.Context` throughout** — `Start`, `Run`, `Stop`, and every `Job` - receive a context for cancellation and deadlines. +- **Context-aware lifecycle** — `Start`, `Run`, `Stop`, `Shutdown`, and every + `Job` participate in cancellation and deadlines. - **`log/slog` logging** — structured, leveled logging out of the box. Default level is `slog.LevelWarn` to keep the scheduler quiet. - **`Clock` interface** — inject a fake clock via `WithClock` for deterministic, zero-`time.Sleep` tests. - **Job wrappers** — `Recover`, `SkipIfStillRunning`, `DelayIfStillRunning`, `Timeout`, `MaxConcurrent`, `RetryOnError`, and custom `JobWrapper` chains. +- **Defensive configuration** — malformed `TZ=` / `CRON_TZ=` prefixes and + invalid `@every` intervals return parse errors; nil `With*` options keep + defaults. - **Named entries** — `AddNamedFunc` / `AddNamedJob` attach human-readable labels for logging and observability. - **Event hooks** — `WithEventHooks` for `OnJobStart` / `OnJobComplete` @@ -98,6 +101,10 @@ func main() { @every 1h30m ``` +`@every` accepts positive `time.ParseDuration` values. Durations smaller than a +second still round up to 1 second; `@every 0s` and negative durations are +rejected as configuration errors. + ### Time zones ```go @@ -106,8 +113,26 @@ cron.New(cron.WithLocation(time.UTC)) c.AddFunc("CRON_TZ=Asia/Tokyo 0 6 * * ?", myJob) ``` +Malformed timezone prefixes such as `CRON_TZ=` or `CRON_TZ=UTC` without a +schedule body return parse errors instead of panicking. + +## Lifecycle + +Use `Start(ctx)` for a background scheduler and `Run(ctx)` for a blocking one. +The context passed to `Start` or `Run` is also the parent context for every job. + +- `Stop(ctx)` cancels the scheduler and cancels contexts already handed to + running jobs before waiting for them to return. +- `Shutdown(ctx)` stops future scheduling and waits for running jobs to finish + without cancelling their contexts. +- If the `Start` / `Run` context is cancelled directly, both the scheduler and + job contexts are cancelled. + ## Job wrappers / Chain +Panic recovery is available but not enabled by default. Install `Recover` +explicitly if you want panics turned into logged `ErrPanic` errors: + ```go c := cron.New(cron.WithChain( cron.Recover(logger), @@ -186,6 +211,11 @@ c := cron.New(cron.WithClock(fakeClock)) See `clock.go` for the interface definition. +## Option defaults + +Passing `nil` to `WithLocation`, `WithParser`, `WithLogger`, or `WithClock` +keeps the package default instead of leaving the scheduler in an invalid state. + ## Migration from robfig/cron/v3 See [MIGRATION.md](MIGRATION.md) for a step-by-step upgrade guide. diff --git a/cron.go b/cron.go index 251de37..7819f8b 100644 --- a/cron.go +++ b/cron.go @@ -31,14 +31,23 @@ type Cron struct { parser Parser clock Clock nextID EntryID - jobWaiter sync.WaitGroup - rootCtx context.Context //nolint:containedctx // stored to propagate cancellation to in-flight jobs - rootCancel context.CancelFunc - loopDone chan struct{} + currentRun *runLifecycle + runs []*runLifecycle onError ErrorFunc hooks EventHooks } +type runLifecycle struct { + schedulerCtx context.Context //nolint:containedctx // stored to coordinate graceful and forced shutdown across a scheduler run + schedulerCancel context.CancelFunc + jobCtx context.Context //nolint:containedctx // stored to propagate + // cancellation semantics to in-flight jobs for a scheduler run + jobCancel context.CancelFunc + loopDone chan struct{} + jobsDone chan struct{} + activeJobs int +} + // Parser turns a cron spec string into a [Schedule]. The default // implementation is [SpecParser]; callers can supply their own by passing // [WithParser]. @@ -48,8 +57,9 @@ type Parser interface { // Job is the unit of work scheduled by [Cron]. Implementations should honor // the provided context: when ctx is cancelled the job is expected to return -// promptly. A non-nil error is logged by the scheduler but does not stop -// future executions. +// promptly. Job contexts are cancelled when the parent [Cron.Start] / [Cron.Run] +// context is cancelled or when [Cron.Stop] is called. A non-nil error is logged +// by the scheduler but does not stop future executions. type Job interface { Run(ctx context.Context) error } @@ -195,7 +205,7 @@ func (c *Cron) Entries() []Entry { } snapCh := c.snapshot - loopDone := c.loopDone + loopDone := c.currentRun.loopDone c.runningMu.Unlock() replyChan := make(chan []Entry, 1) @@ -239,7 +249,7 @@ func (c *Cron) Remove(id EntryID) { } removeCh := c.remove - loopDone := c.loopDone + loopDone := c.currentRun.loopDone c.runningMu.Unlock() select { @@ -252,8 +262,8 @@ func (c *Cron) Remove(id EntryID) { } // Start launches the scheduler in its own goroutine bound to ctx. When ctx is -// cancelled the scheduler exits; any jobs already in flight are allowed to -// finish. Calling Start on an already-running scheduler is a no-op. +// cancelled the scheduler exits and running job contexts are cancelled. Calling +// Start on an already-running scheduler is a no-op. func (c *Cron) Start(ctx context.Context) { c.runningMu.Lock() defer c.runningMu.Unlock() @@ -262,12 +272,11 @@ func (c *Cron) Start(ctx context.Context) { return } - loopCtx := c.enterRunning(ctx) - loopDone := c.loopDone + run := c.enterRunning(ctx) go func() { - c.schedulerLoop(loopCtx) - c.markStopped(loopDone) + c.schedulerLoop(run) + c.markStopped(run) }() } @@ -283,50 +292,60 @@ func (c *Cron) Run(ctx context.Context) error { return ErrAlreadyRunning } - loopCtx := c.enterRunning(ctx) - loopDone := c.loopDone + run := c.enterRunning(ctx) c.runningMu.Unlock() - c.schedulerLoop(loopCtx) - c.markStopped(loopDone) + c.schedulerLoop(run) + c.markStopped(run) return nil } -// Stop cancels the running scheduler and waits for in-flight jobs to finish, -// bounded by the provided context. It returns ctx.Err() if the context is -// cancelled before all jobs complete. Calling Stop on a scheduler that is not -// running is a no-op and returns nil. +// Stop cancels the running scheduler and the contexts already handed to +// in-flight jobs, then waits for those jobs to finish, bounded by the provided +// context. It returns ctx.Err() if the context is cancelled before all jobs +// complete. Calling Stop on a scheduler that is not running is a no-op and +// returns nil. func (c *Cron) Stop(ctx context.Context) error { + return c.stopRuns(ctx, "stop", true) +} + +// Shutdown stops the scheduler and waits for in-flight jobs to finish without +// cancelling their contexts. The wait is bounded by the provided context. It +// returns ctx.Err() if the context is cancelled before all jobs complete. +func (c *Cron) Shutdown(ctx context.Context) error { + return c.stopRuns(ctx, "shutdown", false) +} + +func (c *Cron) stopRuns(ctx context.Context, op string, cancelJobs bool) error { c.runningMu.Lock() - if c.rootCancel != nil { - c.rootCancel() - } + c.cleanupCompletedRunsLocked() - loopDone := c.loopDone - c.runningMu.Unlock() + runs := append([]*runLifecycle(nil), c.runs...) + for _, run := range runs { + run.schedulerCancel() - if loopDone != nil { - select { - case <-loopDone: - case <-ctx.Done(): - return fmt.Errorf("cron: stop: %w", ctx.Err()) + if cancelJobs { + run.jobCancel() } } + c.runningMu.Unlock() - done := make(chan struct{}) - - go func() { - c.jobWaiter.Wait() - close(done) - }() + for _, run := range runs { + err := waitForRunChannel(ctx, run.loopDone) + if err != nil { + return fmt.Errorf("cron: %s: %w", op, err) + } + } - select { - case <-done: - return nil - case <-ctx.Done(): - return fmt.Errorf("cron: stop: %w", ctx.Err()) + for _, run := range runs { + err := c.waitForRunJobs(ctx, run) + if err != nil { + return fmt.Errorf("cron: %s: %w", op, err) + } } + + return nil } func (c *Cron) parseAndSchedule(name, spec string, cmd Job) (EntryID, error) { @@ -359,7 +378,7 @@ func (c *Cron) scheduleEntry(name string, sched Schedule, cmd Job) EntryID { } addCh := c.add - loopDone := c.loopDone + loopDone := c.currentRun.loopDone c.runningMu.Unlock() select { @@ -374,26 +393,35 @@ func (c *Cron) scheduleEntry(name string, sched Schedule, cmd Job) EntryID { return entry.ID } -// enterRunning must be called with runningMu held. It sets up the root context -// derived from the caller's ctx and marks the scheduler as running. -func (c *Cron) enterRunning(ctx context.Context) context.Context { +// enterRunning must be called with runningMu held. It sets up independent +// scheduler and job contexts derived from the caller's ctx and marks the +// scheduler as running. +func (c *Cron) enterRunning(ctx context.Context) *runLifecycle { + c.cleanupCompletedRunsLocked() + + run := &runLifecycle{ + loopDone: make(chan struct{}), + } + run.schedulerCtx, run.schedulerCancel = context.WithCancel(ctx) //nolint:gosec // G118: cancellation is retained for Stop/Shutdown + run.jobCtx, run.jobCancel = context.WithCancel(ctx) //nolint:gosec // G118: cancellation is retained for Stop/Shutdown + c.running.Store(true) - c.rootCtx, c.rootCancel = context.WithCancel(ctx) //nolint:gosec // G118: rootCancel is stored and called by Stop - c.loopDone = make(chan struct{}) + c.currentRun = run + c.runs = append(c.runs, run) - return c.rootCtx + return run } // markStopped clears the running flag once the scheduler loop has exited. It // must not acquire runningMu: Schedule/Remove/Entries may hold it while // waiting on loopDone. -func (c *Cron) markStopped(loopDone chan struct{}) { +func (c *Cron) markStopped(run *runLifecycle) { c.running.Store(false) - close(loopDone) + close(run.loopDone) } // schedulerLoop runs the scheduler until ctx is cancelled. -func (c *Cron) schedulerLoop(ctx context.Context) { +func (c *Cron) schedulerLoop(run *runLifecycle) { c.logger.Info("start") now := c.initializeEntries() @@ -403,7 +431,7 @@ func (c *Cron) schedulerLoop(ctx context.Context) { var shouldStop bool - now, shouldStop = c.processSchedulerEvent(ctx, now) + now, shouldStop = c.processSchedulerEvent(run, now) if shouldStop { return } @@ -456,19 +484,19 @@ func (c *Cron) initializeEntries() time.Time { return now } -func (c *Cron) processSchedulerEvent(ctx context.Context, now time.Time) (time.Time, bool) { +func (c *Cron) processSchedulerEvent(run *runLifecycle, now time.Time) (time.Time, bool) { timer := c.newSchedulerTimer(now) defer timer.Stop() for { select { case firedAt := <-timer.C(): - return c.handleTimerFired(ctx, firedAt), false + return c.handleTimerFired(run, firedAt), false case newEntry := <-c.add: return c.handleEntryAdded(newEntry), false case replyChan := <-c.snapshot: replyChan <- c.entrySnapshot() - case <-ctx.Done(): + case <-run.schedulerCtx.Done(): c.logger.Info("stop") return now, true @@ -488,21 +516,21 @@ func (c *Cron) newSchedulerTimer(now time.Time) Timer { return c.clock.NewTimer(c.entries[0].Next.Sub(now)) } -func (c *Cron) handleTimerFired(ctx context.Context, firedAt time.Time) time.Time { +func (c *Cron) handleTimerFired(run *runLifecycle, firedAt time.Time) time.Time { now := firedAt.In(c.location) c.logger.Info("wake", "now", now) - c.runDueEntries(ctx, now) + c.runDueEntries(run, now) return now } -func (c *Cron) runDueEntries(ctx context.Context, now time.Time) { +func (c *Cron) runDueEntries(run *runLifecycle, now time.Time) { for _, entry := range c.entries { if entry.Next.After(now) || entry.Next.IsZero() { break } - c.startJob(ctx, entry) + c.startJob(run, entry) entry.Prev = entry.Next entry.Next = entry.Schedule.Next(now) c.logger.Info("run", "now", now, "entry", entry.ID, "name", entry.Name, "next", entry.Next) @@ -530,10 +558,37 @@ func (c *Cron) handleEntryRemoved(id EntryID) time.Time { // configured [EventHooks] and [ErrorFunc]. Non-nil errors are logged at Warn // level and do not affect future executions. Hook panics are recovered and // logged so that observability callbacks cannot crash the scheduler. -func (c *Cron) startJob(ctx context.Context, entry *Entry) { - c.jobWaiter.Go(func() { - c.executeJob(ctx, entry) - }) +func (c *Cron) startJob(run *runLifecycle, entry *Entry) { + c.runningMu.Lock() + if run.activeJobs == 0 { + run.jobsDone = make(chan struct{}) + } + + run.activeJobs++ + c.runningMu.Unlock() + + go func() { + defer c.finishJob(run) + + c.executeJob(run.jobCtx, entry) + }() +} + +func (c *Cron) finishJob(run *runLifecycle) { + c.runningMu.Lock() + defer c.runningMu.Unlock() + + if run.activeJobs == 0 { + return + } + + run.activeJobs-- + if run.activeJobs == 0 && run.jobsDone != nil { + close(run.jobsDone) + run.jobsDone = nil + } + + c.cleanupCompletedRunsLocked() } func (c *Cron) executeJob(ctx context.Context, entry *Entry) { @@ -593,3 +648,65 @@ func (c *Cron) entrySnapshot() []Entry { func (c *Cron) removeEntry(id EntryID) { c.entries = slices.DeleteFunc(c.entries, func(e *Entry) bool { return e.ID == id }) } + +func (c *Cron) waitForRunJobs(ctx context.Context, run *runLifecycle) error { + c.runningMu.Lock() + if run.activeJobs == 0 { + c.cleanupCompletedRunsLocked() + c.runningMu.Unlock() + + return nil + } + + jobsDone := run.jobsDone + c.runningMu.Unlock() + + return waitForRunChannel(ctx, jobsDone) +} + +func (c *Cron) cleanupCompletedRunsLocked() { + if len(c.runs) == 0 { + return + } + + filtered := c.runs[:0] + for _, run := range c.runs { + if run.activeJobs == 0 && isClosed(run.loopDone) { + if c.currentRun == run { + c.currentRun = nil + } + + continue + } + + filtered = append(filtered, run) + } + + c.runs = filtered +} + +func waitForRunChannel(ctx context.Context, done <-chan struct{}) error { + if done == nil { + return nil + } + + select { + case <-done: + return nil + case <-ctx.Done(): + return fmt.Errorf("context done: %w", ctx.Err()) + } +} + +func isClosed(done <-chan struct{}) bool { + if done == nil { + return false + } + + select { + case <-done: + return true + default: + return false + } +} diff --git a/cron_test.go b/cron_test.go index 6db24a3..6fb069c 100644 --- a/cron_test.go +++ b/cron_test.go @@ -117,6 +117,16 @@ func newFakeWithSeconds() (*Cron, *fakeClock) { return New(WithParser(testParserWithSeconds()), WithChain(), WithClock(fc)), fc } +func newQuietFakeWithSeconds() (*Cron, *fakeClock) { + fc := newFakeClock(baseTime) + + return New( + WithParser(testParserWithSeconds()), + WithClock(fc), + WithLogger(DiscardLogger()), + ), fc +} + func TestFuncPanicRecovery(t *testing.T) { t.Parallel() @@ -216,6 +226,33 @@ func TestStopCausesJobsToNotRun(t *testing.T) { } } +func TestShutdownCausesJobsToNotRun(t *testing.T) { + t.Parallel() + + var calls atomic.Int64 + + cron, fc := newFakeWithSeconds() + cron.Start(context.Background()) + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Fatalf("shutdown: %v", err) + } + + mustAddFunc(t, cron, everySecondSpec, func(_ context.Context) error { + calls.Add(1) + + return nil + }) + + fc.Advance(2 * time.Second) + time.Sleep(10 * time.Millisecond) + + if c := calls.Load(); c != 0 { + t.Fatalf("expected no job runs after shutdown, got %d", c) + } +} + // Add a job, start cron, expect it runs. func TestAddBeforeRunning(t *testing.T) { t.Parallel() @@ -516,6 +553,115 @@ func TestStopWithoutStart(t *testing.T) { } } +func TestShutdownWithoutStart(t *testing.T) { + t.Parallel() + + cron := New() + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } +} + +func TestStopCancelsRunningJobContext(t *testing.T) { + t.Parallel() + + started := make(chan struct{}, 1) + canceled := make(chan struct{}, 1) + + cron, fc := newQuietFakeWithSeconds() + + mustAddFunc(t, cron, everySecondSpec, func(ctx context.Context) error { + signalJobStarted(started) + <-ctx.Done() + signalJobStarted(canceled) + + return ctx.Err() + }) + + cron.Start(context.Background()) + fc.BlockUntilTimers(1) + fc.Advance(1 * time.Second) + waitForJobStarted(t, started) + + err := cron.Stop(context.Background()) + if err != nil { + t.Fatalf("stop: %v", err) + } + + waitForJobStarted(t, canceled) +} + +func TestShutdownLetsRunningJobsFinish(t *testing.T) { + t.Parallel() + + started := make(chan struct{}, 1) + finished := make(chan struct{}, 1) + allowFinish := make(chan struct{}) + + var ( + calls atomic.Int64 + canceled atomic.Bool + ) + + cron, fc := newQuietFakeWithSeconds() + + mustAddFunc(t, cron, everySecondSpec, func(ctx context.Context) error { + calls.Add(1) + signalJobStarted(started) + + select { + case <-allowFinish: + signalJobStarted(finished) + + return nil + case <-ctx.Done(): + canceled.Store(true) + signalJobStarted(finished) + + return ctx.Err() + } + }) + + cron.Start(context.Background()) + fc.BlockUntilTimers(1) + fc.Advance(1 * time.Second) + waitForJobStarted(t, started) + + shutdownDone := make(chan error, 1) + + go func() { + shutdownDone <- cron.Shutdown(context.Background()) + }() + + expectErrorChannelPending(t, shutdownDone, 20*time.Millisecond, "expected Shutdown to wait for the running job") + + close(allowFinish) + + err := waitForErrorResult(t, shutdownDone, awaitTimeout, "shutdown") + if err != nil { + t.Fatalf("shutdown: %v", err) + } + + waitForJobStarted(t, finished) + + if canceled.Load() { + t.Fatal("expected Shutdown not to cancel the running job context") + } + + if calls.Load() != 1 { + t.Fatalf("expected exactly one run before shutdown, got %d", calls.Load()) + } + + fc.Advance(2 * time.Second) + time.Sleep(10 * time.Millisecond) + + if calls.Load() != 1 { + t.Fatalf("expected no future runs after shutdown, got %d", calls.Load()) + } +} + type testJob struct { wg *sync.WaitGroup name string @@ -811,6 +957,14 @@ func TestStopAndWait(t *testing.T) { t.Run("a couple fast jobs and a slow job added, waits for slow job", testStopAndWaitSlowJob) } +func TestShutdownAndWait(t *testing.T) { + t.Parallel() + + t.Run("nothing running, returns immediately", testShutdownAndWaitNothingRunning) + t.Run("repeated calls to Shutdown", testShutdownAndWaitRepeatedCalls) + t.Run("a slow job blocks until it completes", testShutdownAndWaitSlowJob) +} + func TestMultiThreadedStartAndStop(t *testing.T) { t.Parallel() @@ -860,6 +1014,18 @@ func testStopAndWaitNothingRunning(t *testing.T) { } } +func testShutdownAndWaitNothingRunning(t *testing.T) { + t.Parallel() + + cron, _ := newFakeWithSeconds() + cron.Start(context.Background()) + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } +} + func testStopAndWaitRepeatedCalls(t *testing.T) { t.Parallel() @@ -876,6 +1042,22 @@ func testStopAndWaitRepeatedCalls(t *testing.T) { } } +func testShutdownAndWaitRepeatedCalls(t *testing.T) { + t.Parallel() + + cron, _ := newFakeWithSeconds() + cron.Start(context.Background()) + + _ = cron.Shutdown(context.Background()) //nolint:errcheck // first shutdown + + time.Sleep(time.Millisecond) + + err := cron.Shutdown(context.Background()) + if err != nil { + t.Errorf("unexpected shutdown error: %v", err) + } +} + func testStopAndWaitFastJobs(t *testing.T) { t.Parallel() @@ -902,38 +1084,74 @@ func testStopAndWaitFastJobs(t *testing.T) { func testStopAndWaitSlowJob(t *testing.T) { t.Parallel() + runSlowJobWaitTest(t, "Stop", func(cronInstance *Cron, ctx context.Context) error { + return cronInstance.Stop(ctx) + }) +} + +func testShutdownAndWaitSlowJob(t *testing.T) { + t.Parallel() + + runSlowJobWaitTest(t, "Shutdown", func(cronInstance *Cron, ctx context.Context) error { + return cronInstance.Shutdown(ctx) + }) +} + +func runSlowJobWaitTest(t *testing.T, operation string, halt func(*Cron, context.Context) error) { + t.Helper() + slowJobStarted := make(chan struct{}, 1) - cron := newWithSeconds() - mustAddFunc(t, cron, everySecondWithSeconds, noop) - cron.Start(context.Background()) - mustAddFunc(t, cron, everySecondWithSeconds, func(_ context.Context) error { + cronInstance := newWithSeconds() + mustAddFunc(t, cronInstance, everySecondWithSeconds, noop) + cronInstance.Start(context.Background()) + mustAddFunc(t, cronInstance, everySecondWithSeconds, func(_ context.Context) error { signalJobStarted(slowJobStarted) time.Sleep(slowStopJobDelay) return nil }) - mustAddFunc(t, cron, everySecondWithSeconds, noop) + mustAddFunc(t, cronInstance, everySecondWithSeconds, noop) waitForJobStarted(t, slowJobStarted) - // A short deadline should trip because the slow job is still running. shortCtx, cancelShort := context.WithTimeout(context.Background(), waitForStopCheck) - - err := cron.Stop(shortCtx) + err := halt(cronInstance, shortCtx) cancelShort() if err == nil { - t.Error("expected Stop to time out while slow job was running") + t.Errorf("expected %s to time out while slow job was running", operation) } - // A longer deadline should succeed once the slow job wraps up. longCtx, cancelLong := context.WithTimeout(context.Background(), waitForStopCompletion) defer cancelLong() - err = cron.Stop(longCtx) + err = halt(cronInstance, longCtx) if err != nil { - t.Errorf("expected Stop to succeed, got %v", err) + t.Errorf("expected %s to succeed, got %v", operation, err) + } +} + +func expectErrorChannelPending(t *testing.T, done <-chan error, timeout time.Duration, message string) { + t.Helper() + + select { + case err := <-done: + t.Fatalf("%s, got %v", message, err) + case <-time.After(timeout): + } +} + +func waitForErrorResult(t *testing.T, done <-chan error, timeout time.Duration, label string) error { + t.Helper() + + select { + case err := <-done: + return err + case <-time.After(timeout): + t.Fatalf("%s did not complete in time", label) + + return nil } } diff --git a/doc.go b/doc.go index 27dda91..0912e38 100644 --- a/doc.go +++ b/doc.go @@ -16,8 +16,9 @@ It requires Go 1.26 or later. # Usage Callers may register Funcs to be invoked on a given schedule. Cron will run -them in their own goroutines. All jobs receive a [context.Context] that is -cancelled when the scheduler is stopped. +them in their own goroutines. All jobs receive a [context.Context] derived from +the [Cron.Start] / [Cron.Run] context. That job context is cancelled when the +parent context is cancelled or when [Cron.Stop] is called. c := cron.New() c.AddFunc("30 * * * *", func(ctx context.Context) error { @@ -41,7 +42,20 @@ cancelled when the scheduler is stopped. // Inspect the cron job entries' next and previous run times. inspect(c.Entries()) .. - c.Stop(ctx) // Stop the scheduler and wait for in-flight jobs. + c.Stop(ctx) // Cancel running jobs, then wait. + c.Shutdown(ctx) // Graceful drain without cancelling running jobs. + +# Lifecycle + +Cron exposes two shutdown modes: + + - [Cron.Stop] cancels the scheduler and the contexts already handed to running + jobs, then waits for those jobs to return. + - [Cron.Shutdown] stops future scheduling and waits for running jobs to + finish without cancelling their contexts. + +If the context passed to [Cron.Start] or [Cron.Run] is cancelled directly, both +the scheduler and job contexts are cancelled. # CRON Expression Format @@ -138,6 +152,9 @@ where "duration" is a string accepted by time.ParseDuration For example, "@every 1h30m10s" would indicate a schedule that activates after 1 hour, 30 minutes, 10 seconds, and then every interval after that. +Intervals must be greater than zero. Sub-second intervals are accepted and +rounded up to one second. + Note: The interval does not take the job runtime into account. For example, if a job takes 3 minutes to run, and it is scheduled to run every 5 minutes, it will have only 2 minutes of idle time between each run. @@ -169,6 +186,8 @@ For example: The prefix "TZ=(TIME ZONE)" is also supported for legacy compatibility. +Malformed timezone prefixes return parse errors rather than panicking. + Be aware that jobs scheduled during daylight-savings leap-ahead transitions will not be run! @@ -178,11 +197,14 @@ A Cron runner may be configured with a chain of job wrappers to add cross-cutting functionality to all submitted jobs. For example, they may be used to achieve the following effects: - - Recover any panics from jobs (activated by default) + - Recover panics from jobs by installing [Recover] - Delay a job's execution if the previous run hasn't completed yet - Skip a job's execution if the previous run hasn't completed yet - Log each job's invocations +Panic recovery is not enabled by default. To preserve scheduler availability +when jobs may panic, install [Recover] explicitly via [WithChain] or [NewChain]. + Install wrappers for all jobs added to a cron using the [WithChain] option: cron.New(cron.WithChain( @@ -255,6 +277,11 @@ custom [*slog.Logger] via [WithLogger] to control log level and destination: slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}), ))) +# Option defaults + +Passing nil to [WithLocation], [WithParser], [WithLogger], or [WithClock] keeps +the package default instead of leaving the scheduler in an invalid state. + # Implementation Cron entries are stored in an array, sorted by their next activation time. Cron diff --git a/example_test.go b/example_test.go index 50f8826..9ebe658 100644 --- a/example_test.go +++ b/example_test.go @@ -67,6 +67,28 @@ func ExampleCron_AddNamedFunc() { // heartbeat } +func ExampleCron_Shutdown() { + cronInstance := cron.New(cron.WithSeconds()) + done := make(chan struct{}) + + cronInstance.AddFunc(exampleEverySecond, func(_ context.Context) error { + fmt.Println("tick") + close(done) + + return nil + }) + + cronInstance.Start(context.Background()) + <-done + + cronInstance.Shutdown(context.Background()) + fmt.Println("graceful stop") + + // Output: + // tick + // graceful stop +} + func ExampleNextN() { sched, _ := cron.ParseStandard(exampleEveryHour) @@ -116,6 +138,20 @@ func ExampleTimeout() { // done } +func ExampleRecover() { + job := cron.NewChain(cron.Recover(cron.DiscardLogger())).Then( + cron.FuncJob(func(_ context.Context) error { + panic("boom") + }), + ) + + err := job.Run(context.Background()) + fmt.Println(errors.Is(err, cron.ErrPanic)) + + // Output: + // true +} + func ExampleMaxConcurrent() { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) diff --git a/option.go b/option.go index 874c861..f6cc6ef 100644 --- a/option.go +++ b/option.go @@ -8,10 +8,17 @@ import ( // Option represents a modification to the default behavior of a Cron. type Option func(*Cron) -// WithLocation overrides the timezone of the cron instance. +// WithLocation overrides the timezone of the cron instance. Passing nil keeps +// the default [time.Local]. func WithLocation(loc *time.Location) Option { - return func(c *Cron) { - c.location = loc + return func(cronInstance *Cron) { + if loc == nil { + cronInstance.location = time.Local + + return + } + + cronInstance.location = loc } } @@ -23,33 +30,47 @@ func WithSeconds() Option { )) } -// WithParser overrides the parser used for interpreting job schedules. +// WithParser overrides the parser used for interpreting job schedules. Passing +// nil keeps the default parser. func WithParser(p Parser) Option { - return func(c *Cron) { - c.parser = p + return func(cronInstance *Cron) { + if p == nil { + return + } + + cronInstance.parser = p } } // WithChain specifies Job wrappers to apply to all jobs added to this cron. // Refer to the Chain* functions in this package for provided wrappers. func WithChain(wrappers ...JobWrapper) Option { - return func(c *Cron) { - c.chain = NewChain(wrappers...) + return func(cronInstance *Cron) { + cronInstance.chain = NewChain(wrappers...) } } -// WithLogger uses the provided logger. +// WithLogger uses the provided logger. Passing nil keeps the default logger. func WithLogger(logger *slog.Logger) Option { - return func(c *Cron) { - c.logger = logger + return func(cronInstance *Cron) { + if logger == nil { + return + } + + cronInstance.logger = logger } } // WithClock overrides the clock used by the cron instance. It is intended // primarily for tests that want to drive the scheduler deterministically. +// Passing nil keeps the default system clock. func WithClock(clock Clock) Option { - return func(c *Cron) { - c.clock = clock + return func(cronInstance *Cron) { + if clock == nil { + return + } + + cronInstance.clock = clock } } @@ -60,8 +81,8 @@ type ErrorFunc func(id EntryID, name string, err error) // WithOnError registers a callback invoked whenever a job returns a non-nil // error. The callback receives the entry's ID, name, and the error. func WithOnError(fn ErrorFunc) Option { - return func(c *Cron) { - c.onError = fn + return func(cronInstance *Cron) { + cronInstance.onError = fn } } @@ -78,7 +99,7 @@ type EventHooks struct { // WithEventHooks registers lifecycle callbacks for job execution events. func WithEventHooks(hooks EventHooks) Option { - return func(c *Cron) { - c.hooks = hooks + return func(cronInstance *Cron) { + cronInstance.hooks = hooks } } diff --git a/option_test.go b/option_test.go index e0538ee..266d736 100644 --- a/option_test.go +++ b/option_test.go @@ -57,3 +57,31 @@ func TestWithLoggerCapturesSchedulerEvents(t *testing.T) { t.Error("expected to see some actions, got:", out) } } + +func TestNilOptionsPreserveDefaults(t *testing.T) { + t.Parallel() + + cronInstance := New( + WithLocation(nil), + WithParser(nil), + WithLogger(nil), + WithClock(nil), + ) + + if cronInstance.location != time.Local { + t.Errorf("expected default location, got %v", cronInstance.location) + } + + if cronInstance.logger == nil { + t.Fatal("expected default logger") + } + + if cronInstance.clock == nil { + t.Fatal("expected default clock") + } + + _, err := cronInstance.parser.Parse("* * * * *") + if err != nil { + t.Fatalf("expected default parser to remain active: %v", err) + } +} diff --git a/parser.go b/parser.go index d2591f4..5ec8fc2 100644 --- a/parser.go +++ b/parser.go @@ -23,6 +23,9 @@ var ( errRangeStepMustBePositive = errors.New("step of range should be a positive number") errNegativeNumber = errors.New("negative number not allowed") errUnrecognizedDescriptor = errors.New("unrecognized descriptor") + errMissingLocationName = errors.New("missing timezone location") + errMissingLocationSpec = errors.New("missing schedule after timezone prefix") + errNonPositiveInterval = errors.New("interval must be greater than zero") ) const ( @@ -316,15 +319,28 @@ func extractLocation(spec string) (string, *time.Location, error) { return spec, loc, nil } - i := strings.Index(spec, " ") - eq := strings.Index(spec, "=") + fields := strings.Fields(spec) + if len(fields) == 0 { + return "", nil, errEmptySpec + } + + eq := strings.Index(fields[0], "=") + + locationName := fields[0][eq+1:] + if locationName == "" { + return "", nil, fmt.Errorf("%w: %s", errMissingLocationName, spec) + } - loc, err := time.LoadLocation(spec[eq+1 : i]) + if len(fields) == 1 { + return "", nil, fmt.Errorf("%w: %s", errMissingLocationSpec, spec) + } + + loc, err := time.LoadLocation(locationName) if err != nil { - return "", nil, fmt.Errorf("provided bad location %s: %w", spec[eq+1:i], err) + return "", nil, fmt.Errorf("provided bad location %s: %w", locationName, err) } - return strings.TrimSpace(spec[i:]), loc, nil + return strings.Join(fields[1:], " "), loc, nil } func parseScheduleFields(fields []string, loc *time.Location) (*SpecSchedule, error) { @@ -593,15 +609,22 @@ func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { }, nil } - const every = "@every " - if strings.HasPrefix(descriptor, every) { - duration, err := time.ParseDuration(descriptor[len(every):]) - if err != nil { - return nil, fmt.Errorf("failed to parse duration %s: %w", descriptor, err) - } - - return Every(duration), nil + if strings.HasPrefix(descriptor, "@every ") { + return parseEveryDescriptor(descriptor) } return nil, fmt.Errorf(wrappedErrorWithValueFormat, errUnrecognizedDescriptor, descriptor) } + +func parseEveryDescriptor(descriptor string) (Schedule, error) { + duration, err := time.ParseDuration(descriptor[len("@every "):]) + if err != nil { + return nil, fmt.Errorf("failed to parse duration %s: %w", descriptor, err) + } + + if duration <= 0 { + return nil, fmt.Errorf("%w: %s", errNonPositiveInterval, descriptor) + } + + return Every(duration), nil +} diff --git a/parser_test.go b/parser_test.go index 2f8c603..21c1f0f 100644 --- a/parser_test.go +++ b/parser_test.go @@ -173,7 +173,12 @@ func TestParseScheduleErrors(t *testing.T) { tests := []struct{ expr, err string }{ {"* 5 j * * *", failedToParseIntText}, {"@every Xm", "failed to parse duration"}, + {"@every 0s", "interval must be greater than zero"}, + {"@every -5m", "interval must be greater than zero"}, {"@unrecognized", "unrecognized descriptor"}, + {"CRON_TZ=UTC", "missing schedule after timezone prefix"}, + {"CRON_TZ=", "missing timezone location"}, + {"TZ=", "missing timezone location"}, {"* * * *", "expected 5 to 6 fields"}, {"", "empty spec string"}, } @@ -208,6 +213,7 @@ func TestParseSchedule(t *testing.T) { {NewStandardParser(), "CRON_TZ=UTC 5 * * * *", every5min(time.UTC)}, {parserWithSeconds, "CRON_TZ=Asia/Tokyo 0 5 * * * *", every5min(tokyo)}, {parserWithSeconds, "@every 5m", ConstantDelaySchedule{fiveMinuteDelay}}, + {parserWithSeconds, "@every 500ms", ConstantDelaySchedule{time.Second}}, {parserWithSeconds, "@midnight", midnight(time.Local)}, {parserWithSeconds, "TZ=UTC @midnight", midnight(time.UTC)}, {parserWithSeconds, "TZ=Asia/Tokyo @midnight", midnight(tokyo)},