Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions connectors/airbyte/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

type source struct {
adiomv1connect.UnimplementedConnectorServiceHandler
dockerImage string
config string

Expand Down
1 change: 1 addition & 0 deletions connectors/dynamodb/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

type conn struct {
adiomv1connect.UnimplementedConnectorServiceHandler
client *client
streamsClient *dynamodbstreams.Client
spec string
Expand Down
47 changes: 47 additions & 0 deletions connectors/mongo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type buffer struct {
}

type conn struct {
adiomv1connect.UnimplementedConnectorServiceHandler
client *mongo.Client

settings ConnectorSettings
Expand Down Expand Up @@ -326,6 +327,7 @@ func (c *conn) GetInfo(ctx context.Context, r *connect.Request[adiomv1.GetInfoRe
LsnStream: true,
MultiNamespacePlan: true,
DefaultPlan: !c.settings.PerNamespaceStreams,
GetByIds: true,
},
Sink: &adiomv1.Capabilities_Sink{
SupportedDataTypes: []adiomv1.DataType{adiomv1.DataType_DATA_TYPE_MONGO_BSON},
Expand Down Expand Up @@ -1046,3 +1048,48 @@ func maybeUnavailableError(err error) error {
}
return connect.NewError(connect.CodeInternal, err)
}

// GetByIds implements adiomv1connect.ConnectorServiceHandler.
func (c *conn) GetByIds(ctx context.Context, r *connect.Request[adiomv1.GetByIdsRequest]) (*connect.Response[adiomv1.GetByIdsResponse], error) {
col, _, ok := GetCol(c.client, r.Msg.GetNamespace())
if !ok {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("namespace should be fully qualified"))
}

// TODO: maybe use batch endpoint if we need to optimize
res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds()))
var eg errgroup.Group
for i, id := range r.Msg.GetIds() {
eg.Go(func() error {
if len(id.GetId()) < 1 {
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{}
return nil
}
bv := id.GetId()[0]
rawVal := bson.RawValue{
Type: bsontype.Type(bv.GetType()),
Value: bv.GetData(),
}
v, err := col.FindOne(ctx, bson.M{"_id": rawVal}).Raw()
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{}
return nil
}
return fmt.Errorf("err in findone: %w", err)
}
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{
Data: v,
}
return nil
})
}

if err := eg.Wait(); err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err finding ids: %w", err))
}

return connect.NewResponse(&adiomv1.GetByIdsResponse{
Data: res,
}), nil
}
8 changes: 7 additions & 1 deletion connectors/null/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

type conn struct {
adiomv1connect.UnimplementedConnectorServiceHandler
id string
logJson bool
sleep time.Duration
Expand Down Expand Up @@ -146,5 +147,10 @@ func (c *conn) WriteUpdates(ctx context.Context, r *connect.Request[adiomv1.Writ
}

func NewConn(id string, logJson bool, sleep time.Duration, sleepJitter time.Duration) adiomv1connect.ConnectorServiceHandler {
return &conn{id, logJson, sleep, sleepJitter}
return &conn{
id: id,
logJson: logJson,
sleep: sleep,
sleepJitter: sleepJitter,
}
}
1 change: 1 addition & 0 deletions connectors/postgres/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type PostgresSettings struct {
}

type conn struct {
adiomv1connect.UnimplementedConnectorServiceHandler
replicationUrl string
id uint64
c *pgxpool.Pool
Expand Down
1 change: 1 addition & 0 deletions connectors/random/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
)

type conn struct {
adiomv1connect.UnimplementedConnectorServiceHandler
settings ConnectorSettings

docMap map[string]*IndexMap //map of locations to map of document IDs
Expand Down
1 change: 1 addition & 0 deletions connectors/random/connv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

type connV2 struct {
adiomv1connect.UnimplementedConnectorServiceHandler
payload map[string]any
namespacePrefix string
initialSource []byte
Expand Down
1 change: 1 addition & 0 deletions connectors/s3vector/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
)

type conn struct {
adiomv1connect.UnimplementedConnectorServiceHandler
client *s3vectors.Client
bucketName *string
vectorKey string
Expand Down
1 change: 1 addition & 0 deletions connectors/testconn/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
)

type conn struct {
adiomv1connect.UnimplementedConnectorServiceHandler
bootstrapPath string
updatesPath string
loop bool
Expand Down
94 changes: 51 additions & 43 deletions gen/adiom/v1/adiom.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions gen/adiom/v1/adiom_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading