Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::sync::Arc;
/// [`StringArray`]: crate::array::StringArray
/// [`LargeStringArray`]: crate::array::LargeStringArray
pub trait OffsetSizeTrait:
ArrowNativeType + std::ops::AddAssign + Integer + num_traits::CheckedAdd
ArrowNativeType + std::ops::AddAssign + Integer + num_traits::CheckedAdd + num_traits::CheckedSub

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is sealed since ArrowNativeType is sealed, so it is safe to add this here

{
/// True for 64 bit offset size and false for 32 bit offset size
const IS_LARGE: bool;
Expand Down
227 changes: 226 additions & 1 deletion arrow-buffer/src/buffer/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::buffer::ScalarBuffer;
use crate::{ArrowNativeType, MutableBuffer, NullBuffer, OffsetBufferBuilder};
use crate::{ArrowNativeType, Buffer, MutableBuffer, NullBuffer, OffsetBufferBuilder};
use std::ops::Deref;

/// A non-empty buffer of monotonically increasing, positive integers.
Expand Down Expand Up @@ -326,6 +326,78 @@ impl<O: ArrowNativeType> OffsetBuffer<O> {

end_offset_of_last_valid_value != last_offset
}

/// Subtract `rhs` from all offsets
/// This will try to reuse the existing allocation as much as possible
///
/// Panics: this will panic if `rhs` > the first offset or if `rhs` will lead to overflow (when `rhs` is negative)
pub fn subtract(self, rhs: O) -> Self
where
O: std::ops::Sub<Output = O> + std::cmp::PartialOrd + num_traits::CheckedSub,
{
if rhs == O::usize_as(0) {
return self;
}

let len = self.len();

// Offset buffer is guaranteed to be non-empty
assert!(
self[0] >= rhs,
"shifted offsets will become negative which is not allowed"
);

// If negative, make sure that this will not create an overflow
if rhs < O::usize_as(0) {
self[len - 1].checked_sub(&rhs).expect("must not overflow");
}

let original_buffer = self.into_inner().into_inner();
let original_length = original_buffer.len();
let buffer_offset = original_buffer.ptr_offset();

// Remove this once https://github.com/apache/arrow-rs/issues/10117 is resolved
let into_mutable_buffer_result = if buffer_offset != 0 {
Err(original_buffer)
} else {
original_buffer.into_mutable()
};

let output_buffer = match into_mutable_buffer_result {
Ok(mut mutable) => {
let mut_sliced = mutable.typed_data_mut::<O>();

// Remove this once https://github.com/apache/arrow-rs/pull/10118 is merged
let mut_sliced = &mut mut_sliced[0..original_length / O::get_byte_width()];

mut_sliced
.iter_mut()
.for_each(|offset| *offset = *offset - rhs);

// Remove this slice once https://github.com/apache/arrow-rs/pull/10118 is merged
Buffer::from(mutable).slice_with_length(buffer_offset, original_length)
}
Err(original_buffer) => {
let shifted = original_buffer
.typed_data::<O>()
.iter()
.map(|item| *item - rhs)
.collect::<Vec<O>>();

Buffer::from_vec(shifted)
}
};

let output_buffer = ScalarBuffer::<O>::from(output_buffer);

// This is safe as we keep the following properties:
// 1. buffer is non-empty - the output buffer is derived from a valid offset buffer
// 2. values are greater than or equal to zero - we validated before that the first offset is greater than or equal to the shift
// and the first buffer value is coming from a valid offset buffer,
// and the input values are monotonically increasing since they are coming from a valid offset buffer so checking the first offset is enough
// 3. monotonically increasing values - we subtract from all offset the same value, thus keeping the same property as the input buffer which is a valid offset buffer
unsafe { OffsetBuffer::new_unchecked(output_buffer) }
}
}

impl<T: ArrowNativeType> Deref for OffsetBuffer<T> {
Expand Down Expand Up @@ -810,4 +882,157 @@ mod tests {
let nulls = NullBuffer::new_valid(5); // expects 3
offsets.has_non_empty_nulls(Some(&nulls));
}

#[test]
#[should_panic(expected = "shifted offsets will become negative which is not allowed")]
fn should_panic_for_subtract_by_value_that_will_cause_offsets_to_be_less_than_zero() {
// self[0] = 0, rhs = 1 -> 0 >= 1 is false -> assert fires
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 6]));
offsets.subtract(1);
}

#[test]
fn subtract_by_value_that_will_cause_offsets_to_be_less_than_zero_for_outside_the_slice() {
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 1, 4, 7]));
let sliced = offsets.slice(1, 2); // [1, 4, 7]
drop(offsets);
assert_eq!(sliced.as_ref(), &[1, 4, 7]);

let result = sliced.subtract(1);
assert_eq!(result.as_ref(), &[0, 3, 6]);
assert_eq!(result.len(), 3);
}

#[test]
#[should_panic(expected = "must not overflow")]
fn should_panic_subtract_by_value_that_will_cause_offsets_to_overflow() {
// rhs = -1 (negative). self[0] = 0 >= -1 passes.
// last offset i32::MAX - (-1) = i32::MAX + 1 -> checked_sub returns None -> expect fires
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 5, i32::MAX]));
offsets.subtract(-1);
}

#[test]
fn subtract_by_value_that_will_cause_offsets_to_overflow_outside_the_slice() {
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 6, i32::MAX]));
let sliced = offsets.slice(0, 2); // [0, 3, 6]
assert_eq!(sliced.as_ref(), &[0, 3, 6]);

let result = sliced.subtract(-1);
assert_eq!(result.as_ref(), &[1, 4, 7]);
assert_eq!(result.len(), 3);
}

