From 0dc2120c2a18bc4bc0ba5a92c4b608ab919ff9fe Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 6 Nov 2016 14:18:11 -0500 Subject: [PATCH] PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8 cc @majetideepak Author: Uwe L. Korn Closes #185 from xhochy/PARQUET-764 and squashes the following commits: 926e61f [Uwe L. Korn] Get rid of some re-allocations e12dc4e [Uwe L. Korn] Fix multiline comment 2ef2da5 [Uwe L. Korn] PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8 Change-Id: Idb13885291c05933fef7f817f3f50c6ed567be10 --- cpp/src/parquet/column/column-writer-test.cc | 20 +++- cpp/src/parquet/encodings/plain-encoding.h | 97 ++++++++++++-------- 2 files changed, 75 insertions(+), 42 deletions(-) diff --git a/cpp/src/parquet/column/column-writer-test.cc b/cpp/src/parquet/column/column-writer-test.cc index 2269e8f736d2c..0a20ac16ed7e6 100644 --- a/cpp/src/parquet/column/column-writer-test.cc +++ b/cpp/src/parquet/column/column-writer-test.cc @@ -214,8 +214,7 @@ void TestPrimitiveWriter::ReadColumnFully(Compression::type compressio } typedef ::testing::Types - TestTypes; + BooleanType, ByteArrayType, FLBAType> TestTypes; TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes); @@ -421,5 +420,22 @@ TEST_F(TestNullValuesWriter, OptionalNullValueChunk) { ASSERT_EQ(0, this->values_read_); } +// PARQUET-764 +// Correct bitpacking for boolean write at non-byte boundaries +using TestBooleanValuesWriter = TestPrimitiveWriter; +TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) { + this->SetUpSchema(Repetition::REQUIRED); + auto writer = this->BuildWriter(); + for (int i = 0; i < SMALL_SIZE; i++) { + bool value = (i % 2 == 0) ? true : false; + writer->WriteBatch(1, nullptr, nullptr, &value); + } + writer->Close(); + this->ReadColumn(); + for (int i = 0; i < SMALL_SIZE; i++) { + ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i; + } +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/encodings/plain-encoding.h b/cpp/src/parquet/encodings/plain-encoding.h index eee3f65f5fa89..b960bd27e3acf 100644 --- a/cpp/src/parquet/encodings/plain-encoding.h +++ b/cpp/src/parquet/encodings/plain-encoding.h @@ -181,59 +181,76 @@ class PlainEncoder : public Encoder { explicit PlainEncoder( const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator()) : Encoder(descr, Encoding::PLAIN, allocator), - values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {} + bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8), + bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator), + values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) { + bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), bits_buffer_.size())); + } - int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); } + int64_t EstimatedDataEncodedSize() override { + return values_sink_->Tell() + bit_writer_->bytes_written(); + } std::shared_ptr FlushValues() override { + if (bits_available_ > 0) { + bit_writer_->Flush(); + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); + bits_available_ = 0; + bit_writer_->Clear(); + bits_available_ = bits_buffer_.size() * 8; + } + std::shared_ptr buffer = values_sink_->GetBuffer(); values_sink_.reset( new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_)); return buffer; } - void Put(const bool* src, int num_values) override { - Encode(src, num_values, values_sink_.get()); - } - - void Put(const std::vector& src, int num_values) { - Encode(src, num_values, values_sink_.get()); - } - - void Encode(const bool* src, int num_values, OutputStream* dst) { - int bytes_required = BitUtil::Ceil(num_values, 8); - OwnedMutableBuffer tmp_buffer(bytes_required, allocator_); - - BitWriter bit_writer(&tmp_buffer[0], bytes_required); - for (int i = 0; i < num_values; ++i) { - bit_writer.PutValue(src[i], 1); - } - bit_writer.Flush(); - - // Write the result to the output stream - dst->Write(bit_writer.buffer(), bit_writer.bytes_written()); +#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes) \ + void Put(input_type src, int num_values) function_attributes { \ + int bit_offset = 0; \ + if (bits_available_ > 0) { \ + int bits_to_write = std::min(bits_available_, num_values); \ + for (int i = 0; i < bits_to_write; i++) { \ + bit_writer_->PutValue(src[i], 1); \ + } \ + bits_available_ -= bits_to_write; \ + bit_offset = bits_to_write; \ + \ + if (bits_available_ == 0) { \ + bit_writer_->Flush(); \ + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ + bit_writer_->Clear(); \ + } \ + } \ + \ + int bits_remaining = num_values - bit_offset; \ + while (bit_offset < num_values) { \ + bits_available_ = bits_buffer_.size() * 8; \ + \ + int bits_to_write = std::min(bits_available_, bits_remaining); \ + for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \ + bit_writer_->PutValue(src[i], 1); \ + } \ + bit_offset += bits_to_write; \ + bits_available_ -= bits_to_write; \ + bits_remaining -= bits_to_write; \ + \ + if (bits_available_ == 0) { \ + bit_writer_->Flush(); \ + values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \ + bit_writer_->Clear(); \ + } \ + } \ } - void Encode(const std::vector& src, int num_values, OutputStream* dst) { - int bytes_required = BitUtil::Ceil(num_values, 8); - - // TODO(wesm) - // Use a temporary buffer for now and copy, because the BitWriter is not - // aware of OutputStream. Later we can add some kind of Request/Flush API - // to OutputStream - OwnedMutableBuffer tmp_buffer(bytes_required, allocator_); - - BitWriter bit_writer(&tmp_buffer[0], bytes_required); - for (int i = 0; i < num_values; ++i) { - bit_writer.PutValue(src[i], 1); - } - bit_writer.Flush(); - - // Write the result to the output stream - dst->Write(bit_writer.buffer(), bit_writer.bytes_written()); - } + PLAINDECODER_BOOLEAN_PUT(const bool*, override) + PLAINDECODER_BOOLEAN_PUT(const std::vector&, ) protected: + int bits_available_; + std::unique_ptr bit_writer_; + OwnedMutableBuffer bits_buffer_; std::shared_ptr values_sink_; };