-
Notifications
You must be signed in to change notification settings - Fork 0
feat(v4): add named entries, event hooks, new job wrappers, and schedule inspection #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| --- | ||
| # These are supported funding model platforms | ||
|
|
||
| github: [hyp3rd] |
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -118,3 +118,106 @@ func SkipIfStillRunning(logger *slog.Logger) JobWrapper { | |
| }) | ||
| } | ||
| } | ||
|
|
||
| // MaxConcurrent allows up to limit concurrent invocations of the wrapped job. | ||
| // Additional invocations beyond the limit are skipped and logged at Info level. | ||
| // It generalizes [SkipIfStillRunning], which is equivalent to MaxConcurrent | ||
| // with a limit of 1. | ||
| func MaxConcurrent(limit int, logger *slog.Logger) JobWrapper { | ||
| limit = max(limit, 1) | ||
|
|
||
| return func(job Job) Job { | ||
| sem := make(chan struct{}, limit) | ||
|
|
||
| for range limit { | ||
| sem <- struct{}{} | ||
| } | ||
|
|
||
| return FuncJob(func(ctx context.Context) error { | ||
| select { | ||
| case token := <-sem: | ||
| defer func() { sem <- token }() | ||
|
|
||
| return job.Run(ctx) | ||
| default: | ||
| logger.Info("skip", "reason", "max_concurrent", "limit", limit) | ||
|
|
||
| return nil | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // Timeout cancels the job's context after the given duration. If the job does | ||
| // not return before the deadline, the context passed to it is cancelled. The | ||
| // wrapper waits for the job to return and reports any error (including | ||
| // [context.DeadlineExceeded]) to the caller. | ||
| // | ||
| // Note: the wrapper does not forcefully kill the job goroutine. The job must | ||
| // honor ctx.Done() for cancellation to take effect. | ||
| func Timeout(duration time.Duration) JobWrapper { | ||
| return func(job Job) Job { | ||
| return FuncJob(func(ctx context.Context) error { | ||
| ctx, cancel := context.WithTimeout(ctx, duration) | ||
| defer cancel() | ||
|
|
||
| return job.Run(ctx) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| // RetryOnError retries the wrapped job up to maxRetries times when it returns a | ||
| // non-nil error. Between attempts it waits for the given backoff duration (or | ||
| // until the context is cancelled). A zero backoff retries immediately. | ||
| // | ||
| // The backoff uses real wall-clock time, not the scheduler's [Clock] interface. | ||
| func RetryOnError(maxRetries int, backoff time.Duration) JobWrapper { | ||
| maxRetries = max(maxRetries, 0) | ||
|
|
||
| return func(job Job) Job { | ||
| return FuncJob(func(ctx context.Context) error { | ||
| return executeWithRetry(ctx, job, maxRetries, backoff) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func executeWithRetry(ctx context.Context, job Job, maxRetries int, backoff time.Duration) error { | ||
| var err error | ||
|
|
||
| for attempt := range maxRetries + 1 { | ||
| err = job.Run(ctx) | ||
| if err == nil { | ||
| return nil | ||
| } | ||
|
Comment on lines
+184
to
+191
|
||
|
|
||
| if attempt == maxRetries { | ||
| break | ||
| } | ||
|
|
||
| waitErr := retryBackoff(ctx, backoff) | ||
| if waitErr != nil { | ||
| return waitErr | ||
| } | ||
| } | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| // retryBackoff waits for the given duration or until the context is cancelled. | ||
| // A zero duration returns immediately. | ||
| func retryBackoff(ctx context.Context, backoff time.Duration) error { | ||
| if backoff <= 0 { | ||
| return nil | ||
| } | ||
|
|
||
| timer := time.NewTimer(backoff) | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| timer.Stop() | ||
|
|
||
| return fmt.Errorf("retry backoff: %w", ctx.Err()) | ||
| case <-timer.C: | ||
| return nil | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MaxConcurrent panics if limit is negative because make(chan struct{}, limit) requires a non-negative capacity. Consider guarding/clamping limit (e.g., treat limit < 1 as 1, or return a wrapper that always skips) so callers can't crash the scheduler by passing a bad limit.