From b4ba0bca186a8aa2ca5665c9e45c78bfe3f18fae Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Wed, 8 Apr 2026 01:06:04 +0300 Subject: [PATCH] fix: json scan performance on local files The into_stream() implementation of GetResult (from arrow-rs-objectstore) fetches every 8KiB chunk using a spawn_blocking() task, resulting in a lot of scheduling overhead. Fix this by reading the data directly from the async context, using a buffer size of 8KiBs. This avoids any context switch. --- .../datasource-json/src/boundary_stream.rs | 46 ++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-json/src/boundary_stream.rs b/datafusion/datasource-json/src/boundary_stream.rs index fc40feda6b80f..847c80279a53e 100644 --- a/datafusion/datasource-json/src/boundary_stream.rs +++ b/datafusion/datasource-json/src/boundary_stream.rs @@ -28,7 +28,7 @@ use std::task::{Context, Poll}; use bytes::Bytes; use futures::stream::{BoxStream, Stream}; use futures::{StreamExt, TryFutureExt}; -use object_store::{GetOptions, GetRange, ObjectStore}; +use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore}; /// How far past `raw_end` the initial bounded fetch covers. If the terminating /// newline is not found within this window, `ScanningLastTerminator` issues @@ -90,10 +90,52 @@ async fn get_stream( range: std::ops::Range, ) -> object_store::Result>> { let opts = GetOptions { - range: Some(GetRange::Bounded(range)), + range: Some(GetRange::Bounded(range.clone())), ..Default::default() }; let result = store.get_opts(&location, opts).await?; + + #[cfg(not(target_arch = "wasm32"))] + if let GetResultPayload::File(mut file, _path) = result.payload { + use std::io::{Read, Seek, SeekFrom}; + const CHUNK_SIZE: u64 = 8 * 1024; + + file.seek(SeekFrom::Start(range.start)).map_err(|e| { + object_store::Error::Generic { + store: "local", + source: Box::new(e), + } + })?; + + return Ok(futures::stream::try_unfold( + (file, range.end - range.start), + move |(mut file, remaining)| async move { + if remaining == 0 { + return Ok(None); + } + let to_read = remaining.min(CHUNK_SIZE); + let cap = usize::try_from(to_read).map_err(|e| { + object_store::Error::Generic { + store: "local", + source: Box::new(e), + } + })?; + + let mut buf = Vec::with_capacity(cap); + let read = + (&mut file) + .take(to_read) + .read_to_end(&mut buf) + .map_err(|e| object_store::Error::Generic { + store: "local", + source: Box::new(e), + })?; + Ok(Some((Bytes::from(buf), (file, remaining - read as u64)))) + }, + ) + .boxed()); + } + Ok(result.into_stream()) }