Skip to content

Commit d375f73

Browse files
authored
Add GetByIds (#364)
1 parent 79acb60 commit d375f73

20 files changed

Lines changed: 4188 additions & 448 deletions

File tree

connectors/airbyte/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
)
1616

1717
type source struct {
18+
adiomv1connect.UnimplementedConnectorServiceHandler
1819
dockerImage string
1920
config string
2021

connectors/dynamodb/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
)
2525

2626
type conn struct {
27+
adiomv1connect.UnimplementedConnectorServiceHandler
2728
client *client
2829
streamsClient *dynamodbstreams.Client
2930
spec string

connectors/mongo/conn.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type buffer struct {
6464
}
6565

6666
type conn struct {
67+
adiomv1connect.UnimplementedConnectorServiceHandler
6768
client *mongo.Client
6869

6970
settings ConnectorSettings
@@ -326,6 +327,7 @@ func (c *conn) GetInfo(ctx context.Context, r *connect.Request[adiomv1.GetInfoRe
326327
LsnStream: true,
327328
MultiNamespacePlan: true,
328329
DefaultPlan: !c.settings.PerNamespaceStreams,
330+
GetByIds: true,
329331
},
330332
Sink: &adiomv1.Capabilities_Sink{
331333
SupportedDataTypes: []adiomv1.DataType{adiomv1.DataType_DATA_TYPE_MONGO_BSON},
@@ -1046,3 +1048,48 @@ func maybeUnavailableError(err error) error {
10461048
}
10471049
return connect.NewError(connect.CodeInternal, err)
10481050
}
1051+
1052+
// GetByIds implements adiomv1connect.ConnectorServiceHandler.
1053+
func (c *conn) GetByIds(ctx context.Context, r *connect.Request[adiomv1.GetByIdsRequest]) (*connect.Response[adiomv1.GetByIdsResponse], error) {
1054+
col, _, ok := GetCol(c.client, r.Msg.GetNamespace())
1055+
if !ok {
1056+
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("namespace should be fully qualified"))
1057+
}
1058+
1059+
// TODO: maybe use batch endpoint if we need to optimize
1060+
res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds()))
1061+
var eg errgroup.Group
1062+
for i, id := range r.Msg.GetIds() {
1063+
eg.Go(func() error {
1064+
if len(id.GetId()) < 1 {
1065+
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{}
1066+
return nil
1067+
}
1068+
bv := id.GetId()[0]
1069+
rawVal := bson.RawValue{
1070+
Type: bsontype.Type(bv.GetType()),
1071+
Value: bv.GetData(),
1072+
}
1073+
v, err := col.FindOne(ctx, bson.M{"_id": rawVal}).Raw()
1074+
if err != nil {
1075+
if errors.Is(err, mongo.ErrNoDocuments) {
1076+
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{}
1077+
return nil
1078+
}
1079+
return fmt.Errorf("err in findone: %w", err)
1080+
}
1081+
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{
1082+
Data: v,
1083+
}
1084+
return nil
1085+
})
1086+
}
1087+
1088+
if err := eg.Wait(); err != nil {
1089+
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err finding ids: %w", err))
1090+
}
1091+
1092+
return connect.NewResponse(&adiomv1.GetByIdsResponse{
1093+
Data: res,
1094+
}), nil
1095+
}

connectors/null/connector.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
)
2222

2323
type conn struct {
24+
adiomv1connect.UnimplementedConnectorServiceHandler
2425
id string
2526
logJson bool
2627
sleep time.Duration
@@ -146,5 +147,10 @@ func (c *conn) WriteUpdates(ctx context.Context, r *connect.Request[adiomv1.Writ
146147
}
147148

148149
func NewConn(id string, logJson bool, sleep time.Duration, sleepJitter time.Duration) adiomv1connect.ConnectorServiceHandler {
149-
return &conn{id, logJson, sleep, sleepJitter}
150+
return &conn{
151+
id: id,
152+
logJson: logJson,
153+
sleep: sleep,
154+
sleepJitter: sleepJitter,
155+
}
150156
}

connectors/postgres/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type PostgresSettings struct {
5252
}
5353

5454
type conn struct {
55+
adiomv1connect.UnimplementedConnectorServiceHandler
5556
replicationUrl string
5657
id uint64
5758
c *pgxpool.Pool

connectors/random/connector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
)
2020

2121
type conn struct {
22+
adiomv1connect.UnimplementedConnectorServiceHandler
2223
settings ConnectorSettings
2324

2425
docMap map[string]*IndexMap //map of locations to map of document IDs

connectors/random/connv2.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
)
2121

2222
type connV2 struct {
23+
adiomv1connect.UnimplementedConnectorServiceHandler
2324
payload map[string]any
2425
namespacePrefix string
2526
initialSource []byte

connectors/s3vector/conn.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
)
2323

2424
type conn struct {
25+
adiomv1connect.UnimplementedConnectorServiceHandler
2526
client *s3vectors.Client
2627
bucketName *string
2728
vectorKey string

connectors/testconn/connector.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
)
3131

3232
type conn struct {
33+
adiomv1connect.UnimplementedConnectorServiceHandler
3334
bootstrapPath string
3435
updatesPath string
3536
loop bool

gen/adiom/v1/adiom.pb.go

Lines changed: 51 additions & 43 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)