Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit cd740a3

Browse files
committedJan 7, 2025·
Add the path for scheduling variable-length full-zip data
1 parent fc74654 commit cd740a3

File tree

12 files changed

+801
-196
lines changed

12 files changed

+801
-196
lines changed
 

‎protos/encodings.proto

+49-15
Original file line numberDiff line numberDiff line change
@@ -230,15 +230,8 @@ message Binary {
230230
uint64 null_adjustment = 3;
231231
}
232232

233-
message BinaryMiniBlock {
234-
}
235-
236-
message BinaryBlock {
237-
}
238-
239-
message FsstMiniBlock {
240-
ArrayEncoding BinaryMiniBlock = 1;
241-
bytes symbol_table = 2;
233+
message Variable {
234+
uint32 bits_per_offset = 1;
242235
}
243236

244237
message Fsst {
@@ -285,10 +278,8 @@ message ArrayEncoding {
285278
BitpackedForNonNeg bitpacked_for_non_neg = 12;
286279
Constant constant = 13;
287280
Bitpack2 bitpack2 = 14;
288-
BinaryMiniBlock binary_mini_block = 15;
289-
FsstMiniBlock fsst_mini_block = 16;
290-
BinaryBlock binary_block = 17;
291-
PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 18;
281+
Variable variable = 15;
282+
PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 16;
292283
}
293284
}
294285

@@ -316,6 +307,34 @@ message ColumnEncoding {
316307
}
317308
}
318309

310+
// # Standardized Interpretation of Counting Terms
311+
//
312+
// When working with 2.1 encodings we have a number of different "counting terms" and it can be
313+
// difficult to understand what we mean when we are talking about a "number of values". Here is
314+
// a standard interpretation of these terms:
315+
//
316+
// TODO: This is a newly added standardization and hasn't yet been applied to all code.
317+
//
318+
// To understand these definitions consider a data type FIXED_SIZE_LIST<LIST<INT32>>.
319+
//
320+
// A "value" is an abstract term when we aren't being specific.
321+
//
322+
// - num_rows: This is the highest level counting term. A single row includes everything in the
323+
// fixed size list. This is what the user asks for when they ask for a range of rows.
324+
// - num_elements: The number of elements is the number of rows multiplied by the dimension of any
325+
// fixed size list wrappers. This is what you get when you flatten the FSL layer and
326+
// is the starting point for structural encoding. Note that an element can be a list
327+
// value or a single primitive value.
328+
// - num_items: The number of items is the number of values in the repetition and definition vectors
329+
// after everything has been flattened.
330+
// - num_visible_items: The number of visible items is the number of items after invisible items
331+
// have been removed. Invisible items are rep/def levels that don't correspond to an
332+
// actual value.
333+
//
334+
// Note that we haven't exactly defined LIST<FIXED_SIZE_LIST<..>> yet. Both FIXED_SIZE_LIST<LIST<..>>
335+
// and LIST<FIXED_SIZE_LIST<..>> haven't been fully implemented and tested.
336+
337+
/// Describes the meaning of each repdef layer in a mini-block layout
319338
enum RepDefLayer {
320339
// Should never be used, included for debugging purposes and general protobuf best practice
321340
REPDEF_UNSPECIFIED = 0;
@@ -375,10 +394,25 @@ message FullZipLayout {
375394
uint32 bits_rep = 1;
376395
// The number of bits of definition info (0 if there is no definition)
377396
uint32 bits_def = 2;
397+
// The number of bits of value info
398+
//
399+
// Note: we use bits here (and not bytes) for consistency with other encodings. However, in practice,
400+
// there is never a reason to use a bits per value that is not a multiple of 8. The complexity is not
401+
// worth the small savings in space since this encoding is typically used with large values already.
402+
oneof details {
403+
// If this is a fixed width block then we need to have a fixed number of bits per value
404+
uint32 bits_per_value = 3;
405+
// If this is a variable width block then we need to have a fixed number of bits per offset
406+
uint32 bits_per_offset = 4;
407+
}
408+
// The number of items in the page
409+
uint32 num_items = 5;
410+
// The number of visible items in the page
411+
uint32 num_visible_items = 6;
378412
// Description of the compression of values
379-
ArrayEncoding value_compression = 3;
413+
ArrayEncoding value_compression = 7;
380414
// The meaning of each repdef layer, used to interpret repdef buffers correctly
381-
repeated RepDefLayer layers = 4;
415+
repeated RepDefLayer layers = 8;
382416
}
383417

384418
/// A layout used for pages where all values are null

‎rust/lance-encoding/src/data.rs

+10
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,16 @@ impl VariableWidthBlock {
631631
})
632632
}
633633

