Skip to content

Commit

Permalink
[enhancement](memtable) make memtable memusage more accurate (apache#…
Browse files Browse the repository at this point in the history
…40912)

## Proposed changes

1. Add memtype to memtable, and save a weak ptr vector in memtable
writer, so that we could get different memory usage by traverse the
vector.
2. Using scoped memory usage to compute the mem usage of a memtable.
3. CHECK if the tracker is 0 when the memtable flush success.

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
  • Loading branch information
yiguolei and Doris-Extras committed Oct 19, 2024
1 parent f707a39 commit 6d04a65
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 96 deletions.
2 changes: 0 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,6 @@ DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
// percent of (active memtables size / all memtables size) when reach soft limit
DEFINE_mInt32(memtable_soft_limit_active_percent, "50");

// memtable insert memory tracker will multiply input block size with this ratio
DEFINE_mDouble(memtable_insert_memory_ratio, "1.4");
// max write buffer size before flush, default 200MB
DEFINE_mInt64(write_buffer_size, "209715200");
// max buffer size used in memtable for the aggregated table, default 400MB
Expand Down
2 changes: 0 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,6 @@ DECLARE_mInt32(memtable_hard_limit_active_percent);
// percent of (active memtables size / all memtables size) when reach soft limit
DECLARE_mInt32(memtable_soft_limit_active_percent);

// memtable insert memory tracker will multiply input block size with this ratio
DECLARE_mDouble(memtable_insert_memory_ratio);
// max write buffer size before flush, default 200MB
DECLARE_mInt64(write_buffer_size);
// max buffer size used in memtable for the aggregated table, default 400MB
Expand Down
40 changes: 16 additions & 24 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,16 @@ using namespace ErrorCode;

MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet_id(tablet_id),
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info)
: _mem_type(MemType::ACTIVE),
_tablet_id(tablet_id),
_enable_unique_key_mow(enable_unique_key_mow),
_keys_type(tablet_schema->keys_type()),
_tablet_schema(tablet_schema),
_insert_mem_tracker(insert_mem_tracker),
_flush_mem_tracker(flush_mem_tracker),
_is_first_insertion(true),
_agg_functions(tablet_schema->num_columns()),
_offsets_of_aggregate_states(tablet_schema->num_columns()),
_total_size_of_aggregate_states(0),
_mem_usage(0) {
_total_size_of_aggregate_states(0) {
g_memtable_cnt << 1;
_query_thread_context.init_unlocked();
_arena = std::make_unique<vectorized::Arena>();
Expand All @@ -82,6 +78,7 @@ MemTable::MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schem
}
// TODO: Support ZOrderComparator in the future
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
_mem_tracker = std::make_shared<MemTracker>();
}

