Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions connectors/mongo/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ConnectorSettings struct {
SampleFactor int // a factor to determine how many extra samples per partition are used
MaxPageSize int
PerNamespaceStreams bool
SkipBatchOverwrite bool

Query string // query filter, as a v2 Extended JSON string, e.g., '{\"x\":{\"$gt\":1}}'"
}
Expand Down Expand Up @@ -234,6 +235,9 @@ func (c *conn) GeneratePlan(ctx context.Context, r *connect.Request[adiomv1.Gene
}()

for _, partition := range partitions {
if !r.Msg.GetInitialSync() {
break
}
eg.Go(func() error {
ns, _ := ToNS(partition.Namespace)
col := c.client.Database(ns.Db).Collection(ns.Col)
Comment on lines 237 to 243
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "conn.go" | grep mongo

Repository: adiom-data/dsync

Length of output: 86


🏁 Script executed:

cat -n connectors/mongo/conn.go | sed -n '230,250p'

Repository: adiom-data/dsync

Length of output: 723


🏁 Script executed:

cat -n connectors/mongo/conn.go | sed -n '235,260p'

Repository: adiom-data/dsync

Length of output: 1010


🏁 Script executed:

rg -n "eg\.Go\(func" connectors/mongo/conn.go

Repository: adiom-data/dsync

Length of output: 114


🏁 Script executed:

cat -n connectors/mongo/conn.go | sed -n '1,20p'

Repository: adiom-data/dsync

Length of output: 498


🏁 Script executed:

cat -n connectors/mongo/conn.go | sed -n '1060,1085p'

Repository: adiom-data/dsync

Length of output: 1103


🏁 Script executed:

cat -n connectors/mongo/conn.go | sed -n '237,270p'

Repository: adiom-data/dsync

Length of output: 1617


Capture partition before spawning goroutines.

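Lines 242 and 254 close over the range variable partition. Before Go 1.22 this variable is reused across loop iterations, so the goroutines spawned via eg.Go() can race on it and observe a later (often the final) partition value instead of their own. From Go 1.22 onward each iteration gets a fresh variable, so the explicit capture below is only required when the module targets an older Go version, and is harmless otherwise.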

🔧 Safe capture pattern
 for _, partition := range partitions {
 	if !r.Msg.GetInitialSync() {
 		break
 	}
+	partition := partition
 	eg.Go(func() error {
 		ns, _ := ToNS(partition.Namespace)
🤖 Prompt for AI Agents
In `@connectors/mongo/conn.go` around lines 237 - 243, the goroutine closure in
the loop over partitions closes over the loop variable partition, which causes
races on Go versions before 1.22; fix by capturing the current partition before
spawning the goroutine (e.g., shadow it with partition := partition; eg.Go only
accepts a func() error, so the value cannot be passed as a parameter directly)
and then use that captured variable inside the eg.Go closure where
ToNS(partition.Namespace) and c.client.Database(...).Collection(...) are invoked
so each goroutine gets the correct partition value.

Expand Down Expand Up @@ -820,7 +824,7 @@ func (c *conn) StreamUpdates(ctx context.Context, r *connect.Request[adiomv1.Str
}

// inserts data; on conflict, overwrites the conflicting documents unless skipBatchOverwrite is set
func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, documents []interface{}) error {
func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, documents []interface{}, skipBatchOverwrite bool) error {
// eagerly attempt an unordered insert
_, bwErr := collection.InsertMany(ctx, documents, options.InsertMany().SetOrdered(false))

Expand Down Expand Up @@ -849,12 +853,12 @@ func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, doc
if isBSONObjectTooLargeError(bwErr) {
slog.Debug(fmt.Sprintf("Bulk write failed due to BSON object too large: %v", bwErr))
mid := len(documents) / 2
err := insertBatchOverwrite(ctx, collection, documents[:mid])
err := insertBatchOverwrite(ctx, collection, documents[:mid], skipBatchOverwrite)
if err != nil {
slog.Error(fmt.Sprintf("Bulk write failed (first half, up to %d): %v", mid, err))
return err
}
err = insertBatchOverwrite(ctx, collection, documents[mid:])
err = insertBatchOverwrite(ctx, collection, documents[mid:], skipBatchOverwrite)
if err != nil {
slog.Error(fmt.Sprintf("Bulk write failed (second half, from %d): %v", mid, err))
return err
Expand All @@ -866,8 +870,8 @@ func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, doc
return bwErr
}

// redo them all as a bulk replace
if len(bulkOverwrite) > 0 {
// maybe redo them all as a bulk replace
if len(bulkOverwrite) > 0 && !skipBatchOverwrite {
_, err := collection.BulkWrite(ctx, bulkOverwrite, options.BulkWrite().SetOrdered(false))
if err != nil {
slog.Error(fmt.Sprintf("Failed to overwrite documents in collection: %v", err))
Expand All @@ -888,7 +892,7 @@ func (c *conn) WriteData(ctx context.Context, r *connect.Request[adiomv1.WriteDa
for _, data := range r.Msg.GetData() {
batch = append(batch, bson.Raw(data))
if c.settings.WriterMaxBatchSize > 0 && len(batch) >= c.settings.WriterMaxBatchSize {
err := insertBatchOverwrite(ctx, col, batch)
err := insertBatchOverwrite(ctx, col, batch, c.settings.SkipBatchOverwrite)
if err != nil {
if !errors.Is(err, context.Canceled) {
slog.Error(fmt.Sprintf("Failed to insert batch: %v", err))
Expand All @@ -899,7 +903,7 @@ func (c *conn) WriteData(ctx context.Context, r *connect.Request[adiomv1.WriteDa
}
}
if len(batch) > 0 {
err := insertBatchOverwrite(ctx, col, batch)
err := insertBatchOverwrite(ctx, col, batch, c.settings.SkipBatchOverwrite)
if err != nil {
if !errors.Is(err, context.Canceled) {
slog.Error(fmt.Sprintf("Failed to insert batch: %v", err))
Expand Down
4 changes: 4 additions & 0 deletions internal/app/options/connectorflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,10 @@ func CosmosFlags(settings *cosmos.ConnectorSettings) []cli.Flag {

func MongoFlags(settings *mongo.ConnectorSettings) []cli.Flag {
return []cli.Flag{
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "skip-batch-overwrite",
Destination: &settings.SkipBatchOverwrite,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: "server-timeout",
Required: false,
Expand Down
Loading