634+
pub fn offsets_as_block(&mut self) -> DataBlock {
635+
let offsets = self.offsets.borrow_and_clone();
636+
DataBlock::FixedWidth(FixedWidthDataBlock {
637+
data: offsets,
638+
bits_per_value: self.bits_per_offset as u64,
639+
num_values: self.num_values + 1,
640+
block_info: BlockInfo::new(),
641+
})
642+
}
643+
634644
pub fn data_size(&self) -> u64 {
635645
(self.data.len() + self.offsets.len()) as u64
636646
}

‎rust/lance-encoding/src/decoder.rs

+37-14
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ use lance_core::{Error, Result};
235235
use tracing::instrument;
236236

237237
use crate::buffer::LanceBuffer;
238-
use crate::data::DataBlock;
238+
use crate::data::{DataBlock, FixedWidthDataBlock, VariableWidthBlock};
239239
use crate::encoder::{values_column_encoding, EncodedBatch};
240240
use crate::encodings::logical::binary::BinaryFieldScheduler;
241241
use crate::encodings::logical::blob::BlobFieldScheduler;
@@ -248,7 +248,9 @@ use crate::encodings::logical::primitive::{
248248
use crate::encodings::logical::r#struct::{
249249
SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
250250
};
251-
use crate::encodings::physical::binary::{BinaryBlockDecompressor, BinaryMiniBlockDecompressor};
251+
use crate::encodings::physical::binary::{
252+
BinaryBlockDecompressor, BinaryMiniBlockDecompressor, VariableDecoder,
253+
};
252254
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
253255
use crate::encodings::physical::fsst::FsstMiniBlockDecompressor;
254256
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
@@ -459,17 +461,20 @@ pub trait MiniBlockDecompressor: std::fmt::Debug + Send + Sync {
459461
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
460462
}
461463

