Skip to content

Commit df28fc2

Browse files
Mark CMark C
authored andcommitted
Add basic getbyids impl to mongo and cosmos.
1 parent c530d23 commit df28fc2

3 files changed

Lines changed: 151 additions & 13 deletions

File tree

connectors/mongo/conn.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ func (c *conn) GetInfo(ctx context.Context, r *connect.Request[adiomv1.GetInfoRe
327327
LsnStream: true,
328328
MultiNamespacePlan: true,
329329
DefaultPlan: !c.settings.PerNamespaceStreams,
330+
GetByIds: true,
330331
},
331332
Sink: &adiomv1.Capabilities_Sink{
332333
SupportedDataTypes: []adiomv1.DataType{adiomv1.DataType_DATA_TYPE_MONGO_BSON},
@@ -1047,3 +1048,48 @@ func maybeUnavailableError(err error) error {
10471048
}
10481049
return connect.NewError(connect.CodeInternal, err)
10491050
}
1051+
1052+
// GetByIds implements adiomv1connect.ConnectorServiceHandler.
1053+
func (c *conn) GetByIds(ctx context.Context, r *connect.Request[adiomv1.GetByIdsRequest]) (*connect.Response[adiomv1.GetByIdsResponse], error) {
1054+
col, _, ok := GetCol(c.client, r.Msg.GetNamespace())
1055+
if !ok {
1056+
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("namespace should be fully qualified"))
1057+
}
1058+
1059+
// TODO: maybe use batch endpoint if we need to optimize
1060+
res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds()))
1061+
var eg errgroup.Group
1062+
for i, id := range r.Msg.GetIds() {
1063+
eg.Go(func() error {
1064+
if len(id.GetId()) < 1 {
1065+
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{}
1066+
return nil
1067+
}
1068+
bv := id.GetId()[0]
1069+
rawVal := bson.RawValue{
1070+
Type: bsontype.Type(bv.GetType()),
1071+
Value: bv.GetData(),
1072+
}
1073+
v, err := col.FindOne(ctx, bson.M{"_id": rawVal}).Raw()
1074+
if err != nil {
1075+
if errors.Is(err, mongo.ErrNoDocuments) {
1076+
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{}
1077+
return nil
1078+
}
1079+
return fmt.Errorf("err in findone: %w", err)
1080+
}
1081+
res[i] = &adiomv1.GetByIdsResponse_ResponseItem{
1082+
Data: v,
1083+
}
1084+
return nil
1085+
})
1086+
}
1087+
1088+
if err := eg.Wait(); err != nil {
1089+
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err finding ids: %w", err))
1090+
}
1091+
1092+
return connect.NewResponse(&adiomv1.GetByIdsResponse{
1093+
Data: res,
1094+
}), nil
1095+
}

java/src/main/java/adiom/BsonHelper.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,26 @@
1414

