-
Notifications
You must be signed in to change notification settings - Fork 160
Expand file tree
/
Copy pathresumable.go
More file actions
184 lines (158 loc) · 5.41 KB
/
Copy pathresumable.go
File metadata and controls
184 lines (158 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package river
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/riverqueue/river/internal/rivermiddleware"
)
// Sentinel errors for misuse of the resumable step helpers.
var (
	// errResumableCursorNotInStep is returned by ResumableSetCursor when no
	// resumable step is currently executing.
	errResumableCursorNotInStep = errors.New("river: resumable cursor can only be used within ResumableStepCursor")

	// errResumableStepNotInWorker is the value mustResumableState panics with
	// when the context carries no resumable state.
	errResumableStepNotInWorker = errors.New("river: resumable step can only be used within a Worker")
)
// ResumableSetCursor records a cursor for the current resumable cursor step.
// The cursor is stored only if the job attempt ends in an error, allowing a
// later retry to resume the same step from the recorded position.
//
// Alternatively, ResumableSetStepCursorTx is available to persist a step and
// cursor immediately as part of a transaction, guaranteeing that it's stored
// durably.
//
// The cursor is marshaled to JSON and kept in the resumable state under the
// name of the step that's currently executing, replacing any cursor recorded
// earlier for that step.
//
// It returns an error if no step is currently executing, or if cursor can't be
// marshaled to JSON. The marshal error is wrapped, so errors.Is and errors.As
// still reach the underlying encoding/json error. It panics if ctx doesn't
// carry resumable state, meaning it was called outside of a Worker.
func ResumableSetCursor[TCursor any](ctx context.Context, cursor TCursor) error {
	state := mustResumableState(ctx)

	// An empty step name means no ResumableStep/ResumableStepCursor function
	// is on the stack, so there's no step to attach the cursor to.
	if state.StepName == "" {
		return errResumableCursorNotInStep
	}

	cursorBytes, err := json.Marshal(cursor)
	if err != nil {
		// Name the step so the failure is attributable, mirroring the
		// unmarshal error produced when the cursor is read back.
		return fmt.Errorf("river: marshal resumable cursor for step %q: %w", state.StepName, err)
	}

	// The cursor map is allocated lazily on first use.
	if state.Cursors == nil {
		state.Cursors = make(map[string]json.RawMessage)
	}
	state.Cursors[state.StepName] = json.RawMessage(cursorBytes)

	return nil
}
// StepOpts are options for ResumableStep and ResumableStepCursor. There are
// currently no available options, but this space is reserved for future use.
//
// Both functions accept a nil *StepOpts.
type StepOpts struct{}
// ResumableStep executes stepFunc as a named, resumable step. On a retry, a
// step that an earlier attempt already completed successfully is skipped
// rather than run again.
//
// Every step name has to be unique across all ResumableStep and
// ResumableStepCursor calls made during the same Worker execution. Reusing a
// name records an error on the job instead of running the step.
//
// Once any step has returned an error, the remaining steps become no-ops and
// the overall job is marked as failed with that error. Code placed outside of
// steps isn't guarded this way and still runs after a failure, so keep all
// executable code inside steps.
//
// opts may be nil. ResumableStep panics if ctx doesn't carry resumable state,
// meaning it was called outside of a Worker.
func ResumableStep(ctx context.Context, name string, opts *StepOpts, stepFunc func(ctx context.Context) error) {
	state := mustResumableState(ctx)

	// Cases are evaluated in order, so the name is only registered when no
	// earlier step has failed.
	switch {
	case state.Err != nil:
		return
	case !registerResumableStepName(state, name):
		return
	}

	// Still fast-forwarding to the step recorded by a previous attempt: this
	// step is skipped, and if it is that recorded step, everything after it
	// runs normally.
	if !state.ResumeMatched {
		if state.ResumeStep == name {
			state.ResumeMatched = true
			state.CompletedStep = name
		}
		return
	}

	// Expose this step's name while stepFunc runs, then restore whatever name
	// was active before.
	enclosingStep := state.StepName
	state.StepName = name
	defer func() { state.StepName = enclosingStep }()

	stepErr := stepFunc(ctx)
	if stepErr != nil {
		state.Err = stepErr
		return
	}

	state.CompletedStep = name
}
// ResumableStepCursor executes stepFunc as a named, resumable step and hands
// it the cursor value persisted by an earlier failed attempt, if one was
// recorded with ResumableSetCursor. When no cursor was recorded, stepFunc
// receives the zero value of TCursor.
//
// Every step name has to be unique across all ResumableStep and
// ResumableStepCursor calls made during the same Worker execution.
//
// TCursor is chosen by the caller. It can be a primitive such as an integer
// ID, or something richer like a struct with several fields. Because it's
// stored in a job's metadata, it must round trip through JSON.
//
// The step function is responsible for calling ResumableSetCursor with an
// updated cursor as it makes progress, and for inspecting the cursor it's
// given to decide where to resume from.
//
// Once any step has returned an error, the remaining steps become no-ops and
// the overall job is marked as failed with that error. Code placed outside of
// steps isn't guarded this way and still runs after a failure, so keep all
// executable code inside steps.
//
// opts may be nil. ResumableStepCursor panics if ctx doesn't carry resumable
// state, meaning it was called outside of a Worker.
func ResumableStepCursor[TCursor any](ctx context.Context, name string, opts *StepOpts, stepFunc func(ctx context.Context, cursor TCursor) error) {
	state := mustResumableState(ctx)

	// Cases are evaluated in order, so the name is only registered when no
	// earlier step has failed.
	switch {
	case state.Err != nil:
		return
	case !registerResumableStepName(state, name):
		return
	}

	if !state.ResumeMatched {
		// Not the step recorded by the previous attempt yet; keep skipping.
		if state.ResumeStep != name {
			return
		}

		state.ResumeMatched = true
		state.CompletedStep = name

		// A stored cursor means this step only got partway through last
		// time, so it's run again with that cursor. Without one, the step
		// is treated as finished and skipped.
		if _, partiallyDone := state.Cursors[name]; !partiallyDone {
			return
		}
	}

	// A missing or empty cursor leaves the zero value in place.
	var cursor TCursor
	if rawCursor := state.Cursors[name]; len(rawCursor) > 0 {
		if err := json.Unmarshal(rawCursor, &cursor); err != nil {
			state.Err = fmt.Errorf("river: unmarshal resumable cursor for step %q: %w", name, err)
			return
		}
	}

	// Expose this step's name while stepFunc runs so that ResumableSetCursor
	// files its cursor under it, then restore whatever name was active before.
	enclosingStep := state.StepName
	state.StepName = name
	defer func() { state.StepName = enclosingStep }()

	stepErr := stepFunc(ctx, cursor)
	if stepErr != nil {
		state.Err = stepErr
		return
	}

	// The step finished, so its cursor is no longer needed.
	state.CompletedStep = name
	delete(state.Cursors, name)
}
// mustResumableState fetches the resumable state carried by ctx. It panics
// with errResumableStepNotInWorker when ctx carries none.
func mustResumableState(ctx context.Context) *rivermiddleware.ResumableState {
	if state, found := resumableStateFromContext(ctx); found {
		return state
	}
	panic(errResumableStepNotInWorker)
}
// registerResumableStepName adds name to the set of step names seen so far and
// reports whether it was new. If the name was already registered, a duplicate
// name error is stored in state.Err and false is returned.
func registerResumableStepName(state *rivermiddleware.ResumableState, name string) bool {
	_, duplicate := state.AllStepNames[name]
	if duplicate {
		state.Err = fmt.Errorf("river: duplicate resumable step name %q", name)
	} else {
		state.AllStepNames[name] = struct{}{}
	}
	return !duplicate
}
// resumableStateFromContext looks up the resumable state stored in ctx under
// rivermiddleware.ResumableContextKey. The boolean result is false when the
// key is absent, holds a value of another type, or holds a nil pointer.
func resumableStateFromContext(ctx context.Context) (*rivermiddleware.ResumableState, bool) {
	// The comma-ok assertion also covers an absent key: asserting on a nil
	// interface value yields ok == false rather than panicking.
	typedState, ok := ctx.Value(rivermiddleware.ResumableContextKey{}).(*rivermiddleware.ResumableState)
	if ok && typedState != nil {
		return typedState, true
	}
	return nil, false
}