Skip to content

Commit

Permalink
PARQUET-782: Support writing to Arrow sinks
Browse files Browse the repository at this point in the history
Author: Uwe L. Korn <uwelk@xhochy.com>

Closes apache#196 from xhochy/PARQUET-782 and squashes the following commits:

b89738a [Uwe L. Korn] Update arrow hash
041f66d [Uwe L. Korn] PARQUET-782: Support writing to Arrow sinks

Change-Id: I72112052137f23218d5516b92c41efa7743e2db5
  • Loading branch information
xhochy authored and wesm committed Nov 27, 2016
1 parent e822ebb commit 3527d29
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 11 deletions.
40 changes: 37 additions & 3 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "parquet/arrow/test-util.h"
#include "parquet/arrow/writer.h"

#include "arrow/io/memory.h"
#include "arrow/test-util.h"
#include "arrow/types/construct.h"
#include "arrow/types/primitive.h"
Expand Down Expand Up @@ -342,6 +343,29 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
this->ReadAndCheckSingleColumnTable(values);
}

TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
  // Build a non-nullable column and wrap it in a one-column table.
  std::shared_ptr<Array> values;
  ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
  std::shared_ptr<Table> table = MakeSimpleTable(values, false);

  // Write through an ::arrow::io::BufferOutputStream rather than the
  // fixture's parquet sink, exercising the Arrow-sink WriteFlatTable
  // overload.  NOTE(review): this->sink_ appears unused afterwards in this
  // test — presumably a fixture-reset leftover; confirm before removing.
  this->sink_ = std::make_shared<InMemoryOutputStream>();
  auto out_buffer = std::make_shared<::arrow::PoolBuffer>();
  auto arrow_sink = std::make_shared<::arrow::io::BufferOutputStream>(out_buffer);
  ASSERT_OK_NO_THROW(WriteFlatTable(
      table.get(), default_memory_pool(), arrow_sink, 512, default_writer_properties()));

  // Read the serialized bytes back from the captured buffer.
  auto pbuffer = std::make_shared<ParquetBuffer>(out_buffer->data(), out_buffer->size());
  std::unique_ptr<RandomAccessSource> source(new BufferReader(pbuffer));
  std::shared_ptr<::arrow::Table> roundtripped;
  this->ReadTableFromFile(ParquetFileReader::Open(std::move(source)), &roundtripped);

  // Expect a single column, the same row count, and one chunk equal to the
  // original values.
  ASSERT_EQ(1, roundtripped->num_columns());
  ASSERT_EQ(values->length(), roundtripped->num_rows());

  std::shared_ptr<ChunkedArray> result_chunks = roundtripped->column(0)->data();
  ASSERT_EQ(1, result_chunks->num_chunks());
  ASSERT_TRUE(values->Equals(result_chunks->chunk(0)));
}

TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
int64_t chunk_size = SMALL_SIZE / 4;
std::shared_ptr<Array> values;
Expand Down Expand Up @@ -456,10 +480,20 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
template <typename T>
using ParquetCDataType = typename ParquetDataType<T>::c_type;

// Maps an Arrow type to the C type used for its values in the test buffers.
// For ordinary primitive Arrow types this is simply ArrowType::c_type.
template <typename ArrowType>
struct c_type_trait {
  using ArrowCType = typename ArrowType::c_type;
};

// Booleans are special-cased: the test data lives as one byte per value
// (uint8_t), not bit-packed as in an ::arrow::BooleanArray.
template <>
struct c_type_trait<::arrow::BooleanType> {
  using ArrowCType = uint8_t;
};

template <typename TestType>
class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
public:
typedef typename TestType::c_type T;
typedef typename c_type_trait<TestType>::ArrowCType T;

void MakeTestFile(std::vector<T>& values, int num_chunks,
std::unique_ptr<ParquetFileReader>* file_reader) {
Expand Down Expand Up @@ -497,7 +531,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {

std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get());
}

void CheckSingleColumnRequiredRead(int num_chunks) {
Expand All @@ -508,7 +542,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
std::shared_ptr<Array> out;
this->ReadSingleColumnFile(std::move(file_reader), &out);

ExpectArray<TestType>(values.data(), out.get());
ExpectArrayT<TestType>(values.data(), out.get());
}
};