1515
public class BsonHelper {
1616

17+
public static List<String> getIdParts(List<adiom.v1.Messages.BsonValue> bvs) {
18+
if (bvs.isEmpty()) {
19+
throw new IllegalArgumentException("Must not have empty ids.");
20+
}
21+
List<String> res = new java.util.ArrayList<>(bvs.size());
22+
for (adiom.v1.Messages.BsonValue bv : bvs) {
23+
BsonType typ = BsonType.findByValue(bv.getType());
24+
ByteBufferBsonInput input = new ByteBufferBsonInput(new ByteBufNIO(ByteBuffer.wrap(bv.getData().toByteArray())));
25+
if (typ == BsonType.STRING) {
26+
String s = input.readString();
27+
input.close();
28+
res.add(s);
29+
} else {
30+
input.close();
31+
throw new IllegalArgumentException("Only String id currently supported.");
32+
}
33+
}
34+
return res;
35+
}
36+
1737
public static String getId(List<adiom.v1.Messages.BsonValue> bvs) {
1838
if (bvs.isEmpty()) {
1939
throw new IllegalArgumentException("Must not have empty ids.");
@@ -31,12 +51,12 @@ public static String getId(List<adiom.v1.Messages.BsonValue> bvs) {
3151
}
3252
}
3353

34-
public static adiom.v1.Messages.BsonValue toId(String id) {
54+
public static adiom.v1.Messages.BsonValue toId(String key, String id) {
3555
BasicOutputBuffer outputBuffer = new BasicOutputBuffer(id.length() + 5);
3656
outputBuffer.writeString(id);
3757
ByteString bs = ByteString.copyFrom(outputBuffer.getInternalBuffer(), 0, outputBuffer.getSize());
3858
outputBuffer.close();
39-
return adiom.v1.Messages.BsonValue.newBuilder().setName("id").setData(bs).setType(BsonType.STRING.getValue()).build();
59+
return adiom.v1.Messages.BsonValue.newBuilder().setName(key).setData(bs).setType(BsonType.STRING.getValue()).build();
4060
}
4161

4262
public static PartitionKey getPartitionKey(List<adiom.v1.Messages.BsonValue> bvs) {

java/src/main/java/adiom/Main.java

Lines changed: 83 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.azure.cosmos.models.CosmosBulkOperationResponse;
2929
import com.azure.cosmos.models.CosmosBulkOperations;
3030
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
31+
import com.azure.cosmos.models.CosmosItemIdentity;
3132
import com.azure.cosmos.models.CosmosItemOperation;
3233
import com.azure.cosmos.models.CosmosQueryRequestOptions;
3334
import com.azure.cosmos.models.FeedRange;
@@ -48,6 +49,8 @@
4849
import adiom.v1.Messages.DataType;
4950
import adiom.v1.Messages.GeneratePlanRequest;
5051
import adiom.v1.Messages.GeneratePlanResponse;
52+
import adiom.v1.Messages.GetByIdsRequest;
53+
import adiom.v1.Messages.GetByIdsResponse;
5154
import adiom.v1.Messages.GetInfoRequest;
5255
import adiom.v1.Messages.GetInfoResponse;
5356
import adiom.v1.Messages.GetNamespaceMetadataRequest;
@@ -204,7 +207,8 @@ public void getInfo(GetInfoRequest request, StreamObserver<GetInfoResponse> resp
204207
.setCapabilities(Capabilities.newBuilder()
205208
.setSource(Source.newBuilder()
206209
.addSupportedDataTypes(DataType.DATA_TYPE_JSON_ID)
207-
.setMultiNamespacePlan(true))
210+
.setMultiNamespacePlan(true)
211+
.setGetByIds(true))
208212
.setSink(Sink.newBuilder()
209213
.addSupportedDataTypes(DataType.DATA_TYPE_JSON_ID)))
210214
.build());
@@ -406,7 +410,7 @@ public void generatePlan(GeneratePlanRequest request, StreamObserver<GeneratePla
406410
}
407411

408412
CosmosChangeFeedRequestOptions ccfro = CosmosChangeFeedRequestOptions
409-
.createForProcessingFromNow(FeedRange.forFullRange()).setMaxItemCount(1);
413+
.createForProcessingFromNow(FeedRange.forFullRange()).setMaxItemCount(1).allVersionsAndDeletes();
410414
UpdatesPartition.Builder updatesPartitionBuilder = UpdatesPartition.newBuilder()
411415
.addNamespaces(namespace);
412416
for (FeedResponse<Object> fr : helper.container.queryChangeFeed(ccfro, Object.class).iterableByPage()) {
@@ -440,6 +444,57 @@ private synchronized long nextIndex() {
440444
return currentIndex++;
441445
}
442446

447+
@Override
448+
public void getByIds(GetByIdsRequest request, StreamObserver<GetByIdsResponse> responseObserver) {
449+
GetByIdsResponse.Builder respBuilder = GetByIdsResponse.newBuilder();
450+
java.util.Map<String, Integer> m = new java.util.HashMap<>();
451+
List<CosmosItemIdentity> itemsToRead = new ArrayList<>();
452+
453+
NsHelper helper = getNsHelper(responseObserver, request.getNamespace());
454+
if (helper == null) {
455+
return;
456+
}
457+
List<String> idKeys = new ArrayList<>();
458+
for (String path: helper.pkd.getPaths()) {
459+
idKeys.add(path.substring(1));
460+
}
461+
if (!idKeys.getLast().equals("id")) {
462+
idKeys.add("id");
463+
}
464+
465+
int i = 0;
466+
for (GetByIdsRequest.IdRequest id : request.getIdsList()) {
467+
PartitionKey pk = BsonHelper.getPartitionKey(id.getIdList());
468+
String idPart = BsonHelper.getId(id.getIdList());
469+
List<String> idParts = BsonHelper.getIdParts(id.getIdList());
470+
itemsToRead.add(new CosmosItemIdentity(pk, idPart));
471+
StringBuilder sb = new StringBuilder();
472+
for (String s : idParts) {
473+
sb.append(s);
474+
sb.append(",");
475+
}
476+
m.put(sb.toString(), i);
477+
respBuilder.addData(GetByIdsResponse.ResponseItem.getDefaultInstance());
478+
i++;
479+
}
480+
FeedResponse<JsonNode> fr = helper.container.readMany(itemsToRead, JsonNode.class);
481+
for (JsonNode n : fr.getResults()) {
482+
StringBuilder sb = new StringBuilder();
483+
for (String s : idKeys) {
484+
sb.append(n.get(s).asText());
485+
sb.append(",");
486+
}
487+
int idx = m.get(sb.toString());
488+
ObjectNode objectNode = (ObjectNode) (n);
489+
objectNode.remove(CosmosInternalKeys);
490+
ByteString data = ByteString.copyFromUtf8(n.toString());
491+
respBuilder.setData(idx, GetByIdsResponse.ResponseItem.newBuilder().setData(data));
492+
}
493+
494+
responseObserver.onNext(respBuilder.build());
495+
responseObserver.onCompleted();
496+
}
497+
443498
@Override
444499
public void listData(ListDataRequest request, StreamObserver<ListDataResponse> responseObserver) {
445500
String namespace = request.getPartition().getNamespace();
@@ -545,6 +600,13 @@ public void streamUpdates(StreamUpdatesRequest request,
545600
if (helper == null) {
546601
return;
547602
}
603+
List<String> idKeys = new ArrayList<>();
604+
for (String path: helper.pkd.getPaths()) {
605+
idKeys.add(path.substring(1));
606+
}
607+
if (!idKeys.getLast().equals("id")) {
608+
idKeys.add("id");
609+
}
548610

549611
String continuation = request.getCursor().toStringUtf8();
550612

@@ -556,22 +618,32 @@ public void streamUpdates(StreamUpdatesRequest request,
556618
List<Update> updates = new ArrayList<>();
557619
for (JsonNode node : fr.getResults()) {
558620
JsonNode opType = node.get("metadata").get("operationType");
559-
if (opType != null && opType.asText() == "delete") {
560-
String id = node.get("metadata").get("id").asText();
561-
updates.add(Update.newBuilder().setType(adiom.v1.Messages.UpdateType.UPDATE_TYPE_DELETE)
562-
.addId(BsonHelper.toId(id)).build());
621+
if (opType != null && opType.asText().equals("delete")) {
622+
Update.Builder b = Update.newBuilder().setType(adiom.v1.Messages.UpdateType.UPDATE_TYPE_DELETE);
623+
for (String k : idKeys) {
624+
if (k.equals("id")) {
625+
b.addId(BsonHelper.toId(k, node.get("metadata").get(k).asText()));
626+
} else {
627+
b.addId(BsonHelper.toId(k, node.get("metadata").get("partitionKey").get(k).asText()));
628+
}
629+
}
630+
updates.add(b.build());
563631
} else {
564632
adiom.v1.Messages.UpdateType typ = adiom.v1.Messages.UpdateType.UPDATE_TYPE_UPDATE;
565-
if (opType != null && opType.asText() == "create") {
633+
if (opType != null && opType.asText().equals("create")) {
566634
typ = adiom.v1.Messages.UpdateType.UPDATE_TYPE_INSERT;
567635
}
568636
JsonNode currentNode = node.get("current");
569637
ObjectNode objectNode = (ObjectNode) (currentNode);
570638
objectNode.remove(CosmosInternalKeys);
571-
String id = currentNode.get("id").asText();
572-
updates.add(Update.newBuilder().setType(typ)
573-
.setData(ByteString.copyFromUtf8(currentNode.toString())).addId(BsonHelper.toId(id))
574-
.build());
639+
640+
Update.Builder b = Update.newBuilder().setType(typ)
641+
.setData(ByteString.copyFromUtf8(currentNode.toString()));
642+
for (String k : idKeys) {
643+
b.addId(BsonHelper.toId(k, currentNode.get(k).asText()));
644+
}
645+
646+
updates.add(b.build());
575647
}
576648
}
577649

0 commit comments

Comments
 (0)