Skip to content

Commit

Permalink
PARQUET-762: C++: Use optimistic allocation instead of Arrow Builders
Browse files Browse the repository at this point in the history
About 2x as fast on non-string data.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#183 from xhochy/PARQUET-762 and squashes the following commits:

d1d11b5 [Uwe L. Korn] PARQUET-762: C++: Use optimistic allocation instead of Arrow Builders

Change-Id: Ie5cb5255fd0c7a777573dc182d44cba06caa6299
  • Loading branch information
xhochy authored and wesm committed Nov 6, 2016
1 parent 0dc2120 commit 28b2011
Showing 1 changed file with 121 additions and 58 deletions.
179 changes: 121 additions & 58 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "arrow/table.h"
#include "arrow/types/primitive.h"
#include "arrow/types/string.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/status.h"

using arrow::Array;
Expand All @@ -50,16 +51,16 @@ namespace arrow {

template <typename ArrowType>
struct ArrowTypeTraits {
typedef ::arrow::NumericBuilder<ArrowType> builder_type;
typedef ::arrow::NumericArray<ArrowType> array_type;
};

template <>
struct ArrowTypeTraits<BooleanType> {
typedef ::arrow::BooleanBuilder builder_type;
struct ArrowTypeTraits<::arrow::BooleanType> {
typedef ::arrow::BooleanArray array_type;
};

template <typename ArrowType>
using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
using ArrayType = typename ArrowTypeTraits<ArrowType>::array_type;

class FileReader::Impl {
public:
Expand Down Expand Up @@ -87,13 +88,13 @@ class FlatColumnReader::Impl {
template <typename ArrowType, typename ParquetType>
Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);

template <typename ArrowType>
Status InitDataBuffer(int batch_size);
template <typename ArrowType, typename ParquetType>
Status ReadNullableFlatBatch(const int16_t* def_levels,
typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
BuilderType<ArrowType>* builder);
void ReadNullableFlatBatch(const int16_t* def_levels,
typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read);
template <typename ArrowType, typename ParquetType>
Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
BuilderType<ArrowType>* builder);
void ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read);

private:
void NextRowGroup();
Expand All @@ -106,26 +107,6 @@ class FlatColumnReader::Impl {
(sizeof(InType) == sizeof(OutType)));
};

template <typename InType, typename OutType,
typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
Status ConvertPhysicalType(
const InType* in_ptr, int64_t length, const OutType** out_ptr) {
*out_ptr = reinterpret_cast<const OutType*>(in_ptr);
return Status::OK();
}

template <typename InType, typename OutType,
typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
Status ConvertPhysicalType(
const InType* in_ptr, int64_t length, const OutType** out_ptr) {
RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
OutType* mutable_out_ptr =
reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
*out_ptr = mutable_out_ptr;
return Status::OK();
}

MemoryPool* pool_;
const ColumnDescriptor* descr_;
ParquetFileReader* reader_;
Expand All @@ -136,8 +117,12 @@ class FlatColumnReader::Impl {

PoolBuffer values_buffer_;
PoolBuffer def_levels_buffer_;
PoolBuffer values_builder_buffer_;
PoolBuffer valid_bytes_buffer_;
std::shared_ptr<PoolBuffer> data_buffer_;
uint8_t* data_buffer_ptr_;
std::shared_ptr<PoolBuffer> valid_bits_buffer_;
uint8_t* valid_bits_ptr_;
int64_t valid_bits_idx_;
int64_t null_count_;
};

FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
Expand Down Expand Up @@ -241,50 +226,97 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
}

template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
int64_t values_read, BuilderType<ArrowType>* builder) {
void FlatColumnReader::Impl::ReadNonNullableBatch(
typename ParquetType::c_type* values, int64_t values_read) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;

DCHECK(builder);
const ArrowCType* values_ptr = nullptr;
RETURN_NOT_OK(
(ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr)));
RETURN_NOT_OK(builder->Append(values_ptr, values_read));
return Status::OK();
ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
valid_bits_idx_ += values_read;
}

// Boolean specialisation for required (non-nullable) columns.
//
// Walks the `values_read` decoded booleans in `values` and, for every true
// entry, sets the bit at position `valid_bits_idx_` in `data_buffer_ptr_`.
// `valid_bits_idx_` is advanced by one for every value, true or false.
// False entries never clear a bit, so the destination must already be zeroed
// (InitDataBuffer<::arrow::BooleanType> memsets the buffer to 0).
template <>
void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
    bool* values, int64_t values_read) {
  for (int64_t pos = 0; pos < values_read; ++pos, ++valid_bits_idx_) {
    // Nothing to write for a false value; the output slot is still advanced
    // by the loop increment.
    if (!values[pos]) { continue; }
    ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
  }
}

template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
BuilderType<ArrowType>* builder) {
void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read) {
using ArrowCType = typename ArrowType::c_type;

DCHECK(builder);
RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
int values_idx = 0;
for (int64_t i = 0; i < levels_read; i++) {
if (def_levels[i] < descr_->max_definition_level()) {
valid_bytes[i] = 0;
null_count_++;
} else {
valid_bytes[i] = 1;
values_ptr[i] = values[values_idx++];
::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
data_ptr[valid_bits_idx_] = values[values_idx++];
}
valid_bits_idx_++;
}
RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
}

