Skip to content

Commit

Permalink
[feature](api) add BE HTTP /api/load_streams (apache#36312) (apache#3…
Browse files Browse the repository at this point in the history
…6338)

cherry-pick apache#36312
  • Loading branch information
kaijchen authored Jun 16, 2024
1 parent 6bb670a commit 612f2ae
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 16 deletions.
66 changes: 66 additions & 0 deletions be/src/http/action/load_stream_action.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "http/action/load_stream_action.h"

#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <string>
#include <vector>

#include "cloud/config.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_request.h"
#include "http/http_status.h"
#include "olap/olap_common.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/exec_env.h"
#include "runtime/load_stream_mgr.h"
#include "service/backend_options.h"

namespace doris {

const static std::string HEADER_JSON = "application/json";

void LoadStreamAction::handle(HttpRequest* req) {
req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
HttpChannel::send_reply(req, HttpStatus::OK, _get_load_streams().ToString());
}

EasyJson LoadStreamAction::_get_load_streams() {
EasyJson response;

auto load_streams = ExecEnv::GetInstance()->load_stream_mgr()->get_all_load_stream_ids();

response["msg"] = "OK";
response["code"] = 0;
EasyJson data = response.Set("data", EasyJson::kObject);
data["host"] = BackendOptions::get_localhost();
EasyJson tablets = data.Set("load_streams", EasyJson::kArray);
for (auto& load_id : load_streams) {
EasyJson tablet = tablets.PushBack(EasyJson::kObject);
tablet["load_id"] = load_id;
}
response["count"] = load_streams.size();
return response;
}

} // namespace doris
42 changes: 42 additions & 0 deletions be/src/http/action/load_stream_action.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <string>

#include "http/http_handler.h"
#include "util/easy_json.h"

namespace doris {
class HttpRequest;

class ExecEnv;

// Get BE load stream info from http API.
class LoadStreamAction final : public HttpHandler {
public:
LoadStreamAction() = default;

~LoadStreamAction() override = default;

void handle(HttpRequest* req) override;

private:
static EasyJson _get_load_streams();
};
} // namespace doris
1 change: 1 addition & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "olap/tablet_manager.h"
#include "runtime/fragment_mgr.h"
#include "runtime/frontend_info.h"
#include "runtime/load_stream_mgr.h"
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ class ExecEnv {
return _function_client_cache;
}
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr.get(); }
std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
Expand Down Expand Up @@ -375,6 +376,7 @@ class ExecEnv {
BfdParser* _bfd_parser = nullptr;
BrokerMgr* _broker_mgr = nullptr;
LoadChannelMgr* _load_channel_mgr = nullptr;
std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
// TODO(zhiqiang): Do not use shared_ptr in exec_env, we can not control its life cycle.
std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr;
BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr;
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "runtime/heartbeat_flags.h"
#include "runtime/load_channel_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/memory/cache_manager.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
Expand Down Expand Up @@ -223,6 +224,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_bfd_parser = BfdParser::create();
_broker_mgr = new BrokerMgr(this);
_load_channel_mgr = new LoadChannelMgr();
auto num_flush_threads = _store_paths.size() * config::flush_thread_num_per_store;
_load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
_new_load_stream_mgr = NewLoadStreamMgr::create_shared();
_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
Expand Down
7 changes: 2 additions & 5 deletions be/src/runtime/load_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@

namespace doris {

LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
FifoThreadPool* heavy_work_pool, FifoThreadPool* light_work_pool)
: _num_threads(segment_file_writer_thread_num),
_heavy_work_pool(heavy_work_pool),
_light_work_pool(light_work_pool) {
LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num)
: _num_threads(segment_file_writer_thread_num) {
static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
.set_min_threads(segment_file_writer_thread_num)
.set_max_threads(segment_file_writer_thread_num)
Expand Down
16 changes: 14 additions & 2 deletions be/src/runtime/load_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class POpenStreamSinkRequest;

class LoadStreamMgr {
public:
LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* heavy_work_pool,
FifoThreadPool* light_work_pool);
LoadStreamMgr(uint32_t segment_file_writer_thread_num);
~LoadStreamMgr();

Status open_load_stream(const POpenLoadStreamRequest* request, LoadStream*& load_stream);
Expand All @@ -47,12 +46,25 @@ class LoadStreamMgr {
}
}

std::vector<std::string> get_all_load_stream_ids() {
std::vector<std::string> result;
std::lock_guard<std::mutex> lock(_lock);

for (auto& [id, _] : _load_streams_map) {
result.push_back(id.to_string());
}
return result;
}

// only used by ut
size_t get_load_stream_num() { return _load_streams_map.size(); }

FifoThreadPool* heavy_work_pool() { return _heavy_work_pool; }
FifoThreadPool* light_work_pool() { return _light_work_pool; }

void set_heavy_work_pool(FifoThreadPool* pool) { _heavy_work_pool = pool; }
void set_light_work_pool(FifoThreadPool* pool) { _light_work_pool = pool; }

private:
std::mutex _lock;
std::unordered_map<UniqueId, LoadStreamPtr> _load_streams_map;
Expand Down
5 changes: 5 additions & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "http/action/health_action.h"
#include "http/action/http_stream.h"
#include "http/action/jeprofile_actions.h"
#include "http/action/load_stream_action.h"
#include "http/action/meta_action.h"
#include "http/action/metrics_action.h"
#include "http/action/pad_rowset_action.h"
Expand Down Expand Up @@ -188,6 +189,10 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::GET, "/api/running_pipeline_tasks/{duration}",
long_pipeline_task_action);

