Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions datafusion/datasource-json/src/boundary_stream.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures::stream::{BoxStream, Stream};
use futures::{StreamExt, TryFutureExt};
use object_store::{GetOptions, GetRange, ObjectStore};
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

/// How far past `raw_end` the initial bounded fetch covers. If the terminating
/// newline is not found within this window, `ScanningLastTerminator` issues
Expand Down Expand Up @@ -90,10 +90,52 @@ async fn get_stream(
range: std::ops::Range<u64>,
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
let opts = GetOptions {
range: Some(GetRange::Bounded(range)),
range: Some(GetRange::Bounded(range.clone())),
..Default::default()
};
let result = store.get_opts(&location, opts).await?;

#[cfg(not(target_arch = "wasm32"))]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like it may be improved upstream as well? Doing a spawn_blocking for each (small) read seems not great.

if let GetResultPayload::File(mut file, _path) = result.payload {
use std::io::{Read, Seek, SeekFrom};
const CHUNK_SIZE: u64 = 8 * 1024;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this ideal? Maybe a bit bigger gets higher throughput?


file.seek(SeekFrom::Start(range.start)).map_err(|e| {
object_store::Error::Generic {
store: "local",
source: Box::new(e),
}
})?;

return Ok(futures::stream::try_unfold(
(file, range.end - range.start),
move |(mut file, remaining)| async move {
if remaining == 0 {
return Ok(None);
}
let to_read = remaining.min(CHUNK_SIZE);
let cap = usize::try_from(to_read).map_err(|e| {
object_store::Error::Generic {
store: "local",
source: Box::new(e),
}
})?;

let mut buf = Vec::with_capacity(cap);
let read =
(&mut file)
.take(to_read)
.read_to_end(&mut buf)
.map_err(|e| object_store::Error::Generic {
store: "local",
source: Box::new(e),
})?;
Ok(Some((Bytes::from(buf), (file, remaining - read as u64))))
},
)
.boxed());
}

Ok(result.into_stream())
}

Expand Down
Loading