
Commit 7c53e1c

cpsievert and claude committed
fix: chunk large DataFrames in register to avoid duckdb-rs panic
duckdb-rs's Arrow virtual table function writes an entire RecordBatch into a single DuckDB DataChunk whose vectors have a fixed capacity of STANDARD_VECTOR_SIZE (2048 rows). When a RecordBatch exceeds this, FlatVector::copy panics with "assertion failed: data.len() <= self.capacity()".

Work around this by splitting DataFrames larger than 2048 rows into chunks: the first chunk creates the table via CREATE TEMP TABLE ... AS SELECT * FROM arrow(?, ?), and subsequent chunks use INSERT INTO.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
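To make the resulting flow concrete, here is the register path condensed into a single free function. This is a sketch only, not the crate's exact code: it assumes a Polars DataFrame and reuses the crate's dataframe_to_arrow_params helper and GgsqlError type that appear in the diff below.

// Sketch of the chunked registration flow; error messages trimmed.
// Assumes: a Polars DataFrame, a duckdb-rs Connection, and the crate's own
// dataframe_to_arrow_params helper (see src/reader/duckdb.rs below).
fn register_chunked(
    conn: &duckdb::Connection,
    name: &str,
    df: &polars::prelude::DataFrame,
) -> Result<(), GgsqlError> {
    const MAX_ARROW_BATCH_ROWS: usize = 2048; // DuckDB's STANDARD_VECTOR_SIZE

    // The first chunk creates the table and fixes its schema. Polars' slice
    // clamps the length, so this also covers frames with fewer than 2048 rows.
    let params = dataframe_to_arrow_params(df.slice(0, MAX_ARROW_BATCH_ROWS))?;
    let create = format!("CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)", name);
    conn.execute(&create, params)
        .map_err(|e| GgsqlError::ReaderError(format!("register '{}': {}", name, e)))?;

    // Any remaining rows are appended in 2048-row chunks.
    let insert = format!("INSERT INTO \"{}\" SELECT * FROM arrow(?, ?)", name);
    let mut offset = MAX_ARROW_BATCH_ROWS;
    while offset < df.height() {
        let len = MAX_ARROW_BATCH_ROWS.min(df.height() - offset);
        let params = dataframe_to_arrow_params(df.slice(offset as i64, len))?;
        conn.execute(&insert, params)
            .map_err(|e| GgsqlError::ReaderError(format!("insert into '{}': {}", name, e)))?;
        offset += len;
    }
    Ok(())
}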
1 parent 640ce77 commit 7c53e1c

1 file changed

src/reader/duckdb.rs

Lines changed: 96 additions & 10 deletions
@@ -516,17 +516,53 @@ impl Reader for DuckDBReader {
             )));
         }
 
-        // Convert DataFrame to Arrow query params
-        let params = dataframe_to_arrow_params(df)?;
+        // DuckDB's Arrow virtual table function (in duckdb-rs) writes an entire
+        // RecordBatch into a single DataChunk whose vectors have a fixed capacity
+        // of STANDARD_VECTOR_SIZE (2048). Passing a RecordBatch with more rows
+        // causes a panic. Work around this by chunking large DataFrames.
+        const MAX_ARROW_BATCH_ROWS: usize = 2048;
+        let total_rows = df.height();
+
+        if total_rows <= MAX_ARROW_BATCH_ROWS {
+            // Small DataFrame: register in a single batch
+            let params = dataframe_to_arrow_params(df)?;
+            let sql = format!(
+                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
+                name
+            );
+            self.conn.execute(&sql, params).map_err(|e| {
+                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
+            })?;
+        } else {
+            // Large DataFrame: create table from first chunk, then insert remaining chunks
+            let first_chunk = df.slice(0, MAX_ARROW_BATCH_ROWS);
+            let params = dataframe_to_arrow_params(first_chunk)?;
+            let create_sql = format!(
+                "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
+                name
+            );
+            self.conn.execute(&create_sql, params).map_err(|e| {
+                GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
+            })?;
 
-        // Create temp table from Arrow data
-        let sql = format!(
-            "CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
-            name
-        );
-        self.conn.execute(&sql, params).map_err(|e| {
-            GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
-        })?;
+            let mut offset = MAX_ARROW_BATCH_ROWS;
+            while offset < total_rows {
+                let chunk_size = std::cmp::min(MAX_ARROW_BATCH_ROWS, total_rows - offset);
+                let chunk = df.slice(offset as i64, chunk_size);
+                let params = dataframe_to_arrow_params(chunk)?;
+                let insert_sql = format!(
+                    "INSERT INTO \"{}\" SELECT * FROM arrow(?, ?)",
+                    name
+                );
+                self.conn.execute(&insert_sql, params).map_err(|e| {
+                    GgsqlError::ReaderError(format!(
+                        "Failed to insert chunk into table '{}': {}",
+                        name, e
+                    ))
+                })?;
+                offset += chunk_size;
+            }
+        }
 
         // Track the table so we can unregister it later
         self.registered_tables.insert(name.to_string());
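As a quick sanity check on the loop above, the (offset, length) pairs it feeds to df.slice partition the frame exactly, with only the last chunk short. A self-contained sketch of the same arithmetic (chunk_bounds is a hypothetical helper, not part of the crate):

// Standalone reproduction of the chunk arithmetic used above.
fn chunk_bounds(total_rows: usize, max_rows: usize) -> Vec<(usize, usize)> {
    let mut bounds = Vec::new();
    let mut offset = 0;
    while offset < total_rows {
        let len = max_rows.min(total_rows - offset);
        bounds.push((offset, len)); // (offset, length), as passed to df.slice
        offset += len;
    }
    bounds
}

#[test]
fn chunk_bounds_partition_the_frame() {
    // 3000 rows (the case exercised by the test below) split into a
    // 2048-row CREATE chunk followed by a 952-row INSERT chunk.
    assert_eq!(chunk_bounds(3000, 2048), vec![(0, 2048), (2048, 952)]);
}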
@@ -783,4 +819,54 @@ mod tests {
         let result = reader.execute_sql("SELECT * FROM data").unwrap();
         assert_eq!(result.height(), 3);
     }
+
+    #[test]
+    fn test_register_large_dataframe() {
+        // duckdb-rs Arrow vtab has a vector capacity of 2048 rows. DataFrames
+        // larger than this must be chunked to avoid a panic.
+        let mut reader = DuckDBReader::from_connection_string("duckdb://memory").unwrap();
+
+        let n = 3000;
+        let ids: Vec<i32> = (0..n).collect();
+        let values: Vec<f64> = (0..n).map(|i| i as f64 * 1.5).collect();
+        let names: Vec<String> = (0..n).map(|i| format!("item_{}", i)).collect();
+
+        let df = DataFrame::new(vec![
+            Column::new("id".into(), ids),
+            Column::new("value".into(), values),
+            Column::new("name".into(), names),
+        ])
+        .unwrap();
+
+        reader.register("large_table", df).unwrap();
+
+        // Verify row count
+        let result = reader
+            .execute_sql("SELECT COUNT(*) as cnt FROM large_table")
+            .unwrap();
+        let count = result.column("cnt").unwrap().i64().unwrap().get(0).unwrap();
+        assert_eq!(count, n as i64);
+
+        // Verify first and last rows survived chunking intact
+        let result = reader
+            .execute_sql("SELECT id, name FROM large_table ORDER BY id LIMIT 1")
+            .unwrap();
+        assert_eq!(result.column("id").unwrap().i32().unwrap().get(0).unwrap(), 0);
+        assert_eq!(
+            result.column("name").unwrap().str().unwrap().get(0).unwrap(),
+            "item_0"
+        );
+
+        let result = reader
+            .execute_sql("SELECT id, name FROM large_table ORDER BY id DESC LIMIT 1")
+            .unwrap();
+        assert_eq!(
+            result.column("id").unwrap().i32().unwrap().get(0).unwrap(),
+            (n - 1) as i32
+        );
+        assert_eq!(
+            result.column("name").unwrap().str().unwrap().get(0).unwrap(),
+            format!("item_{}", n - 1)
+        );
+    }
 }
