Vendor Arrow Parquet Writer (facebookincubator#6586)
Summary:
As discussed in [0], this PR is a first step toward vendoring Arrow's Parquet writer into Velox. It moves the most important Parquet-related classes, but still uses Arrow core underneath. Follow-up PRs will continue removing the Arrow dependencies, porting the code to Velox types, vectors, and buffers.

[0] - facebookincubator#6193

Pull Request resolved: facebookincubator#6586

Reviewed By: xiaoxmeng

Differential Revision: D49334179

Pulled By: pedroerp

fbshipit-source-id: ade4d4d0d46f410044ff36b6f46a019e401b9ce8
pedroerp authored and facebook-github-bot committed Sep 18, 2023
1 parent f7bcc0f commit fe01c0c
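
For orientation before the diffs: the visible heart of this change is a namespace move. Call sites that previously reached Arrow's Parquet writer through the external `::parquet` namespace now use the vendored copy under `facebook::velox::parquet::arrow`. A minimal sketch, assuming the vendored headers added by this PR are on the include path (identifiers taken from the Writer.cpp diff; not a complete program):

    // The writer classes keep their Arrow names but move into a
    // Velox-owned namespace.
    #include <memory>

    #include "velox/dwio/parquet/writer/arrow/Properties.h"
    #include "velox/dwio/parquet/writer/arrow/Writer.h"

    // Before this PR, call sites used the external Arrow dependency:
    //   std::unique_ptr<::parquet::arrow::FileWriter> writer;
    //   std::shared_ptr<::parquet::WriterProperties> properties;

    // After, the same types come from the vendored copy:
    using facebook::velox::parquet::arrow::WriterProperties;
    using facebook::velox::parquet::arrow::arrow::FileWriter;

    std::unique_ptr<FileWriter> writer;
    std::shared_ptr<WriterProperties> properties;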
Showing 65 changed files with 46,399 additions and 37 deletions.
3 changes: 2 additions & 1 deletion velox/dwio/common/tests/E2EFilterTestBase.cpp
@@ -150,7 +150,8 @@ void E2EFilterTestBase::readWithFilter(
   auto input = std::make_unique<BufferedInput>(
       std::make_shared<InMemoryReadFile>(data), readerOpts.getMemoryPool());
   auto reader = makeReader(readerOpts, std::move(input));
-  // The spec must stay live over the lifetime of the reader.
+
+  // The spec must stay live over the lifetime of the reader.
   setUpRowReaderOptions(rowReaderOpts, spec);
   OwnershipChecker ownershipChecker;
   auto rowReader = reader->createRowReader(rowReaderOpts);
2 changes: 1 addition & 1 deletion velox/dwio/parquet/reader/PageReader.cpp
@@ -751,7 +751,7 @@ void PageReader::makeDecoder() {
       break;
     case Encoding::DELTA_BINARY_PACKED:
     default:
-      VELOX_UNSUPPORTED("Encoding not supported yet");
+      VELOX_UNSUPPORTED("Encoding not supported yet: {}", encoding_);
   }
 }

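A side note on the improved message: VELOX_UNSUPPORTED takes fmt-style `{}` placeholders (fmt::fmt is among the link dependencies in the CMake change below). A minimal sketch of the substitution being relied on, using fmt directly rather than the Velox macro, with a hypothetical encoding value:

    #include <fmt/core.h>

    int main() {
      // The macro formats its arguments the way fmt substitutes "{}" here.
      int encoding = 7; // hypothetical stand-in for the Parquet Encoding enum
      fmt::print("Encoding not supported yet: {}\n", encoding);
      return 0;
    }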
13 changes: 11 additions & 2 deletions velox/dwio/parquet/writer/CMakeLists.txt
@@ -12,7 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+add_subdirectory(arrow)
+
 add_library(velox_dwio_arrow_parquet_writer Writer.cpp)
 
-target_link_libraries(velox_dwio_arrow_parquet_writer velox_dwio_common
-                      velox_arrow_bridge parquet arrow fmt::fmt)
+target_link_libraries(
+  velox_dwio_arrow_parquet_writer
+  velox_dwio_arrow_parquet_writer_lib
+  velox_dwio_arrow_parquet_writer_util_lib
+  velox_dwio_common
+  velox_arrow_bridge
+  parquet
+  arrow
+  fmt::fmt)
73 changes: 40 additions & 33 deletions velox/dwio/parquet/writer/Writer.cpp
@@ -17,14 +17,22 @@
#include "velox/vector/arrow/Bridge.h"

#include <arrow/c/bridge.h>
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <parquet/arrow/writer.h>

#include "velox/dwio/parquet/writer/Writer.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Writer.h"

namespace facebook::velox::parquet {

using facebook::velox::parquet::arrow::ArrowWriterProperties;
using facebook::velox::parquet::arrow::Compression;
using facebook::velox::parquet::arrow::WriterProperties;
using facebook::velox::parquet::arrow::arrow::FileWriter;

// Utility for buffering Arrow output with a DataBuffer.
class ArrowDataBufferSink : public arrow::io::OutputStream {
class ArrowDataBufferSink : public ::arrow::io::OutputStream {
public:
/// @param growRatio Growth factor used when invoking the reserve() method of
/// DataSink, thereby helping to minimize frequent memcpy operations.
@@ -34,7 +42,7 @@ class ArrowDataBufferSink : public arrow::io::OutputStream {
       double growRatio)
       : sink_(std::move(sink)), growRatio_(growRatio), buffer_(pool) {}
 
-  arrow::Status Write(const std::shared_ptr<arrow::Buffer>& data) override {
+  ::arrow::Status Write(const std::shared_ptr<::arrow::Buffer>& data) override {
     auto requestCapacity = buffer_.size() + data->size();
     if (requestCapacity > buffer_.capacity()) {
       buffer_.reserve(growRatio_ * (requestCapacity));
@@ -43,32 +51,32 @@ class ArrowDataBufferSink : public arrow::io::OutputStream {
         buffer_.size(),
         reinterpret_cast<const char*>(data->data()),
         data->size());
-    return arrow::Status::OK();
+    return ::arrow::Status::OK();
   }
 
-  arrow::Status Write(const void* data, int64_t nbytes) override {
+  ::arrow::Status Write(const void* data, int64_t nbytes) override {
     auto requestCapacity = buffer_.size() + nbytes;
     if (requestCapacity > buffer_.capacity()) {
       buffer_.reserve(growRatio_ * (requestCapacity));
     }
     buffer_.append(buffer_.size(), reinterpret_cast<const char*>(data), nbytes);
-    return arrow::Status::OK();
+    return ::arrow::Status::OK();
   }
 
-  arrow::Status Flush() override {
+  ::arrow::Status Flush() override {
     bytesFlushed_ += buffer_.size();
     sink_->write(std::move(buffer_));
-    return arrow::Status::OK();
+    return ::arrow::Status::OK();
   }
 
-  arrow::Result<int64_t> Tell() const override {
+  ::arrow::Result<int64_t> Tell() const override {
     return bytesFlushed_ + buffer_.size();
   }
 
-  arrow::Status Close() override {
+  ::arrow::Status Close() override {
     ARROW_RETURN_NOT_OK(Flush());
     sink_->close();
-    return arrow::Status::OK();
+    return ::arrow::Status::OK();
   }
 
   bool closed() const override {
@@ -83,37 +91,37 @@ class ArrowDataBufferSink : public arrow::io::OutputStream {
 };
 
 struct ArrowContext {
-  std::unique_ptr<::parquet::arrow::FileWriter> writer;
-  std::shared_ptr<arrow::Schema> schema;
-  std::shared_ptr<::parquet::WriterProperties> properties;
+  std::unique_ptr<FileWriter> writer;
+  std::shared_ptr<::arrow::Schema> schema;
+  std::shared_ptr<WriterProperties> properties;
   uint64_t stagingRows = 0;
   int64_t stagingBytes = 0;
   // columns, Arrays
-  std::vector<std::vector<std::shared_ptr<arrow::Array>>> stagingChunks;
+  std::vector<std::vector<std::shared_ptr<::arrow::Array>>> stagingChunks;
 };
 
-::parquet::Compression::type getArrowParquetCompression(
+Compression::type getArrowParquetCompression(
     common::CompressionKind compression) {
   if (compression == common::CompressionKind_SNAPPY) {
-    return ::parquet::Compression::SNAPPY;
+    return Compression::SNAPPY;
   } else if (compression == common::CompressionKind_GZIP) {
-    return ::parquet::Compression::GZIP;
+    return Compression::GZIP;
   } else if (compression == common::CompressionKind_ZSTD) {
-    return ::parquet::Compression::ZSTD;
+    return Compression::ZSTD;
  } else if (compression == common::CompressionKind_NONE) {
-    return ::parquet::Compression::UNCOMPRESSED;
+    return Compression::UNCOMPRESSED;
   } else if (compression == common::CompressionKind_LZ4) {
-    return ::parquet::Compression::LZ4_HADOOP;
+    return Compression::LZ4_HADOOP;
   } else {
     VELOX_FAIL("Unsupported compression {}", compression);
   }
 }
 
-std::shared_ptr<::parquet::WriterProperties> getArrowParquetWriterOptions(
+std::shared_ptr<WriterProperties> getArrowParquetWriterOptions(
     const parquet::WriterOptions& options,
     const std::unique_ptr<DefaultFlushPolicy>& flushPolicy) {
-  auto builder = ::parquet::WriterProperties::Builder();
-  ::parquet::WriterProperties::Builder* properties = &builder;
+  auto builder = WriterProperties::Builder();
+  WriterProperties::Builder* properties = &builder;
   if (!options.enableDictionary) {
     properties = properties->disable_dictionary();
   }
@@ -158,29 +166,28 @@ Writer::Writer(
 void Writer::flush() {
   if (arrowContext_->stagingRows > 0) {
     if (!arrowContext_->writer) {
-      auto arrowProperties =
-          ::parquet::ArrowWriterProperties::Builder().build();
+      auto arrowProperties = ArrowWriterProperties::Builder().build();
       PARQUET_ASSIGN_OR_THROW(
           arrowContext_->writer,
-          ::parquet::arrow::FileWriter::Open(
+          FileWriter::Open(
               *arrowContext_->schema.get(),
-              arrow::default_memory_pool(),
+              ::arrow::default_memory_pool(),
               stream_,
               arrowContext_->properties,
               arrowProperties));
     }
 
     auto fields = arrowContext_->schema->fields();
-    std::vector<std::shared_ptr<arrow::ChunkedArray>> chunks;
+    std::vector<std::shared_ptr<::arrow::ChunkedArray>> chunks;
     for (int colIdx = 0; colIdx < fields.size(); colIdx++) {
       auto dataType = fields.at(colIdx)->type();
       auto chunk =
-          arrow::ChunkedArray::Make(
+          ::arrow::ChunkedArray::Make(
               std::move(arrowContext_->stagingChunks.at(colIdx)), dataType)
               .ValueOrDie();
       chunks.push_back(chunk);
     }
-    auto table = arrow::Table::Make(
+    auto table = ::arrow::Table::Make(
         arrowContext_->schema,
         std::move(chunks),
         static_cast<int64_t>(arrowContext_->stagingRows));
@@ -216,13 +223,13 @@ void Writer::write(const VectorPtr& data) {
   exportToArrow(data, array, generalPool_.get());
   exportToArrow(data, schema);
   PARQUET_ASSIGN_OR_THROW(
-      auto recordBatch, arrow::ImportRecordBatch(&array, &schema));
+      auto recordBatch, ::arrow::ImportRecordBatch(&array, &schema));
   if (!arrowContext_->schema) {
     arrowContext_->schema = recordBatch->schema();
     for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields();
          colIdx++) {
       arrowContext_->stagingChunks.push_back(
-          std::vector<std::shared_ptr<arrow::Array>>());
+          std::vector<std::shared_ptr<::arrow::Array>>());
     }
   }
 
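The growRatio logic in ArrowDataBufferSink above follows one pattern throughout: before an append that would exceed capacity, reserve growRatio times the required capacity, so a run of small writes does not reallocate (and memcpy) on every call. A self-contained model of that policy, with std::string standing in for Velox's DataBuffer and purely illustrative numbers, not code from the PR:

    #include <cstddef>
    #include <cstdio>
    #include <string>

    class GrowBuffer {
     public:
      explicit GrowBuffer(double growRatio) : growRatio_(growRatio) {}

      // Mirrors ArrowDataBufferSink::Write: reserve ahead, then append.
      void append(const char* data, std::size_t nbytes) {
        std::size_t requestCapacity = buffer_.size() + nbytes;
        if (requestCapacity > buffer_.capacity()) {
          buffer_.reserve(
              static_cast<std::size_t>(growRatio_ * requestCapacity));
        }
        buffer_.append(data, nbytes);
      }

      std::size_t size() const { return buffer_.size(); }
      std::size_t capacity() const { return buffer_.capacity(); }

     private:
      const double growRatio_;
      std::string buffer_;
    };

    int main() {
      GrowBuffer sink(1.5); // a growRatio of 1.5 is illustrative only
      for (int i = 0; i < 4; ++i) {
        sink.append("0123456789abcdef", 16);
        std::printf("size=%zu capacity=%zu\n", sink.size(), sink.capacity());
      }
      return 0;
    }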