From c16eccb1d220a749d7d1ec48b70e828fc74cce54 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 6 Mar 2019 15:46:22 -0600 Subject: [PATCH] =?UTF-8?q?PARQUET-1482:=20[C++]=20Add=20branch=20to=20Typ?= =?UTF-8?q?edRecordReader::ReadNewPage=20for=20=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …PageType::DATA_PAGE_V2 to address incompatibility with parquetjs. **Tests** This commit doesn't include tests right now; I am working on adding tests and was hoping for some initial feedback on the code changes. I may need to use an actual file generated by `parquetjs` to test this issue, so I wonder if adding `feeds1kMicros.parquet` from the JIRA task to the parquet-testing repository is an option for this. **Description** `parquetjs` seems to be writing Parquet V2 files with [`DataPageV2`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L529) pages, while `parquet-cpp` writes Parquet V2 files with [`DataPage`](https://github.com/apache/parquet-format/blob/e93dd628d90aa076745558998f0bf5d9c262bf22/src/main/thrift/parquet.thrift#L492) pages. Since `TypedRecordReader::ReadNewPage()` only had a branch for `PageType::DATA_PAGE`, the reader would return without reading any data for records that have `DATA_PAGE_V2` pages. This explains the behavior observed in [PARQUET-1482](https://issues.apache.org/jira/projects/PARQUET/issues/PARQUET-1482?filter=allopenissues). This commit adds a new if-else branch for the `DataPageV2` case in `TypedRecordReader::ReadNewPage()`. Since the `DataPageV2` branch needed to reuse the code from the `DataPage` case, I refactored the repetition/definition level decoder initialization and the data decoder initialization to two new methods in the `TypedRecordReader` class. These new methods are now called by the `DataPage` and `DataPageV2` initialization branches in `TypedRecordReader::ReadNewPage()`. There is an alternate implementation possible (with a smaller diff) by sharing the same else-if branch between `DataPage` and `DataPageV2` using a pointer-to-derived `shared_ptr`. However, since the Page superclass doesn't have the necessary `encoding()` or `num_values()` methods, I would need to add a common superclass to both `DataPage` and `DataPageV2` that defined these methods. I didn't do this because I was hesitant to modify the `Page` class hierarchy for this commit. Author: Wes McKinney Author: rdmello Author: Rylan Dmello Closes #3312 from rdmello/parquet_1482 and squashes the following commits: c5cb0f37 Add DataPage base class for DataPageV1 and DataPageV2 8df8328e PARQUET-1482: Adding basic unit test for DataPageV2 serialization and deserialization. 9df32222 PARQUET-1482: Add branch to TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address incompatibility with parquetjs. --- cpp/src/parquet/arrow/record_reader.cc | 186 +++++++++++++---------- cpp/src/parquet/column_page.h | 53 +++---- cpp/src/parquet/column_reader-test.cc | 2 +- cpp/src/parquet/column_reader.cc | 16 +- cpp/src/parquet/file-deserialize-test.cc | 54 ++++++- cpp/src/parquet/test-util.h | 10 +- 6 files changed, 198 insertions(+), 123 deletions(-) diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc index 39945afc78298..c800f36d2eab6 100644 --- a/cpp/src/parquet/arrow/record_reader.cc +++ b/cpp/src/parquet/arrow/record_reader.cc @@ -48,7 +48,7 @@ namespace BitUtil = ::arrow::BitUtil; // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index // encoding. -static bool IsDictionaryIndexEncoding(const Encoding::type& e) { +static bool IsDictionaryIndexEncoding(Encoding::type e) { return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY; } @@ -86,7 +86,7 @@ class RecordReader::RecordReaderImpl { virtual ~RecordReaderImpl() = default; - virtual int64_t ReadRecordData(const int64_t num_records) = 0; + virtual int64_t ReadRecordData(int64_t num_records) = 0; // Returns true if there are still values in this column. bool HasNext() { @@ -494,7 +494,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { } // Return number of logical records read - int64_t ReadRecordData(const int64_t num_records) override { + int64_t ReadRecordData(int64_t num_records) override { // Conservative upper bound const int64_t possible_num_values = std::max(num_records, levels_written_ - levels_position_); @@ -580,6 +580,13 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl { DecoderType* current_decoder_; + // Initialize repetition and definition level decoders on the next data page. + int64_t InitializeLevelDecoders(const DataPage& page, + Encoding::type repetition_level_encoding, + Encoding::type definition_level_encoding); + + void InitializeDataDecoder(const DataPage& page, int64_t levels_bytes); + // Advance to the next data page bool ReadNewPage() override; @@ -717,11 +724,94 @@ inline void TypedRecordReader::ConfigureDictionary(const DictionaryPage* DCHECK(current_decoder_); } +// If the data page includes repetition and definition levels, we +// initialize the level decoders and return the number of encoded level bytes. +// The return value helps determine the number of bytes in the encoded data. +template +int64_t TypedRecordReader::InitializeLevelDecoders( + const DataPage& page, Encoding::type repetition_level_encoding, + Encoding::type definition_level_encoding) { + // Read a data page. + num_buffered_values_ = page.num_values(); + + // Have not decoded any values from the data page yet + num_decoded_values_ = 0; + + const uint8_t* buffer = page.data(); + int64_t levels_byte_size = 0; + + // Data page Layout: Repetition Levels - Definition Levels - encoded values. + // Levels are encoded as rle or bit-packed. + // Init repetition levels + if (descr_->max_repetition_level() > 0) { + int64_t rep_levels_bytes = repetition_level_decoder_.SetData( + repetition_level_encoding, descr_->max_repetition_level(), + static_cast(num_buffered_values_), buffer); + buffer += rep_levels_bytes; + levels_byte_size += rep_levels_bytes; + } + // TODO figure a way to set max_definition_level_ to 0 + // if the initial value is invalid + + // Init definition levels + if (descr_->max_definition_level() > 0) { + int64_t def_levels_bytes = definition_level_decoder_.SetData( + definition_level_encoding, descr_->max_definition_level(), + static_cast(num_buffered_values_), buffer); + levels_byte_size += def_levels_bytes; + } + + return levels_byte_size; +} + +// Get a decoder object for this page or create a new decoder if this is the +// first page with this encoding. +template +void TypedRecordReader::InitializeDataDecoder(const DataPage& page, + int64_t levels_byte_size) { + const uint8_t* buffer = page.data() + levels_byte_size; + const int64_t data_size = page.size() - levels_byte_size; + + Encoding::type encoding = page.encoding(); + + if (IsDictionaryIndexEncoding(encoding)) { + encoding = Encoding::RLE_DICTIONARY; + } + + auto it = decoders_.find(static_cast(encoding)); + if (it != decoders_.end()) { + DCHECK(it->second.get() != nullptr); + if (encoding == Encoding::RLE_DICTIONARY) { + DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); + } + current_decoder_ = it->second.get(); + } else { + switch (encoding) { + case Encoding::PLAIN: { + auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); + current_decoder_ = decoder.get(); + decoders_[static_cast(encoding)] = std::move(decoder); + break; + } + case Encoding::RLE_DICTIONARY: + throw ParquetException("Dictionary page must be before data page."); + + case Encoding::DELTA_BINARY_PACKED: + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::DELTA_BYTE_ARRAY: + ParquetException::NYI("Unsupported encoding"); + + default: + throw ParquetException("Unknown encoding type."); + } + } + current_decoder_->SetData(static_cast(num_buffered_values_), buffer, + static_cast(data_size)); +} + template bool TypedRecordReader::ReadNewPage() { // Loop until we find the next data page. - const uint8_t* buffer; - while (true) { current_page_ = pager_->NextPage(); if (!current_page_) { @@ -733,80 +823,18 @@ bool TypedRecordReader::ReadNewPage() { ConfigureDictionary(static_cast(current_page_.get())); continue; } else if (current_page_->type() == PageType::DATA_PAGE) { - const DataPage* page = static_cast(current_page_.get()); - - // Read a data page. - num_buffered_values_ = page->num_values(); - - // Have not decoded any values from the data page yet - num_decoded_values_ = 0; - - buffer = page->data(); - - // If the data page includes repetition and definition levels, we - // initialize the level decoder and subtract the encoded level bytes from - // the page size to determine the number of bytes in the encoded data. - int64_t data_size = page->size(); - - // Data page Layout: Repetition Levels - Definition Levels - encoded values. - // Levels are encoded as rle or bit-packed. - // Init repetition levels - if (descr_->max_repetition_level() > 0) { - int64_t rep_levels_bytes = repetition_level_decoder_.SetData( - page->repetition_level_encoding(), descr_->max_repetition_level(), - static_cast(num_buffered_values_), buffer); - buffer += rep_levels_bytes; - data_size -= rep_levels_bytes; - } - // TODO figure a way to set max_definition_level_ to 0 - // if the initial value is invalid - - // Init definition levels - if (descr_->max_definition_level() > 0) { - int64_t def_levels_bytes = definition_level_decoder_.SetData( - page->definition_level_encoding(), descr_->max_definition_level(), - static_cast(num_buffered_values_), buffer); - buffer += def_levels_bytes; - data_size -= def_levels_bytes; - } - - // Get a decoder object for this page or create a new decoder if this is the - // first page with this encoding. - Encoding::type encoding = page->encoding(); - - if (IsDictionaryIndexEncoding(encoding)) { - encoding = Encoding::RLE_DICTIONARY; - } - - auto it = decoders_.find(static_cast(encoding)); - if (it != decoders_.end()) { - DCHECK(it->second.get() != nullptr); - if (encoding == Encoding::RLE_DICTIONARY) { - DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY); - } - current_decoder_ = it->second.get(); - } else { - switch (encoding) { - case Encoding::PLAIN: { - auto decoder = MakeTypedDecoder(Encoding::PLAIN, descr_); - current_decoder_ = decoder.get(); - decoders_[static_cast(encoding)] = std::move(decoder); - break; - } - case Encoding::RLE_DICTIONARY: - throw ParquetException("Dictionary page must be before data page."); - - case Encoding::DELTA_BINARY_PACKED: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: - case Encoding::DELTA_BYTE_ARRAY: - ParquetException::NYI("Unsupported encoding"); - - default: - throw ParquetException("Unknown encoding type."); - } - } - current_decoder_->SetData(static_cast(num_buffered_values_), buffer, - static_cast(data_size)); + const auto page = std::static_pointer_cast(current_page_); + const int64_t levels_byte_size = InitializeLevelDecoders( + *page, page->repetition_level_encoding(), page->definition_level_encoding()); + InitializeDataDecoder(*page, levels_byte_size); + return true; + } else if (current_page_->type() == PageType::DATA_PAGE_V2) { + const auto page = std::static_pointer_cast(current_page_); + // Repetition and definition levels are always encoded using RLE encoding + // in the DataPageV2 format. + const int64_t levels_byte_size = + InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE); + InitializeDataDecoder(*page, levels_byte_size); return true; } else { // We don't know what this page type is. We're allowed to skip non-data diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h index c34eee775faba..3a0355ab56f6b 100644 --- a/cpp/src/parquet/column_page.h +++ b/cpp/src/parquet/column_page.h @@ -59,45 +59,54 @@ class Page { PageType::type type_; }; +/// \brief Base type for DataPageV1 and DataPageV2 including common attributes class DataPage : public Page { public: - DataPage(const std::shared_ptr& buffer, int32_t num_values, - Encoding::type encoding, Encoding::type definition_level_encoding, - Encoding::type repetition_level_encoding, + int32_t num_values() const { return num_values_; } + Encoding::type encoding() const { return encoding_; } + const EncodedStatistics& statistics() const { return statistics_; } + + protected: + DataPage(PageType::type type, const std::shared_ptr& buffer, int32_t num_values, + Encoding::type encoding, const EncodedStatistics& statistics = EncodedStatistics()) - : Page(buffer, PageType::DATA_PAGE), + : Page(buffer, type), num_values_(num_values), encoding_(encoding), - definition_level_encoding_(definition_level_encoding), - repetition_level_encoding_(repetition_level_encoding), statistics_(statistics) {} - int32_t num_values() const { return num_values_; } + int32_t num_values_; + Encoding::type encoding_; + EncodedStatistics statistics_; +}; - Encoding::type encoding() const { return encoding_; } +class DataPageV1 : public DataPage { + public: + DataPageV1(const std::shared_ptr& buffer, int32_t num_values, + Encoding::type encoding, Encoding::type definition_level_encoding, + Encoding::type repetition_level_encoding, + const EncodedStatistics& statistics = EncodedStatistics()) + : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, statistics), + definition_level_encoding_(definition_level_encoding), + repetition_level_encoding_(repetition_level_encoding) {} Encoding::type repetition_level_encoding() const { return repetition_level_encoding_; } Encoding::type definition_level_encoding() const { return definition_level_encoding_; } - const EncodedStatistics& statistics() const { return statistics_; } - private: - int32_t num_values_; - Encoding::type encoding_; Encoding::type definition_level_encoding_; Encoding::type repetition_level_encoding_; - EncodedStatistics statistics_; }; -class CompressedDataPage : public DataPage { +class CompressedDataPage : public DataPageV1 { public: CompressedDataPage(const std::shared_ptr& buffer, int32_t num_values, Encoding::type encoding, Encoding::type definition_level_encoding, Encoding::type repetition_level_encoding, int64_t uncompressed_size, const EncodedStatistics& statistics = EncodedStatistics()) - : DataPage(buffer, num_values, encoding, definition_level_encoding, - repetition_level_encoding, statistics), + : DataPageV1(buffer, num_values, encoding, definition_level_encoding, + repetition_level_encoding, statistics), uncompressed_size_(uncompressed_size) {} int64_t uncompressed_size() const { return uncompressed_size_; } @@ -106,29 +115,23 @@ class CompressedDataPage : public DataPage { int64_t uncompressed_size_; }; -class DataPageV2 : public Page { +class DataPageV2 : public DataPage { public: DataPageV2(const std::shared_ptr& buffer, int32_t num_values, int32_t num_nulls, int32_t num_rows, Encoding::type encoding, int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length, bool is_compressed = false) - : Page(buffer, PageType::DATA_PAGE_V2), - num_values_(num_values), + : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding), num_nulls_(num_nulls), num_rows_(num_rows), - encoding_(encoding), definition_levels_byte_length_(definition_levels_byte_length), repetition_levels_byte_length_(repetition_levels_byte_length), is_compressed_(is_compressed) {} - int32_t num_values() const { return num_values_; } - int32_t num_nulls() const { return num_nulls_; } int32_t num_rows() const { return num_rows_; } - Encoding::type encoding() const { return encoding_; } - int32_t definition_levels_byte_length() const { return definition_levels_byte_length_; } int32_t repetition_levels_byte_length() const { return repetition_levels_byte_length_; } @@ -136,10 +139,8 @@ class DataPageV2 : public Page { bool is_compressed() const { return is_compressed_; } private: - int32_t num_values_; int32_t num_nulls_; int32_t num_rows_; - Encoding::type encoding_; int32_t definition_levels_byte_length_; int32_t repetition_levels_byte_length_; bool is_compressed_; diff --git a/cpp/src/parquet/column_reader-test.cc b/cpp/src/parquet/column_reader-test.cc index 0475ca591de02..49ca15b20a201 100644 --- a/cpp/src/parquet/column_reader-test.cc +++ b/cpp/src/parquet/column_reader-test.cc @@ -332,7 +332,7 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) { shared_ptr dict_page = std::make_shared(dummy, 0, Encoding::PLAIN); - shared_ptr data_page = MakeDataPage( + shared_ptr data_page = MakeDataPage( &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0); pages_.push_back(dict_page); pages_.push_back(data_page); diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 33d2f5cefb5f0..9f3e52f96db17 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -234,7 +234,7 @@ std::shared_ptr SerializedPageReader::NextPage() { seen_num_rows_ += header.num_values; - return std::make_shared( + return std::make_shared( page_buffer, header.num_values, FromThrift(header.encoding), FromThrift(header.definition_level_encoding), FromThrift(header.repetition_level_encoding), page_statistics); @@ -613,27 +613,27 @@ bool TypedColumnReaderImpl::ReadNewPage() { ConfigureDictionary(static_cast(current_page_.get())); continue; } else if (current_page_->type() == PageType::DATA_PAGE) { - const DataPage* page = static_cast(current_page_.get()); + const DataPageV1& page = static_cast(*current_page_); // Read a data page. - num_buffered_values_ = page->num_values(); + num_buffered_values_ = page.num_values(); // Have not decoded any values from the data page yet num_decoded_values_ = 0; - buffer = page->data(); + buffer = page.data(); // If the data page includes repetition and definition levels, we // initialize the level decoder and subtract the encoded level bytes from // the page size to determine the number of bytes in the encoded data. - int64_t data_size = page->size(); + int64_t data_size = page.size(); // Data page Layout: Repetition Levels - Definition Levels - encoded values. // Levels are encoded as rle or bit-packed. // Init repetition levels if (descr_->max_repetition_level() > 0) { int64_t rep_levels_bytes = repetition_level_decoder_.SetData( - page->repetition_level_encoding(), descr_->max_repetition_level(), + page.repetition_level_encoding(), descr_->max_repetition_level(), static_cast(num_buffered_values_), buffer); buffer += rep_levels_bytes; data_size -= rep_levels_bytes; @@ -644,7 +644,7 @@ bool TypedColumnReaderImpl::ReadNewPage() { // Init definition levels if (descr_->max_definition_level() > 0) { int64_t def_levels_bytes = definition_level_decoder_.SetData( - page->definition_level_encoding(), descr_->max_definition_level(), + page.definition_level_encoding(), descr_->max_definition_level(), static_cast(num_buffered_values_), buffer); buffer += def_levels_bytes; data_size -= def_levels_bytes; @@ -652,7 +652,7 @@ bool TypedColumnReaderImpl::ReadNewPage() { // Get a decoder object for this page or create a new decoder if this is the // first page with this encoding. - Encoding::type encoding = page->encoding(); + Encoding::type encoding = page.encoding(); if (IsDictionaryIndexEncoding(encoding)) { encoding = Encoding::RLE_DICTIONARY; diff --git a/cpp/src/parquet/file-deserialize-test.cc b/cpp/src/parquet/file-deserialize-test.cc index e62968e5d5dc9..f2b10c7455ffd 100644 --- a/cpp/src/parquet/file-deserialize-test.cc +++ b/cpp/src/parquet/file-deserialize-test.cc @@ -90,6 +90,21 @@ class TestPageSerde : public ::testing::Test { ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get())); } + void WriteDataPageHeaderV2(int max_serialized_len = 1024, int32_t uncompressed_size = 0, + int32_t compressed_size = 0) { + // Simplifying writing serialized data page V2 headers which may or may not + // have meaningful data associated with them + + // Serialize the Page header + page_header_.__set_data_page_header_v2(data_page_header_v2_); + page_header_.uncompressed_page_size = uncompressed_size; + page_header_.compressed_page_size = compressed_size; + page_header_.type = format::PageType::DATA_PAGE_V2; + + ThriftSerializer serializer; + ASSERT_NO_THROW(serializer.Serialize(&page_header_, out_stream_.get())); + } + void ResetStream() { out_stream_.reset(new InMemoryOutputStream); } void EndStream() { out_buffer_ = out_stream_->GetBuffer(); } @@ -101,12 +116,13 @@ class TestPageSerde : public ::testing::Test { std::unique_ptr page_reader_; format::PageHeader page_header_; format::DataPageHeader data_page_header_; + format::DataPageHeaderV2 data_page_header_v2_; }; void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page) { ASSERT_EQ(PageType::DATA_PAGE, page->type()); - const DataPage* data_page = static_cast(page); + const DataPageV1* data_page = static_cast(page); ASSERT_EQ(expected.num_values, data_page->num_values()); ASSERT_EQ(expected.encoding, data_page->encoding()); ASSERT_EQ(expected.definition_level_encoding, data_page->definition_level_encoding()); @@ -120,7 +136,25 @@ void CheckDataPageHeader(const format::DataPageHeader expected, const Page* page } } -TEST_F(TestPageSerde, DataPage) { +// Overload for DataPageV2 tests. +void CheckDataPageHeader(const format::DataPageHeaderV2 expected, const Page* page) { + ASSERT_EQ(PageType::DATA_PAGE_V2, page->type()); + + const DataPageV2* data_page = static_cast(page); + ASSERT_EQ(expected.num_values, data_page->num_values()); + ASSERT_EQ(expected.num_nulls, data_page->num_nulls()); + ASSERT_EQ(expected.num_rows, data_page->num_rows()); + ASSERT_EQ(expected.encoding, data_page->encoding()); + ASSERT_EQ(expected.definition_levels_byte_length, + data_page->definition_levels_byte_length()); + ASSERT_EQ(expected.repetition_levels_byte_length, + data_page->repetition_levels_byte_length()); + ASSERT_EQ(expected.is_compressed, data_page->is_compressed()); + + // TODO: Tests for DataPageHeaderV2 statistics. +} + +TEST_F(TestPageSerde, DataPageV1) { format::PageHeader out_page_header; int stats_size = 512; @@ -134,6 +168,18 @@ TEST_F(TestPageSerde, DataPage) { ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_, current_page.get())); } +TEST_F(TestPageSerde, DataPageV2) { + format::PageHeader out_page_header; + + const int32_t num_rows = 4444; + data_page_header_.num_values = num_rows; + + ASSERT_NO_FATAL_FAILURE(WriteDataPageHeaderV2()); + InitSerializedPageReader(num_rows); + std::shared_ptr current_page = page_reader_->NextPage(); + ASSERT_NO_FATAL_FAILURE(CheckDataPageHeader(data_page_header_v2_, current_page.get())); +} + TEST_F(TestPageSerde, TestLargePageHeaders) { int stats_size = 256 * 1024; // 256 KB AddDummyStats(stats_size, data_page_header_); @@ -218,11 +264,11 @@ TEST_F(TestPageSerde, Compression) { InitSerializedPageReader(num_rows * num_pages, codec_type); std::shared_ptr page; - const DataPage* data_page; + const DataPageV1* data_page; for (int i = 0; i < num_pages; ++i) { int data_size = static_cast(faux_data[i].size()); page = page_reader_->NextPage(); - data_page = static_cast(page.get()); + data_page = static_cast(page.get()); ASSERT_EQ(data_size, data_page->size()); ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size)); } diff --git a/cpp/src/parquet/test-util.h b/cpp/src/parquet/test-util.h index ed7c7bb901621..a234fd436ba58 100644 --- a/cpp/src/parquet/test-util.h +++ b/cpp/src/parquet/test-util.h @@ -216,7 +216,7 @@ void DataPageBuilder::AppendValues(const ColumnDescriptor* d, } template -static shared_ptr MakeDataPage( +static shared_ptr MakeDataPage( const ColumnDescriptor* d, const vector& values, int num_vals, Encoding::type encoding, const uint8_t* indices, int indices_size, const vector& def_levels, int16_t max_def_level, @@ -243,9 +243,9 @@ static shared_ptr MakeDataPage( auto buffer = page_stream.GetBuffer(); - return std::make_shared(buffer, num_values, encoding, - page_builder.def_level_encoding(), - page_builder.rep_level_encoding()); + return std::make_shared(buffer, num_values, encoding, + page_builder.def_level_encoding(), + page_builder.rep_level_encoding()); } template @@ -357,7 +357,7 @@ static void PaginateDict(const ColumnDescriptor* d, rep_level_start = i * num_levels_per_page; rep_level_end = (i + 1) * num_levels_per_page; } - shared_ptr data_page = MakeDataPage( + shared_ptr data_page = MakeDataPage( d, {}, values_per_page[i], encoding, rle_indices[i]->data(), static_cast(rle_indices[i]->size()), slice(def_levels, def_level_start, def_level_end), max_def_level,