Skip to content

Commit

Permalink
Bytes-able columnar container
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 27, 2024
1 parent 6e57b81 commit d49aa54
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 97 deletions.
3 changes: 0 additions & 3 deletions container/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,3 @@ edition.workspace = true
columnation = "0.1"
flatcontainer = "0.5"
serde = { version = "1.0", features = ["derive"] }
columnar = "0.1"
# columnar = { path = "../../columnar" }
# columnar = { git = "https://github.com/frankmcsherry/columnar" }
88 changes: 0 additions & 88 deletions container/src/columnar.rs

This file was deleted.

1 change: 0 additions & 1 deletion container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::collections::VecDeque;

pub mod columnation;
pub mod flatcontainer;
pub mod columnar;

/// A container transferring data through dataflow edges
///
Expand Down
1 change: 1 addition & 0 deletions timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ crossbeam-channel = "0.5"
smallvec = { version = "1.13.2", features = ["serde", "const_generics"] }

[dev-dependencies]
bytemuck = "1.18.0"
rand = { version = "0.8", features = ["small_rng"] }
columnar = "0.1"
# columnar = { git = "https://github.com/frankmcsherry/columnar" }
125 changes: 122 additions & 3 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use {
std::collections::HashMap,
timely::{Container, container::CapacityContainerBuilder},
timely::container::columnar::Columnar,
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
Expand All @@ -12,15 +11,15 @@ use {

// Creates `WordCountContainer` and `WordCountReference` structs,
// as well as various implementations relating them to `WordCount`.
#[derive(Columnar)]
#[derive(columnar::Columnar)]
struct WordCount {
text: String,
diff: i64,
}

fn main() {

type Container = Columnar<<WordCount as columnar::Columnar>::Container>;
type Container = Column<<WordCount as columnar::Columnar>::Container>;

use columnar::Len;

Expand Down Expand Up @@ -105,3 +104,123 @@ fn main() {
})
.unwrap();
}


pub use container::Column;
mod container {

use timely_bytes::arc::Bytes;

/// A container based on a columnar store, encoded in aligned bytes.
pub enum Column<C> {
/// The typed variant of the container.
Typed(C),
/// The binary variant of the container.
Bytes(Bytes),
}

impl<C: Default> Default for Column<C> {
fn default() -> Self { Self::Typed(C::default()) }
}

impl<C: Clone> Clone for Column<C> {
fn clone(&self) -> Self {
match self {
Column::Typed(t) => Column::Typed(t.clone()),
Column::Bytes(_) => unimplemented!(),
}
}
}

use columnar::{Clear, Len, Index, bytes::{AsBytes, FromBytes}};
use columnar::bytes::serialization::decode;
use columnar::common::IterOwn;

use timely::Container;
impl<C: AsBytes + Clear + Len + Clone + Default + 'static> Container for Column<C>
where
for<'a> C::Borrowed<'a> : Len + Index,
{
fn len(&self) -> usize {
match self {
Column::Typed(t) => t.len(),
Column::Bytes(b) => <C::Borrowed<'_> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).len(),
}
}
// Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do.
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Column::Bytes(_) => unimplemented!(),
}
// unimplemented!()
}

type ItemRef<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> {
match self {
Column::Typed(t) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(),
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).into_iter()
}
}

type Item<'a> = <C::Borrowed<'a> as Index>::Ref where Self: 'a;
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
match self {
Column::Typed(t) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut t.as_bytes().map(|(_, x)| x)).into_iter(),
Column::Bytes(b) => <C::Borrowed<'a> as FromBytes>::from_bytes(&mut decode(bytemuck::cast_slice(&b[..]))).into_iter()
}
}
}

use timely::container::SizableContainer;
impl<C: AsBytes + Clear + Len + Clone + Default + 'static> SizableContainer for Column<C>
where
for<'a> C::Borrowed<'a> : Len + Index,
{
fn capacity(&self) -> usize { 1024 }
fn preferred_capacity() -> usize { 1024 }
fn reserve(&mut self, _additional: usize) { }
}

use timely::container::PushInto;
impl<C: columnar::Push<T>, T> PushInto<T> for Column<C> {
#[inline]
fn push_into(&mut self, item: T) {
match self {
Column::Typed(t) => t.push(item),
Column::Bytes(_) => unimplemented!(),
}
}
}

use timely::dataflow::channels::ContainerBytes;
impl<C: columnar::bytes::AsBytes> ContainerBytes for Column<C> {
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
Self::Bytes(bytes)
}

fn length_in_bytes(&self) -> usize {
match self {
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
Column::Typed(t) => t.as_bytes().map(|(_, x)| 8 * (1 + (x.len()/8) + if x.len() % 8 == 0 { 0 } else { 1 })).sum(),
Column::Bytes(b) => b.len(),
}
}

fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
match self {
Column::Typed(t) => {
for (_align, _bytes) in t.as_bytes() {
// Each byte slice is a u64 length in bytes,
// followed by bytes padded to a multiple of eight bytes.
// writer.write_all(&)
}
},
Column::Bytes(b) => writer.write_all(&b[..]).unwrap(),
}
}
}
}
4 changes: 2 additions & 2 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::dataflow::channels::Message;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
use crate::worker::AsWorker;
use crate::ExchangeData;
use crate::Data;

/// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContract<T, C> {
Expand Down Expand Up @@ -68,7 +68,7 @@ where
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, C, H: 'static> ParallelizationContract<T, C> for ExchangeCore<C, H>
where
C: ExchangeData + PushPartitioned + crate::dataflow::channels::ContainerBytes,
C: Data + Send + PushPartitioned + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Message<T, C>>>>, H>;
Expand Down

0 comments on commit d49aa54

Please sign in to comment.