Skip to content

Commit

Permalink
Merge 4bb7595 into eadc1fe
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Feb 19, 2025
2 parents eadc1fe + 4bb7595 commit b5d1c6d
Show file tree
Hide file tree
Showing 25 changed files with 763 additions and 421 deletions.
14 changes: 13 additions & 1 deletion vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,19 @@ pub const STATS_TO_WRITE: &[Stat] = &[
Stat::UncompressedSizeInBytes,
];

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence, IntoPrimitive, TryFromPrimitive)]
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Sequence,
IntoPrimitive,
TryFromPrimitive,
)]
#[repr(u8)]
pub enum Stat {
/// Frequency of each bit width (nulls are treated as 0)
Expand Down
12 changes: 0 additions & 12 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ pub const V1_FOOTER_FBS_SIZE: usize = 32;

/// Constants that will never change (i.e., doing so would break backwards compatibility)
mod forever_constant {
use vortex_layout::LayoutId;

/// The extension for Vortex files
pub const VORTEX_FILE_EXTENSION: &str = "vortex";

Expand All @@ -125,13 +123,6 @@ mod forever_constant {
/// The size of the EOF marker in bytes
pub const EOF_SIZE: usize = 8;

/// The layout ID for a flat layout
pub const FLAT_LAYOUT_ID: LayoutId = LayoutId(1);
/// The layout ID for a chunked layout
pub const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2);
/// The layout ID for a column layout
pub const COLUMNAR_LAYOUT_ID: LayoutId = LayoutId(3);

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -143,9 +134,6 @@ mod forever_constant {
assert_eq!(MAX_FOOTER_SIZE, 65527);
assert_eq!(MAGIC_BYTES, *b"VTXF");
assert_eq!(EOF_SIZE, 8);
assert_eq!(FLAT_LAYOUT_ID, LayoutId(1));
assert_eq!(CHUNKED_LAYOUT_ID, LayoutId(2));
assert_eq!(COLUMNAR_LAYOUT_ID, LayoutId(3));
}
}
}
196 changes: 51 additions & 145 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
//! This module defines the default layout strategy for a Vortex file.
use std::collections::VecDeque;
use std::sync::{Arc, LazyLock};

use vortex_array::array::ChunkedArray;
use vortex_array::compute::slice;
use vortex_array::stats::STATS_TO_WRITE;
use vortex_array::{Array, IntoArray, IntoCanonical};
use vortex_array::stats::{PRUNING_STATS, STATS_TO_WRITE};
use vortex_array::Array;
use vortex_dtype::DType;
use vortex_error::{VortexExpect, VortexResult};
use vortex_error::VortexResult;
use vortex_layout::layouts::chunked::writer::{ChunkedLayoutOptions, ChunkedLayoutWriter};
use vortex_layout::layouts::flat::writer::FlatLayoutOptions;
use vortex_layout::layouts::flat::FlatLayout;
use vortex_layout::layouts::stats::writer::{StatsLayoutOptions, StatsLayoutWriter};
use vortex_layout::layouts::struct_::writer::StructLayoutWriter;
use vortex_layout::segments::SegmentWriter;
use vortex_layout::writers::{RepartitionWriter, RepartitionWriterOptions};
use vortex_layout::{Layout, LayoutStrategy, LayoutWriter, LayoutWriterExt};
use vortex_sampling_compressor::compressors::CompressionTree;
use vortex_sampling_compressor::{SamplingCompressor, DEFAULT_COMPRESSORS};
Expand All @@ -22,22 +22,7 @@ static COMPRESSOR: LazyLock<Arc<SamplingCompressor<'static>>> =

/// The default Vortex file layout strategy.
#[derive(Clone)]
pub struct VortexLayoutStrategy {
/// The minimum size of a block in bytes. The block will be cut at the next multiple
/// of the `block_len` larger than this size.
pub minimum_block_size: usize,
/// The divisor for the number of rows in a block.
pub block_len_multiple: usize,
}

impl Default for VortexLayoutStrategy {
fn default() -> Self {
Self {
minimum_block_size: 8 * (1 << 20), // 8MB
block_len_multiple: 8 * 1024, // 8192 rows
}
}
}
pub struct VortexLayoutStrategy;

impl LayoutStrategy for VortexLayoutStrategy {
fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
Expand All @@ -47,134 +32,55 @@ impl LayoutStrategy for VortexLayoutStrategy {
.map(|w| w.boxed());
}

// Then we re-chunk each column per our strategy...
Ok(ColumnChunker::new(
dtype.clone(),
// ...compress each chunk using a sampling compressor...
SamplingCompressorWriter {
compressor: COMPRESSOR.clone(),
compress_like: None,
child: ChunkedLayoutWriter::new(
dtype,
ChunkedLayoutOptions {
// ...and write each chunk as a flat layout.
chunk_strategy: Arc::new(FlatLayoutOptions::default()),
..Default::default()
},
)
.boxed(),
}
// Otherwise, we finish with compressing the chunks.
let writer: Box<dyn LayoutWriter> = SamplingCompressorWriter {
compressor: COMPRESSOR.clone(),
compress_like: None,
child: ChunkedLayoutWriter::new(
dtype,
ChunkedLayoutOptions {
// ...and write each chunk as a flat layout.
chunk_strategy: Arc::new(FlatLayoutOptions::default()),
},
)
.boxed(),
self.clone(),
)
.boxed())
}
}

/// Each column is chunked into multiples of 8096 values, of at least 1MB in uncompressed size.
struct ColumnChunker {
dtype: DType,
chunks: VecDeque<Array>,
row_count: usize,
nbytes: usize,
writer: Box<dyn LayoutWriter>,
options: VortexLayoutStrategy,
}

