[fix](move-memtable) fix move memtable core dump when using multi-table load #35458

Merged
merged 2 commits on Jun 5, 2024
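From the diff, the move-memtable crash appears to stem from all tables in a multi-table load sharing one load id, which forced each plan fragment to re-resolve its pipe through MultiTablePipe's internal _pipe_map. The fix gives every table its own StreamLoadContext with a fresh load id, registers it in new_load_stream_mgr, requests one plan per table, and deletes the pipe-id registry (put_pipe/get_pipe/remove_pipe), so fragments resolve their pipes by load id like an ordinary stream load.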
20 changes: 0 additions & 20 deletions be/src/io/file_factory.cpp
@@ -214,26 +214,6 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
*file_reader = stream_load_ctx->pipe;
}

if (file_reader->get() == nullptr) {
return Status::OK();
}

auto multi_table_pipe = std::dynamic_pointer_cast<io::MultiTablePipe>(*file_reader);
if (multi_table_pipe == nullptr || runtime_state == nullptr) {
return Status::OK();
}

TUniqueId pipe_id;
if (runtime_state->enable_pipeline_x_exec()) {
pipe_id = io::StreamLoadPipe::calculate_pipe_id(runtime_state->query_id(),
runtime_state->fragment_id());
} else {
pipe_id = runtime_state->fragment_instance_id();
}
*file_reader = multi_table_pipe->get_pipe(pipe_id);
LOG(INFO) << "create pipe reader for fragment instance: " << pipe_id
<< " pipe: " << (*file_reader).get();

return Status::OK();
}
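With that redirection gone, create_pipe_reader() simply hands back the pipe stored on the stream load context, because each table of a multi-table load now registers its own context under its own load id (see the multi_table_pipe.cpp diff below). A minimal sketch of the surviving flow, assuming a get() accessor on new_load_stream_mgr and the StreamLoadContext::pipe field from surrounding code:

```cpp
// Sketch only: approximates FileFactory::create_pipe_reader() after this PR.
// new_load_stream_mgr()->get() is an assumption from surrounding code.
Status create_pipe_reader_sketch(const TUniqueId& load_id, io::FileReaderSPtr* file_reader) {
    auto stream_load_ctx = doris::ExecEnv::GetInstance()->new_load_stream_mgr()->get(load_id);
    if (stream_load_ctx == nullptr) {
        return Status::InternalError("unknown stream load id");
    }
    // Each table now owns a context whose load id keys its own pipe, so
    // ctx->pipe is already the per-table pipe; no MultiTablePipe::get_pipe()
    // redirection by fragment/pipe id is needed.
    *file_reader = stream_load_ctx->pipe;
    return Status::OK();
}
```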

219 changes: 97 additions & 122 deletions be/src/io/fs/multi_table_pipe.cpp
@@ -61,9 +61,9 @@ Status MultiTablePipe::append_json(const char* data, size_t size) {
}

KafkaConsumerPipePtr MultiTablePipe::get_pipe_by_table(const std::string& table) {
auto pipe = _planned_pipes.find(table);
DCHECK(pipe != _planned_pipes.end());
return pipe->second;
auto pair = _planned_tables.find(table);
DCHECK(pair != _planned_tables.end());
return std::static_pointer_cast<io::KafkaConsumerPipe>(pair->second->pipe);
}

static std::string_view get_first_part(const char* dat, char delimiter) {
@@ -78,15 +78,15 @@ static std::string_view get_first_part(const char* dat, char delimiter) {
}

Status MultiTablePipe::finish() {
for (auto& pair : _planned_pipes) {
RETURN_IF_ERROR(pair.second->finish());
for (auto& pair : _planned_tables) {
RETURN_IF_ERROR(pair.second->pipe->finish());
}
return Status::OK();
}

void MultiTablePipe::cancel(const std::string& reason) {
for (auto& pair : _planned_pipes) {
pair.second->cancel(reason);
for (auto& pair : _planned_tables) {
pair.second->pipe->cancel(reason);
}
}

@@ -101,19 +101,29 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
return Status::InternalError("empty data");
}
KafkaConsumerPipePtr pipe = nullptr;
auto iter = _planned_pipes.find(table);
if (iter != _planned_pipes.end()) {
pipe = iter->second;
auto iter = _planned_tables.find(table);
if (iter != _planned_tables.end()) {
pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(iter->second->pipe);
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in planned kafka pipe");
} else {
iter = _unplanned_pipes.find(table);
if (iter == _unplanned_pipes.end()) {
iter = _unplanned_tables.find(table);
if (iter == _unplanned_tables.end()) {
std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(doris::ExecEnv::GetInstance());
ctx->id = UniqueId::gen_uid();
pipe = std::make_shared<io::KafkaConsumerPipe>();
LOG(INFO) << "create new unplanned pipe: " << pipe.get() << ", ctx: " << _ctx->brief();
_unplanned_pipes.emplace(table, pipe);
ctx->pipe = pipe;
#ifndef BE_TEST
RETURN_NOT_OK_STATUS_WITH_WARN(
doris::ExecEnv::GetInstance()->new_load_stream_mgr()->put(ctx->id, ctx),
"put stream load ctx error");
#endif
_unplanned_tables.emplace(table, ctx);
LOG(INFO) << "create new unplanned table ctx, table: " << table
<< "load id: " << ctx->id << ", txn id: " << _ctx->txn_id;
} else {
pipe = iter->second;
pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(iter->second->pipe);
}

// It is necessary to determine whether the sum of pipe_current_capacity and size is greater than pipe_max_capacity,
@@ -124,7 +134,7 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
auto pipe_current_capacity = pipe->current_capacity();
auto pipe_max_capacity = pipe->max_capacity();
if (_unplanned_row_cnt >= _row_threshold ||
_unplanned_pipes.size() >= _wait_tables_threshold ||
_unplanned_tables.size() >= _wait_tables_threshold ||
pipe_current_capacity + size > pipe_max_capacity) {
LOG(INFO) << fmt::format(
"unplanned row cnt={} reach row_threshold={} or "
@@ -151,112 +161,106 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size

#ifndef BE_TEST
Status MultiTablePipe::request_and_exec_plans() {
if (_unplanned_pipes.empty()) {
if (_unplanned_tables.empty()) {
return Status::OK();
}

// get list of table names in unplanned pipes
std::vector<std::string> tables;
fmt::memory_buffer log_buffer;
log_buffer.clear();
fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_pipes.size());
for (auto& pair : _unplanned_pipes) {
tables.push_back(pair.first);
fmt::format_to(log_buffer, "request plans for {} tables: [ ", _unplanned_tables.size());
for (auto& pair : _unplanned_tables) {
fmt::format_to(log_buffer, "{} ", pair.first);
}
fmt::format_to(log_buffer, "]");
LOG(INFO) << fmt::to_string(log_buffer);

TStreamLoadPutRequest request;
set_request_auth(&request, _ctx->auth);
request.db = _ctx->db;
request.table_names = tables;
request.__isset.table_names = true;
request.txnId = _ctx->txn_id;
request.formatType = _ctx->format;
request.__set_compress_type(_ctx->compress_type);
request.__set_header_type(_ctx->header_type);
request.__set_loadId(_ctx->id.to_thrift());
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
request.__set_user(_ctx->qualified_user);
request.__set_cloud_cluster(_ctx->cloud_cluster);
// no need to register new_load_stream_mgr coz it is already done in routineload submit task

// plan this load
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
TNetworkAddress master_addr = exec_env->master_info()->network_address;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, this](FrontendServiceConnection& client) {
client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
}));
_ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;

Status plan_status(Status::create(_ctx->multi_table_put_result.status));
if (!plan_status.ok()) {
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
return plan_status;
}

Status st;
if (_ctx->multi_table_put_result.__isset.params &&
!_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
} else if (!_ctx->multi_table_put_result.__isset.params &&
_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
} else {
return Status::Aborted("too many or too few params are set in multi_table_put_result.");
}
for (auto& pair : _unplanned_tables) {
TStreamLoadPutRequest request;
set_request_auth(&request, _ctx->auth);
std::vector<std::string> tables;
tables.push_back(pair.first);
request.db = _ctx->db;
request.table_names = tables;
request.__isset.table_names = true;
request.txnId = _ctx->txn_id;
request.formatType = _ctx->format;
request.__set_compress_type(_ctx->compress_type);
request.__set_header_type(_ctx->header_type);
request.__set_loadId((pair.second->id).to_thrift());
request.fileType = TFileType::FILE_STREAM;
request.__set_thrift_rpc_timeout_ms(config::thrift_rpc_timeout_ms);
request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
request.__set_user(_ctx->qualified_user);
request.__set_cloud_cluster(_ctx->cloud_cluster);
// no need to register new_load_stream_mgr coz it is already done in routineload submit task

// plan this load
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
TNetworkAddress master_addr = exec_env->master_info()->network_address;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, this](FrontendServiceConnection& client) {
client->streamLoadMultiTablePut(_ctx->multi_table_put_result, request);
}));
_ctx->stream_load_put_cost_nanos = MonotonicNanos() - stream_load_put_start_time;

Status plan_status(Status::create(_ctx->multi_table_put_result.status));
if (!plan_status.ok()) {
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status << _ctx->brief();
return plan_status;
}

if (_ctx->multi_table_put_result.__isset.params &&
!_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.params);
} else if (!_ctx->multi_table_put_result.__isset.params &&
_ctx->multi_table_put_result.__isset.pipeline_params) {
st = exec_plans(exec_env, _ctx->multi_table_put_result.pipeline_params);
} else {
return Status::Aborted("too many or too few params are set in multi_table_put_result.");
}
if (!st.ok()) {
return st;
}
}
_unplanned_tables.clear();
return st;
}

template <typename ExecParam>
Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> params) {
// put unplanned pipes into planned pipes and clear unplanned pipes
for (auto& pipe : _unplanned_pipes) {
_ctx->table_list.push_back(pipe.first);
_planned_pipes.emplace(pipe.first, pipe.second);
for (auto& pair : _unplanned_tables) {
_ctx->table_list.push_back(pair.first);
_planned_tables.emplace(pair.first, pair.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}, returned plan cnt={}",
_unplanned_pipes.size(), _planned_pipes.size(), params.size())
_unplanned_tables.size(), _planned_tables.size(), params.size())
<< ", ctx: " << _ctx->brief();
_unplanned_pipes.clear();

for (auto& plan : params) {
DBUG_EXECUTE_IF("MultiTablePipe.exec_plans.failed",
{ return Status::Aborted("MultiTablePipe.exec_plans.failed"); });
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
_unplanned_tables.find(plan.table_name) == _unplanned_tables.end()) {
return Status::Aborted("Missing vital param: table_name");
}

if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
RETURN_IF_ERROR(
put_pipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name]));
LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id)
<< " table=" << plan.table_name << ", ctx: " << _ctx->brief();
} else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) {
auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
RETURN_IF_ERROR(put_pipe(pipe_id, _planned_pipes[plan.table_name]));
LOG(INFO) << "pipe_id=" << pipe_id << ", table=" << plan.table_name
<< ", ctx: " << _ctx->brief();
} else {
LOG(WARNING) << "illegal exec param type, need `TExecPlanFragmentParams` or "
"`TPipelineFragmentParams`, will crash"
<< ", ctx: " << _ctx->brief();
CHECK(false);
}

_inflight_cnt++;

RETURN_IF_ERROR(exec_env->fragment_mgr()->exec_plan_fragment(
plan, [this](RuntimeState* state, Status* status) {
plan, [this, plan](RuntimeState* state, Status* status) {
DCHECK(state);
auto pair = _planned_tables.find(plan.table_name);
if (pair == _planned_tables.end()) {
LOG(WARNING) << "failed to get ctx, table: " << plan.table_name;
} else {
doris::ExecEnv::GetInstance()->new_load_stream_mgr()->remove(
pair->second->id);
}

{
std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
@@ -300,12 +304,12 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
#else
Status MultiTablePipe::request_and_exec_plans() {
// put unplanned pipes into planned pipes
for (auto& pipe : _unplanned_pipes) {
_planned_pipes.emplace(pipe.first, pipe.second);
for (auto& pipe : _unplanned_tables) {
_planned_tables.emplace(pipe.first, pipe.second);
}
LOG(INFO) << fmt::format("{} tables plan complete, planned table cnt={}",
_unplanned_pipes.size(), _planned_pipes.size());
_unplanned_pipes.clear();
_unplanned_tables.size(), _planned_tables.size());
_unplanned_tables.clear();
return Status::OK();
}

@@ -330,35 +334,6 @@ void MultiTablePipe::_handle_consumer_finished() {
_ctx->promise.set_value(_status); // when all done, finish the routine load task
}

Status MultiTablePipe::put_pipe(const TUniqueId& pipe_id,
std::shared_ptr<io::StreamLoadPipe> pipe) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
return Status::InternalError("id already exist");
}
_pipe_map.emplace(pipe_id, pipe);
return Status::OK();
}

std::shared_ptr<io::StreamLoadPipe> MultiTablePipe::get_pipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it == std::end(_pipe_map)) {
return {};
}
return it->second;
}

void MultiTablePipe::remove_pipe(const TUniqueId& pipe_id) {
std::lock_guard<std::mutex> l(_pipe_map_lock);
auto it = _pipe_map.find(pipe_id);
if (it != std::end(_pipe_map)) {
_pipe_map.erase(it);
VLOG_NOTICE << "remove stream load pipe: " << pipe_id;
}
}

template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<TExecPlanFragmentParams> params);
template Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
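The heart of the fix sits in dispatch() and request_and_exec_plans(): each unplanned table gets a dedicated StreamLoadContext with UniqueId::gen_uid() as its load id, registered in new_load_stream_mgr, and plans are now requested one table at a time with request.__set_loadId((pair.second->id).to_thrift()). A minimal sketch of the unplanned-table path, with the helper name and error handling assumed for illustration (the map and field names mirror the diff):

```cpp
// Sketch of the new per-table bookkeeping in dispatch(); illustrative only.
Status MultiTablePipe::dispatch_unplanned_sketch(const std::string& table, const char* data,
                                                 size_t size) {
    auto it = _unplanned_tables.find(table);
    if (it == _unplanned_tables.end()) {
        // One StreamLoadContext per table: its own load id keys the pipe in
        // new_load_stream_mgr, so the fragment planned for this table can
        // later resolve exactly this pipe by load id.
        auto ctx = std::make_shared<StreamLoadContext>(doris::ExecEnv::GetInstance());
        ctx->id = UniqueId::gen_uid();
        ctx->pipe = std::make_shared<io::KafkaConsumerPipe>();
        RETURN_IF_ERROR(doris::ExecEnv::GetInstance()->new_load_stream_mgr()->put(ctx->id, ctx));
        it = _unplanned_tables.emplace(table, ctx).first;
    }
    auto pipe = std::static_pointer_cast<io::KafkaConsumerPipe>(it->second->pipe);
    return pipe->append_json(data, size); // the csv path would use a line-delimited append
}
```

Once a fragment finishes, the exec_plans() callback looks the table up in _planned_tables and removes its context from new_load_stream_mgr, so per-table contexts do not leak across routine load tasks.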
11 changes: 2 additions & 9 deletions be/src/io/fs/multi_table_pipe.h
@@ -60,13 +60,6 @@ class MultiTablePipe : public KafkaConsumerPipe {

void cancel(const std::string& reason) override;

// register <instance id, pipe> pair
Status put_pipe(const TUniqueId& pipe_id, std::shared_ptr<io::StreamLoadPipe> pipe);

std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& pipe_id);

void remove_pipe(const TUniqueId& pipe_id);

private:
// parse table name from data
std::string parse_dst_table(const char* data, size_t size);
@@ -82,8 +75,8 @@ class MultiTablePipe : public KafkaConsumerPipe {
void _handle_consumer_finished();

private:
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _planned_pipes;
std::unordered_map<std::string /*table*/, KafkaConsumerPipePtr> _unplanned_pipes;
std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _planned_tables;
std::unordered_map<std::string /*table*/, std::shared_ptr<StreamLoadContext>> _unplanned_tables;
std::atomic<uint64_t> _unplanned_row_cnt {0}; // trigger plan request when exceed threshold
// inflight count; when it reaches zero, consumption and all plans are finished
std::atomic<uint64_t> _inflight_cnt {1};
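The header diff is the data-structure core of the change: both maps now hold a StreamLoadContext per table, and the fragment-keyed registry (_pipe_map with put_pipe/get_pipe/remove_pipe) is deleted because the global new_load_stream_mgr, keyed by per-table load ids, takes over that role. A minimal sketch of a lookup against the new maps, mirroring get_pipe_by_table() from the .cpp diff (the free-standing helper is illustrative):

```cpp
// Sketch: resolving a table's pipe through its StreamLoadContext.
KafkaConsumerPipePtr pipe_for_table(
        const std::unordered_map<std::string, std::shared_ptr<StreamLoadContext>>& tables,
        const std::string& table) {
    auto it = tables.find(table);
    if (it == tables.end()) {
        return nullptr;
    }
    return std::static_pointer_cast<io::KafkaConsumerPipe>(it->second->pipe);
}
```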