From 03544944f7ef9b8fac0cdbcf62f8051153e6ce75 Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Tue, 2 Jul 2024 21:43:15 +0800 Subject: [PATCH 1/4] [enhancement](compaction) optimizing memory usage for compaction (#37099) --- be/src/common/config.cpp | 6 + be/src/common/config.h | 6 + be/src/olap/base_compaction.cpp | 10 ++ be/src/olap/base_tablet.h | 5 + be/src/olap/compaction.cpp | 14 +- be/src/olap/compaction.h | 2 + be/src/olap/cumulative_compaction.cpp | 11 +- be/src/olap/iterators.h | 15 +- be/src/olap/merger.cpp | 67 ++++++++- be/src/olap/merger.h | 6 +- be/src/olap/rowset/rowset_meta.h | 15 ++ be/src/olap/rowset/segcompaction.cpp | 2 +- be/src/olap/tablet_reader.h | 2 + be/src/vec/olap/vertical_block_reader.cpp | 23 ++- be/src/vec/olap/vertical_block_reader.h | 3 +- be/src/vec/olap/vertical_merge_iterator.cpp | 29 ++-- be/src/vec/olap/vertical_merge_iterator.h | 25 +++- be/test/olap/base_compaction_test.cpp | 84 +++++++++++ be/test/olap/rowid_conversion_test.cpp | 2 +- be/test/vec/olap/vertical_compaction_test.cpp | 32 +++- .../compaction_width_array_column.groovy | 137 ++++++++++++++++++ 21 files changed, 466 insertions(+), 30 deletions(-) create mode 100644 be/test/olap/base_compaction_test.cpp create mode 100644 regression-test/suites/compaction/compaction_width_array_column.groovy diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 11e2635047b291..b1b4fbd87fe64c 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -375,6 +375,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1"); DEFINE_Bool(enable_base_compaction_idle_sched, "true"); DEFINE_mInt64(base_compaction_min_rowset_num, "5"); +DEFINE_mInt64(base_compaction_max_compaction_score, "20"); DEFINE_mDouble(base_compaction_min_data_ratio, "0.3"); DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024"); @@ -405,6 +406,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64"); // cumulative compaction policy: min and max delta file's number DEFINE_mInt64(cumulative_compaction_min_deltas, "5"); DEFINE_mInt64(cumulative_compaction_max_deltas, "1000"); +DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10"); // This config can be set to limit thread number in multiget thread pool. DEFINE_mInt32(multi_get_max_threads, "10"); @@ -1250,6 +1252,10 @@ DEFINE_Int64(min_row_group_size, "134217728"); // The time out milliseconds for remote fetch schema RPC, default 60s DEFINE_mInt64(fetch_remote_schema_rpc_timeout_ms, "60000"); +DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824"); + +DEFINE_mInt64(compaction_batch_size, "-1"); + // If set to false, the parquet reader will not use page index to filter data. // This is only for debug purpose, in case sometimes the page index // filter wrong data. 
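Review note: taken together, the four new knobs split responsibility — base_compaction_max_compaction_score and cumulative_compaction_max_deltas_factor cap how much work one compaction may pick up, while compaction_memory_bytes_limit bounds how much memory the merge itself may hold; compaction_batch_size = -1 means the per-block row count is estimated at runtime rather than fixed. A minimal sketch of the budget arithmetic, assuming only the default values above (the helper budgeted_batch_rows and its standalone form are illustrative, not an API in the patch):

#include <algorithm>
#include <cstdint>

// Defaults mirrored from the config entries added above.
constexpr int64_t kCompactionMemoryBytesLimit = 1073741824; // 1 GiB
constexpr int64_t kMaxBatchRows = 4096 - 32;                // upper clamp used by the patch
constexpr int64_t kMinBatchRows = 32;                       // lower clamp used by the patch

// Rows per read block such that one buffered block per merge way keeps the
// whole merge inside the global memory budget.
int64_t budgeted_batch_rows(int64_t way_num, int64_t bytes_per_row) {
    int64_t per_way = kCompactionMemoryBytesLimit / std::max<int64_t>(way_num, 1);
    return std::clamp(per_way / std::max<int64_t>(bytes_per_row, 1),
                      kMinBatchRows, kMaxBatchRows);
}

For example, a 100-way merge of ~1 KiB rows still gets the 4064-row ceiling, but the same merge over ~1 MiB wide-array rows drops to the 32-row floor instead of buffering roughly 4 GiB of blocks per merge way.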
diff --git a/be/src/common/config.h b/be/src/common/config.h index 26e7fe00c79b25..5d9a299c7a2fbe 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -426,6 +426,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads); DECLARE_Bool(enable_base_compaction_idle_sched); DECLARE_mInt64(base_compaction_min_rowset_num); +DECLARE_mInt64(base_compaction_max_compaction_score); DECLARE_mDouble(base_compaction_min_data_ratio); DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes); @@ -456,6 +457,7 @@ DECLARE_mInt64(compaction_min_size_mbytes); // cumulative compaction policy: min and max delta file's number DECLARE_mInt64(cumulative_compaction_min_deltas); DECLARE_mInt64(cumulative_compaction_max_deltas); +DECLARE_mInt32(cumulative_compaction_max_deltas_factor); // This config can be set to limit thread number in multiget thread pool. DECLARE_mInt32(multi_get_max_threads); @@ -1333,6 +1335,10 @@ DECLARE_mInt64(fetch_remote_schema_rpc_timeout_ms); // The minimum row group size when exporting Parquet files. DECLARE_Int64(min_row_group_size); +DECLARE_mInt64(compaction_memory_bytes_limit); + +DECLARE_mInt64(compaction_batch_size); + DECLARE_mBool(enable_parquet_page_index); #ifdef BE_TEST diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 474909cbf45b65..a9455d453818bb 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -154,6 +154,16 @@ Status BaseCompaction::pick_rowsets_to_compact() { "situation, no need to do base compaction."); } + int score = 0; + int rowset_cnt = 0; + while (rowset_cnt < _input_rowsets.size()) { + score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); + if (score > config::base_compaction_max_compaction_score) { + break; + } + } + _input_rowsets.resize(rowset_cnt); + // 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { VLOG_NOTICE << "satisfy the base compaction policy. 
tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 2fa494b420aab3..29109a9708dcea 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -22,6 +22,7 @@ #include #include "common/status.h" +#include "olap/iterators.h" #include "olap/tablet_fwd.h" #include "olap/tablet_meta.h" #include "util/metrics.h" @@ -100,6 +101,10 @@ class BaseTablet { IntCounter* flush_bytes = nullptr; IntCounter* flush_finish_count = nullptr; std::atomic published_count = 0; + + std::mutex sample_info_lock; + std::vector sample_infos; + Status last_compaction_status = Status::OK(); }; } /* namespace doris */ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 677681f712f3ff..5a6dcc363b881e 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -316,6 +316,15 @@ bool Compaction::handle_ordered_data_compaction() { return st.ok(); } +int64_t Compaction::merge_way_num() { + int64_t way_num = 0; + for (auto&& rowset : _input_rowsets) { + way_num += rowset->rowset_meta()->get_merge_way_num(); + } + + return way_num; +} + Status Compaction::do_compaction_impl(int64_t permits) { OlapStopWatch watch; @@ -363,6 +372,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { _tablet->enable_unique_key_merge_on_write())) { stats.rowid_conversion = &_rowid_conversion; } + int64_t way_num = merge_way_num(); Status res; { @@ -370,13 +380,15 @@ Status Compaction::do_compaction_impl(int64_t permits) { if (vertical_compaction) { res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, _input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), &stats); + get_avg_segment_rows(), way_num, &stats); } else { res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, _input_rs_readers, _output_rs_writer.get(), &stats); } } + _tablet->last_compaction_status = res; + if (!res.ok()) { LOG(WARNING) << "fail to do " << compaction_name() << ". 
res=" << res << ", tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 5b1580f209defb..9c279f1ec5753f 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -105,8 +105,10 @@ class Compaction { private: bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const; void _load_segment_to_cache(); + int64_t merge_way_num(); protected: + // the root tracker for this compaction std::shared_ptr _mem_tracker; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 42748012cabfc6..9cfc6557adf042 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -116,9 +116,18 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", tablet=" << _tablet->tablet_id(); } + int64_t max_score = config::cumulative_compaction_max_deltas; + auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); + bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; + if (_tablet->last_compaction_status.is() || memory_usage_high) { + max_score = std::max(config::cumulative_compaction_max_deltas / + config::cumulative_compaction_max_deltas_factor, + config::cumulative_compaction_min_deltas + 1); + } + size_t compaction_score = 0; _tablet->cumulative_compaction_policy()->pick_input_rowsets( - _tablet.get(), candidate_rowsets, config::cumulative_compaction_max_deltas, + _tablet.get(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, &compaction_score, allow_delete_in_cumu_compaction()); diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index deb14ff554f658..5d752a2bf735a6 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include "common/status.h" @@ -122,6 +123,12 @@ class StorageReadOptions { size_t topn_limit = 0; }; +struct CompactionSampleInfo { + int64_t bytes = 0; + int64_t rows = 0; + int64_t group_data_size; +}; + class RowwiseIterator; using RowwiseIteratorUPtr = std::unique_ptr; class RowwiseIterator { @@ -134,7 +141,13 @@ class RowwiseIterator { // Input options may contain scan range in which this scan. 
// Return Status::OK() if init successfully, // Return other error otherwise - virtual Status init(const StorageReadOptions& opts) = 0; + virtual Status init(const StorageReadOptions& opts) { + return Status::NotSupported("to be implemented"); + } + + virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) { + return Status::NotSupported("to be implemented"); + } // If there is any valid data, this function will load data // into input batch with Status::OK() returned diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index b73c5bda645563..37f1c2116d2f5f 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -33,6 +34,8 @@ #include "common/config.h" #include "common/logging.h" +#include "common/status.h" +#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowid_conversion.h" @@ -42,6 +45,7 @@ #include "olap/rowset/segment_v2/segment_writer.h" #include "olap/storage_engine.h" #include "olap/tablet.h" +#include "olap/tablet_fwd.h" #include "olap/tablet_reader.h" #include "olap/utils.h" #include "util/slice.h" @@ -212,7 +216,8 @@ Status Merger::vertical_compact_one_group( const std::vector& column_group, vectorized::RowSourcesBuffer* row_source_buf, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector key_group_cluster_key_idxes) { + std::vector key_group_cluster_key_idxes, int64_t batch_size, + CompactionSampleInfo* sample_info) { // build tablet reader VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; vectorized::VerticalBlockReader reader(row_source_buf); @@ -250,7 +255,8 @@ Status Merger::vertical_compact_one_group( reader_params.return_columns = column_group; reader_params.origin_return_columns = &reader_params.return_columns; - RETURN_IF_ERROR(reader.init(reader_params)); + reader_params.batch_size = batch_size; + RETURN_IF_ERROR(reader.init(reader_params, sample_info)); if (reader_params.record_rowids) { stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); @@ -356,6 +362,55 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea return Status::OK(); } +int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) { + std::unique_lock lock(tablet->sample_info_lock); + CompactionSampleInfo info = tablet->sample_infos[group_index]; + if (way_cnt <= 0) { + LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " + << tablet->tablet_id() << " way cnt: " << way_cnt; + return 4096 - 32; + } + int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt; + if (tablet->last_compaction_status.is()) { + block_mem_limit /= 4; + } + + int64_t group_data_size = 0; + if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) { + float smoothing_factor = 0.5; + group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) + + info.bytes / info.rows * smoothing_factor); + tablet->sample_infos[group_index].group_data_size = group_data_size; + } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) { + group_data_size = info.group_data_size; + } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) { + group_data_size = info.bytes / info.rows; + tablet->sample_infos[group_index].group_data_size = group_data_size; + } else { + 
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " + << tablet->tablet_id() << " group data size: " << info.group_data_size + << " row num: " << info.rows << " consume bytes: " << info.bytes; + return 1024 - 32; + } + + if (group_data_size <= 0) { + LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: " + << tablet->tablet_id() << " unexpected group data size: " << group_data_size; + return 4096 - 32; + } + + tablet->sample_infos[group_index].bytes = 0; + tablet->sample_infos[group_index].rows = 0; + + int64_t batch_size = block_mem_limit / group_data_size; + int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L); + LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id() + << " group data size: " << info.group_data_size << " row num: " << info.rows + << " consume bytes: " << info.bytes << " way cnt: " << way_cnt + << " batch size: " << res; + return res; +} + // steps to do vertical merge: // 1. split columns into column groups // 2. compact groups one by one, generate a row_source_buf when compact key group @@ -365,7 +420,7 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_ TabletSchemaSPtr tablet_schema, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, - Statistics* stats_output) { + int64_t merge_way_num, Statistics* stats_output) { LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id(); std::vector> column_groups; vertical_split_columns(tablet_schema, &column_groups); @@ -376,14 +431,18 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_ vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(), reader_type); + tablet->sample_infos.resize(column_groups.size(), {0, 0, 0}); // compact group one by one for (auto i = 0; i < column_groups.size(); ++i) { VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); bool is_key = (i == 0); + int64_t batch_size = config::compaction_batch_size != -1 + ? 
config::compaction_batch_size + : estimate_batch_size(i, tablet, merge_way_num); RETURN_IF_ERROR(vertical_compact_one_group( tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf, src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output, - key_group_cluster_key_idxes)); + key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i]))); if (is_key) { RETURN_IF_ERROR(row_sources_buf.flush()); } diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index ab948f55ed9e61..e2a6d814473f77 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -26,6 +26,7 @@ #include "olap/rowset/rowset_reader.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" +#include "olap/iterators.h" namespace doris { class KeyBoundsPB; @@ -62,7 +63,7 @@ class Merger { static Status vertical_merge_rowsets( TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, const std::vector& src_rowset_readers, - RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, + RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num, Statistics* stats_output); public: @@ -75,7 +76,8 @@ class Merger { vectorized::RowSourcesBuffer* row_source_buf, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector key_group_cluster_key_idxes); + std::vector key_group_cluster_key_idxes, int64_t batch_size, + CompactionSampleInfo* sample_info); // for segcompaction static Status vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type, diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 30457d30bc65da..d7a82b3d8a6fc0 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -256,6 +256,21 @@ class RowsetMeta { return score; } + uint32_t get_merge_way_num() const { + uint32_t way_num = 0; + if (!is_segments_overlapping()) { + if (num_segments() == 0) { + way_num = 0; + } else { + way_num = 1; + } + } else { + way_num = num_segments(); + CHECK(way_num > 0); + } + return way_num; + } + void get_segments_key_bounds(std::vector* segments_key_bounds) const { for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) { segments_key_bounds->push_back(key_range); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 8fee04ccb80e43..9f7f0ec91f4338 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -102,7 +102,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.tablet = tablet; reader_params.return_columns = return_columns; reader_params.is_key_column_group = is_key; - return (*reader)->init(reader_params); + return (*reader)->init(reader_params, nullptr); } std::unique_ptr SegcompactionWorker::_create_segcompaction_writer( diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index 3bf83ec296c04b..942c61f8207727 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -184,6 +184,8 @@ class TabletReader { void check_validation() const; std::string to_string() const; + + int64_t batch_size = -1; }; TabletReader() = default; diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index c472e678abdafc..58a2332d5a8d5d 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -24,6 +24,8 @@ #include #include +#include 
"olap/compaction.h" +#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/rowset.h" @@ -107,7 +109,8 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para return Status::OK(); } -Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) { +Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, + CompactionSampleInfo* sample_info) { std::vector iterator_init_flag; std::vector rowset_ids; std::vector* segment_iters_ptr = read_params.segment_iters_ptr; @@ -156,7 +159,10 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) // init collect iterator StorageReadOptions opts; opts.record_rowids = read_params.record_rowids; - RETURN_IF_ERROR(_vcollect_iter->init(opts)); + if (read_params.batch_size > 0) { + opts.block_row_max = read_params.batch_size; + } + RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info)); // In agg keys value columns compact, get first row for _init_agg_state if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) { @@ -203,11 +209,20 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) { } Status VerticalBlockReader::init(const ReaderParams& read_params) { + return init(read_params, nullptr); +} + +Status VerticalBlockReader::init(const ReaderParams& read_params, + CompactionSampleInfo* sample_info) { StorageReadOptions opts; - _reader_context.batch_size = opts.block_row_max; + if (read_params.batch_size > 0) { + _reader_context.batch_size = read_params.batch_size; + } else { + _reader_context.batch_size = opts.block_row_max; + } RETURN_IF_ERROR(TabletReader::init(read_params)); - auto status = _init_collect_iter(read_params); + auto status = _init_collect_iter(read_params, sample_info); if (!status.ok()) [[unlikely]] { if constexpr (std::is_same_v) { static_cast(_tablet.get())->report_error(status); diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h index 77a01587b5873a..2043db4b00a590 100644 --- a/be/src/vec/olap/vertical_block_reader.h +++ b/be/src/vec/olap/vertical_block_reader.h @@ -56,6 +56,7 @@ class VerticalBlockReader final : public TabletReader { // Initialize VerticalBlockReader with tablet, data version and fetch range. Status init(const ReaderParams& read_params) override; + Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info); Status next_block_with_aggregation(Block* block, bool* eof) override; @@ -79,7 +80,7 @@ class VerticalBlockReader final : public TabletReader { // to minimize the comparison time in merge heap. 
Status _unique_key_next_block(Block* block, bool* eof); - Status _init_collect_iter(const ReaderParams& read_params); + Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info); Status _get_segment_iterators(const ReaderParams& read_params, std::vector* segment_iters, diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 49916048b5cac9..95bf9d41c79779 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -21,12 +21,14 @@ #include #include +#include #include #include "common/compiler_util.h" // IWYU pragma: keep #include "common/config.h" #include "common/logging.h" #include "olap/field.h" +#include "olap/iterators.h" #include "olap/olap_common.h" #include "vec/columns/column.h" #include "vec/common/string_ref.h" @@ -327,13 +329,18 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) { return Status::OK(); } -Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) { +Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { if (LIKELY(_inited)) { return Status::OK(); } _block_row_max = opts.block_row_max; _record_rowids = opts.record_rowids; RETURN_IF_ERROR(_load_next_block()); + if (sample_info != nullptr) { + sample_info->bytes += bytes(); + sample_info->rows += rows(); + } if (valid()) { RETURN_IF_ERROR(advance()); } @@ -492,7 +499,8 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { +Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); _record_rowids = opts.record_rowids; if (_origin_iters.empty()) { @@ -520,7 +528,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { for (size_t i = 0; i < num_iters; ++i) { if (_iterator_init_flags[i] || pre_iter_invalid) { auto& ctx = _ori_iter_ctx[i]; - RETURN_IF_ERROR(ctx->init(opts)); + RETURN_IF_ERROR(ctx->init(opts, sample_info)); if (!ctx->valid()) { pre_iter_invalid = true; continue; @@ -593,7 +601,8 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) { +Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); DCHECK(_keys_type == KeysType::DUP_KEYS); _record_rowids = opts.record_rowids; @@ -613,7 +622,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) { std::unique_ptr ctx( new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx)); - RETURN_IF_ERROR(ctx->init(opts)); + RETURN_IF_ERROR(ctx->init(opts, sample_info)); if (!ctx->valid()) { ++seg_order; continue; @@ -654,7 +663,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) { uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; // init ctx and this ctx must be valid - RETURN_IF_ERROR(ctx->init(_opts)); + RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); DCHECK(ctx->valid()); if (UNLIKELY(ctx->is_first_row())) { @@ -688,7 +697,7 @@ Status 
VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef auto row_source = _row_sources_buf->current(); uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts)); + RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -727,7 +736,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { uint16_t order = _row_sources_buf->current().get_source_num(); DCHECK(order < _origin_iter_ctx.size()); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts)); + RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -750,7 +759,8 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { return st; } -Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { +Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts, + CompactionSampleInfo* sample_info) { if (_origin_iters.empty()) { return Status::OK(); } @@ -765,6 +775,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { } _origin_iters.clear(); + _sample_info = sample_info; _block_row_max = opts.block_row_max; return Status::OK(); } diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index f46a0446cf25a0..3751aa92c78b15 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -164,7 +164,7 @@ class VerticalMergeIteratorContext { ~VerticalMergeIteratorContext() = default; Status block_reset(const std::shared_ptr& block); - Status init(const StorageReadOptions& opts); + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr); bool compare(const VerticalMergeIteratorContext& rhs) const; Status copy_rows(Block* block, bool advanced = true); Status copy_rows(Block* block, size_t count); @@ -200,6 +200,22 @@ class VerticalMergeIteratorContext { return _block_row_locations[_index_in_block]; } + size_t bytes() { + if (_block) { + return _block->bytes(); + } else { + return 0; + } + } + + size_t rows() { + if (_block) { + return _block->rows(); + } else { + return 0; + } + } + private: // Load next block into _block Status _load_next_block(); @@ -255,7 +271,7 @@ class VerticalHeapMergeIterator : public RowwiseIterator { VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete; VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete; - Status init(const StorageReadOptions& opts) override; + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -321,7 +337,7 @@ class VerticalFifoMergeIterator : public RowwiseIterator { VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete; VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete; - Status init(const StorageReadOptions& opts) override; + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -367,7 +383,7 @@ class VerticalMaskMergeIterator : public RowwiseIterator { 
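// Review note (descriptive comment, not a hunk of the patch): the mask iterator
// replays the merge order that the key-group pass recorded in RowSourcesBuffer, so
// value column groups copy rows without re-comparing keys. Its per-segment contexts
// are initialized lazily on first use, which is why init() now stores the
// CompactionSampleInfo pointer in _sample_info for the lazy
// ctx->init(_opts, _sample_info) calls seen above.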
VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete; VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete; - Status init(const StorageReadOptions& opts) override; + Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; Status next_batch(Block* block) override; @@ -396,6 +412,7 @@ class VerticalMaskMergeIterator : public RowwiseIterator { size_t _filtered_rows = 0; RowSourcesBuffer* _row_sources_buf; StorageReadOptions _opts; + CompactionSampleInfo* _sample_info = nullptr; }; // segment merge iterator diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp new file mode 100644 index 00000000000000..7d9abe54ed2163 --- /dev/null +++ b/be/test/olap/base_compaction_test.cpp @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/base_compaction.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "gtest/gtest_pred_impl.h" +#include "olap/cumulative_compaction.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_meta.h" +#include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_meta.h" +#include "util/uid_util.h" + +namespace doris { + +class TestBaseCompaction : public testing::Test {}; + +static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, + int data_size) { + auto rs_meta = std::make_shared(); + rs_meta->set_rowset_type(BETA_ROWSET); // important + rs_meta->_rowset_meta_pb.set_start_version(version.first); + rs_meta->_rowset_meta_pb.set_end_version(version.second); + rs_meta->set_num_segments(num_segments); + rs_meta->set_segments_overlap(overlapping ? 
OVERLAPPING : NONOVERLAPPING); + rs_meta->set_total_disk_size(data_size); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; +} + +TEST_F(TestBaseCompaction, filter_input_rowset) { + StorageEngine engine({}); + TabletMetaSharedPtr tablet_meta; + tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); + tablet->_cumulative_point = 25; + BaseCompaction compaction(engine, tablet); + //std::vector rowsets; + + RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0); + tablet->_rs_version_map.emplace(init_rs->version(), init_rs); + for (int i = 2; i < 30; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + tablet->_rs_version_map.emplace(rs->version(), rs); + } + Status st = compaction.pick_rowsets_to_compact(); + EXPECT_TRUE(st.ok()); + EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0); + EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1); + + EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21); + EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21); +} + +} // namespace doris diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index d28e9f7dfe93c4..2cacd1361bd467 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -355,7 +355,7 @@ class TestRowIdConversion : public testing::TestWithParam>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -597,8 +603,14 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; +<<<<<<< HEAD s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); +======= + auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, + *tablet_schema, input_rs_readers, + output_rs_writer.get(), 100, num_segments, &stats); +>>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -705,7 +717,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 10000, &stats); + input_rs_readers, output_rs_writer.get(), 10000, num_segments, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -813,8 +825,14 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; +<<<<<<< HEAD st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); +======= + st = 
Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); +>>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -915,8 +933,14 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; +<<<<<<< HEAD st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); +======= + st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); +>>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -1007,8 +1031,14 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; +<<<<<<< HEAD s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers, output_rs_writer.get(), 100, &stats); +======= + auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, + *tablet_schema, input_rs_readers, + output_rs_writer.get(), 100, num_segments, &stats); +>>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy new file mode 100644 index 00000000000000..4e3fed354c7d84 --- /dev/null +++ b/regression-test/suites/compaction/compaction_width_array_column.groovy @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
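+// Review note: this suite exercises the adaptive batch-size path on wide ARRAY
+// columns. Flow: create the table from ddl/${table_name}.sql, bulk-load it from S3
+// (polling `show load` until the label reaches FINISHED), then trigger cumulative
+// compaction on every tablet and poll run_status until each round completes,
+// retrying up to three rounds and finally asserting that no rowset is left in
+// OVERLAPPING state, i.e. compaction actually merged the segments.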
+ +suite('compaction_width_array_column', "p2") { + String backend_id; + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + def s3BucketName = getS3BucketName() + def random = new Random(); + + def s3WithProperties = """WITH S3 ( + |"AWS_ACCESS_KEY" = "${getS3AK()}", + |"AWS_SECRET_KEY" = "${getS3SK()}", + |"AWS_ENDPOINT" = "${getS3Endpoint()}", + |"AWS_REGION" = "${getS3Region()}") + |PROPERTIES( + |"exec_mem_limit" = "8589934592", + |"load_parallelism" = "3")""".stripMargin() + + // set fe configuration + sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" + + def tableName = "column_witdh_array" + + def table_create_task = { table_name -> + // drop table if exists + sql """drop table if exists ${table_name}""" + // create table + def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text + create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name) + sql create_table + } + + def table_load_task = { table_name -> + uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() + loadLabel = table_name + "_" + uniqueID + //loadLabel = table_name + '_load_5' + loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) + loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + loadSql = loadSql.replaceAll("\\\$\\{table\\_name\\}", table_name) + nowloadSql = loadSql + s3WithProperties + try_sql nowloadSql + + while (true) { + def stateResult = sql "show load where Label = '${loadLabel}'" + logger.info("load result is ${stateResult}") + def loadState = stateResult[stateResult.size() - 1][2].toString() + if ("CANCELLED".equalsIgnoreCase(loadState)) { + throw new IllegalStateException("load ${loadLabel} failed.") + } else if ("FINISHED".equalsIgnoreCase(loadState)) { + break + } + sleep(5000) + } + } + + table_create_task(tableName) + table_load_task(tableName) + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + boolean isOverLap = true + int tryCnt = 0; + while (isOverLap && tryCnt < 3) { + isOverLap = false + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertEquals("success", compactJson.status.toLowerCase()) + } + + // wait for all compactions done + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + 
assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + for (String rowset in (List) tabletJson.rowsets) { + logger.info("rowset info" + rowset) + String overLappingStr = rowset.split(" ")[3] + if (overLappingStr == "OVERLAPPING") { + isOverLap = true; + } + logger.info("is over lap " + isOverLap + " " + overLappingStr) + } + } + tryCnt++; + } + + assertFalse(isOverLap); +} From 3acd8c36eb146fb09ed9c0db2b36bfe3257652bc Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Mon, 8 Jul 2024 19:41:32 +0800 Subject: [PATCH 2/4] fix --- be/test/vec/olap/vertical_compaction_test.cpp | 40 +++---------------- 1 file changed, 5 insertions(+), 35 deletions(-) diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index 4747ea9ca4ed00..4a175f772c1297 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -490,14 +490,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; -<<<<<<< HEAD s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); -======= - auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, - *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); ->>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) + input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -603,14 +597,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; -<<<<<<< HEAD s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); -======= - auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, - *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); ->>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) + input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -825,14 +813,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; -<<<<<<< HEAD st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); -======= - st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, - &stats); ->>>>>>> 6969ad0596 ([enhancement](compaction) 
optimizing memory usage for compaction (#37099)) + input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -933,14 +915,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; -<<<<<<< HEAD st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); -======= - st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, - &stats); ->>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) + input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -1031,14 +1007,8 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { Merger::Statistics stats; RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; -<<<<<<< HEAD s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); -======= - auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, - *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); ->>>>>>> 6969ad0596 ([enhancement](compaction) optimizing memory usage for compaction (#37099)) + input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); From 4e3290f7d896273e93b50dc205428124111570cb Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Mon, 8 Jul 2024 19:53:10 +0800 Subject: [PATCH 3/4] fix --- be/src/olap/compaction.h | 1 - be/src/olap/cumulative_compaction.cpp | 6 +++--- be/src/olap/merger.h | 2 +- be/test/olap/rowid_conversion_test.cpp | 6 +++--- be/test/vec/olap/vertical_compaction_test.cpp | 18 ++++++++++++------ 5 files changed, 19 insertions(+), 14 deletions(-) diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 9c279f1ec5753f..5aa3e260194319 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -108,7 +108,6 @@ class Compaction { int64_t merge_way_num(); protected: - // the root tracker for this compaction std::shared_ptr _mem_tracker; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 9cfc6557adf042..f461de3a5e9b9c 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -127,9 +127,9 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { size_t compaction_score = 0; _tablet->cumulative_compaction_policy()->pick_input_rowsets( - _tablet.get(), candidate_rowsets, max_score, - config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, - &compaction_score, allow_delete_in_cumu_compaction()); + _tablet.get(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas, + &_input_rowsets, &_last_delete_version, &compaction_score, + allow_delete_in_cumu_compaction()); // Cumulative compaction will process with at least 1 rowset. 
// So when there is no rowset being chosen, we should return Status::Error(): diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index e2a6d814473f77..49ca1e5227fe6e 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -23,10 +23,10 @@ #include "common/status.h" #include "io/io_common.h" +#include "olap/iterators.h" #include "olap/rowset/rowset_reader.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" -#include "olap/iterators.h" namespace doris { class KeyBoundsPB; diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 2cacd1361bd467..658b104493f605 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -353,9 +353,9 @@ class TestRowIdConversion : public testing::TestWithParambuild(out_rowset)); @@ -598,7 +599,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -705,7 +707,8 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 10000, num_segments, &stats); + input_rs_readers, output_rs_writer.get(), 10000, + num_segments, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -814,7 +817,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -916,7 +920,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -1008,7 +1013,8 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, &stats); + input_rs_readers, output_rs_writer.get(), 100, num_segments, + &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); From 7d8fff8c5fdffbcf308bfeb77c5b0c38de47a13d Mon Sep 17 00:00:00 2001 From: Luwei <814383175@qq.com> Date: Tue, 9 Jul 2024 20:18:22 
+0800 Subject: [PATCH 4/4] fix be ut --- be/test/olap/base_compaction_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp index 7d9abe54ed2163..ff53e842787665 100644 --- a/be/test/olap/base_compaction_test.cpp +++ b/be/test/olap/base_compaction_test.cpp @@ -63,7 +63,7 @@ TEST_F(TestBaseCompaction, filter_input_rowset) { TCompressionType::LZ4F)); TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); tablet->_cumulative_point = 25; - BaseCompaction compaction(engine, tablet); + BaseCompaction compaction(tablet); //std::vector rowsets; RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0);
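For reference, the estimator introduced in patch 1 reduces to a few lines of arithmetic. A self-contained sketch, assuming a struct that mirrors CompactionSampleInfo and with last_run_oom standing in for the check on tablet->last_compaction_status; the names and the free-function form are illustrative, while the real logic lives in estimate_batch_size() in be/src/olap/merger.cpp and runs under sample_info_lock:

#include <algorithm>
#include <cstdint>

struct SampleInfo {              // mirrors CompactionSampleInfo
    int64_t bytes = 0;           // bytes consumed by the last loaded block
    int64_t rows = 0;            // rows in the last loaded block
    int64_t group_data_size = 0; // running per-row size estimate for the column group
};

// One estimation step: blend the previous per-row size with the fresh sample
// (exponential moving average, smoothing factor 0.5), then size the next batch
// so that one block per merge way fits in the shared memory budget.
int64_t estimate_rows(SampleInfo& s, int64_t mem_limit, int64_t way_cnt, bool last_run_oom) {
    int64_t budget = mem_limit / std::max<int64_t>(way_cnt, 1);
    if (last_run_oom) {
        budget /= 4; // back off after a memory-limit failure, as the patch does
    }
    if (s.bytes > 0 && s.rows > 0) {
        int64_t fresh = s.bytes / s.rows;
        s.group_data_size = s.group_data_size > 0 ? (s.group_data_size + fresh) / 2 : fresh;
        s.bytes = 0; // consume the sample; the patch likewise resets bytes/rows
        s.rows = 0;
    }
    if (s.group_data_size <= 0) {
        return 1024 - 32; // nothing sampled yet: the patch's conservative default
    }
    return std::clamp(budget / s.group_data_size, int64_t(32), int64_t(4096 - 32));
}

Each column group keeps its own SampleInfo, so a group of wide array columns gets a small batch while narrow key columns keep reading near the 4096-row default — the behavior the compaction_width_array_column regression suite above is designed to stress.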