462-
pub trait PerValueDecompressor: std::fmt::Debug + Send + Sync {
464+
pub trait FixedPerValueDecompressor: std::fmt::Debug + Send + Sync {
463465
/// Decompress one or more values
464-
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock>;
466+
fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock>;
465467
/// The number of bits in each value
466468
///
467-
/// Returns 0 if the data type is variable-width
468-
///
469469
/// Currently (and probably long term) this must be a multiple of 8
470470
fn bits_per_value(&self) -> u64;
471471
}
472472

473+
pub trait VariablePerValueDecompressor: std::fmt::Debug + Send + Sync {
474+
/// Decompress one or more values
475+
fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock>;
476+
}
477+
473478
pub trait BlockDecompressor: std::fmt::Debug + Send + Sync {
474479
fn decompress(&self, data: LanceBuffer) -> Result<DataBlock>;
475480
}
@@ -480,10 +485,15 @@ pub trait DecompressorStrategy: std::fmt::Debug + Send + Sync {
480485
description: &pb::ArrayEncoding,
481486
) -> Result<Box<dyn MiniBlockDecompressor>>;
482487

483-
fn create_per_value_decompressor(
488+
fn create_fixed_per_value_decompressor(
489+
&self,
490+
description: &pb::ArrayEncoding,
491+
) -> Result<Box<dyn FixedPerValueDecompressor>>;
492+
493+
fn create_variable_per_value_decompressor(
484494
&self,
485495
description: &pb::ArrayEncoding,
486-
) -> Result<Box<dyn PerValueDecompressor>>;
496+
) -> Result<Box<dyn VariablePerValueDecompressor>>;
487497

488498
fn create_block_decompressor(
489499
&self,
@@ -506,10 +516,10 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
506516
pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
507517
Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
508518
}
509-
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => {
519+
pb::array_encoding::ArrayEncoding::Variable(_) => {
510520
Ok(Box::new(BinaryMiniBlockDecompressor::default()))
511521
}
512-
pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => {
522+
pb::array_encoding::ArrayEncoding::Fsst(description) => {
513523
Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
514524
}
515525
pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
@@ -521,15 +531,28 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
521531
}
522532
}
523533

524-
fn create_per_value_decompressor(
534+
fn create_fixed_per_value_decompressor(
525535
&self,
526536
description: &pb::ArrayEncoding,
527-
) -> Result<Box<dyn PerValueDecompressor>> {
537+
) -> Result<Box<dyn FixedPerValueDecompressor>> {
528538
match description.array_encoding.as_ref().unwrap() {
529539
pb::array_encoding::ArrayEncoding::Flat(flat) => {
530540
Ok(Box::new(ValueDecompressor::new(flat)))
531541
}
532-
_ => todo!(),
542+
_ => todo!("fixed-per-value decompressor for {:?}", description),
543+
}
544+
}
545+
546+
fn create_variable_per_value_decompressor(
547+
&self,
548+
description: &pb::ArrayEncoding,
549+
) -> Result<Box<dyn VariablePerValueDecompressor>> {
550+
match description.array_encoding.as_ref().unwrap() {
551+
&pb::array_encoding::ArrayEncoding::Variable(variable) => {
552+
assert!(variable.bits_per_offset < u8::MAX as u32);
553+
Ok(Box::new(VariableDecoder::default()))
554+
}
555+
_ => todo!("variable-per-value decompressor for {:?}", description),
533556
}
534557
}
535558

@@ -548,7 +571,7 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
548571
constant.num_values,
549572
)))
550573
}
551-
pb::array_encoding::ArrayEncoding::BinaryBlock(_) => {
574+
pb::array_encoding::ArrayEncoding::Variable(_) => {
552575
Ok(Box::new(BinaryBlockDecompressor::default()))
553576
}
554577
_ => todo!(),

‎rust/lance-encoding/src/encoder.rs

+34-11
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@ use crate::encodings::logical::list::ListStructuralEncoder;
2424
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
2525
use crate::encodings::logical::r#struct::StructFieldEncoder;
2626
use crate::encodings::logical::r#struct::StructStructuralEncoder;
27-
use crate::encodings::physical::binary::{BinaryBlockEncoder, BinaryMiniBlockEncoder};
27+
use crate::encodings::physical::binary::{BinaryMiniBlockEncoder, VariableEncoder};
2828
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
2929
use crate::encodings::physical::bitpack_fastlanes::{
3030
compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder,
3131
};
3232
use crate::encodings::physical::block_compress::{CompressionConfig, CompressionScheme};
3333
use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
34-
use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder};
34+
use crate::encodings::physical::fsst::{
35+
FsstArrayEncoder, FsstMiniBlockEncoder, FsstPerValueEncoder,
36+
};
3537
use crate::encodings::physical::packed_struct::PackedStructEncoder;
3638
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
3739
use crate::format::ProtobufUtils;
@@ -217,11 +219,21 @@ pub trait MiniBlockCompressor: std::fmt::Debug + Send + Sync {
217219
/// A single buffer of value data and a buffer of offsets
218220
///
219221
/// TODO: In the future we may allow metadata buffers
222+
#[derive(Debug)]
220223
pub enum PerValueDataBlock {
221224
Fixed(FixedWidthDataBlock),
222225
Variable(VariableWidthBlock),
223226
}
224227

228+
impl PerValueDataBlock {
229+
pub fn data_size(&self) -> u64 {
230+
match self {
231+
Self::Fixed(fixed) => fixed.data_size(),
232+
Self::Variable(variable) => variable.data_size(),
233+
}
234+
}
235+
}
236+
225237
/// Trait for compression algorithms that are suitable for use in the zipped structural encoding
226238
///
227239
/// This compression must return either a FixedWidthDataBlock or a VariableWidthBlock. This is because
@@ -884,8 +896,23 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
884896
let encoder = Box::new(ValueEncoder::default());
885897
Ok(encoder)
886898
}
887-
DataBlock::VariableWidth(_variable_width) => {
888-
todo!()
899+
DataBlock::VariableWidth(variable_width) => {
900+
if variable_width.bits_per_offset == 32 {
901+
let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
902+
let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
903+
904+
let variable_compression = Box::new(VariableEncoder::default());
905+
906+
if max_len >= FSST_LEAST_INPUT_MAX_LENGTH
907+
&& data_size >= FSST_LEAST_INPUT_SIZE as u64
908+
{
909+
Ok(Box::new(FsstPerValueEncoder::new(variable_compression)))
910+
} else {
911+
Ok(variable_compression)
912+
}
913+
} else {
914+
todo!("Implement MiniBlockCompression for VariableWidth DataBlock with 64 bits offsets.")
915+
}
889916
}
890917
_ => unreachable!(),
891918
}
@@ -905,13 +932,9 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
905932
Ok((encoder, encoding))
906933
}
907934
DataBlock::VariableWidth(variable_width) => {
908-
if variable_width.bits_per_offset == 32 {
909-
let encoder = Box::new(BinaryBlockEncoder::default());
910-
let encoding = ProtobufUtils::binary_block();
911-
Ok((encoder, encoding))
912-
} else {
913-
todo!("Implement BlockCompression for VariableWidth DataBlock with 64 bits offsets.")
914-
}
935+
let encoder = Box::new(VariableEncoder::default());
936+
let encoding = ProtobufUtils::variable(variable_width.bits_per_offset);
937+
Ok((encoder, encoding))
915938
}
916939
_ => unreachable!(),
917940
}

‎rust/lance-encoding/src/encodings/logical/list.rs

+55-4
Original file line numberDiff line numberDiff line change
@@ -1583,8 +1583,43 @@ mod tests {
15831583
.await;
15841584
}
15851585

1586+
#[rstest]
15861587
#[test_log::test(tokio::test)]
1587-
async fn test_simple_sliced_list() {
1588+
async fn test_simple_string_list(
1589+
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1590+
structural_encoding: &str,
1591+
) {
1592+
let items_builder = StringBuilder::new();
1593+
let mut list_builder = ListBuilder::new(items_builder);
1594+
list_builder.append_value([Some("a"), Some("bc"), Some("def")]);
1595+
list_builder.append_value([Some("gh"), None]);
1596+
list_builder.append_null();
1597+
list_builder.append_value([Some("ijk"), Some("lmnop"), Some("qrs")]);
1598+
let list_array = list_builder.finish();
1599+
1600+
let mut field_metadata = HashMap::new();
1601+
field_metadata.insert(
1602+
STRUCTURAL_ENCODING_META_KEY.to_string(),
1603+
structural_encoding.into(),
1604+
);
1605+
1606+
let test_cases = TestCases::default()
1607+
.with_range(0..2)
1608+
.with_range(0..3)
1609+
.with_range(1..3)
1610+
.with_indices(vec![1, 3])
1611+
.with_indices(vec![2])
1612+
.with_file_version(LanceFileVersion::V2_1);
1613+
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
1614+
.await;
1615+
}
1616+
1617+
#[rstest]
1618+
#[test_log::test(tokio::test)]
1619+
async fn test_simple_sliced_list(
1620+
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1621+
structural_encoding: &str,
1622+
) {
15881623
let items_builder = Int32Builder::new();
15891624
let mut list_builder = ListBuilder::new(items_builder);
15901625
list_builder.append_value([Some(1), Some(2), Some(3)]);
@@ -1595,18 +1630,28 @@ mod tests {
15951630

15961631
let list_array = list_array.slice(1, 2);
15971632

1633+
let mut field_metadata = HashMap::new();
1634+
field_metadata.insert(
1635+
STRUCTURAL_ENCODING_META_KEY.to_string(),
1636+
structural_encoding.into(),
1637+
);
1638+
15981639
let test_cases = TestCases::default()
15991640
.with_range(0..2)
16001641
.with_range(1..2)
16011642
.with_indices(vec![0])
16021643
.with_indices(vec![1])
16031644
.with_file_version(LanceFileVersion::V2_1);
1604-
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, HashMap::new())
1645+
check_round_trip_encoding_of_data(vec![Arc::new(list_array)], &test_cases, field_metadata)
16051646
.await;
16061647
}
16071648

1649+
#[rstest]
16081650
#[test_log::test(tokio::test)]
1609-
async fn test_list_with_garbage_nulls() {
1651+
async fn test_list_with_garbage_nulls(
1652+
#[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)]
1653+
structural_encoding: &str,
1654+
) {
16101655
// In Arrow, list nulls are allowed to be non-empty, with masked garbage values
16111656
// Here we make a list with a null row in the middle with 3 garbage values
16121657
let items = UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
@@ -1620,13 +1665,19 @@ mod tests {
16201665
Some(list_validity),
16211666
);
16221667

1668+
let mut field_metadata = HashMap::new();
1669+
field_metadata.insert(
1670+
STRUCTURAL_ENCODING_META_KEY.to_string(),
1671+
structural_encoding.into(),
1672+
);
1673+
16231674
let test_cases = TestCases::default()
16241675
.with_range(0..3)
16251676
.with_range(1..2)
16261677
.with_indices(vec![1])
16271678
.with_indices(vec![2])
16281679
.with_file_version(LanceFileVersion::V2_1);
1629-
check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, HashMap::new())
1680+
check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, field_metadata)
16301681
.await;
16311682
}
16321683

‎rust/lance-encoding/src/encodings/logical/primitive.rs

+432-67
Large diffs are not rendered by default.

‎rust/lance-encoding/src/encodings/physical.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,7 @@ pub fn decoder_from_array_encoding(
285285
// 2.1 only
286286
pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
287287
pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(),
288-
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => unreachable!(),
289-
pb::array_encoding::ArrayEncoding::FsstMiniBlock(_) => unreachable!(),
290-
pb::array_encoding::ArrayEncoding::BinaryBlock(_) => unreachable!(),
288+
pb::array_encoding::ArrayEncoding::Variable(_) => unreachable!(),
291289
pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(_) => unreachable!(),
292290
}
293291
}

‎rust/lance-encoding/src/encodings/physical/binary.rs

+34-10
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,20 @@ use snafu::{location, Location};
1616

1717
use futures::{future::BoxFuture, FutureExt};
1818