// Boolean specialisation for nullable flat columns.
//
// Consumes `levels_read` definition levels. A level below
// descr_->max_definition_level() counts as a null: `null_count_` is bumped and
// no value is consumed. Otherwise the slot is flagged in `valid_bits_ptr_` and
// the next entry of `values` is consumed; if it is true the matching bit in
// `data_buffer_ptr_` is set as well. `valid_bits_idx_` moves forward by one per
// level in either case. `values_read` is not consulted by this routine.
template <>
void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
    const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) {
  // Cursor over the densely stored (nulls omitted) decoded values.
  const bool* next_value = values;
  for (int64_t level = 0; level < levels_read; ++level, ++valid_bits_idx_) {
    if (def_levels[level] < descr_->max_definition_level()) {
      ++null_count_;
      continue;
    }
    ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
    const bool value = *next_value++;
    if (value) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_); }
  }
}

// Allocates a fresh output data buffer for one batch.
//
// Replaces `data_buffer_` with a new PoolBuffer drawn from `pool_`, resizes it
// to `batch_size` elements of ArrowType's C type and caches the writable
// pointer in `data_buffer_ptr_`. The buffer contents are not initialised here.
//
// Returns the failing Status if the resize does not succeed (in which case
// `data_buffer_ptr_` is left unchanged), Status::OK() otherwise.
template <typename ArrowType>
Status FlatColumnReader::Impl::InitDataBuffer(int batch_size) {
  data_buffer_ = std::make_shared<PoolBuffer>(pool_);

  const auto num_bytes = batch_size * sizeof(typename ArrowType::c_type);
  RETURN_NOT_OK(data_buffer_->Resize(num_bytes));

  data_buffer_ptr_ = data_buffer_->mutable_data();
  return Status::OK();
}

// Boolean specialisation: allocates and zeroes the output data buffer.
//
// The buffer is resized to CeilByte(batch_size) / 8 bytes (presumably one bit
// per value, rounded up to a whole byte) and then cleared, because the batch
// readers only ever set bits for true values and never clear them.
//
// Returns the failing Status if the resize does not succeed, Status::OK()
// otherwise.
template <>
Status FlatColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
  data_buffer_ = std::make_shared<PoolBuffer>(pool_);

  const auto num_bytes = ::arrow::BitUtil::CeilByte(batch_size) / 8;
  RETURN_NOT_OK(data_buffer_->Resize(num_bytes));

  data_buffer_ptr_ = data_buffer_->mutable_data();
  // Clear the whole buffer as reported by the buffer itself.
  memset(data_buffer_ptr_, 0, data_buffer_->size());
  return Status::OK();
}

template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::TypedReadBatch(
int batch_size, std::shared_ptr<Array>* out) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;

int values_to_read = batch_size;
BuilderType<ArrowType> builder(pool_, field_->type);
RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
valid_bits_idx_ = 0;
if (descr_->max_definition_level() > 0) {
valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
valid_bits_buffer_->Resize(valid_bits_size);
valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
memset(valid_bits_ptr_, 0, valid_bits_size);
null_count_ = 0;
}

while ((values_to_read > 0) && column_reader_) {
values_buffer_.Resize(values_to_read * sizeof(ParquetCType));
if (descr_->max_definition_level() > 0) {
Expand All @@ -299,17 +331,48 @@ Status FlatColumnReader::Impl::TypedReadBatch(
values_to_read, def_levels, nullptr, values, &values_read));
values_to_read -= levels_read;
if (descr_->max_definition_level() == 0) {
RETURN_NOT_OK(
(ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read);
} else {
// As per the definition and checks for flat columns:
// descr_->max_definition_level() == 1
RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
def_levels, values, values_read, levels_read, &builder)));
ReadNullableFlatBatch<ArrowType, ParquetType>(
def_levels, values, values_read, levels_read);
}
if (!column_reader_->HasNext()) { NextRowGroup(); }
}
return builder.Finish(out);

if (descr_->max_definition_level() > 0) {
// TODO: Shrink arrays in the case they are too large
if (valid_bits_idx_ < batch_size * 0.8) {
// Shrink arrays as they are larger than the output.
// TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
// without the need for a copy. Given a decent underlying allocator this
// should still free some underlying pages to the OS.

auto data_buffer = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(ArrowCType)));
memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
data_buffer_ = data_buffer;

auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
RETURN_NOT_OK(
valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
valid_bits_buffer->size());
valid_bits_buffer_ = valid_bits_buffer;
}
*out = std::make_shared<ArrayType<ArrowType>>(
field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
// Release the ownership
data_buffer_.reset();
valid_bits_buffer_.reset();
return Status::OK();
} else {
*out = std::make_shared<ArrayType<ArrowType>>(
field_->type, valid_bits_idx_, data_buffer_);
data_buffer_.reset();
return Status::OK();
}
}

template <>
Expand Down

0 comments on commit 28b2011

Please sign in to comment.