Skip to content

Commit

Permalink
Update columnar (#611)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Dec 17, 2024
1 parent 2398b79 commit 47e0722
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 55 deletions.
2 changes: 1 addition & 1 deletion communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ license = "MIT"
default = ["getopts"]

[dependencies]
columnar = "0.1"
columnar = "0.2"
getopts = { version = "0.2.21", optional = true }
byteorder = "1.5"
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default = ["getopts"]
getopts = ["getopts-dep", "timely_communication/getopts"]

[dependencies]
columnar = "0.1"
columnar = "0.2"
getopts-dep = { package = "getopts", version = "0.2.21", optional = true }
bincode = { version = "1.0" }
byteorder = "1.5"
Expand Down
106 changes: 53 additions & 53 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct WordCount {

fn main() {

type Container = Column<<WordCount as columnar::Columnar>::Container>;
type Container = Column<WordCount>;

use columnar::Len;

Expand Down Expand Up @@ -55,7 +55,7 @@ fn main() {
)
.container::<Container>()
.unary_frontier(
ExchangeCore::<ColumnBuilder<<WordCount as columnar::Columnar>::Container>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
ExchangeCore::<ColumnBuilder<WordCount>,_>::new_core(|x: &WordCountReference<&str,&i64>| x.text.len() as u64),
"WordCount",
|_capability, _info| {
let mut queues = HashMap::new();
Expand Down Expand Up @@ -114,12 +114,15 @@ fn main() {
pub use container::Column;
mod container {

use columnar::Columnar;
use columnar::Container as FooBozzle;

use timely_bytes::arc::Bytes;

/// A container based on a columnar store, encoded in aligned bytes.
pub enum Column<C> {
pub enum Column<C: Columnar> {
/// The typed variant of the container.
Typed(C),
Typed(C::Container),
/// The binary variant of the container.
Bytes(Bytes),
/// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
Expand All @@ -129,11 +132,11 @@ mod container {
Align(Box<[u64]>),
}

impl<C: Default> Default for Column<C> {
fn default() -> Self { Self::Typed(C::default()) }
impl<C: Columnar> Default for Column<C> {
fn default() -> Self { Self::Typed(Default::default()) }
}

impl<C: Clone> Clone for Column<C> {
impl<C: Columnar> Clone for Column<C> where C::Container: Clone {
fn clone(&self) -> Self {
match self {
Column::Typed(t) => Column::Typed(t.clone()),
Expand All @@ -148,64 +151,55 @@ mod container {
}
}

use columnar::{Clear, Len, Index, bytes::{AsBytes, FromBytes}};
use columnar::{Clear, Len, Index, AsBytes, FromBytes};
use columnar::bytes::serialization::decode;
use columnar::common::IterOwn;

use timely::Container;
impl<C: AsBytes + Clear + Len + Clone + Default + 'static> Container for Column<C>
where
for<'a> C::Borrowed<'a> : Len + Index,
{
impl<C: Columnar> Container for Column<C> {
fn len(&self) -> usize {
match self {
Column::Typed(t) => t.len(),
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(a)).len(),
}
}
// Clears a `Typed` variant in place; replaces a `Bytes` or `Align` variant with an empty `Typed` variant, appropriate for pushing into.
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Column::Bytes(_) => *self = Column::Typed(C::default()),
Column::Align(_) => *self = Column::Typed(C::default()),
Column::Bytes(_) => *self = Column::Typed(Default::default()),
Column::Align(_) => *self = Column::Typed(Default::default()),
}
}

type ItemRef<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
type ItemRef<'a> = C::Ref<'a>;
type Iter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> {
match self {
Column::Typed(t) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(),
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
Column::Typed(t) => t.borrow().into_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
}
}

type Item<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
type Item<'a> = C::Ref<'a>;
type DrainIter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
match self {
Column::Typed(t) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(),
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
Column::Typed(t) => t.borrow().into_iter(),
Column::Bytes(b) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(b))).into_iter(),
Column::Align(a) => <<C::Container as columnar::Container<C>>::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(a)).into_iter(),
}
}
}

use timely::container::SizableContainer;
impl<C: AsBytes + Clear + Len + Clone + Default + 'static> SizableContainer for Column<C>
where
for<'a> C::Borrowed<'a> : Len + Index,
{
impl<C: Columnar> SizableContainer for Column<C> {
fn at_capacity(&self) -> bool {
match self {
Self::Typed(t) => {
let length_in_bytes: usize =
t.as_bytes()
.map(|(_, x)| 8 * (1 + (x.len()/8) + if x.len() % 8 == 0 { 0 } else { 1 }))
.sum();
let length_in_bytes = t.borrow().length_in_words() * 8;
length_in_bytes >= (1 << 20)
},
Self::Bytes(_) => true,
Expand All @@ -216,9 +210,10 @@ mod container {
}

use timely::container::PushInto;
impl<C: columnar::Push<T>, T> PushInto<T> for Column<C> {
impl<C: Columnar, T> PushInto<T> for Column<C> where C::Container: columnar::Push<T> {
#[inline]
fn push_into(&mut self, item: T) {
use columnar::Push;
match self {
Column::Typed(t) => t.push(item),
Column::Align(_) | Column::Bytes(_) => {
Expand All @@ -231,7 +226,7 @@ mod container {
}

use timely::dataflow::channels::ContainerBytes;
impl<C: columnar::bytes::AsBytes> ContainerBytes for Column<C> {
impl<C: Columnar> ContainerBytes for Column<C> {
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
// If the alignment is borked, we can relocate. If the size is borked,
Expand All @@ -251,7 +246,7 @@ mod container {
fn length_in_bytes(&self) -> usize {
match self {
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
Column::Typed(t) => 8 * t.length_in_words(),
Column::Typed(t) => 8 * t.borrow().length_in_words(),
Column::Bytes(b) => b.len(),
Column::Align(a) => 8 * a.len(),
}
Expand All @@ -260,10 +255,11 @@ mod container {
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
match self {
Column::Typed(t) => {
use columnar::Container;
// Columnar data is serialized as a sequence of `u64` values, with each `[u8]` slice
// serialized first as its length in bytes, and then as as many `u64` values as needed.
// Padding should be added, but only for alignment; no specific values are required.
for (align, bytes) in t.as_bytes() {
for (align, bytes) in t.borrow().as_bytes() {
assert!(align <= 8);
let length: u64 = bytes.len().try_into().unwrap();
writer.write_all(bytemuck::cast_slice(std::slice::from_ref(&length))).unwrap();
Expand All @@ -284,42 +280,49 @@ use builder::ColumnBuilder;
mod builder {

use std::collections::VecDeque;
use columnar::{Clear, Len, Index, bytes::AsBytes};
use columnar::{Columnar, Clear, Len, AsBytes, Push};
use super::Column;

/// A container builder for `Column<C>`.
#[derive(Default)]
pub struct ColumnBuilder<C> {
pub struct ColumnBuilder<C: Columnar> {
/// Container that we're writing to.
current: C,
current: C::Container,
/// Empty allocation.
empty: Option<Column<C>>,
/// Completed containers pending to be sent.
pending: VecDeque<Column<C>>,
}

use timely::container::PushInto;
impl<C: columnar::Push<T> + Clear + AsBytes, T> PushInto<T> for ColumnBuilder<C> {
impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C> where C::Container: columnar::Push<T> {
#[inline]
fn push_into(&mut self, item: T) {
self.current.push(item);
// If there is less than 10% slop with 2MB backing allocations, mint a container.
let words = self.current.length_in_words();
use columnar::Container;
let words = self.current.borrow().length_in_words();
let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
if round - words < round / 10 {
let mut alloc = Vec::with_capacity(round);
columnar::bytes::serialization::encode(&mut alloc, self.current.as_bytes());
columnar::bytes::serialization::encode(&mut alloc, self.current.borrow().as_bytes());
self.pending.push_back(Column::Align(alloc.into_boxed_slice()));
self.current.clear();
}
}
}

impl<C: Columnar> Default for ColumnBuilder<C> {
fn default() -> Self {
ColumnBuilder {
current: Default::default(),
empty: None,
pending: Default::default(),
}
}
}

use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
impl<C: AsBytes + Clear + Len + Clone + Default + 'static> ContainerBuilder for ColumnBuilder<C>
where
for<'a> C::Borrowed<'a> : Len + Index,
{
impl<C: Columnar> ContainerBuilder for ColumnBuilder<C> where C::Container: Clone {
type Container = Column<C>;

#[inline]
Expand All @@ -342,8 +345,5 @@ mod builder {
}
}

impl<C: AsBytes + Clear + Len + Clone + Default + 'static> LengthPreservingContainerBuilder for ColumnBuilder<C>
where
for<'a> C::Borrowed<'a> : Len + Index,
{ }
impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone { }
}

0 comments on commit 47e0722

Please sign in to comment.