19-
use crate::decoder::{BlockDecompressor, LogicalPageDecoder, MiniBlockDecompressor};
20-
use crate::encoder::{BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor};
19+
use crate::decoder::{
20+
BlockDecompressor, LogicalPageDecoder, MiniBlockDecompressor, VariablePerValueDecompressor,
21+
};
22+
use crate::encoder::{
23+
BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
24+
PerValueDataBlock,
25+
};
2126
use crate::encodings::logical::primitive::PrimitiveFieldDecoder;
2227

2328
use crate::buffer::LanceBuffer;
2429
use crate::data::{
2530
BlockInfo, DataBlock, FixedWidthDataBlock, NullableDataBlock, VariableWidthBlock,
2631
};
27-
use crate::format::ProtobufUtils;
32+
use crate::format::{pb, ProtobufUtils};
2833
use crate::{
2934
decoder::{PageScheduler, PrimitivePageDecoder},
3035
encoder::{ArrayEncoder, EncodedArray},
@@ -687,16 +692,13 @@ impl BinaryMiniBlockEncoder {
687692
chunks,
688693
num_values: data.num_values,
689694
},
690-
ProtobufUtils::binary_miniblock(),
695+
ProtobufUtils::variable(/*bits_per_offset=*/ 32),
691696
)
692697
}
693698
}
694699

