From d49aa54929cc5555485662bffa26cad700061dad Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 26 Nov 2024 20:53:47 -0500 Subject: [PATCH] Bytes-able columnar container --- container/Cargo.toml | 3 - container/src/columnar.rs | 88 ------------------- container/src/lib.rs | 1 - timely/Cargo.toml | 1 + timely/examples/columnar.rs | 125 ++++++++++++++++++++++++++- timely/src/dataflow/channels/pact.rs | 4 +- 6 files changed, 125 insertions(+), 97 deletions(-) delete mode 100644 container/src/columnar.rs diff --git a/container/Cargo.toml b/container/Cargo.toml index eb77feb09..c767637a3 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -9,6 +9,3 @@ edition.workspace = true columnation = "0.1" flatcontainer = "0.5" serde = { version = "1.0", features = ["derive"] } -columnar = "0.1" -# columnar = { path = "../../columnar" } -# columnar = { git = "https://github.com/frankmcsherry/columnar" } diff --git a/container/src/columnar.rs b/container/src/columnar.rs deleted file mode 100644 index 5ca03c1ed..000000000 --- a/container/src/columnar.rs +++ /dev/null @@ -1,88 +0,0 @@ -//! Present a columnar container as a timely container. - -use serde::{Serialize, Deserialize}; - -pub use columnar::*; -use columnar::common::IterOwn; - -use crate::{Container, SizableContainer, PushInto}; - -/// A container based on a `columnar` store. -#[derive(Clone, Default, Serialize, Deserialize)] -pub struct Columnar { - store: C, -} - -impl Container for Columnar -where - for<'a> &'a C: columnar::Index, -{ - fn len(&self) -> usize { self.store.len() } - fn clear(&mut self) { self.store.clear() } - - type ItemRef<'a> = <&'a C as Index>::Ref where Self: 'a; - type Iter<'a> = IterOwn<&'a C>; - fn iter<'a>(&'a self) -> Self::Iter<'a> { (&self.store).into_iter() } - - type Item<'a> = <&'a C as Index>::Ref where Self: 'a; - type DrainIter<'a> = IterOwn<&'a C>; - fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { (&self.store).into_iter() } -} - -impl SizableContainer for Columnar -where - for<'a> &'a C: columnar::Index, -{ - fn capacity(&self) -> usize { 1024 } - fn preferred_capacity() -> usize { 1024 } - fn reserve(&mut self, _additional: usize) { } -} - -impl, T> PushInto for Columnar { - #[inline] - fn push_into(&mut self, item: T) { - self.store.push(item); - } -} - - -use columnar::bytes::{AsBytes, FromBytes, serialization::decode}; - -/// A container based on a columnar store, encoded in aligned bytes. -#[derive(Clone, Default)] -pub struct ColumnarBytes { - bytes: B, - phantom: std::marker::PhantomData, -} - -impl + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> Container for ColumnarBytes -where - for<'a> C::Borrowed<'a> : Len + Clear + Index, -{ - fn len(&self) -> usize { - as FromBytes>::from_bytes(&mut decode(&self.bytes)).len() - } - // Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do. - fn clear(&mut self) { unimplemented!() } - - type ItemRef<'a> = as Index>::Ref where Self: 'a; - type Iter<'a> = IterOwn>; - fn iter<'a>(&'a self) -> Self::Iter<'a> { - as FromBytes>::from_bytes(&mut decode(&self.bytes)).into_iter() - } - - type Item<'a> = as Index>::Ref where Self: 'a; - type DrainIter<'a> = IterOwn>; - fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { - as FromBytes>::from_bytes(&mut decode(&self.bytes)).into_iter() - } -} - -impl + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> SizableContainer for ColumnarBytes -where - for<'a> C::Borrowed<'a> : Len + Clear + Index, -{ - fn capacity(&self) -> usize { 1024 } - fn preferred_capacity() -> usize { 1024 } - fn reserve(&mut self, _additional: usize) { } -} diff --git a/container/src/lib.rs b/container/src/lib.rs index 40b38531c..a42abe888 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -6,7 +6,6 @@ use std::collections::VecDeque; pub mod columnation; pub mod flatcontainer; -pub mod columnar; /// A container transferring data through dataflow edges /// diff --git a/timely/Cargo.toml b/timely/Cargo.toml index db6595fc4..861cac8b9 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -30,6 +30,7 @@ crossbeam-channel = "0.5" smallvec = { version = "1.13.2", features = ["serde", "const_generics"] } [dev-dependencies] +bytemuck = "1.18.0" rand = { version = "0.8", features = ["small_rng"] } columnar = "0.1" # columnar = { git = "https://github.com/frankmcsherry/columnar" } diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs index 9c0ed97d8..bcd0b2d8a 100644 --- a/timely/examples/columnar.rs +++ b/timely/examples/columnar.rs @@ -3,7 +3,6 @@ use { std::collections::HashMap, timely::{Container, container::CapacityContainerBuilder}, - timely::container::columnar::Columnar, timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, timely::dataflow::InputHandleCore, timely::dataflow::operators::{Inspect, Operator, Probe}, @@ -12,7 +11,7 @@ use { // Creates `WordCountContainer` and `WordCountReference` structs, // as well as various implementations relating them to `WordCount`. -#[derive(Columnar)] +#[derive(columnar::Columnar)] struct WordCount { text: String, diff: i64, @@ -20,7 +19,7 @@ struct WordCount { fn main() { - type Container = Columnar<::Container>; + type Container = Column<::Container>; use columnar::Len; @@ -105,3 +104,123 @@ fn main() { }) .unwrap(); } + + +pub use container::Column; +mod container { + + use timely_bytes::arc::Bytes; + + /// A container based on a columnar store, encoded in aligned bytes. + pub enum Column { + /// The typed variant of the container. + Typed(C), + /// The binary variant of the container. + Bytes(Bytes), + } + + impl Default for Column { + fn default() -> Self { Self::Typed(C::default()) } + } + + impl Clone for Column { + fn clone(&self) -> Self { + match self { + Column::Typed(t) => Column::Typed(t.clone()), + Column::Bytes(_) => unimplemented!(), + } + } + } + + use columnar::{Clear, Len, Index, bytes::{AsBytes, FromBytes}}; + use columnar::bytes::serialization::decode; + use columnar::common::IterOwn; + + use timely::Container; + impl Container for Column + where + for<'a> C::Borrowed<'a> : Len + Index, + { + fn len(&self) -> usize { + match self { + Column::Typed(t) => t.len(), + Column::Bytes(b) => as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).len(), + } + } + // Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do. + fn clear(&mut self) { + match self { + Column::Typed(t) => t.clear(), + Column::Bytes(_) => unimplemented!(), + } + // unimplemented!() + } + + type ItemRef<'a> = as Index>::Ref where Self: 'a; + type Iter<'a> = IterOwn>; + fn iter<'a>(&'a self) -> Self::Iter<'a> { + match self { + Column::Typed(t) => as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(), + Column::Bytes(b) => as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).into_iter() + } + } + + type Item<'a> = as Index>::Ref where Self: 'a; + type DrainIter<'a> = IterOwn>; + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + match self { + Column::Typed(t) => as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(), + Column::Bytes(b) => as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).into_iter() + } + } + } + + use timely::container::SizableContainer; + impl SizableContainer for Column + where + for<'a> C::Borrowed<'a> : Len + Index, + { + fn capacity(&self) -> usize { 1024 } + fn preferred_capacity() -> usize { 1024 } + fn reserve(&mut self, _additional: usize) { } + } + + use timely::container::PushInto; + impl, T> PushInto for Column { + #[inline] + fn push_into(&mut self, item: T) { + match self { + Column::Typed(t) => t.push(item), + Column::Bytes(_) => unimplemented!(), + } + } + } + + use timely::dataflow::channels::ContainerBytes; + impl ContainerBytes for Column { + fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { + Self::Bytes(bytes) + } + + fn length_in_bytes(&self) -> usize { + match self { + // We'll need one u64 for the length, then the length rounded up to a multiple of 8. + Column::Typed(t) => t.as_bytes().map(|(_, x)| 8 * (1 + (x.len()/8) + if x.len() % 8 == 0 { 0 } else { 1 })).sum(), + Column::Bytes(b) => b.len(), + } + } + + fn into_bytes(&self, writer: &mut W) { + match self { + Column::Typed(t) => { + for (_align, _bytes) in t.as_bytes() { + // Each byte slice is a u64 length in bytes, + // followed by bytes padded to a multiple of eight bytes. + // writer.write_all(&) + } + }, + Column::Bytes(b) => writer.write_all(&b[..]).unwrap(), + } + } + } +} \ No newline at end of file diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index ae2261f66..13248e4d5 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -19,7 +19,7 @@ use crate::dataflow::channels::Message; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; use crate::worker::AsWorker; -use crate::ExchangeData; +use crate::Data; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. pub trait ParallelizationContract { @@ -68,7 +68,7 @@ where // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. impl ParallelizationContract for ExchangeCore where - C: ExchangeData + PushPartitioned + crate::dataflow::channels::ContainerBytes, + C: Data + Send + PushPartitioned + crate::dataflow::channels::ContainerBytes, for<'a> H: FnMut(&C::Item<'a>) -> u64 { type Pusher = ExchangePusher>>>, H>;