-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.go
More file actions
331 lines (300 loc) · 10.5 KB
/
queue.go
File metadata and controls
331 lines (300 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package concurrent
import (
"context"
"errors"
"iter"
"sync"
)
// DefaultQueueCapacity is the default maximum number of jobs that can be queued
// when using NewQueue. This provides a reasonable default for most use cases
// while preventing unbounded memory growth.
const DefaultQueueCapacity = 1_000_000
// ErrFull is returned when a job queue has reached its capacity limit.
// When the queue is full, additional jobs will be replaced with error-emitting
// jobs that yield this error when processed.
var ErrFull = errors.New("job queue is full")
// Job represents a generic job function that processes data of type T.
// The job receives a context for cancellation and a yield function to emit
// results. The yield function returns true if the caller wants more results,
// or false to stop processing.
//
// Jobs can yield multiple values by calling the yield function multiple times.
// They should respect context cancellation and stop processing when the
// context is done.
type Job[T any] func(context.Context, func(T, error) bool)
// Queue is a thread-safe, bounded queue for processing jobs concurrently.
// It supports generic job types and provides capacity limits, graceful shutdown,
// and synchronization primitives for coordinating between producers and consumers.
//
// The queue uses a condition variable to efficiently block consumers when empty
// and wake them when jobs become available. It maintains a wait group to track
// outstanding jobs for synchronization.
//
// When the queue reaches its capacity limit, additional jobs are dropped and
// replaced with error-emitting jobs that yield ErrFull when processed.
//
// Type parameter T represents the type of values that jobs will produce.
type Queue[T any] struct {
group sync.WaitGroup // tracks outstanding jobs for Wait()
mutex sync.Mutex // protects access to jobs and cap
cond sync.Cond // signals when jobs become available
jobs []Job[T] // slice of pending jobs
cap int // maximum queue capacity, negative means closed
}
// NewQueue creates a new job queue with the default capacity.
// The queue will accept up to DefaultQueueCapacity jobs before dropping
// additional jobs and replacing them with error-emitting jobs that yield ErrFull.
//
// Type parameter T specifies the type of values that jobs will produce.
func NewQueue[T any]() *Queue[T] {
return NewQueueWithCapacity[T](DefaultQueueCapacity)
}
// NewQueueWithCapacity creates a new job queue with the specified capacity.
// The capacity determines the maximum number of jobs that can be queued
// before additional jobs are dropped.
//
// When the queue reaches capacity, additional jobs are dropped and replaced
// with error-emitting jobs that yield ErrFull when processed.
//
// Type parameter T specifies the type of values that jobs will produce.
func NewQueueWithCapacity[T any](capacity int) *Queue[T] {
q := &Queue[T]{cap: capacity}
q.cond.L = &q.mutex
return q
}
// Push adds a job to the queue for processing.
//
// If the queue has reached its capacity, the job will be dropped and replaced
// with an error-emitting job that yields ErrFull when processed. This prevents
// unbounded memory growth without blocking the caller.
//
// Push panics if called on a queue that has been marked as done via Done().
// This prevents adding jobs to a queue that consumers have stopped processing.
//
// Push is safe for concurrent use and will wake up blocked consumers.
func (q *Queue[T]) Push(job Job[T]) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.cap < 0 {
panic("push called on queue that is not accepting any more jobs")
}
if len(q.jobs) > q.cap {
return
}
if len(q.jobs) == q.cap {
// replace job with one that will emit the error
job = func(ctx context.Context, yield func(T, error) bool) {
yield(*new(T), ErrFull)
}
}
q.jobs = append(q.jobs, job)
q.cond.Signal()
q.group.Add(1)
}
// Pull returns an iterator that yields jobs from the queue in FIFO order.
//
// The iterator blocks when the queue is empty and waits for new jobs to be
// pushed. It terminates when Done() is called and all remaining jobs have
// been consumed.
//
// Each job yielded by the iterator is automatically marked as done in the
// wait group when the iterator advances, enabling Wait() to track completion.
//
// Pull is safe for concurrent use by multiple consumers. Jobs are distributed
// among consumers in a round-robin fashion.
//
// The iterator can be stopped early by returning false from the yield function.
func (q *Queue[T]) Pull() iter.Seq[Job[T]] {
return func(yield func(Job[T]) bool) {
for {
q.mutex.Lock()
for len(q.jobs) == 0 && q.cap >= 0 {
q.cond.Wait()
}
if len(q.jobs) == 0 {
q.mutex.Unlock()
return
}
job := q.jobs[0]
q.jobs[0] = nil
q.jobs = q.jobs[1:]
q.mutex.Unlock()
ack := yield(job)
q.group.Done()
if !ack {
return
}
}
}
}
// Wait blocks until all jobs that have been pushed to the queue have been
// processed by consumers via Pull().
//
// Wait tracks job completion through an internal wait group that is incremented
// when jobs are pushed and decremented when jobs are consumed from Pull().
//
// This method is useful for ensuring all work has completed before proceeding
// or shutting down the application.
func (q *Queue[T]) Wait() {
q.group.Wait()
}
// Flush removes all pending jobs from the queue without processing them.
//
// All flushed jobs are marked as done in the wait group, allowing Wait()
// to proceed without waiting for the flushed jobs to be processed.
//
// This method is useful for graceful shutdown scenarios where you want to
// stop processing new work but still wait for already-started jobs to complete.
//
// Flush is safe for concurrent use.
func (q *Queue[T]) Flush() {
q.mutex.Lock()
for range len(q.jobs) {
q.group.Done()
}
q.jobs = nil
q.mutex.Unlock()
}
// Done marks the queue as closed for new jobs and signals all blocked consumers
// to stop pulling jobs once the queue is empty.
//
// After Done() is called:
// - Push() will panic if called
// - Pull() will terminate once all remaining jobs are consumed
// - New Pull() calls will return immediately if the queue is empty
//
// Done is safe for concurrent use and is typically called by producers to
// signal that no more work will be submitted.
func (q *Queue[T]) Done() {
q.mutex.Lock()
q.cap = -1
q.cond.Broadcast()
q.mutex.Unlock()
}
// Process executes jobs from the queue concurrently and returns an iterator
// of their results.
//
// The function creates a pool of worker goroutines (controlled by the context's
// concurrency limit) that pull jobs from the queue and execute them. Results
// are collected and yielded through the returned iterator.
//
// The concurrency level is determined by Limit(ctx). Workers respect context
// cancellation and will stop processing when the context is cancelled.
//
// The iterator terminates when:
// - All jobs in the queue have been processed
// - The context is cancelled
// - The consumer stops the iterator by returning false
//
// Process automatically calls queue.Done() when all jobs have been consumed,
// preventing new jobs from being added and allowing graceful shutdown.
//
// Type parameter T represents the type of values that jobs produce.
func Process[T any](ctx context.Context, queue *Queue[T]) iter.Seq2[T, error] {
return func(yield func(T, error) bool) {
type item struct {
val T
err error
panic any
}
items := make(chan item)
group := new(sync.WaitGroup)
defer group.Wait()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for range Limit(ctx) {
group.Add(1)
go func() {
defer group.Done()
for job := range queue.Pull() {
func() {
defer recoverAndRethrow(func(v any) {
select {
case items <- item{panic: v}:
case <-ctx.Done():
}
})
job(ctx, func(val T, err error) bool {
select {
case items <- item{val: val, err: err}:
return true
case <-ctx.Done():
return false
}
})
}()
}
}()
}
go func() {
queue.Wait()
queue.Done()
group.Wait()
close(items)
}()
for item := range items {
if item.panic != nil {
panic(item.panic)
}
if !yield(item.val, item.err) {
return
}
}
}
}
// Task represents a simple function that performs work and may return an error.
// Tasks are used with TaskQueue for scenarios where you only care about
// success/failure rather than producing specific result values.
//
// Tasks should respect context cancellation and return promptly when the
// context is cancelled.
type Task func(context.Context) error
// TaskQueue is a specialized queue for processing Task functions.
// It's a convenience wrapper around Queue[struct{}] for scenarios where
// you only need to track success/failure of operations rather than
// collecting specific result values.
type TaskQueue Queue[struct{}]
// Queue returns the underlying generic queue that backs this TaskQueue.
// This allows access to the full Queue API when needed.
func (q *TaskQueue) Queue() *Queue[struct{}] {
return (*Queue[struct{}])(q)
}
// Push adds a task to the queue for processing.
// The task will be executed by workers created via ProcessTasks().
//
// Tasks are converted to jobs that yield empty struct{} values on success
// or error values on failure.
func (q *TaskQueue) Push(task Task) {
q.Queue().Push(func(ctx context.Context, yield func(struct{}, error) bool) {
yield(struct{}{}, task(ctx))
})
}
// NewTaskQueue creates a new task queue with the default capacity.
// Task queues are optimized for scenarios where you only need to track
// success/failure of operations.
func NewTaskQueue() *TaskQueue {
return (*TaskQueue)(NewQueue[struct{}]())
}
// NewTaskQueueWithCapacity creates a new task queue with the specified capacity.
// The capacity determines how many tasks can be queued before additional
// tasks are dropped and ErrFull is emitted.
func NewTaskQueueWithCapacity(capacity int) *TaskQueue {
return (*TaskQueue)(NewQueueWithCapacity[struct{}](capacity))
}
// ProcessTasks executes all tasks in the queue concurrently and returns the
// first error encountered, if any.
//
// Tasks are processed by a pool of workers (controlled by the context's
// concurrency limit). If any task returns an error, processing stops and
// that error is returned.
//
// This function is useful for scenarios where you want fail-fast behavior
// and don't need to collect result values from successful tasks.
func ProcessTasks(ctx context.Context, tasks *TaskQueue) error {
for _, err := range Process(ctx, tasks.Queue()) {
if err != nil {
return err
}
}
return nil
}