-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathloop.go
More file actions
37 lines (34 loc) · 807 Bytes
/
loop.go
File metadata and controls
37 lines (34 loc) · 807 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package pubsub
import (
"context"
"github.com/pkg/errors"
"github.com/spoke-d/task"
"github.com/spoke-d/task/tomb"
)
// Loop runs fn repeatedly on a goroutine managed by a tomb and returns a
// cancel function that kills that tomb. This is useful for exhausting a
// subscriber without caring about a schedule or interval.
//
// After each call to fn:
//   - a nil error, or an error whose cause is task.ErrSkip, starts the next
//     iteration immediately;
//   - any other error (including task.ErrTerminate) ends the loop and is
//     returned from the goroutine with a stack trace attached.
//
// The context handed to the goroutine is checked before every iteration, so
// the loop also ends once that context is done, even when fn never inspects
// it. NOTE(review): this assumes the tomb cancels the context when Kill is
// called — confirm against the tomb package.
//
// If the goroutine cannot be started, Loop returns a no-op cancel function
// together with the error, so callers may always invoke the returned function.
func Loop(fn task.Func) (func() error, error) {
	// Named t rather than tomb so the imported tomb package is not shadowed.
	t := tomb.New(false)

	run := func(ctx context.Context) error {
		for {
			// Stop before calling fn again once the context is done; without
			// this a fn that ignores ctx would keep the goroutine spinning.
			select {
			case <-ctx.Done():
				return errors.WithStack(ctx.Err())
			default:
			}

			err := fn(ctx)
			if err == nil || errors.Cause(err) == task.ErrSkip {
				// Nothing went wrong, or fn asked to skip this round.
				continue
			}
			// Every other error, task.ErrTerminate included, stops the loop.
			return errors.WithStack(err)
		}
	}

	if err := t.Go(run); err != nil {
		return func() error { return nil }, errors.WithStack(err)
	}
	return func() error {
		return t.Kill(nil)
	}, nil
}