Skip to content

Commit

Permalink
[fix](load) fix memtracking orphan too large (#28600)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Dec 19, 2023
1 parent d7dd7b7 commit 9434ee5
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 1 deletion.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ DEFINE_mInt32(memtable_soft_limit_active_percent, "50");
// Alignment
DEFINE_Int32(memory_max_alignment, "16");

// memtable insert memory tracker will multiply input block size with this ratio
DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
// max buffer size used in memtable for the aggregated table, default 400MB
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,8 @@ DECLARE_mInt32(memtable_soft_limit_active_percent);
// Alignment
DECLARE_Int32(memory_max_alignment);

// memtable insert memory tracker will multiply input block size with this ratio
DECLARE_mDouble(memtable_insert_memory_ratio);
// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 400MB
Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
namespace doris {

bvar::Adder<int64_t> g_memtable_cnt("memtable_cnt");
bvar::Adder<int64_t> g_memtable_input_block_allocated_size("memtable_input_block_allocated_size");

using namespace ErrorCode;

Expand Down Expand Up @@ -137,6 +138,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
}

MemTable::~MemTable() {
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
g_memtable_cnt << -1;
if (_keys_type != KeysType::DUP_KEYS) {
for (auto it = _row_in_blocks.begin(); it != _row_in_blocks.end(); it++) {
Expand Down Expand Up @@ -198,14 +200,18 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<ui

auto num_rows = row_idxs.size();
size_t cursor_in_mutableblock = _input_mutable_block.rows();
auto block_size0 = _input_mutable_block.allocated_bytes();
if (is_append) {
// Append the block, call insert range from
_input_mutable_block.add_rows(&target_block, 0, target_block.rows());
num_rows = target_block.rows();
} else {
_input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
}
size_t input_size = target_block.bytes() * num_rows / target_block.rows();
auto block_size1 = _input_mutable_block.allocated_bytes();
g_memtable_input_block_allocated_size << block_size1 - block_size0;
size_t input_size = target_block.bytes() * num_rows / target_block.rows() *
config::memtable_insert_memory_ratio;
_mem_usage += input_size;
_insert_mem_tracker->consume(input_size);
for (int i = 0; i < num_rows; i++) {
Expand Down Expand Up @@ -504,6 +510,9 @@ std::unique_ptr<vectorized::Block> MemTable::to_block() {
!_tablet_schema->cluster_key_idxes().empty()) {
_sort_by_cluster_keys();
}
_input_mutable_block.clear();
_insert_mem_tracker->release(_mem_usage);
_mem_usage = 0;
return vectorized::Block::create_unique(_output_mutable_block.to_block());
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ bvar::Status<int64_t> g_memtable_flush_memory("mm_limiter_mem_flush", 0);
bvar::Status<int64_t> g_memtable_load_memory("mm_limiter_mem_load", 0);
bvar::Status<int64_t> g_load_hard_mem_limit("mm_limiter_limit_hard", 0);
bvar::Status<int64_t> g_load_soft_mem_limit("mm_limiter_limit_soft", 0);
bvar::Status<int64_t> g_orphan_memory("mm_limiter_mem_orphan", 0);

// Calculate the total memory limit of all load tasks on this BE
static int64_t calc_process_max_load_memory(int64_t process_mem_limit) {
Expand Down Expand Up @@ -236,6 +237,7 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
g_memtable_load_memory.set_value(_mem_usage);
VLOG_DEBUG << "refreshed mem_tracker, num writers: " << _writers.size();
THREAD_MEM_TRACKER_TRANSFER_TO(_mem_usage - _mem_tracker->consumption(), _mem_tracker.get());
g_orphan_memory.set_value(ExecEnv::GetInstance()->orphan_mem_tracker()->consumption());
if (!_hard_limit_reached()) {
_hard_limit_end_cond.notify_all();
}
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "runtime/tablets_channel.h"

#include <bvar/bvar.h>
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
Expand All @@ -41,13 +42,17 @@
#include "olap/storage_engine.h"
#include "olap/txn_manager.h"
#include "runtime/load_channel.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "vec/core/block.h"

namespace doris {
class SlotDescriptor;

bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
"tablets_channel_send_data_allocated_size");

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);

std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
Expand Down Expand Up @@ -521,6 +526,10 @@ Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request
<< "block rows: " << send_data.rows()
<< ", tablet_ids_size: " << request.tablet_ids_size();

g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
Defer defer {
[&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};

auto write_tablet_data = [&](uint32_t tablet_id,
std::function<Status(BaseDeltaWriter * writer)> write_func) {
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
Expand Down

0 comments on commit 9434ee5

Please sign in to comment.