fix: column aware row encoding: improve the implementation and add bench (#17818)
fuyufjh authored Jul 30, 2024
1 parent 2fb78f0 commit 8984ae2
Showing 3 changed files with 158 additions and 26 deletions.
4 changes: 4 additions & 0 deletions src/common/Cargo.toml
@@ -165,6 +165,10 @@ harness = false
name = "bench_hash_key_encoding"
harness = false

[[bench]]
name = "bench_column_aware_row_encoding"
harness = false

[[bench]]
name = "bench_data_chunk_encoding"
harness = false
136 changes: 136 additions & 0 deletions src/common/benches/bench_column_aware_row_encoding.rs
@@ -0,0 +1,136 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use rand::{Rng, SeedableRng};
use risingwave_common::catalog::ColumnId;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{DataType, Date, ScalarImpl};
use risingwave_common::util::value_encoding::column_aware_row_encoding::*;
use risingwave_common::util::value_encoding::*;

fn bench_column_aware_encoding_16_columns(c: &mut Criterion) {
    let mut rng = rand::rngs::StdRng::seed_from_u64(42);

    // The schema is inspired by the TPC-H lineitem table
    let data_types = Arc::new([
        DataType::Int64,
        DataType::Int64,
        DataType::Int64,
        DataType::Int32,
        DataType::Decimal,
        DataType::Decimal,
        DataType::Decimal,
        DataType::Decimal,
        DataType::Varchar,
        DataType::Varchar,
        DataType::Date,
        DataType::Date,
        DataType::Date,
        DataType::Varchar,
        DataType::Varchar,
        DataType::Varchar,
    ]);
    let row = OwnedRow::new(vec![
        Some(ScalarImpl::Int64(rng.gen())),
        Some(ScalarImpl::Int64(rng.gen())),
        Some(ScalarImpl::Int64(rng.gen())),
        Some(ScalarImpl::Int32(rng.gen())),
        Some(ScalarImpl::Decimal("1.0".parse().unwrap())),
        Some(ScalarImpl::Decimal("114.514".parse().unwrap())),
        None,
        Some(ScalarImpl::Decimal("0.08".parse().unwrap())),
        Some(ScalarImpl::Utf8("A".into())),
        Some(ScalarImpl::Utf8("B".into())),
        Some(ScalarImpl::Date(Date::from_ymd_uncheck(2024, 7, 1))),
        Some(ScalarImpl::Date(Date::from_ymd_uncheck(2024, 7, 2))),
        Some(ScalarImpl::Date(Date::from_ymd_uncheck(2024, 7, 3))),
        Some(ScalarImpl::Utf8("D".into())),
        None,
        Some(ScalarImpl::Utf8("No comments".into())),
    ]);

    let column_ids = (1..=data_types.len())
        .map(|i| ColumnId::from(i as i32))
        .collect::<Vec<_>>();

    c.bench_function("column_aware_row_encoding_16_columns_encode", |b| {
        let serializer = Serializer::new(&column_ids[..]);
        b.iter(|| {
            black_box(serializer.serialize(&row));
        });
    });

    let serializer = Serializer::new(&column_ids[..]);
    let encoded = serializer.serialize(&row);

    c.bench_function("column_aware_row_encoding_16_columns_decode", |b| {
        let deserializer =
            Deserializer::new(&column_ids[..], data_types.clone(), std::iter::empty());
        b.iter(|| {
            let result = deserializer.deserialize(&encoded).unwrap();
            black_box(result);
        });
    });
}

fn bench_column_aware_encoding_4_columns(c: &mut Criterion) {
    let mut rng = rand::rngs::StdRng::seed_from_u64(42);

    // The schema is inspired by the TPC-H nation table
    let data_types = Arc::new([
        DataType::Int32,
        DataType::Varchar,
        DataType::Int32,
        DataType::Varchar,
    ]);
    let row = OwnedRow::new(vec![
        Some(ScalarImpl::Int32(rng.gen())),
        Some(ScalarImpl::Utf8("United States".into())),
        Some(ScalarImpl::Int32(rng.gen())),
        Some(ScalarImpl::Utf8("No comments".into())),
    ]);

    let column_ids = (1..=data_types.len())
        .map(|i| ColumnId::from(i as i32))
        .collect::<Vec<_>>();

    c.bench_function("column_aware_row_encoding_4_columns_encode", |b| {
        let serializer = Serializer::new(&column_ids[..]);
        b.iter(|| {
            black_box(serializer.serialize(&row));
        });
    });

    let serializer = Serializer::new(&column_ids[..]);
    let encoded = serializer.serialize(&row);

    c.bench_function("column_aware_row_encoding_4_columns_decode", |b| {
        let deserializer =
            Deserializer::new(&column_ids[..], data_types.clone(), std::iter::empty());
        b.iter(|| {
            let result = deserializer.deserialize(&encoded).unwrap();
            black_box(result);
        });
    });
}

criterion_group!(
    benches,
    bench_column_aware_encoding_16_columns,
    bench_column_aware_encoding_4_columns,
);
criterion_main!(benches);
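With the `[[bench]]` target registered in `src/common/Cargo.toml` above (`harness = false` disables the default libtest harness so that `criterion_main!` can provide `main`), the new benchmark can presumably be run from the workspace root with something along the lines of:

    cargo bench -p risingwave_common --bench bench_column_aware_row_encoding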
44 changes: 18 additions & 26 deletions src/common/src/util/value_encoding/column_aware_row_encoding.rs
@@ -19,23 +19,15 @@
 //! We have a `Serializer` and a `Deserializer` for each schema of `Row`, which can be reused
 //! until schema changes
-use std::collections::{BTreeMap, BTreeSet, HashSet};
+use std::collections::HashSet;
 use std::sync::Arc;
 
+use ahash::HashMap;
 use bitflags::bitflags;
 
 use super::*;
 use crate::catalog::ColumnId;
 
-// deprecated design of have a Width to represent number of datum
-// may be considered should `ColumnId` representation be optimized
-// #[derive(Clone, Copy)]
-// enum Width {
-//     Mid(u8),
-//     Large(u16),
-//     Extra(u32),
-// }
-
 bitflags! {
     #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     struct Flag: u8 {
@@ -176,9 +168,11 @@ impl ValueRowSerializer for Serializer {
 /// Should non-null default values be specified, a new field could be added to Deserializer
 #[derive(Clone)]
 pub struct Deserializer {
-    needed_column_ids: BTreeMap<i32, usize>,
+    required_column_ids: HashMap<i32, usize>,
     schema: Arc<[DataType]>,
-    default_column_values: Vec<(usize, Datum)>,
+
+    /// A row with default values for each column or `None` if no default value is specified
+    default_row: Vec<Datum>,
 }
 
 impl Deserializer {
@@ -188,14 +182,18 @@
         column_with_default: impl Iterator<Item = (usize, Datum)>,
     ) -> Self {
         assert_eq!(column_ids.len(), schema.len());
+        let mut default_row: Vec<Datum> = vec![None; schema.len()];
+        for (i, datum) in column_with_default {
+            default_row[i] = datum;
+        }
         Self {
-            needed_column_ids: column_ids
+            required_column_ids: column_ids
                 .iter()
                 .enumerate()
                 .map(|(i, c)| (c.get_id(), i))
-                .collect::<BTreeMap<_, _>>(),
+                .collect::<HashMap<_, _>>(),
             schema,
-            default_column_values: column_with_default.collect(),
+            default_row,
         }
     }
 }
@@ -214,12 +212,11 @@ impl ValueRowDeserializer for Deserializer {
         let data_start_idx = offsets_start_idx + datum_num * offset_bytes;
         let offsets = &encoded_bytes[offsets_start_idx..data_start_idx];
         let data = &encoded_bytes[data_start_idx..];
-        let mut datums: Vec<Option<Datum>> = vec![None; self.schema.len()];
-        let mut contained_indices = BTreeSet::new();
+
+        let mut row = self.default_row.clone();
         for i in 0..datum_num {
             let this_id = encoded_bytes.get_i32_le();
-            if let Some(&decoded_idx) = self.needed_column_ids.get(&this_id) {
-                contained_indices.insert(decoded_idx);
+            if let Some(&decoded_idx) = self.required_column_ids.get(&this_id) {
                 let this_offset_start_idx = i * offset_bytes;
                 let mut this_offset_slice =
                     &offsets[this_offset_start_idx..(this_offset_start_idx + offset_bytes)];
@@ -246,15 +243,10 @@
                         &mut data_slice,
                     )?)
                 };
-                datums[decoded_idx] = Some(data);
-            }
-        }
-        for (id, datum) in &self.default_column_values {
-            if !contained_indices.contains(id) {
-                datums[*id].get_or_insert(datum.clone());
+                row[decoded_idx] = data;
             }
         }
-        Ok(datums.into_iter().map(|d| d.unwrap_or(None)).collect())
+        Ok(row)
     }
 }
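The gist of the change above: instead of collecting `default_column_values` and tracking `contained_indices` to back-fill defaults after decoding, the deserializer now pre-materializes a `default_row` once in `new()` and simply clones it per `deserialize()` call, overwriting only the columns actually present in the encoding. A minimal, self-contained sketch of that pattern (hypothetical names and a simplified stand-in for `Datum`, not the actual RisingWave types):

// Simplified stand-in for the real `Datum` (which is an optional scalar value).
type Datum = Option<i64>;

struct DefaultAwareDeserializer {
    // Built once at construction time: the default value (or None) for every column.
    default_row: Vec<Datum>,
}

impl DefaultAwareDeserializer {
    fn new(num_columns: usize, columns_with_default: impl Iterator<Item = (usize, Datum)>) -> Self {
        let mut default_row: Vec<Datum> = vec![None; num_columns];
        for (idx, datum) in columns_with_default {
            default_row[idx] = datum;
        }
        Self { default_row }
    }

    // `decoded` stands for the (column index, value) pairs recovered from the encoded bytes.
    fn deserialize(&self, decoded: impl Iterator<Item = (usize, Datum)>) -> Vec<Datum> {
        // Start from the defaults and overwrite whatever the encoding actually contains,
        // so no per-row bookkeeping of "which columns were seen" is needed.
        let mut row = self.default_row.clone();
        for (idx, datum) in decoded {
            row[idx] = datum;
        }
        row
    }
}

fn main() {
    // Column 2 has a default of 42; columns 0 and 1 have no default.
    let de = DefaultAwareDeserializer::new(3, [(2, Some(42))].into_iter());
    // The encoded row only carries column 0; column 1 stays None, column 2 falls back to its default.
    let row = de.deserialize([(0, Some(7))].into_iter());
    assert_eq!(row, vec![Some(7i64), None, Some(42)]);
}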
