Skip to content

Commit 7e9c9f9

Browse files
committed
Add tests for kinesis source reader errs
1 parent bfb189c commit 7e9c9f9

11 files changed

Lines changed: 194 additions & 86 deletions

File tree

connectors/kinesis/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type NewClientParams struct {
2727
// falls back to credentials config file.
2828
Profile string
2929
Credentials aws.CredentialsProvider
30+
Retryer aws.Retryer
3031
}
3132

3233
func NewClient(params *NewClientParams) (*Client, error) {
@@ -53,6 +54,9 @@ func NewClient(params *NewClientParams) (*Client, error) {
5354
if params.Endpoint != "" {
5455
opts.BaseEndpoint = ptr.New(params.Endpoint)
5556
}
57+
if params.Retryer != nil {
58+
opts.Retryer = params.Retryer
59+
}
5660
})
5761

5862
return &Client{svc: svc}, nil

connectors/kinesis/kinesisfake/describe_stream.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"fmt"
66
"log/slog"
77
"strings"
8+
9+
kinesistypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
10+
"reduction.dev/reduction/util/ptr"
811
)
912

1013
type DescribeStreamRequest struct {
@@ -66,10 +69,11 @@ func (f *Fake) describeStream(body []byte) (*DescribeStreamResponse, error) {
6669
}
6770

6871
stream := f.db.streams[request.StreamName]
69-
slog.Info("streams", "s", stream)
7072
if stream == nil {
7173
slog.Info("sending ResourceNotFoundException")
72-
return nil, &ResourceNotFoundException{}
74+
return nil, &kinesistypes.ResourceNotFoundException{
75+
Message: ptr.New(fmt.Sprintf("Stream %s not found.", request.StreamName)),
76+
}
7377
}
7478
return &DescribeStreamResponse{
7579
StreamDescription: StreamDescription{
Lines changed: 20 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,32 @@
11
package kinesisfake
22

33
import (
4+
"errors"
45
"fmt"
56
"net/http"
7+
8+
"github.com/aws/smithy-go"
69
)
710

11+
var errCodeToStatusCode = map[string]int{
12+
"ResourceNotFoundException": http.StatusNotFound,
13+
"ExpiredIteratorException": http.StatusBadRequest,
14+
"ProvisionedThroughputExceededException": http.StatusBadRequest,
15+
"AccessDeniedException": http.StatusForbidden,
16+
"InvalidArgumentException": http.StatusBadRequest,
17+
}
18+
819
func handleError(w http.ResponseWriter, err error) {
9-
if kerr, ok := err.(KinesisError); ok {
10-
w.WriteHeader(kerr.StatusCode())
11-
fmt.Fprintf(w, "{ \"__type\": \"%s\", \"message\": \"%s\" }", kerr.AWSExceptionCode(), kerr.Error())
12-
return
20+
var apiErr smithy.APIError
21+
if errors.As(err, &apiErr) {
22+
statusCode := errCodeToStatusCode[apiErr.ErrorCode()]
23+
if statusCode == 0 {
24+
statusCode = http.StatusInternalServerError
25+
}
26+
w.WriteHeader(statusCode)
27+
fmt.Fprintf(w, "{ \"__type\": \"%s\", \"message\": \"%s\" }", apiErr.ErrorCode(), apiErr.ErrorMessage())
1328
}
29+
1430
w.WriteHeader(http.StatusInternalServerError)
1531
fmt.Fprintf(w, "{ \"error\": \"Internal Server Error: %v\" }", err)
1632
}
@@ -20,59 +36,3 @@ type KinesisError interface {
2036
StatusCode() int
2137
AWSExceptionCode() string
2238
}
23-
24-
/* UnsupportedOperationError */
25-
26-
type UnsupportedOperationError struct {
27-
Operation string
28-
}
29-
30-
func (e *UnsupportedOperationError) Error() string {
31-
return fmt.Sprintf("Operation '%s' not supported", e.Operation)
32-
}
33-
34-
func (e *UnsupportedOperationError) StatusCode() int {
35-
return http.StatusNotImplemented
36-
}
37-
38-
/* ResourceNotFoundException */
39-
40-
type ResourceNotFoundException struct {
41-
message string
42-
}
43-
44-
func (e *ResourceNotFoundException) Error() string {
45-
if e.message != "" {
46-
return e.message
47-
}
48-
return "Resource not found"
49-
}
50-
51-
func (e *ResourceNotFoundException) StatusCode() int {
52-
return http.StatusBadRequest
53-
}
54-
55-
func (e *ResourceNotFoundException) AWSExceptionCode() string {
56-
return "ResourceNotFoundException"
57-
}
58-
59-
/* ExpiredIteratorException */
60-
61-
type ExpiredIteratorException struct {
62-
message string
63-
}
64-
65-
func (e *ExpiredIteratorException) Error() string {
66-
if e.message != "" {
67-
return e.message
68-
}
69-
return "Iterator expired"
70-
}
71-
72-
func (e *ExpiredIteratorException) StatusCode() int {
73-
return http.StatusBadRequest
74-
}
75-
76-
func (e *ExpiredIteratorException) AWSExceptionCode() string {
77-
return "ExpiredIteratorException"
78-
}

connectors/kinesis/kinesisfake/fake.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@ package kinesisfake
22

33
import (
44
"encoding/json"
5+
"fmt"
56
"io"
67
"log/slog"
78
"math/big"
89
"net/http"
910
"net/http/httptest"
1011
"strings"
1112
"sync/atomic"
13+
14+
kinesistypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
15+
"reduction.dev/reduction/util/ptr"
1216
)
1317

1418
func StartFake() (*httptest.Server, *Fake) {
@@ -53,7 +57,7 @@ func route(f *Fake, w http.ResponseWriter, r *http.Request) {
5357
case "DeleteStream":
5458
resp, err = f.deleteStream(body)
5559
default:
56-
err = &UnsupportedOperationError{operation}
60+
err = &kinesistypes.InvalidArgumentException{Message: ptr.New(fmt.Sprintf("Invalid Operation: %s", operation))}
5761
}
5862

5963
if err != nil {
@@ -92,4 +96,5 @@ type Fake struct {
9296
db *db
9397
lastIteratorTimestamp atomic.Int64
9498
iteratorsExpirationAt atomic.Int64
99+
getRecordsError error
95100
}

connectors/kinesis/kinesisfake/expire_shard_iterators.go renamed to connectors/kinesis/kinesisfake/fake_control.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,9 @@ package kinesisfake
66
func (f *Fake) ExpireShardIterators() {
77
f.iteratorsExpirationAt.Store(f.lastIteratorTimestamp.Load())
88
}
9+
10+
// SetGetRecordsError sets an error that will be returned by GetRecords calls.
11+
// Set to nil to clear the error.
12+
func (f *Fake) SetGetRecordsError(err error) {
13+
f.getRecordsError = err
14+
}

connectors/kinesis/kinesisfake/get_records.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import (
66
"slices"
77
"strconv"
88
"strings"
9+
10+
kinesistypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
11+
"reduction.dev/reduction/util/ptr"
912
)
1013

1114
type GetRecordsRequest struct {
@@ -30,6 +33,11 @@ type Record struct {
3033
}
3134

3235
func (f *Fake) getRecords(body []byte) (*GetRecordsResponse, error) {
36+
// Check if we have a simulated error
37+
if f.getRecordsError != nil {
38+
return nil, f.getRecordsError
39+
}
40+
3341
var request GetRecordsRequest
3442
err := json.Unmarshal(body, &request)
3543
if err != nil {
@@ -38,7 +46,9 @@ func (f *Fake) getRecords(body []byte) (*GetRecordsResponse, error) {
3846

3947
stream := f.db.streams[streamNameFromARN(request.StreamARN)]
4048
if stream == nil {
41-
return nil, &ResourceNotFoundException{fmt.Sprintf("no stream %s", request.StreamARN)}
49+
return nil, &kinesistypes.ResourceNotFoundException{
50+
Message: ptr.New(fmt.Sprintf("no stream %s", request.StreamARN)),
51+
}
4252
}
4353

4454
// Parse the iterator
@@ -49,8 +59,8 @@ func (f *Fake) getRecords(body []byte) (*GetRecordsResponse, error) {
4959

5060
// Check if the iterator has expired
5161
if int64(timestamp) <= f.iteratorsExpirationAt.Load() {
52-
return nil, &ExpiredIteratorException{
53-
message: fmt.Sprintf("Iterator %s has expired", request.ShardIterator),
62+
return nil, &kinesistypes.ExpiredIteratorException{
63+
Message: ptr.New(fmt.Sprintf("Iterator %s has expired", request.ShardIterator)),
5464
}
5565
}
5666

@@ -59,7 +69,9 @@ func (f *Fake) getRecords(body []byte) (*GetRecordsResponse, error) {
5969
return s.id == shardID
6070
})
6171
if shardIndex == -1 {
62-
return nil, &ResourceNotFoundException{fmt.Sprintf("no shard %s", request.ShardIterator)}
72+
return nil, &kinesistypes.ResourceNotFoundException{
73+
Message: ptr.New(fmt.Sprintf("no shard %s", request.ShardIterator)),
74+
}
6375
}
6476
shard := stream.shards[shardIndex]
6577

connectors/kinesis/kinesisfake/get_shard_iterator.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"encoding/json"
55
"fmt"
66
"strconv"
7+
8+
kinesistypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
9+
"reduction.dev/reduction/util/ptr"
710
)
811

912
type GetShardIteratorRequest struct {
@@ -28,7 +31,9 @@ func (f *Fake) getShardIterator(body []byte) (*GetShardIteratorResponse, error)
2831

2932
stream := f.db.streams[streamNameFromARN(request.StreamARN)]
3033
if stream == nil {
31-
return nil, &ResourceNotFoundException{}
34+
return nil, &kinesistypes.ResourceNotFoundException{
35+
Message: ptr.New(fmt.Sprintf("Stream %s not found", request.StreamARN)),
36+
}
3237
}
3338

3439
position := 0
@@ -45,7 +50,6 @@ func (f *Fake) getShardIterator(body []byte) (*GetShardIteratorResponse, error)
4550

4651
// Increment the timestamp for this iterator using atomic operations
4752
timestamp := f.lastIteratorTimestamp.Add(1)
48-
4953
shardIterator := shardIteratorFor(request.ShardId, timestamp, position)
5054

5155
return &GetShardIteratorResponse{ShardIterator: shardIterator}, nil

connectors/kinesis/kinesisfake/put_records.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"math/big"
99
"strconv"
1010
"time"
11+
12+
kinesistypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
13+
"reduction.dev/reduction/util/ptr"
1114
)
1215

1316
type PutRecordsRequest struct {
@@ -42,8 +45,8 @@ func (f *Fake) putRecords(body []byte) (*PutRecordsResponse, error) {
4245
streamName := streamNameFromARN(request.StreamARN)
4346
stream := f.db.streams[streamName]
4447
if stream == nil {
45-
return nil, &ResourceNotFoundException{
46-
message: fmt.Sprintf("Stream %s under account %s not found.", streamName, "123456789012"),
48+
return nil, &kinesistypes.ResourceNotFoundException{
49+
Message: ptr.New(fmt.Sprintf("Stream %s under account %s not found.", streamName, "123456789012")),
4750
}
4851
}
4952

connectors/kinesis/source_reader.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/aws/aws-sdk-go-v2/service/kinesis"
1010
kinesistypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
11-
"github.com/aws/smithy-go"
1211
protocol "reduction.dev/reduction-protocol/kinesispb"
1312
"reduction.dev/reduction/connectors"
1413
"reduction.dev/reduction/connectors/kinesis/kinesispb"
@@ -173,18 +172,13 @@ func (s *SourceReader) refreshShardIterator(shard *kinesisShard) error {
173172
var _ connectors.SourceReader = (*SourceReader)(nil)
174173

175174
func sourceErrorFrom(err error) *connectors.SourceError {
176-
kinesisErrToConnectorErr := map[smithy.APIError]func(error) *connectors.SourceError{
177-
&kinesistypes.AccessDeniedException{}: connectors.NewTerminalError,
178-
&kinesistypes.InvalidArgumentException{}: connectors.NewTerminalError,
175+
switch {
176+
case errors.As(err, new(*kinesistypes.AccessDeniedException)):
177+
return connectors.NewTerminalError(err)
178+
case errors.As(err, new(*kinesistypes.InvalidArgumentException)):
179+
return connectors.NewTerminalError(err)
180+
default:
181+
// All other errors are retryable
182+
return connectors.NewRetryableError(err)
179183
}
180-
181-
// Check if the error is a terminal error
182-
for kinesisErr, connectorErr := range kinesisErrToConnectorErr {
183-
if errors.As(err, &kinesisErr) {
184-
return connectorErr(err)
185-
}
186-
}
187-
188-
// Default to retryable for unknown errors
189-
return connectors.NewRetryableError(err)
190184
}

0 commit comments

Comments
 (0)