void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
Expand Down Expand Up @@ -142,6 +139,13 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {

MemTable::~MemTable() {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
if (_is_flush_success) {
// If the memtable is flush success, then its memtracker's consumption should be 0
if (_mem_tracker->consumption() != 0 && config::crash_in_memory_tracker_inaccurate) {
LOG(FATAL) << "memtable flush success but cosumption is not 0, it is "
<< _mem_tracker->consumption();
}
}
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
g_memtable_cnt << -1;
if (_keys_type != KeysType::DUP_KEYS) {
Expand All @@ -159,13 +163,7 @@ MemTable::~MemTable() {
}
}
std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
_insert_mem_tracker->release(_mem_usage);
_flush_mem_tracker->set_consumption(0);
DCHECK_EQ(_insert_mem_tracker->consumption(), 0) << std::endl
<< _insert_mem_tracker->log_usage();
DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
_arena.reset();
_agg_buffer_pool.clear();
_vec_row_comparator.reset();
_row_in_blocks.clear();
_agg_functions.clear();
Expand All @@ -180,6 +178,7 @@ int RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* r

Status MemTable::insert(const vectorized::Block* input_block,
const std::vector<uint32_t>& row_idxs) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
if (_is_first_insertion) {
_is_first_insertion = false;
auto clone_block = input_block->clone_without_columns(&_column_offset);
Expand Down Expand Up @@ -214,10 +213,6 @@ Status MemTable::insert(const vectorized::Block* input_block,
row_idxs.data() + num_rows, &_column_offset));
auto block_size1 = _input_mutable_block.allocated_bytes();
g_memtable_input_block_allocated_size << block_size1 - block_size0;
auto input_size = size_t(input_block->bytes() * num_rows / input_block->rows() *
config::memtable_insert_memory_ratio);
_mem_usage += input_size;
_insert_mem_tracker->consume(input_size);
for (int i = 0; i < num_rows; i++) {
_row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
}
Expand Down Expand Up @@ -467,10 +462,6 @@ void MemTable::_aggregate() {
}
if constexpr (!is_final) {
// if is not final, we collect the agg results to input_block and then continue to insert
size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
// flush will not run here, so will not duplicate `_flush_mem_tracker`
_insert_mem_tracker->consume(shrunked_after_agg - _mem_usage);
_mem_usage = shrunked_after_agg;
_input_mutable_block.swap(_output_mutable_block);
//TODO(weixang):opt here.
std::unique_ptr<vectorized::Block> empty_input_block = in_block.create_same_struct_block(0);
Expand All @@ -483,6 +474,7 @@ void MemTable::_aggregate() {
}

void MemTable::shrink_memtable_by_agg() {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
if (_keys_type == KeysType::DUP_KEYS) {
return;
}
Expand Down Expand Up @@ -532,8 +524,8 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
}
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
_input_mutable_block.clear();
_insert_mem_tracker->release(_mem_usage);
_mem_usage = 0;
// After to block, all data in arena is saved in the block
_arena.reset();
*res = vectorized::Block::create_unique(_output_mutable_block.to_block());
return Status::OK();
}
Expand Down
37 changes: 18 additions & 19 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class TabletSchema;
class TupleDescriptor;
enum KeysType : int;

// Active: the memtable is currently used by writer to insert into blocks
// Write_finished: the memtable finished write blocks and in the queue waiting for flush
// FLUSH: the memtable is under flushing, write segment to disk.
enum MemType { ACTIVE = 0, WRITE_FINISHED = 1, FLUSH = 2 };

// row pos in _input_mutable_block
struct RowInBlock {
size_t _row_pos;
Expand Down Expand Up @@ -171,16 +176,11 @@ class MemTable {
public:
MemTable(int64_t tablet_id, std::shared_ptr<TabletSchema> tablet_schema,
const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info,
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker);
bool enable_unique_key_mow, PartialUpdateInfo* partial_update_info);
~MemTable();

int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const {
return _insert_mem_tracker->consumption() + _arena->used_size() +
_flush_mem_tracker->consumption();
}
size_t memory_usage() const { return _mem_tracker->consumption(); }
// insert tuple from (row_pos) to (row_pos+num_rows)
Status insert(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);

Expand All @@ -196,10 +196,16 @@ class MemTable {

const MemTableStat& stat() { return _stat; }

std::shared_ptr<MemTracker> flush_mem_tracker() { return _flush_mem_tracker; }

QueryThreadContext query_thread_context() { return _query_thread_context; }

std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }

void set_flush_success() { _is_flush_success = true; }

MemType get_mem_type() { return _mem_type; }

void update_mem_type(MemType memtype) { _mem_type = memtype; }

private:
// for vectorized
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
Expand All @@ -209,28 +215,23 @@ class MemTable {
Status _to_block(std::unique_ptr<vectorized::Block>* res);

private:
std::atomic<MemType> _mem_type;
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
bool _is_partial_update = false;
bool _is_flush_success = false;
const KeysType _keys_type;
std::shared_ptr<TabletSchema> _tablet_schema;

std::shared_ptr<RowInBlockComparator> _vec_row_comparator;

QueryThreadContext _query_thread_context;

// `_insert_manual_mem_tracker` manually records the memory value of memtable insert()
// `_flush_hook_mem_tracker` automatically records the memory value of memtable flush() through mem hook.
// Is used to flush when _insert_manual_mem_tracker larger than write_buffer_size and run flush memtable
// when the sum of all memtable (_insert_manual_mem_tracker + _flush_hook_mem_tracker) exceeds the limit.
std::shared_ptr<MemTracker> _insert_mem_tracker;
std::shared_ptr<MemTracker> _flush_mem_tracker;
std::shared_ptr<MemTracker> _mem_tracker;
// Only the rows will be inserted into block can allocate memory from _arena.
// In this way, we can make MemTable::memory_usage() to be more accurate, and eventually
// reduce the number of segment files that are generated by current load
std::unique_ptr<vectorized::Arena> _arena;
// The object buffer pool for convert tuple to row
ObjectPool _agg_buffer_pool;

void _init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
const TupleDescriptor* tuple_desc);
Expand Down Expand Up @@ -264,8 +265,6 @@ class MemTable {
std::vector<size_t> _offsets_of_aggregate_states;
size_t _total_size_of_aggregate_states;
std::vector<RowInBlock*> _row_in_blocks;
// Memory usage without _arena.
size_t _mem_usage;

size_t _num_columns;
int32_t _seq_col_idx_in_block = -1;
Expand Down
24 changes: 11 additions & 13 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class MemtableFlushTask final : public Runnable {
ENABLE_FACTORY_CREATOR(MemtableFlushTask);

public:
MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::unique_ptr<MemTable> memtable,
MemtableFlushTask(std::shared_ptr<FlushToken> flush_token, std::shared_ptr<MemTable> memtable,
int32_t segment_id, int64_t submit_task_time)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
_memtable(memtable),
_segment_id(segment_id),
_submit_task_time(submit_task_time) {
g_flush_task_num << 1;
Expand All @@ -60,15 +60,15 @@ class MemtableFlushTask final : public Runnable {
void run() override {
auto token = _flush_token.lock();
if (token) {
token->_flush_memtable(std::move(_memtable), _segment_id, _submit_task_time);
token->_flush_memtable(_memtable, _segment_id, _submit_task_time);
} else {
LOG(WARNING) << "flush token is deconstructed, ignore the flush task";
}
}

private:
std::weak_ptr<FlushToken> _flush_token;
std::unique_ptr<MemTable> _memtable;
std::shared_ptr<MemTable> _memtable;
int32_t _segment_id;
int64_t _submit_task_time;
};
Expand All @@ -83,7 +83,7 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
return os;
}

Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
Status FlushToken::submit(std::shared_ptr<MemTable> mem_table) {
{
std::shared_lock rdlk(_flush_status_lock);
DBUG_EXECUTE_IF("FlushToken.submit_flush_error", {
Expand All @@ -98,9 +98,8 @@ Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
return Status::OK();
}
int64_t submit_task_time = MonotonicNanos();
auto task = MemtableFlushTask::create_shared(shared_from_this(), std::move(mem_table),
_rowset_writer->allocate_segment_id(),
submit_task_time);
auto task = MemtableFlushTask::create_shared(
shared_from_this(), mem_table, _rowset_writer->allocate_segment_id(), submit_task_time);
Status ret = _thread_pool->submit(std::move(task));
if (ret.ok()) {
// _wait_running_task_finish was executed after this function, so no need to notify _cond here
Expand Down Expand Up @@ -136,20 +135,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << memtable->memory_usage()
<< ", rows: " << memtable->stat().raw_rows;
memtable->update_mem_type(MemType::FLUSH);
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
SCOPED_ATTACH_TASK(memtable->query_thread_context());
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable->tablet_id();
{
SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
std::unique_ptr<vectorized::Block> block;
// During to block method, it will release old memory and create new block, so that
// we could not scoped it.
RETURN_IF_ERROR(memtable->to_block(&block));
memtable->flush_mem_tracker()->consume(block->allocated_bytes());
SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(), segment_id, flush_size));
}
memtable->set_flush_success();
_memtable_stat += memtable->stat();
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
Expand All @@ -158,7 +156,7 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, in
return Status::OK();
}

void FlushToken::_flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time) {
Defer defer {[&]() {
std::lock_guard<std::mutex> lock(_mutex);
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {
public:
FlushToken(ThreadPool* thread_pool) : _flush_status(Status::OK()), _thread_pool(thread_pool) {}

Status submit(std::unique_ptr<MemTable> mem_table);
Status submit(std::shared_ptr<MemTable> mem_table);

// error has happens, so we cancel this token
// And remove all tasks in the queue.
Expand All @@ -87,7 +87,7 @@ class FlushToken : public std::enable_shared_from_this<FlushToken> {
private:
friend class MemtableFlushTask;

void _flush_memtable(std::unique_ptr<MemTable> memtable_ptr, int32_t segment_id,
void _flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t segment_id,
int64_t submit_task_time);

Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/memtable_memory_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <bvar/bvar.h>

#include "common/config.h"
#include "olap/memtable.h"
#include "olap/memtable_writer.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
Expand Down Expand Up @@ -237,13 +238,14 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
_active_writers.clear();
for (auto it = _writers.begin(); it != _writers.end();) {
if (auto writer = it->lock()) {
// The memtable is currently used by writer to insert blocks.
auto active_usage = writer->active_memtable_mem_consumption();
_active_mem_usage += active_usage;
if (active_usage > 0) {
_active_writers.push_back(writer);
}
_flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
_write_mem_usage += writer->mem_consumption(MemType::WRITE);
_write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
++it;
} else {
*it = std::move(_writers.back());
Expand Down
Loading

0 comments on commit 6d04a65

Please sign in to comment.