[fix](parquet) A row of complex type may be stored across multiple pages (#23277)

A row of a complex type may be stored across two (or more) pages. The parameter `align_rows` indicates whether the reader should read the remaining values of the last row from the previous page.
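For context, the row/page interaction comes from Parquet repetition levels: a value whose repetition level is 0 starts a new row, and a value with a level greater than 0 extends the current row. The following standalone C++ sketch (toy data and hypothetical helpers such as `count_row_starts` and `level_t`, not code from this commit) illustrates how a page that begins with a repetition level greater than 0 signals that the last row of the previous page is still open.

#include <cstdint>
#include <iostream>
#include <vector>

using level_t = int16_t;

// A value with repetition level 0 starts a new row; a value with a level
// greater than 0 extends the row opened by the closest preceding 0.
size_t count_row_starts(const std::vector<level_t>& rep_levels) {
    size_t rows = 0;
    for (level_t r : rep_levels) {
        if (r == 0) {
            ++rows;
        }
    }
    return rows;
}

int main() {
    // Toy repetition levels for an ARRAY<INT> column split across two pages:
    // row 1 has 3 elements, row 2 has 5 elements (2 in page 1, 3 in page 2).
    std::vector<level_t> page1 = {0, 1, 1, 0, 1};
    std::vector<level_t> page2 = {1, 1, 1, 0, 1};
    std::cout << count_row_starts(page1) << "\n"; // 2 rows start in page 1
    // Page 2 opens with a repetition level > 0, so its first values belong
    // to the last row of page 1; the reader must finish that row first.
    std::cout << std::boolalpha << (page2.front() > 0) << "\n"; // true
    return 0;
}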
AshinGau authored Aug 22, 2023
1 parent fbccd5c commit 9d2e23b
Showing 4 changed files with 55 additions and 20 deletions.
63 changes: 44 additions & 19 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -303,11 +303,25 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu
return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter);
}

/**
* Load the nested column data of a complex type.
* A row of a complex type may be stored across two (or more) pages, and the parameter `align_rows`
* indicates whether the reader should read the remaining values of the last row from the previous page.
*/
Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t batch_size,
size_t* read_rows, bool* eof, bool is_dict_filter) {
_rep_levels.resize(0);
_def_levels.resize(0);
size_t* read_rows, bool* eof, bool is_dict_filter,
bool align_rows = false) {
size_t origin_size = 0;
if (align_rows) {
origin_size = _rep_levels.size();
// just read the remaining values of the last row from the previous page,
// so no new rows should be read.
batch_size = 0;
} else {
_rep_levels.resize(0);
_def_levels.resize(0);
}
size_t parsed_rows = 0;
size_t remaining_values = _chunk_reader->remaining_num_values();
bool has_rep_level = _chunk_reader->max_rep_level() > 0;
@@ -327,22 +341,18 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
_rep_levels.emplace_back(rep_level);
remaining_values--;
}
} else {
} else if (!align_rows) {
// case : required child columns in struct type
parsed_rows = std::min(remaining_values, batch_size);
remaining_values -= parsed_rows;
_rep_levels.resize(parsed_rows);
for (size_t i = 0; i < parsed_rows; ++i) {
_rep_levels[i] = 0;
}
_rep_levels.resize(parsed_rows, 0);
}
size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values;
_def_levels.resize(parsed_values);
_def_levels.resize(origin_size + parsed_values);
if (has_def_level) {
_chunk_reader->def_level_decoder().get_levels(&_def_levels[0], parsed_values);
_chunk_reader->def_level_decoder().get_levels(&_def_levels[origin_size], parsed_values);
} else {
for (size_t i = 0; i < parsed_values; ++i) {
_def_levels[i] = 0;
}
std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0);
}

MutableColumnPtr data_column;
@@ -360,14 +370,14 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
}
data_column = doris_column->assume_mutable();
}
size_t has_read = 0;
size_t has_read = origin_size;
size_t ancestor_nulls = 0;
null_map.emplace_back(0);
bool prev_is_null = false;
while (has_read < parsed_values) {
while (has_read < origin_size + parsed_values) {
level_t def_level = _def_levels[has_read++];
size_t loop_read = 1;
while (has_read < parsed_values && _def_levels[has_read] == def_level) {
while (has_read < origin_size + parsed_values && _def_levels[has_read] == def_level) {
has_read++;
loop_read++;
}
@@ -407,9 +417,24 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType
_chunk_reader->skip_values(ancestor_nulls, false);
}

*read_rows = parsed_rows;
if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) {
*eof = true;
if (!align_rows) {
*read_rows = parsed_rows;
}
if (_chunk_reader->remaining_num_values() == 0) {
if (_chunk_reader->has_next_page()) {
RETURN_IF_ERROR(_chunk_reader->next_page());
RETURN_IF_ERROR(_chunk_reader->load_page_data());
select_vector.reset();
return _read_nested_column(doris_column, type, select_vector, 0, read_rows, eof,
is_dict_filter, true);
} else {
*eof = true;
}
}
if (_rep_levels.size() > 0) {
// make sure the rows of the complex type are aligned correctly:
// the repetition level of the first element should be 0, meaning a new row starts here.
DCHECK_EQ(_rep_levels[0], 0);
}
return Status::OK();
}
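To make the control flow of the new continuation logic easier to follow, here is a self-contained toy sketch. The `ToyChunk` type and `read_rows` function are hypothetical simplifications, not Doris' real interfaces; they only mirror the pattern used by the fix: when a page is exhausted mid-row, load the next page and re-enter the reader with `align_rows = true` and `batch_size = 0`, so only the tail of the unfinished row is consumed.

#include <cstddef>
#include <iostream>
#include <vector>

// Toy chunk of repetition levels split into pages (illustration only).
struct ToyChunk {
    std::vector<std::vector<int>> pages;
    size_t cur_page = 0;
    size_t cur_pos = 0;

    bool has_next_page() const { return cur_page + 1 < pages.size(); }
    void next_page() { ++cur_page; cur_pos = 0; }
    size_t remaining() const { return pages[cur_page].size() - cur_pos; }
    int peek_level() const { return pages[cur_page][cur_pos]; }
    void consume() { ++cur_pos; }
};

// Read up to `batch_size` whole rows. If the current page ends before the
// last row is complete, load the next page and recurse with align_rows =
// true and batch_size = 0 so only that row's leftover values are consumed.
void read_rows(ToyChunk& chunk, size_t batch_size, size_t* rows_read,
               bool align_rows = false) {
    size_t parsed_rows = 0;
    while (chunk.remaining() > 0) {
        int rep = chunk.peek_level();
        if (rep == 0) { // a new row starts here
            if (align_rows || parsed_rows >= batch_size) {
                break;
            }
            ++parsed_rows;
        }
        chunk.consume();
    }
    if (!align_rows) {
        *rows_read = parsed_rows;
    }
    if (chunk.remaining() == 0 && chunk.has_next_page()) {
        chunk.next_page();
        read_rows(chunk, /*batch_size=*/0, rows_read, /*align_rows=*/true);
    }
}

int main() {
    ToyChunk chunk;
    // Row 1 has 3 elements; row 2 has 5 elements split across both pages.
    chunk.pages = {{0, 1, 1, 0, 1}, {1, 1, 1, 0, 1}};
    size_t rows = 0;
    read_rows(chunk, /*batch_size=*/2, &rows);
    // Prints 2: two rows were read, and the tail of row 2 that spilled into
    // page 2 was consumed as part of the same call.
    std::cout << rows << "\n";
    return 0;
}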
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -189,7 +189,7 @@ class ScalarColumnReader : public ParquetColumnReader {
ColumnSelectVector& select_vector, bool is_dict_filter);
Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t batch_size,
size_t* read_rows, bool* eof, bool is_dict_filter);
size_t* read_rows, bool* eof, bool is_dict_filter, bool align_rows);
Status _try_load_dict_page(bool* loaded, bool* has_dict);
};

3 changes: 3 additions & 0 deletions regression-test/data/external_table_p2/tvf/test_tvf_p2.out
@@ -39,3 +39,6 @@
-- !nested_types_orc --
20926 20928 20978 23258 20962 23258 23258

-- !row_cross_pages --
25001 25001 25001

7 changes: 7 additions & 0 deletions regression-test/suites/external_table_p2/tvf/test_tvf_p2.groovy
@@ -46,5 +46,12 @@ suite("test_tvf_p2", "p2,external,tvf,external_remote,external_remote_tvf") {
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/orc/all_nested_types.orc",
"format" = "orc",
"fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""

// a row of a complex type may be stored across multiple pages
qt_row_cross_pages """select count(id), count(m1), count(m2)
from hdfs(
"uri" = "hdfs://${nameNodeHost}:${hdfsPort}/catalog/tvf/parquet/row_cross_pages.parquet",
"format" = "parquet",
"fs.defaultFS" = "hdfs://${nameNodeHost}:${hdfsPort}")"""
}
}
