diff --git a/parquet/src/arrow/arrow_reader/read_plan.rs b/parquet/src/arrow/arrow_reader/read_plan.rs index 674ae2b8d964..b1baab1007c8 100644 --- a/parquet/src/arrow/arrow_reader/read_plan.rs +++ b/parquet/src/arrow/arrow_reader/read_plan.rs @@ -19,8 +19,9 @@ //! from a Parquet file use crate::arrow::array_reader::ArrayReader; -use crate::arrow::arrow_reader::selection::RowSelectionPolicy; -use crate::arrow::arrow_reader::selection::RowSelectionStrategy; +use crate::arrow::arrow_reader::selection::{ + RowSelectionInner, RowSelectionPolicy, RowSelectionStrategy, mask_to_selectors, +}; use crate::arrow::arrow_reader::{ ArrowPredicate, ParquetRecordBatchReader, RowSelection, RowSelectionCursor, RowSelector, }; @@ -157,6 +158,10 @@ impl ReadPlanBuilder { None => return RowSelectionStrategy::Selectors, }; + if selection.as_mask().is_some() { + return RowSelectionStrategy::Mask; + } + // total_rows: total number of rows selected / skipped // effective_count: number of non-empty selectors let (total_rows, effective_count) = @@ -302,19 +307,8 @@ impl ReadPlanBuilder { row_selection_policy: _, } = self; - let selection = selection.map(|s| s.trim()); - let row_selection_cursor = selection - .map(|s| { - let trimmed = s.trim(); - let selectors: Vec = trimmed.into(); - match selection_strategy { - RowSelectionStrategy::Mask => { - RowSelectionCursor::new_mask_from_selectors(selectors) - } - RowSelectionStrategy::Selectors => RowSelectionCursor::new_selectors(selectors), - } - }) + .map(|s| build_cursor(s.trim(), selection_strategy)) .unwrap_or(RowSelectionCursor::new_all()); ReadPlan { @@ -324,6 +318,24 @@ impl ReadPlanBuilder { } } +/// Lower a [`RowSelection`] to the cursor form requested by the resolved strategy. +fn build_cursor(selection: RowSelection, strategy: RowSelectionStrategy) -> RowSelectionCursor { + match (strategy, selection.into_inner()) { + (RowSelectionStrategy::Mask, RowSelectionInner::Mask(mask)) => { + RowSelectionCursor::new_mask_from_buffer(mask) + } + (RowSelectionStrategy::Mask, RowSelectionInner::Selectors(selectors)) => { + RowSelectionCursor::new_mask_from_selectors(selectors) + } + (RowSelectionStrategy::Selectors, RowSelectionInner::Selectors(selectors)) => { + RowSelectionCursor::new_selectors(selectors) + } + (RowSelectionStrategy::Selectors, RowSelectionInner::Mask(mask)) => { + RowSelectionCursor::new_selectors(mask_to_selectors(&mask)) + } + } +} + /// Builder for [`ReadPlan`] that applies a limit and offset to the read plan /// /// See [`ReadPlanBuilder::limited`] to create this builder. diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2ddf812f9c39..cd34f18b4b87 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -19,13 +19,14 @@ use crate::arrow::ProjectionMask; use crate::errors::ParquetError; use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation}; use arrow_array::{Array, BooleanArray}; +use arrow_buffer::bit_iterator::BitSliceIterator; use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder}; use arrow_select::filter::SlicesIterator; use std::cmp::Ordering; use std::collections::VecDeque; use std::ops::Range; -/// Policy for picking a strategy to materialise [`RowSelection`] during execution. +/// Policy for picking a strategy to materialize [`RowSelection`] during execution. /// /// Note that this is a user-provided preference, and the actual strategy used /// may differ based on safety considerations (e.g. page skipping). @@ -33,7 +34,7 @@ use std::ops::Range; pub enum RowSelectionPolicy { /// Use a queue of [`RowSelector`] values Selectors, - /// Use a boolean mask to materialise the selection + /// Use a boolean mask to materialize the selection Mask, /// Choose between [`Self::Mask`] and [`Self::Selectors`] based on selector density Auto { @@ -56,7 +57,7 @@ impl Default for RowSelectionPolicy { pub(crate) enum RowSelectionStrategy { /// Use a queue of [`RowSelector`] values Selectors, - /// Use a boolean mask to materialise the selection + /// Use a boolean mask to materialize the selection Mask, } @@ -127,6 +128,13 @@ impl RowSelector { /// RowSelection::from_consecutive_ranges(ranges.into_iter(), 20); /// let actual: Vec = selection.into(); /// assert_eq!(actual, expected); +/// +/// // or directly from a packed bitmap, when the upstream producer already +/// // has one. The bitmap is kept as-is rather than run-length-encoded. +/// use arrow_buffer::BooleanBuffer; +/// let mask = BooleanBuffer::from(vec![true, false, true, true]); +/// let selection = RowSelection::from_boolean_buffer(mask); +/// assert_eq!(selection.row_count(), 3); /// ``` /// /// A [`RowSelection`] maintains the following invariants: @@ -135,12 +143,345 @@ impl RowSelector { /// * Consecutive [`RowSelector`]s alternate skipping or selecting rows /// /// [`PageIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData -#[derive(Debug, Clone, Default, Eq, PartialEq)] +#[derive(Default, Clone)] pub struct RowSelection { - selectors: Vec, + inner: RowSelectionInner, +} + +/// Internal storage for [`RowSelection`]. +#[derive(Debug, Clone)] +pub(crate) enum RowSelectionInner { + Selectors(Vec), + Mask(BooleanBuffer), +} + +impl Default for RowSelectionInner { + fn default() -> Self { + Self::Selectors(Vec::new()) + } +} + +impl std::fmt::Debug for RowSelection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.inner { + RowSelectionInner::Selectors(s) => f + .debug_struct("RowSelection") + .field("selectors", s) + .finish(), + RowSelectionInner::Mask(m) => f + .debug_struct("RowSelection") + .field("mask_len", &m.len()) + .finish_non_exhaustive(), + } + } +} + +impl PartialEq for RowSelection { + fn eq(&self, other: &Self) -> bool { + match (&self.inner, &other.inner) { + (RowSelectionInner::Selectors(a), RowSelectionInner::Selectors(b)) => a == b, + (RowSelectionInner::Mask(a), RowSelectionInner::Mask(b)) => a == b, + (RowSelectionInner::Mask(mask), RowSelectionInner::Selectors(selectors)) + | (RowSelectionInner::Selectors(selectors), RowSelectionInner::Mask(mask)) => { + if selectors + .iter() + .try_fold(0usize, |acc, selector| acc.checked_add(selector.row_count)) + != Some(mask.len()) + { + return false; + } + + let mut slices = mask.set_slices().peekable(); + let mut cursor = 0usize; + + for selector in selectors { + let end = cursor + selector.row_count; + + if selector.skip { + if slices.peek().is_some_and(|(start, _)| *start < end) { + return false; + } + } else { + match slices.next() { + Some((start, slice_end)) if start == cursor && slice_end == end => {} + _ => return false, + } + } + + cursor = end; + } + + slices.next().is_none() + } + } + } +} + +impl Eq for RowSelection {} + +/// Iterator over the [`RowSelector`]s of a [`RowSelection`]. +/// +/// Yields owned [`RowSelector`] values; mask-backed selections stream the +/// run-length form via [`BitSliceIterator`] without allocation. +#[derive(Debug)] +pub enum RowSelectionIter<'a> { + Selectors(std::iter::Copied>), + Mask(MaskRunIter<'a>), +} + +impl Iterator for RowSelectionIter<'_> { + type Item = RowSelector; + + #[inline] + fn next(&mut self) -> Option { + match self { + Self::Selectors(it) => it.next(), + Self::Mask(it) => it.next(), + } + } +} + +/// Streaming RLE view of a [`BooleanBuffer`]. +#[derive(Debug)] +pub struct MaskRunIter<'a> { + slices: BitSliceIterator<'a>, + cursor: usize, + total: usize, + pending: Option, + finished: bool, +} + +impl<'a> MaskRunIter<'a> { + fn new(mask: &'a BooleanBuffer) -> Self { + Self { + slices: mask.set_slices(), + cursor: 0, + total: mask.len(), + pending: None, + finished: false, + } + } +} + +impl Iterator for MaskRunIter<'_> { + type Item = RowSelector; + + fn next(&mut self) -> Option { + if let Some(p) = self.pending.take() { + return Some(p); + } + if self.finished { + return None; + } + match self.slices.next() { + Some((start, end)) => { + let select = RowSelector::select(end - start); + if start > self.cursor { + let skip = RowSelector::skip(start - self.cursor); + self.pending = Some(select); + self.cursor = end; + Some(skip) + } else { + self.cursor = end; + Some(select) + } + } + None => { + self.finished = true; + if self.cursor < self.total { + let skip = RowSelector::skip(self.total - self.cursor); + self.cursor = self.total; + Some(skip) + } else { + None + } + } + } + } +} + +/// Materialize a [`BooleanBuffer`] into its RLE form. +pub(crate) fn mask_to_selectors(mask: &BooleanBuffer) -> Vec { + let total_rows = mask.len(); + if total_rows == 0 { + return Vec::new(); + } + let mut selectors: Vec = Vec::new(); + let mut last_end = 0; + for (start, end) in mask.set_slices() { + if start > last_end { + selectors.push(RowSelector::skip(start - last_end)); + } + selectors.push(RowSelector::select(end - start)); + last_end = end; + } + if last_end != total_rows { + selectors.push(RowSelector::skip(total_rows - last_end)); + } + selectors +} + +/// Bitwise AND of two mask-backed selections. Longer side's tail passes through. +fn intersect_masks(l: &BooleanBuffer, r: &BooleanBuffer) -> BooleanBuffer { + if l.len() == r.len() { + return l & r; + } + let common = l.len().min(r.len()); + let head = &l.slice(0, common) & &r.slice(0, common); + let (longer, longer_len) = if l.len() > r.len() { + (l, l.len()) + } else { + (r, r.len()) + }; + let tail = longer.slice(common, longer_len - common); + let mut builder = BooleanBufferBuilder::new(longer_len); + builder.append_buffer(&head); + builder.append_buffer(&tail); + builder.finish() +} + +/// Bitwise OR of two mask-backed selections. Longer side's tail passes through. +fn union_masks(l: &BooleanBuffer, r: &BooleanBuffer) -> BooleanBuffer { + if l.len() == r.len() { + return l | r; + } + let common = l.len().min(r.len()); + let head = &l.slice(0, common) | &r.slice(0, common); + let (longer, longer_len) = if l.len() > r.len() { + (l, l.len()) + } else { + (r, r.len()) + }; + let tail = longer.slice(common, longer_len - common); + let mut builder = BooleanBufferBuilder::new(longer_len); + builder.append_buffer(&head); + builder.append_buffer(&tail); + builder.finish() +} + +/// Applies `other` to the selected rows of `mask`, preserving the original row domain. +fn and_then_mask(mask: &BooleanBuffer, other: &RowSelection) -> BooleanBuffer { + let mut builder = BooleanBufferBuilder::new(mask.len()); + let mut other_iter = other.iter(); + let mut current = other_iter.next(); + let mut cursor = 0usize; + + // Iterate only over the set positions in `mask`; the gaps of unset bits + // are filled in bulk with `append_n` instead of bit-by-bit. + for set_idx in mask.set_indices() { + if set_idx > cursor { + builder.append_n(set_idx - cursor, false); + } + cursor = set_idx + 1; + + while current.as_ref().is_some_and(|s| s.row_count == 0) { + current = other_iter.next(); + } + let selector = current + .as_mut() + .expect("selection contains less than the number of selected rows"); + let selected = !selector.skip; + selector.row_count -= 1; + builder.append(selected); + } + if cursor < mask.len() { + builder.append_n(mask.len() - cursor, false); + } + + if current.is_some_and(|s| s.row_count != 0) || other_iter.any(|s| s.row_count != 0) { + panic!("selection exceeds the number of selected rows"); + } + + builder.finish() +} + +/// Skips the first `offset` selected rows of a mask-backed selection. +fn offset_mask(mask: BooleanBuffer, offset: usize) -> BooleanBuffer { + let popcount = mask.count_set_bits(); + if offset >= popcount { + return BooleanBuffer::new_unset(0); + } + // Position one past the `offset`-th set bit, i.e. the index of the first + // selected row to keep. + let pos = mask.find_nth_set_bit_position(0, offset); + let mut builder = BooleanBufferBuilder::new(mask.len()); + builder.append_n(pos, false); + builder.append_buffer(&mask.slice(pos, mask.len() - pos)); + builder.finish() +} + +/// Keeps only the first `limit` selected rows of a mask-backed selection. +fn limit_mask(mask: BooleanBuffer, limit: usize) -> BooleanBuffer { + // `find_nth_set_bit_position` returns `mask.len()` when there are fewer + // than `limit` set bits, so the slice naturally degrades to the original + // mask in that case. + let cut = mask.find_nth_set_bit_position(0, limit); + mask.slice(0, cut) } impl RowSelection { + fn from_selectors(selectors: Vec) -> Self { + Self { + inner: RowSelectionInner::Selectors(selectors), + } + } + + /// Create a [`RowSelection`] from a packed [`BooleanBuffer`]. + /// + /// Each set bit selects a row, each unset bit skips one. Unlike + /// [`Self::from_filters`], the bitmap is kept as-is rather than + /// eagerly run-length-encoded; [`Self::iter`] streams the RLE form + /// directly off the bitmap. + pub fn from_boolean_buffer(mask: BooleanBuffer) -> Self { + Self { + inner: RowSelectionInner::Mask(mask), + } + } + + /// Returns the underlying mask if this selection is mask-backed. + /// + /// Public so that engines composing selections (e.g. DataFusion's + /// `ParquetAccessPlan::into_overall_row_selection`) can concatenate + /// mask-backed selections without materialising the RLE form. + pub fn as_mask(&self) -> Option<&BooleanBuffer> { + match &self.inner { + RowSelectionInner::Mask(m) => Some(m), + _ => None, + } + } + + /// Consume the selection and return its internal storage. + pub(crate) fn into_inner(self) -> RowSelectionInner { + self.inner + } + + #[cfg(test)] + fn selectors(&self) -> Vec { + self.iter().collect() + } + + fn into_selectors_vec(self) -> Vec { + match self.inner { + RowSelectionInner::Selectors(s) => s, + RowSelectionInner::Mask(m) => mask_to_selectors(&m), + } + } + + /// Promote a mask-backed selection to selector backing in place. + fn selectors_mut(&mut self) -> &mut Vec { + if let RowSelectionInner::Mask(_) = &self.inner { + let mask = match std::mem::take(&mut self.inner) { + RowSelectionInner::Mask(m) => m, + RowSelectionInner::Selectors(_) => unreachable!(), + }; + self.inner = RowSelectionInner::Selectors(mask_to_selectors(&mask)); + } + match &mut self.inner { + RowSelectionInner::Selectors(s) => s, + RowSelectionInner::Mask(_) => unreachable!(), + } + } + /// Creates a [`RowSelection`] from a slice of [`BooleanArray`] /// /// # Panic @@ -191,7 +532,7 @@ impl RowSelection { selectors.push(RowSelector::skip(total_rows - last_end)) } - Self { selectors } + Self::from_selectors(selectors) } /// Given an offset index, return the byte ranges for all data pages selected by `self` @@ -206,7 +547,7 @@ impl RowSelection { let mut row_offset = 0; let mut pages = page_locations.iter().peekable(); - let mut selectors = self.selectors.iter().cloned(); + let mut selectors = self.iter(); let mut current_selector = selectors.next(); let mut current_page = pages.next(); @@ -285,10 +626,29 @@ impl RowSelection { /// Splits off the first `row_count` from this [`RowSelection`] pub fn split_off(&mut self, row_count: usize) -> Self { + if matches!(&self.inner, RowSelectionInner::Mask(_)) { + let mask = match std::mem::take(&mut self.inner) { + RowSelectionInner::Mask(m) => m, + RowSelectionInner::Selectors(_) => unreachable!(), + }; + let total = mask.len(); + if row_count >= total { + // Whole selection moves into the returned head; leave `self` as + // an empty mask so the backing is preserved. + self.inner = RowSelectionInner::Mask(BooleanBuffer::new_unset(0)); + return Self::from_boolean_buffer(mask); + } + let head = mask.slice(0, row_count); + let tail = mask.slice(row_count, total - row_count); + self.inner = RowSelectionInner::Mask(tail); + return Self::from_boolean_buffer(head); + } + + let selectors = self.selectors_mut(); let mut total_count = 0; // Find the index where the selector exceeds the row count - let find = self.selectors.iter().position(|selector| { + let find = selectors.iter().position(|selector| { total_count += selector.row_count; total_count > row_count }); @@ -296,29 +656,27 @@ impl RowSelection { let split_idx = match find { Some(idx) => idx, None => { - let selectors = std::mem::take(&mut self.selectors); - return Self { selectors }; + let drained = std::mem::take(selectors); + return Self::from_selectors(drained); } }; - let mut remaining = self.selectors.split_off(split_idx); + let mut remaining = selectors.split_off(split_idx); - // Always present as `split_idx < self.selectors.len` + // Always present as `split_idx < selectors.len` let next = remaining.first_mut().unwrap(); let overflow = total_count - row_count; if next.row_count != overflow { - self.selectors.push(RowSelector { + selectors.push(RowSelector { row_count: next.row_count - overflow, skip: next.skip, }) } next.row_count = overflow; - std::mem::swap(&mut remaining, &mut self.selectors); - Self { - selectors: remaining, - } + std::mem::swap(&mut remaining, selectors); + Self::from_selectors(remaining) } /// returns a [`RowSelection`] representing rows that are selected in both /// input [`RowSelection`]s. @@ -343,9 +701,13 @@ impl RowSelection { /// by this RowSelection /// pub fn and_then(&self, other: &Self) -> Self { + if let Some(mask) = self.as_mask() { + return Self::from_boolean_buffer(and_then_mask(mask, other)); + } + let mut selectors = vec![]; - let mut first = self.selectors.iter().cloned().peekable(); - let mut second = other.selectors.iter().cloned().peekable(); + let mut first = self.iter().peekable(); + let mut second = other.iter().peekable(); let mut to_skip = 0; while let Some(b) = second.peek_mut() { @@ -402,7 +764,7 @@ impl RowSelection { selectors.push(RowSelector::skip(to_skip)); } - Self { selectors } + Self::from_selectors(selectors) } /// Compute the intersection of two [`RowSelection`] @@ -412,7 +774,12 @@ impl RowSelection { /// /// returned: NNNNNNNNYYNYN pub fn intersection(&self, other: &Self) -> Self { - intersect_row_selections(&self.selectors, &other.selectors) + if let (Some(l), Some(r)) = (self.as_mask(), other.as_mask()) { + return Self::from_boolean_buffer(intersect_masks(l, r)); + } + let l = self.materialize_for_combine(); + let r = other.materialize_for_combine(); + intersect_row_selections(&l, &r) } /// Compute the union of two [`RowSelection`] @@ -422,96 +789,151 @@ impl RowSelection { /// /// returned: NYYYYYNNYYNYN pub fn union(&self, other: &Self) -> Self { - union_row_selections(&self.selectors, &other.selectors) + if let (Some(l), Some(r)) = (self.as_mask(), other.as_mask()) { + return Self::from_boolean_buffer(union_masks(l, r)); + } + let l = self.materialize_for_combine(); + let r = other.materialize_for_combine(); + union_row_selections(&l, &r) + } + + fn materialize_for_combine(&self) -> Vec { + match &self.inner { + RowSelectionInner::Selectors(s) => s.clone(), + RowSelectionInner::Mask(m) => mask_to_selectors(m), + } } /// Returns `true` if this [`RowSelection`] selects any rows pub fn selects_any(&self) -> bool { - self.selectors.iter().any(|x| !x.skip) + match &self.inner { + RowSelectionInner::Selectors(s) => s.iter().any(|x| !x.skip), + RowSelectionInner::Mask(m) => m.set_indices().next().is_some(), + } } /// Trims this [`RowSelection`] removing any trailing skips pub(crate) fn trim(mut self) -> Self { - while self.selectors.last().map(|x| x.skip).unwrap_or(false) { - self.selectors.pop(); + if let RowSelectionInner::Mask(m) = &self.inner { + // Position one past the last set bit (= 0 when there are none). + let popcount = m.count_set_bits(); + let new_len = if popcount == 0 { + 0 + } else { + m.find_nth_set_bit_position(0, popcount) + }; + if new_len != m.len() { + return Self { + inner: RowSelectionInner::Mask(m.slice(0, new_len)), + }; + } + return self; + } + let selectors = self.selectors_mut(); + while selectors.last().map(|x| x.skip).unwrap_or(false) { + selectors.pop(); } self } /// Applies an offset to this [`RowSelection`], skipping the first `offset` selected rows - pub(crate) fn offset(mut self, offset: usize) -> Self { + pub(crate) fn offset(self, offset: usize) -> Self { if offset == 0 { return self; } + let mut selectors = match self.inner { + RowSelectionInner::Mask(mask) => { + return Self::from_boolean_buffer(offset_mask(mask, offset)); + } + RowSelectionInner::Selectors(selectors) => selectors, + }; let mut selected_count = 0; let mut skipped_count = 0; // Find the index where the selector exceeds the row count - let find = self - .selectors - .iter() - .position(|selector| match selector.skip { - true => { - skipped_count += selector.row_count; - false - } - false => { - selected_count += selector.row_count; - selected_count > offset - } - }); + let find = selectors.iter().position(|selector| match selector.skip { + true => { + skipped_count += selector.row_count; + false + } + false => { + selected_count += selector.row_count; + selected_count > offset + } + }); let split_idx = match find { Some(idx) => idx, None => { - self.selectors.clear(); - return self; + selectors.clear(); + return Self::from_selectors(selectors); } }; - let mut selectors = Vec::with_capacity(self.selectors.len() - split_idx + 1); - selectors.push(RowSelector::skip(skipped_count + offset)); - selectors.push(RowSelector::select(selected_count - offset)); - selectors.extend_from_slice(&self.selectors[split_idx + 1..]); + let mut new_selectors = Vec::with_capacity(selectors.len() - split_idx + 1); + new_selectors.push(RowSelector::skip(skipped_count + offset)); + new_selectors.push(RowSelector::select(selected_count - offset)); + new_selectors.extend_from_slice(&selectors[split_idx + 1..]); - Self { selectors } + Self::from_selectors(new_selectors) } /// Limit this [`RowSelection`] to only select `limit` rows - pub(crate) fn limit(mut self, mut limit: usize) -> Self { + pub(crate) fn limit(self, mut limit: usize) -> Self { + let mut selectors = match self.inner { + RowSelectionInner::Mask(mask) => { + return Self::from_boolean_buffer(limit_mask(mask, limit)); + } + RowSelectionInner::Selectors(selectors) => selectors, + }; if limit == 0 { - self.selectors.clear(); + selectors.clear(); } - for (idx, selection) in self.selectors.iter_mut().enumerate() { + for (idx, selection) in selectors.iter_mut().enumerate() { if !selection.skip { if selection.row_count >= limit { selection.row_count = limit; - self.selectors.truncate(idx + 1); + selectors.truncate(idx + 1); break; } else { limit -= selection.row_count; } } } - self + Self::from_selectors(selectors) } - /// Returns an iterator over the [`RowSelector`]s for this - /// [`RowSelection`]. - pub fn iter(&self) -> impl Iterator { - self.selectors.iter() + /// Returns an iterator yielding the [`RowSelector`]s for this selection. + /// + /// Mask-backed selections stream the RLE form off the bitmap without + /// allocation. + pub fn iter(&self) -> RowSelectionIter<'_> { + match &self.inner { + RowSelectionInner::Selectors(s) => RowSelectionIter::Selectors(s.iter().copied()), + RowSelectionInner::Mask(m) => RowSelectionIter::Mask(MaskRunIter::new(m)), + } } /// Returns the number of selected rows pub fn row_count(&self) -> usize { - self.iter().filter(|s| !s.skip).map(|s| s.row_count).sum() + match &self.inner { + RowSelectionInner::Selectors(s) => { + s.iter().filter(|x| !x.skip).map(|x| x.row_count).sum() + } + RowSelectionInner::Mask(m) => m.count_set_bits(), + } } /// Returns the number of de-selected rows pub fn skipped_row_count(&self) -> usize { - self.iter().filter(|s| s.skip).map(|s| s.row_count).sum() + match &self.inner { + RowSelectionInner::Selectors(s) => { + s.iter().filter(|x| x.skip).map(|x| x.row_count).sum() + } + RowSelectionInner::Mask(m) => m.len() - m.count_set_bits(), + } } /// Expands the selection to align with batch boundaries. @@ -525,7 +947,7 @@ impl RowSelection { let mut expanded_ranges = Vec::new(); let mut row_offset = 0; - for selector in &self.selectors { + for selector in self.iter() { if selector.skip { row_offset += selector.row_count; } else { @@ -573,6 +995,12 @@ impl From> for RowSelection { } } +impl From for RowSelection { + fn from(mask: BooleanBuffer) -> Self { + Self::from_boolean_buffer(mask) + } +} + impl FromIterator for RowSelection { fn from_iter>(iter: T) -> Self { let iter = iter.into_iter(); @@ -599,19 +1027,57 @@ impl FromIterator for RowSelection { } } - Self { selectors } + Self::from_selectors(selectors) } } impl From for Vec { fn from(r: RowSelection) -> Self { - r.selectors + r.into_selectors_vec() } } impl From for VecDeque { fn from(r: RowSelection) -> Self { - r.selectors.into() + r.into_selectors_vec().into() + } +} + +impl FromIterator for RowSelection { + /// Concatenate multiple [`RowSelection`]s in iterator order. + /// + /// When every input is mask-backed the result stays mask-backed + /// (`BooleanBuffer`s are appended); otherwise falls back to flattening + /// through the per-`RowSelector` form. + fn from_iter>(iter: T) -> Self { + let items: Vec = iter.into_iter().collect(); + + let all_mask = items + .iter() + .all(|s| matches!(&s.inner, RowSelectionInner::Mask(_))); + + if all_mask { + let total_len: usize = items + .iter() + .map(|s| match &s.inner { + RowSelectionInner::Mask(m) => m.len(), + RowSelectionInner::Selectors(_) => unreachable!(), + }) + .sum(); + let mut builder = BooleanBufferBuilder::new(total_len); + for item in items { + match item.into_inner() { + RowSelectionInner::Mask(m) => builder.append_buffer(&m), + RowSelectionInner::Selectors(_) => unreachable!(), + } + } + return Self::from_boolean_buffer(builder.finish()); + } + + items + .into_iter() + .flat_map(|s| s.into_selectors_vec()) + .collect() } } @@ -889,7 +1355,7 @@ pub struct MaskChunk { /// [`ReadPlan`](crate::arrow::arrow_reader::ReadPlan). /// /// This keeps per-reader state such as the current position and delegates the -/// actual storage strategy to the internal `RowSelectionBacking`. +/// actual storage strategy to the internal `RowSelectionInner`. #[derive(Debug)] pub enum RowSelectionCursor { /// Reading all rows @@ -909,6 +1375,11 @@ impl RowSelectionCursor { }) } + /// Create a [`MaskCursor`] cursor backed by an existing bitmask. + pub(crate) fn new_mask_from_buffer(mask: BooleanBuffer) -> Self { + Self::Mask(MaskCursor { mask, position: 0 }) + } + /// Create a [`RowSelectionCursor::Selectors`] from the provided selectors pub(crate) fn new_selectors(selectors: Vec) -> Self { Self::Selectors(SelectorsCursor { @@ -949,14 +1420,14 @@ mod tests { let selection = RowSelection::from_filters(&filters[..1]); assert!(selection.selects_any()); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(3), RowSelector::select(4)] ); let selection = RowSelection::from_filters(&filters[..2]); assert!(selection.selects_any()); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(3), RowSelector::select(6), @@ -968,7 +1439,7 @@ mod tests { let selection = RowSelection::from_filters(&filters); assert!(selection.selects_any()); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(3), RowSelector::select(6), @@ -980,7 +1451,7 @@ mod tests { let selection = RowSelection::from_filters(&filters[2..3]); assert!(!selection.selects_any()); - assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::skip(4)]); } #[test] @@ -993,9 +1464,9 @@ mod tests { ]); let split = selection.split_off(34); - assert_eq!(split.selectors, vec![RowSelector::skip(34)]); + assert_eq!(split.selectors(), vec![RowSelector::skip(34)]); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::select(12), RowSelector::skip(3), @@ -1004,9 +1475,9 @@ mod tests { ); let split = selection.split_off(5); - assert_eq!(split.selectors, vec![RowSelector::select(5)]); + assert_eq!(split.selectors(), vec![RowSelector::select(5)]); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::select(7), RowSelector::skip(3), @@ -1016,20 +1487,20 @@ mod tests { let split = selection.split_off(8); assert_eq!( - split.selectors, + split.selectors(), vec![RowSelector::select(7), RowSelector::skip(1)] ); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(2), RowSelector::select(35)] ); let split = selection.split_off(200); assert_eq!( - split.selectors, + split.selectors(), vec![RowSelector::skip(2), RowSelector::select(35)] ); - assert!(selection.selectors.is_empty()); + assert!(selection.selectors().is_empty()); } #[test] @@ -1044,7 +1515,7 @@ mod tests { let selection = selection.offset(2); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(2), RowSelector::select(3), @@ -1057,7 +1528,7 @@ mod tests { let selection = selection.offset(5); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(30), RowSelector::select(5), @@ -1068,7 +1539,7 @@ mod tests { let selection = selection.offset(3); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(33), RowSelector::select(2), @@ -1079,13 +1550,13 @@ mod tests { let selection = selection.offset(2); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(68), RowSelector::select(6),] ); let selection = selection.offset(3); assert_eq!( - selection.selectors, + selection.selectors(), vec![RowSelector::skip(71), RowSelector::select(3),] ); } @@ -1132,7 +1603,7 @@ mod tests { ]); assert_eq!( - a.and_then(&b).selectors, + a.and_then(&b).selectors(), vec![ RowSelector::select(2), RowSelector::skip(1), @@ -1185,30 +1656,30 @@ mod tests { fn test_combine_2elements() { let a = vec![RowSelector::select(10), RowSelector::select(5)]; let a_expect = vec![RowSelector::select(15)]; - assert_eq!(RowSelection::from_iter(a).selectors, a_expect); + assert_eq!(RowSelection::from_iter(a).selectors(), a_expect); let b = vec![RowSelector::select(10), RowSelector::skip(5)]; let b_expect = vec![RowSelector::select(10), RowSelector::skip(5)]; - assert_eq!(RowSelection::from_iter(b).selectors, b_expect); + assert_eq!(RowSelection::from_iter(b).selectors(), b_expect); let c = vec![RowSelector::skip(10), RowSelector::select(5)]; let c_expect = vec![RowSelector::skip(10), RowSelector::select(5)]; - assert_eq!(RowSelection::from_iter(c).selectors, c_expect); + assert_eq!(RowSelection::from_iter(c).selectors(), c_expect); let d = vec![RowSelector::skip(10), RowSelector::skip(5)]; let d_expect = vec![RowSelector::skip(15)]; - assert_eq!(RowSelection::from_iter(d).selectors, d_expect); + assert_eq!(RowSelection::from_iter(d).selectors(), d_expect); } #[test] fn test_from_one_and_empty() { let a = vec![RowSelector::select(10)]; let selection1 = RowSelection::from(a.clone()); - assert_eq!(selection1.selectors, a); + assert_eq!(selection1.selectors(), a); let b = vec![]; let selection1 = RowSelection::from(b.clone()); - assert_eq!(selection1.selectors, b) + assert_eq!(selection1.selectors(), b) } #[test] @@ -1253,7 +1724,7 @@ mod tests { let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![ RowSelector::select(5), RowSelector::skip(4), @@ -1271,7 +1742,7 @@ mod tests { let b = vec![RowSelector::select(36), RowSelector::skip(36)]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(3), RowSelector::skip(69)] ); @@ -1286,7 +1757,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(2), RowSelector::skip(8)] ); @@ -1300,7 +1771,7 @@ mod tests { ]; let res = intersect_row_selections(&a, &b); assert_eq!( - res.selectors, + res.selectors(), vec![RowSelector::select(2), RowSelector::skip(8)] ); } @@ -1328,7 +1799,7 @@ mod tests { let expected = RowSelection::from_filters(&[BooleanArray::from(expected_bools)]); - let total_rows: usize = expected.selectors.iter().map(|s| s.row_count).sum(); + let total_rows: usize = expected.selectors().iter().map(|s| s.row_count).sum(); assert_eq!(a_len, total_rows); assert_eq!(a.and_then(&b), expected); @@ -1345,10 +1816,8 @@ mod tests { RowSelector::select(4), ]; - let round_tripped = RowSelection::from(selectors.clone()) - .iter() - .cloned() - .collect::>(); + let round_tripped: Vec = + RowSelection::from(selectors.clone()).iter().collect(); assert_eq!(selectors, round_tripped); } @@ -1369,7 +1838,7 @@ mod tests { let limited = selection.clone().limit(5); let expected = vec![RowSelector::select(5)]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(15); let expected = vec![ @@ -1377,11 +1846,11 @@ mod tests { RowSelector::skip(10), RowSelector::select(5), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(0); let expected = vec![]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.clone().limit(30); let expected = vec![ @@ -1391,7 +1860,7 @@ mod tests { RowSelector::skip(10), RowSelector::select(10), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); let limited = selection.limit(100); let expected = vec![ @@ -1401,7 +1870,7 @@ mod tests { RowSelector::skip(10), RowSelector::select(10), ]; - assert_eq!(limited.selectors, expected); + assert_eq!(limited.selectors(), expected); } #[test] @@ -1542,7 +2011,7 @@ mod tests { let ranges = [1..3, 4..6, 6..6, 8..8, 9..10]; let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), 10); assert_eq!( - selection.selectors, + selection.selectors(), vec![ RowSelector::skip(1), RowSelector::select(2), @@ -1568,7 +2037,7 @@ mod tests { RowSelector::skip(0), RowSelector::select(2), ]); - assert_eq!(selection.selectors, vec![RowSelector::select(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::select(4)]); let selection = RowSelection::from(vec![ RowSelector::select(0), @@ -1576,7 +2045,7 @@ mod tests { RowSelector::select(0), RowSelector::skip(2), ]); - assert_eq!(selection.selectors, vec![RowSelector::skip(4)]); + assert_eq!(selection.selectors(), vec![RowSelector::skip(4)]); } #[test] @@ -1600,7 +2069,7 @@ mod tests { let result = a.intersection(&b); assert_eq!( - result.selectors, + result.selectors(), vec![ RowSelector::skip(30), RowSelector::select(10), @@ -1638,9 +2107,9 @@ mod tests { assert_eq!( result.iter().collect::>(), vec![ - &RowSelector::skip(10), - &RowSelector::select(50), - &RowSelector::skip(10), + RowSelector::skip(10), + RowSelector::select(50), + RowSelector::skip(10), ] ); } @@ -1689,7 +2158,7 @@ mod tests { RowSelector::select(35), ]; - assert_eq!(selection.trim().selectors, expected); + assert_eq!(selection.trim().selectors(), expected); let selection = RowSelection::from(vec![ RowSelector::skip(34), @@ -1699,6 +2168,350 @@ mod tests { let expected = vec![RowSelector::skip(34), RowSelector::select(12)]; - assert_eq!(selection.trim().selectors, expected); + assert_eq!(selection.trim().selectors(), expected); + } + + #[test] + fn test_from_boolean_buffer() { + let bits = vec![ + false, false, true, true, false, true, false, false, true, false, false, false, false, + false, false, true, + ]; + let buf = BooleanBuffer::from(bits.clone()); + let selection = RowSelection::from_boolean_buffer(buf.clone()); + + assert!(selection.as_mask().is_some()); + assert_eq!(selection.row_count(), 5); + assert_eq!(selection.skipped_row_count(), 11); + assert!(selection.selects_any()); + + let from_filters = RowSelection::from_filters(&[BooleanArray::from(bits)]); + assert_eq!(selection, from_filters); + + let bits_tail = vec![true, false, true, false, false, false]; + let trimmed = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits_tail)).trim(); + assert!(trimmed.as_mask().is_some()); + assert_eq!(trimmed.as_mask().unwrap().len(), 3); + } + + #[test] + fn test_from_boolean_buffer_empty() { + let empty = RowSelection::from_boolean_buffer(BooleanBuffer::from(Vec::::new())); + assert!(empty.as_mask().is_some()); + assert_eq!(empty.row_count(), 0); + assert_eq!(empty.skipped_row_count(), 0); + assert!(!empty.selects_any()); + assert!(empty.selectors().is_empty()); + } + + #[test] + fn test_from_boolean_buffer_all_unset_does_not_select() { + let all_zero = RowSelection::from_boolean_buffer(BooleanBuffer::new_unset(1024)); + assert!(all_zero.as_mask().is_some()); + assert!(!all_zero.selects_any()); + assert_eq!(all_zero.row_count(), 0); + assert_eq!(all_zero.skipped_row_count(), 1024); + } + + #[test] + fn test_from_boolean_buffer_via_from_impl() { + let buf = BooleanBuffer::from(vec![true, false, true, true]); + let a = RowSelection::from(buf.clone()); + let b = RowSelection::from_boolean_buffer(buf); + assert_eq!(a, b); + assert!(a.as_mask().is_some()); + } + + #[test] + fn test_mask_backing_clone_preserves_backing() { + let buf = BooleanBuffer::from(vec![true, false, true]); + let original = RowSelection::from_boolean_buffer(buf); + let cloned = original.clone(); + assert!(cloned.as_mask().is_some()); + assert_eq!(original, cloned); + } + + #[test] + fn test_mask_backing_mutation_equivalence() { + let bits = vec![true, true, false, false, true, false, true, true]; + + let from_mask = { + let mut s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let split = s.split_off(3); + (split, s) + }; + let from_selectors = { + let mut s = RowSelection::from_filters(&[BooleanArray::from(bits.clone())]); + let split = s.split_off(3); + (split, s) + }; + assert_eq!(from_mask.0, from_selectors.0); + assert_eq!(from_mask.1, from_selectors.1); + assert!(from_mask.0.as_mask().is_some()); + assert!(from_mask.1.as_mask().is_some()); + + let limited_mask = + RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())).limit(3); + let limited_sel = RowSelection::from_filters(&[BooleanArray::from(bits.clone())]).limit(3); + assert!(limited_mask.as_mask().is_some()); + assert_eq!(limited_mask, limited_sel); + + let offset_mask = + RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())).offset(2); + let offset_sel = RowSelection::from_filters(&[BooleanArray::from(bits)]).offset(2); + assert!(offset_mask.as_mask().is_some()); + assert_eq!(offset_mask, offset_sel); + } + + #[test] + fn test_mask_backing_fuzz_equivalence() { + let mut rand = rng(); + for _ in 0..100 { + let len = rand.random_range(0..200); + let bits: Vec<_> = (0..len).map(|_| rand.random_bool(0.35)).collect(); + + let from_mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let from_filters = RowSelection::from_filters(&[BooleanArray::from(bits.clone())]); + + assert_eq!(from_mask, from_filters); + assert_eq!(from_mask.row_count(), from_filters.row_count()); + assert_eq!( + from_mask.skipped_row_count(), + from_filters.skipped_row_count() + ); + assert_eq!(from_mask.selects_any(), from_filters.selects_any()); + + let inner_len: usize = bits.iter().map(|b| *b as usize).sum(); + let inner_bits: Vec<_> = (0..inner_len).map(|_| rand.random_bool(0.7)).collect(); + let inner = RowSelection::from_filters(&[BooleanArray::from(inner_bits)]); + let and_then_mask = from_mask.and_then(&inner); + assert!(and_then_mask.as_mask().is_some()); + assert_eq!(and_then_mask, from_filters.and_then(&inner)); + } + } + + #[test] + fn test_mask_and_then_preserves_backing() { + let outer_bits = vec![false, true, true, false, true, false, true]; + let inner_bits = vec![true, false, true, false]; + let outer_mask = RowSelection::from_boolean_buffer(BooleanBuffer::from(outer_bits.clone())); + let inner = RowSelection::from_filters(&[BooleanArray::from(inner_bits.clone())]); + + let result = outer_mask.and_then(&inner); + assert!(result.as_mask().is_some()); + + let outer_selectors = RowSelection::from_filters(&[BooleanArray::from(outer_bits)]); + let expected = outer_selectors.and_then(&inner); + assert_eq!(result, expected); + + let result_mask = result.as_mask().unwrap(); + let actual_bits: Vec<_> = (0..result_mask.len()) + .map(|i| result_mask.value(i)) + .collect(); + assert_eq!( + actual_bits, + vec![false, true, false, false, true, false, false] + ); + } + + #[test] + fn test_mask_offset_past_end_preserves_empty_mask_backing() { + let selection = + RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![true, false, true])) + .offset(2); + + assert!(selection.as_mask().is_some()); + assert_eq!(selection.as_mask().unwrap().len(), 0); + assert_eq!(selection.row_count(), 0); + assert_eq!(selection.skipped_row_count(), 0); + } + + #[test] + fn test_mask_limit_truncates_at_nth_selected_row() { + let selection = RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![ + false, true, false, true, false, true, false, + ])) + .limit(2); + + let mask = selection.as_mask().unwrap(); + assert_eq!(mask.len(), 4); + let actual_bits: Vec<_> = (0..mask.len()).map(|i| mask.value(i)).collect(); + assert_eq!(actual_bits, vec![false, true, false, true]); + } + + #[test] + fn test_mask_intersection_uses_bitwise() { + let a_bits = vec![true, true, false, true, false, true]; + let b_bits = vec![true, false, true, true, true, false]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits.clone())); + + let r = a.intersection(&b); + assert!(r.as_mask().is_some()); + + let expected: Vec = a_bits.iter().zip(&b_bits).map(|(x, y)| *x && *y).collect(); + let expected_sel = RowSelection::from_filters(&[BooleanArray::from(expected)]); + assert_eq!(r, expected_sel); + } + + #[test] + fn test_mask_union_uses_bitwise() { + let a_bits = vec![true, false, false, true, false, false]; + let b_bits = vec![false, true, false, false, true, false]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits.clone())); + + let r = a.union(&b); + assert!(r.as_mask().is_some()); + + let expected: Vec = a_bits.iter().zip(&b_bits).map(|(x, y)| *x || *y).collect(); + let expected_sel = RowSelection::from_filters(&[BooleanArray::from(expected)]); + assert_eq!(r, expected_sel); + } + + #[test] + fn test_mask_intersection_uneven_passes_tail_through() { + let a_bits = vec![true, true, true, true, true]; + let b_bits = vec![true, false, true]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits)); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits)); + + let r = a.intersection(&b); + let r_mask = r.as_mask().unwrap(); + assert_eq!(r_mask.len(), 5); + let bits: Vec = (0..5).map(|i| r_mask.value(i)).collect(); + assert_eq!(bits, vec![true, false, true, true, true]); + } + + #[test] + fn test_mask_union_uneven_passes_tail_through() { + let a_bits = vec![true, false, true]; + let b_bits = vec![false, true, false, true, false]; + let a = RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits)); + let b = RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits)); + + let r = a.union(&b); + let r_mask = r.as_mask().unwrap(); + assert_eq!(r_mask.len(), 5); + let bits: Vec = (0..5).map(|i| r_mask.value(i)).collect(); + assert_eq!(bits, vec![true, true, true, true, false]); + } + + #[test] + fn test_mask_split_off_preserves_backing() { + let bits: Vec = (0..40).map(|i| i % 3 == 0).collect(); + let mut s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let head = s.split_off(15); + + assert!(head.as_mask().is_some()); + assert!(s.as_mask().is_some()); + + let head_sel = RowSelection::from_filters(&[BooleanArray::from(bits[..15].to_vec())]); + let tail_sel = RowSelection::from_filters(&[BooleanArray::from(bits[15..].to_vec())]); + assert_eq!(head, head_sel); + assert_eq!(s, tail_sel); + } + + #[test] + fn test_mask_split_off_past_end_returns_whole() { + let bits = vec![true, false, true]; + let mut s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let head = s.split_off(100); + + assert!(head.as_mask().is_some()); + assert_eq!(head.as_mask().unwrap().len(), 3); + // `self` keeps its mask backing and is left empty. + assert!(s.as_mask().is_some()); + assert_eq!(s.as_mask().unwrap().len(), 0); + assert_eq!(s.row_count(), 0); + assert_eq!(s.skipped_row_count(), 0); + } + + #[test] + fn test_mask_offset_exceeds_selected_returns_empty() { + let s = + RowSelection::from_boolean_buffer(BooleanBuffer::from(vec![true, true, false, true])); + let r = s.offset(10); + assert_eq!(r.row_count(), 0); + assert_eq!(r.skipped_row_count(), 0); + + let from_selectors = + RowSelection::from_filters(&[BooleanArray::from(vec![true, true, false, true])]) + .offset(10); + assert_eq!(r, from_selectors); + } + + #[test] + fn test_mask_limit_exceeds_selected_returns_all() { + let bits = vec![true, true, false, true]; + let s = RowSelection::from_boolean_buffer(BooleanBuffer::from(bits.clone())); + let r = s.limit(10); + assert_eq!(r.row_count(), 3); + + let from_selectors = RowSelection::from_filters(&[BooleanArray::from(bits)]).limit(10); + assert_eq!(r, from_selectors); + } + + #[test] + fn test_mask_trim_all_zero_collapses_to_empty() { + let s = RowSelection::from_boolean_buffer(BooleanBuffer::new_unset(128)); + let trimmed = s.trim(); + assert!(trimmed.as_mask().is_some()); + assert_eq!(trimmed.as_mask().unwrap().len(), 0); + } + + #[test] + fn test_from_iter_all_mask_preserves_mask_backing() { + let a_bits = vec![true, false, true, true]; + let b_bits = vec![false, true, false]; + let c_bits = vec![true, true, false, false, true]; + + let parts = vec![ + RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())), + RowSelection::from_boolean_buffer(BooleanBuffer::from(b_bits.clone())), + RowSelection::from_boolean_buffer(BooleanBuffer::from(c_bits.clone())), + ]; + let collected: RowSelection = parts.into_iter().collect(); + + let combined = a_bits + .iter() + .chain(b_bits.iter()) + .chain(c_bits.iter()) + .copied() + .collect::>(); + let expected = RowSelection::from_filters(&[BooleanArray::from(combined)]); + + assert!(collected.as_mask().is_some()); + assert_eq!(collected, expected); + } + + #[test] + fn test_from_iter_mixed_backing_falls_back_to_selectors() { + let a_bits = vec![true, false, true]; + let b_selectors = vec![RowSelector::skip(2), RowSelector::select(3)]; + let c_bits = vec![false, true]; + + let parts = vec![ + RowSelection::from_boolean_buffer(BooleanBuffer::from(a_bits.clone())), + RowSelection::from(b_selectors), + RowSelection::from_boolean_buffer(BooleanBuffer::from(c_bits.clone())), + ]; + let collected: RowSelection = parts.into_iter().collect(); + + assert!(collected.as_mask().is_none()); + + let combined_bits = vec![ + true, false, true, false, false, true, true, true, false, true, + ]; + let expected = RowSelection::from_filters(&[BooleanArray::from(combined_bits)]); + assert_eq!(collected, expected); + } + + #[test] + fn test_from_iter_empty_yields_empty_selection() { + let collected: RowSelection = std::iter::empty::().collect(); + assert_eq!(collected, RowSelection::default()); + assert!(collected.as_mask().is_some()); + assert_eq!(collected.as_mask().unwrap().len(), 0); } } diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index dacf1a2caad9..ffcb64a1f779 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -916,7 +916,7 @@ mod tests { #[test] // Verify that the size of RowGroupDecoderState does not grow too large fn test_structure_size() { - assert_eq!(std::mem::size_of::(), 232); + assert_eq!(std::mem::size_of::(), 256); } #[test]