[fix](parquet) Fix data column and null map column not equal when reading Parquet complex type cross-page data (#47734)

### What problem does this PR solve?


Related PR: #23277

Problem Summary:
Previously, you could encounter the following error when reading Parquet complex types. This PR fixes that problem.
```
[fragment_mgr.cpp:549] report error status: cur path:  xxx. parquet. Read parquet file   xxxx.parquet failed, reason = [E-1721][E-1721] Size of filter doesn't match size of column: size=3156, filter.size=12544

0#  doris::Exception::Exception(int, std::basic_string_view<char, std::char_traits<char> > const&) at //ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:173
1#  doris::Exception::Exception<unsigned long&, unsigned long&>(int, std::basic_string_view<char, std::char_traits<char> > const&, unsigned long&, unsigned long&) at //ldb_toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/basic_string.h:187
2#  doris::vectorized::ColumnVector<unsigned char>::filter(doris::vectorized::PODArray<unsigned char, 4096ul, Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 16ul> const&) at /be/src/vec/columns/columns_common.h:86
3#  doris::vectorized::ColumnNullable::filter(doris::vectorized::PODArray<unsigned char, 4096ul, Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 16ul> const&) at /be/src/vec/columns/column_nullable.cpp:373
4#  doris::vectorized::ColumnStruct::filter(doris::vectorized::PODArray<unsigned char, 4096ul, Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 16ul> const&) at /be/src/vec/columns/column_struct.cpp:289
5#  doris::vectorized::ColumnArray::filter_generic(doris::vectorized::PODArray<unsigned char, 4096ul, Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 16ul> const&) at /be/src/vec/common/cow.h:402
6#  doris::vectorized::ColumnArray::filter_nullable(doris::vectorized::PODArray<unsigned char, 4096ul, Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 16ul> const&) at /be/src/vec/columns/column_array.cpp:877
7#  doris::vectorized::ColumnNullable::filter(doris::vectorized::PODArray<unsigned char, 4096ul, Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 16ul> const&) at /be/src/vec/columns/column_nullable.cpp:371
8#  doris::vectorized::Block::filter_block_internal(doris::vectorized::Block*, std::vector<unsigned int, std::allocator<unsigned int> > const&, doris::vectorized::PODArray<unsigned char, 4096ul, Allocator<false, false, false, DefaultMemoryAllocator>, 16ul, 16ul> const&) at /be/src/vec/core/block.cpp:790
9#  doris::vectorized::RowGroupReader::next_batch(doris::vectorized::Block*, unsigned long, unsigned long*, bool*) at /be/src/vec/exec/format/parquet/vparquet_group_reader.cpp:0
10# doris::vectorized::ParquetReader::get_next_block(doris::vectorized::Block*, unsigned long*, bool*) at /be/src/common/status.h:486
11# doris::vectorized::IcebergTableReader::get_next_block(doris::vectorized::Block*, unsigned long*, bool*) at /be/src/common/status.h:491
12# doris::vectorized::VFileScanner::_get_block_wrapped(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /be/src/common/status.h:491
13# doris::vectorized::VFileScanner::_get_block_impl(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /be/src/common/status.h:491
14# doris::vectorized::VScanner::get_block(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /be/src/vec/exec/scan/vscanner.cpp:0
15# doris::vectorized::VScanner::get_block_after_projects(doris::RuntimeState*, doris::vectorized::Block*, bool*) at /be/src/vec/exec/scan/vscanner.cpp:101
16# doris::vectorized::ScannerScheduler::_scanner_scan(std::shared_ptr<doris::vectorized::ScannerContext>, std::shared_ptr<doris::vectorized::ScanTask>) at /be/src/common/status.h:378
```
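The trace ends in `ColumnVector::filter`, which requires the filter mask to be exactly as long as the column being filtered. A minimal sketch of that invariant (hypothetical and heavily simplified, not the real `columns_common.h` implementation):

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified sketch of the invariant that trips in the trace above: a filter
// mask must carry exactly one entry per row. When cross-page reads lose part
// of the definition/repetition levels, the null map column ends up shorter
// than the filter built for the full batch, and this check throws.
std::vector<int> filter_column(const std::vector<int>& column,
                               const std::vector<unsigned char>& filter) {
    if (filter.size() != column.size()) {
        throw std::runtime_error(
                "Size of filter doesn't match size of column: size=" +
                std::to_string(column.size()) +
                ", filter.size=" + std::to_string(filter.size()));
    }
    std::vector<int> result;
    for (std::size_t i = 0; i < column.size(); ++i) {
        if (filter[i] != 0) {
            result.push_back(column[i]);
        }
    }
    return result;
}
```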
hubgeter authored and Your Name committed Feb 18, 2025
1 parent dd86f9d commit bf8f265
Showing 6 changed files with 129 additions and 10 deletions.
23 changes: 22 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -323,6 +323,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
// just read the remaining values of the last row in the previous page,
// so no new row should be read.
batch_size = 0;
/*
* Since this function is called repeatedly to fetch data for the batch size,
* each call executes `_rep_levels.resize(0); _def_levels.resize(0);`, so the
* reader's definition and repetition levels end up containing only the latter
* part of the batch (the earlier parts are lost). Therefore, when using the
* definition and repetition levels to fill the null_map for structs and maps,
* the function must not be called multiple times before the filling is done.
* TODO:
* Consider reading the entire batch of data at once, which would make the
* function friendlier to use. However, if the data spans multiple pages,
* memory usage may increase significantly.
*/
} else {
_rep_levels.resize(0);
_def_levels.resize(0);
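A hedged sketch of the failure mode the comment above describes (the names are illustrative, not the real reader): clearing the level buffers on every call keeps only the last page's levels, while accumulating keeps the whole batch.

```cpp
#include <vector>

using level_t = int;

// Hypothetical sketch: if the per-call path clears the level buffers, only
// the levels from the *last* page survive, so a null_map built from them is
// shorter than the batch and the later filter size check fails.
struct LevelsSketch {
    std::vector<level_t> def_levels;

    // Buggy shape: called once per page, discards previous pages' levels.
    void read_page_resetting(const std::vector<level_t>& page_levels) {
        def_levels.resize(0); // earlier pages' levels are lost here
        def_levels.insert(def_levels.end(), page_levels.begin(), page_levels.end());
    }

    // Fixed shape: keep accumulating until the whole batch is read, and only
    // then use def_levels to fill the null_map.
    void read_page_accumulating(const std::vector<level_t>& page_levels) {
        def_levels.insert(def_levels.end(), page_levels.begin(), page_levels.end());
    }
};
```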
@@ -746,7 +758,7 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
continue;
}

_read_column_names.insert(doris_name);
_read_column_names.emplace_back(doris_name);

select_vector.reset();
size_t field_rows = 0;
@@ -758,6 +770,15 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
is_dict_filter));
*read_rows = field_rows;
*eof = field_eof;
/*
* Because data may span multiple pages in `_read_nested_column`, definition and
* repetition levels can be partially lost. So when filling the null_map of the
* struct later, it is crucial to use the definition and repetition levels from
* the first read column (whose `_read_nested_column` is not called repeatedly).
*
* It is worth mentioning that, in theory, any sub-column could be chosen to fill
* the null_map, and selecting the shortest one would offer better performance.
*/
} else {
while (field_rows < *read_rows && !field_eof) {
size_t loop_rows = 0;
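For context, "filling the null_map" from one child's definition levels amounts to marking rows whose definition level falls below the struct's own level. An illustrative sketch (the names and the threshold parameter are assumptions, not the real API):

```cpp
#include <vector>

using level_t = int;

// Hedged sketch: a definition level below the struct's level means the struct
// itself is null at that row. Any fully read child yields the same answer, so
// the first (completely read) child is a safe choice.
std::vector<unsigned char> fill_struct_null_map(
        const std::vector<level_t>& child_def_levels, level_t struct_def_level) {
    std::vector<unsigned char> null_map;
    null_map.reserve(child_def_levels.size());
    for (level_t def : child_def_levels) {
        // 1 = null, 0 = present, matching the usual null_map convention.
        null_map.push_back(def < struct_def_level ? 1 : 0);
    }
    return null_map;
}
```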
21 changes: 14 additions & 7 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -280,24 +280,30 @@ class StructColumnReader : public ParquetColumnReader {
if (!_read_column_names.empty()) {
// can't use _child_readers[*_read_column_names.begin()]
// because the operator[] of std::unordered_map is not const :(
return _child_readers.find(*_read_column_names.begin())->second->get_rep_level();
/*
* Because data may span multiple pages in `_read_nested_column`, definition and
* repetition levels can be partially lost. So when filling the null_map of the
* struct later, it is crucial to use the definition and repetition levels from
* the first read column, that is, `_read_column_names.front()`.
*/
return _child_readers.find(_read_column_names.front())->second->get_rep_level();
}
return _child_readers.begin()->second->get_rep_level();
}

const std::vector<level_t>& get_def_level() const override {
if (!_read_column_names.empty()) {
return _child_readers.find(*_read_column_names.begin())->second->get_def_level();
return _child_readers.find(_read_column_names.front())->second->get_def_level();
}
return _child_readers.begin()->second->get_def_level();
}

Statistics statistics() override {
Statistics st;
for (const auto& reader : _child_readers) {
// make sure the field is read
if (_read_column_names.find(reader.first) != _read_column_names.end()) {
Statistics cst = reader.second->statistics();
for (const auto& column_name : _read_column_names) {
auto reader = _child_readers.find(column_name);
if (reader != _child_readers.end()) {
Statistics cst = reader->second->statistics();
st.merge(cst);
}
}
@@ -308,7 +314,8 @@

private:
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
std::set<std::string> _read_column_names;
std::vector<std::string> _read_column_names;
// Need to use a vector instead of a set; see `get_rep_level()` for the reason.
};

}; // namespace doris::vectorized
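The container switch matters because `std::set` iterates in sorted order, not insertion order, so `*begin()` need not be the first column that was read. A small self-contained demonstration:

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Why the switch from std::set to std::vector matters: get_rep_level() needs
// the *first column actually read*, but a std::set iterates in sorted order,
// which can differ from read order.
int main() {
    std::set<std::string> as_set;
    std::vector<std::string> as_vector;

    // Suppose column "m2" is read before "id".
    for (const std::string& name : {"m2", "id"}) {
        as_set.insert(name);
        as_vector.emplace_back(name);
    }

    assert(*as_set.begin() == "id");   // sorted order: the wrong column
    assert(as_vector.front() == "m2"); // insertion order: first read column
    return 0;
}
```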
@@ -65,3 +65,12 @@
-- !viewfs --
25001 25001 25001

-- !row_cross_pages_2 --
149923 149923

-- !row_cross_pages_3 --
74815 74815

-- !row_cross_pages_4 --
457 457

@@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !1 --
1

-- !2 --
5000

-- !3 --
5000

@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
suite("test_tvf_p0", "p0,external,tvf,external_docker,hive") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String nameNodeHost = context.config.otherConfigs.get("externalEnvIp")
@@ -46,7 +46,7 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
"format" = "orc");
"""

// a row of complex type may be stored across more pages
// (1): a row of complex type may be stored across more pages
qt_row_cross_pages """select count(id), count(m1), count(m2)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
@@ -73,5 +73,25 @@ suite("test_tvf_p2", "p0,external,tvf,external_docker,hive") {
"format" = "parquet",
"fs.viewfs.mounttable.my-cluster.link./ns1" = "hdfs://${nameNodeHost}:${hdfsPort}/",
"fs.viewfs.mounttable.my-cluster.homedir" = "/ns1")"""

// (2): a row of complex type may be stored across more pages
qt_row_cross_pages_2 """select count(id), count(experiment)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
"format" = "parquet");
""" //149923

qt_row_cross_pages_3 """select count(id), count(experiment)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
"format" = "parquet") where id > 49923 ;
""" // 74815

qt_row_cross_pages_4 """select count(id), count(experiment)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages_2.parquet",
"format" = "parquet") where id < 300 ;
""" //457

}
}
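The new cases target Parquet files where a single complex-type row spans a page boundary. A hedged sketch of how repetition levels express this (pure illustration of Parquet level semantics, not reader code):

```cpp
#include <cassert>
#include <vector>

using level_t = int;

// In Parquet, rep_level == 0 marks the first value of a new row, so a page
// boundary can fall in the middle of a row of a complex type. Counting rows
// per page shows why the reader must carry levels across pages.
int count_new_rows(const std::vector<level_t>& rep_levels) {
    int rows = 0;
    for (level_t r : rep_levels) {
        if (r == 0) {
            ++rows;
        }
    }
    return rows;
}

int main() {
    // One array row {a, b, c} split across two pages: page 1 holds {a, b},
    // page 2 holds {c}. The second page starts with rep_level 1, i.e. it
    // continues the previous page's row instead of opening a new one.
    std::vector<level_t> page1 = {0, 1};
    std::vector<level_t> page2 = {1};
    assert(count_new_rows(page1) == 1);
    assert(count_new_rows(page2) == 0); // no new row: continuation of page 1
    return 0;
}
```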
@@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_parquet_complex_cross_page", "p2,external,hive,external_remote,external_remote_hive") {

String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
// Hudi and Hive use the same catalog in p2.
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
logger.info("disable test")
return;
}

String props = context.config.otherConfigs.get("hudiEmrCatalog")
String hms_catalog_name = "test_parquet_complex_cross_page"

sql """drop catalog if exists ${hms_catalog_name};"""
sql """
CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
PROPERTIES (
${props}
,'hive.version' = '3.1.3'
);
"""

logger.info("catalog " + hms_catalog_name + " created")
sql """switch ${hms_catalog_name};"""
logger.info("switched to catalog " + hms_catalog_name)
sql """ use regression;"""

sql """ set dry_run_query=true; """

qt_1 """ SELECT * FROM test_parquet_complex_cross_page WHERE device_id='DZ692' and format_time between 1737693770300 and 1737693770500
and date between '20250124' and '20250124' and project='GA20230001' ; """
qt_2 """ SELECT functions_pnc_ssm_road_di_objects from test_parquet_complex_cross_page ; """
qt_3 """ select * from test_parquet_complex_cross_page ; """

sql """drop catalog ${hms_catalog_name};"""
}
