-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathoptions.go
More file actions
264 lines (234 loc) · 7.27 KB
/
options.go
File metadata and controls
264 lines (234 loc) · 7.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
// Copyright (c) 2026 Onur Cinar.
// The source code is provided under MIT License.
// https://github.com/cinar/resile
package resile
import (
"context"
"time"
"github.com/cinar/resile/chaos"
"github.com/cinar/resile/circuit"
)
// Option defines a functional option for configuring a retry execution.
type Option func(*Config)
// WithFallback sets a function to be called if all retries are exhausted or if the circuit breaker is open.
// T must match the return type of the retry action.
func WithFallback[T any](f func(context.Context, error) (T, error)) Option {
return func(c *Config) {
c.Fallback = f
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.fallbackMiddleware())
}
}
}
// WithFallbackErr sets a fallback function for operations that only return an error.
func WithFallbackErr(f func(context.Context, error) error) Option {
return func(c *Config) {
c.Fallback = f
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.fallbackMiddleware())
}
}
}
// WithName sets the name for the operation. This is used in telemetry labels.
func WithName(name string) Option {
return func(c *Config) {
c.Name = name
}
}
// WithMaxAttempts sets the maximum number of execution attempts.
func WithMaxAttempts(attempts uint) Option {
return func(c *Config) {
c.MaxAttempts = attempts
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.retryMiddleware())
}
}
}
// WithRetry sets the maximum number of execution attempts and adds a retry policy to the pipeline.
func WithRetry(attempts uint) Option {
return func(c *Config) {
c.MaxAttempts = attempts
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.retryMiddleware())
}
}
}
// WithBaseDelay sets the initial delay for the backoff algorithm.
func WithBaseDelay(delay time.Duration) Option {
return func(c *Config) {
c.BaseDelay = delay
// If using the default backoff, we update its parameters.
if fj, ok := c.Backoff.(*fullJitter); ok {
fj.base = delay
}
}
}
// WithMaxDelay sets the maximum delay for the backoff algorithm.
func WithMaxDelay(delay time.Duration) Option {
return func(c *Config) {
c.MaxDelay = delay
if fj, ok := c.Backoff.(*fullJitter); ok {
fj.cap = delay
}
}
}
// WithBackoff sets a custom backoff algorithm.
func WithBackoff(backoff Backoff) Option {
return func(c *Config) {
c.Backoff = backoff
}
}
// WithRetryIf sets a specific error to trigger a retry.
func WithRetryIf(target error) Option {
return func(c *Config) {
c.Policy.retryIf = target
}
}
// WithRetryIfFunc sets a custom function to determine if an error should be retried.
func WithRetryIfFunc(f func(error) bool) Option {
return func(c *Config) {
c.Policy.retryIfFunc = f
}
}
// WithInstrumenter sets a telemetry instrumenter.
func WithInstrumenter(instr Instrumenter) Option {
return func(c *Config) {
c.Instrumenter = instr
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.instrumenterMiddleware())
}
}
}
// WithCircuitBreaker integrates a circuit breaker into the retry execution.
func WithCircuitBreaker(cb *circuit.Breaker) Option {
return func(c *Config) {
c.CircuitBreaker = cb
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.circuitBreakerMiddleware())
}
}
}
// WithBulkhead integrates a bulkhead into the execution.
func WithBulkhead(capacity uint) Option {
return func(c *Config) {
c.Bulkhead = NewBulkhead(capacity)
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.bulkheadMiddleware())
}
}
}
// WithBulkheadInstance integrates a shared bulkhead into the execution.
func WithBulkheadInstance(bh *Bulkhead) Option {
return func(c *Config) {
c.Bulkhead = bh
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.bulkheadMiddleware())
}
}
}
// WithPriorityBulkhead integrates a priority-aware bulkhead into the execution.
func WithPriorityBulkhead(capacity uint, thresholds map[Priority]float64) Option {
return func(c *Config) {
c.PriorityBulkhead = NewPriorityBulkhead(capacity, thresholds)
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.priorityBulkheadMiddleware())
}
}
}
// WithPriorityBulkheadInstance integrates a shared priority-aware bulkhead into the execution.
func WithPriorityBulkheadInstance(bh *PriorityBulkhead) Option {
return func(c *Config) {
c.PriorityBulkhead = bh
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.priorityBulkheadMiddleware())
}
}
}
// WithRateLimiter integrates a rate limiter into the execution.
func WithRateLimiter(limit float64, interval time.Duration) Option {
return func(c *Config) {
c.RateLimiter = NewRateLimiter(limit, interval)
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.rateLimiterMiddleware())
}
}
}
// WithRateLimiterInstance integrates a shared rate limiter into the execution.
func WithRateLimiterInstance(rl *RateLimiter) Option {
return func(c *Config) {
c.RateLimiter = rl
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.rateLimiterMiddleware())
}
}
}
// WithAdaptiveLimiter integrates an adaptive concurrency limiter into the execution.
func WithAdaptiveLimiter() Option {
return func(c *Config) {
c.AdaptiveLimiter = NewAdaptiveLimiter()
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.adaptiveLimiterMiddleware())
}
}
}
// WithAdaptiveLimiterInstance integrates a shared adaptive concurrency limiter into the execution.
func WithAdaptiveLimiterInstance(al *AdaptiveLimiter) Option {
return func(c *Config) {
c.AdaptiveLimiter = al
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.adaptiveLimiterMiddleware())
}
}
}
// WithTimeout sets a timeout for the execution.
func WithTimeout(timeout time.Duration) Option {
return func(c *Config) {
c.Timeout = timeout
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.timeoutMiddleware(timeout))
}
}
}
// WithHedgingDelay sets the delay for speculative retries (hedging).
// If a response doesn't arrive within this delay, another attempt is started concurrently.
func WithHedgingDelay(delay time.Duration) Option {
return func(c *Config) {
c.HedgingDelay = delay
}
}
// WithAdaptiveBucket sets a token bucket for adaptive retries.
// The bucket should be shared across multiple executions to protect downstream services globally.
func WithAdaptiveBucket(bucket *AdaptiveBucket) Option {
return func(c *Config) {
c.AdaptiveBucket = bucket
}
}
// WithPanicRecovery enables recovering from panics during execution.
// If a panic occurs, it is converted into a PanicError and treated as a retryable error.
func WithPanicRecovery() Option {
return func(c *Config) {
c.RecoverPanics = true
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.panicRecoveryMiddleware())
}
}
}
// WithChaos integrates a chaos injector into the execution.
func WithChaos(cfg chaos.Config) Option {
return func(c *Config) {
c.Chaos = chaos.NewInjector(cfg)
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.chaosMiddleware())
}
}
}
// WithMinDeadlineThreshold sets the minimum remaining time required to start a new attempt.
// If the context's deadline is sooner than this threshold, the execution is aborted early.
func WithMinDeadlineThreshold(threshold time.Duration) Option {
return func(c *Config) {
c.MinDeadlineThreshold = threshold
if c.pipeline != nil {
c.pipeline = append(c.pipeline, c.deadlineMiddleware())
}
}
}