Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a StatsLayout to store logical statistics #2340

Merged
merged 28 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion vortex-array/src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,19 @@ pub const STATS_TO_WRITE: &[Stat] = &[
Stat::UncompressedSizeInBytes,
];

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Sequence, IntoPrimitive, TryFromPrimitive)]
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Hash,
Sequence,
IntoPrimitive,
TryFromPrimitive,
)]
#[repr(u8)]
pub enum Stat {
/// Frequency of each bit width (nulls are treated as 0)
Expand Down
12 changes: 0 additions & 12 deletions vortex-file/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ pub const V1_FOOTER_FBS_SIZE: usize = 32;

/// Constants that will never change (i.e., doing so would break backwards compatibility)
mod forever_constant {
use vortex_layout::LayoutId;

/// The extension for Vortex files
pub const VORTEX_FILE_EXTENSION: &str = "vortex";

Expand All @@ -125,13 +123,6 @@ mod forever_constant {
/// The size of the EOF marker in bytes
pub const EOF_SIZE: usize = 8;

/// The layout ID for a flat layout
pub const FLAT_LAYOUT_ID: LayoutId = LayoutId(1);
/// The layout ID for a chunked layout
pub const CHUNKED_LAYOUT_ID: LayoutId = LayoutId(2);
/// The layout ID for a column layout
pub const COLUMNAR_LAYOUT_ID: LayoutId = LayoutId(3);

#[cfg(test)]
mod test {
use super::*;
Expand All @@ -143,9 +134,6 @@ mod forever_constant {
assert_eq!(MAX_FOOTER_SIZE, 65527);
assert_eq!(MAGIC_BYTES, *b"VTXF");
assert_eq!(EOF_SIZE, 8);
assert_eq!(FLAT_LAYOUT_ID, LayoutId(1));
assert_eq!(CHUNKED_LAYOUT_ID, LayoutId(2));
assert_eq!(COLUMNAR_LAYOUT_ID, LayoutId(3));
}
}
}
196 changes: 51 additions & 145 deletions vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
//! This module defines the default layout strategy for a Vortex file.
use std::collections::VecDeque;
use std::sync::{Arc, LazyLock};

use vortex_array::array::ChunkedArray;
use vortex_array::compute::slice;
use vortex_array::stats::STATS_TO_WRITE;
use vortex_array::{Array, IntoArray, IntoCanonical};
use vortex_array::stats::{PRUNING_STATS, STATS_TO_WRITE};
use vortex_array::Array;
use vortex_dtype::DType;
use vortex_error::{VortexExpect, VortexResult};
use vortex_error::VortexResult;
use vortex_layout::layouts::chunked::writer::{ChunkedLayoutOptions, ChunkedLayoutWriter};
use vortex_layout::layouts::flat::writer::FlatLayoutOptions;
use vortex_layout::layouts::flat::FlatLayout;
use vortex_layout::layouts::stats::writer::{StatsLayoutOptions, StatsLayoutWriter};
use vortex_layout::layouts::struct_::writer::StructLayoutWriter;
use vortex_layout::segments::SegmentWriter;
use vortex_layout::writers::{RepartitionWriter, RepartitionWriterOptions};
use vortex_layout::{Layout, LayoutStrategy, LayoutWriter, LayoutWriterExt};
use vortex_sampling_compressor::compressors::CompressionTree;
use vortex_sampling_compressor::{SamplingCompressor, DEFAULT_COMPRESSORS};
Expand All @@ -22,22 +22,7 @@ static COMPRESSOR: LazyLock<Arc<SamplingCompressor<'static>>> =

/// The default Vortex file layout strategy.
#[derive(Clone)]
pub struct VortexLayoutStrategy {
/// The minimum size of a block in bytes. The block will be cut at the next multiple
/// of the `block_len` larger than this size.
pub minimum_block_size: usize,
/// The divisor for the number of rows in a block.
pub block_len_multiple: usize,
}

impl Default for VortexLayoutStrategy {
fn default() -> Self {
Self {
minimum_block_size: 8 * (1 << 20), // 8MB
block_len_multiple: 8 * 1024, // 8192 rows
}
}
}
pub struct VortexLayoutStrategy;

impl LayoutStrategy for VortexLayoutStrategy {
fn new_writer(&self, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
Expand All @@ -47,134 +32,55 @@ impl LayoutStrategy for VortexLayoutStrategy {
.map(|w| w.boxed());
}

// Then we re-chunk each column per our strategy...
Ok(ColumnChunker::new(
dtype.clone(),
// ...compress each chunk using a sampling compressor...
SamplingCompressorWriter {
compressor: COMPRESSOR.clone(),
compress_like: None,
child: ChunkedLayoutWriter::new(
dtype,
ChunkedLayoutOptions {
// ...and write each chunk as a flat layout.
chunk_strategy: Arc::new(FlatLayoutOptions::default()),
..Default::default()
},
)
.boxed(),
}
// Otherwise, we finish with compressing the chunks.
let writer: Box<dyn LayoutWriter> = SamplingCompressorWriter {
compressor: COMPRESSOR.clone(),
compress_like: None,
child: ChunkedLayoutWriter::new(
dtype,
ChunkedLayoutOptions {
// ...and write each chunk as a flat layout.
chunk_strategy: Arc::new(FlatLayoutOptions::default()),
},
)
.boxed(),
self.clone(),
)
.boxed())
}
}

/// Buffers the incoming chunks of a column and re-chunks them before forwarding to `writer`.
///
/// Once at least `options.minimum_block_size` bytes are buffered, whole multiples of
/// `options.block_len_multiple` rows are combined and flushed downstream. With the default
/// options this means multiples of 8192 rows, of at least 8MB in uncompressed size.
struct ColumnChunker {
/// The dtype used when combining buffered chunks into a single array.
dtype: DType,
/// Buffered chunks that have not been flushed yet, oldest first.
chunks: VecDeque<Array>,
/// Total number of rows currently held in `chunks`.
row_count: usize,
/// Total `nbytes()` of the arrays currently held in `chunks`.
nbytes: usize,
/// The downstream writer that receives each combined chunk.
writer: Box<dyn LayoutWriter>,
/// Supplies `minimum_block_size` and `block_len_multiple`.
options: VortexLayoutStrategy,
}

impl ColumnChunker {
pub fn new(dtype: DType, writer: Box<dyn LayoutWriter>, options: VortexLayoutStrategy) -> Self {
Self {
dtype,
chunks: VecDeque::new(),
row_count: 0,
nbytes: 0,
writer,
options,
}
}

fn flush(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<()> {
if self.nbytes >= self.options.minimum_block_size {
let nblocks = self.row_count / self.options.block_len_multiple;

// If we don't have a full block, then continue anyway.
if nblocks == 0 {
// TODO(ngates): if we exceed a maximum block size, regardless of row count we should
// flush the chunk. This can happen for columns with very large cells.
return Ok(());
}

if nblocks > 1 {
// TODO(ngates): if we have _too_ many blocks, then we might look into slicing
// the chunks to be smaller blocks.
}

let mut chunks = Vec::with_capacity(self.chunks.len());
let mut remaining = nblocks * self.options.block_len_multiple;

while remaining > 0 {
let chunk = self.chunks.pop_front().vortex_expect("chunk is missing");
self.row_count -= chunk.len();
self.nbytes -= chunk.nbytes();

let len = chunk.len();

if len > remaining {
let left = slice(&chunk, 0, remaining)?;
let right = slice(&chunk, remaining, len)?;
self.row_count += right.len();
self.nbytes += right.nbytes();
self.chunks.push_front(right);

chunks.push(left);
remaining = 0;
} else {
chunks.push(chunk);
remaining -= len;
}
}

// Combine the chunks and flush them to the layout.
assert!(!chunks.is_empty());
let chunk = ChunkedArray::try_new_unchecked(chunks, self.dtype.clone())
.into_canonical()?
.into_array();

self.writer.push_chunk(segments, chunk)?;
}
.boxed();

Ok(())
}
}

impl LayoutWriter for ColumnChunker {
fn push_chunk(&mut self, segments: &mut dyn SegmentWriter, chunk: Array) -> VortexResult<()> {
// We make sure the chunks are canonical so our nbytes measurement is accurate.
let chunk = chunk.into_canonical()?.into_array();

// Split chunks into blocks of at most `block_len_multiple` rows (8192 by default) to make sure we don't over-size them.
let mut offset = 0;
while offset < chunk.len() {
let end = (offset + self.options.block_len_multiple).min(chunk.len());
let c = slice(&chunk, offset, end)?;
self.row_count += c.len();
self.nbytes += c.nbytes();
self.chunks.push_back(c);
offset = end;

self.flush(segments)?;
}
// Prior to compression, re-partition into size-based chunks.
let writer = RepartitionWriter::new(
dtype.clone(),
writer,
RepartitionWriterOptions {
block_size_minimum: 8 * (1 << 20), // 8 MB
block_len_multiple: 8192, // 8K rows
},
)
.boxed();

Ok(())
}
// Prior to repartitioning, we record statistics
let writer = RepartitionWriter::new(
dtype.clone(),
StatsLayoutWriter::try_new(
dtype,
writer,
Arc::new(FlatLayout),
StatsLayoutOptions {
block_size: 8192,
stats: PRUNING_STATS.into(),
},
)?
.boxed(),
RepartitionWriterOptions {
// No minimum block size in bytes
block_size_minimum: 0,
// Always repartition into 8K row blocks
block_len_multiple: 8192,
},
)
.boxed();

fn finish(&mut self, segments: &mut dyn SegmentWriter) -> VortexResult<Layout> {
let chunk =
ChunkedArray::try_new_unchecked(self.chunks.drain(..).collect(), self.dtype.clone())
.into_canonical()?
.into_array();
self.writer.push_chunk(segments, chunk)?;
self.writer.finish(segments)
Ok(writer)
}
}

Expand Down
6 changes: 3 additions & 3 deletions vortex-file/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use vortex_array::stream::ArrayStream;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
use vortex_io::VortexWrite;
use vortex_layout::stats::StatsLayoutWriter;
use vortex_layout::stats::FileStatsLayoutWriter;
use vortex_layout::{LayoutStrategy, LayoutWriter};

use crate::footer::{FileLayout, Postscript, Segment};
Expand All @@ -19,7 +19,7 @@ pub struct VortexWriteOptions {
impl Default for VortexWriteOptions {
fn default() -> Self {
Self {
strategy: Box::new(VortexLayoutStrategy::default()),
strategy: Box::new(VortexLayoutStrategy),
}
}
}
Expand All @@ -40,7 +40,7 @@ impl VortexWriteOptions {
mut stream: S,
) -> VortexResult<W> {
// Set up the root layout
let mut layout_writer = StatsLayoutWriter::new(
let mut layout_writer = FileStatsLayoutWriter::new(
self.strategy.new_writer(stream.dtype())?,
stream.dtype(),
PRUNING_STATS.into(),
Expand Down
2 changes: 2 additions & 0 deletions vortex-layout/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use vortex_array::aliases::hash_map::HashMap;

use crate::layouts::chunked::ChunkedLayout;
use crate::layouts::flat::FlatLayout;
use crate::layouts::stats::StatsLayout;
use crate::layouts::struct_::StructLayout;
use crate::vtable::LayoutVTableRef;
use crate::LayoutId;
Expand Down Expand Up @@ -41,6 +42,7 @@ impl Default for LayoutContext {
LayoutVTableRef::from_static(&ChunkedLayout),
LayoutVTableRef::from_static(&FlatLayout),
LayoutVTableRef::from_static(&StructLayout),
LayoutVTableRef::from_static(&StatsLayout),
]
.into_iter()
.map(|l| (l.id(), l))
Expand Down
6 changes: 5 additions & 1 deletion vortex-layout/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ impl Layout {
Inner::Owned(o) => {
let child = o.children[i].clone();
if child.dtype() != &dtype {
vortex_bail!("child dtype mismatch");
vortex_bail!(
"Child has dtype {}, but was requested with {}",
child.dtype(),
dtype
);
}
Ok(child)
}
Expand Down
Loading
Loading