Expand Down
20 changes: 20 additions & 0 deletions cpp/src/parquet/arrow/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,25 @@ std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
return result;
}

// ParquetWriteSink adapts an ::arrow::io::OutputStream to parquet's
// OutputStream interface; any failing arrow Status is rethrown as a
// ParquetException via PARQUET_THROW_NOT_OK.  Shared ownership of the
// wrapped stream keeps it alive for the sink's lifetime.
ParquetWriteSink::ParquetWriteSink(
    const std::shared_ptr<::arrow::io::OutputStream>& stream)
    : stream_(stream) {}

ParquetWriteSink::~ParquetWriteSink() {}

// Close the wrapped Arrow stream (throws ParquetException on failure).
void ParquetWriteSink::Close() {
  PARQUET_THROW_NOT_OK(stream_->Close());
}

// Current write position relative to the start of the wrapped stream.
int64_t ParquetWriteSink::Tell() {
  int64_t position;
  PARQUET_THROW_NOT_OK(stream_->Tell(&position));
  return position;
}

// Append `length` bytes from `data` to the wrapped stream.
void ParquetWriteSink::Write(const uint8_t* data, int64_t length) {
  PARQUET_THROW_NOT_OK(stream_->Write(data, length));
}

} // namespace arrow
} // namespace parquet
19 changes: 19 additions & 0 deletions cpp/src/parquet/arrow/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
ParquetAllocator* allocator_;
};

// Adapter exposing an ::arrow::io::OutputStream through parquet's
// OutputStream interface, so Arrow sinks can serve as Parquet write targets.
// Arrow Status failures are rethrown as ParquetException (see io.cc).
class PARQUET_EXPORT ParquetWriteSink : public OutputStream {
 public:
  explicit ParquetWriteSink(const std::shared_ptr<::arrow::io::OutputStream>& stream);

  virtual ~ParquetWriteSink();

  // Close the output stream
  void Close() override;

  // Return the current position in the output stream relative to the start
  int64_t Tell() override;

  // Copy bytes into the output stream
  void Write(const uint8_t* data, int64_t length) override;

 private:
  // Wrapped Arrow stream; shared ownership keeps it alive while the sink exists.
  std::shared_ptr<::arrow::io::OutputStream> stream_;
};

} // namespace arrow
} // namespace parquet

Expand Down
73 changes: 73 additions & 0 deletions cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,79 @@ Status FlatColumnReader::Impl::TypedReadBatch(
}
}

// Specialization for booleans: Parquet decodes BOOLEAN values into a
// byte-per-value bool buffer, while Arrow's BooleanArray is bit-packed, so
// the generic primitive TypedReadBatch cannot be shared.
//
// Reads up to batch_size values from the column into *out, advancing to the
// next row group whenever the current column chunk is exhausted.  Returns a
// non-OK Status on allocation failure; decoding errors surface as
// ParquetException via PARQUET_CATCH_NOT_OK.
template <>
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
    int batch_size, std::shared_ptr<Array>* out) {
  int values_to_read = batch_size;
  RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
  valid_bits_idx_ = 0;
  if (descr_->max_definition_level() > 0) {
    valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
    // BUGFIX: the Status returned by Resize was previously dropped; an
    // allocation failure would have led to writing through a bad pointer.
    RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size));
    valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
    memset(valid_bits_ptr_, 0, valid_bits_size);
    null_count_ = 0;
  }

  while ((values_to_read > 0) && column_reader_) {
    // BUGFIX: propagate Resize failures here as well (previously ignored).
    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool)));
    if (descr_->max_definition_level() > 0) {
      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
    }
    auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
    int64_t values_read;
    int64_t levels_read;
    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
    auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
        values_to_read, def_levels, nullptr, values, &values_read));
    values_to_read -= levels_read;
    if (descr_->max_definition_level() == 0) {
      ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(values, values_read);
    } else {
      // As per the definition and checks for flat columns:
      // descr_->max_definition_level() == 1
      ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
          def_levels, values, values_read, levels_read);
    }
    if (!column_reader_->HasNext()) { NextRowGroup(); }
  }

  if (descr_->max_definition_level() > 0) {
    // TODO: Shrink arrays in the case they are too large
    if (valid_bits_idx_ < batch_size * 0.8) {
      // Shrink arrays as they are larger than the output.
      // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
      // without the need for a copy. Given a decent underlying allocator this
      // should still free some underlying pages to the OS.

      auto data_buffer = std::make_shared<PoolBuffer>(pool_);
      RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
      memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
      data_buffer_ = data_buffer;

      auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
      RETURN_NOT_OK(
          valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
      memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
          valid_bits_buffer->size());
      valid_bits_buffer_ = valid_bits_buffer;
    }
    *out = std::make_shared<::arrow::BooleanArray>(
        field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
    // Release ownership so the next batch starts with fresh buffers.
    data_buffer_.reset();
    valid_bits_buffer_.reset();
    return Status::OK();
  } else {
    *out = std::make_shared<::arrow::BooleanArray>(
        field_->type, valid_bits_idx_, data_buffer_);
    data_buffer_.reset();
    return Status::OK();
  }
}