695700
impl MiniBlockCompressor for BinaryMiniBlockEncoder {
696-
fn compress(
697-
&self,
698-
data: DataBlock,
699-
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
701+
fn compress(&self, data: DataBlock) -> Result<(MiniBlockCompressed, pb::ArrayEncoding)> {
700702
match data {
701703
DataBlock::VariableWidth(variable_width) => Ok(self.chunk_data(variable_width)),
702704
_ => Err(Error::InvalidInput {
@@ -740,9 +742,11 @@ impl MiniBlockDecompressor for BinaryMiniBlockDecompressor {
740742
}
741743
}
742744

745+
/// Most basic encoding for variable-width data which does no compression at all
743746
#[derive(Debug, Default)]
744-
pub struct BinaryBlockEncoder {}
745-
impl BlockCompressor for BinaryBlockEncoder {
747+
pub struct VariableEncoder {}
748+
749+
impl BlockCompressor for VariableEncoder {
746750
fn compress(&self, data: DataBlock) -> Result<LanceBuffer> {
747751
let num_values: u32 = data
748752
.num_values()
@@ -785,6 +789,26 @@ impl BlockCompressor for BinaryBlockEncoder {
785789
}
786790
}
787791

792+
impl PerValueCompressor for VariableEncoder {
793+
fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
794+
let DataBlock::VariableWidth(variable) = data else {
795+
panic!("BinaryPerValueCompressor can only work with Variable Width DataBlock.");
796+
};
797+
798+
let encoding = ProtobufUtils::variable(variable.bits_per_offset);
799+
Ok((PerValueDataBlock::Variable(variable), encoding))
800+
}
801+
}
802+
803+
#[derive(Debug, Default)]
804+
pub struct VariableDecoder {}
805+
806+
impl VariablePerValueDecompressor for VariableDecoder {
807+
fn decompress(&self, data: VariableWidthBlock) -> Result<DataBlock> {
808+
Ok(DataBlock::VariableWidth(data))
809+
}
810+
}
811+
788812
#[derive(Debug, Default)]
789813
pub struct BinaryBlockDecompressor {}
790814

‎rust/lance-encoding/src/encodings/physical/fsst.rs

+74-26
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@ use crate::{
1414
buffer::LanceBuffer,
1515
data::{BlockInfo, DataBlock, NullableDataBlock, VariableWidthBlock},
1616
decoder::{MiniBlockDecompressor, PageScheduler, PrimitivePageDecoder},
17-
encoder::{ArrayEncoder, EncodedArray},
18-
encoder::{MiniBlockCompressed, MiniBlockCompressor},
19-
format::pb::{self},
20-
format::ProtobufUtils,
17+
encoder::{
18+
ArrayEncoder, EncodedArray, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
19+
PerValueDataBlock,
20+
},
21+
format::{
22+
pb::{self},
23+
ProtobufUtils,
24+
},
2125
EncodingsIo,
2226
};
2327

@@ -202,14 +206,13 @@ impl ArrayEncoder for FsstArrayEncoder {
202206
}
203207
}
204208

205-
#[derive(Debug, Default)]
206-
pub struct FsstMiniBlockEncoder {}
209+
struct FsstCompressed {
210+
data: VariableWidthBlock,
211+
symbol_table: Vec<u8>,
212+
}
207213

208-
impl MiniBlockCompressor for FsstMiniBlockEncoder {
209-
fn compress(
210-
&self,
211-
data: DataBlock,
212-
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
214+
impl FsstCompressed {
215+
fn fsst_compress(data: DataBlock) -> Result<Self> {
213216
match data {
214217
DataBlock::VariableWidth(mut variable_width) => {
215218
let offsets = variable_width.offsets.borrow_to_typed_slice::<i32>();
@@ -231,29 +234,22 @@ impl MiniBlockCompressor for FsstMiniBlockEncoder {
231234
)?;
232235

233236
// construct `DataBlock` for BinaryMiniBlockEncoder, we may want some `DataBlock` construct methods later
234-
let data_block = DataBlock::VariableWidth(VariableWidthBlock {
237+
let compressed = VariableWidthBlock {
235238
data: LanceBuffer::reinterpret_vec(dest_values),
236239
bits_per_offset: 32,
237240
offsets: LanceBuffer::reinterpret_vec(dest_offsets),
238241
num_values: variable_width.num_values,
239242
block_info: BlockInfo::new(),
240-
});
241-
242-
// compress the fsst compressed data using `BinaryMiniBlockEncoder`
243-
let binary_compressor =
244-
Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;
243+
};
245244

246-
let (binary_miniblock_compressed, binary_array_encoding) =
247-
binary_compressor.compress(data_block)?;
248-
249-
Ok((
250-
binary_miniblock_compressed,
251-
ProtobufUtils::fsst_mini_block(binary_array_encoding, symbol_table),
252-
))
245+
Ok(Self {
246+
data: compressed,
247+
symbol_table,
248+
})
253249
}
254250
_ => Err(Error::InvalidInput {
255251
source: format!(
256-
"Cannot compress a data block of type {} with BinaryMiniBlockEncoder",
252+
"Cannot compress a data block of type {} with FsstEncoder",
257253
data.name()
258254
)
259255
.into(),
@@ -263,13 +259,65 @@ impl MiniBlockCompressor for FsstMiniBlockEncoder {
263259
}
264260
}
265261

262+
#[derive(Debug, Default)]
263+
pub struct FsstMiniBlockEncoder {}
264+
265+
impl MiniBlockCompressor for FsstMiniBlockEncoder {
266+
fn compress(
267+
&self,
268+
data: DataBlock,
269+
) -> Result<(MiniBlockCompressed, crate::format::pb::ArrayEncoding)> {
270+
let compressed = FsstCompressed::fsst_compress(data)?;
271+
272+
let data_block = DataBlock::VariableWidth(compressed.data);
273+
274+
// compress the fsst compressed data using `BinaryMiniBlockEncoder`
275+
let binary_compressor =
276+
Box::new(BinaryMiniBlockEncoder::default()) as Box<dyn MiniBlockCompressor>;
277+
278+
let (binary_miniblock_compressed, binary_array_encoding) =
279+
binary_compressor.compress(data_block)?;
280+
281+
Ok((
282+
binary_miniblock_compressed,
283+
ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
284+
))
285+
}
286+
}
287+
288+
#[derive(Debug)]
289+
pub struct FsstPerValueEncoder {
290+
inner: Box<dyn PerValueCompressor>,
291+
}
292+
293+
impl FsstPerValueEncoder {
294+
pub fn new(inner: Box<dyn PerValueCompressor>) -> Self {
295+
Self { inner }
296+
}
297+
}
298+
299+
impl PerValueCompressor for FsstPerValueEncoder {
300+
fn compress(&self, data: DataBlock) -> Result<(PerValueDataBlock, pb::ArrayEncoding)> {
301+
let compressed = FsstCompressed::fsst_compress(data)?;
302+
303+
let data_block = DataBlock::VariableWidth(compressed.data);
304+
305+
let (binary_compressed, binary_array_encoding) = self.inner.compress(data_block)?;
306+
307+
Ok((
308+
binary_compressed,
309+
ProtobufUtils::fsst(binary_array_encoding, compressed.symbol_table),
310+
))
311+
}
312+
}
313+
266314
#[derive(Debug)]
267315
pub struct FsstMiniBlockDecompressor {
268316
symbol_table: Vec<u8>,
269317
}
270318

271319
impl FsstMiniBlockDecompressor {
272-
pub fn new(description: &pb::FsstMiniBlock) -> Self {
320+
pub fn new(description: &pb::Fsst) -> Self {
273321
Self {
274322
symbol_table: description.symbol_table.clone(),
275323
}

‎rust/lance-encoding/src/encodings/physical/value.rs

+16-21
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ use crate::buffer::LanceBuffer;
1414
use crate::data::{
1515
BlockInfo, ConstantDataBlock, DataBlock, FixedSizeListBlock, FixedWidthDataBlock,
1616
};
17-
use crate::decoder::PerValueDecompressor;
18-
use crate::decoder::{BlockDecompressor, MiniBlockDecompressor};
17+
use crate::decoder::{BlockDecompressor, FixedPerValueDecompressor, MiniBlockDecompressor};
1918
use crate::encoder::{
2019
BlockCompressor, MiniBlockChunk, MiniBlockCompressed, MiniBlockCompressor, PerValueCompressor,
2120
PerValueDataBlock, MAX_MINIBLOCK_BYTES, MAX_MINIBLOCK_VALUES,
@@ -413,37 +412,33 @@ impl ValueDecompressor {
413412
bytes_per_value: description.bits_per_value / 8,
414413
}
415414
}
416-
}
417415

418-
impl BlockDecompressor for ValueDecompressor {
419-
fn decompress(&self, data: LanceBuffer) -> Result<DataBlock> {
420-
let num_values = data.len() as u64 / self.bytes_per_value;
421-
assert_eq!(data.len() as u64 % self.bytes_per_value, 0);
422-
Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
416+
fn buffer_to_block(&self, data: LanceBuffer) -> DataBlock {
417+
DataBlock::FixedWidth(FixedWidthDataBlock {
423418
bits_per_value: self.bytes_per_value * 8,
419+
num_values: data.len() as u64 / self.bytes_per_value,
424420
data,
425-
num_values,
426421
block_info: BlockInfo::new(),
427-
}))
422+
})
423+
}
424+
}
425+
426+
impl BlockDecompressor for ValueDecompressor {
427+
fn decompress(&self, data: LanceBuffer) -> Result<DataBlock> {
428+
Ok(self.buffer_to_block(data))
428429
}
429430
}
430431

431432
impl MiniBlockDecompressor for ValueDecompressor {
432433
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
433-
debug_assert!(data.len() as u64 >= num_values * self.bytes_per_value);
434-
435-
Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
436-
data,
437-
bits_per_value: self.bytes_per_value * 8,
438-
num_values,
439-
block_info: BlockInfo::new(),
440-
}))
434+
assert_eq!(data.len() as u64, num_values * self.bytes_per_value);
435+
Ok(self.buffer_to_block(data))
441436
}
442437
}
443438

444-
impl PerValueDecompressor for ValueDecompressor {
445-
fn decompress(&self, data: LanceBuffer, num_values: u64) -> Result<DataBlock> {
446-
MiniBlockDecompressor::decompress(self, data, num_values)
439+
impl FixedPerValueDecompressor for ValueDecompressor {
440+
fn decompress(&self, data: FixedWidthDataBlock) -> Result<DataBlock> {
441+
Ok(DataBlock::FixedWidth(data))
447442
}
448443

449444
fn bits_per_value(&self) -> u64 {

‎rust/lance-encoding/src/format.rs

+58-25
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ pub mod pb {
1717
use pb::{
1818
array_encoding::ArrayEncoding as ArrayEncodingEnum,
1919
buffer::BufferType,
20+
full_zip_layout,
2021
nullable::{AllNull, NoNull, Nullability, SomeNull},
2122
page_layout::Layout,
22-
AllNullLayout, ArrayEncoding, Binary, BinaryBlock, BinaryMiniBlock, Bitpack2, Bitpacked,
23-
BitpackedForNonNeg, Dictionary, FixedSizeBinary, FixedSizeList, Flat, Fsst, FsstMiniBlock,
24-
MiniBlockLayout, Nullable, PackedStruct, PackedStructFixedWidthMiniBlock, PageLayout,
25-
RepDefLayer,
23+
AllNullLayout, ArrayEncoding, Binary, Bitpack2, Bitpacked, BitpackedForNonNeg, Dictionary,
24+
FixedSizeBinary, FixedSizeList, Flat, Fsst, MiniBlockLayout, Nullable, PackedStruct,
25+
PackedStructFixedWidthMiniBlock, PageLayout, RepDefLayer, Variable,
2626
};
2727

2828
use crate::{
@@ -145,25 +145,21 @@ impl ProtobufUtils {
145145
}
146146
}
147147

148-
pub fn binary_miniblock() -> ArrayEncoding {
148+
pub fn variable(bits_per_offset: u8) -> ArrayEncoding {
149149
ArrayEncoding {
150-
array_encoding: Some(ArrayEncodingEnum::BinaryMiniBlock(BinaryMiniBlock {})),
151-
}
152-
}
153-
154-
pub fn binary_block() -> ArrayEncoding {
155-
ArrayEncoding {
156-
array_encoding: Some(ArrayEncodingEnum::BinaryBlock(BinaryBlock {})),
150+
array_encoding: Some(ArrayEncodingEnum::Variable(Variable {
151+
bits_per_offset: bits_per_offset as u32,
152+
})),
157153
}
158154
}
159155

160156
// Construct a `Fsst` ArrayEncoding, the inner `binary` encoding is actually
161157
// not used and `FsstMiniBlockDecompressor` constructs a `binary_mini_block` in a `hard-coded` fashion.
162158
// This can be an optimization later.
163-
pub fn fsst_mini_block(data: ArrayEncoding, symbol_table: Vec<u8>) -> ArrayEncoding {
159+
pub fn fsst(data: ArrayEncoding, symbol_table: Vec<u8>) -> ArrayEncoding {
164160
ArrayEncoding {
165-
array_encoding: Some(ArrayEncodingEnum::FsstMiniBlock(Box::new(FsstMiniBlock {
166-
binary_mini_block: Some(Box::new(data)),
161+
array_encoding: Some(ArrayEncodingEnum::Fsst(Box::new(Fsst {
162+
binary: Some(Box::new(data)),
167163
symbol_table,
168164
}))),
169165
}
@@ -246,15 +242,6 @@ impl ProtobufUtils {
246242
}
247243
}
248244

249-
pub fn fsst(data: ArrayEncoding, symbol_table: Vec<u8>) -> ArrayEncoding {
250-
ArrayEncoding {
251-
array_encoding: Some(ArrayEncodingEnum::Fsst(Box::new(Fsst {
252-
binary: Some(Box::new(data)),
253-
symbol_table,
254-
}))),
255-
}
256-
}
257-
258245
fn def_inter_to_repdef_layer(def: DefinitionInterpretation) -> i32 {
259246
match def {
260247
DefinitionInterpretation::AllValidItem => RepDefLayer::RepdefAllValidItem as i32,
@@ -309,17 +296,23 @@ impl ProtobufUtils {
309296
}
310297
}
311298

312-
pub fn full_zip_layout(
299+
fn full_zip_layout(
313300
bits_rep: u8,
314301
bits_def: u8,
302+
details: full_zip_layout::Details,
315303
value_encoding: ArrayEncoding,
316304
def_meaning: &[DefinitionInterpretation],
305+
num_items: u32,
306+
num_visible_items: u32,
317307
) -> PageLayout {
318308
PageLayout {
319309
layout: Some(Layout::FullZipLayout(pb::FullZipLayout {
320310
bits_rep: bits_rep as u32,
321311
bits_def: bits_def as u32,
312+
details: Some(details),
322313
value_compression: Some(value_encoding),
314+
num_items,
315+
num_visible_items,
323316
layers: def_meaning
324317
.iter()
325318
.map(|&def| Self::def_inter_to_repdef_layer(def))
@@ -328,6 +321,46 @@ impl ProtobufUtils {
328321
}
329322
}
330323

324+
pub fn fixed_full_zip_layout(
325+
bits_rep: u8,
326+
bits_def: u8,
327+
bits_per_value: u32,
328+
value_encoding: ArrayEncoding,
329+
def_meaning: &[DefinitionInterpretation],
330+
num_items: u32,
331+
num_visible_items: u32,
332+
) -> PageLayout {
333+
Self::full_zip_layout(
334+
bits_rep,
335+
bits_def,
336+
full_zip_layout::Details::BitsPerValue(bits_per_value),
337+
value_encoding,
338+
def_meaning,
339+
num_items,
340+
num_visible_items,
341+
)
342+
}
343+
344+
pub fn variable_full_zip_layout(
345+
bits_rep: u8,
346+
bits_def: u8,
347+
bits_per_offset: u32,
348+
value_encoding: ArrayEncoding,
349+
def_meaning: &[DefinitionInterpretation],
350+
num_items: u32,
351+
num_visible_items: u32,
352+
) -> PageLayout {
353+
Self::full_zip_layout(
354+
bits_rep,
355+
bits_def,
356+
full_zip_layout::Details::BitsPerOffset(bits_per_offset),
357+
value_encoding,
358+
def_meaning,
359+
num_items,
360+
num_visible_items,
361+
)
362+
}
363+
331364
pub fn all_null_layout(def_meaning: &[DefinitionInterpretation]) -> PageLayout {
332365
PageLayout {
333366
layout: Some(Layout::AllNullLayout(AllNullLayout {

‎rust/lance-encoding/src/repdef.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1783,6 +1783,7 @@ pub enum ControlWordIterator<'a> {
17831783
}
17841784

17851785
/// Describes the properties of a control word
1786+
#[derive(Debug)]
17861787
pub struct ControlWordDesc {
17871788
pub is_new_row: bool,
17881789
pub is_visible: bool,

0 commit comments

Comments
 (0)
Please sign in to comment.