Skip to content

Commit

Permalink
PARQUET-764: Support batches for PLAIN boolean writes that aren't a m…
Browse files Browse the repository at this point in the history
…ultiple of 8

cc @majetideepak

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#185 from xhochy/PARQUET-764 and squashes the following commits:

926e61f [Uwe L. Korn] Get rid of some re-allocations
e12dc4e [Uwe L. Korn] Fix multiline comment
2ef2da5 [Uwe L. Korn] PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8

Change-Id: Idb13885291c05933fef7f817f3f50c6ed567be10
  • Loading branch information
xhochy authored and wesm committed Nov 6, 2016
1 parent bdb3da3 commit 0dc2120
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 42 deletions.
20 changes: 18 additions & 2 deletions cpp/src/parquet/column/column-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
}

typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
BooleanType, ByteArrayType, FLBAType>
TestTypes;
BooleanType, ByteArrayType, FLBAType> TestTypes;

TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);

Expand Down Expand Up @@ -421,5 +420,22 @@ TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
ASSERT_EQ(0, this->values_read_);
}

// PARQUET-764
// Correct bitpacking for boolean write at non-byte boundaries
using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
this->SetUpSchema(Repetition::REQUIRED);
auto writer = this->BuildWriter();
for (int i = 0; i < SMALL_SIZE; i++) {
bool value = (i % 2 == 0) ? true : false;
writer->WriteBatch(1, nullptr, nullptr, &value);
}
writer->Close();
this->ReadColumn();
for (int i = 0; i < SMALL_SIZE; i++) {
ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i;
}
}

} // namespace test
} // namespace parquet
97 changes: 57 additions & 40 deletions cpp/src/parquet/encodings/plain-encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,59 +181,76 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
explicit PlainEncoder(
const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
: Encoder<BooleanType>(descr, Encoding::PLAIN, allocator),
values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {}
bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8),
bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator),
values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {
bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), bits_buffer_.size()));
}

int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
int64_t EstimatedDataEncodedSize() override {
return values_sink_->Tell() + bit_writer_->bytes_written();
}

std::shared_ptr<Buffer> FlushValues() override {
if (bits_available_ > 0) {
bit_writer_->Flush();
values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written());
bits_available_ = 0;
bit_writer_->Clear();
bits_available_ = bits_buffer_.size() * 8;
}

std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
values_sink_.reset(
new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
return buffer;
}

void Put(const bool* src, int num_values) override {
Encode(src, num_values, values_sink_.get());
}

void Put(const std::vector<bool>& src, int num_values) {
Encode(src, num_values, values_sink_.get());
}

void Encode(const bool* src, int num_values, OutputStream* dst) {
int bytes_required = BitUtil::Ceil(num_values, 8);
OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);

BitWriter bit_writer(&tmp_buffer[0], bytes_required);
for (int i = 0; i < num_values; ++i) {
bit_writer.PutValue(src[i], 1);
}
bit_writer.Flush();

// Write the result to the output stream
dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes) \
void Put(input_type src, int num_values) function_attributes { \
int bit_offset = 0; \
if (bits_available_ > 0) { \
int bits_to_write = std::min(bits_available_, num_values); \
for (int i = 0; i < bits_to_write; i++) { \
bit_writer_->PutValue(src[i], 1); \
} \
bits_available_ -= bits_to_write; \
bit_offset = bits_to_write; \
\
if (bits_available_ == 0) { \
bit_writer_->Flush(); \
values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \
bit_writer_->Clear(); \
} \
} \
\
int bits_remaining = num_values - bit_offset; \
while (bit_offset < num_values) { \
bits_available_ = bits_buffer_.size() * 8; \
\
int bits_to_write = std::min(bits_available_, bits_remaining); \
for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \
bit_writer_->PutValue(src[i], 1); \
} \
bit_offset += bits_to_write; \
bits_available_ -= bits_to_write; \
bits_remaining -= bits_to_write; \
\
if (bits_available_ == 0) { \
bit_writer_->Flush(); \
values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \
bit_writer_->Clear(); \
} \
} \
}

void Encode(const std::vector<bool>& src, int num_values, OutputStream* dst) {
int bytes_required = BitUtil::Ceil(num_values, 8);

// TODO(wesm)
// Use a temporary buffer for now and copy, because the BitWriter is not
// aware of OutputStream. Later we can add some kind of Request/Flush API
// to OutputStream
OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);

BitWriter bit_writer(&tmp_buffer[0], bytes_required);
for (int i = 0; i < num_values; ++i) {
bit_writer.PutValue(src[i], 1);
}
bit_writer.Flush();

// Write the result to the output stream
dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
}
PLAINDECODER_BOOLEAN_PUT(const bool*, override)
PLAINDECODER_BOOLEAN_PUT(const std::vector<bool>&, )

protected:
int bits_available_;
std::unique_ptr<BitWriter> bit_writer_;
OwnedMutableBuffer bits_buffer_;
std::shared_ptr<InMemoryOutputStream> values_sink_;
};

Expand Down

0 comments on commit 0dc2120

Please sign in to comment.