template <>
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
Expand Down
20 changes: 12 additions & 8 deletions cpp/src/parquet/arrow/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
template <typename ArrowType>
using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;

// Trait: true iff ArrowType is the Arrow boolean type; used to SFINAE-select
// the boolean overloads of the array-generation helpers below.
template <typename ArrowType>
using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>;

template <class ArrowType>
typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
Expand Down Expand Up @@ -70,8 +73,9 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type NonNull
return builder.Finish(out);
}

template <>
Status NonNullArray<::arrow::BooleanType>(size_t size, std::shared_ptr<Array>* out) {
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
std::vector<uint8_t> values;
::arrow::test::randint<uint8_t>(size, 0, 1, &values);
::arrow::BooleanBuilder builder(
Expand Down Expand Up @@ -135,8 +139,8 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type Nullabl
}

// This helper function only supports (size/2) nulls yet.
template <>
Status NullableArray<::arrow::BooleanType>(
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
std::vector<uint8_t> values;
::arrow::test::randint<uint8_t>(size, 0, 1, &values);
Expand Down Expand Up @@ -176,19 +180,19 @@ void ExpectArray(T* expected, Array* result) {
}

template <typename ArrowType>
void ExpectArray(typename ArrowType::c_type* expected, Array* result) {
void ExpectArrayT(void* expected, Array* result) {
::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
for (int64_t i = 0; i < result->length(); i++) {
EXPECT_EQ(expected[i],
EXPECT_EQ(reinterpret_cast<typename ArrowType::c_type*>(expected)[i],
reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
}
}

template <>
void ExpectArray<::arrow::BooleanType>(uint8_t* expected, Array* result) {
void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
::arrow::BooleanBuilder builder(
::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
builder.Append(expected, result->length());
builder.Append(reinterpret_cast<uint8_t*>(expected), result->length());

std::shared_ptr<Array> expected_array;
EXPECT_OK(builder.Finish(&expected_array));
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <vector>

#include "parquet/arrow/io.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/utils.h"

Expand Down Expand Up @@ -370,6 +371,13 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool,
return writer.Close();
}

// Convenience overload: writes a flat Arrow table to an
// ::arrow::io::OutputStream by wrapping it in a ParquetWriteSink and
// delegating to the parquet-OutputStream-based WriteFlatTable above.
Status WriteFlatTable(const Table* table, MemoryPool* pool,
    const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
    const std::shared_ptr<WriterProperties>& properties) {
  auto parquet_sink = std::make_shared<ParquetWriteSink>(sink);
  return WriteFlatTable(table, pool, parquet_sink, chunk_size, properties);
}

} // namespace arrow

} // namespace parquet
7 changes: 7 additions & 0 deletions cpp/src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "parquet/api/schema.h"
#include "parquet/api/writer.h"

#include "arrow/io/interfaces.h"

namespace arrow {

class Array;
Expand Down Expand Up @@ -71,6 +73,11 @@ ::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());

::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());

} // namespace arrow

} // namespace parquet
Expand Down

0 comments on commit 3527d29

Please sign in to comment.