Skip to content
216 changes: 215 additions & 1 deletion rust/lance-arrow/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub fn read_ipc_stream_single_at(
/// Modern IPC streams have an 8-byte prefix `[continuation: 4][size: 4]`.
/// Legacy streams have a 4-byte prefix `[size: 4]`. Returns `(prefix_len, meta_size)`.
fn parse_ipc_message_prefix(buf: &Buffer) -> Result<(usize, usize), ArrowError> {
let has_continuation = buf.len() >= 4 && buf[..4] == [0xff; 4];
let has_continuation = buf.len() >= 4 && buf[..4] == IPC_CONTINUATION;
if has_continuation {
if buf.len() < 8 {
return Err(ArrowError::ParseError(
Expand Down Expand Up @@ -358,6 +358,134 @@ pub fn read_ipc_stream_single(data: &Bytes) -> Result<RecordBatch, ArrowError> {
}
}

// ---------------------------------------------------------------------------
// Aligned IPC sections
// ---------------------------------------------------------------------------

/// Boundary, in bytes, that the start of every IPC section's stream is
/// padded up to.
///
/// Several IPC streams may be concatenated into one larger blob (for example
/// a cache entry). A section beginning at an arbitrary offset would leave its
/// array data misaligned, and [`FileDecoder`] with `require_alignment = false`
/// then silently copies each buffer into a freshly aligned allocation on
/// every read, which defeats zero-copy. Starting each section on a 64-byte
/// boundary keeps the decoded buffers borrowed directly from the input.
pub const IPC_SECTION_ALIGNMENT: usize = 64;

/// Returns how many zero bytes must follow `pos` so that the next byte sits
/// on an [`IPC_SECTION_ALIGNMENT`] boundary.
///
/// The result is always in `0..IPC_SECTION_ALIGNMENT`; it is `0` when `pos`
/// is already aligned.
fn section_padding(pos: usize) -> usize {
    match pos % IPC_SECTION_ALIGNMENT {
        // Already on a boundary: nothing to add.
        0 => 0,
        // Otherwise fill the rest of the current alignment window.
        remainder => IPC_SECTION_ALIGNMENT - remainder,
    }
}

/// A [`Write`] adapter that forwards everything to an inner writer while
/// keeping a running total of the bytes that writer accepted.
struct CountingWriter<'a> {
    /// Destination that every call is forwarded to.
    inner: &'a mut dyn Write,
    /// Sum of the byte counts returned by successful `write` calls.
    count: usize,
}

impl Write for CountingWriter<'_> {
    /// Forwards `buf` to the inner writer and adds the number of bytes it
    /// reports as written to `count`. A failed write leaves `count` untouched.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.inner.write(buf).map(|accepted| {
            // Count what was actually taken, which may be less than `buf.len()`.
            self.count += accepted;
            accepted
        })
    }

    /// Flushes the inner writer; the byte count is not affected.
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

/// Emit the zero bytes needed to bring `pos` up to the next
/// [`IPC_SECTION_ALIGNMENT`] boundary and advance `pos` by that amount.
///
/// Nothing is written when `pos` is already aligned. If the write fails the
/// I/O error is wrapped in [`ArrowError::IoError`] and `pos` is left
/// unchanged.
fn write_section_padding(writer: &mut dyn Write, pos: &mut usize) -> Result<(), ArrowError> {
    let gap = section_padding(*pos);
    if gap == 0 {
        return Ok(());
    }

    // `gap` is always smaller than the alignment, so one alignment-sized
    // block of zeros is enough to slice the padding from.
    let zeros = [0u8; IPC_SECTION_ALIGNMENT];
    match writer.write_all(&zeros[..gap]) {
        Ok(()) => {
            *pos += gap;
            Ok(())
        }
        Err(e) => Err(ArrowError::IoError(e.to_string(), e)),
    }
}

/// Serialize `batch` as one single-batch Arrow IPC section whose stream
/// starts on a 64-byte boundary.
///
/// `pos` must hold the absolute byte offset of `writer` inside the enclosing
/// blob. The function first writes zero padding up to the next
/// [`IPC_SECTION_ALIGNMENT`] boundary and then the IPC stream. On success
/// `pos` has been advanced past the padding and the stream, so the caller can
/// go on to write further aligned sections.
///
/// This is the writing half of [`read_ipc_section_at`]. For the decoded
/// buffers to be borrowed zero-copy, the blob must ultimately be read back
/// from a buffer whose base address is at least 64-byte aligned.
///
/// # Errors
///
/// Returns the error from writing the padding or the stream. If the stream
/// write fails, `pos` reflects the padding only, not any partially written
/// stream bytes.
pub fn write_ipc_section(
    writer: &mut dyn Write,
    pos: &mut usize,
    batch: &RecordBatch,
) -> Result<(), ArrowError> {
    // Align first so the stream itself starts on a boundary.
    write_section_padding(writer, pos)?;

    // Route the stream through a byte tally so we learn how long it was.
    let mut tally = CountingWriter {
        inner: writer,
        count: 0,
    };
    write_ipc_stream(batch, &mut tally)?;

    let CountingWriter {
        count: stream_len, ..
    } = tally;
    *pos += stream_len;
    Ok(())
}

/// Decode one [`RecordBatch`] from the aligned IPC section that begins at or
/// after `offset`.
///
/// `offset` is first rounded up to the next [`IPC_SECTION_ALIGNMENT`]
/// boundary, skipping the padding emitted by [`write_ipc_section`], and the
/// stream is then read from there. `offset` ends up past the section
/// (padding + stream + EOS).
///
/// Zero-copy: array buffers borrow from `data`'s allocation when `data`'s base
/// address is at least 64-byte aligned (see [`write_ipc_section`]).
pub fn read_ipc_section_at(data: &Bytes, offset: &mut usize) -> Result<RecordBatch, ArrowError> {
    let stream_start = *offset + section_padding(*offset);
    *offset = stream_start;
    read_ipc_stream_single_at(data, offset)
}

/// Serialize every batch yielded by `iter` into one 64-byte-aligned,
/// multi-batch Arrow IPC section.
///
/// Works like [`write_ipc_section`], except that all batches go into a single
/// IPC stream (schema + N batches + EOS). `iter` must yield at least one
/// batch. `pos` is the absolute offset of `writer` in the enclosing blob and
/// is advanced past the padding and the stream on success.
///
/// This is the writing half of [`read_ipc_section_batches_at`].
pub fn write_ipc_section_batches<I>(
    writer: &mut dyn Write,
    pos: &mut usize,
    iter: I,
) -> Result<(), ArrowError>
where
    I: IntoIterator<Item = RecordBatch>,
{
    // Align first so the stream itself starts on a boundary.
    write_section_padding(writer, pos)?;

    // Route the stream through a byte tally so we learn how long it was.
    let mut tally = CountingWriter {
        inner: writer,
        count: 0,
    };
    write_ipc_stream_batches(iter, &mut tally)?;

    let CountingWriter {
        count: stream_len, ..
    } = tally;
    *pos += stream_len;
    Ok(())
}

/// Decode every [`RecordBatch`] of the aligned multi-batch IPC section that
/// begins at or after `offset`.
///
/// `offset` is first rounded up to the next [`IPC_SECTION_ALIGNMENT`]
/// boundary and the stream is read from there; `offset` ends up past the
/// section (padding + stream + EOS).
///
/// Zero-copy: array buffers borrow from `data`'s allocation when `data`'s base
/// address is at least 64-byte aligned (see [`write_ipc_section_batches`]).
pub fn read_ipc_section_batches_at(
    data: &Bytes,
    offset: &mut usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let stream_start = *offset + section_padding(*offset);
    *offset = stream_start;
    read_ipc_stream_at(data, offset)
}

#[cfg(test)]
mod tests {
use arrow_array::{ArrayRef, record_batch};
Expand Down Expand Up @@ -403,4 +531,90 @@ mod tests {
assert_col_zero_copy(batch.column(1));
}
}

/// Build a [`Bytes`] holding a copy of `payload` whose base address is
/// 64-byte aligned, modelling a backend that reads cache entries into an
/// aligned buffer. A plain `Bytes::from(vec)` only guarantees the allocator's
/// alignment for `u8`.
fn aligned_bytes(payload: &[u8]) -> Bytes {
    let len = payload.len();
    // Over-allocate by one alignment unit so an aligned window of `len`
    // bytes always fits somewhere inside the allocation.
    let mut backing = vec![0u8; len + IPC_SECTION_ALIGNMENT];
    let skip = section_padding(backing.as_ptr() as usize);
    let window = skip..skip + len;
    backing[window.clone()].copy_from_slice(payload);
    Bytes::from(backing).slice(window)
}

#[test]
fn test_aligned_ipc_sections_are_zero_copy() {
    // LargeBinary carries an i64-offset buffer; its 8-byte alignment
    // requirement is what forces a realigning memcpy when a section is
    // misaligned.
    let blocks = arrow_array::LargeBinaryArray::from_vec(vec![&b"hello"[..], b"world"]);
    let section_a = RecordBatch::try_from_iter([("a", Arc::new(blocks) as ArrayRef)]).unwrap();
    let section_b = record_batch!(("b", Int64, [10i64, 20, 30, 40, 50])).unwrap();

    // Start with an arbitrary preamble that is deliberately not a multiple
    // of 64, so the first section has to be padded instead of landing at
    // offset 0 by luck.
    let mut buf = vec![0xABu8; 7];
    let mut pos = buf.len();
    // Padding the 7-byte preamble puts the first stream on the next
    // 64-byte boundary.
    assert_eq!(7 + section_padding(7), IPC_SECTION_ALIGNMENT);
    for section in [&section_a, &section_b] {
        write_ipc_section(&mut buf, &mut pos, section).unwrap();
    }

    let data = aligned_bytes(&buf);
    assert_eq!(
        section_padding(data.as_ptr() as usize),
        0,
        "base not aligned"
    );

    // Read both sections back, starting right after the preamble.
    let mut offset = 7;
    let read_a = read_ipc_section_at(&data, &mut offset).unwrap();
    let read_b = read_ipc_section_at(&data, &mut offset).unwrap();
    assert_eq!(read_a, section_a);
    assert_eq!(read_b, section_b);

    // Every buffer of the first column must point into `data`'s own bytes.
    let data_base = data.as_ptr() as usize;
    let data_end = data_base + data.len();
    for batch in [&read_a, &read_b] {
        let column_data = batch.column(0).to_data();
        for buffer in column_data.buffers() {
            let ptr = buffer.as_ptr() as usize;
            assert!(
                (data_base..data_end).contains(&ptr),
                "section buffer at {ptr:#x} was realigned out of the input \
                 [{data_base:#x}..{data_end:#x}) — misaligned section",
            );
        }
    }
}

#[test]
fn test_aligned_multi_batch_section_roundtrip_zero_copy() {
    // A multi-batch section (e.g. IVF SQ storage chunks) has to return every
    // batch unchanged and decode the first batch's buffers zero-copy.
    let b1 = record_batch!(("v", Int64, [1i64, 2, 3])).unwrap();
    let b2 = record_batch!(("v", Int64, [4i64, 5])).unwrap();
    let b3 = record_batch!(("v", Int64, [6i64])).unwrap();
    let expected = vec![b1, b2, b3];

    // Five bytes of filler keep the section from starting at offset 0.
    let mut buf = vec![0xCDu8; 5];
    let mut pos = buf.len();
    write_ipc_section_batches(&mut buf, &mut pos, expected.clone()).unwrap();

    let data = aligned_bytes(&buf);
    let mut offset = 5;
    let read = read_ipc_section_batches_at(&data, &mut offset).unwrap();
    assert_eq!(read, expected);
    assert_eq!(offset, buf.len(), "offset should land at section end");

    // The first batch's buffers must point into `data`'s own bytes.
    let data_base = data.as_ptr() as usize;
    let data_end = data_base + data.len();
    let first_column = read[0].column(0).to_data();
    for buffer in first_column.buffers() {
        let ptr = buffer.as_ptr() as usize;
        assert!(
            (data_base..data_end).contains(&ptr),
            "first batch buffer at {ptr:#x} was realigned out of the input",
        );
    }
}
}
Loading
Loading