From 3842a88ee387ec22936aacca7dc11aa06024920d Mon Sep 17 00:00:00 2001 From: Jeremy Scott Date: Fri, 14 Jan 2022 11:58:30 +1300 Subject: [PATCH 1/7] [ch11410] - Fork Cron library and allow configuration of timing function --- cron.go | 119 ++++++++++++++++++++++++++++++++++++++++-------------- go.mod | 2 +- option.go | 7 ++++ 3 files changed, 97 insertions(+), 31 deletions(-) diff --git a/cron.go b/cron.go index f6e451db..dcc03096 100644 --- a/cron.go +++ b/cron.go @@ -2,6 +2,8 @@ package cron import ( "context" + "math" + "math/rand" "sort" "sync" "time" @@ -11,19 +13,20 @@ import ( // specified by the schedule. It may be started, stopped, and the entries may // be inspected while running. type Cron struct { - entries []*Entry - chain Chain - stop chan struct{} - add chan *Entry - remove chan EntryID - snapshot chan chan []Entry - running bool - logger Logger - runningMu sync.Mutex - location *time.Location - parser Parser - nextID EntryID - jobWaiter sync.WaitGroup + entries []*Entry + chain Chain + stop chan struct{} + add chan *Entry + remove chan EntryID + snapshot chan chan []Entry + running bool + logger Logger + runningMu sync.Mutex + location *time.Location + parser Parser + nextID EntryID + jobWaiter sync.WaitGroup + timeCallback func() time.Time } // Job is an interface for submitted cron jobs. @@ -64,11 +67,32 @@ type Entry struct { // It is kept around so that user code that needs to get at the job later, // e.g. via Entries() can do so. Job Job + + // Add some jitter to the Entry's scheduled time. The actual jitter used + // will be randomly chosen within the range [0, Jitter]. The jitter will + // always be converted to a positive duration. + Jitter time.Duration } // Valid returns true if this is not the zero entry. func (e Entry) Valid() bool { return e.ID != 0 } +// Add a random amount of jitter to the input time using the maximum jitter +// amount. The calculated jitter is on the closed interval [0, jitterMaximum] +// where the only exception to this rule is if the jitter duration is the +// maximum value for an int64 which is on the closed interval [0, Int64Max]. +func calculateJitteredTime(now time.Time, jitterMaximum time.Duration) time.Time { + result := now + if jitterMaximum > 0 { + val := jitterMaximum.Nanoseconds() + if val < math.MinInt64 { + val = jitterMaximum.Nanoseconds()+1 + } + result = result.Add(time.Duration(rand.Int63n(val))) + } + return result +} + // byTime is a wrapper for sorting the entry array by time // (with zero time at the end). type byTime []*Entry @@ -107,17 +131,18 @@ func (s byTime) Less(i, j int) bool { // See "cron.With*" to modify the default behavior. func New(opts ...Option) *Cron { c := &Cron{ - entries: nil, - chain: NewChain(), - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan chan []Entry), - remove: make(chan EntryID), - running: false, - runningMu: sync.Mutex{}, - logger: DefaultLogger, - location: time.Local, - parser: standardParser, + entries: nil, + chain: NewChain(), + add: make(chan *Entry), + stop: make(chan struct{}), + snapshot: make(chan chan []Entry), + remove: make(chan EntryID), + running: false, + runningMu: sync.Mutex{}, + logger: DefaultLogger, + location: time.Local, + parser: standardParser, + timeCallback: nil, } for _, opt := range opts { opt(c) @@ -134,23 +159,49 @@ func (f FuncJob) Run() { f() } // The spec is parsed using the time zone of this Cron instance as the default. // An opaque ID is returned that can be used to later remove it. func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) { - return c.AddJob(spec, FuncJob(cmd)) + return c.AddJobWithJitter(spec, FuncJob(cmd), 0) +} + +// AddFuncWithJitter adds a func to the Cron to be run on the given schedule. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddFuncWithJitter(spec string, cmd func(), jitter time.Duration) (EntryID, error) { + return c.AddJobWithJitter(spec, FuncJob(cmd), jitter) } // AddJob adds a Job to the Cron to be run on the given schedule. // The spec is parsed using the time zone of this Cron instance as the default. // An opaque ID is returned that can be used to later remove it. func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) { + return c.AddJobWithJitter(spec, cmd, 0) +} + +// AddJobWithJitter adds a Job to the Cron to be run on the given schedule with +// a specified amount of jitter for each invocation. +// The spec is parsed using the time zone of this Cron instance as the default. +// An opaque ID is returned that can be used to later remove it. +func (c *Cron) AddJobWithJitter(spec string, cmd Job, jitter time.Duration) (EntryID, error) { schedule, err := c.parser.Parse(spec) if err != nil { return 0, err } - return c.Schedule(schedule, cmd), nil + return c.ScheduleWithJitter(schedule, cmd, jitter), nil } // Schedule adds a Job to the Cron to be run on the given schedule. // The job is wrapped with the configured Chain. func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { + return c.ScheduleWithJitter(schedule, cmd, 0) +} + +// ScheduleWithJitter adds a Job to the Cron to be run on the given schedule +// with a specified amount of jitter for each invocation. +// The job is wrapped with the configured Chain. +func (c *Cron) ScheduleWithJitter(schedule Schedule, cmd Job, jitter time.Duration) EntryID { + if jitter < 0 { + jitter = -jitter + } + c.runningMu.Lock() defer c.runningMu.Unlock() c.nextID++ @@ -159,6 +210,7 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID { Schedule: schedule, WrappedJob: c.chain.Then(cmd), Job: cmd, + Jitter: jitter, } if !c.running { c.entries = append(c.entries, entry) @@ -237,7 +289,7 @@ func (c *Cron) run() { // Figure out the next activation times for each entry. now := c.now() for _, entry := range c.entries { - entry.Next = entry.Schedule.Next(now) + entry.Next = calculateJitteredTime(entry.Schedule.Next(now), entry.Jitter) c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next) } @@ -267,14 +319,14 @@ func (c *Cron) run() { } c.startJob(e.WrappedJob) e.Prev = e.Next - e.Next = e.Schedule.Next(now) + e.Next = calculateJitteredTime(e.Schedule.Next(now), e.Jitter) c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next) } case newEntry := <-c.add: timer.Stop() now = c.now() - newEntry.Next = newEntry.Schedule.Next(now) + newEntry.Next = calculateJitteredTime(newEntry.Schedule.Next(now), newEntry.Jitter) c.entries = append(c.entries, newEntry) c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next) @@ -310,7 +362,14 @@ func (c *Cron) startJob(j Job) { // now returns current time in c location func (c *Cron) now() time.Time { - return time.Now().In(c.location) + var result time.Time + if c.timeCallback == nil { + result = time.Now() + } else { + result = c.timeCallback() + } + result = result.In(c.location) + return result } // Stop stops the cron scheduler if it is running; otherwise it does nothing. diff --git a/go.mod b/go.mod index 8c95bf47..9b9f2725 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/robfig/cron/v3 +module github.com/team-rocos/cron/v3 go 1.12 diff --git a/option.go b/option.go index 07638201..ac0f4c4e 100644 --- a/option.go +++ b/option.go @@ -43,3 +43,10 @@ func WithLogger(logger Logger) Option { c.logger = logger } } + +// WithTimeCallback uses the provided time function callback for getting the current time. +func WithTimeCallback(timeCallback func() time.Time) Option { + return func(c *Cron) { + c.timeCallback = timeCallback + } +} From aad2d2072add93f0a45222ef70e005d819dca404 Mon Sep 17 00:00:00 2001 From: Jeremy Scott Date: Fri, 14 Jan 2022 14:20:11 +1300 Subject: [PATCH 2/7] [ch11410] - Fork Cron library and allow configuration of timing function - Revert the Golang module name. --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 9b9f2725..8c95bf47 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/team-rocos/cron/v3 +module github.com/robfig/cron/v3 go 1.12 From af01580322056b01a64c0a6647290a479c3adaa6 Mon Sep 17 00:00:00 2001 From: Edwin Hayes Date: Fri, 13 May 2022 14:29:50 +1200 Subject: [PATCH 3/7] [ch12701] - Fixed issue where custom time callback was used for first event, but not for subsequent events. --- constantdelay_test.go | 2 +- cron.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/constantdelay_test.go b/constantdelay_test.go index f43a58ad..24a11731 100644 --- a/constantdelay_test.go +++ b/constantdelay_test.go @@ -26,7 +26,7 @@ func TestConstantDelayNext(t *testing.T) { {"Mon Jul 9 23:35:51 2012", 25*time.Hour + 44*time.Minute + 24*time.Second, "Thu Jul 11 01:20:15 2012"}, // Wrap around months - {"Mon Jul 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Oct 9 00:00 2012"}, + {"Mon Jun 9 23:35 2012", 91*24*time.Hour + 25*time.Minute, "Thu Sep 9 00:00 2012"}, // Don't do JUL->OCT otherwise daylight savings breaks the test in some time zones... // Wrap around minute, hour, day, month, and year {"Mon Dec 31 23:59:45 2012", 15 * time.Second, "Tue Jan 1 00:00:00 2013"}, diff --git a/cron.go b/cron.go index dcc03096..f103ece9 100644 --- a/cron.go +++ b/cron.go @@ -86,7 +86,7 @@ func calculateJitteredTime(now time.Time, jitterMaximum time.Duration) time.Time if jitterMaximum > 0 { val := jitterMaximum.Nanoseconds() if val < math.MinInt64 { - val = jitterMaximum.Nanoseconds()+1 + val = jitterMaximum.Nanoseconds() + 1 } result = result.Add(time.Duration(rand.Int63n(val))) } @@ -308,8 +308,9 @@ func (c *Cron) run() { for { select { - case now = <-timer.C: - now = now.In(c.location) + case <-timer.C: + // Note: we can't just use the value which comes back from timer.C, because that won't respect the timeCallback, if one is set. + now = c.now() c.logger.Info("wake", "now", now) // Run every entry whose next time was less than now From 01ffd6345b189b1d0672b82819b4a05091d828f1 Mon Sep 17 00:00:00 2001 From: Mohsen Pashna Date: Thu, 7 Jul 2022 15:46:24 +1200 Subject: [PATCH 4/7] ignore vscode settings --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 00268614..3c31f54c 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ _cgo_export.* _testmain.go *.exe +.vscode From ad961817d7bfa6fa4a4e472d168a027bac940fef Mon Sep 17 00:00:00 2001 From: Mohsen Pashna Date: Thu, 7 Jul 2022 15:46:50 +1200 Subject: [PATCH 5/7] [GRM-124] Added `RecalculateTime` to api --- cron.go | 60 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/cron.go b/cron.go index f103ece9..9a506362 100644 --- a/cron.go +++ b/cron.go @@ -13,20 +13,21 @@ import ( // specified by the schedule. It may be started, stopped, and the entries may // be inspected while running. type Cron struct { - entries []*Entry - chain Chain - stop chan struct{} - add chan *Entry - remove chan EntryID - snapshot chan chan []Entry - running bool - logger Logger - runningMu sync.Mutex - location *time.Location - parser Parser - nextID EntryID - jobWaiter sync.WaitGroup - timeCallback func() time.Time + entries []*Entry + chain Chain + stop chan struct{} + add chan *Entry + remove chan EntryID + snapshot chan chan []Entry + running bool + logger Logger + runningMu sync.Mutex + location *time.Location + parser Parser + nextID EntryID + jobWaiter sync.WaitGroup + timeCallback func() time.Time + recalculateTimer chan struct{} } // Job is an interface for submitted cron jobs. @@ -131,18 +132,19 @@ func (s byTime) Less(i, j int) bool { // See "cron.With*" to modify the default behavior. func New(opts ...Option) *Cron { c := &Cron{ - entries: nil, - chain: NewChain(), - add: make(chan *Entry), - stop: make(chan struct{}), - snapshot: make(chan chan []Entry), - remove: make(chan EntryID), - running: false, - runningMu: sync.Mutex{}, - logger: DefaultLogger, - location: time.Local, - parser: standardParser, - timeCallback: nil, + entries: nil, + chain: NewChain(), + add: make(chan *Entry), + stop: make(chan struct{}), + snapshot: make(chan chan []Entry), + remove: make(chan EntryID), + running: false, + runningMu: sync.Mutex{}, + logger: DefaultLogger, + location: time.Local, + parser: standardParser, + timeCallback: nil, + recalculateTimer: make(chan struct{}), } for _, opt := range opts { opt(c) @@ -281,6 +283,10 @@ func (c *Cron) Run() { c.run() } +func (c *Cron) RecalculateTimer() { + c.recalculateTimer <- struct{}{} +} + // run the scheduler.. this is private just due to the need to synchronize // access to the 'running' state variable. func (c *Cron) run() { @@ -345,6 +351,8 @@ func (c *Cron) run() { now = c.now() c.removeEntry(id) c.logger.Info("removed", "entry", id) + case <-c.recalculateTimer: + break } break From 457323b3892f4505d951a766d9eb6445553732d0 Mon Sep 17 00:00:00 2001 From: Mohsen Pashna Date: Thu, 7 Jul 2022 15:56:02 +1200 Subject: [PATCH 6/7] [GRM-124] changed the method as per comment --- cron.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cron.go b/cron.go index 9a506362..bc8e1312 100644 --- a/cron.go +++ b/cron.go @@ -283,7 +283,7 @@ func (c *Cron) Run() { c.run() } -func (c *Cron) RecalculateTimer() { +func (c *Cron) RecalculateNextEvent() { c.recalculateTimer <- struct{}{} } From 966ed6863c9e1311d6e9e9244a21b06e257850ba Mon Sep 17 00:00:00 2001 From: Jeremy Scott Date: Wed, 10 Aug 2022 09:07:29 +1200 Subject: [PATCH 7/7] [GRM-287] - Tasks scheduled by the scheduler seem to either behave erratically or don't run at all. --- cron.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cron.go b/cron.go index bc8e1312..d08bd2cc 100644 --- a/cron.go +++ b/cron.go @@ -352,6 +352,7 @@ func (c *Cron) run() { c.removeEntry(id) c.logger.Info("removed", "entry", id) case <-c.recalculateTimer: + now = c.now() break }