-
Notifications
You must be signed in to change notification settings - Fork 11
Add ListById (iface only) #364
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
📝 WalkthroughWalkthroughAdds a new ConnectorService RPC GetByIds (messages and capability flag), updates generated Java gRPC stubs/descriptors, implements GetByIds in Mongo and Java/Cosmos connectors, and embeds Changes
Sequence DiagramsequenceDiagram
participant Client as Client
participant GRPC as ConnectorService (gRPC)
participant Connector as Connector Impl
participant DB as Data Store / DB
Client->>GRPC: GetByIds(GetByIdsRequest)
activate GRPC
GRPC->>Connector: GetByIds(request)
activate Connector
Connector->>DB: Batch/read-by-IDs (concurrent per-ID or readMany)
activate DB
DB-->>Connector: Items or not-found
deactivate DB
Connector-->>GRPC: GetByIdsResponse (ordered data[])
deactivate Connector
GRPC-->>Client: GetByIdsResponse
deactivate GRPC
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
2bafdff to
6a93361
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@connectors/mongo/conn.go`:
- Around line 1052-1091: The loop in conn.GetByIds dereferences id.GetId()[0]
without checking for an empty slice, which can panic; inside the for-loop (and
inside the eg.Go closure) validate that len(id.GetId()) > 0 before accessing
index 0, and if empty assign res[i] = &adiomv1.GetByIdsResponse_ResponseItem{}
and return nil from the goroutine; also avoid loop-variable capture by copying i
and id into local variables (e.g., ii, item := i, id) before starting eg.Go so
each goroutine uses the correct index and id when setting res[ii].
In `@java/src/main/java/adiom/Main.java`:
- Around line 447-475: getByIds currently indexes results only by id causing
collisions when the same id exists under different partition keys and it doesn't
strip Cosmos internal fields; change the mapping logic in getByIds to use a
composite key of partitionKey + id (e.g. String key = partitionKey.toString() +
"::" + idPart) and map that composite key to a List<Integer> of response indices
(instead of a single Integer in m), populate that list when building
itemsToRead, then when iterating FeedResponse<JsonNode> fr.getResults() compute
the same composite key for each JsonNode and for each stored index set
respBuilder.setData(index, ...) so duplicates map to all correct slots; before
serializing each JsonNode remove internal fields (call
objectNode.remove(CosmosInternalKeys) or the same removal used in
listData/getUpdates) and then ByteString.copyFromUtf8(objectNode.toString()) so
internal Cosmos keys are stripped.
| // GetByIds implements adiomv1connect.ConnectorServiceHandler. | ||
| func (c *conn) GetByIds(ctx context.Context, r *connect.Request[adiomv1.GetByIdsRequest]) (*connect.Response[adiomv1.GetByIdsResponse], error) { | ||
| col, _, ok := GetCol(c.client, r.Msg.GetNamespace()) | ||
| if !ok { | ||
| return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("namespace should be fully qualified")) | ||
| } | ||
|
|
||
| // TODO: maybe use batch endpoint if we need to optimize | ||
| res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds())) | ||
| var eg errgroup.Group | ||
| for i, id := range r.Msg.GetIds() { | ||
| eg.Go(func() error { | ||
| bv := id.GetId()[0] | ||
| rawVal := bson.RawValue{ | ||
| Type: bsontype.Type(bv.GetType()), | ||
| Value: bv.GetData(), | ||
| } | ||
| v, err := col.FindOne(ctx, bson.M{"_id": rawVal}).Raw() | ||
| if err != nil { | ||
| if errors.Is(err, mongo.ErrNoDocuments) { | ||
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{} | ||
| return nil | ||
| } | ||
| return fmt.Errorf("err in findone: %w", err) | ||
| } | ||
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{ | ||
| Data: v, | ||
| } | ||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| if err := eg.Wait(); err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err finding ids: %w", err)) | ||
| } | ||
|
|
||
| return connect.NewResponse(&adiomv1.GetByIdsResponse{ | ||
| Data: res, | ||
| }), nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# Find go.mod to determine Go version
fd -t f "go.mod" | head -5Repository: adiom-data/dsync
Length of output: 66
🏁 Script executed:
# Search for Go version in go.mod
cat go.mod | head -20Repository: adiom-data/dsync
Length of output: 737
🌐 Web query:
Go 1.22 range loop variable capture goroutine fix
💡 Result:
Go 1.22 fixes the classic “range loop variable captured by goroutine/closure” bug by changing for-loop variables declared in the loop header (including for _, v := range ...) to be new variables each iteration, instead of one variable reused across iterations. That means this now behaves correctly (prints a b c in any order) without the usual workaround: (go.dev)
values := []string{"a", "b", "c"}
for _, v := range values {
go func() { fmt.Println(v) }()
}Key details:
- The new semantics apply only to packages in modules whose
go.moddeclaresgo 1.22(or later). This avoids silently changing behavior of older code. (go.dev) - In Go 1.21 you could preview/test the change with
GOEXPERIMENT=loopvar. (tip.golang.org)
If you’re on Go ≤1.21 (or your module’s go.mod is < 1.22), the safe patterns are still:
- pass the loop variable as a parameter to the closure, or
- create a per-iteration copy (
v := v) before starting the goroutine. (go.dev)
Citations:
- 1: https://go.dev/blog/go1.22?utm_source=openai
- 2: https://go.dev/blog/loopvar-preview?utm_source=openai
- 3: https://tip.golang.org/wiki/LoopvarExperiment?utm_source=openai
- 4: https://go.dev/blog/loopvar-preview?utm_source=openai
🏁 Script executed:
# Find the proto files to understand the structure of GetByIdsRequest
fd -t f "\.proto" | xargs grep -l "GetByIds\|GetId" | head -5Repository: adiom-data/dsync
Length of output: 116
🏁 Script executed:
# Search for adiomv1 proto definitions
fd -t f "\.proto" | head -10Repository: adiom-data/dsync
Length of output: 144
🏁 Script executed:
# Look for ID validation patterns elsewhere in the file
rg "GetId\(\)" connectors/mongo/conn.go -B 2 -A 2Repository: adiom-data/dsync
Length of output: 1482
🏁 Script executed:
# Check the proto definition for GetByIdsRequest and ID message structure
cat proto/adiom/v1/messages.proto | head -100Repository: adiom-data/dsync
Length of output: 2753
🏁 Script executed:
# Check if there's validation rules in proto
rg "GetByIdsRequest|message.*Id" proto/adiom/v1/ -A 5Repository: adiom-data/dsync
Length of output: 1234
Validate empty IDs to prevent panic on index access.
id.GetId()[0] panics when the request item has no IDs; the proto allows repeated BsonValue id to be empty. Add validation before processing each ID.
🐛 Proposed fix
for i, id := range r.Msg.GetIds() {
+ if len(id.GetId()) == 0 {
+ return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("id[%d] is empty", i))
+ }
eg.Go(func() error {
bv := id.GetId()[0]🤖 Prompt for AI Agents
In `@connectors/mongo/conn.go` around lines 1052 - 1091, The loop in conn.GetByIds
dereferences id.GetId()[0] without checking for an empty slice, which can panic;
inside the for-loop (and inside the eg.Go closure) validate that len(id.GetId())
> 0 before accessing index 0, and if empty assign res[i] =
&adiomv1.GetByIdsResponse_ResponseItem{} and return nil from the goroutine; also
avoid loop-variable capture by copying i and id into local variables (e.g., ii,
item := i, id) before starting eg.Go so each goroutine uses the correct index
and id when setting res[ii].
6a93361 to
e48457b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@connectors/mongo/conn.go`:
- Around line 1059-1085: The loop that spawns goroutines per r.Msg.GetIds() must
be limited and must not capture loop variables by reference: initialize eg as an
errgroup.Group value and call eg.SetLimit(<reasonableLimit>) before the loop
(e.g., 10 or a configurable value) to cap concurrency, and inside the for loop
copy the loop variables (e.g., idx, id := i, id) and use those copies in the
goroutine so res[idx] is set correctly; keep the Mongo call
(col.FindOne(...).Raw()) and error handling the same and then wait on eg.Wait()
as before.
| // TODO: maybe use batch endpoint if we need to optimize | ||
| res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds())) | ||
| var eg errgroup.Group | ||
| for i, id := range r.Msg.GetIds() { | ||
| eg.Go(func() error { | ||
| bv := id.GetId()[0] | ||
| rawVal := bson.RawValue{ | ||
| Type: bsontype.Type(bv.GetType()), | ||
| Value: bv.GetData(), | ||
| } | ||
| v, err := col.FindOne(ctx, bson.M{"_id": rawVal}).Raw() | ||
| if err != nil { | ||
| if errors.Is(err, mongo.ErrNoDocuments) { | ||
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{} | ||
| return nil | ||
| } | ||
| return fmt.Errorf("err in findone: %w", err) | ||
| } | ||
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{ | ||
| Data: v, | ||
| } | ||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| if err := eg.Wait(); err != nil { | ||
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err finding ids: %w", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
cat -n connectors/mongo/conn.go | sed -n '1059,1085p'Repository: adiom-data/dsync
Length of output: 1062
🏁 Script executed:
# Check if SetLimit is the correct API and when it was introduced
web_search
What is errgroup.SetLimit in Go? When was it introduced?Repository: adiom-data/dsync
Length of output: 153
🏁 Script executed:
# Search for any existing concurrency control patterns in the codebase
rg -i "setlimit|maxparallel|concurrency|limit" connectors/mongo/conn.go -B2 -A2Repository: adiom-data/dsync
Length of output: 753
🏁 Script executed:
# Check for other similar goroutine spawning patterns in the codebase
rg "eg\.Go\(func\(\)" connectors/mongo/conn.go -B3 -A10Repository: adiom-data/dsync
Length of output: 961
🌐 Web query:
errgroup.SetLimit Go API documentation
💡 Result:
errgroup.(*Group).SetLimit (package golang.org/x/sync/errgroup) has the signature:
func (g *Group) SetLimit(n int)It “limits the number of active goroutines in this group to at most n”; n < 0 means “no limit”, and n == 0 prevents any new goroutines from being added. After setting a limit, calls to g.Go(...) will block until the group can start another goroutine without exceeding the limit. You must not change the limit while any goroutines in the group are active. [1]
Source: Go package docs for golang.org/x/sync/errgroup on pkg.go.dev. [1]
[1] https://pkg.go.dev/golang.org/x/sync/errgroup
🏁 Script executed:
# Check Go version requirements for this project
find . -name "go.mod" -o -name ".github" -type d | head -5Repository: adiom-data/dsync
Length of output: 78
🏁 Script executed:
cat go.mod | grep -i "go \|module"Repository: adiom-data/dsync
Length of output: 278
🏁 Script executed:
# Search for any Go version hints in CI/build files
fd "\.yml|\.yaml" | xargs grep -l "go-version" 2>/dev/null | head -3Repository: adiom-data/dsync
Length of output: 42
Cap GetByIds concurrency to prevent unbounded goroutine spawning.
Spawning one goroutine per ID without limits can exhaust system resources and exceed MongoDB driver connection pool limits. Use errgroup.SetLimit() to cap parallelism:
Fix
- // TODO: maybe use batch endpoint if we need to optimize
- res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds()))
- var eg errgroup.Group
+ // TODO: maybe use batch endpoint if we need to optimize
+ res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds()))
+ const maxGetByIdsParallelism = 32 // consider making configurable
+ eg, ctx := errgroup.WithContext(ctx)
+ eg.SetLimit(maxGetByIdsParallelism)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // TODO: maybe use batch endpoint if we need to optimize | |
| res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds())) | |
| var eg errgroup.Group | |
| for i, id := range r.Msg.GetIds() { | |
| eg.Go(func() error { | |
| bv := id.GetId()[0] | |
| rawVal := bson.RawValue{ | |
| Type: bsontype.Type(bv.GetType()), | |
| Value: bv.GetData(), | |
| } | |
| v, err := col.FindOne(ctx, bson.M{"_id": rawVal}).Raw() | |
| if err != nil { | |
| if errors.Is(err, mongo.ErrNoDocuments) { | |
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{} | |
| return nil | |
| } | |
| return fmt.Errorf("err in findone: %w", err) | |
| } | |
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{ | |
| Data: v, | |
| } | |
| return nil | |
| }) | |
| } | |
| if err := eg.Wait(); err != nil { | |
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err finding ids: %w", err)) | |
| // TODO: maybe use batch endpoint if we need to optimize | |
| res := make([]*adiomv1.GetByIdsResponse_ResponseItem, len(r.Msg.GetIds())) | |
| const maxGetByIdsParallelism = 32 // consider making configurable | |
| eg, ctx := errgroup.WithContext(ctx) | |
| eg.SetLimit(maxGetByIdsParallelism) | |
| for i, id := range r.Msg.GetIds() { | |
| eg.Go(func() error { | |
| bv := id.GetId()[0] | |
| rawVal := bson.RawValue{ | |
| Type: bsontype.Type(bv.GetType()), | |
| Value: bv.GetData(), | |
| } | |
| v, err := col.FindOne(ctx, bson.M{"_id": rawVal}).Raw() | |
| if err != nil { | |
| if errors.Is(err, mongo.ErrNoDocuments) { | |
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{} | |
| return nil | |
| } | |
| return fmt.Errorf("err in findone: %w", err) | |
| } | |
| res[i] = &adiomv1.GetByIdsResponse_ResponseItem{ | |
| Data: v, | |
| } | |
| return nil | |
| }) | |
| } | |
| if err := eg.Wait(); err != nil { | |
| return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err finding ids: %w", err)) |
🤖 Prompt for AI Agents
In `@connectors/mongo/conn.go` around lines 1059 - 1085, The loop that spawns
goroutines per r.Msg.GetIds() must be limited and must not capture loop
variables by reference: initialize eg as an errgroup.Group value and call
eg.SetLimit(<reasonableLimit>) before the loop (e.g., 10 or a configurable
value) to cap concurrency, and inside the for loop copy the loop variables
(e.g., idx, id := i, id) and use those copies in the goroutine so res[idx] is
set correctly; keep the Mongo call (col.FindOne(...).Raw()) and error handling
the same and then wait on eg.Wait() as before.
e48457b to
df28fc2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@java/src/main/java/adiom/Main.java`:
- Around line 481-492: The loop in Main.java over fr.getResults() can NPE: guard
against n.get(s) being null before calling asText() (e.g., use a safe fallback
like "" or skip the item) when building the composite key from idKeys, and avoid
unboxing null from m.get(...) by retrieving Integer idxObj =
m.get(sb.toString()) and checking for null before converting to int (log or
continue if not found) before calling respBuilder.setData(...); ensure these
null checks are applied around the block that builds sb, looks up m, and sets
respBuilder to prevent NPEs.
🧹 Nitpick comments (2)
java/src/main/java/adiom/Main.java (2)
621-646: Consider adding defensive null checks for nested field access.Multiple chained
.get()calls on JsonNode could throw NPE if the structure varies:
- Line 625:
node.get("metadata").get(k)- Line 627:
node.get("metadata").get("partitionKey").get(k)- Line 643:
currentNode.get(k)While the Cosmos change feed structure is well-defined, defensive checks would improve robustness against unexpected data.
♻️ Example defensive access pattern
if (opType != null && opType.asText().equals("delete")) { Update.Builder b = Update.newBuilder().setType(adiom.v1.Messages.UpdateType.UPDATE_TYPE_DELETE); + JsonNode metadata = node.get("metadata"); for (String k : idKeys) { if (k.equals("id")) { - b.addId(BsonHelper.toId(k, node.get("metadata").get(k).asText())); + b.addId(BsonHelper.toId(k, metadata.get(k).asText())); } else { - b.addId(BsonHelper.toId(k, node.get("metadata").get("partitionKey").get(k).asText())); + JsonNode pkNode = metadata.get("partitionKey"); + if (pkNode != null && pkNode.has(k)) { + b.addId(BsonHelper.toId(k, pkNode.get(k).asText())); + } } }
457-463: Consider extracting idKeys building to a helper method.The same logic for building
idKeysfrom partition key paths appears in bothgetByIds(lines 457-463) andstreamUpdates(lines 603-609). Extracting this to a helper method inNsHelperwould reduce duplication.♻️ Suggested helper method
// In NsHelper class public List<String> getIdKeys() { List<String> idKeys = new ArrayList<>(); for (String path : this.pkd.getPaths()) { idKeys.add(path.substring(1)); } if (!idKeys.getLast().equals("id")) { idKeys.add("id"); } return idKeys; }
| for (JsonNode n : fr.getResults()) { | ||
| StringBuilder sb = new StringBuilder(); | ||
| for (String s : idKeys) { | ||
| sb.append(n.get(s).asText()); | ||
| sb.append(","); | ||
| } | ||
| int idx = m.get(sb.toString()); | ||
| ObjectNode objectNode = (ObjectNode) (n); | ||
| objectNode.remove(CosmosInternalKeys); | ||
| ByteString data = ByteString.copyFromUtf8(n.toString()); | ||
| respBuilder.setData(idx, GetByIdsResponse.ResponseItem.newBuilder().setData(data)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential NPE when item not found or field missing.
Two null-safety issues:
- Line 484:
n.get(s)returnsnullif the field doesn't exist, causing NPE on.asText(). - Line 487:
m.get(sb.toString())returnsnull(boxed) if the composite key isn't found (e.g., item doesn't exist in Cosmos), causing NPE when unboxing toint idx.
🐛 Proposed fix with null checks
for (JsonNode n : fr.getResults()) {
StringBuilder sb = new StringBuilder();
for (String s : idKeys) {
- sb.append(n.get(s).asText());
+ JsonNode fieldNode = n.get(s);
+ if (fieldNode == null) {
+ logger.atWarn().addKeyValue("field", s).log("Missing expected field in document");
+ continue;
+ }
+ sb.append(fieldNode.asText());
sb.append(",");
}
- int idx = m.get(sb.toString());
+ Integer idx = m.get(sb.toString());
+ if (idx == null) {
+ continue; // Item not in request map, skip
+ }
ObjectNode objectNode = (ObjectNode) (n);
objectNode.remove(CosmosInternalKeys);
ByteString data = ByteString.copyFromUtf8(n.toString());
respBuilder.setData(idx, GetByIdsResponse.ResponseItem.newBuilder().setData(data));
}🤖 Prompt for AI Agents
In `@java/src/main/java/adiom/Main.java` around lines 481 - 492, The loop in
Main.java over fr.getResults() can NPE: guard against n.get(s) being null before
calling asText() (e.g., use a safe fallback like "" or skip the item) when
building the composite key from idKeys, and avoid unboxing null from m.get(...)
by retrieving Integer idxObj = m.get(sb.toString()) and checking for null before
converting to int (log or continue if not found) before calling
respBuilder.setData(...); ensure these null checks are applied around the block
that builds sb, looks up m, and sets respBuilder to prevent NPEs.
Summary by CodeRabbit