Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 16 additions & 103 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,66 +129,18 @@ fn main() {
pub use container::Column;
mod container {

/// A container based on a columnar store, encoded in aligned bytes.
pub enum Column<C> {
/// The typed variant of the container.
Typed(C),
/// The binary variant of the container.
Bytes(timely_bytes::arc::Bytes),
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
///
/// Reasons could include misalignment, cloning of data, or wanting
/// to release the `Bytes` as a scarce resource.
Align(Box<[u64]>),
}
use columnar::bytes::stash::Stash;

impl<C: Default> Default for Column<C> {
fn default() -> Self { Self::Typed(Default::default()) }
}
/// A columnar container backed by a `Stash` of `C` and `timely_bytes::arc::Bytes`.
///
/// All of the container's state is the single public `stash` field. Code in this
/// file matches on it as `Stash::Typed`, `Stash::Bytes`, or `Stash::Align`.
#[derive(Clone, Default)]
pub struct Column<C> { pub stash: Stash<C, timely_bytes::arc::Bytes> }

// The clone implementation copies the contents of a `Bytes` variant into a new `Align`.
// This is optional and non-optimal, as cloning the `Bytes` itself is relatively cheap.
// However, we don't want to leak uses of `Bytes`, which I think is why we do this.
impl<C: columnar::Container> Clone for Column<C> where C: Clone {
fn clone(&self) -> Self {
match self {
Column::Typed(t) => Column::Typed(t.clone()),
Column::Bytes(b) => {
assert!(b.len() % 8 == 0);
let mut alloc: Vec<u64> = vec![0; b.len() / 8];
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&b[..]);
Self::Align(alloc.into())
},
Column::Align(a) => Column::Align(a.clone()),
}
}
fn clone_from(&mut self, other: &Self) {
match (self, other) {
(Column::Typed(t0), Column::Typed(t1)) => {
// Derived `Clone` implementations for e.g. tuples cannot be relied on to call `clone_from`.
let t1 = t1.borrow();
t0.clear();
t0.extend_from_self(t1, 0..t1.len());
}
(Column::Align(a0), Column::Align(a1)) => { a0.clone_from(a1); }
(x, y) => { *x = y.clone(); }
}
}
}

use columnar::{Len, Index, FromBytes};
use columnar::{Len, Index};
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::common::IterOwn;

impl<C: columnar::ContainerBytes> Column<C> {
/// Borrows the contents no matter their representation.
#[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> {
match self {
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Column::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
}
}
#[inline(always)] pub fn borrow(&self) -> C::Borrowed<'_> { self.stash.borrow() }
}

impl<C: columnar::ContainerBytes> timely::Accountable for Column<C> {
Expand All @@ -203,65 +155,26 @@ mod container {

impl<C: columnar::ContainerBytes> timely::container::SizableContainer for Column<C> {
fn at_capacity(&self) -> bool {
match self {
Self::Typed(t) => {
match &self.stash {
Stash::Typed(t) => {
let length_in_bytes = 8 * Indexed::length_in_words(&t.borrow());
length_in_bytes >= (1 << 20)
},
Self::Bytes(_) => true,
Self::Align(_) => true,
Stash::Bytes(_) => true,
Stash::Align(_) => true,
}
}
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
}

impl<C: columnar::Container, T> timely::container::PushInto<T> for Column<C> where C: columnar::Push<T> {
#[inline]
fn push_into(&mut self, item: T) {
match self {
Column::Typed(t) => t.push(item),
Column::Align(_) | Column::Bytes(_) => {
// We really oughtn't be calling this in this case.
// We could convert to owned, but need more constraints on `C`.
unimplemented!("Pushing into Column::Bytes without first clearing");
}
}
}
#[inline] fn push_into(&mut self, item: T) { use columnar::Push; self.stash.push(item) }
}

impl<C: columnar::ContainerBytes> timely::dataflow::channels::ContainerBytes for Column<C> {
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
// If the alignment is off, we can relocate the data. If the size is off,
// there is no recovery here: the assertion below panics.
assert!(bytes.len() % 8 == 0);
if bytemuck::try_cast_slice::<_, u64>(&bytes).is_ok() {
Self::Bytes(bytes)
}
else {
println!("Re-locating bytes for alignment reasons");
let mut alloc: Vec<u64> = vec![0; bytes.len() / 8];
bytemuck::cast_slice_mut(&mut alloc[..]).copy_from_slice(&bytes[..]);
Self::Align(alloc.into())
}
}

fn length_in_bytes(&self) -> usize {
match self {
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
Column::Typed(t) => 8 * Indexed::length_in_words(&t.borrow()),
Column::Bytes(b) => b.len(),
Column::Align(a) => 8 * a.len(),
}
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
match self {
Column::Typed(t) => { Indexed::write(writer, &t.borrow()).unwrap() },
Column::Bytes(b) => writer.write_all(b).unwrap(),
Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
}
}
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { Self { stash: bytes.into() } }
fn length_in_bytes(&self) -> usize { self.stash.length_in_bytes() }
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { self.stash.into_bytes(writer) }
}
}

Expand All @@ -270,7 +183,7 @@ use builder::ColumnBuilder;
mod builder {

use std::collections::VecDeque;
use columnar::bytes::{EncodeDecode, Indexed};
use columnar::bytes::{EncodeDecode, Indexed, stash::Stash};
use super::Column;

/// A container builder for `Column<C>`.
Expand All @@ -294,7 +207,7 @@ mod builder {
if round - words < round / 10 {
let mut alloc = Vec::with_capacity(round);
Indexed::encode(&mut alloc, &self.current.borrow());
self.pending.push_back(Column::Align(alloc.into_boxed_slice()));
self.pending.push_back(Column { stash: Stash::Align(alloc.into_boxed_slice()) });
self.current.clear();
}
}
Expand All @@ -317,7 +230,7 @@ mod builder {
#[inline]
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.current.is_empty() {
self.pending.push_back(Column::Typed(std::mem::take(&mut self.current)));
self.pending.push_back(Column { stash: Stash::Typed(std::mem::take(&mut self.current)) });
}
self.empty = self.pending.pop_front();
self.empty.as_mut()
Expand Down
Loading