Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,9 @@ mod tests {
let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
output
.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice())
.unwrap();
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

Expand Down Expand Up @@ -706,7 +708,9 @@ mod tests {
let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
output
.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice())
.unwrap();
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

Expand Down
16 changes: 11 additions & 5 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,19 @@ mod tests {

let mut valid = vec![false, false, true, true, false, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());
output
.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice())
.unwrap();

assert!(matches!(output, DictionaryBuffer::Dict { .. }));

assert_eq!(decoder.read(&mut output, 4).unwrap(), 4);

valid.extend_from_slice(&[false, false, true, true, false, true, true, false]);
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
output.pad_nulls(6, 4, 8, valid_buffer.as_slice());
output
.pad_nulls(6, 4, 8, valid_buffer.as_slice())
.unwrap();

assert!(matches!(output, DictionaryBuffer::Dict { .. }));

Expand Down Expand Up @@ -512,7 +516,9 @@ mod tests {

let valid = [true, true, true, true, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
output.pad_nulls(0, 5, 5, valid_buffer.as_slice());
output
.pad_nulls(0, 5, 5, valid_buffer.as_slice())
.unwrap();

assert!(matches!(output, DictionaryBuffer::Dict { .. }));

Expand Down Expand Up @@ -656,7 +662,7 @@ mod tests {
decoder.set_data(encoding, page, 8, None).unwrap();
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);

output.pad_nulls(0, 0, 8, &[0]);
output.pad_nulls(0, 0, 8, &[0]).unwrap();
let array = output
.into_array(Some(Buffer::from(&[0])), &data_type)
.unwrap();
Expand All @@ -671,7 +677,7 @@ mod tests {
decoder.set_data(encoding, page, 8, None).unwrap();
assert_eq!(decoder.skip_values(1024).unwrap(), 0);

output.pad_nulls(0, 0, 8, &[0]);
output.pad_nulls(0, 0, 8, &[0]).unwrap();
let array = output
.into_array(Some(Buffer::from(&[0])), &data_type)
.unwrap();
Expand Down
4 changes: 3 additions & 1 deletion parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,9 @@ mod tests {
let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
output
.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice())
.unwrap();
let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

Expand Down
11 changes: 9 additions & 2 deletions parquet/src/arrow/array_reader/fixed_len_byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,16 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
) {
) -> Result<()> {
let byte_length = self.byte_length.unwrap_or_default();

assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
if self.buffer.len() != (read_offset + values_read) * byte_length {
return Err(general_err!(
"found inconsistent buffer length while padding nulls: expected {} bytes, got {}",
(read_offset + values_read) * byte_length,
self.buffer.len()
));
}
self.buffer
.resize((read_offset + levels_read) * byte_length, 0);

Expand All @@ -339,6 +345,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer {
};
move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
}
Ok(())
}
}

Expand Down
10 changes: 7 additions & 3 deletions parquet/src/arrow/buffer/dictionary_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl<K: ArrowNativeType, V: OffsetSizeTrait> ValuesBuffer for DictionaryBuffer<K
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
) {
) -> Result<()> {
match self {
Self::Dict { keys, .. } => {
keys.resize(read_offset + levels_read, K::default());
Expand Down Expand Up @@ -294,7 +294,9 @@ mod tests {

let mut valid = vec![false, false, true, true, false, true, true, true];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice());
buffer
.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice())
.unwrap();

// Read some data not preserving the dictionary

Expand All @@ -305,7 +307,9 @@ mod tests {

valid.extend_from_slice(&[false, false, true, false, true]);
let null_buffer = Buffer::from_iter(valid.iter().cloned());
buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice());
buffer
.pad_nulls(read_offset, 2, 5, null_buffer.as_slice())
.unwrap();

assert_eq!(buffer.len(), 13);
let split = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0));
Expand Down
57 changes: 50 additions & 7 deletions parquet/src/arrow/buffer/offset_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,14 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
) {
assert_eq!(self.offsets.len(), read_offset + values_read + 1);
) -> Result<()> {
if self.offsets.len() != read_offset + values_read + 1 {
return Err(general_err!(
"found inconsistent offsets while padding nulls: expected {} offsets, got {}",
read_offset + values_read + 1,
self.offsets.len()
));
}
self.offsets
.resize(read_offset + levels_read + 1, I::default());

Expand All @@ -173,8 +179,9 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
.rev()
.zip(iter_set_bits_rev(valid_mask))
{
assert!(level_pos >= value_pos);
assert!(level_pos < last_pos);
if level_pos < value_pos || level_pos >= last_pos {
return Err(general_err!("found corrupt level data while padding nulls"));
}

let end_offset = offsets[value_pos + 1];
let start_offset = offsets[value_pos];
Expand All @@ -185,7 +192,7 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
}

if level_pos == value_pos {
return;
return Ok(());
}

offsets[level_pos] = start_offset;
Expand All @@ -197,6 +204,7 @@ impl<I: OffsetSizeTrait> ValuesBuffer for OffsetBuffer<I> {
for x in &mut offsets[values_range.start + 1..last_pos] {
*x = last_start_offset
}
Ok(())
}
}

