From 5b7a95665d3a14c4403027066188215a5ccafb0b Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 17 Jan 2026 14:33:24 -0500 Subject: [PATCH] Streaming columnar.rs example --- timely/examples/columnar.rs | 119 +++++------------------------------- 1 file changed, 16 insertions(+), 103 deletions(-) diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 834f82cde..c677c28b9 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -129,66 +129,18 @@ fn main() { pub use container::Column; mod container { - /// A container based on a columnar store, encoded in aligned bytes. - pub enum Column { - /// The typed variant of the container. - Typed(C), - /// The binary variant of the container. - Bytes(timely_bytes::arc::Bytes), - /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason. - /// - /// Reasons could include misalignment, cloning of data, or wanting - /// to release the `Bytes` as a scarce resource. - Align(Box<[u64]>), - } + use columnar::bytes::stash::Stash; - impl Default for Column { - fn default() -> Self { Self::Typed(Default::default()) } - } + #[derive(Clone, Default)] + pub struct Column { pub stash: Stash } - // The clone implementation moves out of the `Bytes` variant into `Align`. - // This is optional and non-optimal, as the bytes clone is relatively free. - // But, we don't want to leak the uses of `Bytes`, is why we do this I think. - impl Clone for Column where C: Clone { - fn clone(&self) -> Self { - match self { - Column::Typed(t) => Column::Typed(t.clone()), - Column::Bytes(b) => { - assert!(b.len() % 8 == 0); - let mut alloc: Vec = vec![0; b.len() / 8]; - bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]); - Self::Align(alloc.into()) - }, - Column::Align(a) => Column::Align(a.clone()), - } - } - fn clone_from(&mut self, other: &Self) { - match (self, other) { - (Column::Typed(t0), Column::Typed(t1)) => { - // Derived `Clone` implementations for e.g. tuples cannot be relied on to call `clone_from`. - let t1 = t1.borrow(); - t0.clear(); - t0.extend_from_self(t1, 0..t1.len()); - } - (Column::Align(a0), Column::Align(a1)) => { a0.clone_from(a1); } - (x, y) => { *x = y.clone(); } - } - } - } - - use columnar::{Len, Index, FromBytes}; + use columnar::{Len, Index}; use columnar::bytes::{EncodeDecode, Indexed}; use columnar::common::IterOwn; impl Column { /// Borrows the contents no matter their representation. - #[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> { - match self { - Column::Typed(t) => t.borrow(), - Column::Bytes(b) => as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))), - Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)), - } - } + #[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> { self.stash.borrow() } } impl timely::Accountable for Column { @@ -203,65 +155,26 @@ mod container { impl timely::container::SizableContainer for Column { fn at_capacity(&self) -> bool { - match self { - Self::Typed(t) => { + match &self.stash { + Stash::Typed(t) => { let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow()); length_in_bytes >= (1 << 20) }, - Self::Bytes(_) => true, - Self::Align(_) => true, + Stash::Bytes(_) => true, + Stash::Align(_) => true, } } fn ensure_capacity(&mut self, _stash: &mut Option) { } } impl timely::container::PushInto for Column where C: columnar::Push { - #[inline] - fn push_into(&mut self, item: T) { - match self { - Column::Typed(t) => t.push(item), - Column::Align(_) | Column::Bytes(_) => { - // We really oughtn't be calling this in this case. - // We could convert to owned, but need more constraints on `C`. - unimplemented!("Pushing into Column::Bytes without first clearing"); - } - } - } + #[inline] fn push_into(&mut self, item: T) { use columnar::Push; self.stash.push(item) } } impl timely::dataflow::channels::ContainerBytes for Column { - fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { - // Our expectation / hope is that `bytes` is `u64` aligned and sized. - // If the alignment is borked, we can relocate. IF the size is borked, - // not sure what we do in that case. - assert!(bytes.len() % 8 == 0); - if bytemuck::try_cast_slice::<_, u64>(&bytes).is_ok() { - Self::Bytes(bytes) - } - else { - println!("Re-locating bytes for alignment reasons"); - let mut alloc: Vec = vec![0; bytes.len() / 8]; - bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]); - Self::Align(alloc.into()) - } - } - - fn length_in_bytes(&self) -> usize { - match self { - // We'll need one u64 for the length, then the length rounded up to a multiple of 8. - Column::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()), - Column::Bytes(b) => b.len(), - Column::Align(a) => 8 * a.len(), - } - } - - fn into_bytes(&self, writer: &mut W) { - match self { - Column::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() }, - Column::Bytes(b) => writer.write_all(b).unwrap(), - Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(), - } - } + fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: bytes.into() } } + fn length_in_bytes(&self) -> usize { self.stash.length_in_bytes() } + fn into_bytes(&self, writer: &mut W) { self.stash.into_bytes(writer) } } } @@ -270,7 +183,7 @@ use builder::ColumnBuilder; mod builder { use std::collections::VecDeque; - use columnar::bytes::{EncodeDecode, Indexed}; + use columnar::bytes::{EncodeDecode, Indexed, stash::Stash}; use super::Column; /// A container builder for `Column`. @@ -294,7 +207,7 @@ mod builder { if round - words < round / 10 { let mut alloc = Vec::with_capacity(round); Indexed::encode(&mut alloc, &self.current.borrow()); - self.pending.push_back(Column::Align(alloc.into_boxed_slice())); + self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice()) }); self.current.clear(); } } @@ -317,7 +230,7 @@ mod builder { #[inline] fn finish(&mut self) -> Option<&mut Self::Container> { if !self.current.is_empty() { - self.pending.push_back(Column::Typed(std::mem::take(&mut self.current))); + self.pending.push_back(Column { stash: Stash::Typed(std::mem::take(&mut self.current)) }); } self.empty = self.pending.pop_front(); self.empty.as_mut()