From 2a08d5a5f83aaf991b014fabf73a2e06c066f2f6 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Wed, 14 Aug 2024 09:31:15 +0800 Subject: [PATCH] [fix](compaction) fix mismatch between segment key and value column rows during compaction (#37960) (#38251) (#38356) (#39263) pick master #37960 #38251 #38356 --- .../rowset/vertical_beta_rowset_writer.cpp | 55 +++++++++---------- 1 file changed, 26 insertions(+), 29 deletions(-) diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index 05730ec9f3aef3..31b9e94f7329d2 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -95,35 +95,32 @@ Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block, RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows)); } else { // value columns - uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written(); - VLOG_NOTICE << "num_rows_written: " << num_rows_written - << ", _cur_writer_idx: " << _cur_writer_idx; - uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count(); - // init if it's first value column write in current segment - if (_cur_writer_idx == 0 && num_rows_written == 0) { - VLOG_NOTICE << "init first value column segment writer"; - RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key)); - } - // when splitting segment, need to make rows align between key columns and value columns - size_t start_offset = 0, limit = num_rows; - if (num_rows_written + num_rows >= num_rows_key_group && - _cur_writer_idx < _segment_writers.size() - 1) { - RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block( - block, 0, num_rows_key_group - num_rows_written)); - RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx])); - start_offset = num_rows_key_group - num_rows_written; - limit = num_rows - start_offset; - ++_cur_writer_idx; - // switch to next writer - RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key)); - num_rows_written = 0; - num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count(); - } - if (limit > 0) { - RETURN_IF_ERROR( - _segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit)); - DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <= - _segment_writers[_cur_writer_idx]->row_count()); + int64_t left = num_rows; + while (left > 0) { + uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written(); + VLOG_NOTICE << "num_rows_written: " << num_rows_written + << ", _cur_writer_idx: " << _cur_writer_idx; + uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count(); + CHECK_LT(num_rows_written, num_rows_key_group); + // init if it's first value column write in current segment + if (num_rows_written == 0) { + VLOG_NOTICE << "init first value column segment writer"; + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key)); + } + + int64_t to_write = num_rows_written + left >= num_rows_key_group + ? num_rows_key_group - num_rows_written + : left; + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, num_rows - left, + to_write)); + left -= to_write; + CHECK_GE(left, 0); + + if (num_rows_key_group == num_rows_written + to_write && + _cur_writer_idx < _segment_writers.size() - 1) { + RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx])); + ++_cur_writer_idx; + } } } if (is_key) {