-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathenvelope.go
More file actions
145 lines (134 loc) · 4.23 KB
/
envelope.go
File metadata and controls
145 lines (134 loc) · 4.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package arcp
import (
	"bytes"
	"encoding/json"
	"fmt"
	"reflect"
	"sort"
	"sync"
)
// Envelope is the wire-level frame for every ARCP message. Field
// names map exactly onto the JSON object documented in the spec.
//
// Only ARCP, ID and Type are always serialized. Every other field is
// tagged omitempty and is therefore left out of the JSON object when it
// is empty (empty string, zero EventSeq, zero-length Payload or
// Extensions).
type Envelope struct {
	// ARCP is the protocol version literal. Always "1.1" on the wire.
	// Validate rejects any value other than ProtocolVersion.
	ARCP string `json:"arcp"`
	// ID is a unique message identifier (UUIDv7 recommended).
	// Validate requires it to be non-empty.
	ID string `json:"id"`
	// Type is the wire-type token, for example "job.submit".
	// Validate requires it to be non-empty.
	Type string `json:"type"`
	// SessionID identifies the owning session; empty on session.hello.
	SessionID string `json:"session_id,omitempty"`
	// JobID is set on job-scoped envelopes (job.*).
	JobID string `json:"job_id,omitempty"`
	// TraceID is a 32-hex-char W3C trace identifier when present.
	TraceID string `json:"trace_id,omitempty"`
	// EventSeq is the session-scoped monotonic sequence for
	// job.event / job.result / job.error. Zero on every other type.
	EventSeq uint64 `json:"event_seq,omitempty"`
	// Payload is the typed payload, kept raw so the dispatch loop can
	// hand it off to a registered MessageType without parsing twice.
	// Use DecodePayload to unmarshal it into a concrete value.
	Payload json.RawMessage `json:"payload,omitempty"`
	// Extensions carries x-vendor.* namespaced opaque fields per the
	// spec extensions namespace. Values are kept as raw JSON.
	Extensions map[string]json.RawMessage `json:"extensions,omitempty"`
}
// Validate checks the fields that every envelope must carry and returns
// an error built from ErrInvalidRequest for the first one that is wrong,
// or nil when the envelope passes.
//
// The checks run in a fixed order: ARCP must equal ProtocolVersion, then
// ID must be non-empty, then Type must be non-empty.
func (e *Envelope) Validate() error {
	switch {
	case e.ARCP != ProtocolVersion:
		msg := fmt.Sprintf("envelope arcp must be %q", ProtocolVersion)
		return ErrInvalidRequest.WithMessage(msg)
	case e.ID == "":
		return ErrInvalidRequest.WithMessage("envelope id is required")
	case e.Type == "":
		return ErrInvalidRequest.WithMessage("envelope type is required")
	default:
		return nil
	}
}
// NewEnvelope builds an Envelope of wire type typ whose ARCP field is
// ProtocolVersion and whose ID comes from NewEnvelopeID. payload is
// encoded with MarshalPayload and stored in the Payload field.
//
// If the payload cannot be encoded, the zero Envelope and the encoding
// error are returned. The remaining fields (SessionID, JobID, ...) are
// left empty for the caller to fill in.
func NewEnvelope(typ string, payload any) (Envelope, error) {
	var env Envelope

	// Encode first so a bad payload fails before an ID is generated.
	raw, err := MarshalPayload(payload)
	if err != nil {
		return env, err
	}

	env.ARCP = ProtocolVersion
	env.ID = NewEnvelopeID()
	env.Type = typ
	env.Payload = raw
	return env, nil
}
// MarshalPayload turns payload into a json.RawMessage.
//
// A nil payload yields a nil message. Values that are already raw JSON
// (json.RawMessage or []byte) are returned as-is, without copying or
// validation. Anything else is encoded with json.Marshal, and an
// encoding failure is reported as ErrInvalidRequest with a
// "payload marshal: " message prefix.
func MarshalPayload(payload any) (json.RawMessage, error) {
	if payload == nil {
		return nil, nil
	}
	if raw, isRaw := payload.(json.RawMessage); isRaw {
		return raw, nil
	}
	if b, isBytes := payload.([]byte); isBytes {
		return json.RawMessage(b), nil
	}

	encoded, err := json.Marshal(payload)
	if err == nil {
		return encoded, nil
	}
	return nil, ErrInvalidRequest.WithMessage("payload marshal: " + err.Error())
}
// DecodePayload unmarshals e.Payload into v, which must be a non-nil
// pointer as required by encoding/json.
//
// An empty payload is not an error: v is left untouched and nil is
// returned. Otherwise the payload must hold exactly one JSON value.
// Anything other than whitespace after that value is rejected, because
// json.Decoder on its own stops after the first value and would silently
// ignore the rest. Every failure is reported as ErrInvalidRequest with a
// "payload decode: " message prefix.
func (e *Envelope) DecodePayload(v any) error {
	if len(e.Payload) == 0 {
		return nil
	}
	dec := json.NewDecoder(bytes.NewReader(e.Payload))
	if err := dec.Decode(v); err != nil {
		return ErrInvalidRequest.WithMessage("payload decode: " + err.Error())
	}
	// InputOffset is the end of the value just decoded; only trailing
	// whitespace may follow it.
	if rest := bytes.TrimSpace(e.Payload[dec.InputOffset():]); len(rest) != 0 {
		return ErrInvalidRequest.WithMessage("payload decode: unexpected data after top-level JSON value")
	}
	return nil
}
// MessageType is implemented by every typed payload struct registered
// against the envelope dispatch table.
type MessageType interface {
	// ARCPType returns the wire-type string the payload is registered
	// under, for example "job.submit".
	ARCPType() string
}