#[test]
fn when_shift_is_0_subtract_should_reuse_the_buffer_even_when_it_is_shared() {
// subtract(0) hits the early `return self` before any into_mutable,
// so the returned buffer is the exact same allocation even while shared.
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 6]));
let shared = offsets.clone(); // refcount now 2 -> shared
let result = offsets.subtract(0);
assert!(
result.ptr_eq(&shared),
"subtract(0) must return the same underlying buffer, even when shared"
);
}

#[test]
fn should_reuse_the_underline_data_when_the_buffer_is_not_shared() {
// Unique ownership, offset 0 -> into_mutable succeeds -> mutate in place,
// and MutableBuffer -> Buffer keeps the same allocation.
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![2, 5, 8]));
let ptr_before = offsets.as_ptr();
let result = offsets.subtract(2);
assert_eq!(
ptr_before,
result.as_ptr(),
"a non-shared buffer should be mutated in place, reusing the allocation"
);
assert_eq!(result.as_ref(), &[0, 3, 6]);
}

#[test]
fn should_create_a_new_buffer_when_the_buffer_is_shared() {
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![2, 5, 8]));
let shared = offsets.clone();
let ptr_before = offsets.as_ptr();
let result = offsets.subtract(2);
assert_ne!(
ptr_before,
result.as_ptr(),
"a shared buffer must not be mutated in place; a new allocation is created"
);
assert_eq!(result.as_ref(), &[0, 3, 6]);
// The shared view is untouched.
assert_eq!(shared.as_ref(), &[2, 5, 8]);
}

#[test]
fn when_shift_is_negative_it_should_shift_offsets_in_the_right_direction() {
// rhs = -2 -> offset - (-2) = offset + 2, so all offsets move up.
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 6]));
let result = offsets.subtract(-2);
assert_eq!(result.as_ref(), &[2, 5, 8]);
}

// Replace this test with test that assert a reuse after PR #10118 is merged
#[test]
fn for_sliced_unshared_buffer_shift_should_not_reuse_buffer() {
// Underlying [0, 3, 6, 9, 12]; slice -> view [3, 6, 9].
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![1, 3, 6, 9, 12]));
let sliced = offsets.slice(1, 2); // [3, 6, 9]
drop(offsets); // uniquely owned
assert_eq!(sliced.as_ref(), &[3, 6, 9]);

let ptr_before = sliced.as_ptr();
let result = sliced.subtract(1);

assert_ne!(
ptr_before,
result.as_ptr(),
"should not be reused until #10118 is merged"
);

assert_eq!(result.as_ref(), &[2, 5, 8]);
}
Comment on lines +978 to +997

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After pr #10118 is merged we should replace this test with the following test that verify reuse

(this test need some cleanup but the idea is the same)

#[test]
fn for_sliced_unshared_buffer_shift_should_reuse_buffer_but_only_modify_the_data_in_slice() {
    // Underlying [0, 3, 6, 9, 12]; slice -> view [3, 6, 9].
    let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![1, 3, 6, 9, 12]));
    let sliced = offsets.slice(1, 2); // [3, 6, 9]
    drop(offsets); // uniquely owned -> eligible for in-place reuse
    assert_eq!(sliced.as_ref(), &[3, 6, 9]);

    let ptr_before = sliced.as_ptr();
    let result = sliced.subtract(1);

    // Reuses the allocation...
    assert_eq!(
        ptr_before,
        result.as_ptr(),
        "uniquely-owned sliced buffer should be mutated in place"
    );

    // ...but only the slice is shifted, and the length stays the slice length.
    assert_eq!(result.as_ref(), &[2, 5, 8]);
    assert_eq!(result.len(), 3);
    let underlying_buffer = result.inner().inner();
    // data should be shifted by 1
    assert_eq!(underlying_buffer.ptr_offset(), std::mem::size_of::<i32>());
    let original_value_ptr = unsafe {underlying_buffer.data_ptr().as_ptr() as *mut i32};
    let value_in_ptr = unsafe {*original_value_ptr};
    assert_eq!(value_in_ptr, 1);
    let last_value_ptr = unsafe {(underlying_buffer.data_ptr().as_ptr() as *mut i32).add(5)};
    assert_eq!(unsafe {*last_value_ptr}, 12);
}


#[test]
fn for_sliced_but_start_at_0_unshared_buffer_shift_should_reuse_buffer() {
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![1, 3, 6, 9, 12]));
let sliced = offsets.slice(0, 2);
drop(offsets); // uniquely owned
assert_eq!(sliced.as_ref(), &[1, 3, 6]);

let ptr_before = sliced.as_ptr();
let result = sliced.subtract(1);

assert_eq!(ptr_before, result.as_ptr(), "should be reused");

assert_eq!(result.as_ref(), &[0, 2, 5]);
}

#[test]
fn for_sliced_shared_buffer_shifted_buffer_should_only_include_the_sliced_data() {
// Underlying: [0, 3, 6, 9, 12]; slice(1, 2) -> view [3, 6, 9].
// `offsets` stays alive, so the sliced buffer is shared -> Err branch.
// The Err branch copies `len` (= 3) elements from the *sliced* typed_data,
// so the result contains only the sliced data, shifted.
let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 3, 6, 9, 12]));
let sliced = offsets.slice(1, 2);
assert_eq!(sliced.as_ref(), &[3, 6, 9]);

let result = sliced.subtract(3);

assert_eq!(
result.as_ref(),
&[0, 3, 6],
"shifted result should contain only the sliced data"
);
assert_eq!(result.len(), 3);

// Assert that the underlying buffer of the result is not sliced to make sure it does not include the data outside the slice range from the original buffer
let underlying_buffer = result.inner().inner();
assert_eq!(underlying_buffer.ptr_offset(), 0);
assert_eq!(underlying_buffer.len(), 3 * std::mem::size_of::<i32>());
}
}
Loading