// Register BE LoadStream action
LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams", load_stream_action);

// Register Tablets Info action
TabletsInfoAction* tablets_info_action =
_pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
Expand Down
10 changes: 5 additions & 5 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,7 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
config::brpc_light_work_pool_max_queue_size != -1
? config::brpc_light_work_pool_max_queue_size
: std::max(10240, CpuInfo::num_cores() * 320),
"brpc_light"),
_load_stream_mgr(new LoadStreamMgr(
exec_env->store_paths().size() * config::flush_thread_num_per_store,
&_heavy_work_pool, &_light_work_pool)) {
"brpc_light") {
REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
[this]() { return _heavy_work_pool.get_queue_size(); });
REGISTER_HOOK_METRIC(light_work_pool_queue_size,
Expand All @@ -239,6 +236,9 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
REGISTER_HOOK_METRIC(light_work_max_threads,
[]() { return config::brpc_light_work_pool_threads; });

_exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
_exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);

CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
CHECK_EQ(0, bthread_key_create(&AsyncIO::btls_io_ctx_key, AsyncIO::io_ctx_key_deleter));
}
Expand Down Expand Up @@ -389,7 +389,7 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
}

LoadStream* load_stream = nullptr;
auto st = _load_stream_mgr->open_load_stream(request, load_stream);
auto st = _exec_env->load_stream_mgr()->open_load_stream(request, load_stream);
if (!st.ok()) {
st.to_protobuf(response->mutable_status());
return;
Expand Down
3 changes: 0 additions & 3 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ namespace doris {
class ExecEnv;
class PHandShakeRequest;
class PHandShakeResponse;
class LoadStreamMgr;
class RuntimeState;

class PInternalServiceImpl : public PBackendService {
Expand Down Expand Up @@ -279,8 +278,6 @@ class PInternalServiceImpl : public PBackendService {
// otherwise as light interface
FifoThreadPool _heavy_work_pool;
FifoThreadPool _light_work_pool;

std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
};

} // namespace doris
4 changes: 3 additions & 1 deletion be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,9 @@ class LoadStreamMgrTest : public testing::Test {

static_cast<void>(k_engine->start_bg_threads());

_load_stream_mgr = std::make_unique<LoadStreamMgr>(4, &_heavy_work_pool, &_light_work_pool);
_load_stream_mgr = std::make_unique<LoadStreamMgr>(4);
_load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool);
_load_stream_mgr->set_light_work_pool(&_light_work_pool);
_stream_service = new StreamService(_load_stream_mgr.get());
CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE));
brpc::ServerOptions server_options;
Expand Down

0 comments on commit 612f2ae

Please sign in to comment.