Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 116 additions & 10 deletions src/reader/duckdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,17 +516,58 @@ impl Reader for DuckDBReader {
)));
}

// Convert DataFrame to Arrow query params
let params = dataframe_to_arrow_params(df)?;
// Workaround for a duckdb-rs limitation (not a DuckDB limitation).
//
// duckdb-rs's `ArrowVTab` writes each RecordBatch into a single DuckDB
// `DataChunk`, which has a fixed capacity of `STANDARD_VECTOR_SIZE`.
// That constant is defined in DuckDB's C++ source at
// `src/include/duckdb/common/constants.hpp` and is currently 2048.
// When a RecordBatch exceeds this, `FlatVector::copy` panics with
// `assertion failed: data.len() <= self.capacity()`.
//
// We chunk large DataFrames to stay within this limit. The first chunk
// creates the table (letting DuckDB infer the schema from Arrow), and
// subsequent chunks INSERT into it.
const MAX_ARROW_BATCH_ROWS: usize = 2048;
let total_rows = df.height();

if total_rows <= MAX_ARROW_BATCH_ROWS {
// Small DataFrame: register in a single batch
let params = dataframe_to_arrow_params(df)?;
let sql = format!(
"CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
name
);
self.conn.execute(&sql, params).map_err(|e| {
GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
})?;
} else {
// Large DataFrame: create table from first chunk, then insert remaining chunks
let first_chunk = df.slice(0, MAX_ARROW_BATCH_ROWS);
let params = dataframe_to_arrow_params(first_chunk)?;
let create_sql = format!(
"CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
name
);
self.conn.execute(&create_sql, params).map_err(|e| {
GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
})?;

// Create temp table from Arrow data
let sql = format!(
"CREATE TEMP TABLE \"{}\" AS SELECT * FROM arrow(?, ?)",
name
);
self.conn.execute(&sql, params).map_err(|e| {
GgsqlError::ReaderError(format!("Failed to register table '{}': {}", name, e))
})?;
let mut offset = MAX_ARROW_BATCH_ROWS;
while offset < total_rows {
let chunk_size = std::cmp::min(MAX_ARROW_BATCH_ROWS, total_rows - offset);
let chunk = df.slice(offset as i64, chunk_size);
let params = dataframe_to_arrow_params(chunk)?;
let insert_sql = format!("INSERT INTO \"{}\" SELECT * FROM arrow(?, ?)", name);
self.conn.execute(&insert_sql, params).map_err(|e| {
GgsqlError::ReaderError(format!(
"Failed to insert chunk into table '{}': {}",
name, e
))
})?;
offset += chunk_size;
}
}

// Track the table so we can unregister it later
self.registered_tables.insert(name.to_string());
Expand Down Expand Up @@ -783,4 +824,69 @@ mod tests {
let result = reader.execute_sql("SELECT * FROM data").unwrap();
assert_eq!(result.height(), 3);
}

#[test]
fn test_register_large_dataframe() {
    // The Arrow vtab in duckdb-rs writes each RecordBatch into one DataChunk,
    // which is capped at 2048 rows (DuckDB's STANDARD_VECTOR_SIZE). A frame
    // bigger than that exercises the chunked CREATE+INSERT path and must not
    // panic or drop rows.
    let mut reader = DuckDBReader::from_connection_string("duckdb://memory").unwrap();

    let n = 3000;
    let df = DataFrame::new(vec![
        Column::new("id".into(), (0..n).collect::<Vec<i32>>()),
        Column::new(
            "value".into(),
            (0..n).map(|i| i as f64 * 1.5).collect::<Vec<f64>>(),
        ),
        Column::new(
            "name".into(),
            (0..n).map(|i| format!("item_{}", i)).collect::<Vec<String>>(),
        ),
    ])
    .unwrap();

    reader.register("large_table", df).unwrap();

    // Every row must survive the chunked registration.
    let counted = reader
        .execute_sql("SELECT COUNT(*) as cnt FROM large_table")
        .unwrap();
    assert_eq!(
        counted.column("cnt").unwrap().i64().unwrap().get(0).unwrap(),
        n as i64
    );

    // Spot-check the first row (lands in the initial CREATE chunk)...
    let first = reader
        .execute_sql("SELECT id, name FROM large_table ORDER BY id LIMIT 1")
        .unwrap();
    assert_eq!(
        first.column("id").unwrap().i32().unwrap().get(0).unwrap(),
        0
    );
    assert_eq!(
        first.column("name").unwrap().str().unwrap().get(0).unwrap(),
        "item_0"
    );

    // ...and the last row, which lives in the final INSERT chunk.
    let last = reader
        .execute_sql("SELECT id, name FROM large_table ORDER BY id DESC LIMIT 1")
        .unwrap();
    assert_eq!(
        last.column("id").unwrap().i32().unwrap().get(0).unwrap(),
        (n - 1) as i32
    );
    assert_eq!(
        last.column("name").unwrap().str().unwrap().get(0).unwrap(),
        format!("item_{}", n - 1)
    );
}
}