impl ColumnChunker {
pub fn new(dtype: DType, writer: Box<dyn LayoutWriter>, options: VortexLayoutStrategy) -> Self {
Self {
dtype,
chunks: VecDeque::new(),
row_count: 0,
nbytes: 0,
writer,
options,
}
}

fn flush(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
if self.nbytes >= self.options.minimum_block_size {
let nblocks = self.row_count / self.options.block_len_multiple;

// If we don't have a full block, then continue anyway.
if nblocks == 0 {
// TODO(ngates): if we exceed a maximum block size, regardless of row count we should
// flush the chunk. This can happen for columns with very large cells.
return Ok(());
}

if nblocks > 1 {
// TODO(ngates): if we have _too_ many blocks, then we might look into slicing
// the chunks to be smaller blocks.
}

let mut chunks = Vec::with_capacity(self.chunks.len());
let mut remaining = nblocks * self.options.block_len_multiple;

while remaining > 0 {
let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
self.row_count -= chunk.len();
self.nbytes -= chunk.nbytes();

let len = chunk.len();

if len > remaining {
let left = slice(&chunk, 0, remaining)?;
let right = slice(&chunk, remaining, len)?;
self.row_count += right.len();
self.nbytes += right.nbytes();
self.chunks.push_front(right);

chunks.push(left);
remaining = 0;
} else {
chunks.push(chunk);
remaining -= len;
}
}

// Combine the chunks to and flush them to the layout.
assert!(!chunks.is_empty());
let chunk = ChunkedArray::try_new_unchecked(chunks, self.dtype.clone())
.into_canonical()?
.into_array();

self.writer.push_chunk(segments, chunk)?;
}
.boxed();

Ok(())
}
}

impl LayoutWriter for ColumnChunker {
fn push_chunk(&mut self, segments: &mut dyn SegmentWriter, chunk: Array) -> VortexResult<()> {
// We make sure the chunks are canonical so our nbytes measurement is accurate.
let chunk = chunk.into_canonical()?.into_array();

// Split chunks into 8192 blocks to make sure we don't over-size them.
let mut offset = 0;
while offset < chunk.len() {
let end = (offset + self.options.block_len_multiple).min(chunk.len());
let c = slice(&chunk, offset, end)?;
self.row_count += c.len();
self.nbytes += c.nbytes();
self.chunks.push_back(c);
offset = end;

self.flush(segments)?;
}
// Prior to compression, re-partition into size-based chunks.
let writer = RepartitionWriter::new(
dtype.clone(),
writer,
RepartitionWriterOptions {
block_size_minimum: 8 * (1 << 20), // 1 MB
block_len_multiple: 8192, // 8K rows
},
)
.boxed();

Ok(())
}
// Prior to repartitioning, we record statistics
let writer = RepartitionWriter::new(
dtype.clone(),
StatsLayoutWriter::try_new(
dtype,
writer,
Arc::new(FlatLayout),
StatsLayoutOptions {
block_size: 8192,
stats: PRUNING_STATS.into(),
},
)?
.boxed(),
RepartitionWriterOptions {
// No minimum block size in bytes
block_size_minimum: 0,
// Always repartition into 8K row blocks
block_len_multiple: 8192,
},
)
.boxed();

fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<Layout> {
let chunk =
ChunkedArray::try_new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
.into_canonical()?
.into_array();
self.writer.push_chunk(segments, chunk)?;
self.writer.finish(segments)
Ok(writer)
}
}

Expand Down
6 changes: 3 additions & 3 deletions vortex-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use vortex_array::stream::ArrayStream;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
use vortex_io::VortexWrite;
use vortex_layout::stats::StatsLayoutWriter;
use vortex_layout::stats::FileStatsLayoutWriter;
use vortex_layout::{LayoutStrategy, LayoutWriter};

use crate::footer::{FileLayout, Postscript, Segment};
Expand All @@ -19,7 +19,7 @@ pub struct VortexWriteOptions {
impl Default for VortexWriteOptions {
fn default() -> Self {
Self {
strategy: Box::new(VortexLayoutStrategy::default()),
strategy: Box::new(VortexLayoutStrategy),
}
}
}
Expand All @@ -40,7 +40,7 @@ impl VortexWriteOptions {
mut stream: S,
) -> VortexResult<W> {
// Set up the root layout
let mut layout_writer = StatsLayoutWriter::new(
let mut layout_writer = FileStatsLayoutWriter::new(
self.strategy.new_writer(stream.dtype())?,
stream.dtype(),
PRUNING_STATS.into(),
Expand Down
2 changes: 2 additions & 0 deletions vortex-layout/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use vortex_array::aliases::hash_map::HashMap;

use crate::layouts::chunked::ChunkedLayout;
use crate::layouts::flat::FlatLayout;
use crate::layouts::stats::StatsLayout;
use crate::layouts::struct_::StructLayout;
use crate::vtable::LayoutVTableRef;
use crate::LayoutId;
Expand Down Expand Up @@ -41,6 +42,7 @@ impl Default for LayoutContext {
LayoutVTableRef::from_static(&ChunkedLayout),
LayoutVTableRef::from_static(&FlatLayout),
LayoutVTableRef::from_static(&StructLayout),
LayoutVTableRef::from_static(&StatsLayout),
]
.into_iter()
.map(|l| (l.id(), l))
Expand Down
6 changes: 5 additions & 1 deletion vortex-layout/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ impl Layout {
Inner::Owned(o) => {
let child = o.children[i].clone();
if child.dtype() != &dtype {
vortex_bail!("child dtype mismatch");
vortex_bail!(
"Child has dtype {}, but was requested with {}",
child.dtype(),
dtype
);
}
Ok(child)
}
Expand Down
Loading

0 comments on commit b5d1c6d

Please sign in to comment.