From ae1bb5140b1cec9270ec087354ed06941c5ba5e6 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 26 Feb 2016 09:52:46 -0800 Subject: [PATCH] PARQUET-494: Implement DictionaryEncoder and test dictionary decoding I incorporated quite a bit of code from Impala for this patch, but did a bunch of work to get everything working. In particular, I wasn't happy with the hash table implementation in `dict-encoder.h` and so have written a simple new one that we can benchmark and tune as necessary. The simplest way to pull in the DictEncoder (PARQUET-493) was to also bring in the `MemPool` implementation, suitably trimmed down. We can continue to refactor this as needed for parquet-cpp. I also did some light refactoring using `TYPED_TEST` in `plain-encoding-test` (now `encoding-test`). Author: Wes McKinney Closes #64 from wesm/PARQUET-494 and squashes the following commits: c634abe [Wes McKinney] Refactor to create TestEncoderBase a3a563a [Wes McKinney] Consolidate dictionary encoding code 2cc4ffe [Wes McKinney] Retrieve type_length() only once in PlainDecoder ctor 20ccd9e [Wes McKinney] Remove DictionaryEncoder shim layer for now dcfc0aa [Wes McKinney] Remove redundant Int96 comparison d98a2c0 [Wes McKinney] Dictionary encoding for booleans throws exception 05414f0 [Wes McKinney] Test dictionary encoding more types 9a5b1a4 [Wes McKinney] Enable include_order linting per PARQUET-539 f3f0efc [Wes McKinney] IWYU cleaning d4191c6 [Wes McKinney] Add header installs, fix clang warning 1347b13 [Wes McKinney] Rename plain-encoding-test to encoding-test 09bf0fa [Wes McKinney] Fix bugs and add dictionary repeats 2e6af48 [Wes McKinney] Fix some bugs. FixedLenByteArray remains to get working. 69b5b69 [Wes McKinney] Refactor test fixtures to be less coupled to state. 
process on getting dict encoding working 6b23716 [Wes McKinney] Create reusable DataType structs for test fixtures and other compile-time type resolution matters 67883fd [Wes McKinney] Bunch of combined work for dict encoding support: Change-Id: I0fe7d47373b9da106e700381bee6538199af8a69 --- cpp/src/parquet/column/column-reader-test.cc | 1 + cpp/src/parquet/column/levels-test.cc | 3 +- cpp/src/parquet/column/reader.cc | 5 +- cpp/src/parquet/column/scanner-test.cc | 54 +-- cpp/src/parquet/compression/codec-test.cc | 5 +- cpp/src/parquet/compression/codec.h | 4 +- cpp/src/parquet/compression/lz4-codec.cc | 3 +- cpp/src/parquet/compression/snappy-codec.cc | 3 +- cpp/src/parquet/encodings/CMakeLists.txt | 2 +- .../parquet/encodings/dictionary-encoding.h | 311 +++++++++++++++++- cpp/src/parquet/encodings/encoder.h | 5 - cpp/src/parquet/encodings/encoding-test.cc | 309 +++++++++++++++++ .../parquet/encodings/plain-encoding-test.cc | 232 ------------- cpp/src/parquet/encodings/plain-encoding.h | 96 +++--- cpp/src/parquet/file/reader-internal.h | 2 - cpp/src/parquet/reader-test.cc | 3 +- .../parquet/schema/schema-descriptor-test.cc | 3 +- cpp/src/parquet/types.h | 36 ++ cpp/src/parquet/util/CMakeLists.txt | 16 +- cpp/src/parquet/util/bit-stream-utils.h | 2 +- cpp/src/parquet/util/bit-util-test.cc | 5 +- cpp/src/parquet/util/buffer-builder.h | 61 ++++ cpp/src/parquet/util/cpu-info.cc | 10 +- cpp/src/parquet/util/dict-encoding.h | 36 ++ cpp/src/parquet/util/hash-util.h | 247 ++++++++++++++ cpp/src/parquet/util/mem-pool-test.cc | 247 ++++++++++++++ cpp/src/parquet/util/mem-pool.cc | 234 +++++++++++++ cpp/src/parquet/util/mem-pool.h | 208 ++++++++++++ cpp/src/parquet/util/output.h | 1 - cpp/src/parquet/util/rle-encoding.h | 2 +- cpp/src/parquet/util/rle-test.cc | 9 +- cpp/src/parquet/util/sse-util.h | 30 ++ cpp/src/parquet/util/stopwatch.h | 5 +- cpp/src/parquet/util/test-common.h | 13 +- 34 files changed, 1839 insertions(+), 364 deletions(-) create mode 100644 
cpp/src/parquet/encodings/encoding-test.cc delete mode 100644 cpp/src/parquet/encodings/plain-encoding-test.cc create mode 100644 cpp/src/parquet/util/buffer-builder.h create mode 100644 cpp/src/parquet/util/dict-encoding.h create mode 100644 cpp/src/parquet/util/hash-util.h create mode 100644 cpp/src/parquet/util/mem-pool-test.cc create mode 100644 cpp/src/parquet/util/mem-pool.cc create mode 100644 cpp/src/parquet/util/mem-pool.h diff --git a/cpp/src/parquet/column/column-reader-test.cc b/cpp/src/parquet/column/column-reader-test.cc index 079201a9f74d9..e64ef28816ca9 100644 --- a/cpp/src/parquet/column/column-reader-test.cc +++ b/cpp/src/parquet/column/column-reader-test.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include diff --git a/cpp/src/parquet/column/levels-test.cc b/cpp/src/parquet/column/levels-test.cc index 0e3c20f222a8b..57aa56263c28a 100644 --- a/cpp/src/parquet/column/levels-test.cc +++ b/cpp/src/parquet/column/levels-test.cc @@ -15,13 +15,12 @@ // specific language governing permissions and limitations // under the License. 
+#include #include #include #include #include -#include - #include "parquet/column/levels.h" #include "parquet/types.h" diff --git a/cpp/src/parquet/column/reader.cc b/cpp/src/parquet/column/reader.cc index 4011347c24161..4cff8108092ee 100644 --- a/cpp/src/parquet/column/reader.cc +++ b/cpp/src/parquet/column/reader.cc @@ -52,8 +52,9 @@ void TypedColumnReader::ConfigureDictionary(const DictionaryPage* page) { // // TODO(wesm): investigate whether this all-or-nothing decoding of the // dictionary makes sense and whether performance can be improved - std::shared_ptr decoder( - new DictionaryDecoder(descr_, &dictionary)); + + auto decoder = std::make_shared >(descr_); + decoder->SetDict(&dictionary); decoders_[encoding] = decoder; current_decoder_ = decoders_[encoding].get(); diff --git a/cpp/src/parquet/column/scanner-test.cc b/cpp/src/parquet/column/scanner-test.cc index be6b42ee28653..785db08dc72a0 100644 --- a/cpp/src/parquet/column/scanner-test.cc +++ b/cpp/src/parquet/column/scanner-test.cc @@ -40,16 +40,6 @@ namespace parquet_cpp { using schema::NodePtr; -bool operator==(const Int96& a, const Int96& b) { - return a.value[0] == b.value[0] && - a.value[1] == b.value[1] && - a.value[2] == b.value[2]; -} - -bool operator==(const ByteArray& a, const ByteArray& b) { - return a.len == b.len && 0 == memcmp(a.ptr, b.ptr, a.len); -} - static int FLBA_LENGTH = 12; bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH); @@ -57,16 +47,10 @@ bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) { namespace test { -template class TypeValue { - public: - static const int value = N; -}; -template const int TypeValue::value; - -template +template class TestFlatScanner : public ::testing::Test { public: - typedef typename type_traits::value_type T; + typedef typename Type::c_type T; void InitValues() { random_numbers(num_values_, 0, std::numeric_limits::min(), @@ -106,7 +90,7 @@ class 
TestFlatScanner : public ::testing::Test { // Create values values_.resize(num_values_); InitValues(); - Paginate(d, values_, def_levels_, max_def_level, + Paginate(d, values_, def_levels_, max_def_level, rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_); } @@ -116,8 +100,8 @@ class TestFlatScanner : public ::testing::Test { } void CheckResults(int batch_size, const ColumnDescriptor *d) { - TypedScanner* scanner = - reinterpret_cast* >(scanner_.get()); + TypedScanner* scanner = + reinterpret_cast* >(scanner_.get()); T val; bool is_null; int16_t def_level; @@ -158,14 +142,11 @@ class TestFlatScanner : public ::testing::Test { void InitDescriptors(std::shared_ptr& d1, std::shared_ptr& d2, std::shared_ptr& d3) { NodePtr type; - type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, - static_cast(TYPE::value)); + type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num); d1.reset(new ColumnDescriptor(type, 0, 0)); - type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, - static_cast(TYPE::value)); + type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num); d2.reset(new ColumnDescriptor(type, 4, 0)); - type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, - static_cast(TYPE::value)); + type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num); d3.reset(new ColumnDescriptor(type, 4, 2)); } @@ -194,18 +175,18 @@ class TestFlatScanner : public ::testing::Test { }; template<> -void TestFlatScanner >::InitValues() { +void TestFlatScanner::InitValues() { values_ = flip_coins(num_values_, 0); } template<> -void TestFlatScanner >::InitValues() { +void TestFlatScanner::InitValues() { random_Int96_numbers(num_values_, 0, std::numeric_limits::min(), std::numeric_limits::max(), values_.data()); } template<> -void TestFlatScanner >::InitValues() { +void TestFlatScanner::InitValues() { int max_byte_array_len = 12; int num_bytes = max_byte_array_len + sizeof(uint32_t); size_t 
nbytes = num_values_ * num_bytes; @@ -215,7 +196,7 @@ void TestFlatScanner >::InitValues() { } template<> -void TestFlatScanner >::InitValues() { +void TestFlatScanner::InitValues() { size_t nbytes = num_values_ * FLBA_LENGTH; data_buffer_.resize(nbytes); random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH, @@ -223,7 +204,7 @@ void TestFlatScanner >::InitValues() { } template<> -void TestFlatScanner >::InitDescriptors( +void TestFlatScanner::InitDescriptors( std::shared_ptr& d1, std::shared_ptr& d2, std::shared_ptr& d3) { NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED, @@ -237,18 +218,13 @@ void TestFlatScanner >::InitDescriptors( d3.reset(new ColumnDescriptor(type, 4, 2)); } -typedef TestFlatScanner> TestFlatFLBAScanner; +typedef TestFlatScanner TestFlatFLBAScanner; static int num_levels_per_page = 100; static int num_pages = 20; static int batch_size = 32; -typedef ::testing::Types, TypeValue, - TypeValue, TypeValue, TypeValue, - TypeValue, TypeValue, - TypeValue > Primitives; - -TYPED_TEST_CASE(TestFlatScanner, Primitives); +TYPED_TEST_CASE(TestFlatScanner, ParquetTypes); TYPED_TEST(TestFlatScanner, TestScanner) { this->ExecuteAll(num_pages, num_levels_per_page, batch_size); diff --git a/cpp/src/parquet/compression/codec-test.cc b/cpp/src/parquet/compression/codec-test.cc index 610fb3796b57e..285559a014c24 100644 --- a/cpp/src/parquet/compression/codec-test.cc +++ b/cpp/src/parquet/compression/codec-test.cc @@ -15,14 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+#include #include #include #include -#include -#include "parquet/util/test-common.h" - #include "parquet/compression/codec.h" +#include "parquet/util/test-common.h" using std::string; using std::vector; diff --git a/cpp/src/parquet/compression/codec.h b/cpp/src/parquet/compression/codec.h index bc73f02f3e074..df15d61c1344f 100644 --- a/cpp/src/parquet/compression/codec.h +++ b/cpp/src/parquet/compression/codec.h @@ -18,11 +18,11 @@ #ifndef PARQUET_COMPRESSION_CODEC_H #define PARQUET_COMPRESSION_CODEC_H +#include + #include #include -#include - #include "parquet/exception.h" #include "parquet/types.h" diff --git a/cpp/src/parquet/compression/lz4-codec.cc b/cpp/src/parquet/compression/lz4-codec.cc index a131031a7f2f4..81413bbc33ec3 100644 --- a/cpp/src/parquet/compression/lz4-codec.cc +++ b/cpp/src/parquet/compression/lz4-codec.cc @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "parquet/compression/codec.h" - #include #include +#include "parquet/compression/codec.h" #include "parquet/exception.h" namespace parquet_cpp { diff --git a/cpp/src/parquet/compression/snappy-codec.cc b/cpp/src/parquet/compression/snappy-codec.cc index 91590dbf4e3ad..991dd0466a8ff 100644 --- a/cpp/src/parquet/compression/snappy-codec.cc +++ b/cpp/src/parquet/compression/snappy-codec.cc @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-#include "parquet/compression/codec.h" - #include #include #include +#include "parquet/compression/codec.h" #include "parquet/exception.h" namespace parquet_cpp { diff --git a/cpp/src/parquet/encodings/CMakeLists.txt b/cpp/src/parquet/encodings/CMakeLists.txt index c9349afb255a2..eb4cc3cfc3b80 100644 --- a/cpp/src/parquet/encodings/CMakeLists.txt +++ b/cpp/src/parquet/encodings/CMakeLists.txt @@ -26,4 +26,4 @@ install(FILES plain-encoding.h DESTINATION include/parquet/encodings) -ADD_PARQUET_TEST(plain-encoding-test) +ADD_PARQUET_TEST(encoding-test) diff --git a/cpp/src/parquet/encodings/dictionary-encoding.h b/cpp/src/parquet/encodings/dictionary-encoding.h index b52aefb7070e7..eed06592c586f 100644 --- a/cpp/src/parquet/encodings/dictionary-encoding.h +++ b/cpp/src/parquet/encodings/dictionary-encoding.h @@ -20,10 +20,16 @@ #include #include +#include +#include #include #include "parquet/encodings/decoder.h" #include "parquet/encodings/encoder.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/util/dict-encoding.h" +#include "parquet/util/hash-util.h" +#include "parquet/util/mem-pool.h" #include "parquet/util/rle-encoding.h" namespace parquet_cpp { @@ -36,14 +42,12 @@ class DictionaryDecoder : public Decoder { // Initializes the dictionary with values from 'dictionary'. The data in // dictionary is not guaranteed to persist in memory after this call so the // dictionary decoder needs to copy the data out if necessary. 
- DictionaryDecoder(const ColumnDescriptor* descr, - Decoder* dictionary) + explicit DictionaryDecoder(const ColumnDescriptor* descr) : Decoder(descr, Encoding::RLE_DICTIONARY) { - Init(dictionary); } // Perform type-specific initiatialization - void Init(Decoder* dictionary); + void SetDict(Decoder* dictionary); virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -83,20 +87,20 @@ class DictionaryDecoder : public Decoder { }; template -inline void DictionaryDecoder::Init(Decoder* dictionary) { +inline void DictionaryDecoder::SetDict(Decoder* dictionary) { int num_dictionary_values = dictionary->values_left(); dictionary_.resize(num_dictionary_values); dictionary->Decode(&dictionary_[0], num_dictionary_values); } template <> -inline void DictionaryDecoder::Init( +inline void DictionaryDecoder::SetDict( Decoder* dictionary) { ParquetException::NYI("Dictionary encoding is not implemented for boolean values"); } template <> -inline void DictionaryDecoder::Init( +inline void DictionaryDecoder::SetDict( Decoder* dictionary) { int num_dictionary_values = dictionary->values_left(); dictionary_.resize(num_dictionary_values); @@ -116,7 +120,7 @@ inline void DictionaryDecoder::Init( } template <> -inline void DictionaryDecoder::Init( +inline void DictionaryDecoder::SetDict( Decoder* dictionary) { int num_dictionary_values = dictionary->values_left(); dictionary_.resize(num_dictionary_values); @@ -134,6 +138,297 @@ inline void DictionaryDecoder::Init( } } +// ---------------------------------------------------------------------- +// Dictionary encoder + +// Initially imported from Apache Impala on 2016-02-22, and has been modified +// since for parquet-cpp + +// Initially 1024 elements +static constexpr int INITIAL_HASH_TABLE_SIZE = 1 << 10; + +typedef int32_t hash_slot_t; +static constexpr hash_slot_t HASH_SLOT_EMPTY = std::numeric_limits::max(); + +// The maximum load factor for the hash table before resizing. 
+static constexpr double MAX_HASH_LOAD = 0.7; + +/// See the dictionary encoding section of https://github.com/Parquet/parquet-format. +/// The encoding supports streaming encoding. Values are encoded as they are added while +/// the dictionary is being constructed. At any time, the buffered values can be +/// written out with the current dictionary size. More values can then be added to +/// the encoder, including new dictionary entries. +class DictEncoderBase { + public: + virtual ~DictEncoderBase() { + DCHECK(buffered_indices_.empty()); + } + + /// Writes out the encoded dictionary to buffer. buffer must be preallocated to + /// dict_encoded_size() bytes. + virtual void WriteDict(uint8_t* buffer) = 0; + + /// The number of entries in the dictionary. + virtual int num_entries() const = 0; + + /// Clears all the indices (but leaves the dictionary). + void ClearIndices() { buffered_indices_.clear(); } + + /// Returns a conservative estimate of the number of bytes needed to encode the buffered + /// indices. Used to size the buffer passed to WriteIndices(). + int EstimatedDataEncodedSize() { + return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size()); + } + + /// The minimum bit width required to encode the currently buffered indices. + int bit_width() const { + if (UNLIKELY(num_entries() == 0)) return 0; + if (UNLIKELY(num_entries() == 1)) return 1; + return BitUtil::Log2(num_entries()); + } + + /// Writes out any buffered indices to buffer preceded by the bit width of this data. + /// Returns the number of bytes written. + /// If the supplied buffer is not big enough, returns -1. + /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize() + /// to size buffer. 
+ int WriteIndices(uint8_t* buffer, int buffer_len); + + int hash_table_size() { return hash_table_size_; } + int dict_encoded_size() { return dict_encoded_size_; } + + protected: + explicit DictEncoderBase(MemPool* pool) : + hash_table_size_(INITIAL_HASH_TABLE_SIZE), + mod_bitmask_(hash_table_size_ - 1), + hash_slots_(hash_table_size_, HASH_SLOT_EMPTY), + pool_(pool), + dict_encoded_size_(0) {} + + /// Size of the table. Must be a power of 2. + int hash_table_size_; + + // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j % + // hash_table_size_, but uses far fewer CPU cycles + int mod_bitmask_; + + // We use a fixed-size hash table with linear probing + // + // These values correspond to the uniques_ array + std::vector hash_slots_; + + // For ByteArray / FixedLenByteArray data. Not owned + MemPool* pool_; + + /// Indices that have not yet be written out by WriteIndices(). + std::vector buffered_indices_; + + /// The number of bytes needed to encode the dictionary. + int dict_encoded_size_; +}; + +template +class DictEncoder : public DictEncoderBase { + public: + explicit DictEncoder(MemPool* pool = nullptr, int type_length = -1) : + DictEncoderBase(pool), + type_length_(type_length) { } + + // TODO(wesm): think about how to address the construction semantics in + // encodings/dictionary-encoding.h + void set_mem_pool(MemPool* pool) { + pool_ = pool; + } + + void set_type_length(int type_length) { + type_length_ = type_length; + } + + /// Encode value. Note that this does not actually write any data, just + /// buffers the value's index to be written later. + void Put(const T& value); + + virtual void WriteDict(uint8_t* buffer); + + virtual int num_entries() const { return uniques_.size(); } + + private: + // The unique observed values + std::vector uniques_; + + bool SlotDifferent(const T& v, hash_slot_t slot); + void DoubleTableSize(); + + /// Size of each encoded dictionary value. -1 for variable-length types. 
+ int type_length_; + + /// Hash function for mapping a value to a bucket. + inline uint32_t Hash(const T& value) const; + + /// Adds value to the hash table and updates dict_encoded_size_ + void AddDictKey(const T& value); +}; + +template +inline uint32_t DictEncoder::Hash(const T& value) const { + return HashUtil::Hash(&value, sizeof(value), 0); +} + +template<> +inline uint32_t DictEncoder::Hash(const ByteArray& value) const { + return HashUtil::Hash(value.ptr, value.len, 0); +} + +template<> +inline uint32_t DictEncoder::Hash( + const FixedLenByteArray& value) const { + return HashUtil::Hash(value.ptr, type_length_, 0); +} + +template +inline bool DictEncoder::SlotDifferent(const T& v, hash_slot_t slot) { + return v != uniques_[slot]; +} + +template <> +inline bool DictEncoder::SlotDifferent( + const FixedLenByteArray& v, hash_slot_t slot) { + return 0 != memcmp(v.ptr, uniques_[slot].ptr, type_length_); +} + +template +inline void DictEncoder::Put(const T& v) { + uint32_t j = Hash(v) & mod_bitmask_; + hash_slot_t index = hash_slots_[j]; + + // Find an empty slot + while (HASH_SLOT_EMPTY != index && SlotDifferent(v, index)) { + // Linear probing + ++j; + if (j == hash_table_size_) j = 0; + index = hash_slots_[j]; + } + + int bytes_added = 0; + if (index == HASH_SLOT_EMPTY) { + // Not in the hash table, so we insert it now + index = uniques_.size(); + hash_slots_[j] = index; + AddDictKey(v); + + if (UNLIKELY(uniques_.size() > + static_cast(hash_table_size_ * MAX_HASH_LOAD))) { + DoubleTableSize(); + } + } + + buffered_indices_.push_back(index); +} + +template +inline void DictEncoder::DoubleTableSize() { + int new_size = hash_table_size_ * 2; + std::vector new_hash_slots(new_size, HASH_SLOT_EMPTY); + hash_slot_t index, slot; + uint32_t j; + for (int i = 0; i < hash_table_size_; ++i) { + index = hash_slots_[i]; + + if (index == HASH_SLOT_EMPTY) { + continue; + } + + // Compute the hash value mod the new table size to start looking for an + // empty slot + const T& 
v = uniques_[index]; + + // Find an empty slot in the new hash table + j = Hash(v) & (new_size - 1); + slot = new_hash_slots[j]; + while (HASH_SLOT_EMPTY != slot && SlotDifferent(v, slot)) { + ++j; + if (j == new_size) j = 0; + slot = new_hash_slots[j]; + } + + // Copy the old slot index to the new hash table + new_hash_slots[j] = index; + } + + hash_table_size_ = new_size; + mod_bitmask_ = new_size - 1; + new_hash_slots.swap(hash_slots_); +} + +template +inline void DictEncoder::AddDictKey(const T& v) { + uniques_.push_back(v); + dict_encoded_size_ += sizeof(T); +} + +template<> +inline void DictEncoder::AddDictKey(const ByteArray& v) { + uint8_t* heap = pool_->Allocate(v.len); + if (UNLIKELY(v.len > 0 && heap == nullptr)) { + throw ParquetException("out of memory"); + } + memcpy(heap, v.ptr, v.len); + + uniques_.push_back(ByteArray(v.len, heap)); + dict_encoded_size_ += v.len + sizeof(uint32_t); +} + +template<> +inline void DictEncoder::AddDictKey(const FixedLenByteArray& v) { + uint8_t* heap = pool_->Allocate(type_length_); + if (UNLIKELY(type_length_ > 0 && heap == nullptr)) { + throw ParquetException("out of memory"); + } + memcpy(heap, v.ptr, type_length_); + + uniques_.push_back(FixedLenByteArray(heap)); + dict_encoded_size_ += type_length_; +} + +template +inline void DictEncoder::WriteDict(uint8_t* buffer) { + // For primitive types, only a memcpy + memcpy(buffer, &uniques_[0], sizeof(T) * uniques_.size()); +} + +// ByteArray and FLBA already have the dictionary encoded in their data heaps +template <> +inline void DictEncoder::WriteDict(uint8_t* buffer) { + for (const ByteArray& v : uniques_) { + memcpy(buffer, reinterpret_cast(&v.len), sizeof(uint32_t)); + buffer += sizeof(uint32_t); + memcpy(buffer, v.ptr, v.len); + buffer += v.len; + } +} + +template <> +inline void DictEncoder::WriteDict(uint8_t* buffer) { + for (const FixedLenByteArray& v : uniques_) { + memcpy(buffer, v.ptr, type_length_); + buffer += type_length_; + } +} + +inline int 
DictEncoderBase::WriteIndices(uint8_t* buffer, int buffer_len) { + // Write bit width in first byte + *buffer = bit_width(); + ++buffer; + --buffer_len; + + RleEncoder encoder(buffer, buffer_len, bit_width()); + for (int index : buffered_indices_) { + if (!encoder.Put(index)) return -1; + } + encoder.Flush(); + return 1 + encoder.len(); +} + } // namespace parquet_cpp #endif diff --git a/cpp/src/parquet/encodings/encoder.h b/cpp/src/parquet/encodings/encoder.h index 50ba48f9bd5bb..ce91a29b1acb2 100644 --- a/cpp/src/parquet/encodings/encoder.h +++ b/cpp/src/parquet/encodings/encoder.h @@ -39,11 +39,6 @@ class Encoder { virtual ~Encoder() {} - // Subclasses should override the ones they support - virtual void Encode(const T* src, int num_values, OutputStream* dst) { - throw ParquetException("Encoder does not implement this type."); - } - const Encoding::type encoding() const { return encoding_; } protected: diff --git a/cpp/src/parquet/encodings/encoding-test.cc b/cpp/src/parquet/encodings/encoding-test.cc new file mode 100644 index 0000000000000..10310ed9d9666 --- /dev/null +++ b/cpp/src/parquet/encodings/encoding-test.cc @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include "parquet/schema/descriptor.h" +#include "parquet/encodings/dictionary-encoding.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/types.h" +#include "parquet/schema/types.h" +#include "parquet/util/bit-util.h" +#include "parquet/util/buffer.h" +#include "parquet/util/dict-encoding.h" +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" + +using std::string; +using std::vector; + +namespace parquet_cpp { + +namespace test { + +TEST(VectorBooleanTest, TestEncodeDecode) { + // PARQUET-454 + int nvalues = 10000; + int nbytes = BitUtil::Ceil(nvalues, 8); + + // seed the prng so failure is deterministic + vector draws = flip_coins_seed(nvalues, 0.5, 0); + + PlainEncoder encoder(nullptr); + PlainDecoder decoder(nullptr); + + InMemoryOutputStream dst; + encoder.Encode(draws, nvalues, &dst); + + std::shared_ptr encode_buffer = dst.GetBuffer(); + ASSERT_EQ(nbytes, encode_buffer->size()); + + vector decode_buffer(nbytes); + const uint8_t* decode_data = &decode_buffer[0]; + + decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size()); + int values_decoded = decoder.Decode(&decode_buffer[0], nvalues); + ASSERT_EQ(nvalues, values_decoded); + + for (int i = 0; i < nvalues; ++i) { + ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i; + } +} + +// ---------------------------------------------------------------------- +// test data generation + +template +void GenerateData(int num_values, T* out, vector* heap) { + // seed the prng so failure is deterministic + random_numbers(num_values, 0, std::numeric_limits::min(), + std::numeric_limits::max(), out); +} + +template <> +void GenerateData(int num_values, bool* out, vector* heap) { + // seed the prng so failure is deterministic + random_bools(num_values, 0.5, 0, out); +} + +template <> +void GenerateData(int num_values, Int96* out, vector* heap) { + // seed the prng so failure is deterministic + 
random_Int96_numbers(num_values, 0, std::numeric_limits::min(), + std::numeric_limits::max(), out); +} + +template <> +void GenerateData(int num_values, ByteArray* out, vector* heap) { + // seed the prng so failure is deterministic + int max_byte_array_len = 12; + int num_bytes = max_byte_array_len + sizeof(uint32_t); + heap->resize(num_values * max_byte_array_len); + random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len); +} + +static int flba_length = 8; + +template <> +void GenerateData(int num_values, FLBA* out, vector* heap) { + // seed the prng so failure is deterministic + heap->resize(num_values * flba_length); + random_fixed_byte_array(num_values, 0, heap->data(), flba_length, out); +} + +template +void VerifyResults(T* result, T* expected, int num_values) { + for (int i = 0; i < num_values; ++i) { + ASSERT_EQ(expected[i], result[i]) << i; + } +} + +template <> +void VerifyResults(FLBA* result, FLBA* expected, int num_values) { + for (int i = 0; i < num_values; ++i) { + ASSERT_EQ(0, memcmp(expected[i].ptr, result[i].ptr, flba_length)) << i; + } +} + +// ---------------------------------------------------------------------- +// Create some column descriptors + +template +std::shared_ptr ExampleDescr() { + return nullptr; +} + +template <> +std::shared_ptr ExampleDescr() { + auto node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL, + flba_length, LogicalType::UTF8); + return std::make_shared(node, 0, 0); +} + +// ---------------------------------------------------------------------- +// Plain encoding tests + +template +class TestEncodingBase : public ::testing::Test { + public: + typedef typename Type::c_type T; + static constexpr int TYPE = Type::type_num; + + void SetUp() { + descr_ = ExampleDescr(); + if (descr_) { + type_length_ = descr_->type_length(); + } + } + + void InitData(int nvalues, int repeats) { + num_values_ = nvalues * repeats; + input_bytes_.resize(num_values_ * sizeof(T)); + 
output_bytes_.resize(num_values_ * sizeof(T)); + draws_ = reinterpret_cast(input_bytes_.data()); + decode_buf_ = reinterpret_cast(output_bytes_.data()); + GenerateData(nvalues, draws_, &data_buffer_); + + // add some repeated values + for (int j = 1; j < repeats; ++j) { + for (int i = 0; i < nvalues; ++i) { + draws_[nvalues * j + i] = draws_[i]; + } + } + } + + virtual void CheckRoundtrip() = 0; + + void Execute(int nvalues, int repeats) { + InitData(nvalues, repeats); + CheckRoundtrip(); + } + + protected: + MemPool pool_; + + int num_values_; + int type_length_; + T* draws_; + T* decode_buf_; + vector input_bytes_; + vector output_bytes_; + vector data_buffer_; + + std::shared_ptr encode_buffer_; + std::shared_ptr descr_; +}; + +// Member variables are not visible to templated subclasses. Possibly figure +// out an alternative to this class layering at some point +#define USING_BASE_MEMBERS() \ + using TestEncodingBase::pool_; \ + using TestEncodingBase::descr_; \ + using TestEncodingBase::num_values_; \ + using TestEncodingBase::draws_; \ + using TestEncodingBase::data_buffer_; \ + using TestEncodingBase::type_length_; \ + using TestEncodingBase::encode_buffer_; \ + using TestEncodingBase::decode_buf_; + + +template +class TestPlainEncoding : public TestEncodingBase { + public: + typedef typename Type::c_type T; + static constexpr int TYPE = Type::type_num; + + virtual void CheckRoundtrip() { + PlainEncoder encoder(descr_.get()); + PlainDecoder decoder(descr_.get()); + InMemoryOutputStream dst; + encoder.Encode(draws_, num_values_, &dst); + + encode_buffer_ = dst.GetBuffer(); + + decoder.SetData(num_values_, encode_buffer_->data(), + encode_buffer_->size()); + int values_decoded = decoder.Decode(decode_buf_, num_values_); + ASSERT_EQ(num_values_, values_decoded); + VerifyResults(decode_buf_, draws_, num_values_); + } + + protected: + USING_BASE_MEMBERS(); +}; + +TYPED_TEST_CASE(TestPlainEncoding, ParquetTypes); + +TYPED_TEST(TestPlainEncoding, BasicRoundTrip) { 
+ this->Execute(10000, 1); +} + +// ---------------------------------------------------------------------- +// Dictionary encoding tests + +typedef ::testing::Types DictEncodedTypes; + +template +class TestDictionaryEncoding : public TestEncodingBase { + public: + typedef typename Type::c_type T; + static constexpr int TYPE = Type::type_num; + + void CheckRoundtrip() { + DictEncoder encoder(&pool_, type_length_); + + dict_buffer_ = std::make_shared(); + auto indices = std::make_shared(); + + ASSERT_NO_THROW( + { + for (int i = 0; i < num_values_; ++i) { + encoder.Put(draws_[i]); + } + }); + dict_buffer_->Resize(encoder.dict_encoded_size()); + encoder.WriteDict(dict_buffer_->mutable_data()); + + indices->Resize(encoder.EstimatedDataEncodedSize()); + int actual_bytes = encoder.WriteIndices(indices->mutable_data(), + indices->size()); + indices->Resize(actual_bytes); + + PlainDecoder dict_decoder(descr_.get()); + dict_decoder.SetData(encoder.num_entries(), dict_buffer_->data(), + dict_buffer_->size()); + + DictionaryDecoder decoder(descr_.get()); + decoder.SetDict(&dict_decoder); + + decoder.SetData(num_values_, indices->data(), indices->size()); + int values_decoded = decoder.Decode(decode_buf_, num_values_); + ASSERT_EQ(num_values_, values_decoded); + + // TODO(wesm): The DictionaryDecoder must stay alive because the decoded + // values' data is owned by a buffer inside the DictionaryEncoder. We + // should revisit when data lifetime is reviewed more generally. 
+ VerifyResults(decode_buf_, draws_, num_values_); + } + + protected: + USING_BASE_MEMBERS(); + std::shared_ptr dict_buffer_; +}; + +TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes); + +TYPED_TEST(TestDictionaryEncoding, BasicRoundTrip) { + this->Execute(2500, 2); +} + +TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) { + PlainDecoder dict_decoder(nullptr); + DictionaryDecoder decoder(nullptr); + + ASSERT_THROW(decoder.SetDict(&dict_decoder), ParquetException); +} + +} // namespace test + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/encodings/plain-encoding-test.cc b/cpp/src/parquet/encodings/plain-encoding-test.cc deleted file mode 100644 index 7ebd21fef0a3c..0000000000000 --- a/cpp/src/parquet/encodings/plain-encoding-test.cc +++ /dev/null @@ -1,232 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include -#include -#include -#include -#include - -#include - -#include "parquet/schema/descriptor.h" -#include "parquet/encodings/plain-encoding.h" -#include "parquet/types.h" -#include "parquet/schema/types.h" -#include "parquet/util/bit-util.h" -#include "parquet/util/buffer.h" -#include "parquet/util/output.h" -#include "parquet/util/test-common.h" - -using std::string; -using std::vector; - -namespace parquet_cpp { - -namespace test { - -TEST(VectorBooleanTest, TestEncodeDecode) { - // PARQUET-454 - size_t nvalues = 10000; - size_t nbytes = BitUtil::Ceil(nvalues, 8); - - // seed the prng so failure is deterministic - vector draws = flip_coins_seed(nvalues, 0.5, 0); - - PlainEncoder encoder(nullptr); - PlainDecoder decoder(nullptr); - - InMemoryOutputStream dst; - encoder.Encode(draws, nvalues, &dst); - - std::shared_ptr encode_buffer = dst.GetBuffer(); - ASSERT_EQ(nbytes, encode_buffer->size()); - - vector decode_buffer(nbytes); - const uint8_t* decode_data = &decode_buffer[0]; - - decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size()); - size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues); - ASSERT_EQ(nvalues, values_decoded); - - for (size_t i = 0; i < nvalues; ++i) { - ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i; - } -} - -template -class EncodeDecode{ - public: - void init_data(int nvalues) { - num_values_ = nvalues; - input_bytes_.resize(num_values_ * sizeof(T)); - output_bytes_.resize(num_values_ * sizeof(T)); - draws_ = reinterpret_cast(input_bytes_.data()); - decode_buf_ = reinterpret_cast(output_bytes_.data()); - } - - void generate_data() { - // seed the prng so failure is deterministic - random_numbers(num_values_, 0, std::numeric_limits::min(), - std::numeric_limits::max(), draws_); - } - - void encode_decode(ColumnDescriptor *d) { - PlainEncoder encoder(d); - PlainDecoder decoder(d); - - InMemoryOutputStream dst; - encoder.Encode(draws_, num_values_, &dst); - - encode_buffer_ = dst.GetBuffer(); 
- - decoder.SetData(num_values_, encode_buffer_->data(), - encode_buffer_->size()); - size_t values_decoded = decoder.Decode(decode_buf_, num_values_); - ASSERT_EQ(num_values_, values_decoded); - } - - void verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(draws_[i], decode_buf_[i]) << i; - } - } - - void execute(int nvalues, ColumnDescriptor *d) { - init_data(nvalues); - generate_data(); - encode_decode(d); - verify_results(); - } - - private: - int num_values_; - T* draws_; - T* decode_buf_; - vector input_bytes_; - vector output_bytes_; - vector data_buffer_; - - std::shared_ptr encode_buffer_; -}; - -template<> -void EncodeDecode::generate_data() { - // seed the prng so failure is deterministic - random_bools(num_values_, 0.5, 0, draws_); -} - -template<> -void EncodeDecode::generate_data() { - // seed the prng so failure is deterministic - random_Int96_numbers(num_values_, 0, std::numeric_limits::min(), - std::numeric_limits::max(), draws_); -} - -template<> -void EncodeDecode::verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(draws_[i].value[0], decode_buf_[i].value[0]) << i; - ASSERT_EQ(draws_[i].value[1], decode_buf_[i].value[1]) << i; - ASSERT_EQ(draws_[i].value[2], decode_buf_[i].value[2]) << i; - } -} - -template<> -void EncodeDecode::generate_data() { - // seed the prng so failure is deterministic - int max_byte_array_len = 12; - int num_bytes = max_byte_array_len + sizeof(uint32_t); - size_t nbytes = num_values_ * num_bytes; - data_buffer_.resize(nbytes); - random_byte_array(num_values_, 0, data_buffer_.data(), draws_, - max_byte_array_len); -} - -template<> -void EncodeDecode::verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(draws_[i].len, decode_buf_[i].len) << i; - ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, draws_[i].len)) << i; - } -} - -static int flba_length = 8; -template<> -void EncodeDecode::generate_data() { - // seed the prng so failure is 
deterministic - size_t nbytes = num_values_ * flba_length; - data_buffer_.resize(nbytes); - ASSERT_EQ(nbytes, data_buffer_.size()); - random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_); -} - -template<> -void EncodeDecode::verify_results() { - for (size_t i = 0; i < num_values_; ++i) { - ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, flba_length)) << i; - } -} - -int num_values = 10000; - -TEST(BoolEncodeDecode, TestEncodeDecode) { - EncodeDecode obj; - obj.execute(num_values, nullptr); -} - -TEST(Int32EncodeDecode, TestEncodeDecode) { - EncodeDecode obj; - obj.execute(num_values, nullptr); -} - -TEST(Int64EncodeDecode, TestEncodeDecode) { - EncodeDecode obj; - obj.execute(num_values, nullptr); -} - -TEST(FloatEncodeDecode, TestEncodeDecode) { - EncodeDecode obj; - obj.execute(num_values, nullptr); -} - -TEST(DoubleEncodeDecode, TestEncodeDecode) { - EncodeDecode obj; - obj.execute(num_values, nullptr); -} - -TEST(Int96EncodeDecode, TestEncodeDecode) { - EncodeDecode obj; - obj.execute(num_values, nullptr); -} - -TEST(BAEncodeDecode, TestEncodeDecode) { - EncodeDecode obj; - obj.execute(num_values, nullptr); -} - -TEST(FLBAEncodeDecode, TestEncodeDecode) { - schema::NodePtr node; - node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL, - flba_length, LogicalType::UTF8); - ColumnDescriptor d(node, 0, 0); - EncodeDecode obj; - obj.execute(num_values, &d); -} - -} // namespace test -} // namespace parquet_cpp diff --git a/cpp/src/parquet/encodings/plain-encoding.h b/cpp/src/parquet/encodings/plain-encoding.h index 83ee40c4548e6..9adabdfbf3572 100644 --- a/cpp/src/parquet/encodings/plain-encoding.h +++ b/cpp/src/parquet/encodings/plain-encoding.h @@ -40,7 +40,13 @@ class PlainDecoder : public Decoder { explicit PlainDecoder(const ColumnDescriptor* descr) : Decoder(descr, Encoding::PLAIN), - data_(NULL), len_(0) {} + data_(NULL), len_(0) { + if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) { + 
type_length_ = descr_->type_length(); + } else { + type_length_ = -1; + } + } virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -49,55 +55,69 @@ class PlainDecoder : public Decoder { } virtual int Decode(T* buffer, int max_values); + private: + using Decoder::descr_; const uint8_t* data_; int len_; + int type_length_; }; -template -inline int PlainDecoder::Decode(T* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - int size = max_values * sizeof(T); - if (len_ < size) ParquetException::EofException(); - memcpy(buffer, data_, size); - data_ += size; - len_ -= size; - num_values_ -= max_values; - return max_values; +// Decode routine templated on C++ type rather than type enum +template +inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, + int type_length, T* out) { + int bytes_to_decode = num_values * sizeof(T); + if (data_size < bytes_to_decode) { + ParquetException::EofException(); + } + memcpy(out, data, bytes_to_decode); + return bytes_to_decode; } -// Template specialization for BYTE_ARRAY -// BA does not currently own its data -// the lifetime is tied to the input stream +// Template specialization for BYTE_ARRAY. The written values do not own their +// own data. 
template <> -inline int PlainDecoder::Decode(ByteArray* buffer, - int max_values) { - max_values = std::min(max_values, num_values_); - for (int i = 0; i < max_values; ++i) { - uint32_t len = buffer[i].len = *reinterpret_cast(data_); - if (len_ < sizeof(uint32_t) + len) ParquetException::EofException(); - buffer[i].ptr = data_ + sizeof(uint32_t); - data_ += sizeof(uint32_t) + len; - len_ -= sizeof(uint32_t) + len; +inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values, + int type_length, ByteArray* out) { + int bytes_decoded = 0; + int increment; + for (int i = 0; i < num_values; ++i) { + uint32_t len = out[i].len = *reinterpret_cast(data); + increment = sizeof(uint32_t) + len; + if (data_size < increment) ParquetException::EofException(); + out[i].ptr = data + sizeof(uint32_t); + data += increment; + data_size -= increment; + bytes_decoded += increment; } - num_values_ -= max_values; - return max_values; + return bytes_decoded; } -// Template specialization for FIXED_LEN_BYTE_ARRAY -// FLBA does not currently own its data -// the lifetime is tied to the input stream +// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do not +// own their own data. 
template <> -inline int PlainDecoder::Decode( - FixedLenByteArray* buffer, int max_values) { - max_values = std::min(max_values, num_values_); - int len = descr_->type_length(); - for (int i = 0; i < max_values; ++i) { - if (len_ < len) ParquetException::EofException(); - buffer[i].ptr = data_; - data_ += len; - len_ -= len; +inline int DecodePlain(const uint8_t* data, int64_t data_size, + int num_values, int type_length, FixedLenByteArray* out) { + int bytes_to_decode = type_length * num_values; + if (data_size < bytes_to_decode) { + ParquetException::EofException(); } + for (int i = 0; i < num_values; ++i) { + out[i].ptr = data; + data += type_length; + data_size -= type_length; + } + return bytes_to_decode; +} + +template +inline int PlainDecoder::Decode(T* buffer, int max_values) { + max_values = std::min(max_values, num_values_); + int bytes_consumed = DecodePlain(data_, len_, max_values, + type_length_, buffer); + data_ += bytes_consumed; + len_ -= bytes_consumed; num_values_ -= max_values; return max_values; } @@ -155,7 +175,7 @@ class PlainEncoder : public Encoder { explicit PlainEncoder(const ColumnDescriptor* descr) : Encoder(descr, Encoding::PLAIN) {} - virtual void Encode(const T* src, int num_values, OutputStream* dst); + void Encode(const T* src, int num_values, OutputStream* dst); }; template <> diff --git a/cpp/src/parquet/file/reader-internal.h b/cpp/src/parquet/file/reader-internal.h index 7aff74a43d908..08d4607540137 100644 --- a/cpp/src/parquet/file/reader-internal.h +++ b/cpp/src/parquet/file/reader-internal.h @@ -31,8 +31,6 @@ namespace parquet_cpp { -class SchemaDescriptor; - // 16 MB is the default maximum page header size static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024; diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc index be22e5a88341d..e99140c09483e 100644 --- a/cpp/src/parquet/reader-test.cc +++ b/cpp/src/parquet/reader-test.cc @@ -15,14 +15,13 @@ // specific language governing 
permissions and limitations // under the License. +#include #include #include #include #include #include -#include - #include "parquet/file/reader.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" diff --git a/cpp/src/parquet/schema/schema-descriptor-test.cc b/cpp/src/parquet/schema/schema-descriptor-test.cc index 767761561ef58..3b1734b41aaee 100644 --- a/cpp/src/parquet/schema/schema-descriptor-test.cc +++ b/cpp/src/parquet/schema/schema-descriptor-test.cc @@ -17,13 +17,12 @@ // Schema / column descriptor correctness tests (from flat Parquet schemas) +#include #include #include #include #include -#include - #include "parquet/exception.h" #include "parquet/schema/descriptor.h" #include "parquet/schema/types.h" diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 8c5e1236531d9..f59f6a95701a2 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -128,11 +128,24 @@ struct PageType { // ---------------------------------------------------------------------- struct ByteArray { + ByteArray() {} + ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {} uint32_t len; const uint8_t* ptr; + + bool operator==(const ByteArray& other) const { + return this->len == other.len && + 0 == memcmp(this->ptr, other.ptr, this->len); + } + + bool operator!=(const ByteArray& other) const { + return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len); + } }; struct FixedLenByteArray { + FixedLenByteArray() {} + explicit FixedLenByteArray(const uint8_t* ptr) : ptr(ptr) {} const uint8_t* ptr; }; @@ -140,6 +153,14 @@ typedef FixedLenByteArray FLBA; MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; + + bool operator==(const Int96& other) const { + return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t)); + } + + bool operator!=(const Int96& other) const { + return !(*this == other); + } }; STRUCT_END(Int96, 12); @@ -241,6 +262,21 @@ struct type_traits { static constexpr const char* printf_code = 
"s"; }; +template +struct DataType { + static constexpr Type::type type_num = TYPE; + typedef typename type_traits::value_type c_type; +}; + +typedef DataType BooleanType; +typedef DataType Int32Type; +typedef DataType Int64Type; +typedef DataType Int96Type; +typedef DataType FloatType; +typedef DataType DoubleType; +typedef DataType ByteArrayType; +typedef DataType FLBAType; + template inline std::string format_fwf(int width) { std::stringstream ss; diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt index 2b782fcd2e0ca..a009129dc9cc5 100644 --- a/cpp/src/parquet/util/CMakeLists.txt +++ b/cpp/src/parquet/util/CMakeLists.txt @@ -20,22 +20,27 @@ install(FILES bit-stream-utils.h bit-stream-utils.inline.h bit-util.h - cpu-info.h - sse-info.h + buffer-builder.h compiler-util.h + cpu-info.h + dict-encoding.h + hash-util.h + input.h logging.h macros.h + mem-pool.h + output.h rle-encoding.h stopwatch.h - input.h - output.h + sse-info.h DESTINATION include/parquet/util) add_library(parquet_util STATIC buffer.cc + cpu-info.cc input.cc + mem-pool.cc output.cc - cpu-info.cc ) if(PARQUET_BUILD_TESTS) @@ -58,5 +63,6 @@ endif() ADD_PARQUET_TEST(bit-util-test) ADD_PARQUET_TEST(buffer-test) +ADD_PARQUET_TEST(mem-pool-test) ADD_PARQUET_TEST(output-test) ADD_PARQUET_TEST(rle-test) diff --git a/cpp/src/parquet/util/bit-stream-utils.h b/cpp/src/parquet/util/bit-stream-utils.h index b93b90ed4f101..36361284eb177 100644 --- a/cpp/src/parquet/util/bit-stream-utils.h +++ b/cpp/src/parquet/util/bit-stream-utils.h @@ -20,9 +20,9 @@ #ifndef PARQUET_UTIL_BIT_STREAM_UTILS_H #define PARQUET_UTIL_BIT_STREAM_UTILS_H +#include #include #include -#include #include "parquet/util/compiler-util.h" #include "parquet/util/logging.h" diff --git a/cpp/src/parquet/util/bit-util-test.cc b/cpp/src/parquet/util/bit-util-test.cc index a8b6be09bb661..5ea4c119f7b46 100644 --- a/cpp/src/parquet/util/bit-util-test.cc +++ b/cpp/src/parquet/util/bit-util-test.cc @@ -19,11 +19,12 @@ 
#include #include -#include #include +#include #include -#include + +#include #include "parquet/util/bit-util.h" #include "parquet/util/bit-stream-utils.inline.h" diff --git a/cpp/src/parquet/util/buffer-builder.h b/cpp/src/parquet/util/buffer-builder.h new file mode 100644 index 0000000000000..6fab6c5b7a6c2 --- /dev/null +++ b/cpp/src/parquet/util/buffer-builder.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Initially imported from Impala on 2016-02-23 + +#ifndef PARQUET_UTIL_BUFFER_BUILDER_H +#define PARQUET_UTIL_BUFFER_BUILDER_H + +#include +#include + +namespace parquet_cpp { + +/// Utility class to build an in-memory buffer. 
+class BufferBuilder { + public: + BufferBuilder(uint8_t* dst_buffer, int dst_len) + : buffer_(dst_buffer), capacity_(dst_len), size_(0) { + } + + BufferBuilder(char* dst_buffer, int dst_len) + : buffer_(reinterpret_cast(dst_buffer)), + capacity_(dst_len), size_(0) { + } + + inline void Append(const void* buffer, int len) { + memcpy(buffer_ + size_, buffer, len); + size_ += len; + } + + template + inline void Append(const T& v) { + Append(&v, sizeof(T)); + } + + int capacity() const { return capacity_; } + int size() const { return size_; } + + private: + uint8_t* buffer_; + int capacity_; + int size_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_BUFFER_BUILDER_H diff --git a/cpp/src/parquet/util/cpu-info.cc b/cpp/src/parquet/util/cpu-info.cc index 610fb623ed042..2a9f59d2b11d1 100644 --- a/cpp/src/parquet/util/cpu-info.cc +++ b/cpp/src/parquet/util/cpu-info.cc @@ -24,16 +24,18 @@ #include #endif +#include +#include +#include +#include + #include + #include #include #include #include -#include #include -#include -#include -#include #include #include "parquet/exception.h" diff --git a/cpp/src/parquet/util/dict-encoding.h b/cpp/src/parquet/util/dict-encoding.h new file mode 100644 index 0000000000000..315b88ee91ea6 --- /dev/null +++ b/cpp/src/parquet/util/dict-encoding.h @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_UTIL_DICT_ENCODING_H +#define PARQUET_UTIL_DICT_ENCODING_H + +#include +#include +#include +#include + +#include "parquet/types.h" +#include "parquet/encodings/plain-encoding.h" +#include "parquet/util/hash-util.h" +#include "parquet/util/mem-pool.h" +#include "parquet/util/rle-encoding.h" + +namespace parquet_cpp { + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_DICT_ENCODING_H diff --git a/cpp/src/parquet/util/hash-util.h b/cpp/src/parquet/util/hash-util.h new file mode 100644 index 0000000000000..5572ca931a43e --- /dev/null +++ b/cpp/src/parquet/util/hash-util.h @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// From Apache Impala as of 2016-02-22 + +#ifndef PARQUET_UTIL_HASH_UTIL_H +#define PARQUET_UTIL_HASH_UTIL_H + +#include + +#include "parquet/util/compiler-util.h" +#include "parquet/util/cpu-info.h" +#include "parquet/util/logging.h" +#include "parquet/util/sse-util.h" + +namespace parquet_cpp { + +/// Utility class to compute hash values. +class HashUtil { + public: + /// Compute the Crc32 hash for data using SSE4 instructions. The input hash + /// parameter is the current hash/seed value. + /// This should only be called if SSE is supported. + /// This is ~4x faster than Fnv/Boost Hash. + /// TODO: crc32 hashes with different seeds do not result in different hash functions. + /// The resulting hashes are correlated. + /// TODO: update this to also use SSE4_crc32_u64 and SSE4_crc32_u16 where appropriate. + static uint32_t CrcHash(const void* data, int32_t bytes, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + uint32_t words = bytes / sizeof(uint32_t); + bytes = bytes % sizeof(uint32_t); + + const uint32_t* p = reinterpret_cast(data); + while (words--) { + hash = SSE4_crc32_u32(hash, *p); + ++p; + } + + const uint8_t* s = reinterpret_cast(p); + while (bytes--) { + hash = SSE4_crc32_u8(hash, *s); + ++s; + } + + // The lower half of the CRC hash has has poor uniformity, so swap the halves + // for anyone who only uses the first several bits of the hash. 
+ hash = (hash << 16) | (hash >> 16); + return hash; + } + + /// CrcHash() specialized for 1-byte data + static inline uint32_t CrcHash1(const void* v, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + const uint8_t* s = reinterpret_cast(v); + hash = SSE4_crc32_u8(hash, *s); + hash = (hash << 16) | (hash >> 16); + return hash; + } + + /// CrcHash() specialized for 2-byte data + static inline uint32_t CrcHash2(const void* v, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + const uint16_t* s = reinterpret_cast(v); + hash = SSE4_crc32_u16(hash, *s); + hash = (hash << 16) | (hash >> 16); + return hash; + } + + /// CrcHash() specialized for 4-byte data + static inline uint32_t CrcHash4(const void* v, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + const uint32_t* p = reinterpret_cast(v); + hash = SSE4_crc32_u32(hash, *p); + hash = (hash << 16) | (hash >> 16); + return hash; + } + + /// CrcHash() specialized for 8-byte data + static inline uint32_t CrcHash8(const void* v, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + const uint64_t* p = reinterpret_cast(v); + hash = SSE4_crc32_u64(hash, *p); + hash = (hash << 16) | (hash >> 16); + return hash; + } + + /// CrcHash() specialized for 12-byte data + static inline uint32_t CrcHash12(const void* v, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + const uint64_t* p = reinterpret_cast(v); + hash = SSE4_crc32_u64(hash, *p); + ++p; + hash = SSE4_crc32_u32(hash, *reinterpret_cast(p)); + hash = (hash << 16) | (hash >> 16); + return hash; + } + + /// CrcHash() specialized for 16-byte data + static inline uint32_t CrcHash16(const void* v, uint32_t hash) { + DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2)); + const uint64_t* p = reinterpret_cast(v); + hash = SSE4_crc32_u64(hash, *p); + ++p; + hash = SSE4_crc32_u64(hash, *p); + hash = (hash << 16) | (hash >> 16); + return hash; + } + + static const uint64_t MURMUR_PRIME = 0xc6a4a7935bd1e995; + 
static const int MURMUR_R = 47; + + /// Murmur2 hash implementation returning 64-bit hashes. + static uint64_t MurmurHash2_64(const void* input, int len, uint64_t seed) { + uint64_t h = seed ^ (len * MURMUR_PRIME); + + const uint64_t* data = reinterpret_cast(input); + const uint64_t* end = data + (len / sizeof(uint64_t)); + + while (data != end) { + uint64_t k = *data++; + k *= MURMUR_PRIME; + k ^= k >> MURMUR_R; + k *= MURMUR_PRIME; + h ^= k; + h *= MURMUR_PRIME; + } + + const uint8_t* data2 = reinterpret_cast(data); + switch (len & 7) { + case 7: h ^= uint64_t(data2[6]) << 48; + case 6: h ^= uint64_t(data2[5]) << 40; + case 5: h ^= uint64_t(data2[4]) << 32; + case 4: h ^= uint64_t(data2[3]) << 24; + case 3: h ^= uint64_t(data2[2]) << 16; + case 2: h ^= uint64_t(data2[1]) << 8; + case 1: h ^= uint64_t(data2[0]); + h *= MURMUR_PRIME; + } + + h ^= h >> MURMUR_R; + h *= MURMUR_PRIME; + h ^= h >> MURMUR_R; + return h; + } + + /// default values recommended by http://isthe.com/chongo/tech/comp/fnv/ + static const uint32_t FNV_PRIME = 0x01000193; // 16777619 + static const uint32_t FNV_SEED = 0x811C9DC5; // 2166136261 + static const uint64_t FNV64_PRIME = 1099511628211UL; + static const uint64_t FNV64_SEED = 14695981039346656037UL; + + /// Implementation of the Fowler-Noll-Vo hash function. This is not as performant + /// as boost's hash on int types (2x slower) but has bit entropy. + /// For ints, boost just returns the value of the int which can be pathological. + /// For example, if the data is <1000, 2000, 3000, 4000, ..> and then the mod of 1000 + /// is taken on the hash, all values will collide to the same bucket. + /// For string values, Fnv is slightly faster than boost. + /// IMPORTANT: FNV hash suffers from poor diffusion of the least significant bit, + /// which can lead to poor results when input bytes are duplicated. + /// See FnvHash64to32() for how this can be mitigated. 
+ static uint64_t FnvHash64(const void* data, int32_t bytes, uint64_t hash) { + const uint8_t* ptr = reinterpret_cast(data); + while (bytes--) { + hash = (*ptr ^ hash) * FNV64_PRIME; + ++ptr; + } + return hash; + } + + /// Return a 32-bit hash computed by invoking FNV-64 and folding the result to 32-bits. + /// This technique is recommended instead of FNV-32 since the LSB of an FNV hash is the + /// XOR of the LSBs of its input bytes, leading to poor results for duplicate inputs. + /// The input seed 'hash' is duplicated so the top half of the seed is not all zero. + /// Data length must be at least 1 byte: zero-length data should be handled separately, + /// for example using CombineHash with a unique constant value to avoid returning the + /// hash argument. Zero-length data gives terrible results: the initial hash value is + /// xored with itself cancelling all bits. + static uint32_t FnvHash64to32(const void* data, int32_t bytes, uint32_t hash) { + // IMPALA-2270: this function should never be used for zero-byte inputs. + DCHECK_GT(bytes, 0); + uint64_t hash_u64 = hash | ((uint64_t)hash << 32); + hash_u64 = FnvHash64(data, bytes, hash_u64); + return (hash_u64 >> 32) ^ (hash_u64 & 0xFFFFFFFF); + } + + /// Computes the hash value for data. Will call either CrcHash or MurmurHash + /// depending on hardware capabilities. + /// Seed values for different steps of the query execution should use different seeds + /// to prevent accidental key collisions. (See IMPALA-219 for more details). + static uint32_t Hash(const void* data, int32_t bytes, uint32_t seed) { + if (LIKELY(CpuInfo::IsSupported(CpuInfo::SSE4_2))) { + return CrcHash(data, bytes, seed); + } else { + return MurmurHash2_64(data, bytes, seed); + } + } + + /// The magic number (used in hash_combine()) 0x9e3779b9 = 2^32 / (golden ratio). + static const uint32_t HASH_COMBINE_SEED = 0x9e3779b9; + + /// Combine hashes 'value' and 'seed' to get a new hash value. 
Similar to + /// boost::hash_combine(), but for uint32_t. This function should be used with a + /// constant first argument to update the hash value for zero-length values such as + /// NULL, boolean, and empty strings. + static inline uint32_t HashCombine32(uint32_t value, uint32_t seed) { + return seed ^ (HASH_COMBINE_SEED + value + (seed << 6) + (seed >> 2)); + } + + // Get 32 more bits of randomness from a 32-bit hash: + static inline uint32_t Rehash32to32(const uint32_t hash) { + // Constants generated by uuidgen(1) with the -r flag + static const uint64_t m = 0x7850f11ec6d14889ull, a = 0x6773610597ca4c63ull; + // This is strongly universal hashing following Dietzfelbinger's "Universal hashing + // and k-wise independent random variables via integer arithmetic without primes". As + // such, for any two distinct uint32_t's hash1 and hash2, the probability (over the + // randomness of the constants) that any subset of bit positions of + // Rehash32to32(hash1) is equal to the same subset of bit positions + // Rehash32to32(hash2) is minimal. + return (static_cast(hash) * m + a) >> 32; + } + + static inline uint64_t Rehash32to64(const uint32_t hash) { + static const uint64_t m1 = 0x47b6137a44974d91ull, m2 = 0x8824ad5ba2b7289cull, + a1 = 0x705495c62df1424aull, a2 = 0x9efc49475c6bfb31ull; + const uint64_t hash1 = (static_cast(hash) * m1 + a1) >> 32; + const uint64_t hash2 = (static_cast(hash) * m2 + a2) >> 32; + return hash1 | (hash2 << 32); + } +}; + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_HASH_UTIL_H diff --git a/cpp/src/parquet/util/mem-pool-test.cc b/cpp/src/parquet/util/mem-pool-test.cc new file mode 100644 index 0000000000000..de0b399279918 --- /dev/null +++ b/cpp/src/parquet/util/mem-pool-test.cc @@ -0,0 +1,247 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Initially imported from Apache Impala on 2016-02-23, and has been modified +// since for parquet-cpp + +#include +#include +#include +#include + +#include "parquet/util/mem-pool.h" +#include "parquet/util/bit-util.h" + +namespace parquet_cpp { + +// Utility class to call private functions on MemPool. +class MemPoolTest { + public: + static bool CheckIntegrity(MemPool* pool, bool current_chunk_empty) { + return pool->CheckIntegrity(current_chunk_empty); + } + + static const int INITIAL_CHUNK_SIZE = MemPool::INITIAL_CHUNK_SIZE; + static const int MAX_CHUNK_SIZE = MemPool::MAX_CHUNK_SIZE; +}; + +const int MemPoolTest::INITIAL_CHUNK_SIZE; +const int MemPoolTest::MAX_CHUNK_SIZE; + +TEST(MemPoolTest, Basic) { + MemPool p; + MemPool p2; + MemPool p3; + + for (int iter = 0; iter < 2; ++iter) { + // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes) + for (int i = 0; i < 768; ++i) { + // pads to 32 bytes + p.Allocate(25); + } + // we handed back 24K + EXPECT_EQ(24 * 1024, p.total_allocated_bytes()); + // .. 
and allocated 28K of chunks (4, 8, 16) + EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes()); + + // we're passing on the first two chunks, containing 12K of data; we're left with + // one chunk of 16K containing 12K of data + p2.AcquireData(&p, true); + EXPECT_EQ(12 * 1024, p.total_allocated_bytes()); + EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes()); + + // we allocate 8K, for which there isn't enough room in the current chunk, + // so another one is allocated (32K) + p.Allocate(8 * 1024); + EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes()); + + // we allocate 65K, which doesn't fit into the current chunk or the default + // size of the next allocated chunk (64K) + p.Allocate(65 * 1024); + EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes()); + if (iter == 0) { + EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes()); + } else { + EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); + } + EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); + + // Clear() resets allocated data, but doesn't remove any chunks + p.Clear(); + EXPECT_EQ(0, p.total_allocated_bytes()); + if (iter == 0) { + EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes()); + } else { + EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); + } + EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); + + // next allocation reuses existing chunks + p.Allocate(1024); + EXPECT_EQ(1024, p.total_allocated_bytes()); + if (iter == 0) { + EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes()); + } else { + EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); + } + EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); + + // ... 
unless it doesn't fit into any available chunk + p.Allocate(120 * 1024); + EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes()); + if (iter == 0) { + EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes()); + } else { + EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); + } + EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); + + // ... Try another chunk that fits into an existing chunk + p.Allocate(33 * 1024); + EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes()); + EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes()); + + // we're releasing 3 chunks, which get added to p2 + p2.AcquireData(&p, false); + EXPECT_EQ(0, p.total_allocated_bytes()); + EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes()); + EXPECT_EQ(0, p.GetTotalChunkSizes()); + + p3.AcquireData(&p2, true); // we're keeping the 65k chunk + EXPECT_EQ(33 * 1024, p2.total_allocated_bytes()); + EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes()); + + p.FreeAll(); + p2.FreeAll(); + p3.FreeAll(); + } +} + +// Test that we can keep an allocated chunk and a free chunk. +// This case verifies that when chunks are acquired by another memory pool the +// remaining chunks are consistent if there were more than one used chunk and some +// free chunks. 
+TEST(MemPoolTest, Keep) { + MemPool p; + p.Allocate(4*1024); + p.Allocate(8*1024); + p.Allocate(16*1024); + EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes()); + EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes()); + p.Clear(); + EXPECT_EQ(0, p.total_allocated_bytes()); + EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes()); + p.Allocate(1*1024); + p.Allocate(4*1024); + EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes()); + EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes()); + + MemPool p2; + p2.AcquireData(&p, true); + EXPECT_EQ(4 * 1024, p.total_allocated_bytes()); + EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes()); + EXPECT_EQ(1 * 1024, p2.total_allocated_bytes()); + EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes()); + + p.FreeAll(); + p2.FreeAll(); +} + +// Tests that we can return partial allocations. +TEST(MemPoolTest, ReturnPartial) { + MemPool p; + uint8_t* ptr = p.Allocate(1024); + EXPECT_EQ(1024, p.total_allocated_bytes()); + memset(ptr, 0, 1024); + p.ReturnPartialAllocation(1024); + + uint8_t* ptr2 = p.Allocate(1024); + EXPECT_EQ(1024, p.total_allocated_bytes()); + EXPECT_TRUE(ptr == ptr2); + p.ReturnPartialAllocation(1016); + + ptr2 = p.Allocate(1016); + EXPECT_EQ(1024, p.total_allocated_bytes()); + EXPECT_TRUE(ptr2 == ptr + 8); + p.ReturnPartialAllocation(512); + memset(ptr2, 1, 1016 - 512); + + uint8_t* ptr3 = p.Allocate(512); + EXPECT_EQ(1024, p.total_allocated_bytes()); + EXPECT_TRUE(ptr3 == ptr + 512); + memset(ptr3, 2, 512); + + for (int i = 0; i < 8; ++i) { + EXPECT_EQ(0, ptr[i]); + } + for (int i = 8; i < 512; ++i) { + EXPECT_EQ(1, ptr[i]); + } + for (int i = 512; i < 1024; ++i) { + EXPECT_EQ(2, ptr[i]); + } + + p.FreeAll(); +} + +// Test that the MemPool overhead is bounded when we make allocations of +// INITIAL_CHUNK_SIZE. 
+TEST(MemPoolTest, MemoryOverhead) { + MemPool p; + const int alloc_size = MemPoolTest::INITIAL_CHUNK_SIZE; + const int num_allocs = 1000; + int64_t total_allocated = 0; + + for (int i = 0; i < num_allocs; ++i) { + uint8_t* mem = p.Allocate(alloc_size); + ASSERT_TRUE(mem != NULL); + total_allocated += alloc_size; + + int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated; + // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most + // one empty chunk at the end. + EXPECT_LE(wasted_memory, MemPoolTest::MAX_CHUNK_SIZE); + // The chunk doubling algorithm should not allocate chunks larger than the total + // amount of memory already allocated. + EXPECT_LE(wasted_memory, total_allocated); + } + + p.FreeAll(); +} + +// Test that the MemPool overhead is bounded when we make alternating large and small +// allocations. +TEST(MemPoolTest, FragmentationOverhead) { + MemPool p; + const int num_allocs = 100; + int64_t total_allocated = 0; + + for (int i = 0; i < num_allocs; ++i) { + int alloc_size = i % 2 == 0 ? 1 : MemPoolTest::MAX_CHUNK_SIZE; + uint8_t* mem = p.Allocate(alloc_size); + ASSERT_TRUE(mem != NULL); + total_allocated += alloc_size; + + int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated; + // Fragmentation should not waste more than half of each completed chunk. + EXPECT_LE(wasted_memory, total_allocated + MemPoolTest::MAX_CHUNK_SIZE); + } + + p.FreeAll(); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/util/mem-pool.cc b/cpp/src/parquet/util/mem-pool.cc new file mode 100644 index 0000000000000..6e56c2816d9ff --- /dev/null +++ b/cpp/src/parquet/util/mem-pool.cc @@ -0,0 +1,234 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Initially imported from Apache Impala on 2016-02-23, and has been modified +// since for parquet-cpp + +#include "parquet/util/mem-pool.h" + +#include + +#include +#include +#include +#include + +#include "parquet/util/bit-util.h" + +namespace parquet_cpp { + +const int MemPool::INITIAL_CHUNK_SIZE; +const int MemPool::MAX_CHUNK_SIZE; + +MemPool::MemPool() + : current_chunk_idx_(-1), + next_chunk_size_(INITIAL_CHUNK_SIZE), + total_allocated_bytes_(0), + peak_allocated_bytes_(0), + total_reserved_bytes_(0) {} + +MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf) + : data(buf), + size(size), + allocated_bytes(0) { +} + +MemPool::~MemPool() { + int64_t total_bytes_released = 0; + for (size_t i = 0; i < chunks_.size(); ++i) { + total_bytes_released += chunks_[i].size; + free(chunks_[i].data); + } + + DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool"; +} + +void MemPool::Clear() { + current_chunk_idx_ = -1; + for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) { + chunk->allocated_bytes = 0; + } + total_allocated_bytes_ = 0; + DCHECK(CheckIntegrity(false)); +} + +void MemPool::FreeAll() { + int64_t total_bytes_released = 0; + for (size_t i = 0; i < chunks_.size(); ++i) { + total_bytes_released += chunks_[i].size; + free(chunks_[i].data); + } + chunks_.clear(); + next_chunk_size_ = INITIAL_CHUNK_SIZE; + current_chunk_idx_ = -1; + 
total_allocated_bytes_ = 0; + total_reserved_bytes_ = 0; +} + +bool MemPool::FindChunk(int64_t min_size) { + // Try to allocate from a free chunk. The first free chunk, if any, will be immediately + // after the current chunk. + int first_free_idx = current_chunk_idx_ + 1; + // (cast size() to signed int in order to avoid everything else being cast to + // unsigned long, in particular -1) + while (++current_chunk_idx_ < static_cast(chunks_.size())) { + // we found a free chunk + DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0); + + if (chunks_[current_chunk_idx_].size >= min_size) { + // This chunk is big enough. Move it before the other free chunks. + if (current_chunk_idx_ != first_free_idx) { + std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]); + current_chunk_idx_ = first_free_idx; + } + break; + } + } + + if (current_chunk_idx_ == static_cast(chunks_.size())) { + // need to allocate new chunk. + int64_t chunk_size; + DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE); + DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE); + + chunk_size = std::max(min_size, next_chunk_size_); + + // Allocate a new chunk. Return early if malloc fails. + uint8_t* buf = reinterpret_cast(malloc(chunk_size)); + if (UNLIKELY(buf == NULL)) { + DCHECK_EQ(current_chunk_idx_, static_cast(chunks_.size())); + current_chunk_idx_ = static_cast(chunks_.size()) - 1; + return false; + } + + // If there are no free chunks put it at the end, otherwise before the first free. + if (first_free_idx == static_cast(chunks_.size())) { + chunks_.push_back(ChunkInfo(chunk_size, buf)); + } else { + current_chunk_idx_ = first_free_idx; + auto insert_chunk = chunks_.begin() + current_chunk_idx_; + chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf)); + } + total_reserved_bytes_ += chunk_size; + // Don't increment the chunk size until the allocation succeeds: if an attempted + // large allocation fails we don't want to increase the chunk size further. 
+ next_chunk_size_ = static_cast(std::min( + chunk_size * 2, MAX_CHUNK_SIZE)); + } + + DCHECK_LT(current_chunk_idx_, static_cast(chunks_.size())); + DCHECK(CheckIntegrity(true)); + return true; +} + +void MemPool::AcquireData(MemPool* src, bool keep_current) { + DCHECK(src->CheckIntegrity(false)); + int num_acquired_chunks; + if (keep_current) { + num_acquired_chunks = src->current_chunk_idx_; + } else if (src->GetFreeOffset() == 0) { + // nothing in the last chunk + num_acquired_chunks = src->current_chunk_idx_; + } else { + num_acquired_chunks = src->current_chunk_idx_ + 1; + } + + if (num_acquired_chunks <= 0) { + if (!keep_current) src->FreeAll(); + return; + } + + auto end_chunk = src->chunks_.begin() + num_acquired_chunks; + int64_t total_transfered_bytes = 0; + for (auto i = src->chunks_.begin(); i != end_chunk; ++i) { + total_transfered_bytes += i->size; + } + src->total_reserved_bytes_ -= total_transfered_bytes; + total_reserved_bytes_ += total_transfered_bytes; + + // insert new chunks after current_chunk_idx_ + auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1; + chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk); + src->chunks_.erase(src->chunks_.begin(), end_chunk); + current_chunk_idx_ += num_acquired_chunks; + + if (keep_current) { + src->current_chunk_idx_ = 0; + DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0); + total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset(); + src->total_allocated_bytes_ = src->GetFreeOffset(); + } else { + src->current_chunk_idx_ = -1; + total_allocated_bytes_ += src->total_allocated_bytes_; + src->total_allocated_bytes_ = 0; + } + peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); + + if (!keep_current) src->FreeAll(); + DCHECK(CheckIntegrity(false)); +} + +std::string MemPool::DebugString() { + std::stringstream out; + char str[32]; // room for 0x, 16 hex digits, = and NUL + out << "MemPool(#chunks=" << chunks_.size() << " ["; + for (int i = 0; i < 
chunks_.size(); ++i) { + snprintf(str, sizeof(str), "0x%lx=", reinterpret_cast(chunks_[i].data)); // NOLINT + out << (i > 0 ? " " : "") + << str + << chunks_[i].size + << "/" << chunks_[i].allocated_bytes; + } + out << "] current_chunk=" << current_chunk_idx_ + << " total_sizes=" << GetTotalChunkSizes() + << " total_alloc=" << total_allocated_bytes_ + << ")"; + return out.str(); +} + +int64_t MemPool::GetTotalChunkSizes() const { + int64_t result = 0; + for (int i = 0; i < chunks_.size(); ++i) { + result += chunks_[i].size; + } + return result; +} + +bool MemPool::CheckIntegrity(bool current_chunk_empty) { + // check that current_chunk_idx_ points to the last chunk with allocated data + DCHECK_LT(current_chunk_idx_, static_cast(chunks_.size())); + int64_t total_allocated = 0; + for (int i = 0; i < chunks_.size(); ++i) { + DCHECK_GT(chunks_[i].size, 0); + if (i < current_chunk_idx_) { + DCHECK_GT(chunks_[i].allocated_bytes, 0); + } else if (i == current_chunk_idx_) { + if (current_chunk_empty) { + DCHECK_EQ(chunks_[i].allocated_bytes, 0); + } else { + DCHECK_GT(chunks_[i].allocated_bytes, 0); + } + } else { + DCHECK_EQ(chunks_[i].allocated_bytes, 0); + } + total_allocated += chunks_[i].allocated_bytes; + } + DCHECK_EQ(total_allocated, total_allocated_bytes_); + return true; +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/util/mem-pool.h b/cpp/src/parquet/util/mem-pool.h new file mode 100644 index 0000000000000..88a8715509c1c --- /dev/null +++ b/cpp/src/parquet/util/mem-pool.h @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Initially imported from Apache Impala on 2016-02-23, and has been modified +// since for parquet-cpp + +#ifndef PARQUET_UTIL_MEM_POOL_H +#define PARQUET_UTIL_MEM_POOL_H + +#include +#include +#include +#include +#include + +#include "parquet/util/logging.h" +#include "parquet/util/bit-util.h" + +namespace parquet_cpp { + +/// A MemPool maintains a list of memory chunks from which it allocates memory in +/// response to Allocate() calls; +/// Chunks stay around for the lifetime of the mempool or until they are passed on to +/// another mempool. +// +/// An Allocate() call will attempt to allocate memory from the chunk that was most +/// recently added; if that chunk doesn't have enough memory to +/// satisfy the allocation request, the free chunks are searched for one that is +/// big enough otherwise a new chunk is added to the list. +/// The current_chunk_idx_ always points to the last chunk with allocated memory. +/// In order to keep allocation overhead low, chunk sizes double with each new one +/// added, until they hit a maximum size. +// +/// Example: +/// MemPool* p = new MemPool(); +/// for (int i = 0; i < 1024; ++i) { +/// returns 8-byte aligned memory (effectively 24 bytes): +/// .. = p->Allocate(17); +/// } +/// at this point, 24K (1024 * 24 bytes) have been handed out in response to Allocate() +/// calls and 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K) +/// We track total and peak allocated bytes. At this point they would be the same: +/// 24K bytes. 
A call to Clear will return the allocated memory so +/// total_allocated_bytes_ +/// becomes 0 while peak_allocated_bytes_ remains at 24K. +/// p->Clear(); +/// the entire 1st chunk is returned: +/// .. = p->Allocate(4 * 1024); +/// 4K of the 2nd chunk are returned: +/// .. = p->Allocate(4 * 1024); +/// a new chunk is created (20K does not fit in the free 16K chunk) +/// .. = p->Allocate(20 * 1024); +// +/// MemPool* p2 = new MemPool(); +/// the new mempool receives all chunks containing data from p +/// p2->AcquireData(p, false); +/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_ +/// remains unchanged. +/// p holds no chunks any more (AcquireData() released the remaining empty one): +/// delete p; + +class MemPool { + public: + MemPool(); + + /// Frees any chunks still held by the pool. The pool is expected to be empty by then + /// (via FreeAll() or AcquireData()); the destructor DCHECKs this. + ~MemPool(); + + /// Allocates 8-byte aligned section of memory of 'size' bytes at the end + /// of the current chunk. Creates a new chunk if there aren't any chunks + /// with enough capacity. + uint8_t* Allocate(int size) { + return Allocate(size); + } + + /// Returns 'byte_size' bytes from the current chunk back to the mem pool. This can + /// only be used to return either all or part of the previous allocation returned + /// by Allocate(). + void ReturnPartialAllocation(int byte_size) { + DCHECK_GE(byte_size, 0); + DCHECK(current_chunk_idx_ != -1); + ChunkInfo& info = chunks_[current_chunk_idx_]; + DCHECK_GE(info.allocated_bytes, byte_size); + info.allocated_bytes -= byte_size; + total_allocated_bytes_ -= byte_size; + } + + /// Makes all allocated chunks available for re-use, but doesn't delete any chunks. + void Clear(); + + /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for + /// each mem pool + void FreeAll(); + + /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on + /// to its last allocated chunk that contains data. 
+ /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid. + void AcquireData(MemPool* src, bool keep_current); + + std::string DebugString(); + + int64_t total_allocated_bytes() const { return total_allocated_bytes_; } + int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; } + int64_t total_reserved_bytes() const { return total_reserved_bytes_; } + + /// Return the sum of the sizes of all chunks in chunks_. + int64_t GetTotalChunkSizes() const; + + private: + friend class MemPoolTest; + static const int INITIAL_CHUNK_SIZE = 4 * 1024; + + /// The maximum size of chunk that should be allocated. Allocations larger than this + /// size will get their own individual chunk. + static const int MAX_CHUNK_SIZE = 1024 * 1024; + + struct ChunkInfo { + uint8_t* data; // Owned by the ChunkInfo. + int64_t size; // in bytes + + /// bytes allocated via Allocate() in this chunk + int64_t allocated_bytes; + + explicit ChunkInfo(int64_t size, uint8_t* buf); + + ChunkInfo() + : data(NULL), + size(0), + allocated_bytes(0) {} + }; + + /// chunk from which we served the last Allocate() call; + /// always points to the last chunk that contains allocated data; + /// chunks 0..current_chunk_idx_ are guaranteed to contain data + /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_); + /// -1 if no chunk contains allocated data (no chunks present, or after Clear()) + int current_chunk_idx_; + + /// The size of the next chunk to allocate. + int64_t next_chunk_size_; + + /// sum of allocated_bytes over all chunks in chunks_ + int64_t total_allocated_bytes_; + + /// Maximum number of bytes allocated from this pool at one time. + int64_t peak_allocated_bytes_; + + /// sum of the sizes of all chunks in chunks_ + int64_t total_reserved_bytes_; + + std::vector chunks_; + + /// Find or allocate a chunk with at least min_size spare capacity and update + /// current_chunk_idx_. Also updates chunks_, next_chunk_size_ and total_reserved_bytes_ + /// if a new chunk needs to be created. 
Returns false if that allocation fails. + bool FindChunk(int64_t min_size); + + /// Check integrity of the supporting data structures; always returns true but DCHECKs + /// all invariants. + /// If 'current_chunk_empty' is false, checks that the current chunk contains data. + bool CheckIntegrity(bool current_chunk_empty); + + /// Return offset to unoccupied space in current chunk (0 if there is no current chunk). + int GetFreeOffset() const { + if (current_chunk_idx_ == -1) return 0; + return chunks_[current_chunk_idx_].allocated_bytes; + } + + template + uint8_t* Allocate(int size) { + if (size == 0) return NULL; + + int64_t num_bytes = BitUtil::RoundUp(size, 8); + if (current_chunk_idx_ == -1 + || num_bytes + chunks_[current_chunk_idx_].allocated_bytes + > chunks_[current_chunk_idx_].size) { + // If we couldn't allocate a new chunk, return NULL. + if (UNLIKELY(!FindChunk(num_bytes))) return NULL; + } + ChunkInfo& info = chunks_[current_chunk_idx_]; + uint8_t* result = info.data + info.allocated_bytes; + DCHECK_LE(info.allocated_bytes + num_bytes, info.size); + info.allocated_bytes += num_bytes; + total_allocated_bytes_ += num_bytes; + DCHECK_LE(current_chunk_idx_, chunks_.size() - 1); + peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_); + return result; + } +}; + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_MEM_POOL_H diff --git a/cpp/src/parquet/util/output.h b/cpp/src/parquet/util/output.h index 2a43a3614f550..b466e0eede977 100644 --- a/cpp/src/parquet/util/output.h +++ b/cpp/src/parquet/util/output.h @@ -20,7 +20,6 @@ #include #include -#include #include "parquet/util/macros.h" diff --git a/cpp/src/parquet/util/rle-encoding.h b/cpp/src/parquet/util/rle-encoding.h index 22b2c2fcaf0c6..77749f5fe6e36 100644 --- a/cpp/src/parquet/util/rle-encoding.h +++ b/cpp/src/parquet/util/rle-encoding.h @@ -20,8 +20,8 @@ #ifndef PARQUET_UTIL_RLE_ENCODING_H #define PARQUET_UTIL_RLE_ENCODING_H -#include #include +#include #include "parquet/util/compiler-util.h" #include "parquet/util/bit-stream-utils.inline.h" 
diff --git a/cpp/src/parquet/util/rle-test.cc b/cpp/src/parquet/util/rle-test.cc index df020f511eb72..5f18a6f317fa2 100644 --- a/cpp/src/parquet/util/rle-test.cc +++ b/cpp/src/parquet/util/rle-test.cc @@ -17,17 +17,18 @@ // From Apache Impala as of 2016-01-29 +#include +#include #include #include + +#include + #include #include #include #include -#include -#include -#include - #include "parquet/util/rle-encoding.h" #include "parquet/util/bit-stream-utils.inline.h" diff --git a/cpp/src/parquet/util/sse-util.h b/cpp/src/parquet/util/sse-util.h index 588c30a07f238..29bf2f9adf2a6 100644 --- a/cpp/src/parquet/util/sse-util.h +++ b/cpp/src/parquet/util/sse-util.h @@ -25,6 +25,7 @@ namespace parquet_cpp { + /// This class contains constants useful for text processing with SSE4.2 intrinsics. namespace SSEUtil { /// Number of characters that fit in 64/128 bit register. SSE provides instructions @@ -93,11 +94,17 @@ namespace SSEUtil { template static inline __m128i SSE4_cmpestrm(__m128i str1, int len1, __m128i str2, int len2) { +#ifdef __clang__ /// Use asm reg rather than Yz output constraint to workaround LLVM bug 13199 - /// clang doesn't support Y-prefixed asm constraints. 
register volatile __m128i result asm("xmm0"); __asm__ volatile ("pcmpestrm %5, %2, %1" : "=x"(result) : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE) : "cc"); +#else + __m128i result; + __asm__ volatile ("pcmpestrm %5, %2, %1" + : "=Yz"(result) : "x"(str1), "xm"(str2), "a"(len1), "d"(len2), "i"(MODE) : "cc"); +#endif return result; } @@ -114,11 +121,22 @@ static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) { return crc; } +static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) { + __asm__("crc32w %1, %0" : "+r"(crc) : "rm"(v)); + return crc; +} + static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) { __asm__("crc32l %1, %0" : "+r"(crc) : "rm"(v)); return crc; } +static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) { + uint64_t result = crc; + __asm__("crc32q %1, %0" : "+r"(result) : "rm"(v)); + return result; +} + static inline int64_t POPCNT_popcnt_u64(uint64_t a) { int64_t result; __asm__("popcntq %1, %0" : "=r"(result) : "mr"(a) : "cc"); @@ -148,7 +166,9 @@ static inline int SSE4_cmpestri( } #define SSE4_crc32_u8 _mm_crc32_u8 +#define SSE4_crc32_u16 _mm_crc32_u16 #define SSE4_crc32_u32 _mm_crc32_u32 +#define SSE4_crc32_u64 _mm_crc32_u64 #define POPCNT_popcnt_u64 _mm_popcnt_u64 #else // IR_COMPILE without SSE 4.2. 
@@ -174,11 +194,21 @@ static inline uint32_t SSE4_crc32_u8(uint32_t crc, uint8_t v) { return 0; } +static inline uint32_t SSE4_crc32_u16(uint32_t crc, uint16_t v) { + DCHECK(false) << "CPU doesn't support SSE 4.2"; + return 0; +} + static inline uint32_t SSE4_crc32_u32(uint32_t crc, uint32_t v) { DCHECK(false) << "CPU doesn't support SSE 4.2"; return 0; } +static inline uint32_t SSE4_crc32_u64(uint32_t crc, uint64_t v) { + DCHECK(false) << "CPU doesn't support SSE 4.2"; + return 0; +} + static inline int64_t POPCNT_popcnt_u64(uint64_t a) { DCHECK(false) << "CPU doesn't support SSE 4.2"; return 0; diff --git a/cpp/src/parquet/util/stopwatch.h b/cpp/src/parquet/util/stopwatch.h index 076cfc819f6d1..14da2c4302d87 100644 --- a/cpp/src/parquet/util/stopwatch.h +++ b/cpp/src/parquet/util/stopwatch.h @@ -18,11 +18,12 @@ #ifndef PARQUET_UTIL_STOPWATCH_H #define PARQUET_UTIL_STOPWATCH_H -#include #include -#include #include +#include +#include + namespace parquet_cpp { class StopWatch { diff --git a/cpp/src/parquet/util/test-common.h b/cpp/src/parquet/util/test-common.h index 49c4131e51d32..9975ed9fa4ddb 100644 --- a/cpp/src/parquet/util/test-common.h +++ b/cpp/src/parquet/util/test-common.h @@ -31,6 +31,10 @@ namespace parquet_cpp { namespace test { +typedef ::testing::Types ParquetTypes; + template static inline void assert_vector_equal(const vector& left, const vector& right) { @@ -167,9 +171,9 @@ void random_fixed_byte_array(int n, uint32_t seed, uint8_t *buf, int len, } void random_byte_array(int n, uint32_t seed, uint8_t *buf, - ByteArray* out, int max_size) { + ByteArray* out, int min_size, int max_size) { std::mt19937 gen(seed); - std::uniform_int_distribution d1(0, max_size); + std::uniform_int_distribution d1(min_size, max_size); std::uniform_int_distribution d2(0, 255); for (int i = 0; i < n; ++i) { out[i].len = d1(gen); @@ -181,6 +185,11 @@ void random_byte_array(int n, uint32_t seed, uint8_t *buf, } } +void random_byte_array(int n, uint32_t seed, uint8_t *buf, 
+ ByteArray* out, int max_size) { + random_byte_array(n, seed, buf, out, 0, max_size); +} + } // namespace test } // namespace parquet_cpp