change the api to define min_chunk_size and max_chunk_size and automatically center the mask
kszucs committed Feb 20, 2025
1 parent e02658f commit e82efee
Showing 8 changed files with 339 additions and 62 deletions.
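The API change replaces the single average chunk length with an explicit [min, max] chunk size range on the writer properties. A minimal sketch of the new builder calls, as exercised by column_chunker_test.cc below (the helper name and the 128 KiB / 256 KiB bounds are illustrative, not part of this diff):

#include <memory>

#include "parquet/properties.h"

// Sketch only: enable content-defined chunking (CDC) and bound the chunk sizes.
// The gear-hash mask is derived from this range and centered between the two
// bounds (see GetMask in column_chunker.cc).
std::shared_ptr<parquet::WriterProperties> MakeCdcWriterProperties() {
  return parquet::WriterProperties::Builder()
      .enable_cdc()                              // turn on content-defined chunking
      ->cdc_size_range(128 * 1024, 256 * 1024)   // min and max chunk size in bytes
      ->build();
}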
47 changes: 24 additions & 23 deletions cpp/src/parquet/column_chunker.cc
@@ -550,8 +550,6 @@ constexpr uint64_t GEAR_HASH_TABLE[8][256] = {
0xbf037e245369a618, 0x8038164365f6e2b5, 0xe2e1f6163b4e8d08, 0x8df9314914f0857e},
};

const uint64_t AVG_LEN = 1024 * 1024;

// create a fake null array class with a GetView method returning 0 always
class FakeNullArray {
public:
@@ -564,25 +562,27 @@ class FakeNullArray {

static uint64_t GetMask(uint64_t min_size, uint64_t max_size) {
uint64_t avg_size = (min_size + max_size) / 2;
size_t mask_bits = static_cast<size_t>(std::ceil(std::log2(avg_size)));
size_t effective_bits = mask_bits - 3 - 5;
return (1ULL << effective_bits) - 1;
uint64_t target_size = avg_size - min_size;
size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size)));
// -3 because we are using 8 hash tables to have more gaussian-like distribution
// -1 narrows the chunk size distribution in order to avoid having too many hard
// cuts at the minimum and maximum chunk sizes
size_t effective_bits = mask_bits - 3 - 1;
return std::numeric_limits<uint64_t>::max() << (64 - effective_bits);
}
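// Worked example of the mask derivation above (illustrative numbers, not part
// of this change): with min_size = 1 MiB and max_size = 2 MiB,
//   avg_size       = (1 MiB + 2 MiB) / 2 = 1.5 MiB
//   target_size    = avg_size - min_size = 512 KiB
//   mask_bits      = floor(log2(524288)) = 19
//   effective_bits = 19 - 3 - 1          = 15
// so hash_mask_ has only its top 15 bits set and selects the high bits of the
// 64-bit rolling gear hash.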

// rename it since it is not FastCDC anymore

FastCDC::FastCDC(const LevelInfo& level_info, uint64_t avg_len, uint8_t granurality_level)
ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info,
uint64_t min_size, uint64_t max_size)
: level_info_(level_info),
avg_len_(avg_len == 0 ? AVG_LEN : avg_len),
min_len_(static_cast<uint64_t>(avg_len_ * 0.5)),
max_len_(static_cast<uint64_t>(avg_len_ * 2.0)),
hash_mask_(GetMask(avg_len_, granurality_level + 3)) {}
min_size_(min_size),
max_size_(max_size),
hash_mask_(GetMask(min_size, max_size)) {}

template <typename T>
bool FastCDC::Roll(const T value) {
bool ContentDefinedChunker::Roll(const T value) {
constexpr size_t BYTE_WIDTH = sizeof(T);
chunk_size_ += BYTE_WIDTH;
if (chunk_size_ < min_len_) {
if (chunk_size_ < min_size_) {
return false;
}
auto bytes = reinterpret_cast<const uint8_t*>(&value);
@@ -594,9 +594,9 @@ bool FastCDC::Roll(const T value) {
return match;
}

bool FastCDC::Roll(std::string_view value) {
bool ContentDefinedChunker::Roll(std::string_view value) {
chunk_size_ += value.size();
if (chunk_size_ < min_len_) {
if (chunk_size_ < min_size_) {
return false;
}
bool match = false;
@@ -608,12 +608,12 @@ bool FastCDC::Roll(std::string_view value) {
return match;
}

bool FastCDC::Check(bool match) {
if (ARROW_PREDICT_FALSE(match && (++nth_run_ >= 7))) {
bool ContentDefinedChunker::Check(bool match) {
if (ARROW_PREDICT_FALSE(match && ++nth_run_ >= 7)) {
nth_run_ = 0;
chunk_size_ = 0;
return true;
} else if (ARROW_PREDICT_FALSE(chunk_size_ >= max_len_)) {
} else if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) {
chunk_size_ = 0;
return true;
} else {
@@ -622,9 +622,10 @@ bool FastCDC::Check(bool match) {
}
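// Putting Roll() and Check() together (summary of the code above): the gear
// hash is only consulted once chunk_size_ has reached min_size_, and a chunk
// boundary is emitted either on the seventh masked-hash match since the
// previous cut (nth_run_) or unconditionally once chunk_size_ reaches
// max_size_, keeping chunk sizes within the configured range up to the width
// of a single value.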

template <typename T>
const std::vector<Chunk> FastCDC::Calculate(const int16_t* def_levels,
const int16_t* rep_levels, int64_t num_levels,
const T& leaf_array) {
const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels,
const int16_t* rep_levels,
int64_t num_levels,
const T& leaf_array) {
std::vector<Chunk> result;
bool has_def_levels = level_info_.def_level > 0;
bool has_rep_levels = level_info_.rep_level > 0;
@@ -719,7 +720,7 @@ const std::vector<Chunk> FastCDC::Calculate(const int16_t* def_levels,
return Calculate(def_levels, rep_levels, num_levels, \
checked_cast<const ::arrow::ArrowType##Array&>(values));

const ::arrow::Result<std::vector<Chunk>> FastCDC::GetBoundaries(
const ::arrow::Result<std::vector<Chunk>> ContentDefinedChunker::GetBoundaries(
const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels,
const ::arrow::Array& values) {
auto type_id = values.type()->id();
15 changes: 6 additions & 9 deletions cpp/src/parquet/column_chunker.h
@@ -39,12 +39,10 @@ struct Chunk {
levels_to_write(levels_to_write) {}
};

// have a chunker here

// rename it since it is not FastCDC anymore
class FastCDC {
class ContentDefinedChunker {
public:
FastCDC(const LevelInfo& level_info, uint64_t avg_len, uint8_t granurality_level = 5);
ContentDefinedChunker(const LevelInfo& level_info, uint64_t min_size,
uint64_t max_size);

const ::arrow::Result<std::vector<Chunk>> GetBoundaries(const int16_t* def_levels,
const int16_t* rep_levels,
@@ -62,12 +60,11 @@ class FastCDC {
int64_t num_levels, const T& leaf_array);

const internal::LevelInfo& level_info_;
const uint64_t avg_len_;
const uint64_t min_len_;
const uint64_t max_len_;
const uint64_t min_size_;
const uint64_t max_size_;
const uint64_t hash_mask_;

uint8_t nth_run_ = 0;
uint64_t nth_run_ = 0;
uint64_t chunk_size_ = 0;
uint64_t rolling_hash_ = 0;
};
216 changes: 209 additions & 7 deletions cpp/src/parquet/column_chunker_test.cc
@@ -7,10 +7,212 @@
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_decimal.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/decimal.h"
#include "arrow/util/float16.h"
#include "parquet/arrow/reader.h"
#include "parquet/arrow/reader_internal.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/test_util.h"
#include "parquet/arrow/writer.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
#include "parquet/page_index.h"
#include "parquet/test_util.h"

namespace parquet {

using ::arrow::Array;
using ::arrow::ChunkedArray;
using ::arrow::ConcatenateTables;
using ::arrow::default_memory_pool;
using ::arrow::Field;
using ::arrow::Result;
using ::arrow::Table;
using ::arrow::io::BufferReader;
using ::arrow::random::GenerateArray;
using ::arrow::random::GenerateBatch;
using ::parquet::arrow::FileReader;
using ::parquet::arrow::FileReaderBuilder;
using ::parquet::arrow::MakeSimpleTable;
using ::parquet::arrow::NonNullArray;
using ::parquet::arrow::WriteTable;

std::shared_ptr<Table> GenerateTable(const std::vector<std::shared_ptr<Field>>& fields,
int64_t size, int32_t seed = 42) {
auto batch = GenerateBatch(fields, size, seed);
return Table::FromRecordBatches({batch}).ValueOrDie();
}

std::shared_ptr<Table> ConcatAndCombine(
const std::vector<std::shared_ptr<Table>>& parts) {
auto table = ConcatenateTables(parts).ValueOrDie();
return table->CombineChunks().ValueOrDie();
}

Result<std::shared_ptr<Buffer>> WriteTableToBuffer(const std::shared_ptr<Table>& table,
uint64_t min_chunk_size,
uint64_t max_chunk_size,
int64_t row_group_size = 1024 * 1024) {
auto sink = CreateOutputStream();

auto write_props = WriterProperties::Builder()
.disable_dictionary()
->enable_cdc()
->cdc_size_range(min_chunk_size, max_chunk_size)
->build();
auto arrow_props = default_arrow_writer_properties();
RETURN_NOT_OK(WriteTable(*table, default_memory_pool(), sink, row_group_size,
write_props, arrow_props));
return sink->Finish();
}

std::vector<uint64_t> GetColumnPageLengths(const std::shared_ptr<Buffer>& data,
int column_index = 0) {
std::vector<uint64_t> page_lengths;

auto buffer_reader = std::make_shared<BufferReader>(data);
auto parquet_reader = ParquetFileReader::Open(std::move(buffer_reader));

auto metadata = parquet_reader->metadata();
for (int rg = 0; rg < metadata->num_row_groups(); rg++) {
auto page_reader = parquet_reader->RowGroup(rg)->GetColumnPageReader(column_index);
while (auto page = page_reader->NextPage()) {
if (page->type() == PageType::DATA_PAGE || page->type() == PageType::DATA_PAGE_V2) {
auto data_page = static_cast<DataPage*>(page.get());
page_lengths.push_back(data_page->num_values());
}
}
}

return page_lengths;
}

std::vector<uint64_t> WriteAndGetPageLengths(const std::shared_ptr<Table>& table,
uint64_t min_chunk_size,
uint64_t max_chunk_size,
int column_index = 0) {
auto buffer = WriteTableToBuffer(table, min_chunk_size, max_chunk_size).ValueOrDie();
return GetColumnPageLengths(buffer, column_index);
}

void AssertAllBetween(const std::vector<uint64_t>& values, uint64_t min, uint64_t max) {
// except the last chunk, since it is not guaranteed to be within the range
for (size_t i = 0; i < values.size() - 1; i++) {
ASSERT_GE(values[i], min);
ASSERT_LE(values[i], max);
}
ASSERT_LE(values.back(), max);
}

void AssertUpdateCase(const std::vector<uint64_t>& original,
const std::vector<uint64_t>& modified) {
ASSERT_EQ(original.size(), modified.size());
for (size_t i = 0; i < original.size(); i++) {
ASSERT_EQ(original[i], modified[i]);
}
}

void AssertDeleteCase(const std::vector<uint64_t>& original,
const std::vector<uint64_t>& modified) {
ASSERT_EQ(original.size(), modified.size());
size_t smaller_count = 0;
for (size_t i = 0; i < original.size(); i++) {
if (modified[i] < original[i]) {
smaller_count++;
ASSERT_LT(modified[i], original[i]);
} else {
ASSERT_EQ(modified[i], original[i]);
}
}
ASSERT_EQ(smaller_count, 1);
}

void AssertInsertCase(const std::vector<uint64_t>& original,
const std::vector<uint64_t>& modified) {
ASSERT_EQ(original.size(), modified.size());
size_t larger_count = 0;
for (size_t i = 0; i < original.size(); i++) {
if (modified[i] > original[i]) {
larger_count++;
ASSERT_GT(modified[i], original[i]);
} else {
ASSERT_EQ(modified[i], original[i]);
}
}
ASSERT_EQ(larger_count, 1);
}

void AssertAppendCase(const std::vector<uint64_t>& original,
const std::vector<uint64_t>& modified) {
ASSERT_GE(modified.size(), original.size());
for (size_t i = 0; i < original.size() - 1; i++) {
ASSERT_EQ(original[i], modified[i]);
}
ASSERT_GT(modified[original.size() - 1], original[original.size() - 1]);
}

// TODO:
// - test multiple edits
// - test nullable types
// - test nested types
// - test dictionary encoding
// - test multiple row groups

class TestColumnChunker : public ::testing::Test {};

TEST_F(TestColumnChunker, BasicOperation) {
auto dtype = ::arrow::uint64();
auto field = ::arrow::field("f0", dtype, false);

auto part1 = GenerateTable({field}, 128 * 1024);
auto part2 = GenerateTable({field}, 32, /*seed=*/1);
auto part3 = GenerateTable({field}, 128 * 1024);
auto part4 = GenerateTable({field}, 64);
auto part5 = GenerateTable({field}, 32 * 1024);
auto part6 = GenerateTable({field}, 32, /*seed=*/2);

auto base = ConcatAndCombine({part1, part2, part3});
auto updated = ConcatAndCombine({part1, part6, part3});
auto deleted = ConcatAndCombine({part1, part3});
auto inserted = ConcatAndCombine({part1, part2, part4, part3});
auto appended = ConcatAndCombine({part1, part2, part3, part5});

auto min_size = 128 * 1024;
auto max_size = 256 * 1024;

auto base_lengths = WriteAndGetPageLengths(base, min_size, max_size);
auto updated_lengths = WriteAndGetPageLengths(updated, min_size, max_size);
auto deleted_lengths = WriteAndGetPageLengths(deleted, min_size, max_size);
auto inserted_lengths = WriteAndGetPageLengths(inserted, min_size, max_size);
auto appended_lengths = WriteAndGetPageLengths(appended, min_size, max_size);

AssertAllBetween(base_lengths, min_size / 8, max_size / 8);
AssertAllBetween(updated_lengths, min_size / 8, max_size / 8);
AssertAllBetween(deleted_lengths, min_size / 8, max_size / 8);
AssertAllBetween(inserted_lengths, min_size / 8, max_size / 8);
AssertAllBetween(appended_lengths, min_size / 8, max_size / 8);
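// Note on the division by 8 above: page lengths are value counts, and the
// non-nullable uint64 values are 8 bytes each, so a [min_size, max_size] byte
// range corresponds to [min_size / 8, max_size / 8] values per page.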

AssertUpdateCase(base_lengths, updated_lengths);
AssertDeleteCase(base_lengths, deleted_lengths);
AssertInsertCase(base_lengths, inserted_lengths);
AssertAppendCase(base_lengths, appended_lengths);
}

} // namespace parquet

// - check that the state is maintained across rowgroups, so the edits should be
// consistent
// - check that the edits are consistent between writes
// - some smoke testing like approach would be nice to test several arrow types
21 changes: 12 additions & 9 deletions cpp/src/parquet/column_writer.cc
@@ -754,7 +754,8 @@ class ColumnWriterImpl {
fallback_(false),
definition_levels_sink_(allocator_),
repetition_levels_sink_(allocator_),
content_defined_chunker_(level_info_, properties->cdc_avg_size()) {
content_defined_chunker_(level_info_, properties->cdc_size_range().first,
properties->cdc_size_range().second) {
definition_levels_rle_ =
std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
repetition_levels_rle_ =
@@ -894,7 +895,7 @@ class ColumnWriterImpl {

std::vector<std::unique_ptr<DataPage>> data_pages_;

internal::FastCDC content_defined_chunker_;
internal::ContentDefinedChunker content_defined_chunker_;

private:
void InitSinks() {
@@ -1341,15 +1342,17 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
content_defined_chunker_.GetBoundaries(
def_levels, rep_levels, num_levels, leaf_array));
for (auto chunk : boundaries) {
auto sliced_array = leaf_array.Slice(chunk.value_offset);
auto chunk_array = leaf_array.Slice(chunk.value_offset);
auto chunk_def_levels = AddIfNotNull(def_levels, chunk.level_offset);
auto chunk_rep_levels = AddIfNotNull(rep_levels, chunk.level_offset);
if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) {
ARROW_CHECK_OK(WriteArrowDictionary(
def_levels + chunk.level_offset, rep_levels + chunk.level_offset,
chunk.levels_to_write, *sliced_array, ctx, maybe_parent_nulls));
ARROW_CHECK_OK(WriteArrowDictionary(chunk_def_levels, chunk_rep_levels,
chunk.levels_to_write, *chunk_array, ctx,
maybe_parent_nulls));
} else {
ARROW_CHECK_OK(WriteArrowDense(
def_levels + chunk.level_offset, rep_levels + chunk.level_offset,
chunk.levels_to_write, *sliced_array, ctx, maybe_parent_nulls));
ARROW_CHECK_OK(WriteArrowDense(chunk_def_levels, chunk_rep_levels,
chunk.levels_to_write, *chunk_array, ctx,
maybe_parent_nulls));
}
if (num_buffered_values_ > 0) {
AddDataPage();
