[enhancement](compaction) optimizing memory usage for compaction (#37099) #37486

Merged (5 commits) on Aug 4, 2024
Changes from 3 commits
6 changes: 6 additions & 0 deletions be/src/common/config.cpp
@@ -375,6 +375,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");

DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");

@@ -405,6 +406,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
// cumulative compaction policy: min and max delta file's number
DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");

// This config can be set to limit thread number in multiget thread pool.
DEFINE_mInt32(multi_get_max_threads, "10");
@@ -1250,6 +1252,10 @@ DEFINE_Int64(min_row_group_size, "134217728");
// The time out milliseconds for remote fetch schema RPC, default 60s
DEFINE_mInt64(fetch_remote_schema_rpc_timeout_ms, "60000");

DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");

DEFINE_mInt64(compaction_batch_size, "-1");

// If set to false, the parquet reader will not use page index to filter data.
// This is only for debug purpose, in case sometimes the page index
// filter wrong data.
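The two configs added here, compaction_memory_bytes_limit (1 GiB by default) and compaction_batch_size (-1 means derive the value adaptively), work as a pair: an explicit batch size wins when set, otherwise the memory budget is split across merge ways and converted into rows. A minimal sketch under those assumptions (stand-in names, not Doris API); the real selection lives in Merger::vertical_merge_rowsets later in this diff:

#include <cstdint>

// Sketch: how the two configs combine into a rows-per-block value.
int64_t pick_batch_size(int64_t compaction_batch_size,         // config, -1 = auto
                        int64_t compaction_memory_bytes_limit, // config, 1 GiB default
                        int64_t merge_way_num,                 // sorted runs being merged
                        int64_t avg_row_bytes) {               // sampled per-row size
    if (compaction_batch_size != -1) {
        return compaction_batch_size;  // explicit override wins
    }
    int64_t block_mem_limit = compaction_memory_bytes_limit / merge_way_num;
    return block_mem_limit / avg_row_bytes;  // rows that fit the per-way budget
}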
6 changes: 6 additions & 0 deletions be/src/common/config.h
@@ -426,6 +426,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads);

DECLARE_Bool(enable_base_compaction_idle_sched);
DECLARE_mInt64(base_compaction_min_rowset_num);
DECLARE_mInt64(base_compaction_max_compaction_score);
DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);

@@ -456,6 +457,7 @@ DECLARE_mInt64(compaction_min_size_mbytes);
// cumulative compaction policy: min and max delta file's number
DECLARE_mInt64(cumulative_compaction_min_deltas);
DECLARE_mInt64(cumulative_compaction_max_deltas);
DECLARE_mInt32(cumulative_compaction_max_deltas_factor);

// This config can be set to limit thread number in multiget thread pool.
DECLARE_mInt32(multi_get_max_threads);
@@ -1333,6 +1335,10 @@ DECLARE_mInt64(fetch_remote_schema_rpc_timeout_ms);
// The minimum row group size when exporting Parquet files.
DECLARE_Int64(min_row_group_size);

DECLARE_mInt64(compaction_memory_bytes_limit);

DECLARE_mInt64(compaction_batch_size);

DECLARE_mBool(enable_parquet_page_index);

#ifdef BE_TEST
10 changes: 10 additions & 0 deletions be/src/olap/base_compaction.cpp
@@ -154,6 +154,16 @@ Status BaseCompaction::pick_rowsets_to_compact() {
"situation, no need to do base compaction.");
}

int score = 0;
int rowset_cnt = 0;
while (rowset_cnt < _input_rowsets.size()) {
score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
if (score > config::base_compaction_max_compaction_score) {
break;
}
}
_input_rowsets.resize(rowset_cnt);

// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id()
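The loop added above accumulates per-rowset compaction scores and truncates _input_rowsets at the first rowset whose score pushes the running total past base_compaction_max_compaction_score; that boundary rowset is still kept. A standalone sketch with made-up scores:

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    std::vector<int> scores = {8, 7, 9, 4};  // hypothetical per-rowset scores
    const int max_score = 20;  // stands in for base_compaction_max_compaction_score

    int score = 0;
    size_t rowset_cnt = 0;
    while (rowset_cnt < scores.size()) {
        score += scores[rowset_cnt++];
        if (score > max_score) {
            break;  // the rowset that crossed the cap stays in the pick
        }
    }
    // Keeps 3 rowsets: 8 + 7 = 15 <= 20, adding 9 gives 24 > 20, then stop.
    std::cout << "picked " << rowset_cnt << " rowsets, score " << score << "\n";
}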
5 changes: 5 additions & 0 deletions be/src/olap/base_tablet.h
@@ -22,6 +22,7 @@
#include <string>

#include "common/status.h"
#include "olap/iterators.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
#include "util/metrics.h"
@@ -100,6 +101,10 @@ class BaseTablet {
IntCounter* flush_bytes = nullptr;
IntCounter* flush_finish_count = nullptr;
std::atomic<int64_t> published_count = 0;

std::mutex sample_info_lock;
std::vector<CompactionSampleInfo> sample_infos;
Status last_compaction_status = Status::OK();
};

} /* namespace doris */
14 changes: 13 additions & 1 deletion be/src/olap/compaction.cpp
@@ -316,6 +316,15 @@ bool Compaction::handle_ordered_data_compaction() {
return st.ok();
}

int64_t Compaction::merge_way_num() {
int64_t way_num = 0;
for (auto&& rowset : _input_rowsets) {
way_num += rowset->rowset_meta()->get_merge_way_num();
}

return way_num;
}

Status Compaction::do_compaction_impl(int64_t permits) {
OlapStopWatch watch;

@@ -363,20 +372,23 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_tablet->enable_unique_key_merge_on_write())) {
stats.rowid_conversion = &_rowid_conversion;
}
int64_t way_num = merge_way_num();

Status res;
{
SCOPED_TIMER(_merge_rowsets_latency_timer);
if (vertical_compaction) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(),
get_avg_segment_rows(), &stats);
get_avg_segment_rows(), way_num, &stats);
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}
}

_tablet->last_compaction_status = res;

if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->tablet_id()
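merge_way_num() sums each input rowset's way count (see RowsetMeta::get_merge_way_num() later in this diff) to approximate how many sorted runs the merge heap holds at once; the total then divides the compaction memory budget. A worked example with hypothetical rowsets:

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

int main() {
    // (segments_overlapping, num_segments) for three hypothetical input rowsets.
    std::vector<std::pair<bool, int64_t>> rowsets = {{false, 5}, {true, 4}, {false, 0}};
    int64_t way_num = 0;
    for (const auto& [overlapping, segments] : rowsets) {
        // Non-overlapping segments are key-ordered end to end: one sorted run.
        // Overlapping segments each feed the merge separately: one run apiece.
        way_num += overlapping ? segments : (segments > 0 ? 1 : 0);
    }
    // 1 + 4 + 0 = 5 ways; with the 1 GiB default budget, each column group
    // later gets 1073741824 / 5 bytes (about 215 MB) of block memory.
    std::cout << way_num << "\n";
}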
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
@@ -105,6 +105,7 @@ class Compaction {
private:
bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;
void _load_segment_to_cache();
int64_t merge_way_num();

protected:
// the root tracker for this compaction
15 changes: 12 additions & 3 deletions be/src/olap/cumulative_compaction.cpp
@@ -116,11 +116,20 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
<< ", tablet=" << _tablet->tablet_id();
}

int64_t max_score = config::cumulative_compaction_max_deltas;
auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
if (_tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
max_score = std::max(config::cumulative_compaction_max_deltas /
config::cumulative_compaction_max_deltas_factor,
config::cumulative_compaction_min_deltas + 1);
}

size_t compaction_score = 0;
_tablet->cumulative_compaction_policy()->pick_input_rowsets(
_tablet.get(), candidate_rowsets, config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
&compaction_score, allow_delete_in_cumu_compaction());
_tablet.get(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
&_input_rowsets, &_last_delete_version, &compaction_score,
allow_delete_in_cumu_compaction());

// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
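With the defaults in this PR, the throttle shrinks the rowset budget tenfold once process memory passes 80% of the soft limit or the tablet's last compaction failed with MEM_LIMIT_EXCEEDED. A small check of the arithmetic:

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
    // Default values from be/src/common/config.cpp in this diff.
    int64_t max_deltas = 1000;  // cumulative_compaction_max_deltas
    int64_t factor = 10;        // cumulative_compaction_max_deltas_factor
    int64_t min_deltas = 5;     // cumulative_compaction_min_deltas

    int64_t max_score = std::max(max_deltas / factor, min_deltas + 1);
    std::cout << max_score << "\n";  // prints 100; never below min_deltas + 1
}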
15 changes: 14 additions & 1 deletion be/src/olap/iterators.h
@@ -17,6 +17,7 @@

#pragma once

#include <cstddef>
#include <memory>

#include "common/status.h"
@@ -122,6 +123,12 @@ class StorageReadOptions {
size_t topn_limit = 0;
};

struct CompactionSampleInfo {
int64_t bytes = 0;
int64_t rows = 0;
int64_t group_data_size;
};

class RowwiseIterator;
using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
class RowwiseIterator {
@@ -134,7 +141,13 @@
// Input options may contain scan range in which this scan.
// Return Status::OK() if init successfully,
// Return other error otherwise
virtual Status init(const StorageReadOptions& opts) = 0;
virtual Status init(const StorageReadOptions& opts) {
return Status::NotSupported("to be implemented");
}

virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
return Status::NotSupported("to be implemented");
}

// If there is any valid data, this function will load data
// into input batch with Status::OK() returned
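A gloss of the new sampling struct, with field meanings inferred from its use in estimate_batch_size() in merger.cpp (the comments are editorial assumptions, not part of the diff):

struct CompactionSampleInfo {
    int64_t bytes = 0;        // bytes consumed by sampled blocks of one column group
    int64_t rows = 0;         // rows in those blocks; bytes / rows ~ per-row size
    int64_t group_data_size;  // smoothed per-row estimate carried between rounds
};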
67 changes: 63 additions & 4 deletions be/src/olap/merger.cpp
@@ -24,6 +24,7 @@
#include <algorithm>
#include <iterator>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <shared_mutex>
@@ -33,6 +34,8 @@

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowid_conversion.h"
@@ -42,6 +45,7 @@
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_reader.h"
#include "olap/utils.h"
#include "util/slice.h"
@@ -212,7 +216,8 @@ Status Merger::vertical_compact_one_group(
const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes) {
std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
CompactionSampleInfo* sample_info) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
@@ -250,7 +255,8 @@

reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_IF_ERROR(reader.init(reader_params));
reader_params.batch_size = batch_size;
RETURN_IF_ERROR(reader.init(reader_params, sample_info));

if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
@@ -356,6 +362,55 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type,
return Status::OK();
}

int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
CompactionSampleInfo info = tablet->sample_infos[group_index];
if (way_cnt <= 0) {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " way cnt: " << way_cnt;
return 4096 - 32;
}
int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
block_mem_limit /= 4;
}

int64_t group_data_size = 0;
if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
float smoothing_factor = 0.5;
group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) +
info.bytes / info.rows * smoothing_factor);
tablet->sample_infos[group_index].group_data_size = group_data_size;
} else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
group_data_size = info.group_data_size;
} else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
group_data_size = info.bytes / info.rows;
tablet->sample_infos[group_index].group_data_size = group_data_size;
} else {
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " group data size: " << info.group_data_size
<< " row num: " << info.rows << " consume bytes: " << info.bytes;
return 1024 - 32;
}

if (group_data_size <= 0) {
LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: "
<< tablet->tablet_id() << " unexpected group data size: " << group_data_size;
return 4096 - 32;
}

tablet->sample_infos[group_index].bytes = 0;
tablet->sample_infos[group_index].rows = 0;

int64_t batch_size = block_mem_limit / group_data_size;
int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L);
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id()
<< " group data size: " << info.group_data_size << " row num: " << info.rows
<< " consume bytes: " << info.bytes << " way cnt: " << way_cnt
<< " batch size: " << res;
return res;
}

// steps to do vertical merge:
// 1. split columns into column groups
// 2. compact groups one by one, generate a row_source_buf when compact key group
@@ -365,7 +420,7 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output) {
int64_t merge_way_num, Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
vertical_split_columns(tablet_schema, &column_groups);
@@ -376,14 +431,18 @@

vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
reader_type);
tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
int64_t batch_size = config::compaction_batch_size != -1
? config::compaction_batch_size
: estimate_batch_size(i, tablet, merge_way_num);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
key_group_cluster_key_idxes));
key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
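A worked pass through estimate_batch_size() with made-up sample numbers, showing the smoothing and the clamp:

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
    const int64_t mem_limit = 1073741824;  // compaction_memory_bytes_limit, 1 GiB
    const int64_t way_cnt = 8;             // merge ways over all input rowsets
    int64_t block_mem_limit = mem_limit / way_cnt;  // 134217728 bytes per group

    int64_t prev_estimate = 256;  // group_data_size kept from the previous round
    int64_t sampled = 1024;       // info.bytes / info.rows for this round
    // Exponential smoothing with factor 0.5, as in the function above.
    int64_t group_data_size = int64_t(prev_estimate * 0.5 + sampled * 0.5);  // 640

    int64_t batch_size = block_mem_limit / group_data_size;  // 209715 rows
    int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32));
    std::cout << res << "\n";  // prints 4064: clamped into the [32, 4064] row range
}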
6 changes: 4 additions & 2 deletions be/src/olap/merger.h
@@ -23,6 +23,7 @@

#include "common/status.h"
#include "io/io_common.h"
#include "olap/iterators.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
@@ -62,7 +63,7 @@ class Merger {
static Status vertical_merge_rowsets(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num,
Statistics* stats_output);

public:
@@ -75,7 +76,8 @@
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
std::vector<uint32_t> key_group_cluster_key_idxes);
std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
CompactionSampleInfo* sample_info);

// for segcompaction
static Status vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type,
15 changes: 15 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
@@ -256,6 +256,21 @@ class RowsetMeta {
return score;
}

uint32_t get_merge_way_num() const {
uint32_t way_num = 0;
if (!is_segments_overlapping()) {
if (num_segments() == 0) {
way_num = 0;
} else {
way_num = 1;
}
} else {
way_num = num_segments();
CHECK(way_num > 0);
}
return way_num;
}

void get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) const {
for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) {
segments_key_bounds->push_back(key_range);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segcompaction.cpp
@@ -102,7 +102,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.tablet = tablet;
reader_params.return_columns = return_columns;
reader_params.is_key_column_group = is_key;
return (*reader)->init(reader_params);
return (*reader)->init(reader_params, nullptr);
}

std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
2 changes: 2 additions & 0 deletions be/src/olap/tablet_reader.h
@@ -184,6 +184,8 @@ class TabletReader {
void check_validation() const;

std::string to_string() const;

int64_t batch_size = -1;
};

TabletReader() = default;