diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs index 24f7774f2b7d..f74ff7d3f364 100644 --- a/arrow-array/src/array/list_array.rs +++ b/arrow-array/src/array/list_array.rs @@ -38,7 +38,7 @@ use std::sync::Arc; /// [`StringArray`]: crate::array::StringArray /// [`LargeStringArray`]: crate::array::LargeStringArray pub trait OffsetSizeTrait: - ArrowNativeType + std::ops::AddAssign + Integer + num_traits::CheckedAdd + ArrowNativeType + std::ops::AddAssign + Integer + num_traits::CheckedAdd + num_traits::CheckedSub { /// True for 64 bit offset size and false for 32 bit offset size const IS_LARGE: bool; diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs index d3a4b3cffbda..3e3033ca5633 100644 --- a/arrow-buffer/src/buffer/offset.rs +++ b/arrow-buffer/src/buffer/offset.rs @@ -16,7 +16,7 @@ // under the License. use crate::buffer::ScalarBuffer; -use crate::{ArrowNativeType, MutableBuffer, NullBuffer, OffsetBufferBuilder}; +use crate::{ArrowNativeType, Buffer, MutableBuffer, NullBuffer, OffsetBufferBuilder}; use std::ops::Deref; /// A non-empty buffer of monotonically increasing, positive integers. @@ -326,6 +326,78 @@ impl OffsetBuffer { end_offset_of_last_valid_value != last_offset } + + /// Subtract `rhs` from all offsets + /// This will try to reuse the existing allocation as much as possible + /// + /// Panics: this will panic if `rhs` > the first offset or if `rhs` will lead to overflow (when `rhs` is negative) + pub fn subtract(self, rhs: O) -> Self + where + O: std::ops::Sub + std::cmp::PartialOrd + num_traits::CheckedSub, + { + if rhs == O::usize_as(0) { + return self; + } + + let len = self.len(); + + // Offset buffer is guaranteed to be non-empty + assert!( + self[0] >= rhs, + "shifted offsets will become negative which is not allowed" + ); + + // If negative, make sure that this will not create an overflow + if rhs < O::usize_as(0) { + self[len - 1].checked_sub(&rhs).expect("must not overflow"); + } + + let original_buffer = self.into_inner().into_inner(); + let original_length = original_buffer.len(); + let buffer_offset = original_buffer.ptr_offset(); + + // Remove this once https://github.com/apache/arrow-rs/issues/10117 is resolved + let into_mutable_buffer_result = if buffer_offset != 0 { + Err(original_buffer) + } else { + original_buffer.into_mutable() + }; + + let output_buffer = match into_mutable_buffer_result { + Ok(mut mutable) => { + let mut_sliced = mutable.typed_data_mut::(); + + // Remove this once https://github.com/apache/arrow-rs/pull/10118 is merged + let mut_sliced = &mut mut_sliced[0..original_length / O::get_byte_width()]; + + mut_sliced + .iter_mut() + .for_each(|offset| *offset = *offset - rhs); + + // Remove this slice once https://github.com/apache/arrow-rs/pull/10118 is merged + Buffer::from(mutable).slice_with_length(buffer_offset, original_length) + } + Err(original_buffer) => { + let shifted = original_buffer + .typed_data::() + .iter() + .map(|item| *item - rhs) + .collect::>(); + + Buffer::from_vec(shifted) + } + }; + + let output_buffer = ScalarBuffer::::from(output_buffer); + + // This is safe as we keep the following properties: + // 1. buffer is non-empty - the output buffer is derived from a valid offset buffer + // 2. values are greater than or equal to zero - we validated before that the first offset is greater than or equal to the shift + // and the first buffer value is coming from a valid offset buffer, + // and the input values are monotonically increasing since they are coming from a valid offset buffer so checking the first offset is enough + // 3. monotonically increasing values - we subtract from all offset the same value, thus keeping the same property as the input buffer which is a valid offset buffer + unsafe { OffsetBuffer::new_unchecked(output_buffer) } + } } impl Deref for OffsetBuffer { @@ -810,4 +882,157 @@ mod tests { let nulls = NullBuffer::new_valid(5); // expects 3 offsets.has_non_empty_nulls(Some(&nulls)); } + + #[test] + #[should_panic(expected = "shifted offsets will become negative which is not allowed")] + fn should_panic_for_subtract_by_value_that_will_cause_offsets_to_be_less_than_zero() { + // self[0] = 0, rhs = 1 -> 0 >= 1 is false -> assert fires + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 6])); + offsets.subtract(1); + } + + #[test] + fn subtract_by_value_that_will_cause_offsets_to_be_less_than_zero_for_outside_the_slice() { + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 1, 4, 7])); + let sliced = offsets.slice(1, 2); // [1, 4, 7] + drop(offsets); + assert_eq!(sliced.as_ref(), &[1, 4, 7]); + + let result = sliced.subtract(1); + assert_eq!(result.as_ref(), &[0, 3, 6]); + assert_eq!(result.len(), 3); + } + + #[test] + #[should_panic(expected = "must not overflow")] + fn should_panic_subtract_by_value_that_will_cause_offsets_to_overflow() { + // rhs = -1 (negative). self[0] = 0 >= -1 passes. + // last offset i32::MAX - (-1) = i32::MAX + 1 -> checked_sub returns None -> expect fires + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 5, i32::MAX])); + offsets.subtract(-1); + } + + #[test] + fn subtract_by_value_that_will_cause_offsets_to_overflow_outside_the_slice() { + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 6, i32::MAX])); + let sliced = offsets.slice(0, 2); // [0, 3, 6] + assert_eq!(sliced.as_ref(), &[0, 3, 6]); + + let result = sliced.subtract(-1); + assert_eq!(result.as_ref(), &[1, 4, 7]); + assert_eq!(result.len(), 3); + } + + #[test] + fn when_shift_is_0_subtract_should_reuse_the_buffer_even_when_it_is_shared() { + // subtract(0) hits the early `return self` before any into_mutable, + // so the returned buffer is the exact same allocation even while shared. + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 6])); + let shared = offsets.clone(); // refcount now 2 -> shared + let result = offsets.subtract(0); + assert!( + result.ptr_eq(&shared), + "subtract(0) must return the same underlying buffer, even when shared" + ); + } + + #[test] + fn should_reuse_the_underline_data_when_the_buffer_is_not_shared() { + // Unique ownership, offset 0 -> into_mutable succeeds -> mutate in place, + // and MutableBuffer -> Buffer keeps the same allocation. + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![2, 5, 8])); + let ptr_before = offsets.as_ptr(); + let result = offsets.subtract(2); + assert_eq!( + ptr_before, + result.as_ptr(), + "a non-shared buffer should be mutated in place, reusing the allocation" + ); + assert_eq!(result.as_ref(), &[0, 3, 6]); + } + + #[test] + fn should_create_a_new_buffer_when_the_buffer_is_shared() { + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![2, 5, 8])); + let shared = offsets.clone(); + let ptr_before = offsets.as_ptr(); + let result = offsets.subtract(2); + assert_ne!( + ptr_before, + result.as_ptr(), + "a shared buffer must not be mutated in place; a new allocation is created" + ); + assert_eq!(result.as_ref(), &[0, 3, 6]); + // The shared view is untouched. + assert_eq!(shared.as_ref(), &[2, 5, 8]); + } + + #[test] + fn when_shift_is_negative_it_should_shift_offsets_in_the_right_direction() { + // rhs = -2 -> offset - (-2) = offset + 2, so all offsets move up. + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 6])); + let result = offsets.subtract(-2); + assert_eq!(result.as_ref(), &[2, 5, 8]); + } + + // Replace this test with test that assert a reuse after PR #10118 is merged + #[test] + fn for_sliced_unshared_buffer_shift_should_not_reuse_buffer() { + // Underlying [0, 3, 6, 9, 12]; slice -> view [3, 6, 9]. + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![1, 3, 6, 9, 12])); + let sliced = offsets.slice(1, 2); // [3, 6, 9] + drop(offsets); // uniquely owned + assert_eq!(sliced.as_ref(), &[3, 6, 9]); + + let ptr_before = sliced.as_ptr(); + let result = sliced.subtract(1); + + assert_ne!( + ptr_before, + result.as_ptr(), + "should not be reused until #10118 is merged" + ); + + assert_eq!(result.as_ref(), &[2, 5, 8]); + } + + #[test] + fn for_sliced_but_start_at_0_unshared_buffer_shift_should_reuse_buffer() { + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![1, 3, 6, 9, 12])); + let sliced = offsets.slice(0, 2); + drop(offsets); // uniquely owned + assert_eq!(sliced.as_ref(), &[1, 3, 6]); + + let ptr_before = sliced.as_ptr(); + let result = sliced.subtract(1); + + assert_eq!(ptr_before, result.as_ptr(), "should be reused"); + + assert_eq!(result.as_ref(), &[0, 2, 5]); + } + + #[test] + fn for_sliced_shared_buffer_shifted_buffer_should_only_include_the_sliced_data() { + // Underlying: [0, 3, 6, 9, 12]; slice(1, 2) -> view [3, 6, 9]. + // `offsets` stays alive, so the sliced buffer is shared -> Err branch. + // The Err branch copies `len` (= 3) elements from the *sliced* typed_data, + // so the result contains only the sliced data, shifted. + let offsets = OffsetBuffer::new(ScalarBuffer::::from(vec![0, 3, 6, 9, 12])); + let sliced = offsets.slice(1, 2); + assert_eq!(sliced.as_ref(), &[3, 6, 9]); + + let result = sliced.subtract(3); + + assert_eq!( + result.as_ref(), + &[0, 3, 6], + "shifted result should contain only the sliced data" + ); + assert_eq!(result.len(), 3); + + // Assert that the underlying buffer of the result is not sliced to make sure it does not include the data outside the slice range from the original buffer + let underlying_buffer = result.inner().inner(); + assert_eq!(underlying_buffer.ptr_offset(), 0); + assert_eq!(underlying_buffer.len(), 3 * std::mem::size_of::()); + } }