From 3527d2916727f9beae5a263f96dbceefcedcd710 Mon Sep 17 00:00:00 2001 From: "Uwe L. Korn" Date: Sun, 27 Nov 2016 17:24:27 -0500 Subject: [PATCH] PARQUET-782: Support writing to Arrow sinks Author: Uwe L. Korn Closes #196 from xhochy/PARQUET-782 and squashes the following commits: b89738a [Uwe L. Korn] Update arrow hash 041f66d [Uwe L. Korn] PARQUET-782: Support writing to Arrow sinks Change-Id: I72112052137f23218d5516b92c41efa7743e2db5 --- .../parquet/arrow/arrow-reader-writer-test.cc | 40 +++++++++- cpp/src/parquet/arrow/io.cc | 20 +++++ cpp/src/parquet/arrow/io.h | 19 +++++ cpp/src/parquet/arrow/reader.cc | 73 +++++++++++++++++++ cpp/src/parquet/arrow/test-util.h | 20 +++-- cpp/src/parquet/arrow/writer.cc | 8 ++ cpp/src/parquet/arrow/writer.h | 7 ++ 7 files changed, 176 insertions(+), 11 deletions(-) diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc index 5ec70f39f817f..7bcb59033fb60 100644 --- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc @@ -24,6 +24,7 @@ #include "parquet/arrow/test-util.h" #include "parquet/arrow/writer.h" +#include "arrow/io/memory.h" #include "arrow/test-util.h" #include "arrow/types/construct.h" #include "arrow/types/primitive.h" @@ -342,6 +343,29 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { this->ReadAndCheckSingleColumnTable(values); } +TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) { + std::shared_ptr values; + ASSERT_OK(NonNullArray(LARGE_SIZE, &values)); + std::shared_ptr table = MakeSimpleTable(values, false); + this->sink_ = std::make_shared(); + auto buffer = std::make_shared<::arrow::PoolBuffer>(); + auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), arrow_sink_, 512, default_writer_properties())); + + std::shared_ptr pbuffer = + 
std::make_shared(buffer->data(), buffer->size()); + std::unique_ptr source(new BufferReader(pbuffer)); + std::shared_ptr<::arrow::Table> out; + this->ReadTableFromFile(ParquetFileReader::Open(std::move(source)), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(values->length(), out->num_rows()); + + std::shared_ptr chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +} + TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { int64_t chunk_size = SMALL_SIZE / 4; std::shared_ptr values; @@ -456,10 +480,20 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) { template using ParquetCDataType = typename ParquetDataType::c_type; +template +struct c_type_trait { + using ArrowCType = typename T::c_type; +}; + +template <> +struct c_type_trait<::arrow::BooleanType> { + using ArrowCType = uint8_t; +}; + template class TestPrimitiveParquetIO : public TestParquetIO { public: - typedef typename TestType::c_type T; + typedef typename c_type_trait::ArrowCType T; void MakeTestFile(std::vector& values, int num_chunks, std::unique_ptr* file_reader) { @@ -497,7 +531,7 @@ class TestPrimitiveParquetIO : public TestParquetIO { std::shared_ptr chunked_array = out->column(0)->data(); ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray(values.data(), chunked_array->chunk(0).get()); + ExpectArrayT(values.data(), chunked_array->chunk(0).get()); } void CheckSingleColumnRequiredRead(int num_chunks) { @@ -508,7 +542,7 @@ class TestPrimitiveParquetIO : public TestParquetIO { std::shared_ptr out; this->ReadSingleColumnFile(std::move(file_reader), &out); - ExpectArray(values.data(), out.get()); + ExpectArrayT(values.data(), out.get()); } }; diff --git a/cpp/src/parquet/arrow/io.cc b/cpp/src/parquet/arrow/io.cc index 8f3aa3e8d4142..74464f2f87013 100644 --- a/cpp/src/parquet/arrow/io.cc +++ b/cpp/src/parquet/arrow/io.cc @@ -103,5 +103,25 @@ std::shared_ptr 
ParquetReadSource::Read(int64_t nbytes) { return result; } +ParquetWriteSink::ParquetWriteSink( + const std::shared_ptr<::arrow::io::OutputStream>& stream) + : stream_(stream) {} + +ParquetWriteSink::~ParquetWriteSink() {} + +void ParquetWriteSink::Close() { + PARQUET_THROW_NOT_OK(stream_->Close()); +} + +int64_t ParquetWriteSink::Tell() { + int64_t position; + PARQUET_THROW_NOT_OK(stream_->Tell(&position)); + return position; +} + +void ParquetWriteSink::Write(const uint8_t* data, int64_t length) { + PARQUET_THROW_NOT_OK(stream_->Write(data, length)); +} + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/io.h b/cpp/src/parquet/arrow/io.h index 119f8de18d8a0..a068a4e318109 100644 --- a/cpp/src/parquet/arrow/io.h +++ b/cpp/src/parquet/arrow/io.h @@ -76,6 +76,25 @@ class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource { ParquetAllocator* allocator_; }; +class PARQUET_EXPORT ParquetWriteSink : public OutputStream { + public: + explicit ParquetWriteSink(const std::shared_ptr<::arrow::io::OutputStream>& stream); + + virtual ~ParquetWriteSink(); + + // Close the output stream + void Close() override; + + // Return the current position in the output stream relative to the start + int64_t Tell() override; + + // Copy bytes into the output stream + void Write(const uint8_t* data, int64_t length) override; + + private: + std::shared_ptr<::arrow::io::OutputStream> stream_; +}; + } // namespace arrow } // namespace parquet diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index bc9ec8f8b4c5c..2d2b5cd2bb8dd 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -375,6 +375,79 @@ Status FlatColumnReader::Impl::TypedReadBatch( } } +template <> +Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>( + int batch_size, std::shared_ptr* out) { + int values_to_read = batch_size; + RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size)); + 
valid_bits_idx_ = 0; + if (descr_->max_definition_level() > 0) { + valid_bits_buffer_ = std::make_shared(pool_); + int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8; + valid_bits_buffer_->Resize(valid_bits_size); + valid_bits_ptr_ = valid_bits_buffer_->mutable_data(); + memset(valid_bits_ptr_, 0, valid_bits_size); + null_count_ = 0; + } + + while ((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(bool)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = dynamic_cast*>(column_reader_.get()); + int64_t values_read; + int64_t levels_read; + int16_t* def_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); + auto values = reinterpret_cast(values_buffer_.mutable_data()); + PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( + values_to_read, def_levels, nullptr, values, &values_read)); + values_to_read -= levels_read; + if (descr_->max_definition_level() == 0) { + ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(values, values_read); + } else { + // As per the definition and checks for flat columns: + // descr_->max_definition_level() == 1 + ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>( + def_levels, values, values_read, levels_read); + } + if (!column_reader_->HasNext()) { NextRowGroup(); } + } + + if (descr_->max_definition_level() > 0) { + // TODO: Shrink arrays in the case they are too large + if (valid_bits_idx_ < batch_size * 0.8) { + // Shrink arrays as they are larger than the output. + // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays + // without the need for a copy. Given a decent underlying allocator this + // should still free some underlying pages to the OS. 
+ + auto data_buffer = std::make_shared(pool_); + RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool))); + memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size()); + data_buffer_ = data_buffer; + + auto valid_bits_buffer = std::make_shared(pool_); + RETURN_NOT_OK( + valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8)); + memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(), + valid_bits_buffer->size()); + valid_bits_buffer_ = valid_bits_buffer; + } + *out = std::make_shared<::arrow::BooleanArray>( + field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_); + // Release the ownership + data_buffer_.reset(); + valid_bits_buffer_.reset(); + return Status::OK(); + } else { + *out = std::make_shared<::arrow::BooleanArray>( + field_->type, valid_bits_idx_, data_buffer_); + data_buffer_.reset(); + return Status::OK(); + } +} + template <> Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>( int batch_size, std::shared_ptr* out) { diff --git a/cpp/src/parquet/arrow/test-util.h b/cpp/src/parquet/arrow/test-util.h index 92798ff6092b9..dedd398b7321f 100644 --- a/cpp/src/parquet/arrow/test-util.h +++ b/cpp/src/parquet/arrow/test-util.h @@ -37,6 +37,9 @@ using is_arrow_int = std::is_integral; template using is_arrow_string = std::is_same; +template +using is_arrow_bool = std::is_same; + template typename std::enable_if::value, Status>::type NonNullArray( size_t size, std::shared_ptr* out) { @@ -70,8 +73,9 @@ typename std::enable_if::value, Status>::type NonNull return builder.Finish(out); } -template <> -Status NonNullArray<::arrow::BooleanType>(size_t size, std::shared_ptr* out) { +template +typename std::enable_if::value, Status>::type NonNullArray( + size_t size, std::shared_ptr* out) { std::vector values; ::arrow::test::randint(size, 0, 1, &values); ::arrow::BooleanBuilder builder( @@ -135,8 +139,8 @@ typename std::enable_if::value, Status>::type 
Nullabl } // This helper function only supports (size/2) nulls yet. -template <> -Status NullableArray<::arrow::BooleanType>( +template +typename std::enable_if::value, Status>::type NullableArray( size_t size, size_t num_nulls, std::shared_ptr* out) { std::vector values; ::arrow::test::randint(size, 0, 1, &values); @@ -176,19 +180,19 @@ void ExpectArray(T* expected, Array* result) { } template -void ExpectArray(typename ArrowType::c_type* expected, Array* result) { +void ExpectArrayT(void* expected, Array* result) { ::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result); for (int64_t i = 0; i < result->length(); i++) { - EXPECT_EQ(expected[i], + EXPECT_EQ(reinterpret_cast(expected)[i], reinterpret_cast(p_array->data()->data())[i]); } } template <> -void ExpectArray<::arrow::BooleanType>(uint8_t* expected, Array* result) { +void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) { ::arrow::BooleanBuilder builder( ::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>()); - builder.Append(expected, result->length()); + builder.Append(reinterpret_cast(expected), result->length()); std::shared_ptr expected_array; EXPECT_OK(builder.Finish(&expected_array)); diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index e4d374551c039..b7c7d20c6d105 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -20,6 +20,7 @@ #include #include +#include "parquet/arrow/io.h" #include "parquet/arrow/schema.h" #include "parquet/arrow/utils.h" @@ -370,6 +371,13 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool, return writer.Close(); } +Status WriteFlatTable(const Table* table, MemoryPool* pool, + const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, + const std::shared_ptr& properties) { + auto parquet_sink = std::make_shared(sink); + return WriteFlatTable(table, pool, parquet_sink, chunk_size, properties); +} + } // namespace arrow } // 
namespace parquet diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h index 92524d8ce7a2b..a82c2f619bc1d 100644 --- a/cpp/src/parquet/arrow/writer.h +++ b/cpp/src/parquet/arrow/writer.h @@ -23,6 +23,8 @@ #include "parquet/api/schema.h" #include "parquet/api/writer.h" +#include "arrow/io/interfaces.h" + namespace arrow { class Array; @@ -71,6 +73,11 @@ ::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table, int64_t chunk_size, const std::shared_ptr& properties = default_writer_properties()); +::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table, + ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, + int64_t chunk_size, + const std::shared_ptr& properties = default_writer_properties()); + } // namespace arrow } // namespace parquet