From 1f07404dac920bf81f852f834622f2fc30f8dcfc Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 21 May 2024 18:38:17 +0800 Subject: [PATCH] GH-41321: [C++][Parquet] More strict Parquet level checking (#41346) ### Rationale for this change In https://github.com/apache/arrow/issues/41321 , a user reports corruption when reading from a corrupt Parquet file. This is because some checks are missing. The current code works when reading a normal Parquet file, but when reading a corrupt file the checks need to be stricter. **Currently this patch only enhances the checks on Parquet levels; the corresponding value checks will be added in later patches** ### What changes are included in this PR? Stricter Parquet checks on levels ### Are these changes tested? Existing tests already cover this; maybe we can introduce a Parquet file as a test file ### Are there any user-facing changes? Stricter checks * GitHub Issue: #41321 Lead-authored-by: mwish Co-authored-by: mwish Signed-off-by: mwish --- cpp/src/parquet/column_reader.cc | 109 ++++++++++++++++---------- cpp/src/parquet/column_reader.h | 2 +- cpp/src/parquet/column_reader_test.cc | 76 +++++++++++++++++- 3 files changed, 143 insertions(+), 44 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index a4794c564733a..cfd2fea3746f4 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -101,6 +101,10 @@ inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) { std::to_string(expected)); } } + +constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues = + "Number of decoded rep / def levels do not match num_values in page header"; + } // namespace LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} @@ -907,6 +911,8 @@ class ColumnReaderImplBase { static_cast(data_size)); } + // Available values in the current data page, value includes repeated values + // and nulls. 
int64_t available_values_current_page() const { return num_buffered_values_ - num_decoded_values_; } @@ -933,7 +939,7 @@ class ColumnReaderImplBase { int64_t num_buffered_values_; // The number of values from the current data page that have been decoded - // into memory + // into memory or skipped over. int64_t num_decoded_values_; ::arrow::MemoryPool* pool_; @@ -1026,28 +1032,36 @@ class TypedColumnReaderImpl : public TypedColumnReader, // Read definition and repetition levels. Also return the number of definition levels // and number of values to read. This function is called before reading values. + // + // ReadLevels will throw exception when any num-levels read is not equal to the number + // of the levels can be read. void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, - int64_t* num_def_levels, int64_t* values_to_read) { - batch_size = - std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); + int64_t* num_def_levels, int64_t* non_null_values_to_read) { + batch_size = std::min(batch_size, this->available_values_current_page()); // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0 && def_levels != nullptr) { *num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); + if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); + } // TODO(wesm): this tallying of values-to-decode can be performed with better // cache-efficiency if fused with the level decoding. 
- *values_to_read += + *non_null_values_to_read += std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_); } else { // Required field, read all values - *values_to_read = batch_size; + if (num_def_levels != nullptr) { + *num_def_levels = 0; + } + *non_null_values_to_read = batch_size; } // Not present for non-repeated fields if (this->max_rep_level_ > 0 && rep_levels != nullptr) { int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); - if (def_levels != nullptr && *num_def_levels != num_rep_levels) { - throw ParquetException("Number of decoded rep / def levels did not match"); + if (batch_size != num_rep_levels) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); } } } @@ -1090,8 +1104,7 @@ int64_t TypedColumnReaderImpl::ReadBatchWithDictionary( *indices_read = ReadDictionaryIndices(indices_to_read, indices); int64_t total_indices = std::max(num_def_levels, *indices_read); // Some callers use a batch size of 0 just to get the dictionary. - int64_t expected_values = - std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); + int64_t expected_values = std::min(batch_size, this->available_values_current_page()); if (total_indices == 0 && expected_values > 0) { std::stringstream ss; ss << "Read 0 values, expected " << expected_values; @@ -1106,7 +1119,8 @@ template int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) { - // HasNext invokes ReadNewPage + // HasNext might invoke ReadNewPage until a data page with + // `available_values_current_page() > 0` is found. 
if (!HasNext()) { *values_read = 0; return 0; @@ -1115,20 +1129,31 @@ int64_t TypedColumnReaderImpl::ReadBatch(int64_t batch_size, int16_t* def // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished int64_t num_def_levels = 0; - int64_t values_to_read = 0; - ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); - - *values_read = this->ReadValues(values_to_read, values); + // Number of non-null values to read within `num_def_levels`. + int64_t non_null_values_to_read = 0; + ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, + &non_null_values_to_read); + // Should not return more values than available in the current data page, + // since currently, ReadLevels would only consume level from current + // data page. + if (ARROW_PREDICT_FALSE(num_def_levels > this->available_values_current_page())) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); + } + if (non_null_values_to_read != 0) { + *values_read = this->ReadValues(non_null_values_to_read, values); + } else { + *values_read = 0; + } + // Adjust total_values, since if max_def_level_ == 0, num_def_levels would + // be 0 and `values_read` would adjust to `available_values_current_page()`. 
int64_t total_values = std::max(num_def_levels, *values_read); - int64_t expected_values = - std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); + int64_t expected_values = std::min(batch_size, this->available_values_current_page()); if (total_values == 0 && expected_values > 0) { std::stringstream ss; ss << "Read 0 values, expected " << expected_values; ParquetException::EofException(ss.str()); } this->ConsumeBufferedValues(total_values); - return total_values; } @@ -1137,7 +1162,8 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, int64_t* null_count_out) { - // HasNext invokes ReadNewPage + // HasNext might invoke ReadNewPage until a data page with + // `available_values_current_page() > 0` is found. if (!HasNext()) { *levels_read = 0; *values_read = 0; @@ -1145,21 +1171,24 @@ int64_t TypedColumnReaderImpl::ReadBatchSpaced( return 0; } + // Number of non-null values to read int64_t total_values; // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished - batch_size = - std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); + batch_size = std::min(batch_size, this->available_values_current_page()); // If the field is required and non-repeated, there are no definition levels if (this->max_def_level_ > 0) { int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); + if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); + } // Not present for non-repeated fields if (this->max_rep_level_ > 0) { int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); - if (num_def_levels != num_rep_levels) { - throw ParquetException("Number of decoded rep / def levels did not match"); + if (ARROW_PREDICT_FALSE(num_def_levels != 
num_rep_levels)) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); } } @@ -1401,26 +1430,21 @@ class TypedRecordReader : public TypedColumnReaderImpl, int16_t* def_levels = this->def_levels() + levels_written_; int16_t* rep_levels = this->rep_levels() + levels_written_; - // Not present for non-repeated fields - int64_t levels_read = 0; + if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, def_levels) != + batch_size)) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); + } if (this->max_rep_level_ > 0) { - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); - if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { - throw ParquetException("Number of decoded rep / def levels did not match"); + int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels); + if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); } - } else if (this->max_def_level_ > 0) { - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); } - // Exhausted column chunk - if (levels_read == 0) { - break; - } - - levels_written_ += levels_read; + levels_written_ += batch_size; records_read += ReadRecordData(num_records - records_read); } else { - // No repetition or definition levels + // No repetition and definition levels, we can read values directly batch_size = std::min(num_records - records_read, batch_size); records_read += ReadRecordData(batch_size); } @@ -1574,13 +1598,14 @@ class TypedRecordReader : public TypedColumnReaderImpl, int16_t* def_levels = this->def_levels() + levels_written_; int16_t* rep_levels = this->rep_levels() + levels_written_; - int64_t levels_read = 0; - levels_read = this->ReadDefinitionLevels(batch_size, def_levels); - if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { - throw ParquetException("Number of decoded rep / def levels did not match"); + if 
(this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); + } + if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) { + throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); } - levels_written_ += levels_read; + levels_written_ += batch_size; int64_t remaining_records = num_records - skipped_records; // This updates at_record_start_. skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records); diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h index 086f6c0e55806..29e1b2a25e437 100644 --- a/cpp/src/parquet/column_reader.h +++ b/cpp/src/parquet/column_reader.h @@ -197,7 +197,7 @@ class PARQUET_EXPORT ColumnReader { template class TypedColumnReader : public ColumnReader { public: - typedef typename DType::c_type T; + using T = typename DType::c_type; // Read a batch of repetition levels, definition levels, and values from the // column. diff --git a/cpp/src/parquet/column_reader_test.cc b/cpp/src/parquet/column_reader_test.cc index a48573966a905..9096f195687fb 100644 --- a/cpp/src/parquet/column_reader_test.cc +++ b/cpp/src/parquet/column_reader_test.cc @@ -415,7 +415,7 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) { &descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{}, /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_, /*rep_levels=*/{}, - /*max_rep_level=*/0); + /*max_rep_level=*/max_rep_level_); pages_.push_back(data_page); InitReader(&descr); auto reader = static_cast(reader_.get()); @@ -431,6 +431,80 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) { ParquetException); } +// GH-41321: When max_def_level > 0 or max_rep_level > 0, and +// Page has more or less levels than the `num_values` in +// PageHeader. We should detect and throw exception. 
+TEST_F(TestPrimitiveReader, DefRepLevelNotExpected) { + auto do_check = [&](const NodePtr& type, const std::vector& input_def_levels, + const std::vector& input_rep_levels, int num_values) { + std::vector values(num_values, false); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + + // The data page falls back to plain encoding + std::shared_ptr dummy = AllocateBuffer(); + std::shared_ptr data_page = MakeDataPage( + &descr, values, /*num_values=*/num_values, Encoding::PLAIN, /*indices=*/{}, + /*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_, + /*rep_levels=*/input_rep_levels, + /*max_rep_level=*/max_rep_level_); + pages_.push_back(data_page); + InitReader(&descr); + auto reader = static_cast(reader_.get()); + ASSERT_TRUE(reader->HasNext()); + + constexpr int batch_size = 10; + std::vector def_levels(batch_size, 0); + std::vector rep_levels(batch_size, 0); + bool values_out[batch_size]; + int64_t values_read; + EXPECT_THROW_THAT( + [&]() { + reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(), values_out, + &values_read); + }, + ParquetException, + ::testing::Property(&ParquetException::what, + ::testing::HasSubstr("Number of decoded rep / def levels do " + "not match num_values in page header"))); + }; + // storing def-levels less than value in page-header + { + max_def_level_ = 1; + max_rep_level_ = 0; + NodePtr type = schema::Boolean("a", Repetition::OPTIONAL); + std::vector input_def_levels(1, 1); + std::vector input_rep_levels{}; + do_check(type, input_def_levels, input_rep_levels, /*num_values=*/3); + } + // storing def-levels more than value in page-header + { + max_def_level_ = 1; + max_rep_level_ = 0; + NodePtr type = schema::Boolean("a", Repetition::OPTIONAL); + std::vector input_def_levels(2, 1); + std::vector input_rep_levels{}; + do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1); + } + // storing rep-levels less than value in page-header + { + max_def_level_ = 0; + 
max_rep_level_ = 1; + NodePtr type = schema::Boolean("a", Repetition::REPEATED); + std::vector input_def_levels{}; + std::vector input_rep_levels(3, 0); + do_check(type, input_def_levels, input_rep_levels, /*num_values=*/4); + } + // storing rep-levels more than value in page-header + { + max_def_level_ = 0; + max_rep_level_ = 1; + NodePtr type = schema::Boolean("a", Repetition::REPEATED); + std::vector input_def_levels{}; + std::vector input_rep_levels(2, 1); + do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1); + } +} + // Repetition level byte length reported in Page but Max Repetition level // is zero for the column. TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {