Skip to content

Commit

Permalink
PARQUET-818: Refactoring to utilize common IO, buffer, memory managem…
Browse files Browse the repository at this point in the history
…ent abstractions and implementations

This refactoring is a bit of a bloodbath, but I've attempted to preserve as much API backwards compatibility as possible.

Several points

* Arrow does not use exceptions, so will need to be very careful about making sure that no Status goes unchecked. I've tried to get most of them, but might have missed some
* parquet-cpp still exposes an abstract file read and write API as before, but this makes it easy to pass in an Arrow file handle (e.g. HDFS, OS files, memory maps, etc.)
* Custom memory allocators will need to subclass `arrow::MemoryPool` instead. If this becomes onerous for some reason, we can try to find alternatives, but basically it's the exact same class as `parquet::MemoryAllocator`

Does not require any upstream changes in Arrow.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes apache#210 from wesm/arrow-consolidation and squashes the following commits:

ef81084 [Wes McKinney] Configurable Arrow linkage. Slight .travis.yml cleaning
50b44f0 [Wes McKinney] Make some const refs
8438f86 [Wes McKinney] Revert ParquetFileReader::Open to use std::unique_ptr<RandomAccessFile>
671d981 [Wes McKinney] Actually tee output to console
ca8df13 [Wes McKinney] Do not hide test output from travis logs
f516115 [Wes McKinney] Add public link libs to dependencies to avoid race conditions with external projects
414c75f [Wes McKinney] README cleanups
be1acb5 [Wes McKinney] Move thirdparty ep's / setup to separate cmake module
46342ea [Wes McKinney] Remove unneeded ParquetAllocator interface, cleaning
b546f08 [Wes McKinney] Use MemoryAllocator alias within parquet core
8c1226d [Wes McKinney] Add Arrow to list of third party deps. Needs to be added to thirdparty
f9d8a2a [Wes McKinney] Check some unchecked Statuses
0d04820 [Wes McKinney] Fix benchmark builds. Do not fail in benchmarks if gtest.h is included due to <tr1/tuple> issue
ee312af [Wes McKinney] cpplint
6a05cd9 [Wes McKinney] Update installed header files
8d962f1 [Wes McKinney] Build and unit tests pass again
c82e2b4 [Wes McKinney] More refactoring
6ec5b71 [Wes McKinney] Re-expose original abstract IO interfaces, add Arrow subclasses that wrap inptu
c320c95 [Wes McKinney] clang-format
f10080c [Wes McKinney] Fix missed include
6ade22f [Wes McKinney] First cut refactoring, not fully compiling yet

Change-Id: I3e7c09a9b603d834db9781082838f945a0994df6
  • Loading branch information
wesm committed Dec 30, 2016
1 parent 307466a commit cde8cc1
Show file tree
Hide file tree
Showing 66 changed files with 1,455 additions and 2,656 deletions.
5 changes: 1 addition & 4 deletions cpp/src/parquet/api/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
#define PARQUET_API_IO_H

#include "parquet/exception.h"
#include "parquet/util/buffer.h"
#include "parquet/util/input.h"
#include "parquet/util/mem-allocator.h"
#include "parquet/util/output.h"
#include "parquet/util/memory.h"

#endif // PARQUET_API_IO_H
5 changes: 0 additions & 5 deletions cpp/src/parquet/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
# parquet_arrow : Arrow <-> Parquet adapter

set(PARQUET_ARROW_SRCS
io.cc
reader.cc
schema.cc
writer.cc
Expand Down Expand Up @@ -76,16 +75,13 @@ if (PARQUET_BUILD_STATIC)
endif()

ADD_PARQUET_TEST(arrow-schema-test)
ADD_PARQUET_TEST(arrow-io-test)
ADD_PARQUET_TEST(arrow-reader-writer-test)

if (PARQUET_BUILD_STATIC)
ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static)
ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static)
ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static)
else()
ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared)
ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared)
ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared)
endif()

Expand All @@ -100,7 +96,6 @@ endif()

# Headers: top level
install(FILES
io.h
reader.h
schema.h
utils.h
Expand Down
140 changes: 0 additions & 140 deletions cpp/src/parquet/arrow/arrow-io-test.cc

This file was deleted.

6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/arrow-reader-writer-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "parquet/column/writer.h"
#include "parquet/file/reader-internal.h"
#include "parquet/file/writer-internal.h"
#include "parquet/util/input.h"
#include "parquet/util/memory.h"

#include "arrow/api.h"

Expand Down Expand Up @@ -132,8 +132,8 @@ static void BM_ReadColumn(::benchmark::State& state) {
std::shared_ptr<Buffer> buffer = output->GetBuffer();

while (state.KeepRunning()) {
auto reader = ParquetFileReader::Open(
std::unique_ptr<RandomAccessSource>(new BufferReader(buffer)));
auto reader =
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
std::shared_ptr<::arrow::Table> table;
filereader.ReadFlatTable(&table);
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
#include "arrow/test-util.h"

using arrow::Array;
using arrow::Buffer;
using arrow::ChunkedArray;
using arrow::default_memory_pool;
using arrow::io::BufferReader;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;

using ParquetBuffer = parquet::Buffer;
using ParquetType = parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
Expand Down Expand Up @@ -203,9 +204,8 @@ class TestParquetIO : public ::testing::Test {
}

std::unique_ptr<ParquetFileReader> ReaderFromSink() {
std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
return ParquetFileReader::Open(std::move(source));
std::shared_ptr<Buffer> buffer = sink_->GetBuffer();
return ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
}

void ReadSingleColumnFile(
Expand Down Expand Up @@ -357,9 +357,9 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
ASSERT_OK_NO_THROW(WriteFlatTable(
table.get(), default_memory_pool(), arrow_sink_, 512, default_writer_properties()));

std::shared_ptr<ParquetBuffer> pbuffer =
std::make_shared<ParquetBuffer>(buffer->data(), buffer->size());
std::unique_ptr<RandomAccessSource> source(new BufferReader(pbuffer));
auto pbuffer = std::make_shared<Buffer>(buffer->data(), buffer->size());

auto source = std::make_shared<BufferReader>(pbuffer);
std::shared_ptr<::arrow::Table> out;
this->ReadTableFromFile(ParquetFileReader::Open(std::move(source)), &out);
ASSERT_EQ(1, out->num_columns());
Expand Down
127 changes: 0 additions & 127 deletions cpp/src/parquet/arrow/io.cc

This file was deleted.

Loading

0 comments on commit cde8cc1

Please sign in to comment.