diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index cf40d0576d17..db503130460c 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -652,7 +652,9 @@ mod tests { let valid = [false, false, true, true, false, true, true, false, false]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); + output + .pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()) + .unwrap(); let array = output.into_array(Some(valid_buffer), ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); @@ -706,7 +708,9 @@ mod tests { let valid = [false, false, true, true, false, false]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); + output + .pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()) + .unwrap(); let array = output.into_array(Some(valid_buffer), ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 01dd9bcf8b09..e31b15116997 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -433,7 +433,9 @@ mod tests { let mut valid = vec![false, false, true, true, false, true]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice()); + output + .pad_nulls(0, 3, valid.len(), valid_buffer.as_slice()) + .unwrap(); assert!(matches!(output, DictionaryBuffer::Dict { .. })); @@ -441,7 +443,9 @@ mod tests { valid.extend_from_slice(&[false, false, true, true, false, true, true, false]); let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(6, 4, 8, valid_buffer.as_slice()); + output + .pad_nulls(6, 4, 8, valid_buffer.as_slice()) + .unwrap(); assert!(matches!(output, DictionaryBuffer::Dict { .. })); @@ -512,7 +516,9 @@ mod tests { let valid = [true, true, true, true, true]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(0, 5, 5, valid_buffer.as_slice()); + output + .pad_nulls(0, 5, 5, valid_buffer.as_slice()) + .unwrap(); assert!(matches!(output, DictionaryBuffer::Dict { .. })); @@ -656,7 +662,7 @@ mod tests { decoder.set_data(encoding, page, 8, None).unwrap(); assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); - output.pad_nulls(0, 0, 8, &[0]); + output.pad_nulls(0, 0, 8, &[0]).unwrap(); let array = output .into_array(Some(Buffer::from(&[0])), &data_type) .unwrap(); @@ -671,7 +677,7 @@ mod tests { decoder.set_data(encoding, page, 8, None).unwrap(); assert_eq!(decoder.skip_values(1024).unwrap(), 0); - output.pad_nulls(0, 0, 8, &[0]); + output.pad_nulls(0, 0, 8, &[0]).unwrap(); let array = output .into_array(Some(Buffer::from(&[0])), &data_type) .unwrap(); diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index c134261609be..ba785021455d 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -794,7 +794,9 @@ mod tests { let valid = [false, false, true, true, false, true, true, false, false]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()); + output + .pad_nulls(0, 4, valid.len(), valid_buffer.as_slice()) + .unwrap(); let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View); let strings = array.as_any().downcast_ref::().unwrap(); diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index f7e83510cf6f..f48a219160a0 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -310,10 +310,16 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { values_read: usize, levels_read: usize, valid_mask: &[u8], - ) { + ) -> Result<()> { let byte_length = self.byte_length.unwrap_or_default(); - assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length); + if self.buffer.len() != (read_offset + values_read) * byte_length { + return Err(general_err!( + "found inconsistent buffer length while padding nulls: expected {} bytes, got {}", + (read_offset + values_read) * byte_length, + self.buffer.len() + )); + } self.buffer .resize((read_offset + levels_read) * byte_length, 0); @@ -339,6 +345,7 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { }; move_values(&mut self.buffer, byte_length, values_range, valid_mask, op); } + Ok(()) } } diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs index c3cd5744d285..abf76530296c 100644 --- a/parquet/src/arrow/buffer/dictionary_buffer.rs +++ b/parquet/src/arrow/buffer/dictionary_buffer.rs @@ -206,7 +206,7 @@ impl ValuesBuffer for DictionaryBuffer Result<()> { match self { Self::Dict { keys, .. } => { keys.resize(read_offset + levels_read, K::default()); @@ -294,7 +294,9 @@ mod tests { let mut valid = vec![false, false, true, true, false, true, true, true]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); - buffer.pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice()); + buffer + .pad_nulls(0, values.len(), valid.len(), valid_buffer.as_slice()) + .unwrap(); // Read some data not preserving the dictionary @@ -305,7 +307,9 @@ mod tests { valid.extend_from_slice(&[false, false, true, false, true]); let null_buffer = Buffer::from_iter(valid.iter().cloned()); - buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice()); + buffer + .pad_nulls(read_offset, 2, 5, null_buffer.as_slice()) + .unwrap(); assert_eq!(buffer.len(), 13); let split = std::mem::replace(&mut buffer, DictionaryBuffer::with_capacity(0)); diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 2d9d1c9b6be8..3090842679c6 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -157,8 +157,14 @@ impl ValuesBuffer for OffsetBuffer { values_read: usize, levels_read: usize, valid_mask: &[u8], - ) { - assert_eq!(self.offsets.len(), read_offset + values_read + 1); + ) -> Result<()> { + if self.offsets.len() != read_offset + values_read + 1 { + return Err(general_err!( + "found inconsistent offsets while padding nulls: expected {} offsets, got {}", + read_offset + values_read + 1, + self.offsets.len() + )); + } self.offsets .resize(read_offset + levels_read + 1, I::default()); @@ -173,8 +179,9 @@ impl ValuesBuffer for OffsetBuffer { .rev() .zip(iter_set_bits_rev(valid_mask)) { - assert!(level_pos >= value_pos); - assert!(level_pos < last_pos); + if level_pos < value_pos || level_pos >= last_pos { + return Err(general_err!("found corrupt level data while padding nulls")); + } let end_offset = offsets[value_pos + 1]; let start_offset = offsets[value_pos]; @@ -185,7 +192,7 @@ impl ValuesBuffer for OffsetBuffer { } if level_pos == value_pos { - return; + return Ok(()); } offsets[level_pos] = start_offset; @@ -197,6 +204,7 @@ impl ValuesBuffer for OffsetBuffer { for x in &mut offsets[values_range.start + 1..last_pos] { *x = last_start_offset } + Ok(()) } } @@ -268,7 +276,9 @@ mod tests { let valid_mask = Buffer::from_iter(valid.iter().copied()); // Both trailing and leading nulls - buffer.pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice()); + buffer + .pad_nulls(1, values.len() - 1, valid.len() - 1, valid_mask.as_slice()) + .unwrap(); let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); @@ -330,11 +340,44 @@ mod tests { buffer.check_valid_utf8(12).unwrap_err(); } + #[test] + fn test_pad_nulls_corrupt_input_returns_err() { + // Corrupt input must produce a decode error rather than panicking. + + // Offsets inconsistent with `values_read`: only one value was pushed, + // but three are claimed to have been read. + let mut buffer = OffsetBuffer::::with_capacity(0); + buffer.try_push("a".as_bytes(), false).unwrap(); + let valid_mask = Buffer::from_iter([true, false, false]); + let err = buffer + .pad_nulls(0, 3, 3, valid_mask.as_slice()) + .unwrap_err(); + assert!( + err.to_string().contains("inconsistent offsets"), + "unexpected error: {err}" + ); + + // Valid mask has fewer set bits than `values_read`, which previously + // tripped an assertion in the null-padding loop. + let mut buffer = OffsetBuffer::::with_capacity(0); + for v in ["a", "b", "c"] { + buffer.try_push(v.as_bytes(), false).unwrap(); + } + let valid_mask = Buffer::from_iter([true, false, false]); + let err = buffer + .pad_nulls(0, 3, 3, valid_mask.as_slice()) + .unwrap_err(); + assert!( + err.to_string().contains("corrupt level data"), + "unexpected error: {err}" + ); + } + #[test] fn test_pad_nulls_empty() { let mut buffer = OffsetBuffer::::with_capacity(0); let valid_mask = Buffer::from_iter(std::iter::repeat_n(false, 9)); - buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()); + buffer.pad_nulls(0, 0, 9, valid_mask.as_slice()).unwrap(); let array = buffer.into_array(Some(valid_mask), ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index b1bdeb64e5c0..f20ca116e906 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -16,6 +16,7 @@ // under the License. use crate::arrow::record_reader::buffer::ValuesBuffer; +use crate::errors::Result; use arrow_array::{ArrayRef, BinaryViewArray, StringViewArray}; use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; use arrow_schema::DataType as ArrowType; @@ -81,9 +82,9 @@ impl ValuesBuffer for ViewBuffer { values_read: usize, levels_read: usize, valid_mask: &[u8], - ) { + ) -> Result<()> { self.views - .pad_nulls(read_offset, values_read, levels_read, valid_mask); + .pad_nulls(read_offset, values_read, levels_read, valid_mask) } } @@ -146,7 +147,9 @@ mod tests { let valid = [true, false, false, true, false, false, true]; let valid_mask = Buffer::from_iter(valid.iter().copied()); - buffer.pad_nulls(1, 2, valid.len() - 1, valid_mask.as_slice()); + buffer + .pad_nulls(1, 2, valid.len() - 1, valid_mask.as_slice()) + .unwrap(); let array = buffer.into_array(Some(valid_mask), &ArrowType::Utf8View); let strings = array diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 6e0855dda376..40fb598e596e 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -16,6 +16,7 @@ // under the License. use crate::arrow::buffer::bit_util::iter_set_bits_rev; +use crate::errors::Result; /// A buffer that supports padding with nulls pub trait ValuesBuffer { @@ -39,13 +40,15 @@ pub trait ValuesBuffer { /// - `levels_read` - the number of levels read /// - `valid_mask` - a packed mask of valid levels /// + /// Returns an error if the inputs are inconsistent, for example because the + /// decoded data was corrupt. This must not panic on such input. fn pad_nulls( &mut self, read_offset: usize, values_read: usize, levels_read: usize, valid_mask: &[u8], - ); + ) -> Result<()>; } impl ValuesBuffer for Vec { @@ -59,7 +62,7 @@ impl ValuesBuffer for Vec { values_read: usize, levels_read: usize, valid_mask: &[u8], - ) { + ) -> Result<()> { self.resize(read_offset + levels_read, T::default()); let values_range = read_offset..read_offset + values_read; @@ -70,5 +73,6 @@ impl ValuesBuffer for Vec { } self[level_pos] = self[value_pos]; } + Ok(()) } } diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs index d2d1326239ba..625aa674b2fb 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -251,7 +251,7 @@ where values_read, levels_read, def_levels.nulls().as_slice(), - ); + )?; } self.num_records += records_read;