recorded_output.go
package river

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"

	"github.com/riverqueue/river/internal/jobexecutor"
	"github.com/riverqueue/river/rivertype"
)

const (
	maxOutputSizeMB    = 32
	maxOutputSizeBytes = maxOutputSizeMB * 1024 * 1024
)
// RecordOutput records output JSON from a job. The "output" can be any
// JSON-encodable value and will be stored in the database on the job row after
// the current execution attempt completes. Output may be useful for debugging,
// or for storing the result of a job temporarily without needing to create a
// dedicated table to keep it in.
//
// For example, with workflows, it's common for a subsequent task to depend on
// something done in an earlier dependency task. Consider the creation of an
// external resource in another API or in a database: it will typically have a
// unique ID that must be used to reference the resource later. A later step
// may require that info in order to complete its work, and the output can be
// a convenient way to store that info.
//
// Output is stored in the job's metadata under the `"output"` key
// ([github.com/riverqueue/river/rivertype.MetadataKeyOutput]).
//
// This function must be called within a Worker's Work function. It returns an
// error if called anywhere else. As with any stored value, care should be taken
// to ensure that the payload size is not too large. Output is limited to 32 MB
// for safety, but should be kept much smaller than this.
//
// Only one output can be stored per job. If this function is called more than
// once, the output will be overwritten with the latest value. The output also
// must be recorded _before_ the job finishes executing so that it can be stored
// when the job's row is updated.
//
// Once recorded, the output is stored regardless of the outcome of the
// execution attempt (success, error, panic, etc.).
//
// RecordOutput always stores output lazily as a job is being completed (whether
// that's completion to success or failure). Client.JobUpdate and JobUpdateTx
// are available to store output eagerly at any time, including from inside a
// work function as the job is being executed.
//
// The output is marshaled to JSON as part of this function, which returns an
// error if the output is not JSON-encodable.
func RecordOutput(ctx context.Context, output any) error {
	metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
	if !hasMetadataUpdates {
		return errors.New("RecordOutput must be called within a Worker")
	}

	outputBytes, err := json.Marshal(output)
	if err != nil {
		return err
	}

	if err := checkOutputSize(outputBytes); err != nil {
		return err
	}

	metadataUpdates[rivertype.MetadataKeyOutput] = json.RawMessage(outputBytes)

	return nil
}
// Postgres JSONB is limited to 255 MB, but it would be a bad idea to get
// anywhere close to that limit for output.
func checkOutputSize(outputBytes []byte) error {
	if len(outputBytes) > maxOutputSizeBytes {
		return fmt.Errorf("output is too large: %d bytes (max %d MB)", len(outputBytes), maxOutputSizeMB)
	}

	return nil
}