forked from connectrpc/connect-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinterceptor.go
More file actions
138 lines (119 loc) · 4.68 KB
/
interceptor.go
File metadata and controls
138 lines (119 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2021-2025 The Connect Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connect
import (
"context"
"errors"
)
var (
// errNewClientContextProhibited signals that a new client context was created
// in an interceptor, which is prohibited.
errNewClientContextProhibited = errors.New("creating a new context in an interceptor is prohibited")
)
// UnaryFunc is the generic signature of a unary RPC. Interceptors may wrap
// Funcs.
//
// The type of the request and response structs depend on the codec being used.
// When using Protobuf, request.Any() and response.Any() will always be
// [proto.Message] implementations.
type UnaryFunc func(context.Context, AnyRequest) (AnyResponse, error)
// StreamingClientFunc is the generic signature of a streaming RPC from the client's
// perspective. Interceptors may wrap StreamingClientFuncs.
type StreamingClientFunc func(context.Context, Spec) StreamingClientConn
// StreamingHandlerFunc is the generic signature of a streaming RPC from the
// handler's perspective. Interceptors may wrap StreamingHandlerFuncs.
type StreamingHandlerFunc func(context.Context, StreamingHandlerConn) error
// An Interceptor adds logic to a generated handler or client, like the
// decorators or middleware you may have seen in other libraries. Interceptors
// may mutate requests and responses, handle errors, retry, recover from panics,
// emit logs and metrics, or do nearly anything else.
//
// The returned functions must be safe to call concurrently.
type Interceptor interface {
WrapUnary(UnaryFunc) UnaryFunc
WrapStreamingClient(StreamingClientFunc) StreamingClientFunc
WrapStreamingHandler(StreamingHandlerFunc) StreamingHandlerFunc
}
// UnaryInterceptorFunc is a simple Interceptor implementation that only
// wraps unary RPCs. It has no effect on streaming RPCs.
type UnaryInterceptorFunc func(UnaryFunc) UnaryFunc
// WrapUnary implements [Interceptor] by applying the interceptor function.
func (f UnaryInterceptorFunc) WrapUnary(next UnaryFunc) UnaryFunc { return f(next) }
// WrapStreamingClient implements [Interceptor] with a no-op.
func (f UnaryInterceptorFunc) WrapStreamingClient(next StreamingClientFunc) StreamingClientFunc {
return next
}
// WrapStreamingHandler implements [Interceptor] with a no-op.
func (f UnaryInterceptorFunc) WrapStreamingHandler(next StreamingHandlerFunc) StreamingHandlerFunc {
return next
}
// A chain composes multiple interceptors into one.
type chain struct {
interceptors []Interceptor
}
// newChain composes multiple interceptors into one.
func newChain(interceptors []Interceptor) *chain {
// We usually wrap in reverse order to have the first interceptor from
// the slice act first. Rather than doing this dance repeatedly, reverse the
// interceptor order now.
var chain chain
for i := len(interceptors) - 1; i >= 0; i-- {
if interceptor := interceptors[i]; interceptor != nil {
chain.interceptors = append(chain.interceptors, interceptor)
}
}
return &chain
}
func (c *chain) WrapUnary(next UnaryFunc) UnaryFunc {
for _, interceptor := range c.interceptors {
next = unaryThunk(next)
next = interceptor.WrapUnary(next)
}
return next
}
func (c *chain) WrapStreamingClient(next StreamingClientFunc) StreamingClientFunc {
for _, interceptor := range c.interceptors {
next = streamingClientThunk(next)
next = interceptor.WrapStreamingClient(next)
}
return next
}
func (c *chain) WrapStreamingHandler(next StreamingHandlerFunc) StreamingHandlerFunc {
for _, interceptor := range c.interceptors {
next = interceptor.WrapStreamingHandler(next)
}
return next
}
func unaryThunk(next UnaryFunc) UnaryFunc {
return func(ctx context.Context, req AnyRequest) (AnyResponse, error) {
if err := checkSentinel(ctx); err != nil {
return nil, err
}
return next(ctx, req)
}
}
func streamingClientThunk(next StreamingClientFunc) StreamingClientFunc {
return func(ctx context.Context, spec Spec) StreamingClientConn {
if err := checkSentinel(ctx); err != nil {
return &errStreamingClientConn{err: err}
}
return next(ctx, spec)
}
}
func checkSentinel(ctx context.Context) error {
if ctx.Value(clientCallInfoContextKey{}) != ctx.Value(sentinelContextKey{}) {
return errNewClientContextProhibited
}
return nil
}