Expand Down Expand Up @@ -268,7 +276,9 @@ mod tests {
let valid_mask = Buffer::from_iter(valid.iter().copied());

// Both trailing and leading nulls
buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice());
buffer
.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice())
.unwrap();

let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
Expand Down Expand Up @@ -330,11 +340,44 @@ mod tests {
buffer.check_valid_utf8(12).unwrap_err();
}

#[test]
fn test_pad_nulls_corrupt_input_returns_err() {
// Corrupt input must produce a decode error rather than panicking.

// Offsets inconsistent with `values_read`: only one value was pushed,
// but three are claimed to have been read.
let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
buffer.try_push("a".as_bytes(), false).unwrap();
let valid_mask = Buffer::from_iter([true, false, false]);
let err = buffer
.pad_nulls(0, 3, 3, valid_mask.as_slice())
.unwrap_err();
assert!(
err.to_string().contains("inconsistent offsets"),
"unexpected error: {err}"
);

// Valid mask has fewer set bits than `values_read`, which previously
// tripped an assertion in the null-padding loop.
let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
for v in ["a", "b", "c"] {
buffer.try_push(v.as_bytes(), false).unwrap();
}
let valid_mask = Buffer::from_iter([true, false, false]);
let err = buffer
.pad_nulls(0, 3, 3, valid_mask.as_slice())
.unwrap_err();
assert!(
err.to_string().contains("corrupt level data"),
"unexpected error: {err}"
);
}

#[test]
fn test_pad_nulls_empty() {
let mut buffer = OffsetBuffer::<i32>::with_capacity(0);
let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9));
buffer.pad_nulls(0, 0, 9, valid_mask.as_slice());
buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()).unwrap();

let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
Expand Down
9 changes: 6 additions & 3 deletions parquet/src/arrow/buffer/view_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::errors::Result;
use arrow_array::{ArrayRef, BinaryViewArray, StringViewArray};
use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_schema::DataType as ArrowType;
Expand Down Expand Up @@ -81,9 +82,9 @@ impl ValuesBuffer for ViewBuffer {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
) {
) -> Result<()> {
self.views
.pad_nulls(read_offset, values_read, levels_read, valid_mask);
.pad_nulls(read_offset, values_read, levels_read, valid_mask)
}
}

Expand Down Expand Up @@ -146,7 +147,9 @@ mod tests {
let valid = [true, false, false, true, false, false, true];
let valid_mask = Buffer::from_iter(valid.iter().copied());

buffer.pad_nulls(1, 2, valid.len() - 1, valid_mask.as_slice());
buffer
.pad_nulls(1, 2, valid.len() - 1, valid_mask.as_slice())
.unwrap();

let array = buffer.into_array(Some(valid_mask), &ArrowType::Utf8View);
let strings = array
Expand Down
8 changes: 6 additions & 2 deletions parquet/src/arrow/record_reader/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::arrow::buffer::bit_util::iter_set_bits_rev;
use crate::errors::Result;

/// A buffer that supports padding with nulls
pub trait ValuesBuffer {
Expand All @@ -39,13 +40,15 @@ pub trait ValuesBuffer {
/// - `levels_read` - the number of levels read
/// - `valid_mask` - a packed mask of valid levels
///
/// Returns an error if the inputs are inconsistent, for example because the
/// decoded data was corrupt. This must not panic on such input.
fn pad_nulls(
&mut self,
read_offset: usize,
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
);
) -> Result<()>;
}

impl<T: Copy + Default> ValuesBuffer for Vec<T> {
Expand All @@ -59,7 +62,7 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
values_read: usize,
levels_read: usize,
valid_mask: &[u8],
) {
) -> Result<()> {
self.resize(read_offset + levels_read, T::default());

let values_range = read_offset..read_offset + values_read;
Expand All @@ -70,5 +73,6 @@ impl<T: Copy + Default> ValuesBuffer for Vec<T> {
}
self[level_pos] = self[value_pos];
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ where
values_read,
levels_read,
def_levels.nulls().as_slice(),
);
)?;
}

self.num_records += records_read;
Expand Down