From 58549609d1a7ad85b33780b1425c5a23b330e6cb Mon Sep 17 00:00:00 2001 From: Benjamin Knofe-Vider Date: Fri, 5 Jun 2026 12:27:11 +0200 Subject: [PATCH] fix(arrow): drop-row at batch boundary in RowsToRecord The loop condition `rows.Next() && count < batchSize` advanced the underlying cursor once after the final scan; that row was silently dropped when the caller asked for the next batch. Unbounded SELECTs crossing the 1024-row batch boundary lost one row per transition (deterministic, ORDER BY-stable). COUNT(*) returned the parquet metadata count, hiding the discrepancy from aggregation queries. Reported by Marce Coll on dbt_marce.credit_purchase_events (COUNT(*) = 12617, SELECT = 12605) and dbt.usage_allocation (2,689,942 vs 2,687,758). WHERE filters still found the rows. Swap the order: `count < batchSize && rows.Next()` so Next() is not called when the batch is already full. Add regression test covering the production case. --- duckdbservice/arrow_helpers.go | 10 ++- duckdbservice/arrow_helpers_test.go | 100 ++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/duckdbservice/arrow_helpers.go b/duckdbservice/arrow_helpers.go index a267e542..34de2360 100644 --- a/duckdbservice/arrow_helpers.go +++ b/duckdbservice/arrow_helpers.go @@ -38,7 +38,15 @@ func RowsToRecord(alloc memory.Allocator, rows *sql.Rows, schema *arrow.Schema, numFields := schema.NumFields() count := 0 - for rows.Next() && count < batchSize { + // Order matters: check `count < batchSize` first, then call rows.Next(). + // The reverse (rows.Next() && count < batchSize) advances the cursor once + // more after the final scan and that row is silently dropped — the next + // call to RowsToRecord starts from the row *after* the one we skipped. 
+ // Production reads were losing one row at every batch boundary + // (batchSize=1024) for unbounded SELECTs; COUNT(*) still returned the + // parquet-metadata row count, so the discrepancy was invisible to + // aggregation queries. See TestRowsToRecordNoRowsLostAtBatchBoundary. + for count < batchSize && rows.Next() { values := make([]interface{}, numFields) valuePtrs := make([]interface{}, numFields) for i := range values { diff --git a/duckdbservice/arrow_helpers_test.go b/duckdbservice/arrow_helpers_test.go index 235bc5fd..5ad83195 100644 --- a/duckdbservice/arrow_helpers_test.go +++ b/duckdbservice/arrow_helpers_test.go @@ -1059,3 +1059,103 @@ func TestGetQuerySchemaTrailingSemicolon(t *testing.T) { } } +// TestRowsToRecordNoRowsLostAtBatchBoundary reproduces the production bug where +// RowsToRecord silently dropped one row at every batch transition for unbounded +// SELECTs. The driver-level cursor was being advanced by the loop condition +// `rows.Next() && count < batchSize` even when the batch was already full, so +// the row at index `batchSize`, `2*batchSize+1`, ... never reached a Scan call +// and was lost when the caller asked for the next batch. +// +// Reported by Marce Coll on dbt_marce.credit_purchase_events +// (12617 rows reported by COUNT(*), 12605 rows delivered by SELECT) and +// dbt.usage_allocation (2,689,942 vs 2,687,758). Symptoms: +// - SELECT col FROM big_table delivers fewer rows than COUNT(*). +// - WHERE col = X still returns the missing row. +// - Number of lost rows scales linearly with table size. +func TestRowsToRecordNoRowsLostAtBatchBoundary(t *testing.T) { + alloc := memory.NewGoAllocator() + db, err := sql.Open("duckdb", "") + if err != nil { + t.Fatalf("open: %v", err) + } + defer func() { _ = db.Close() }() + + // Pick row counts that exercise multiple batch transitions and a partial + // final batch. batchSize = 1024 matches the value used by DoGetStatement. 
+ const batchSize = 1024 + cases := []struct { + name string + rows int + }{ + {"single batch exact", 1024}, + {"one row over boundary", 1025}, + {"two batches exact", 2048}, + {"three batches + partial", 3500}, + {"production case credit_purchase_events", 12617}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tbl := fmt.Sprintf("t_%d", tc.rows) + if _, err := db.Exec(fmt.Sprintf( + "CREATE TABLE %s AS SELECT i AS id FROM range(0, %d) t(i)", tbl, tc.rows, + )); err != nil { + t.Fatalf("create table: %v", err) + } + + ctx := context.Background() + rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT id FROM %s ORDER BY id", tbl)) + if err != nil { + t.Fatalf("query: %v", err) + } + defer func() { _ = rows.Close() }() + + schema, err := GetQuerySchema(ctx, db, fmt.Sprintf("SELECT id FROM %s", tbl), nil) + if err != nil { + t.Fatalf("schema: %v", err) + } + + seen := make([]bool, tc.rows) + delivered := 0 + for { + rec, err := RowsToRecord(alloc, rows, schema, batchSize) + if err != nil { + t.Fatalf("RowsToRecord: %v", err) + } + if rec == nil { + break + } + col := rec.Column(0).(*array.Int64) + for i := 0; i < col.Len(); i++ { + id := col.Value(i) + if id < 0 || id >= int64(tc.rows) { + rec.Release() + t.Fatalf("delivered out-of-range id %d (expected 0..%d)", id, tc.rows-1) + } + if seen[id] { + rec.Release() + t.Fatalf("id %d delivered twice", id) + } + seen[id] = true + delivered++ + } + rec.Release() + } + + if delivered != tc.rows { + // Walk the seen set to point at the first dropped id; this + // matches the production symptom (deterministic missing rows). + firstDropped := -1 + for i, ok := range seen { + if !ok { + firstDropped = i + break + } + } + t.Fatalf("delivered %d rows, expected %d (first dropped id = %d)", + delivered, tc.rows, firstDropped) + } + }) + } +} +