diff --git a/cpp/src/parquet/column/CMakeLists.txt b/cpp/src/parquet/column/CMakeLists.txt index 20f61676a931b..7eb334ecc89fc 100644 --- a/cpp/src/parquet/column/CMakeLists.txt +++ b/cpp/src/parquet/column/CMakeLists.txt @@ -17,6 +17,10 @@ # Headers: top level install(FILES + page.h reader.h + serialized-page.h scanner.h DESTINATION include/parquet/column) + +ADD_PARQUET_TEST(column-reader-test) diff --git a/cpp/src/parquet/column/column-reader-test.cc b/cpp/src/parquet/column/column-reader-test.cc new file mode 100644 index 0000000000000..88f4465b98017 --- /dev/null +++ b/cpp/src/parquet/column/column-reader-test.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include + +#include + +#include "parquet/types.h" +#include "parquet/column/page.h" +#include "parquet/column/reader.h" +#include "parquet/column/test-util.h" + +#include "parquet/util/test-common.h" + +using std::string; +using std::vector; +using std::shared_ptr; +using parquet::FieldRepetitionType; +using parquet::SchemaElement; +using parquet::Encoding; +using parquet::Type; + +namespace parquet_cpp { + +namespace test { + +class TestPrimitiveReader : public ::testing::Test { + public: + void SetUp() {} + + void TearDown() {} + + void InitReader(const SchemaElement* element) { + pager_.reset(new test::MockPageReader(pages_)); + reader_ = ColumnReader::Make(element, std::move(pager_)); + } + + protected: + std::shared_ptr reader_; + std::unique_ptr pager_; + vector > pages_; +}; + +template +static vector slice(const vector& values, size_t start, size_t end) { + if (end < start) { + return vector(0); + } + + vector out(end - start); + for (size_t i = start; i < end; ++i) { + out[i - start] = values[i]; + } + return out; +} + + +TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { + vector values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; + size_t num_values = values.size(); + Encoding::type value_encoding = Encoding::PLAIN; + + vector page1; + test::DataPageBuilder page_builder(&page1); + page_builder.AppendValues(values, Encoding::PLAIN); + pages_.push_back(page_builder.Finish()); + + // TODO: simplify this + SchemaElement element; + element.__set_type(Type::INT32); + element.__set_repetition_type(FieldRepetitionType::REQUIRED); + InitReader(&element); + + Int32Reader* reader = static_cast(reader_.get()); + + vector result(10, -1); + + size_t values_read = 0; + size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr, + &result[0], &values_read); + ASSERT_EQ(10, batch_actual); + ASSERT_EQ(10, values_read); + + ASSERT_TRUE(vector_equal(result, values)); +} + +TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { + 
vector values = {1, 2, 3, 4, 5}; + vector def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1}; + + size_t num_values = values.size(); + Encoding::type value_encoding = Encoding::PLAIN; + + vector page1; + test::DataPageBuilder page_builder(&page1); + + // Definition levels precede the values + page_builder.AppendDefLevels(def_levels, 1, Encoding::RLE); + page_builder.AppendValues(values, Encoding::PLAIN); + + pages_.push_back(page_builder.Finish()); + + // TODO: simplify this + SchemaElement element; + element.__set_type(Type::INT32); + element.__set_repetition_type(FieldRepetitionType::OPTIONAL); + InitReader(&element); + + Int32Reader* reader = static_cast(reader_.get()); + + std::vector vexpected; + std::vector dexpected; + + size_t values_read = 0; + size_t batch_actual = 0; + + vector vresult(3, -1); + vector dresult(5, -1); + + batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(3, values_read); + + ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5))); + + batch_actual = reader->ReadBatch(5, &dresult[0], nullptr, + &vresult[0], &values_read); + ASSERT_EQ(5, batch_actual); + ASSERT_EQ(2, values_read); + + ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5))); + ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10))); + + // EOS, pass all nullptrs to check for improper writes. Do not segfault / + // core dump + batch_actual = reader->ReadBatch(5, nullptr, nullptr, + nullptr, &values_read); + ASSERT_EQ(0, batch_actual); + ASSERT_EQ(0, values_read); +} + +} // namespace test + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/column/page.h b/cpp/src/parquet/column/page.h new file mode 100644 index 0000000000000..46f5d624e700c --- /dev/null +++ b/cpp/src/parquet/column/page.h @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module defines an abstract interface for iterating through pages in a +// Parquet column chunk within a row group. It could be extended in the future +// to iterate through all data pages in all chunks in a file. + +#ifndef PARQUET_COLUMN_PAGE_H +#define PARQUET_COLUMN_PAGE_H + +#include "parquet/thrift/parquet_types.h" + +namespace parquet_cpp { + +// Note: Copying the specific page header Thrift metadata to the Page object +// (instead of using a pointer) presently so that data pages can be +// decompressed and processed in parallel. We can turn the header members of +// these classes into pointers at some point, but the downside is that +// applications materializing multiple data pages at once will have to have a +// data container that manages the lifetime of the deserialized +// parquet::PageHeader structs. +// +// TODO: Parallel processing is not yet safe because of memory-ownership +// semantics (the PageReader may or may not own the memory referenced by a +// page) +class Page { + // TODO(wesm): In the future Parquet implementations may store the crc code + // in parquet::PageHeader. 
parquet-mr currently does not, so we also skip it + // here, both on the read and write path + public: + Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) : + buffer_(buffer), + buffer_size_(buffer_size), + type_(type) {} + + parquet::PageType::type type() const { + return type_; + } + + // @returns: a pointer to the page's data + const uint8_t* data() const { + return buffer_; + } + + // @returns: the total size in bytes of the page's data buffer + size_t size() const { + return buffer_size_; + } + + private: + const uint8_t* buffer_; + size_t buffer_size_; + + parquet::PageType::type type_; +}; + + +class DataPage : public Page { + public: + DataPage(const uint8_t* buffer, size_t buffer_size, + const parquet::DataPageHeader& header) : + Page(buffer, buffer_size, parquet::PageType::DATA_PAGE), + header_(header) {} + + size_t num_values() const { + return header_.num_values; + } + + parquet::Encoding::type encoding() const { + return header_.encoding; + } + + private: + parquet::DataPageHeader header_; +}; + + +class DataPageV2 : public Page { + public: + DataPageV2(const uint8_t* buffer, size_t buffer_size, + const parquet::DataPageHeaderV2& header) : + Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2), + header_(header) {} + + private: + parquet::DataPageHeaderV2 header_; +}; + + +class DictionaryPage : public Page { + public: + DictionaryPage(const uint8_t* buffer, size_t buffer_size, + const parquet::DictionaryPageHeader& header) : + Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE), + header_(header) {} + + size_t num_values() const { + return header_.num_values; + } + + private: + parquet::DictionaryPageHeader header_; +}; + +// Abstract page iterator interface. 
This way, we can feed column pages to the +// ColumnReader through whatever mechanism we choose +class PageReader { + public: + virtual ~PageReader() {} + + // @returns: shared_ptr(nullptr) on EOS, std::shared_ptr + // containing new Page otherwise + virtual std::shared_ptr NextPage() = 0; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_COLUMN_PAGE_H diff --git a/cpp/src/parquet/column/reader.cc b/cpp/src/parquet/column/reader.cc index edfea49fc2c90..91e026ac901e9 100644 --- a/cpp/src/parquet/column/reader.cc +++ b/cpp/src/parquet/column/reader.cc @@ -18,45 +18,62 @@ #include "parquet/column/reader.h" #include +#include #include #include -#include "parquet/compression/codec.h" -#include "parquet/encodings/encodings.h" -#include "parquet/thrift/util.h" -#include "parquet/util/input_stream.h" +#include "parquet/column/page.h" -const int DATA_PAGE_SIZE = 64 * 1024; +#include "parquet/encodings/encodings.h" namespace parquet_cpp { -using parquet::CompressionCodec; using parquet::Encoding; using parquet::FieldRepetitionType; using parquet::PageType; using parquet::Type; -ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* schema, std::unique_ptr stream) - : metadata_(metadata), - schema_(schema), - stream_(std::move(stream)), +ColumnReader::ColumnReader(const parquet::SchemaElement* schema, + std::unique_ptr pager) + : schema_(schema), + pager_(std::move(pager)), num_buffered_values_(0), - num_decoded_values_(0) { - switch (metadata->codec) { - case CompressionCodec::UNCOMPRESSED: - break; - case CompressionCodec::SNAPPY: - decompressor_.reset(new SnappyCodec()); - break; - default: - ParquetException::NYI("Reading compressed data"); + num_decoded_values_(0) {} + +template +void TypedColumnReader::ConfigureDictionary(const DictionaryPage* page) { + auto it = decoders_.find(Encoding::RLE_DICTIONARY); + if (it != decoders_.end()) { + throw ParquetException("Column cannot have more than one dictionary."); } - config_ = 
Config::DefaultConfig(); + PlainDecoder dictionary(schema_); + dictionary.SetData(page->num_values(), page->data(), page->size()); + + // The dictionary is fully decoded during DictionaryDecoder::Init, so the + // DictionaryPage buffer is no longer required after this step + // + // TODO(wesm): investigate whether this all-or-nothing decoding of the + // dictionary makes sense and whether performance can be improved + std::shared_ptr decoder( + new DictionaryDecoder(schema_, &dictionary)); + + decoders_[Encoding::RLE_DICTIONARY] = decoder; + current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); } +static size_t InitializeLevelDecoder(const uint8_t* buffer, + int16_t max_level, std::unique_ptr& decoder) { + int num_definition_bytes = *reinterpret_cast(buffer); + + decoder.reset(new RleDecoder(buffer + sizeof(uint32_t), + num_definition_bytes, + BitUtil::NumRequiredBits(max_level))); + + return sizeof(uint32_t) + num_definition_bytes; +} + // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. static bool IsDictionaryIndexEncoding(const Encoding::type& e) { @@ -66,68 +83,44 @@ static bool IsDictionaryIndexEncoding(const Encoding::type& e) { template bool TypedColumnReader::ReadNewPage() { // Loop until we find the next data page. + const uint8_t* buffer; while (true) { - int bytes_read = 0; - const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); - if (bytes_read == 0) return false; - uint32_t header_size = bytes_read; - DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); - stream_->Read(header_size, &bytes_read); - - int compressed_len = current_page_header_.compressed_page_size; - int uncompressed_len = current_page_header_.uncompressed_page_size; - - // Read the compressed data page. 
- buffer = stream_->Read(compressed_len, &bytes_read); - if (bytes_read != compressed_len) ParquetException::EofException(); - - // Uncompress it if we need to - if (decompressor_ != NULL) { - // Grow the uncompressed buffer if we need to. - if (uncompressed_len > decompression_buffer_.size()) { - decompression_buffer_.resize(uncompressed_len); - } - decompressor_->Decompress(compressed_len, buffer, uncompressed_len, - &decompression_buffer_[0]); - buffer = &decompression_buffer_[0]; + current_page_ = pager_->NextPage(); + if (!current_page_) { + // EOS + return false; } - if (current_page_header_.type == PageType::DICTIONARY_PAGE) { - auto it = decoders_.find(Encoding::RLE_DICTIONARY); - if (it != decoders_.end()) { - throw ParquetException("Column cannot have more than one dictionary."); - } - - PlainDecoder dictionary(schema_); - dictionary.SetData(current_page_header_.dictionary_page_header.num_values, - buffer, uncompressed_len); - std::shared_ptr decoder( - new DictionaryDecoder(schema_, &dictionary)); - - decoders_[Encoding::RLE_DICTIONARY] = decoder; - current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get(); + if (current_page_->type() == PageType::DICTIONARY_PAGE) { + ConfigureDictionary(static_cast(current_page_.get())); continue; - } else if (current_page_header_.type == PageType::DATA_PAGE) { + } else if (current_page_->type() == PageType::DATA_PAGE) { + const DataPage* page = static_cast(current_page_.get()); + // Read a data page. - num_buffered_values_ = current_page_header_.data_page_header.num_values; + num_buffered_values_ = page->num_values(); // Have not decoded any values from the data page yet num_decoded_values_ = 0; + buffer = page->data(); + + // If the data page includes repetition and definition levels, we + // initialize the level decoder and subtract the encoded level bytes from + // the page size to determine the number of bytes in the encoded data. + size_t data_size = page->size(); + // Read definition levels. 
if (schema_->repetition_type != FieldRepetitionType::REQUIRED) { - int num_definition_bytes = *reinterpret_cast(buffer); - - // Temporary hack until schema resolution + // Temporary hack until schema resolution implemented max_definition_level_ = 1; - buffer += sizeof(uint32_t); - definition_level_decoder_.reset( - new RleDecoder(buffer, num_definition_bytes, 1)); - buffer += num_definition_bytes; - uncompressed_len -= sizeof(uint32_t); - uncompressed_len -= num_definition_bytes; + size_t def_levels_bytes = InitializeLevelDecoder(buffer, + max_definition_level_, definition_level_decoder_); + + buffer += def_levels_bytes; + data_size -= def_levels_bytes; } else { // REQUIRED field max_definition_level_ = 0; @@ -137,7 +130,8 @@ bool TypedColumnReader::ReadNewPage() { // Get a decoder object for this page or create a new decoder if this is the // first page with this encoding. - Encoding::type encoding = current_page_header_.data_page_header.encoding; + Encoding::type encoding = page->encoding(); + if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY; auto it = decoders_.find(encoding); @@ -163,10 +157,11 @@ bool TypedColumnReader::ReadNewPage() { throw ParquetException("Unknown encoding type."); } } - current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len); + current_decoder_->SetData(num_buffered_values_, buffer, data_size); return true; } else { - // We don't know what this page type is. We're allowed to skip non-data pages. + // We don't know what this page type is. We're allowed to skip non-data + // pages. 
continue; } } @@ -206,27 +201,26 @@ size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) { // ---------------------------------------------------------------------- // Dynamic column reader constructor -std::shared_ptr ColumnReader::Make(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* element, std::unique_ptr stream) { - switch (metadata->type) { +std::shared_ptr ColumnReader::Make( + const parquet::SchemaElement* element, + std::unique_ptr pager) { + switch (element->type) { case Type::BOOLEAN: - return std::make_shared(metadata, element, std::move(stream)); + return std::make_shared(element, std::move(pager)); case Type::INT32: - return std::make_shared(metadata, element, std::move(stream)); + return std::make_shared(element, std::move(pager)); case Type::INT64: - return std::make_shared(metadata, element, std::move(stream)); + return std::make_shared(element, std::move(pager)); case Type::INT96: - return std::make_shared(metadata, element, std::move(stream)); + return std::make_shared(element, std::move(pager)); case Type::FLOAT: - return std::make_shared(metadata, element, std::move(stream)); + return std::make_shared(element, std::move(pager)); case Type::DOUBLE: - return std::make_shared(metadata, element, std::move(stream)); + return std::make_shared(element, std::move(pager)); case Type::BYTE_ARRAY: - return std::make_shared(metadata, element, - std::move(stream)); + return std::make_shared(element, std::move(pager)); case Type::FIXED_LEN_BYTE_ARRAY: - return std::make_shared(metadata, element, - std::move(stream)); + return std::make_shared(element, std::move(pager)); default: ParquetException::NYI("type reader not implemented"); } diff --git a/cpp/src/parquet/column/reader.h b/cpp/src/parquet/column/reader.h index 8f857c4a3bec3..27ff67813c5da 100644 --- a/cpp/src/parquet/column/reader.h +++ b/cpp/src/parquet/column/reader.h @@ -28,9 +28,11 @@ #include "parquet/exception.h" #include "parquet/types.h" + 
+#include "parquet/column/page.h" + #include "parquet/thrift/parquet_constants.h" #include "parquet/thrift/parquet_types.h" -#include "parquet/util/input_stream.h" #include "parquet/encodings/encodings.h" #include "parquet/util/rle-encoding.h" @@ -52,21 +54,10 @@ class Scanner; class ColumnReader { public: - struct Config { - int batch_size; - - static Config DefaultConfig() { - Config config; - config.batch_size = 128; - return config; - } - }; - - ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*, - std::unique_ptr stream); + ColumnReader(const parquet::SchemaElement*, std::unique_ptr); - static std::shared_ptr Make(const parquet::ColumnMetaData*, - const parquet::SchemaElement*, std::unique_ptr stream); + static std::shared_ptr Make(const parquet::SchemaElement*, + std::unique_ptr); // Returns true if there are still values in this column. bool HasNext() { @@ -81,11 +72,7 @@ class ColumnReader { } parquet::Type::type type() const { - return metadata_->type; - } - - const parquet::ColumnMetaData* metadata() const { - return metadata_; + return schema_->type; } const parquet::SchemaElement* schema() const { @@ -105,17 +92,10 @@ class ColumnReader { // Returns the number of decoded repetition levels size_t ReadRepetitionLevels(size_t batch_size, int16_t* levels); - Config config_; - - const parquet::ColumnMetaData* metadata_; const parquet::SchemaElement* schema_; - std::unique_ptr stream_; - // Compression codec to use. 
- std::unique_ptr decompressor_; - std::vector decompression_buffer_; - - parquet::PageHeader current_page_header_; + std::unique_ptr pager_; + std::shared_ptr current_page_; // Not set if full schema for this field has no optional or repeated elements std::unique_ptr definition_level_decoder_; @@ -145,12 +125,10 @@ class TypedColumnReader : public ColumnReader { public: typedef typename type_traits::value_type T; - TypedColumnReader(const parquet::ColumnMetaData* metadata, - const parquet::SchemaElement* schema, std::unique_ptr stream) : - ColumnReader(metadata, schema, std::move(stream)), + TypedColumnReader(const parquet::SchemaElement* schema, + std::unique_ptr pager) : + ColumnReader(schema, std::move(pager)), current_decoder_(NULL) { - size_t value_byte_size = type_traits::value_byte_size; - values_buffer_.resize(config_.batch_size * value_byte_size); } // Read a batch of repetition levels, definition levels, and values from the @@ -181,18 +159,20 @@ class TypedColumnReader : public ColumnReader { // @returns: the number of values read into the out buffer size_t ReadValues(size_t batch_size, T* out); - // Map of compression type to decompressor object. + // Map of encoding type to the respective decoder object. For example, a + // column chunk's data pages may include both dictionary-encoded and + // plain-encoded data. 
std::unordered_map > decoders_; + void ConfigureDictionary(const DictionaryPage* page); + DecoderType* current_decoder_; - std::vector values_buffer_; }; template inline size_t TypedColumnReader::ReadValues(size_t batch_size, T* out) { size_t num_decoded = current_decoder_->Decode(out, batch_size); - num_decoded_values_ += num_decoded; return num_decoded; } @@ -212,9 +192,22 @@ inline size_t TypedColumnReader::ReadBatch(int batch_size, int16_t* def_le size_t num_def_levels = 0; size_t num_rep_levels = 0; + size_t values_to_read = 0; + // If the field is required and non-repeated, there are no definition levels if (definition_level_decoder_) { num_def_levels = ReadDefinitionLevels(batch_size, def_levels); + + // TODO(wesm): this tallying of values-to-decode can be performed with better + // cache-efficiency if fused with the level decoding. + for (size_t i = 0; i < num_def_levels; ++i) { + if (def_levels[i] == max_definition_level_) { + ++values_to_read; + } + } + } else { + // Required field, read all values + values_to_read = batch_size; } // Not present for non-repeated fields @@ -226,18 +219,11 @@ inline size_t TypedColumnReader::ReadBatch(int batch_size, int16_t* def_le } } - // TODO(wesm): this tallying of values-to-decode can be performed with better - // cache-efficiency if fused with the level decoding. 
- size_t values_to_read = 0; - for (size_t i = 0; i < num_def_levels; ++i) { - if (def_levels[i] == max_definition_level_) { - ++values_to_read; - } - } - *values_read = ReadValues(values_to_read, values); + size_t total_values = std::max(num_def_levels, *values_read); + num_decoded_values_ += total_values; - return num_def_levels; + return total_values; } diff --git a/cpp/src/parquet/column/serialized-page.cc b/cpp/src/parquet/column/serialized-page.cc new file mode 100644 index 0000000000000..1cbaf4d399a89 --- /dev/null +++ b/cpp/src/parquet/column/serialized-page.cc @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/column/serialized-page.h" + +#include + +#include "parquet/exception.h" +#include "parquet/thrift/util.h" +#include "parquet/util/input_stream.h" + +using parquet::PageType; + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// SerializedPageReader deserializes Thrift metadata and pages that have been +// assembled in a serialized stream for storing in a Parquet files + +SerializedPageReader::SerializedPageReader(std::unique_ptr stream, + parquet::CompressionCodec::type codec) : + stream_(std::move(stream)) { + switch (codec) { + case parquet::CompressionCodec::UNCOMPRESSED: + break; + case parquet::CompressionCodec::SNAPPY: + decompressor_.reset(new SnappyCodec()); + break; + default: + ParquetException::NYI("Reading compressed data"); + } +} + +// TODO(wesm): this may differ from file to file +static constexpr int DATA_PAGE_SIZE = 64 * 1024; + +std::shared_ptr SerializedPageReader::NextPage() { + // Loop here because there may be unhandled page types that we skip until + // finding a page that we do know what to do with + while (true) { + int bytes_read = 0; + const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); + if (bytes_read == 0) { + return std::shared_ptr(nullptr); + } + + // This gets used, then set by DeserializeThriftMsg + uint32_t header_size = bytes_read; + DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); + + // Advance the stream offset + stream_->Read(header_size, &bytes_read); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + + // Read the compressed data page. + buffer = stream_->Read(compressed_len, &bytes_read); + if (bytes_read != compressed_len) ParquetException::EofException(); + + // Uncompress it if we need to + if (decompressor_ != NULL) { + // Grow the uncompressed buffer if we need to. 
+ if (uncompressed_len > decompression_buffer_.size()) { + decompression_buffer_.resize(uncompressed_len); + } + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + &decompression_buffer_[0]); + buffer = &decompression_buffer_[0]; + } + + if (current_page_header_.type == PageType::DICTIONARY_PAGE) { + return std::make_shared(buffer, uncompressed_len, + current_page_header_.dictionary_page_header); + } else if (current_page_header_.type == PageType::DATA_PAGE) { + return std::make_shared(buffer, uncompressed_len, + current_page_header_.data_page_header); + } else if (current_page_header_.type == PageType::DATA_PAGE_V2) { + ParquetException::NYI("data page v2"); + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. + continue; + } + } + return std::shared_ptr(nullptr); +} + +} // namespace parquet_cpp diff --git a/cpp/src/parquet/column/serialized-page.h b/cpp/src/parquet/column/serialized-page.h new file mode 100644 index 0000000000000..2735c3cd535a9 --- /dev/null +++ b/cpp/src/parquet/column/serialized-page.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// This module defines an abstract interface for iterating through pages in a +// Parquet column chunk within a row group. It could be extended in the future +// to iterate through all data pages in all chunks in a file. + +#ifndef PARQUET_COLUMN_SERIALIZED_PAGE_H +#define PARQUET_COLUMN_SERIALIZED_PAGE_H + +#include +#include + +#include "parquet/column/page.h" +#include "parquet/compression/codec.h" +#include "parquet/util/input_stream.h" +#include "parquet/thrift/parquet_types.h" + +namespace parquet_cpp { + +// This subclass delimits pages appearing in a serialized stream, each preceded +// by a serialized Thrift parquet::PageHeader indicating the type of each page +// and the page metadata. +class SerializedPageReader : public PageReader { + public: + SerializedPageReader(std::unique_ptr stream, + parquet::CompressionCodec::type codec); + + virtual ~SerializedPageReader() {} + + // Implement the PageReader interface + virtual std::shared_ptr NextPage(); + + private: + std::unique_ptr stream_; + + parquet::PageHeader current_page_header_; + std::shared_ptr current_page_; + + // Compression codec to use. + std::unique_ptr decompressor_; + std::vector decompression_buffer_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_COLUMN_SERIALIZED_PAGE_H diff --git a/cpp/src/parquet/column/test-util.h b/cpp/src/parquet/column/test-util.h new file mode 100644 index 0000000000000..80f3fa1ae9400 --- /dev/null +++ b/cpp/src/parquet/column/test-util.h @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// This module defines an abstract interface for iterating through pages in a +// Parquet column chunk within a row group. It could be extended in the future +// to iterate through all data pages in all chunks in a file. + +#ifndef PARQUET_COLUMN_TEST_UTIL_H +#define PARQUET_COLUMN_TEST_UTIL_H + +#include +#include +#include + +#include "parquet/column/page.h" + +using parquet::Encoding; + +namespace parquet_cpp { + +namespace test { + +class MockPageReader : public PageReader { + public: + explicit MockPageReader(const std::vector >& pages) : + pages_(pages), + page_index_(0) {} + + // Implement the PageReader interface + virtual std::shared_ptr NextPage() { + if (page_index_ == pages_.size()) { + // EOS to consumer + return std::shared_ptr(nullptr); + } + return pages_[page_index_++]; + } + + private: + std::vector > pages_; + size_t page_index_; +}; + +// TODO(wesm): this is only used for testing for now + +static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024; +static constexpr int INIT_BUFFER_SIZE = 1024; + +template +class DataPageBuilder { + public: + typedef typename type_traits::value_type T; + + // The passed vector is the owner of the page's data + explicit DataPageBuilder(std::vector* out) : + out_(out), + buffer_size_(0), + num_values_(0), + have_def_levels_(false), + have_rep_levels_(false), + have_values_(false) { + out_->resize(INIT_BUFFER_SIZE); + buffer_capacity_ = INIT_BUFFER_SIZE; + } + + void AppendDefLevels(const std::vector& levels, + int16_t max_level, parquet::Encoding::type encoding) { + AppendLevels(levels, 
max_level, encoding); + + num_values_ = std::max(levels.size(), num_values_); + header_.__set_definition_level_encoding(encoding); + have_def_levels_ = true; + } + + void AppendRepLevels(const std::vector& levels, + int16_t max_level, parquet::Encoding::type encoding) { + AppendLevels(levels, max_level, encoding); + + num_values_ = std::max(levels.size(), num_values_); + header_.__set_repetition_level_encoding(encoding); + have_rep_levels_ = true; + } + + void AppendValues(const std::vector& values, + parquet::Encoding::type encoding) { + if (encoding != Encoding::PLAIN) { + ParquetException::NYI("only plain encoding currently implemented"); + } + size_t bytes_to_encode = values.size() * sizeof(T); + Reserve(bytes_to_encode); + + PlainEncoder encoder(nullptr); + size_t nbytes = encoder.Encode(&values[0], values.size(), Head()); + // In case for some reason it's fewer than bytes_to_encode + buffer_size_ += nbytes; + + num_values_ = std::max(values.size(), num_values_); + header_.__set_encoding(encoding); + have_values_ = true; + } + + std::shared_ptr Finish() { + if (!have_values_) { + throw ParquetException("A data page must at least contain values"); + } + header_.__set_num_values(num_values_); + return std::make_shared(&(*out_)[0], buffer_size_, header_); + } + + private: + std::vector* out_; + + size_t buffer_size_; + size_t buffer_capacity_; + + parquet::DataPageHeader header_; + + size_t num_values_; + + bool have_def_levels_; + bool have_rep_levels_; + bool have_values_; + + void Reserve(size_t nbytes) { + while ((nbytes + buffer_size_) > buffer_capacity_) { + // TODO(wesm): limit to one reserve when this loop runs more than once + size_t new_capacity = 2 * buffer_capacity_; + out_->resize(new_capacity); + buffer_capacity_ = new_capacity; + } + } + + uint8_t* Head() { + return &(*out_)[buffer_size_]; + } + + // Used internally for both repetition and definition levels + void AppendLevels(const std::vector& levels, int16_t max_level, + parquet::Encoding::type 
encoding) { + if (encoding != Encoding::RLE) { + ParquetException::NYI("only rle encoding currently implemented"); + } + + // TODO: compute a more precise maximum size for the encoded levels + std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE); + + RleEncoder encoder(&encode_buffer[0], encode_buffer.size(), + BitUtil::NumRequiredBits(max_level)); + + // TODO(wesm): push down vector encoding + for (int16_t level : levels) { + if (!encoder.Put(level)) { + throw ParquetException("out of space"); + } + } + + uint32_t rle_bytes = encoder.Flush(); + size_t levels_footprint = sizeof(uint32_t) + rle_bytes; + Reserve(levels_footprint); + + *reinterpret_cast<uint32_t*>(Head()) = rle_bytes; + memcpy(Head() + sizeof(uint32_t), encoder.buffer(), rle_bytes); + buffer_size_ += levels_footprint; + } +}; + +} // namespace test + +} // namespace parquet_cpp + +#endif // PARQUET_COLUMN_TEST_UTIL_H diff --git a/cpp/src/parquet/encodings/encodings.h b/cpp/src/parquet/encodings/encodings.h index b30146a2c5919..4fb3d9a9c5a7a 100644 --- a/cpp/src/parquet/encodings/encodings.h +++ b/cpp/src/parquet/encodings/encodings.h @@ -67,6 +67,40 @@ class Decoder { int num_values_; }; + +// Base class for value encoders. Since encoders may or not have state (e.g., +// dictionary encoding) we use a class instance to maintain any state.
+// +// TODO(wesm): Encode interface API is temporary +template <int TYPE> +class Encoder { + public: + typedef typename type_traits<TYPE>::value_type T; + + virtual ~Encoder() {} + + // TODO(wesm): use an output stream + + // Subclasses should override the ones they support + // + // @returns: the number of bytes written to dst + virtual size_t Encode(const T* src, int num_values, uint8_t* dst) { + throw ParquetException("Encoder does not implement this type."); + return 0; + } + + const parquet::Encoding::type encoding() const { return encoding_; } + + protected: + explicit Encoder(const parquet::SchemaElement* schema, + const parquet::Encoding::type& encoding) + : schema_(schema), encoding_(encoding) {} + + // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY + const parquet::SchemaElement* schema_; + const parquet::Encoding::type encoding_; +}; + } // namespace parquet_cpp #include "parquet/encodings/plain-encoding.h" diff --git a/cpp/src/parquet/encodings/plain-encoding.h b/cpp/src/parquet/encodings/plain-encoding.h index e8f8977104187..11e70c7506124 100644 --- a/cpp/src/parquet/encodings/plain-encoding.h +++ b/cpp/src/parquet/encodings/plain-encoding.h @@ -22,8 +22,13 @@ #include +using parquet::Type; + namespace parquet_cpp { +// ---------------------------------------------------------------------- +// Encoding::PLAIN decoder implementation + template <int TYPE> class PlainDecoder : public Decoder<TYPE> { public: @@ -60,7 +65,7 @@ inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) { // Template specialization for BYTE_ARRAY template <> -inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer, +inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer, int max_values) { max_values = std::min(max_values, num_values_); for (int i = 0; i < max_values; ++i) { @@ -76,7 +81,7 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer, // Template specialization for FIXED_LEN_BYTE_ARRAY template <> -inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode( +inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode( FixedLenByteArray* buffer, int
max_values) { max_values = std::min(max_values, num_values_); int len = schema_->type_length; @@ -91,10 +96,10 @@ inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode( } template <> -class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> { +class PlainDecoder<Type::BOOLEAN> : public Decoder<Type::BOOLEAN> { public: explicit PlainDecoder(const parquet::SchemaElement* schema) : - Decoder<parquet::Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {} + Decoder<Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {} virtual void SetData(int num_values, const uint8_t* data, int len) { num_values_ = num_values; @@ -113,6 +118,49 @@ class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> +template <int TYPE> +class PlainEncoder : public Encoder<TYPE> { + public: + typedef typename type_traits<TYPE>::value_type T; + + explicit PlainEncoder(const parquet::SchemaElement* schema) : + Encoder<TYPE>(schema, parquet::Encoding::PLAIN) {} + + virtual size_t Encode(const T* src, int num_values, uint8_t* dst); +}; + +template <int TYPE> +inline size_t PlainEncoder<TYPE>::Encode(const T* buffer, int num_values, + uint8_t* dst) { + size_t nbytes = num_values * sizeof(T); + memcpy(dst, buffer, nbytes); + return nbytes; +} + +template <> +inline size_t PlainEncoder<Type::BOOLEAN>::Encode( + const bool* src, int num_values, uint8_t* dst) { + ParquetException::NYI("bool encoding"); + return 0; +} + +template <> +inline size_t PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src, + int num_values, uint8_t* dst) { + ParquetException::NYI("byte array encoding"); + return 0; +} + +template <> +inline size_t PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode( + const FixedLenByteArray* src, int num_values, uint8_t* dst) { + ParquetException::NYI("FLBA encoding"); + return 0; +} + } // namespace parquet_cpp #endif diff --git a/cpp/src/parquet/reader.cc b/cpp/src/parquet/reader.cc index a43a2a51fdaa6..a4e767e2a5df6 100644 --- a/cpp/src/parquet/reader.cc +++ b/cpp/src/parquet/reader.cc @@ -25,7 +25,9 @@ #include #include "parquet/column/reader.h" +#include "parquet/column/serialized-page.h" #include "parquet/column/scanner.h" + #include "parquet/exception.h" #include "parquet/thrift/util.h" #include "parquet/util/input_stream.h" @@ -115,8
+117,13 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) { } // TODO(wesm): This presumes a flat schema - std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data, - &this->parent_->metadata_.schema[i + 1], std::move(input)); + const parquet::SchemaElement* schema = &parent_->metadata_.schema[i + 1]; + + std::unique_ptr<PageReader> pager( + new SerializedPageReader(std::move(input), col.meta_data.codec)); + + std::shared_ptr<ColumnReader> reader = ColumnReader::Make(schema, + std::move(pager)); column_readers_[i] = reader; return reader; @@ -269,7 +276,7 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) { size_t nColumns = group_reader->num_columns(); for (int c = 0; c < group_reader->num_columns(); ++c) { - const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata(); + const parquet::ColumnMetaData* meta_data = group_reader->column_metadata(c); stream << "Column " << c << ": " << meta_data->num_values << " rows, " << meta_data->statistics.null_count << " null values, " diff --git a/cpp/src/parquet/reader.h b/cpp/src/parquet/reader.h index 4c9211916a6ec..16927a7679e34 100644 --- a/cpp/src/parquet/reader.h +++ b/cpp/src/parquet/reader.h @@ -83,6 +83,10 @@ class RowGroupReader { // column. Ownership is shared with the RowGroupReader.
std::shared_ptr<ColumnReader> Column(size_t i); + const parquet::ColumnMetaData* column_metadata(size_t i) const { + return &row_group_->columns[i].meta_data; + } + size_t num_columns() const { return row_group_->columns.size(); } diff --git a/cpp/src/parquet/util/bit-util.h b/cpp/src/parquet/util/bit-util.h index 4db585a0ccc67..7a2e9212faf79 100644 --- a/cpp/src/parquet/util/bit-util.h +++ b/cpp/src/parquet/util/bit-util.h @@ -276,6 +276,14 @@ class BitUtil { static T UnsetBit(T v, int bitpos) { return v & ~(static_cast<T>(0x1) << bitpos); } + + // Returns the minimum number of bits needed to represent the value of 'x' + static inline int NumRequiredBits(uint64_t x) { + for (int i = 63; i >= 0; --i) { + if (x & 1L << i) return i + 1; + } + return 0; + } }; } // namespace parquet_cpp diff --git a/cpp/src/parquet/util/test-common.h b/cpp/src/parquet/util/test-common.h new file mode 100644 index 0000000000000..38bc32c5c844e --- /dev/null +++ b/cpp/src/parquet/util/test-common.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PARQUET_UTIL_TEST_COMMON_H +#define PARQUET_UTIL_TEST_COMMON_H + +#include <iostream> +#include <vector> + +using std::vector; + +namespace parquet_cpp { + +namespace test { + +template <typename T> +static inline bool vector_equal(const vector<T>& left, const vector<T>& right) { + if (left.size() != right.size()) { + return false; + } + + for (size_t i = 0; i < left.size(); ++i) { + if (left[i] != right[i]) { + std::cerr << "index " << i + << " left was " << left[i] + << " right was " << right[i] + << std::endl; + return false; + } + } + + return true; +} + +} // namespace test + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_TEST_COMMON_H