Skip to content

Commit 33bdf7e

Browse files
committed
feat: add errors.Join for batch operations and benchmark tests
- Enhance batchOperation to return aggregated errors using errors.Join - Add benchmark tests using Go 1.24+ testing.B.Loop() pattern - Update dependencies to latest versions
1 parent 18c69d2 commit 33bdf7e

File tree

6 files changed

+150
-30
lines changed

6 files changed

+150
-30
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
- 6379:6379
2424

2525
steps:
26-
- uses: actions/checkout@v5
26+
- uses: actions/checkout@v6
2727

2828
- name: Set up Go
2929
uses: actions/setup-go@v6
@@ -37,7 +37,7 @@ jobs:
3737
lint:
3838
runs-on: ubuntu-latest
3939
steps:
40-
- uses: actions/checkout@v5
40+
- uses: actions/checkout@v6
4141

4242
- name: Set up Go
4343
uses: actions/setup-go@v6

.golangci.version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.4.0
1+
2.7.2

go.mod

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.sum

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

manager.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -317,27 +317,32 @@ func (s *Manager) RunJobsByState(queue string, state JobState) (int, error) {
317317
return count, nil
318318
}
319319

320-
// batchOperation performs a batch operation on items and returns succeeded and failed items.
321-
func batchOperation[T any](items []T, operation func(T) error) (succeeded, failed []T) {
320+
// batchOperation performs a batch operation on items and returns succeeded, failed items, and aggregated errors.
321+
func batchOperation[T any](items []T, operation func(T) error) (succeeded, failed []T, err error) {
322322
succeeded = make([]T, 0, len(items))
323323
failed = make([]T, 0, len(items))
324+
var errs []error
324325

325326
for _, item := range items {
326-
if err := operation(item); err != nil {
327+
if opErr := operation(item); opErr != nil {
327328
failed = append(failed, item)
329+
errs = append(errs, opErr)
328330
} else {
329331
succeeded = append(succeeded, item)
330332
}
331333
}
332-
return succeeded, failed
334+
335+
if len(errs) > 0 {
336+
return succeeded, failed, errors.Join(errs...)
337+
}
338+
return succeeded, failed, nil
333339
}
334340

335341
// BatchRunJobs triggers immediate execution of multiple jobs identified by their IDs.
336342
func (s *Manager) BatchRunJobs(queue string, jobIDs []string) ([]string, []string, error) {
337-
succeeded, failed := batchOperation(jobIDs, func(jobID string) error {
343+
return batchOperation(jobIDs, func(jobID string) error {
338344
return s.Inspector.RunTask(queue, jobID)
339345
})
340-
return succeeded, failed, nil
341346
}
342347

343348
// ArchiveJob moves a job with the specified ID to the archive.
@@ -394,10 +399,9 @@ func (s *Manager) ArchiveJobsByState(queue string, state JobState) (int, error)
394399

395400
// BatchArchiveJobs archives multiple jobs identified by their IDs.
396401
func (s *Manager) BatchArchiveJobs(queue string, jobIDs []string) ([]string, []string, error) {
397-
succeeded, failed := batchOperation(jobIDs, func(jobID string) error {
402+
return batchOperation(jobIDs, func(jobID string) error {
398403
return s.Inspector.ArchiveTask(queue, jobID)
399404
})
400-
return succeeded, failed, nil
401405
}
402406

403407
// CancelJob cancels a job with the specified ID.
@@ -447,10 +451,9 @@ func (s *Manager) CancelActiveJobs(queue string, size, page int) (int, error) {
447451

448452
// BatchCancelJobs cancels multiple jobs identified by their IDs.
449453
func (s *Manager) BatchCancelJobs(jobIDs []string) ([]string, []string, error) {
450-
succeeded, failed := batchOperation(jobIDs, func(jobID string) error {
454+
return batchOperation(jobIDs, func(jobID string) error {
451455
return s.Inspector.CancelProcessing(jobID)
452456
})
453-
return succeeded, failed, nil
454457
}
455458

456459
// DeleteJob deletes a job with the specified ID from its queue.
@@ -501,10 +504,9 @@ func (s *Manager) DeleteJobsByState(queue string, state JobState) (int, error) {
501504

502505
// BatchDeleteJobs deletes multiple jobs identified by their IDs.
503506
func (s *Manager) BatchDeleteJobs(queue string, jobIDs []string) ([]string, []string, error) {
504-
succeeded, failed := batchOperation(jobIDs, func(jobID string) error {
507+
return batchOperation(jobIDs, func(jobID string) error {
505508
return s.Inspector.DeleteTask(queue, jobID)
506509
})
507-
return succeeded, failed, nil
508510
}
509511

510512
// RunAggregatingJobs triggers all aggregating jobs to run immediately in a specified queue and group.

tests/benchmarks_test.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package tests
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/kaptinlin/queue"
8+
)
9+
10+
func BenchmarkJobCreation(b *testing.B) {
11+
payload := map[string]interface{}{"key": "value", "count": 123}
12+
13+
for b.Loop() {
14+
_ = queue.NewJob("test_job", payload)
15+
}
16+
}
17+
18+
func BenchmarkJobCreationWithOptions(b *testing.B) {
19+
payload := map[string]interface{}{"key": "value", "count": 123}
20+
21+
for b.Loop() {
22+
_ = queue.NewJob("test_job", payload,
23+
queue.WithQueue("critical"),
24+
queue.WithMaxRetries(3),
25+
)
26+
}
27+
}
28+
29+
func BenchmarkJobConvertToAsynqTask(b *testing.B) {
30+
payload := map[string]interface{}{"key": "value", "count": 123}
31+
job := queue.NewJob("test_job", payload, queue.WithQueue("default"))
32+
33+
for b.Loop() {
34+
_, _, err := job.ConvertToAsynqTask()
35+
if err != nil {
36+
b.Fatal(err)
37+
}
38+
}
39+
}
40+
41+
func BenchmarkHandlerCreation(b *testing.B) {
42+
handlerFunc := func(_ context.Context, _ *queue.Job) error {
43+
return nil
44+
}
45+
46+
for b.Loop() {
47+
_ = queue.NewHandler("test_job", handlerFunc)
48+
}
49+
}
50+
51+
func BenchmarkHandlerProcess(b *testing.B) {
52+
handler := queue.NewHandler("test_job", func(_ context.Context, _ *queue.Job) error {
53+
return nil
54+
})
55+
56+
job := queue.NewJob("test_job", map[string]interface{}{"key": "value"})
57+
ctx := context.Background()
58+
59+
for b.Loop() {
60+
err := handler.Process(ctx, job)
61+
if err != nil {
62+
b.Fatal(err)
63+
}
64+
}
65+
}
66+
67+
func BenchmarkJobDecodePayload(b *testing.B) {
68+
type TestPayload struct {
69+
Key string `json:"key"`
70+
Count int `json:"count"`
71+
}
72+
73+
payload := map[string]interface{}{"key": "value", "count": 123}
74+
job := queue.NewJob("test_job", payload)
75+
76+
for b.Loop() {
77+
var decoded TestPayload
78+
err := job.DecodePayload(&decoded)
79+
if err != nil {
80+
b.Fatal(err)
81+
}
82+
}
83+
}
84+
85+
func BenchmarkClientEnqueue(b *testing.B) {
86+
redisConfig := getRedisConfig()
87+
client, err := queue.NewClient(redisConfig)
88+
if err != nil {
89+
b.Skipf("Redis not available: %v", err)
90+
}
91+
defer func() { _ = client.Stop() }()
92+
93+
payload := map[string]interface{}{"key": "value"}
94+
95+
for b.Loop() {
96+
_, err := client.Enqueue("benchmark_job", payload)
97+
if err != nil {
98+
b.Fatal(err)
99+
}
100+
}
101+
}
102+
103+
func BenchmarkClientEnqueueJob(b *testing.B) {
104+
redisConfig := getRedisConfig()
105+
client, err := queue.NewClient(redisConfig)
106+
if err != nil {
107+
b.Skipf("Redis not available: %v", err)
108+
}
109+
defer func() { _ = client.Stop() }()
110+
111+
for b.Loop() {
112+
job := queue.NewJob("benchmark_job", map[string]interface{}{"key": "value"})
113+
_, err := client.EnqueueJob(job)
114+
if err != nil {
115+
b.Fatal(err)
116+
}
117+
}
118+
}

0 commit comments

Comments
 (0)