var (
	// registryMu guards registry.
	registryMu sync.RWMutex
	// registry maps a wire-type string to the Go type that
	// NewPayloadForType instantiates for it.
	registry = map[string]reflect.Type{}
)

// RegisterMessageType associates the concrete type behind m with its
// wire-type string. Each wire-type string may only be registered once;
// duplicate registration panics. Call from init() in messages/*.go.
//
// m may be a pointer (including a typed nil pointer such as (*T)(nil),
// provided ARCPType does not dereference its receiver) or a plain value.
// In both cases the underlying type T is recorded, so NewPayloadForType
// always hands back a *T.
//
// It panics if m is a nil interface, if m.ARCPType() returns the empty
// string, or if the wire type is already registered.
func RegisterMessageType(m MessageType) {
	if m == nil {
		panic("arcp: RegisterMessageType called with nil MessageType")
	}
	t := m.ARCPType()
	if t == "" {
		panic("arcp: RegisterMessageType called with empty type")
	}

	// Strip one level of pointer only when there is one: calling Elem on
	// a non-pointer type would panic (struct) or pick the element type
	// (map, slice) instead of the payload type itself.
	rt := reflect.TypeOf(m)
	if rt.Kind() == reflect.Pointer {
		rt = rt.Elem()
	}

	registryMu.Lock()
	defer registryMu.Unlock()
	if existing, ok := registry[t]; ok {
		panic(fmt.Sprintf("arcp: duplicate MessageType registration for %q (existing: %v)", t, existing))
	}
	registry[t] = rt
}

// NewPayloadForType returns a pointer to a freshly allocated zero value
// of the type registered for typ, suitable as a json.Unmarshal target.
// It returns nil if typ is not registered.
func NewPayloadForType(typ string) any {
	registryMu.RLock()
	defer registryMu.RUnlock()
	t, ok := registry[typ]
	if !ok {
		return nil
	}
	return reflect.New(t).Interface()
}

// RegisteredTypes returns the names of every registered MessageType in
// ascending lexical order, useful for diagnostics. The returned slice is
// a fresh copy that the caller may modify.
func RegisteredTypes() []string {
	registryMu.RLock()
	defer registryMu.RUnlock()
	out := make([]string, 0, len(registry))
	for k := range registry {
		out = append(out, k)
	}
	// Map iteration order is random; sort so diagnostics are stable.
	sort.Strings(out)
	return out
}