From 458af57413a30d19c4e1120635f9d4bb1815fa47 Mon Sep 17 00:00:00 2001 From: Mark C Date: Mon, 2 Feb 2026 11:25:53 -0800 Subject: [PATCH] Some optimizations for mongo connector --- connectors/mongo/conn.go | 18 +++++++++++------- internal/app/options/connectorflags.go | 4 ++++ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/connectors/mongo/conn.go b/connectors/mongo/conn.go index 3097b45..bcc5251 100644 --- a/connectors/mongo/conn.go +++ b/connectors/mongo/conn.go @@ -40,6 +40,7 @@ type ConnectorSettings struct { SampleFactor int // a factor to determine how many extra samples per partition are used MaxPageSize int PerNamespaceStreams bool + SkipBatchOverwrite bool Query string // query filter, as a v2 Extended JSON string, e.g., '{\"x\":{\"$gt\":1}}'" } @@ -234,6 +235,9 @@ func (c *conn) GeneratePlan(ctx context.Context, r *connect.Request[adiomv1.Gene }() for _, partition := range partitions { + if !r.Msg.GetInitialSync() { + break + } eg.Go(func() error { ns, _ := ToNS(partition.Namespace) col := c.client.Database(ns.Db).Collection(ns.Col) @@ -820,7 +824,7 @@ func (c *conn) StreamUpdates(ctx context.Context, r *connect.Request[adiomv1.Str } // inserts data and overwrites on conflict -func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, documents []interface{}) error { +func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, documents []interface{}, skipBatchOverwrite bool) error { // eagerly attempt an unordered insert _, bwErr := collection.InsertMany(ctx, documents, options.InsertMany().SetOrdered(false)) @@ -849,12 +853,12 @@ func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, doc if isBSONObjectTooLargeError(bwErr) { slog.Debug(fmt.Sprintf("Bulk write failed due to BSON object too large: %v", bwErr)) mid := len(documents) / 2 - err := insertBatchOverwrite(ctx, collection, documents[:mid]) + err := insertBatchOverwrite(ctx, collection, documents[:mid], skipBatchOverwrite) if err != nil { slog.Error(fmt.Sprintf("Bulk write failed (first half, up to %d): %v", mid, err)) return err } - err = insertBatchOverwrite(ctx, collection, documents[mid:]) + err = insertBatchOverwrite(ctx, collection, documents[mid:], skipBatchOverwrite) if err != nil { slog.Error(fmt.Sprintf("Bulk write failed (second half, from %d): %v", mid, err)) return err @@ -866,8 +870,8 @@ func insertBatchOverwrite(ctx context.Context, collection *mongo.Collection, doc return bwErr } - // redo them all as a bulk replace - if len(bulkOverwrite) > 0 { + // maybe redo them all as a bulk replace + if len(bulkOverwrite) > 0 && !skipBatchOverwrite { _, err := collection.BulkWrite(ctx, bulkOverwrite, options.BulkWrite().SetOrdered(false)) if err != nil { slog.Error(fmt.Sprintf("Failed to overwrite documents in collection: %v", err)) @@ -888,7 +892,7 @@ func (c *conn) WriteData(ctx context.Context, r *connect.Request[adiomv1.WriteDa for _, data := range r.Msg.GetData() { batch = append(batch, bson.Raw(data)) if c.settings.WriterMaxBatchSize > 0 && len(batch) >= c.settings.WriterMaxBatchSize { - err := insertBatchOverwrite(ctx, col, batch) + err := insertBatchOverwrite(ctx, col, batch, c.settings.SkipBatchOverwrite) if err != nil { if !errors.Is(err, context.Canceled) { slog.Error(fmt.Sprintf("Failed to insert batch: %v", err)) @@ -899,7 +903,7 @@ func (c *conn) WriteData(ctx context.Context, r *connect.Request[adiomv1.WriteDa } } if len(batch) > 0 { - err := insertBatchOverwrite(ctx, col, batch) + err := insertBatchOverwrite(ctx, col, batch, c.settings.SkipBatchOverwrite) if err != nil { if !errors.Is(err, context.Canceled) { slog.Error(fmt.Sprintf("Failed to insert batch: %v", err)) diff --git a/internal/app/options/connectorflags.go b/internal/app/options/connectorflags.go index 47853a2..e75ac6d 100644 --- a/internal/app/options/connectorflags.go +++ b/internal/app/options/connectorflags.go @@ -756,6 +756,10 @@ func CosmosFlags(settings *cosmos.ConnectorSettings) []cli.Flag { func MongoFlags(settings *mongo.ConnectorSettings) []cli.Flag { return []cli.Flag{ + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "skip-batch-overwrite", + Destination: &settings.SkipBatchOverwrite, + }), altsrc.NewDurationFlag(&cli.DurationFlag{ Name: "server-timeout", Required: false,