From 1141cb9df5774d57ca04c9441fb9b2ee94529a19 Mon Sep 17 00:00:00 2001 From: chaoyli Date: Sun, 7 Nov 2021 23:19:36 +0800 Subject: [PATCH 01/10] Exit the process gracefully. It's to used to detect memory leak upon exiting the process. Every thread in StorageEngine is already check the existed state. If the existed state is set to be true, the thread will be exited. But the the thread and the class in StarRocks is not seperated very well. So the pull request do the best effort to the graceful exiting. --- be/src/agent/agent_server.cpp | 28 ++++- be/src/agent/multi_worker_pool.cpp | 6 ++ be/src/agent/multi_worker_pool.h | 5 +- be/src/agent/task_worker_pool.cpp | 100 ++++++++++++++---- be/src/agent/task_worker_pool.h | 13 ++- be/src/common/daemon.cpp | 12 ++- be/src/runtime/exec_env.cpp | 4 +- be/src/runtime/exec_env.h | 2 +- be/src/runtime/external_scan_context_mgr.cpp | 5 +- be/src/runtime/external_scan_context_mgr.h | 6 +- .../routine_load/routine_load_task_executor.h | 1 - be/src/service/starrocks_main.cpp | 39 ++++--- be/src/storage/delete_handler.h | 2 +- be/src/storage/merger.cpp | 11 +- be/src/storage/olap_server.cpp | 41 +++---- be/src/storage/push_handler.cpp | 10 +- be/src/storage/schema_change.cpp | 21 +++- be/src/storage/storage_engine.cpp | 33 +++--- be/src/storage/storage_engine.h | 9 +- be/src/storage/task/engine_checksum_task.cpp | 5 +- be/src/storage/task/engine_clone_task.cpp | 53 ++++++++++ .../task/engine_storage_migration_task.cpp | 12 +++ be/src/storage/update_manager.cpp | 1 + be/src/storage/vectorized/compaction.cpp | 22 +++- be/src/storage/vectorized/compaction.h | 2 +- be/src/storage/vectorized/schema_change.cpp | 20 +++- 26 files changed, 356 insertions(+), 107 deletions(-) diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index 86a1d31560a3a1..2ab3eb110eb38a 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -96,7 +96,33 @@ AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info) #undef CREATE_AND_START_POOL } -AgentServer::~AgentServer() = default; +AgentServer::~AgentServer() { +#ifndef STOP_POOL +#define STOP_POOL(type, pool_name) pool_name->stop(); +#endif + + STOP_POOL(CREATE_TABLE, _create_tablet_workers); + STOP_POOL(DROP_TABLE, _drop_tablet_workers); + // Both PUSH and REALTIME_PUSH type use _push_workers + STOP_POOL(PUSH, _push_workers); + STOP_POOL(PUBLISH_VERSION, _publish_version_workers); + STOP_POOL(CLEAR_TRANSACTION_TASK, _clear_transaction_task_workers); + STOP_POOL(DELETE, _delete_workers); + STOP_POOL(ALTER_TABLE, _alter_tablet_workers); + STOP_POOL(CLONE, _clone_workers); + STOP_POOL(STORAGE_MEDIUM_MIGRATE, _storage_medium_migrate_workers); + STOP_POOL(CHECK_CONSISTENCY, _check_consistency_workers); + STOP_POOL(REPORT_TASK, _report_task_workers); + STOP_POOL(REPORT_DISK_STATE, _report_disk_state_workers); + STOP_POOL(REPORT_OLAP_TABLE, _report_tablet_workers); + STOP_POOL(UPLOAD, _upload_workers); + STOP_POOL(DOWNLOAD, _download_workers); + STOP_POOL(MAKE_SNAPSHOT, _make_snapshot_workers); + STOP_POOL(RELEASE_SNAPSHOT, _release_snapshot_workers); + STOP_POOL(MOVE, _move_dir_workers); + STOP_POOL(UPDATE_TABLET_META_INFO, _update_tablet_meta_info_workers); +#undef STOP_POOL +} // TODO(lingbin): each task in the batch may have it own status or FE must check and // resend request when something is wrong(BE may need some logic to guarantee idempotence. diff --git a/be/src/agent/multi_worker_pool.cpp b/be/src/agent/multi_worker_pool.cpp index 6c91d51a3b5958..77881c96aedaa5 100644 --- a/be/src/agent/multi_worker_pool.cpp +++ b/be/src/agent/multi_worker_pool.cpp @@ -20,6 +20,12 @@ void MultiWorkerPool::start() { } } +void MultiWorkerPool::stop() { + for (const auto& pool : _pools) { + pool->stop(); + } +} + void MultiWorkerPool::submit_publish_version_task(const TAgentTaskRequest& task) { auto req = task.publish_version_req; for (auto& partition : req.partition_version_infos) { diff --git a/be/src/agent/multi_worker_pool.h b/be/src/agent/multi_worker_pool.h index fbd3ec294a88db..9ed8285f9a4612 100644 --- a/be/src/agent/multi_worker_pool.h +++ b/be/src/agent/multi_worker_pool.h @@ -15,10 +15,11 @@ class MultiWorkerPool : public TaskWorkerPool { MultiWorkerPool(const TaskWorkerType worker_type, ExecEnv* env, const TMasterInfo& master_info, int worker_num); ~MultiWorkerPool() override = default; - ; void start() override; + void stop() override; + // submit task to queue and wait to be executed void submit_task(const TAgentTaskRequest& task) override; @@ -27,4 +28,4 @@ class MultiWorkerPool : public TaskWorkerPool { std::vector> _pools; }; -} // namespace starrocks \ No newline at end of file +} // namespace starrocks diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 221a2773785034..28e96625ee60ce 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -90,8 +90,7 @@ TaskWorkerPool::~TaskWorkerPool() { // "In glibc 2.25 we implemented a new version of POSIX condition variables to provide stronger // ordering guarantees. The change in the implementation caused the undefined behaviour // to change." - std::lock_guard l(_worker_thread_lock); - _worker_thread_condition_variable->notify_all(); + stop(); delete _worker_thread_condition_variable; } @@ -169,6 +168,16 @@ void TaskWorkerPool::start() { #endif } +void TaskWorkerPool::stop() { + if (_stopped) { + return; + } + _stopped = true; + std::lock_guard l(_worker_thread_lock); + _worker_thread_condition_variable->notify_all(); + sleep(1); // wait thread to exit +} + void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { const TTaskType::type task_type = task.task_type; int64_t signature = task.signature; @@ -290,9 +299,12 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { TCreateTabletReq create_tablet_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); create_tablet_req = agent_task_req.create_tablet_req; @@ -355,9 +367,12 @@ void* TaskWorkerPool::_drop_tablet_worker_thread_callback(void* arg_this) { TDropTabletReq drop_tablet_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); drop_tablet_req = agent_task_req.drop_tablet_req; @@ -407,9 +422,12 @@ void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) { TAgentTaskRequest agent_task_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); worker_pool_this->_tasks.pop_front(); @@ -555,9 +573,12 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { int32_t index = 0; do { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } index = worker_pool_this->_get_next_task_index( config::push_worker_count_normal_priority + config::push_worker_count_high_priority, @@ -574,6 +595,10 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { worker_pool_this->_tasks.erase(worker_pool_this->_tasks.begin() + index); } while (false); + if (worker_pool_this->_stopped) { + break; + } + #ifndef BE_TEST if (index < 0) { // there is no high priority task in queue @@ -650,9 +675,12 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { TPublishVersionRequest publish_version_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); publish_version_req = agent_task_req.publish_version_req; @@ -715,9 +743,12 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t TClearTransactionTaskRequest clear_transaction_task_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); clear_transaction_task_req = agent_task_req.clear_transaction_task_req; @@ -773,9 +804,12 @@ void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this) TUpdateTabletMetaInfoReq update_tablet_meta_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; @@ -842,9 +876,12 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); clone_req = agent_task_req.clone_req; @@ -939,9 +976,12 @@ void* TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_t TStorageMediumMigrateReq storage_medium_migrate_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); storage_medium_migrate_req = agent_task_req.storage_medium_migrate_req; @@ -1039,9 +1079,12 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this) TCheckConsistencyReq check_consistency_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); check_consistency_req = agent_task_req.check_consistency_req; @@ -1092,7 +1135,7 @@ void* TaskWorkerPool::_report_task_worker_thread_callback(void* arg_this) { request.__set_backend(worker_pool_this->_backend); #ifndef BE_TEST - while (true) { + while ((!worker_pool_this->_stopped)) { #endif { std::lock_guard task_signatures_lock(_s_task_signatures_lock); @@ -1124,7 +1167,7 @@ void* TaskWorkerPool::_report_disk_state_worker_thread_callback(void* arg_this) request.__set_backend(worker_pool_this->_backend); #ifndef BE_TEST - while (true) { + while ((!worker_pool_this->_stopped)) { if (worker_pool_this->_master_info.network_address.port == 0) { // port == 0 means not received heartbeat yet // sleep a short time and try again @@ -1186,7 +1229,7 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) { AgentStatus status = STARROCKS_SUCCESS; #ifndef BE_TEST - while (true) { + while ((!worker_pool_this->_stopped)) { if (worker_pool_this->_master_info.network_address.port == 0) { // port == 0 means not received heartbeat yet // sleep a short time and try again @@ -1206,7 +1249,7 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) { StorageEngine::instance()->wait_for_report_notify(config::report_tablet_interval_seconds, true); continue; #else - return (void*)0; + return (void*)0; #endif } int64_t max_compaction_score = @@ -1243,9 +1286,12 @@ void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) { TUploadReq upload_request; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); upload_request = agent_task_req.upload_req; @@ -1298,9 +1344,12 @@ void* TaskWorkerPool::_download_worker_thread_callback(void* arg_this) { TDownloadReq download_request; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); download_request = agent_task_req.download_req; @@ -1355,9 +1404,12 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) { TSnapshotRequest snapshot_request; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); snapshot_request = agent_task_req.snapshot_req; @@ -1430,9 +1482,12 @@ void* TaskWorkerPool::_release_snapshot_thread_callback(void* arg_this) { TReleaseSnapshotRequest release_snapshot_request; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); release_snapshot_request = agent_task_req.release_snapshot_req; @@ -1497,9 +1552,12 @@ void* TaskWorkerPool::_move_dir_thread_callback(void* arg_this) { TMoveDirReq move_dir_req; { std::unique_lock l(worker_pool_this->_worker_thread_lock); - while (worker_pool_this->_tasks.empty()) { + while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } + if (worker_pool_this->_stopped) { + break; + } agent_task_req = worker_pool_this->_tasks.front(); move_dir_req = agent_task_req.move_dir_req; diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 7181741ceba6a8..509c83abcbb835 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -49,13 +49,11 @@ class TaskWorkerPool { PUSH, REALTIME_PUSH, PUBLISH_VERSION, - // Deprecated - CLEAR_ALTER_TASK, + CLEAR_ALTER_TASK, // Deprecated CLEAR_TRANSACTION_TASK, DELETE, ALTER_TABLE, - // Deprecated - QUERY_SPLIT_KEY, + QUERY_SPLIT_KEY, // Deprecated CLONE, STORAGE_MEDIUM_MIGRATE, CHECK_CONSISTENCY, @@ -76,9 +74,12 @@ class TaskWorkerPool { TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env, const TMasterInfo& master_info, int worker_num); virtual ~TaskWorkerPool(); - // Start the task worker callback thread + // start the task worker callback thread virtual void start(); + // stop the task worker callback thread + virtual void stop(); + // Submit task to task pool // // Input parameters: @@ -142,6 +143,8 @@ class TaskWorkerPool { static std::mutex _s_task_signatures_lock; static std::map> _s_task_signatures; + std::atomic _stopped{false}; + TaskWorkerPool(const TaskWorkerPool&) = delete; const TaskWorkerPool& operator=(const TaskWorkerPool&) = delete; }; // class TaskWorkerPool diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 3a441471d2c0ce..23c6a1326aebb0 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -34,6 +34,7 @@ #include "runtime/user_function_cache.h" #include "runtime/vectorized/time_types.h" #include "storage/options.h" +#include "storage/storage_engine.h" #include "util/cpu_info.h" #include "util/debug_util.h" #include "util/disk_info.h" @@ -128,7 +129,12 @@ void* calculate_metrics(void* dummy) { std::map lst_net_send_bytes; std::map lst_net_receive_bytes; - while (true) { + StorageEngine* storage_engine = ExecEnv::GetInstance()->storage_engine(); + bool bg_worker_stopped = false; + if (storage_engine != nullptr) { + bg_worker_stopped = storage_engine->bg_worker_stopped(); + } + while (!bg_worker_stopped) { StarRocksMetrics::instance()->metrics()->trigger_hook(); if (last_ts == -1L) { @@ -174,6 +180,10 @@ void* calculate_metrics(void* dummy) { } sleep(15); // 15 seconds + storage_engine = ExecEnv::GetInstance()->storage_engine(); + if (storage_engine != nullptr) { + bg_worker_stopped = storage_engine->bg_worker_stopped(); + } } return nullptr; diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 4c6668500773a4..f734e6da96a7ca 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -230,7 +230,7 @@ Status ExecEnv::_init_mem_tracker() { return Status::OK(); } -void ExecEnv::_destory() { +void ExecEnv::_destroy() { if (_runtime_filter_worker) { delete _runtime_filter_worker; _runtime_filter_worker = nullptr; @@ -391,7 +391,7 @@ void ExecEnv::_destory() { } void ExecEnv::destroy(ExecEnv* env) { - env->_destory(); + env->_destroy(); } void ExecEnv::set_storage_engine(StorageEngine* storage_engine) { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 100e6e60316f93..c4a9cb5b536a04 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -152,7 +152,7 @@ class ExecEnv { private: Status _init(const std::vector& store_paths); - void _destory(); + void _destroy(); Status _init_mem_tracker(); diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp index 0eb5004f6926f4..f2ecc77c780e00 100644 --- a/be/src/runtime/external_scan_context_mgr.cpp +++ b/be/src/runtime/external_scan_context_mgr.cpp @@ -33,10 +33,11 @@ namespace starrocks { -ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) : _exec_env(exec_env), _is_stop(false) { +ExternalScanContextMgr::ExternalScanContextMgr(ExecEnv* exec_env) : _exec_env(exec_env) { // start the reaper thread for gc the expired context _keep_alive_reaper = std::make_unique( std::bind(std::mem_fn(&ExternalScanContextMgr::gc_expired_context), this)); + _keep_alive_reaper->detach(); REGISTER_GAUGE_STARROCKS_METRIC(active_scan_context_count, [this]() { std::lock_guard l(_lock); return _active_contexts.size(); @@ -98,7 +99,7 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id) void ExternalScanContextMgr::gc_expired_context() { #ifndef BE_TEST - while (!_is_stop) { + while (true) { std::this_thread::sleep_for(std::chrono::seconds(starrocks::config::scan_context_gc_interval_min * 60)); time_t current_time = time(nullptr); std::vector> expired_contexts; diff --git a/be/src/runtime/external_scan_context_mgr.h b/be/src/runtime/external_scan_context_mgr.h index 48d2382fdc4918..f88d1a44f8006d 100644 --- a/be/src/runtime/external_scan_context_mgr.h +++ b/be/src/runtime/external_scan_context_mgr.h @@ -51,10 +51,7 @@ class ExternalScanContextMgr { public: ExternalScanContextMgr(ExecEnv* exec_env); - ~ExternalScanContextMgr() { - _is_stop = true; - _keep_alive_reaper->join(); - } + ~ExternalScanContextMgr() {} Status create_scan_context(std::shared_ptr* p_context); @@ -66,7 +63,6 @@ class ExternalScanContextMgr { ExecEnv* _exec_env; std::map> _active_contexts; void gc_expired_context(); - bool _is_stop; std::unique_ptr _keep_alive_reaper; std::mutex _lock; }; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index b2fbe284388c3e..7c18563b6224b4 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -62,7 +62,6 @@ class RoutineLoadTaskExecutor { _thread_pool.shutdown(); _thread_pool.join(); - LOG(INFO) << _task_map.size() << " not executed tasks left, cleanup"; for (auto& it : _task_map) { auto ctx = it.second; if (ctx->unref()) { diff --git a/be/src/service/starrocks_main.cpp b/be/src/service/starrocks_main.cpp index 823a14e6bf5364..149bef34455ceb 100644 --- a/be/src/service/starrocks_main.cpp +++ b/be/src/service/starrocks_main.cpp @@ -227,8 +227,8 @@ int main(int argc, char** argv) { } // 2. brpc service - starrocks::BRpcService brpc_service(exec_env); - status = brpc_service.start(starrocks::config::brpc_port); + std::unique_ptr brpc_service = std::make_unique(exec_env); + status = brpc_service->start(starrocks::config::brpc_port); if (!status.ok()) { LOG(ERROR) << "BRPC service did not start correctly, exiting"; starrocks::shutdown_logging(); @@ -236,9 +236,10 @@ int main(int argc, char** argv) { } // 3. http service - starrocks::HttpService http_service(exec_env, starrocks::config::webserver_port, - starrocks::config::webserver_num_workers); - status = http_service.start(); + std::unique_ptr http_service = + std::make_unique(exec_env, starrocks::config::webserver_port, + starrocks::config::webserver_num_workers); + status = http_service->start(); if (!status.ok()) { LOG(ERROR) << "Internal Error:" << status.message(); LOG(ERROR) << "StarRocks Be http service did not start correctly, exiting"; @@ -260,24 +261,34 @@ int main(int argc, char** argv) { } status = heartbeat_thrift_server->start(); - if (!status.ok()) { - LOG(ERROR) << "StarRocks BE HeartBeat Service did not start correctly, exiting"; - starrocks::shutdown_logging(); - exit(1); - } + if (!status.ok()) { + LOG(ERROR) << "Doris BE HeartBeat Service did not start correctly. Error=" + << status.to_string(); + starrocks::shutdown_logging(); + exit(1); + } else { + LOG(INFO) << "Doris BE HeartBeat Service started correctly."; + } while (!starrocks::k_starrocks_exit) { -#if defined(LEAK_SANITIZER) - __lsan_do_leak_check(); -#endif sleep(10); } heartbeat_thrift_server->stop(); heartbeat_thrift_server->join(); + delete heartbeat_thrift_server; + + http_service.reset(); + brpc_service.reset(); + be_server->stop(); be_server->join(); - delete be_server; + + engine->stop(); + delete engine; + + starrocks::ExecEnv::destroy(exec_env); + return 0; } diff --git a/be/src/storage/delete_handler.h b/be/src/storage/delete_handler.h index 8028cb71bd5062..975b24bf7ac426 100644 --- a/be/src/storage/delete_handler.h +++ b/be/src/storage/delete_handler.h @@ -83,7 +83,7 @@ struct DeleteConditions { // bool filter_data; // filter_data = delete_handler.is_filter_data(data_version, row_cursor); // 3. If there are many rows to check, call is_filter_data() repeatly -// 4. destory +// 4. destroy // delete_handler.finalize(); // // NOTE: diff --git a/be/src/storage/merger.cpp b/be/src/storage/merger.cpp index a783d2b602ed2f..de11f9113b084a 100644 --- a/be/src/storage/merger.cpp +++ b/be/src/storage/merger.cpp @@ -27,8 +27,10 @@ #include "storage/olap_define.h" #include "storage/reader.h" #include "storage/row_cursor.h" +#include "storage/storage_engine.h" #include "storage/tablet.h" #include "util/trace.h" +#include "runtime/exec_env.h" namespace starrocks { @@ -72,7 +74,8 @@ OLAPStatus Merger::merge_rowsets(int64_t mem_limit, const TabletSharedPtr& table // The following procedure would last for long time, half of one day, etc. int64_t output_rows = 0; - while (true) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + while (true && !bg_worker_stopped) { ObjectPool objectPool; bool eof = false; // Read one row into row_cursor @@ -87,6 +90,12 @@ OLAPStatus Merger::merge_rowsets(int64_t mem_limit, const TabletSharedPtr& table // the memory allocate by mem pool has been copied, // so we should release memory immediately mem_pool->clear(); + + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + } + + if (bg_worker_stopped) { + return OLAP_ERR_ALTER_STATUS_ERR; } if (stats_output != nullptr) { diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index ca28af30ff1607..e875a84dfe013b 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -40,14 +40,14 @@ using std::string; namespace starrocks { // TODO(yingchun): should be more graceful in the future refactor. -#define SLEEP_IN_BG_WORKER(seconds) \ - int64_t left_seconds = (seconds); \ - while (!_stop_bg_worker && left_seconds > 0) { \ - sleep(1); \ - --left_seconds; \ - } \ - if (_stop_bg_worker) { \ - break; \ +#define SLEEP_IN_BG_WORKER(seconds) \ + int64_t left_seconds = (seconds); \ + while (!_bg_worker_stopped && left_seconds > 0) { \ + sleep(1); \ + --left_seconds; \ + } \ + if (_bg_worker_stopped) { \ + break; \ } // number of running SCHEMA-CHANGE threads @@ -56,6 +56,7 @@ volatile uint32_t g_schema_change_active_threads = 0; Status StorageEngine::start_bg_threads() { _update_cache_expire_thread = std::thread([this] { _update_cache_expire_thread_callback(nullptr); }); LOG(INFO) << "update cache expire thread started"; + _update_cache_expire_thread.detach(); _unused_rowset_monitor_thread = std::thread([this] { _unused_rowset_monitor_thread_callback(nullptr); }); _unused_rowset_monitor_thread.detach(); @@ -167,7 +168,7 @@ void* StorageEngine::_fd_cache_clean_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { int32_t interval = config::file_descriptor_cache_clean_interval; if (interval <= 0) { LOG(WARNING) << "config of file descriptor clean interval is illegal: " << interval << "force set to 3600"; @@ -188,7 +189,7 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d //string last_base_compaction_fs; //TTabletId last_base_compaction_tablet_id = -1; Status status = Status::OK(); - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { // must be here, because this thread is start on start and if (!data_dir->reach_capacity_limit(0)) { status = _perform_base_compaction(data_dir); @@ -213,7 +214,7 @@ void* StorageEngine::_update_compaction_thread_callback(void* arg, DataDir* data ProfilerRegisterThread(); #endif Status status = Status::OK(); - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { // must be here, because this thread is start on start and if (!data_dir->reach_capacity_limit(0)) { status = _perform_update_compaction(data_dir); @@ -250,7 +251,7 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { const double pi = 4 * std::atan(1); double usage = 1.0; - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { usage *= 100.0; // when disk usage is less than 60%, ratio is about 1; // when disk usage is between [60%, 75%], ratio drops from 0.87 to 0.27; @@ -281,7 +282,7 @@ void* StorageEngine::_disk_stat_monitor_thread_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { _start_disk_stat_monitor(); int32_t interval = config::disk_stat_monitor_interval; @@ -302,7 +303,7 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* LOG(INFO) << "try to start cumulative compaction process!"; Status status = Status::OK(); - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { // must be here, because this thread is start on start and if (!data_dir->reach_capacity_limit(0)) { status = _perform_cumulative_compaction(data_dir); @@ -327,7 +328,7 @@ void* StorageEngine::_update_cache_expire_thread_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { int32_t expire_sec = config::update_cache_expire_sec; if (expire_sec <= 0) { LOG(WARNING) << "update_cache_expire_sec config is illegal: " << expire_sec << ", force set to 360"; @@ -346,7 +347,7 @@ void* StorageEngine::_unused_rowset_monitor_thread_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { start_delete_unused_rowset(); int32_t interval = config::unused_rowset_monitor_interval; @@ -367,7 +368,7 @@ void* StorageEngine::_path_gc_thread_callback(void* arg) { LOG(INFO) << "try to start path gc thread!"; - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { LOG(INFO) << "try to perform path gc by tablet!"; ((DataDir*)arg)->perform_path_gc_by_tablet(); @@ -393,13 +394,13 @@ void* StorageEngine::_path_scan_thread_callback(void* arg) { #endif LOG(INFO) << "wait 10min to start path scan thread"; - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { SLEEP_IN_BG_WORKER(600); break; } LOG(INFO) << "try to start path scan thread!"; - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { LOG(INFO) << "try to perform path scan!"; ((DataDir*)arg)->perform_path_scan(); @@ -420,7 +421,7 @@ void* StorageEngine::_tablet_checkpoint_callback(void* arg) { ProfilerRegisterThread(); #endif LOG(INFO) << "try to start tablet meta checkpoint thread!"; - while (!_stop_bg_worker) { + while (!_bg_worker_stopped) { LOG(INFO) << "begin to do tablet meta checkpoint:" << ((DataDir*)arg)->path(); int64_t start_time = UnixMillis(); _tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg); diff --git a/be/src/storage/push_handler.cpp b/be/src/storage/push_handler.cpp index 89256ea01a25e3..d954564ca9d1a9 100644 --- a/be/src/storage/push_handler.cpp +++ b/be/src/storage/push_handler.cpp @@ -385,7 +385,8 @@ OLAPStatus PushHandler::_convert_v2(const TabletSharedPtr& cur_tablet, RowsetSha // 4. Read data from broker and write into Rowset of cur_tablet VLOG(3) << "start to convert etl file to delta."; - while (!reader->eof()) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + while (!reader->eof() && !bg_worker_stopped) { res = reader->next(&row); if (OLAP_SUCCESS != res) { LOG(WARNING) << "read next row failed." @@ -403,7 +404,14 @@ OLAPStatus PushHandler::_convert_v2(const TabletSharedPtr& cur_tablet, RowsetSha } num_rows++; } + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + } + + if (bg_worker_stopped) { + res = OLAP_ERR_PUSH_INPUT_DATA_ERROR; + break; } + if (res != OLAP_SUCCESS) { reader->close(); break; diff --git a/be/src/storage/schema_change.cpp b/be/src/storage/schema_change.cpp index 1620ef38c9491f..52a705e4a8e4fe 100644 --- a/be/src/storage/schema_change.cpp +++ b/be/src/storage/schema_change.cpp @@ -938,7 +938,9 @@ bool RowBlockMerger::merge(const std::vector& row_block_arr, RowsetWr row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema()); - while (!_heap.empty()) { + StorageEngine* storage_engine = ExecEnv::GetInstance()->storage_engine(); + bool bg_worker_stopped = storage_engine->bg_worker_stopped(); + while (!_heap.empty() && !bg_worker_stopped) { init_row_with_others(&row_cursor, *(_heap.top().row_cursor), mem_pool.get(), agg_object_pool.get()); if (!_pop_heap()) { @@ -976,7 +978,13 @@ bool RowBlockMerger::merge(const std::vector& row_block_arr, RowsetWr // so we should release memory immediately mem_pool->clear(); agg_object_pool = std::make_unique(); + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); } + + if (bg_worker_stopped) { + return false; + } + if (rowset_writer->flush() != OLAP_SUCCESS) { LOG(WARNING) << "failed to finalizing writer."; process_err(); @@ -1128,7 +1136,10 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr RowBlock* ref_row_block = nullptr; rowset_reader->next_block(&ref_row_block); - while (ref_row_block != nullptr && ref_row_block->has_remaining()) { + + StorageEngine* storage_engine = ExecEnv::GetInstance()->storage_engine(); + bool bg_worker_stopped = storage_engine->bg_worker_stopped(); + while (!bg_worker_stopped && ref_row_block != nullptr && ref_row_block->has_remaining()) { #ifndef BE_TEST Status st = tls_thread_status.mem_tracker()->check_mem_limit("DirectSchemaChange"); if (!st.ok()) { @@ -1171,6 +1182,12 @@ bool SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader, RowsetWr ref_row_block->clear(); rowset_reader->next_block(&ref_row_block); + bg_worker_stopped = storage_engine->bg_worker_stopped(); + } + + if (bg_worker_stopped) { + result = false; + goto DIRECTLY_PROCESS_ERR; } if (OLAP_SUCCESS != rowset_writer->flush()) { diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index ae8903bf33f25f..578990e97bc397 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -126,7 +126,11 @@ StorageEngine::StorageEngine(const EngineOptions& options) } StorageEngine::~StorageEngine() { - _clear(); +#ifdef BE_TEST + if (_s_instance == this) { + _s_instance = nullptr; + } +#endif } void StorageEngine::load_data_dirs(const std::vector& data_dirs) { @@ -454,24 +458,19 @@ bool StorageEngine::_delete_tablets_on_unused_root_path() { return !tablet_info_vec.empty(); } -void StorageEngine::_clear() { - SAFE_DELETE(_index_stream_lru_cache); - _file_cache.reset(); - - std::lock_guard l(_store_lock); - for (auto& store_pair : _store_map) { - store_pair.second->stop_bg_worker(); - delete store_pair.second; - store_pair.second = nullptr; +void StorageEngine::stop() { + { + std::lock_guard l(_store_lock); + for (auto& store_pair : _store_map) { + store_pair.second->stop_bg_worker(); + delete store_pair.second; + store_pair.second = nullptr; + } + _store_map.clear(); } - _store_map.clear(); + _bg_worker_stopped = true; - _stop_bg_worker = true; -#ifdef BE_TEST - if (_s_instance == this) { - _s_instance = nullptr; - } -#endif + sleep(30); // wait five seconds to exit all threads gracefully } void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) { diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index 98ee54b99c5c48..acb532681d6f42 100644 --- a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -171,13 +171,15 @@ class StorageEngine { // start all backgroud threads. This should be call after env is ready. Status start_bg_threads(); + void stop(); + + bool bg_worker_stopped() { return _bg_worker_stopped; } + private: // Instance should be inited from `static open()` // MUST NOT be called in other circumstances. Status _open(); - // Clear status(tables, ...) - void _clear(); Status _init_store_map(); @@ -265,7 +267,6 @@ class StorageEngine { int32_t _effective_cluster_id; bool _is_all_cluster_id_exist; - Cache* _file_descriptor_lru_cache = nullptr; Cache* _index_stream_lru_cache = nullptr; // _file_cache is a lru_cache for file descriptors of files opened by starrocks, @@ -283,7 +284,7 @@ class StorageEngine { // map, if we use RowsetId as the key, we need custom hash func std::unordered_map _unused_rowsets; - bool _stop_bg_worker = false; + bool _bg_worker_stopped = false; // thread to expire update cache; std::thread _update_cache_expire_thread; std::thread _unused_rowset_monitor_thread; diff --git a/be/src/storage/task/engine_checksum_task.cpp b/be/src/storage/task/engine_checksum_task.cpp index 79480ace5f8029..74f17239f14032 100644 --- a/be/src/storage/task/engine_checksum_task.cpp +++ b/be/src/storage/task/engine_checksum_task.cpp @@ -24,6 +24,7 @@ #include #include "runtime/current_thread.h" +#include "runtime/exec_env.h" #include "storage/reader.h" #include "storage/vectorized/chunk_helper.h" #include "storage/vectorized/tablet_reader.h" @@ -115,7 +116,8 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { auto chunk = vectorized::ChunkHelper::new_chunk(schema, reader_params.chunk_size); st = reader.get_next(chunk.get()); - while (st.ok()) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + while (st.ok() && !bg_worker_stopped) { #ifndef BE_TEST Status st = _mem_tracker->check_mem_limit("ConsistencyCheck"); if (!st.ok()) { @@ -133,6 +135,7 @@ OLAPStatus EngineChecksumTask::_compute_checksum() { } chunk->reset(); st = reader.get_next(chunk.get()); + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); } if (!st.is_end_of_file() && !st.ok()) { diff --git a/be/src/storage/task/engine_clone_task.cpp b/be/src/storage/task/engine_clone_task.cpp index e095073df69052..cebe2a30114d84 100644 --- a/be/src/storage/task/engine_clone_task.cpp +++ b/be/src/storage/task/engine_clone_task.cpp @@ -294,6 +294,12 @@ Status EngineCloneTask::_clone_copy(DataDir& data_dir, const string& local_data_ Status EngineCloneTask::_make_snapshot(const std::string& ip, int port, TTableId tablet_id, TSchemaHash schema_hash, int timeout_s, const std::vector* missed_versions, std::string* snapshot_path, int32_t* snapshot_format) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The snapshot should be stopped as soon as possible."); + } + TSnapshotRequest request; request.__set_tablet_id(tablet_id); request.__set_schema_hash(schema_hash); @@ -332,6 +338,12 @@ Status EngineCloneTask::_make_snapshot(const std::string& ip, int port, TTableId } Status EngineCloneTask::_release_snapshot(const std::string& ip, int port, const std::string& snapshot_path) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The snapshot should be stopped as soon as possible."); + } + TAgentResult result; RETURN_IF_ERROR(ThriftRpcHelper::rpc( ip, port, [&snapshot_path, &result](BackendServiceConnection& client) { @@ -342,6 +354,12 @@ Status EngineCloneTask::_release_snapshot(const std::string& ip, int port, const Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& remote_url_prefix, const std::string& local_path) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The download should be stopped as soon as possible."); + } + // Check local path exist, if exist, remove it, then create the dir // local_file_full_path = tabletid/clone. for a specific tablet, there should be only one folder // if this folder exists, then should remove it @@ -366,6 +384,11 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re // Avoid of data is not complete, we copy the header file at last. // The header file's name is end of .hdr. for (int i = 0; i < file_name_list.size() - 1; ++i) { + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The download should be stopped as soon as possible."); + } StringPiece sp(file_name_list[i]); if (sp.ends_with(".hdr")) { std::swap(file_name_list[i], file_name_list[file_name_list.size() - 1]); @@ -437,6 +460,12 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re Status EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_dir, int64_t committed_version, bool incremental_clone) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The clone should be stopped as soon as possible."); + } + if (tablet->updates() != nullptr) { return _finish_clone_primary(tablet, clone_dir); } @@ -486,6 +515,12 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_dir, i // link files from clone dir, if file exists, skip it for (const string& clone_file : clone_files) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The clone should be stopped as soon as possible."); + } + if (local_files.find(clone_file) != local_files.end()) { VLOG(3) << "find same file when clone, skip it. " << "tablet=" << tablet->full_name() << ", clone_file=" << clone_file; @@ -528,6 +563,12 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_dir, i Status EngineCloneTask::_clone_incremental_data(Tablet* tablet, const TabletMeta& cloned_tablet_meta, int64_t committed_version) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The clone should be stopped as soon as possible."); + } + LOG(INFO) << "begin to incremental clone. tablet=" << tablet->full_name() << ", committed_version=" << committed_version; @@ -560,6 +601,12 @@ Status EngineCloneTask::_clone_incremental_data(Tablet* tablet, const TabletMeta } Status EngineCloneTask::_clone_full_data(Tablet* tablet, TabletMeta* cloned_tablet_meta) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The clone should be stopped as soon as possible."); + } + Version cloned_max_version = cloned_tablet_meta->max_version(); LOG(INFO) << "begin to full clone. tablet=" << tablet->full_name() << ", cloned_max_version=" << cloned_max_version.first << "-" << cloned_max_version.second; @@ -649,6 +696,12 @@ Status EngineCloneTask::_clone_full_data(Tablet* tablet, TabletMeta* cloned_tabl } Status EngineCloneTask::_finish_clone_primary(Tablet* tablet, const std::string& clone_dir) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The snapshot should be stopped as soon as possible."); + } + auto meta_file = strings::Substitute("$0/meta", clone_dir); auto res = SnapshotManager::instance()->parse_snapshot_meta(meta_file); if (!res.ok()) { diff --git a/be/src/storage/task/engine_storage_migration_task.cpp b/be/src/storage/task/engine_storage_migration_task.cpp index 2c69672361a8bf..25df078f3eedc2 100644 --- a/be/src/storage/task/engine_storage_migration_task.cpp +++ b/be/src/storage/task/engine_storage_migration_task.cpp @@ -24,6 +24,7 @@ #include "storage/snapshot_manager.h" #include "storage/tablet_meta_manager.h" #include "util/defer_op.h" +#include "runtime/exec_env.h" namespace starrocks { @@ -63,6 +64,12 @@ OLAPStatus EngineStorageMigrationTask::execute() { } OLAPStatus EngineStorageMigrationTask::_storage_migrate(TabletSharedPtr tablet) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + LOG(WARNING) << "Process is going to quit. The migration should be stopped as soon as possible."; + return OLAP_ERR_OTHER_ERROR; + } + OLAPStatus res = OLAP_SUCCESS; LOG(INFO) << "begin to process storage migrate. tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash << ", tablet=" << tablet->full_name() << ", dest_store=" << _dest_store->path(); @@ -322,6 +329,11 @@ OLAPStatus EngineStorageMigrationTask::_copy_index_and_data_files( const std::vector& consistent_rowsets) const { OLAPStatus status = OLAP_SUCCESS; for (const auto& rs : consistent_rowsets) { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (!bg_worker_stopped) { + status = OLAP_ERR_OTHER_ERROR; + break; + } status = rs->copy_files_to(schema_hash_path); if (status != OLAP_SUCCESS) { break; diff --git a/be/src/storage/update_manager.cpp b/be/src/storage/update_manager.cpp index 3dadcc9d4ab311..5cd694d6f0aa42 100644 --- a/be/src/storage/update_manager.cpp +++ b/be/src/storage/update_manager.cpp @@ -45,6 +45,7 @@ UpdateManager::~UpdateManager() { if (_index_cache_mem_tracker) { _index_cache_mem_tracker.reset(); } + _apply_thread_pool->shutdown(); } Status UpdateManager::init() { diff --git a/be/src/storage/vectorized/compaction.cpp b/be/src/storage/vectorized/compaction.cpp index 0892c20b92c2b5..eb974519b084b8 100644 --- a/be/src/storage/vectorized/compaction.cpp +++ b/be/src/storage/vectorized/compaction.cpp @@ -6,6 +6,7 @@ #include "gutil/strings/substitute.h" #include "runtime/current_thread.h" +#include "runtime/exec_env.h" #include "storage/rowset/rowset_factory.h" #include "storage/vectorized/chunk_helper.h" #include "storage/vectorized/tablet_reader.h" @@ -96,7 +97,7 @@ Status Compaction::do_compaction_impl() { TRACE("check correctness finished"); // 4. modify rowsets in memory - modify_rowsets(); + RETURN_IF_ERROR(modify_rowsets()); TRACE("modify rowsets finished"); // 5. update last success compaction time @@ -188,6 +189,8 @@ Status Compaction::merge_rowsets(int64_t mem_limit, Statistics* stats_output) { } #endif + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + while (!bg_worker_stopped) { chunk->reset(); Status status = reader.get_next(chunk.get()); if (!status.ok()) { @@ -206,6 +209,13 @@ Status Compaction::merge_rowsets(int64_t mem_limit, Statistics* stats_output) { return Status::InternalError("writer add_chunk error."); } output_rows += chunk->num_rows(); + + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + } + + if (bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The compaction should be stopped as soon as possible."); } if (stats_output != nullptr) { @@ -224,13 +234,21 @@ Status Compaction::merge_rowsets(int64_t mem_limit, Statistics* stats_output) { return Status::OK(); } -void Compaction::modify_rowsets() { +Status Compaction::modify_rowsets() { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The compaction should be stopped as soon as possible."); + } + std::vector output_rowsets; output_rowsets.push_back(_output_rowset); std::unique_lock wrlock(_tablet->get_header_lock()); _tablet->modify_rowsets(output_rowsets, _input_rowsets); _tablet->save_meta(); + + return Status::OK(); } Status Compaction::check_version_continuity(const std::vector& rowsets) { diff --git a/be/src/storage/vectorized/compaction.h b/be/src/storage/vectorized/compaction.h index 7a4775987bd0ee..53c070c3e982fd 100644 --- a/be/src/storage/vectorized/compaction.h +++ b/be/src/storage/vectorized/compaction.h @@ -57,7 +57,7 @@ class Compaction { // return others on error Status merge_rowsets(int64_t mem_limit, Statistics* stats_output); - void modify_rowsets(); + Status modify_rowsets(); Status construct_output_rowset_writer(); diff --git a/be/src/storage/vectorized/schema_change.cpp b/be/src/storage/vectorized/schema_change.cpp index f9e0ced2dd841f..5f702f21f9e088 100644 --- a/be/src/storage/vectorized/schema_change.cpp +++ b/be/src/storage/vectorized/schema_change.cpp @@ -579,7 +579,9 @@ bool ChunkMerger::merge(std::vector& chunk_arr, RowsetWriter* rowset_w _aggregator = std::make_unique(&new_schema, config::vector_chunk_size, 0); } - while (!_heap.empty()) { + StorageEngine* storage_engine = ExecEnv::GetInstance()->storage_engine(); + bool bg_worker_stopped = storage_engine->bg_worker_stopped(); + while (!_heap.empty() && !bg_worker_stopped) { if (tmp_chunk->reach_capacity_limit() || nread >= config::vector_chunk_size) { if (_tablet->keys_type() == KeysType::AGG_KEYS) { aggregate_chunk(*_aggregator, tmp_chunk, rowset_writer); @@ -596,6 +598,11 @@ bool ChunkMerger::merge(std::vector& chunk_arr, RowsetWriter* rowset_w process_err(); return false; } + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + } + + if (bg_worker_stopped) { + return false; } if (_tablet->keys_type() == KeysType::AGG_KEYS) { @@ -676,6 +683,8 @@ bool SchemaChangeDirectly::process(vectorized::TabletReader* reader, RowsetWrite std::unique_ptr mem_pool(new MemPool()); do { + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + if (bg_worker_stopped) { return false; } #ifndef BE_TEST Status st = tls_thread_status.mem_tracker()->check_mem_limit("DirectSchemaChange"); if (!st.ok()) { @@ -763,7 +772,9 @@ bool SchemaChangeWithSorting::process(vectorized::TabletReader* reader, RowsetWr this->_chunk_allocator->release(it, it->num_rows()); } }); - while (true) { + StorageEngine* storage_engine = ExecEnv::GetInstance()->storage_engine(); + bool bg_worker_stopped = storage_engine->bg_worker_stopped(); + while (!bg_worker_stopped) { #ifndef BE_TEST Status st = tls_thread_status.mem_tracker()->check_mem_limit("SortSchemaChange"); if (!st.ok()) { @@ -824,6 +835,11 @@ bool SchemaChangeWithSorting::process(vectorized::TabletReader* reader, RowsetWr chunk_arr.push_back(new_chunk); mem_pool->clear(); + bg_worker_stopped = storage_engine->bg_worker_stopped(); + } + + if (bg_worker_stopped) { + return false; } if (!chunk_arr.empty()) { From f803325f52cb959622ea5ebb229245e64e8f8845 Mon Sep 17 00:00:00 2001 From: chaoyli Date: Mon, 8 Nov 2021 21:36:14 +0800 Subject: [PATCH 02/10] Exit the process gracefully. It's to used to detect memory leak upon exiting the process. Every thread in StorageEngine is already check the existed state. If the existed state is set to be true, the thread will be exited. But the the thread and the class in StarRocks is not seperated very well. So the pull request do the best effort to the graceful exiting. --- be/src/agent/task_worker_pool.cpp | 32 +++++++++++++++++++++++++++++++ be/src/common/daemon.cpp | 12 +++++++++++- be/src/runtime/current_thread.h | 2 +- be/src/runtime/exec_env.cpp | 4 ---- be/src/storage/storage_engine.cpp | 3 +++ 5 files changed, 47 insertions(+), 6 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 28e96625ee60ce..dbf476bfcfec9d 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -302,9 +302,11 @@ void* TaskWorkerPool::_create_tablet_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); create_tablet_req = agent_task_req.create_tablet_req; @@ -370,9 +372,11 @@ void* TaskWorkerPool::_drop_tablet_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); drop_tablet_req = agent_task_req.drop_tablet_req; @@ -425,9 +429,11 @@ void* TaskWorkerPool::_alter_tablet_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); worker_pool_this->_tasks.pop_front(); @@ -576,9 +582,11 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif index = worker_pool_this->_get_next_task_index( config::push_worker_count_normal_priority + config::push_worker_count_high_priority, @@ -595,9 +603,11 @@ void* TaskWorkerPool::_push_worker_thread_callback(void* arg_this) { worker_pool_this->_tasks.erase(worker_pool_this->_tasks.begin() + index); } while (false); +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif #ifndef BE_TEST if (index < 0) { @@ -678,9 +688,11 @@ void* TaskWorkerPool::_publish_version_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); publish_version_req = agent_task_req.publish_version_req; @@ -746,9 +758,11 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); clear_transaction_task_req = agent_task_req.clear_transaction_task_req; @@ -807,9 +821,11 @@ void* TaskWorkerPool::_update_tablet_meta_worker_thread_callback(void* arg_this) while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); update_tablet_meta_req = agent_task_req.update_tablet_meta_info_req; @@ -879,9 +895,11 @@ void* TaskWorkerPool::_clone_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); clone_req = agent_task_req.clone_req; @@ -979,9 +997,11 @@ void* TaskWorkerPool::_storage_medium_migrate_worker_thread_callback(void* arg_t while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); storage_medium_migrate_req = agent_task_req.storage_medium_migrate_req; @@ -1082,9 +1102,11 @@ void* TaskWorkerPool::_check_consistency_worker_thread_callback(void* arg_this) while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); check_consistency_req = agent_task_req.check_consistency_req; @@ -1289,9 +1311,11 @@ void* TaskWorkerPool::_upload_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); upload_request = agent_task_req.upload_req; @@ -1347,9 +1371,11 @@ void* TaskWorkerPool::_download_worker_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); download_request = agent_task_req.download_req; @@ -1407,9 +1433,11 @@ void* TaskWorkerPool::_make_snapshot_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); snapshot_request = agent_task_req.snapshot_req; @@ -1485,9 +1513,11 @@ void* TaskWorkerPool::_release_snapshot_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); release_snapshot_request = agent_task_req.release_snapshot_req; @@ -1555,9 +1585,11 @@ void* TaskWorkerPool::_move_dir_thread_callback(void* arg_this) { while (worker_pool_this->_tasks.empty() && !(worker_pool_this->_stopped)) { worker_pool_this->_worker_thread_condition_variable->wait(l); } +#ifndef BE_TEST if (worker_pool_this->_stopped) { break; } +#endif agent_task_req = worker_pool_this->_tasks.front(); move_dir_req = agent_task_req.move_dir_req; diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 23c6a1326aebb0..8cbd7e9ae267fa 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -72,7 +72,12 @@ void* tcmalloc_gc_thread(void* dummy) { using namespace starrocks::vectorized; const static float kFreeRatio = 0.5; GCHelper gch(config::tc_gc_period, config::memory_maintenance_sleep_time_s, MonoTime::Now()); - while (true) { + StorageEngine* storage_engine = ExecEnv::GetInstance()->storage_engine(); + bool bg_worker_stopped = false; + if (storage_engine != nullptr) { + bg_worker_stopped = storage_engine->bg_worker_stopped(); + } + while (!bg_worker_stopped) { sleep(config::memory_maintenance_sleep_time_s); #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) MallocExtension::instance()->MarkThreadBusy(); @@ -107,6 +112,11 @@ void* tcmalloc_gc_thread(void* dummy) { } MallocExtension::instance()->MarkThreadIdle(); #endif + + storage_engine = ExecEnv::GetInstance()->storage_engine(); + if (storage_engine != nullptr) { + bg_worker_stopped = storage_engine->bg_worker_stopped(); + } } return nullptr; diff --git a/be/src/runtime/current_thread.h b/be/src/runtime/current_thread.h index 5ea7c51b02f2e0..42bf4b1b2e850d 100644 --- a/be/src/runtime/current_thread.h +++ b/be/src/runtime/current_thread.h @@ -91,4 +91,4 @@ class CurrentThread { }; inline thread_local CurrentThread tls_thread_status; -} // namespace starrocks \ No newline at end of file +} // namespace starrocks diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index f734e6da96a7ca..122318c711b09d 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -255,10 +255,6 @@ void ExecEnv::_destroy() { delete _stream_load_executor; _stream_load_executor = nullptr; } - if (_storage_engine) { - delete _storage_engine; - _storage_engine = nullptr; - } if (_brpc_stub_cache) { delete _brpc_stub_cache; _brpc_stub_cache = nullptr; diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index 578990e97bc397..e200ca5db18b14 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -471,6 +471,9 @@ void StorageEngine::stop() { _bg_worker_stopped = true; sleep(30); // wait five seconds to exit all threads gracefully + + SAFE_DELETE(_index_stream_lru_cache); + _file_cache.reset(); } void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) { From b0c2469c8b4c9577fcba681dd95a5191cf5c313d Mon Sep 17 00:00:00 2001 From: chaoyli Date: Tue, 9 Nov 2021 12:46:45 +0800 Subject: [PATCH 03/10] fix unit test bug --- be/src/agent/task_worker_pool.cpp | 2 +- be/src/service/starrocks_main.cpp | 20 +++-- be/src/storage/delete_handler.h | 2 +- be/src/storage/merger.cpp | 2 +- be/src/storage/olap_server.cpp | 16 ++-- be/src/storage/push_handler.cpp | 2 +- be/src/storage/storage_engine.cpp | 2 +- be/src/storage/storage_engine.h | 3 +- be/src/storage/task/engine_clone_task.cpp | 21 ++---- .../task/engine_storage_migration_task.cpp | 2 +- be/src/storage/vectorized/compaction.cpp | 74 +++++++++---------- be/src/storage/vectorized/schema_change.cpp | 4 +- .../external_scan_context_mgr_test.cpp | 4 - 13 files changed, 71 insertions(+), 83 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index dbf476bfcfec9d..1c41363adbed9d 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1271,7 +1271,7 @@ void* TaskWorkerPool::_report_tablet_worker_thread_callback(void* arg_this) { StorageEngine::instance()->wait_for_report_notify(config::report_tablet_interval_seconds, true); continue; #else - return (void*)0; + return (void*)0; #endif } int64_t max_compaction_score = diff --git a/be/src/service/starrocks_main.cpp b/be/src/service/starrocks_main.cpp index 149bef34455ceb..54a5ff2116ec98 100644 --- a/be/src/service/starrocks_main.cpp +++ b/be/src/service/starrocks_main.cpp @@ -236,9 +236,8 @@ int main(int argc, char** argv) { } // 3. http service - std::unique_ptr http_service = - std::make_unique(exec_env, starrocks::config::webserver_port, - starrocks::config::webserver_num_workers); + std::unique_ptr http_service = std::make_unique( + exec_env, starrocks::config::webserver_port, starrocks::config::webserver_num_workers); status = http_service->start(); if (!status.ok()) { LOG(ERROR) << "Internal Error:" << status.message(); @@ -261,14 +260,13 @@ int main(int argc, char** argv) { } status = heartbeat_thrift_server->start(); - if (!status.ok()) { - LOG(ERROR) << "Doris BE HeartBeat Service did not start correctly. Error=" - << status.to_string(); - starrocks::shutdown_logging(); - exit(1); - } else { - LOG(INFO) << "Doris BE HeartBeat Service started correctly."; - } + if (!status.ok()) { + LOG(ERROR) << "Doris BE HeartBeat Service did not start correctly. Error=" << status.to_string(); + starrocks::shutdown_logging(); + exit(1); + } else { + LOG(INFO) << "Doris BE HeartBeat Service started correctly."; + } while (!starrocks::k_starrocks_exit) { sleep(10); diff --git a/be/src/storage/delete_handler.h b/be/src/storage/delete_handler.h index 975b24bf7ac426..6d4ffcb2600dd8 100644 --- a/be/src/storage/delete_handler.h +++ b/be/src/storage/delete_handler.h @@ -83,7 +83,7 @@ struct DeleteConditions { // bool filter_data; // filter_data = delete_handler.is_filter_data(data_version, row_cursor); // 3. If there are many rows to check, call is_filter_data() repeatly -// 4. destroy +// 4. destroy // delete_handler.finalize(); // // NOTE: diff --git a/be/src/storage/merger.cpp b/be/src/storage/merger.cpp index de11f9113b084a..4f8c65569a82ef 100644 --- a/be/src/storage/merger.cpp +++ b/be/src/storage/merger.cpp @@ -24,13 +24,13 @@ #include #include +#include "runtime/exec_env.h" #include "storage/olap_define.h" #include "storage/reader.h" #include "storage/row_cursor.h" #include "storage/storage_engine.h" #include "storage/tablet.h" #include "util/trace.h" -#include "runtime/exec_env.h" namespace starrocks { diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index e875a84dfe013b..516132b5519db3 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -40,14 +40,14 @@ using std::string; namespace starrocks { // TODO(yingchun): should be more graceful in the future refactor. -#define SLEEP_IN_BG_WORKER(seconds) \ - int64_t left_seconds = (seconds); \ - while (!_bg_worker_stopped && left_seconds > 0) { \ - sleep(1); \ - --left_seconds; \ - } \ - if (_bg_worker_stopped) { \ - break; \ +#define SLEEP_IN_BG_WORKER(seconds) \ + int64_t left_seconds = (seconds); \ + while (!_bg_worker_stopped && left_seconds > 0) { \ + sleep(1); \ + --left_seconds; \ + } \ + if (_bg_worker_stopped) { \ + break; \ } // number of running SCHEMA-CHANGE threads diff --git a/be/src/storage/push_handler.cpp b/be/src/storage/push_handler.cpp index d954564ca9d1a9..ea390e0ba3f124 100644 --- a/be/src/storage/push_handler.cpp +++ b/be/src/storage/push_handler.cpp @@ -405,7 +405,7 @@ OLAPStatus PushHandler::_convert_v2(const TabletSharedPtr& cur_tablet, RowsetSha num_rows++; } bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); - } + } if (bg_worker_stopped) { res = OLAP_ERR_PUSH_INPUT_DATA_ERROR; diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index e200ca5db18b14..c00cdd87b893e6 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -464,7 +464,7 @@ void StorageEngine::stop() { for (auto& store_pair : _store_map) { store_pair.second->stop_bg_worker(); delete store_pair.second; - store_pair.second = nullptr; + store_pair.second = nullptr; } _store_map.clear(); } diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index acb532681d6f42..c17182895ade50 100644 --- a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -172,7 +172,7 @@ class StorageEngine { Status start_bg_threads(); void stop(); - + bool bg_worker_stopped() { return _bg_worker_stopped; } private: @@ -180,7 +180,6 @@ class StorageEngine { // MUST NOT be called in other circumstances. Status _open(); - Status _init_store_map(); void _update_storage_medium_type_count(); diff --git a/be/src/storage/task/engine_clone_task.cpp b/be/src/storage/task/engine_clone_task.cpp index cebe2a30114d84..1a1c35ad16228c 100644 --- a/be/src/storage/task/engine_clone_task.cpp +++ b/be/src/storage/task/engine_clone_task.cpp @@ -296,8 +296,7 @@ Status EngineCloneTask::_make_snapshot(const std::string& ip, int port, TTableId std::string* snapshot_path, int32_t* snapshot_format) { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); if (!bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The snapshot should be stopped as soon as possible."); + return Status::InternalError("Process is going to quit. The snapshot should be stopped as soon as possible."); } TSnapshotRequest request; @@ -340,8 +339,7 @@ Status EngineCloneTask::_make_snapshot(const std::string& ip, int port, TTableId Status EngineCloneTask::_release_snapshot(const std::string& ip, int port, const std::string& snapshot_path) { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); if (!bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The snapshot should be stopped as soon as possible."); + return Status::InternalError("Process is going to quit. The snapshot should be stopped as soon as possible."); } TAgentResult result; @@ -356,8 +354,7 @@ Status EngineCloneTask::_download_files(DataDir* data_dir, const std::string& re const std::string& local_path) { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); if (!bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The download should be stopped as soon as possible."); + return Status::InternalError("Process is going to quit. The download should be stopped as soon as possible."); } // Check local path exist, if exist, remove it, then create the dir @@ -462,8 +459,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const string& clone_dir, i bool incremental_clone) { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); if (!bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The clone should be stopped as soon as possible."); + return Status::InternalError("Process is going to quit. The clone should be stopped as soon as possible."); } if (tablet->updates() != nullptr) { @@ -565,8 +561,7 @@ Status EngineCloneTask::_clone_incremental_data(Tablet* tablet, const TabletMeta int64_t committed_version) { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); if (!bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The clone should be stopped as soon as possible."); + return Status::InternalError("Process is going to quit. The clone should be stopped as soon as possible."); } LOG(INFO) << "begin to incremental clone. tablet=" << tablet->full_name() @@ -603,8 +598,7 @@ Status EngineCloneTask::_clone_incremental_data(Tablet* tablet, const TabletMeta Status EngineCloneTask::_clone_full_data(Tablet* tablet, TabletMeta* cloned_tablet_meta) { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); if (!bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The clone should be stopped as soon as possible."); + return Status::InternalError("Process is going to quit. The clone should be stopped as soon as possible."); } Version cloned_max_version = cloned_tablet_meta->max_version(); @@ -698,8 +692,7 @@ Status EngineCloneTask::_clone_full_data(Tablet* tablet, TabletMeta* cloned_tabl Status EngineCloneTask::_finish_clone_primary(Tablet* tablet, const std::string& clone_dir) { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); if (!bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The snapshot should be stopped as soon as possible."); + return Status::InternalError("Process is going to quit. The snapshot should be stopped as soon as possible."); } auto meta_file = strings::Substitute("$0/meta", clone_dir); diff --git a/be/src/storage/task/engine_storage_migration_task.cpp b/be/src/storage/task/engine_storage_migration_task.cpp index 25df078f3eedc2..162672c8394437 100644 --- a/be/src/storage/task/engine_storage_migration_task.cpp +++ b/be/src/storage/task/engine_storage_migration_task.cpp @@ -21,10 +21,10 @@ #include "storage/task/engine_storage_migration_task.h" +#include "runtime/exec_env.h" #include "storage/snapshot_manager.h" #include "storage/tablet_meta_manager.h" #include "util/defer_op.h" -#include "runtime/exec_env.h" namespace starrocks { diff --git a/be/src/storage/vectorized/compaction.cpp b/be/src/storage/vectorized/compaction.cpp index eb974519b084b8..99a31eb4a1648d 100644 --- a/be/src/storage/vectorized/compaction.cpp +++ b/be/src/storage/vectorized/compaction.cpp @@ -189,56 +189,56 @@ Status Compaction::merge_rowsets(int64_t mem_limit, Statistics* stats_output) { } #endif - bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); - while (!bg_worker_stopped) { - chunk->reset(); - Status status = reader.get_next(chunk.get()); - if (!status.ok()) { - if (status.is_end_of_file()) { - break; - } else { - return Status::InternalError("reader get_next error."); + bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); + while (!bg_worker_stopped) { + chunk->reset(); + Status status = reader.get_next(chunk.get()); + if (!status.ok()) { + if (status.is_end_of_file()) { + break; + } else { + return Status::InternalError("reader get_next error."); + } } - } - ChunkHelper::padding_char_columns(char_field_indexes, schema, _tablet->tablet_schema(), chunk.get()); + ChunkHelper::padding_char_columns(char_field_indexes, schema, _tablet->tablet_schema(), chunk.get()); - OLAPStatus olap_status = _output_rs_writer->add_chunk(*chunk); - if (olap_status != OLAP_SUCCESS) { - LOG(WARNING) << "writer add_chunk error, err=" << olap_status; - return Status::InternalError("writer add_chunk error."); + OLAPStatus olap_status = _output_rs_writer->add_chunk(*chunk); + if (olap_status != OLAP_SUCCESS) { + LOG(WARNING) << "writer add_chunk error, err=" << olap_status; + return Status::InternalError("writer add_chunk error."); + } + output_rows += chunk->num_rows(); + + bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); } - output_rows += chunk->num_rows(); - bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); - } + if (bg_worker_stopped) { + return Status::InternalError( + "Process is going to quit. The compaction should be stopped as soon as possible."); + } - if (bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The compaction should be stopped as soon as possible."); - } + if (stats_output != nullptr) { + stats_output->output_rows = output_rows; + stats_output->merged_rows = reader.merged_rows(); + stats_output->filtered_rows = reader.stats().rows_del_filtered; + } - if (stats_output != nullptr) { - stats_output->output_rows = output_rows; - stats_output->merged_rows = reader.merged_rows(); - stats_output->filtered_rows = reader.stats().rows_del_filtered; - } + OLAPStatus olap_status = _output_rs_writer->flush(); + if (olap_status != OLAP_SUCCESS) { + LOG(WARNING) << "failed to flush rowset when merging rowsets of tablet " + _tablet->full_name() + << ", err=" << olap_status; + return Status::InternalError("failed to flush rowset when merging rowsets of tablet error."); + } - OLAPStatus olap_status = _output_rs_writer->flush(); - if (olap_status != OLAP_SUCCESS) { - LOG(WARNING) << "failed to flush rowset when merging rowsets of tablet " + _tablet->full_name() - << ", err=" << olap_status; - return Status::InternalError("failed to flush rowset when merging rowsets of tablet error."); + return Status::OK(); } - - return Status::OK(); } Status Compaction::modify_rowsets() { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); - if (bg_worker_stopped) { - return Status::InternalError( - "Process is going to quit. The compaction should be stopped as soon as possible."); + if (bg_worker_stopped) { + return Status::InternalError("Process is going to quit. The compaction should be stopped as soon as possible."); } std::vector output_rowsets; diff --git a/be/src/storage/vectorized/schema_change.cpp b/be/src/storage/vectorized/schema_change.cpp index 5f702f21f9e088..f0bbef37d52e5c 100644 --- a/be/src/storage/vectorized/schema_change.cpp +++ b/be/src/storage/vectorized/schema_change.cpp @@ -684,7 +684,9 @@ bool SchemaChangeDirectly::process(vectorized::TabletReader* reader, RowsetWrite std::unique_ptr mem_pool(new MemPool()); do { bool bg_worker_stopped = ExecEnv::GetInstance()->storage_engine()->bg_worker_stopped(); - if (bg_worker_stopped) { return false; } + if (bg_worker_stopped) { + return false; + } #ifndef BE_TEST Status st = tls_thread_status.mem_tracker()->check_mem_limit("DirectSchemaChange"); if (!st.ok()) { diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp b/be/test/runtime/external_scan_context_mgr_test.cpp index 8f57f90ece4873..6868e435043051 100644 --- a/be/test/runtime/external_scan_context_mgr_test.cpp +++ b/be/test/runtime/external_scan_context_mgr_test.cpp @@ -59,7 +59,6 @@ class ExternalScanContextMgrTest : public testing::Test { TEST_F(ExternalScanContextMgrTest, create_normal) { std::shared_ptr context; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.create_scan_context(&context); ASSERT_TRUE(st.ok()); ASSERT_TRUE(context != nullptr); @@ -68,7 +67,6 @@ TEST_F(ExternalScanContextMgrTest, create_normal) { TEST_F(ExternalScanContextMgrTest, get_normal) { std::shared_ptr context; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.create_scan_context(&context); ASSERT_TRUE(st.ok()); ASSERT_TRUE(context != nullptr); @@ -84,7 +82,6 @@ TEST_F(ExternalScanContextMgrTest, get_abnormal) { std::string context_id = "not_exist"; std::shared_ptr result; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.get_scan_context(context_id, &result); ASSERT_TRUE(!st.ok()); ASSERT_TRUE(result == nullptr); @@ -93,7 +90,6 @@ TEST_F(ExternalScanContextMgrTest, get_abnormal) { TEST_F(ExternalScanContextMgrTest, clear_context) { std::shared_ptr context; ExternalScanContextMgr context_mgr(&_exec_env); - context_mgr._is_stop = true; Status st = context_mgr.create_scan_context(&context); ASSERT_TRUE(st.ok()); ASSERT_TRUE(context != nullptr); From da22894d55198c5f965c9dbfd6ab81bb96dbf99c Mon Sep 17 00:00:00 2001 From: chaoyli Date: Tue, 9 Nov 2021 17:51:28 +0800 Subject: [PATCH 04/10] Fix http server memory leak --- be/src/http/action/pprof_actions.cpp | 78 +--------------------------- be/src/http/action/pprof_actions.h | 61 ++++++++++++++++++++-- be/src/service/http_service.cpp | 60 +++++++++++++++++++-- be/src/service/http_service.h | 3 ++ 4 files changed, 117 insertions(+), 85 deletions(-) diff --git a/be/src/http/action/pprof_actions.cpp b/be/src/http/action/pprof_actions.cpp index 033489abb124fd..7297341c9e7c70 100644 --- a/be/src/http/action/pprof_actions.cpp +++ b/be/src/http/action/pprof_actions.cpp @@ -31,15 +31,14 @@ #include #include "common/config.h" +#include "common/status.h" #include "http/ev_http_server.h" #include "http/http_channel.h" #include "http/http_handler.h" #include "http/http_headers.h" #include "http/http_request.h" #include "http/http_response.h" -#include "runtime/exec_env.h" #include "util/bfd_parser.h" -#include "util/file_utils.h" namespace starrocks { @@ -50,14 +49,6 @@ static const int kPprofDefaultSampleSecs = 30; // Protect, only one thread can work static std::mutex kPprofActionMutex; -class HeapAction : public HttpHandler { -public: - HeapAction() = default; - ~HeapAction() override = default; - - void handle(HttpRequest* req) override; -}; - void HeapAction::handle(HttpRequest* req) { #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) (void)kPprofDefaultSampleSecs; // Avoid unused variable warning. @@ -90,14 +81,6 @@ void HeapAction::handle(HttpRequest* req) { #endif } -class GrowthAction : public HttpHandler { -public: - GrowthAction() = default; - ~GrowthAction() override = default; - - void handle(HttpRequest* req) override; -}; - void GrowthAction::handle(HttpRequest* req) { #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) std::string str = "Growth profiling is not available with address sanitizer builds."; @@ -112,14 +95,6 @@ void GrowthAction::handle(HttpRequest* req) { #endif } -class ProfileAction : public HttpHandler { -public: - ProfileAction() = default; - ~ProfileAction() override = default; - - void handle(HttpRequest* req) override; -}; - void ProfileAction::handle(HttpRequest* req) { #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) std::string str = "CPU profiling is not available with address sanitizer builds."; @@ -155,28 +130,6 @@ void ProfileAction::handle(HttpRequest* req) { #endif } -class PmuProfileAction : public HttpHandler { -public: - PmuProfileAction() = default; - ~PmuProfileAction() override = default; - void handle(HttpRequest* req) override {} -}; - -class ContentionAction : public HttpHandler { -public: - ContentionAction() = default; - ~ContentionAction() override = default; - - void handle(HttpRequest* req) override {} -}; - -class CmdlineAction : public HttpHandler { -public: - CmdlineAction() = default; - ~CmdlineAction() override = default; - void handle(HttpRequest* req) override; -}; - void CmdlineAction::handle(HttpRequest* req) { FILE* fp = fopen("/proc/self/cmdline", "r"); if (fp == nullptr) { @@ -193,17 +146,6 @@ void CmdlineAction::handle(HttpRequest* req) { HttpChannel::send_reply(req, str); } -class SymbolAction : public HttpHandler { -public: - SymbolAction(BfdParser* parser) : _parser(parser) {} - ~SymbolAction() override = default; - - void handle(HttpRequest* req) override; - -private: - BfdParser* _parser; -}; - void SymbolAction::handle(HttpRequest* req) { // TODO: Implement symbol resolution. Without this, the binary needs to be passed // to pprof to resolve all symbols. @@ -243,22 +185,4 @@ void SymbolAction::handle(HttpRequest* req) { } } -Status PprofActions::setup(ExecEnv* exec_env, EvHttpServer* http_server) { - if (!config::pprof_profile_dir.empty()) { - FileUtils::create_dir(config::pprof_profile_dir); - } - - http_server->register_handler(HttpMethod::GET, "/pprof/heap", new HeapAction()); - http_server->register_handler(HttpMethod::GET, "/pprof/growth", new GrowthAction()); - http_server->register_handler(HttpMethod::GET, "/pprof/profile", new ProfileAction()); - http_server->register_handler(HttpMethod::GET, "/pprof/pmuprofile", new PmuProfileAction()); - http_server->register_handler(HttpMethod::GET, "/pprof/contention", new ContentionAction()); - http_server->register_handler(HttpMethod::GET, "/pprof/cmdline", new CmdlineAction()); - auto action = new SymbolAction(exec_env->bfd_parser()); - http_server->register_handler(HttpMethod::GET, "/pprof/symbol", action); - http_server->register_handler(HttpMethod::HEAD, "/pprof/symbol", action); - http_server->register_handler(HttpMethod::POST, "/pprof/symbol", action); - return Status::OK(); -} - } // namespace starrocks diff --git a/be/src/http/action/pprof_actions.h b/be/src/http/action/pprof_actions.h index 91a17011d9211d..d526b89bea7f6b 100644 --- a/be/src/http/action/pprof_actions.h +++ b/be/src/http/action/pprof_actions.h @@ -22,16 +22,67 @@ #ifndef STARROCKS_BE_SRC_HTTP_ACTION_PPROF_ACTIONS_H #define STARROCKS_BE_SRC_HTTP_ACTION_PPROF_ACTIONS_H -#include "common/status.h" +#include "http/http_handler.h" namespace starrocks { -class EvHttpServer; -class ExecEnv; +class BfdParser; -class PprofActions { +class HeapAction : public HttpHandler { public: - static Status setup(ExecEnv* exec_env, EvHttpServer* http_server); + HeapAction() = default; + ~HeapAction() override = default; + + void handle(HttpRequest* req) override; +}; + +class GrowthAction : public HttpHandler { +public: + GrowthAction() = default; + ~GrowthAction() override = default; + + void handle(HttpRequest* req) override; +}; + +class ProfileAction : public HttpHandler { +public: + ProfileAction() = default; + ~ProfileAction() override = default; + + void handle(HttpRequest* req) override; +}; + +class PmuProfileAction : public HttpHandler { +public: + PmuProfileAction() = default; + ~PmuProfileAction() override = default; + void handle(HttpRequest* req) override {} +}; + +class ContentionAction : public HttpHandler { +public: + ContentionAction() = default; + ~ContentionAction() override = default; + + void handle(HttpRequest* req) override {} +}; + +class CmdlineAction : public HttpHandler { +public: + CmdlineAction() = default; + ~CmdlineAction() override = default; + void handle(HttpRequest* req) override; +}; + +class SymbolAction : public HttpHandler { +public: + SymbolAction(BfdParser* parser) : _parser(parser) {} + ~SymbolAction() override = default; + + void handle(HttpRequest* req) override; + +private: + BfdParser* _parser; }; } // namespace starrocks diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index f2ea58c9c99eea..8111a19b789c4c 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -21,6 +21,7 @@ #include "service/http_service.h" +#include "gutil/stl_util.h" #include "http/action/checksum_action.h" #include "http/action/compaction_action.h" #include "http/action/health_action.h" @@ -40,6 +41,7 @@ #include "http/web_page_handler.h" #include "runtime/exec_env.h" #include "runtime/load_path_mgr.h" +#include "util/file_utils.h" #include "util/starrocks_metrics.h" namespace starrocks { @@ -49,13 +51,19 @@ HttpService::HttpService(ExecEnv* env, int port, int num_threads) _ev_http_server(new EvHttpServer(port, num_threads)), _web_page_handler(new WebPageHandler(_ev_http_server.get())) {} -HttpService::~HttpService() = default; +HttpService::~HttpService() { + _ev_http_server.reset(); + _web_page_handler.reset(); + STLDeleteElements(&_http_handlers); +} Status HttpService::start() { add_default_path_handlers(_web_page_handler.get(), _env->process_mem_tracker()); // register load - _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load", new StreamLoadAction(_env)); + StreamLoadAction* stream_load_action = new StreamLoadAction(_env); + _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load", stream_load_action); + _http_handlers.emplace_back(stream_load_action); // register download action std::vector allow_paths; @@ -65,57 +73,103 @@ Status HttpService::start() { DownloadAction* download_action = new DownloadAction(_env, allow_paths); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action); + _http_handlers.emplace_back(download_action); DownloadAction* tablet_download_action = new DownloadAction(_env, allow_paths); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_download", tablet_download_action); _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download", tablet_download_action); + _http_handlers.emplace_back(tablet_download_action); DownloadAction* error_log_download_action = new DownloadAction(_env, _env->load_path_mgr()->get_load_error_file_dir()); _ev_http_server->register_handler(HttpMethod::GET, "/api/_load_error_log", error_log_download_action); _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log", error_log_download_action); + _http_handlers.emplace_back(error_log_download_action); // Register BE health action HealthAction* health_action = new HealthAction(_env); _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); + _http_handlers.emplace_back(health_action); // register pprof actions - PprofActions::setup(_env, _ev_http_server.get()); + if (!config::pprof_profile_dir.empty()) { + FileUtils::create_dir(config::pprof_profile_dir); + } + + HeapAction* heap_action = new HeapAction(); + _ev_http_server->register_handler(HttpMethod::GET, "/pprof/heap", heap_action); + _http_handlers.emplace_back(heap_action); + + GrowthAction* growth_action = new GrowthAction(); + _ev_http_server->register_handler(HttpMethod::GET, "/pprof/growth", growth_action); + _http_handlers.emplace_back(growth_action); + + ProfileAction* profile_action = new ProfileAction(); + _ev_http_server->register_handler(HttpMethod::GET, "/pprof/profile", profile_action); + _http_handlers.emplace_back(profile_action); + + PmuProfileAction* pmu_profile_action = new PmuProfileAction(); + _ev_http_server->register_handler(HttpMethod::GET, "/pprof/pmuprofile", pmu_profile_action); + _http_handlers.emplace_back(pmu_profile_action); + + ContentionAction* contention_action = new ContentionAction(); + _ev_http_server->register_handler(HttpMethod::GET, "/pprof/contention", contention_action); + _http_handlers.emplace_back(contention_action); + + CmdlineAction* cmdline_action = new CmdlineAction(); + _ev_http_server->register_handler(HttpMethod::GET, "/pprof/cmdline", cmdline_action); + _http_handlers.emplace_back(cmdline_action); + + SymbolAction* symbol_action = new SymbolAction(_env->bfd_parser()); + _ev_http_server->register_handler(HttpMethod::GET, "/pprof/symbol", symbol_action); + _ev_http_server->register_handler(HttpMethod::HEAD, "/pprof/symbol", symbol_action); + _ev_http_server->register_handler(HttpMethod::POST, "/pprof/symbol", symbol_action); + _http_handlers.emplace_back(symbol_action); // register metrics { auto action = new MetricsAction(StarRocksMetrics::instance()->metrics()); _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action); + _http_handlers.emplace_back(action); } MetaAction* meta_action = new MetaAction(HEADER); _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/header/{tablet_id}/{schema_hash}", meta_action); + _http_handlers.emplace_back(meta_action); #ifndef BE_TEST // Register BE checksum action ChecksumAction* checksum_action = new ChecksumAction(_env); _ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); + _http_handlers.emplace_back(checksum_action); // Register BE reload tablet action ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(_env); _ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); + _http_handlers.emplace_back(reload_tablet_action); RestoreTabletAction* restore_tablet_action = new RestoreTabletAction(_env); _ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet", restore_tablet_action); + _http_handlers.emplace_back(restore_tablet_action); // Register BE snapshot action SnapshotAction* snapshot_action = new SnapshotAction(_env); _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); + _http_handlers.emplace_back(snapshot_action); #endif // 2 compaction actions CompactionAction* show_compaction_action = new CompactionAction(CompactionActionType::SHOW_INFO); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/show", show_compaction_action); + _http_handlers.emplace_back(show_compaction_action); + CompactionAction* run_compaction_action = new CompactionAction(CompactionActionType::RUN_COMPACTION); _ev_http_server->register_handler(HttpMethod::POST, "/api/compaction/run", run_compaction_action); + _http_handlers.emplace_back(run_compaction_action); UpdateConfigAction* update_config_action = new UpdateConfigAction(_env); _ev_http_server->register_handler(HttpMethod::POST, "/api/update_config", update_config_action); + _http_handlers.emplace_back(update_config_action); RETURN_IF_ERROR(_ev_http_server->start()); return Status::OK(); diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h index 0d5da8a9e53124..6e909233ff6e18 100644 --- a/be/src/service/http_service.h +++ b/be/src/service/http_service.h @@ -29,6 +29,7 @@ namespace starrocks { class ExecEnv; class EvHttpServer; +class HttpHandler; class WebPageHandler; // HTTP service for StarRocks BE @@ -44,6 +45,8 @@ class HttpService { std::unique_ptr _ev_http_server; std::unique_ptr _web_page_handler; + + std::vector _http_handlers; }; } // namespace starrocks From 25349f2200639ff8ecc726fd5d056732df4b3d4b Mon Sep 17 00:00:00 2001 From: chaoyli Date: Tue, 9 Nov 2021 20:21:32 +0800 Subject: [PATCH 05/10] Fix CurrentThread thread crash --- be/src/runtime/current_thread.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/be/src/runtime/current_thread.h b/be/src/runtime/current_thread.h index 42bf4b1b2e850d..a3e3f4c77c3575 100644 --- a/be/src/runtime/current_thread.h +++ b/be/src/runtime/current_thread.h @@ -7,6 +7,7 @@ #include "gen_cpp/Types_types.h" #include "runtime/exec_env.h" #include "runtime/mem_tracker.h" +#include "storage/storage_engine.h" #include "util/uid_util.h" namespace starrocks { @@ -19,6 +20,10 @@ class CurrentThread { ~CurrentThread() { commit(); } void commit() { + StorageEngine* storage_engine = ExecEnv::GetInstance()->storage_engine(); + if (storage_engine != nullptr && storage_engine->bg_worker_stopped()) { + return; + } MemTracker* cur_tracker = mem_tracker(); if (_cache_size != 0 && cur_tracker != nullptr) { cur_tracker->consume(_cache_size); From 74cf7082aa3dcfaacf621ef45d8cfc69b7190e81 Mon Sep 17 00:00:00 2001 From: chaoyli Date: Tue, 9 Nov 2021 22:22:36 +0800 Subject: [PATCH 06/10] Fix unit test bug --- be/src/storage/storage_engine.cpp | 2 ++ be/src/storage/update_manager.cpp | 4 +++- be/test/storage/delta_writer_test.cpp | 3 ++- be/test/storage/rowset/beta_rowset_test.cpp | 1 + 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index c00cdd87b893e6..72e12346015b0b 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -470,7 +470,9 @@ void StorageEngine::stop() { } _bg_worker_stopped = true; +#ifndef BE_TEST sleep(30); // wait five seconds to exit all threads gracefully +#endif SAFE_DELETE(_index_stream_lru_cache); _file_cache.reset(); diff --git a/be/src/storage/update_manager.cpp b/be/src/storage/update_manager.cpp index 5cd694d6f0aa42..85918f9057ab17 100644 --- a/be/src/storage/update_manager.cpp +++ b/be/src/storage/update_manager.cpp @@ -45,7 +45,9 @@ UpdateManager::~UpdateManager() { if (_index_cache_mem_tracker) { _index_cache_mem_tracker.reset(); } - _apply_thread_pool->shutdown(); + if (_apply_thread_pool != nullptr) { + _apply_thread_pool->shutdown(); + } } Status UpdateManager::init() { diff --git a/be/test/storage/delta_writer_test.cpp b/be/test/storage/delta_writer_test.cpp index 92395c391932f0..412280cb0fa67f 100644 --- a/be/test/storage/delta_writer_test.cpp +++ b/be/test/storage/delta_writer_test.cpp @@ -75,8 +75,10 @@ void set_up() { } void tear_down() { + k_engine->stop(); delete k_engine; k_engine = nullptr; + ExecEnv::GetInstance()->set_storage_engine(nullptr); system("rm -rf ./data_test"); FileUtils::remove_all(std::string(getenv("STARROCKS_HOME")) + UNUSED_PREFIX); delete k_tablet_meta_mem_tracker; @@ -302,7 +304,6 @@ TEST_F(TestDeltaWriter, open) { TDropTabletReq drop_request; auto tablet_id = 10003; - auto schema_hash = 270068375; st = k_engine->tablet_manager()->drop_tablet(tablet_id); ASSERT_TRUE(st.ok()) << st.to_string(); } diff --git a/be/test/storage/rowset/beta_rowset_test.cpp b/be/test/storage/rowset/beta_rowset_test.cpp index 44d6045192265e..a10ced71fcdfd1 100644 --- a/be/test/storage/rowset/beta_rowset_test.cpp +++ b/be/test/storage/rowset/beta_rowset_test.cpp @@ -89,6 +89,7 @@ class BetaRowsetTest : public testing::Test { } void TearDown() override { + k_engine->stop(); delete k_engine; k_engine = nullptr; starrocks::ExecEnv::GetInstance()->set_storage_engine(nullptr); From b911bc9bccf4c2a75c057c1c5f9c4674b53710b4 Mon Sep 17 00:00:00 2001 From: chaoyli Date: Thu, 11 Nov 2021 10:03:00 +0800 Subject: [PATCH 07/10] Make thread been joinable --- be/src/agent/task_worker_pool.cpp | 31 +++---------------- be/src/agent/task_worker_pool.h | 1 + be/src/service/starrocks_main.cpp | 4 +-- be/src/storage/olap_server.cpp | 23 -------------- .../default_value_column_iterator.cpp | 2 +- be/src/storage/storage_engine.cpp | 28 +++++++++++++++-- 6 files changed, 34 insertions(+), 55 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 1c41363adbed9d..c9bb6819b9edc9 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -21,7 +21,6 @@ #include "agent/task_worker_pool.h" -#include #include #include @@ -175,7 +174,9 @@ void TaskWorkerPool::stop() { _stopped = true; std::lock_guard l(_worker_thread_lock); _worker_thread_condition_variable->notify_all(); - sleep(1); // wait thread to exit + for (uint32_t i = 0; i < _worker_count; ++i) { + _worker_threads[i].join(); + } } void TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { @@ -215,30 +216,8 @@ void TaskWorkerPool::_remove_task_info(const TTaskType::type task_type, int64_t } void TaskWorkerPool::_spawn_callback_worker_thread(CALLBACK_FUNCTION callback_func) { - pthread_t thread; - sigset_t mask; - sigset_t omask; - int err = 0; - - // TODO: why need to catch these signals, should leave a comment - sigemptyset(&mask); - sigaddset(&mask, SIGCHLD); - sigaddset(&mask, SIGHUP); - sigaddset(&mask, SIGPIPE); - pthread_sigmask(SIG_SETMASK, &mask, &omask); - - while (true) { - err = pthread_create(&thread, nullptr, callback_func, this); - if (err != 0) { - LOG(WARNING) << "failed to spawn a thread. error: " << err; -#ifndef BE_TEST - sleep(config::sleep_one_second); -#endif - } else { - pthread_detach(thread); - break; - } - } + std::thread worker_thread(callback_func, this); + _worker_threads.emplace_back(std::move(worker_thread)); } void TaskWorkerPool::_finish_task(const TFinishTaskRequest& finish_task_request) { diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 509c83abcbb835..e01c4bad2e8074 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -144,6 +144,7 @@ class TaskWorkerPool { static std::map> _s_task_signatures; std::atomic _stopped{false}; + std::vector _worker_threads; TaskWorkerPool(const TaskWorkerPool&) = delete; const TaskWorkerPool& operator=(const TaskWorkerPool&) = delete; diff --git a/be/src/service/starrocks_main.cpp b/be/src/service/starrocks_main.cpp index 54a5ff2116ec98..d1e630758cc63a 100644 --- a/be/src/service/starrocks_main.cpp +++ b/be/src/service/starrocks_main.cpp @@ -261,11 +261,11 @@ int main(int argc, char** argv) { status = heartbeat_thrift_server->start(); if (!status.ok()) { - LOG(ERROR) << "Doris BE HeartBeat Service did not start correctly. Error=" << status.to_string(); + LOG(ERROR) << "StarRocks BE HeartBeat Service did not start correctly. Error=" << status.to_string(); starrocks::shutdown_logging(); exit(1); } else { - LOG(INFO) << "Doris BE HeartBeat Service started correctly."; + LOG(INFO) << "StarRocks BE HeartBeat Service started correctly."; } while (!starrocks::k_starrocks_exit) { diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index 516132b5519db3..bbe4d1af2ae416 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -56,20 +56,16 @@ volatile uint32_t g_schema_change_active_threads = 0; Status StorageEngine::start_bg_threads() { _update_cache_expire_thread = std::thread([this] { _update_cache_expire_thread_callback(nullptr); }); LOG(INFO) << "update cache expire thread started"; - _update_cache_expire_thread.detach(); _unused_rowset_monitor_thread = std::thread([this] { _unused_rowset_monitor_thread_callback(nullptr); }); - _unused_rowset_monitor_thread.detach(); LOG(INFO) << "unused rowset monitor thread started"; // start thread for monitoring the snapshot and trash folder _garbage_sweeper_thread = std::thread([this] { _garbage_sweeper_thread_callback(nullptr); }); - _garbage_sweeper_thread.detach(); LOG(INFO) << "garbage sweeper thread started"; // start thread for monitoring the tablet with io error _disk_stat_monitor_thread = std::thread([this] { _disk_stat_monitor_thread_callback(nullptr); }); - _disk_stat_monitor_thread.detach(); LOG(INFO) << "disk stat monitor thread started"; // convert store map to vector @@ -100,9 +96,6 @@ Status StorageEngine::start_bg_threads() { _base_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]); }); } - for (auto& thread : _base_compaction_threads) { - thread.detach(); - } LOG(INFO) << "base compaction threads started. number: " << base_compaction_num_threads; _cumulative_compaction_threads.reserve(cumulative_compaction_num_threads); @@ -111,9 +104,6 @@ Status StorageEngine::start_bg_threads() { _cumulative_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]); }); } - for (auto& thread : _cumulative_compaction_threads) { - thread.detach(); - } LOG(INFO) << "cumulative compaction threads started. number: " << cumulative_compaction_num_threads; int32_t update_compaction_num_threads_per_disk = @@ -125,23 +115,16 @@ Status StorageEngine::start_bg_threads() { _update_compaction_thread_callback(nullptr, data_dirs[i % data_dir_num]); }); } - for (auto& thread : _update_compaction_threads) { - thread.detach(); - } LOG(INFO) << "update compaction threads started. number: " << update_compaction_num_threads; // tablet checkpoint thread for (auto data_dir : data_dirs) { _tablet_checkpoint_threads.emplace_back([this, data_dir] { _tablet_checkpoint_callback((void*)data_dir); }); } - for (auto& thread : _tablet_checkpoint_threads) { - thread.detach(); - } LOG(INFO) << "tablet checkpoint thread started"; // fd cache clean thread _fd_cache_clean_thread = std::thread([this] { _fd_cache_clean_callback(nullptr); }); - _fd_cache_clean_thread.detach(); LOG(INFO) << "fd cache clean thread started"; // path scan and gc thread @@ -151,12 +134,6 @@ Status StorageEngine::start_bg_threads() { _path_gc_threads.emplace_back([this, data_dir] { _path_gc_thread_callback((void*)data_dir); }); } - for (auto& thread : _path_scan_threads) { - thread.detach(); - } - for (auto& thread : _path_gc_threads) { - thread.detach(); - } LOG(INFO) << "path scan/gc threads started. number:" << get_stores().size(); } diff --git a/be/src/storage/rowset/segment_v2/default_value_column_iterator.cpp b/be/src/storage/rowset/segment_v2/default_value_column_iterator.cpp index 3e7574335c869a..84c5970ce33d88 100644 --- a/be/src/storage/rowset/segment_v2/default_value_column_iterator.cpp +++ b/be/src/storage/rowset/segment_v2/default_value_column_iterator.cpp @@ -140,4 +140,4 @@ Status DefaultValueColumnIterator::get_row_ranges_by_zone_map( } } // namespace segment_v2 -} // namespace starrocks \ No newline at end of file +} // namespace starrocks diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index 72e12346015b0b..3bbf9513973b6f 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -470,9 +470,31 @@ void StorageEngine::stop() { } _bg_worker_stopped = true; -#ifndef BE_TEST - sleep(30); // wait five seconds to exit all threads gracefully -#endif + _update_cache_expire_thread.join(); + _unused_rowset_monitor_thread.join(); + _garbage_sweeper_thread.join(); + _disk_stat_monitor_thread.join(); + for (auto& thread : _base_compaction_threads) { + thread.join(); + } + for (auto& thread : _cumulative_compaction_threads) { + thread.join(); + } + for (auto& thread : _update_compaction_threads) { + thread.join(); + } + for (auto& thread : _tablet_checkpoint_threads) { + thread.join(); + } + _fd_cache_clean_thread.join(); + if (config::path_gc_check) { + for (auto& thread : _path_scan_threads) { + thread.join(); + } + for (auto& thread : _path_gc_threads) { + thread.join(); + } + } SAFE_DELETE(_index_stream_lru_cache); _file_cache.reset(); From 575220b62ca2b565c3f593811c744e4d729d17fe Mon Sep 17 00:00:00 2001 From: chaoyli Date: Thu, 11 Nov 2021 16:07:23 +0800 Subject: [PATCH 08/10] Make thread been joinable --- be/src/agent/agent_server.h | 3 ++- be/src/agent/multi_worker_pool.cpp | 5 ++--- be/src/agent/multi_worker_pool.h | 13 +++++++------ be/src/agent/task_worker_pool.cpp | 7 ------- be/src/agent/task_worker_pool.h | 8 ++++---- be/src/storage/storage_engine.cpp | 2 +- be/src/storage/storage_engine.h | 4 ++-- 7 files changed, 18 insertions(+), 24 deletions(-) diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 042fa84e0907e3..6fa186b2e906f5 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -31,6 +31,7 @@ namespace starrocks { +class MultiWorkerPool; class TaskWorkerPool; // Each method corresponds to one RPC from FE Master, see BackendService. @@ -62,7 +63,7 @@ class AgentServer { std::unique_ptr _create_tablet_workers; std::unique_ptr _drop_tablet_workers; std::unique_ptr _push_workers; - std::unique_ptr _publish_version_workers; + std::unique_ptr _publish_version_workers; std::unique_ptr _clear_transaction_task_workers; std::unique_ptr _delete_workers; std::unique_ptr _alter_tablet_workers; diff --git a/be/src/agent/multi_worker_pool.cpp b/be/src/agent/multi_worker_pool.cpp index 77881c96aedaa5..68a5a3bed52b46 100644 --- a/be/src/agent/multi_worker_pool.cpp +++ b/be/src/agent/multi_worker_pool.cpp @@ -4,9 +4,8 @@ namespace starrocks { -MultiWorkerPool::MultiWorkerPool(const TaskWorkerType worker_type, ExecEnv* env, const TMasterInfo& master_info, - int worker_num) - : TaskWorkerPool(worker_type, env, master_info, worker_num) { +MultiWorkerPool::MultiWorkerPool(const TaskWorkerPool::TaskWorkerType worker_type, ExecEnv* env, + const TMasterInfo& master_info, int worker_num) { DCHECK(worker_num > 0); for (int i = 0; i < worker_num; i++) { auto pool = std::make_shared(worker_type, env, master_info, 1); diff --git a/be/src/agent/multi_worker_pool.h b/be/src/agent/multi_worker_pool.h index 9ed8285f9a4612..63747b4e3de19b 100644 --- a/be/src/agent/multi_worker_pool.h +++ b/be/src/agent/multi_worker_pool.h @@ -10,18 +10,19 @@ namespace starrocks { // We create MultiWorkerPool for processing publish version task, these tasks are // submitted to one task pool according to its partition id, so the tasks belong to // the same partition will be processed by the same worker thread. -class MultiWorkerPool : public TaskWorkerPool { +class MultiWorkerPool { public: - MultiWorkerPool(const TaskWorkerType worker_type, ExecEnv* env, const TMasterInfo& master_info, int worker_num); + MultiWorkerPool(const TaskWorkerPool::TaskWorkerType worker_type, ExecEnv* env, const TMasterInfo& master_info, + int worker_num); - ~MultiWorkerPool() override = default; + ~MultiWorkerPool() = default; - void start() override; + void start(); - void stop() override; + void stop(); // submit task to queue and wait to be executed - void submit_task(const TAgentTaskRequest& task) override; + void submit_task(const TAgentTaskRequest& task); private: void submit_publish_version_task(const TAgentTaskRequest& task); diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index c9bb6819b9edc9..67fca284feaf11 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -83,12 +83,6 @@ TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* e } TaskWorkerPool::~TaskWorkerPool() { - // We should notify all waiting threads to destroy this condition variable - // If we don't notify, pthread_cond_destroy will hang in some GLIBC version. - // See https://bugzilla.redhat.com/show_bug.cgi?id=1647381 - // "In glibc 2.25 we implemented a new version of POSIX condition variables to provide stronger - // ordering guarantees. The change in the implementation caused the undefined behaviour - // to change." stop(); delete _worker_thread_condition_variable; } @@ -172,7 +166,6 @@ void TaskWorkerPool::stop() { return; } _stopped = true; - std::lock_guard l(_worker_thread_lock); _worker_thread_condition_variable->notify_all(); for (uint32_t i = 0; i < _worker_count; ++i) { _worker_threads[i].join(); diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index e01c4bad2e8074..1aaca2249f76c5 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -72,19 +72,19 @@ class TaskWorkerPool { typedef void* (*CALLBACK_FUNCTION)(void*); TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* env, const TMasterInfo& master_info, int worker_num); - virtual ~TaskWorkerPool(); + ~TaskWorkerPool(); // start the task worker callback thread - virtual void start(); + void start(); // stop the task worker callback thread - virtual void stop(); + void stop(); // Submit task to task pool // // Input parameters: // * task: the task need callback thread to do - virtual void submit_task(const TAgentTaskRequest& task); + void submit_task(const TAgentTaskRequest& task); private: bool _register_task_info(const TTaskType::type task_type, int64_t signature); diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index 3bbf9513973b6f..87886825ca1092 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -468,7 +468,7 @@ void StorageEngine::stop() { } _store_map.clear(); } - _bg_worker_stopped = true; + _bg_worker_stopped.store(true, std::memory_order_release); _update_cache_expire_thread.join(); _unused_rowset_monitor_thread.join(); diff --git a/be/src/storage/storage_engine.h b/be/src/storage/storage_engine.h index c17182895ade50..e1bb64eef59bfc 100644 --- a/be/src/storage/storage_engine.h +++ b/be/src/storage/storage_engine.h @@ -173,7 +173,7 @@ class StorageEngine { void stop(); - bool bg_worker_stopped() { return _bg_worker_stopped; } + bool bg_worker_stopped() { return _bg_worker_stopped.load(std::memory_order_consume); } private: // Instance should be inited from `static open()` @@ -283,7 +283,7 @@ class StorageEngine { // map, if we use RowsetId as the key, we need custom hash func std::unordered_map _unused_rowsets; - bool _bg_worker_stopped = false; + std::atomic _bg_worker_stopped{false}; // thread to expire update cache; std::thread _update_cache_expire_thread; std::thread _unused_rowset_monitor_thread; From b928d444337857c8bc750055be66ac85e15ba215 Mon Sep 17 00:00:00 2001 From: chaoyli Date: Thu, 11 Nov 2021 19:49:06 +0800 Subject: [PATCH 09/10] Make thread been joinable --- be/src/agent/task_worker_pool.h | 1 + be/src/service/brpc.h | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 1aaca2249f76c5..178acea50f96b3 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h index 97b9fc3bc00a11..a3d8b05884f3d8 100644 --- a/be/src/service/brpc.h +++ b/be/src/service/brpc.h @@ -32,10 +32,6 @@ #undef DISALLOW_IMPLICIT_CONSTRUCTORS #endif -#ifdef arraysize -#undef arraysize -#endif - #undef OVERRIDE #undef FINAL From 5ab6af4b6b0cb1139d30f87fd8a791872a830441 Mon Sep 17 00:00:00 2001 From: chaoyli Date: Fri, 12 Nov 2021 11:32:08 +0800 Subject: [PATCH 10/10] Make thread been joinable --- be/src/agent/task_worker_pool.cpp | 4 ++- be/src/storage/olap_server.cpp | 40 ++++++++++++++-------------- be/src/storage/storage_engine.cpp | 44 +++++++++++++++++++++++-------- be/test/test_main.cpp | 3 +++ 4 files changed, 59 insertions(+), 32 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 67fca284feaf11..592d24ee5a8092 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -168,7 +168,9 @@ void TaskWorkerPool::stop() { _stopped = true; _worker_thread_condition_variable->notify_all(); for (uint32_t i = 0; i < _worker_count; ++i) { - _worker_threads[i].join(); + if (_worker_threads[i].joinable()) { + _worker_threads[i].join(); + } } } diff --git a/be/src/storage/olap_server.cpp b/be/src/storage/olap_server.cpp index bbe4d1af2ae416..fa1ff4565eca84 100644 --- a/be/src/storage/olap_server.cpp +++ b/be/src/storage/olap_server.cpp @@ -40,14 +40,14 @@ using std::string; namespace starrocks { // TODO(yingchun): should be more graceful in the future refactor. -#define SLEEP_IN_BG_WORKER(seconds) \ - int64_t left_seconds = (seconds); \ - while (!_bg_worker_stopped && left_seconds > 0) { \ - sleep(1); \ - --left_seconds; \ - } \ - if (_bg_worker_stopped) { \ - break; \ +#define SLEEP_IN_BG_WORKER(seconds) \ + int64_t left_seconds = (seconds); \ + while (!_bg_worker_stopped.load(std::memory_order_consume) && left_seconds > 0) { \ + sleep(1); \ + --left_seconds; \ + } \ + if (_bg_worker_stopped.load(std::memory_order_consume)) { \ + break; \ } // number of running SCHEMA-CHANGE threads @@ -145,7 +145,7 @@ void* StorageEngine::_fd_cache_clean_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { int32_t interval = config::file_descriptor_cache_clean_interval; if (interval <= 0) { LOG(WARNING) << "config of file descriptor clean interval is illegal: " << interval << "force set to 3600"; @@ -166,7 +166,7 @@ void* StorageEngine::_base_compaction_thread_callback(void* arg, DataDir* data_d //string last_base_compaction_fs; //TTabletId last_base_compaction_tablet_id = -1; Status status = Status::OK(); - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { // must be here, because this thread is start on start and if (!data_dir->reach_capacity_limit(0)) { status = _perform_base_compaction(data_dir); @@ -191,7 +191,7 @@ void* StorageEngine::_update_compaction_thread_callback(void* arg, DataDir* data ProfilerRegisterThread(); #endif Status status = Status::OK(); - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { // must be here, because this thread is start on start and if (!data_dir->reach_capacity_limit(0)) { status = _perform_update_compaction(data_dir); @@ -228,7 +228,7 @@ void* StorageEngine::_garbage_sweeper_thread_callback(void* arg) { const double pi = 4 * std::atan(1); double usage = 1.0; - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { usage *= 100.0; // when disk usage is less than 60%, ratio is about 1; // when disk usage is between [60%, 75%], ratio drops from 0.87 to 0.27; @@ -259,7 +259,7 @@ void* StorageEngine::_disk_stat_monitor_thread_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { _start_disk_stat_monitor(); int32_t interval = config::disk_stat_monitor_interval; @@ -280,7 +280,7 @@ void* StorageEngine::_cumulative_compaction_thread_callback(void* arg, DataDir* LOG(INFO) << "try to start cumulative compaction process!"; Status status = Status::OK(); - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { // must be here, because this thread is start on start and if (!data_dir->reach_capacity_limit(0)) { status = _perform_cumulative_compaction(data_dir); @@ -305,7 +305,7 @@ void* StorageEngine::_update_cache_expire_thread_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { int32_t expire_sec = config::update_cache_expire_sec; if (expire_sec <= 0) { LOG(WARNING) << "update_cache_expire_sec config is illegal: " << expire_sec << ", force set to 360"; @@ -324,7 +324,7 @@ void* StorageEngine::_unused_rowset_monitor_thread_callback(void* arg) { #ifdef GOOGLE_PROFILER ProfilerRegisterThread(); #endif - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { start_delete_unused_rowset(); int32_t interval = config::unused_rowset_monitor_interval; @@ -345,7 +345,7 @@ void* StorageEngine::_path_gc_thread_callback(void* arg) { LOG(INFO) << "try to start path gc thread!"; - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { LOG(INFO) << "try to perform path gc by tablet!"; ((DataDir*)arg)->perform_path_gc_by_tablet(); @@ -371,13 +371,13 @@ void* StorageEngine::_path_scan_thread_callback(void* arg) { #endif LOG(INFO) << "wait 10min to start path scan thread"; - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { SLEEP_IN_BG_WORKER(600); break; } LOG(INFO) << "try to start path scan thread!"; - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { LOG(INFO) << "try to perform path scan!"; ((DataDir*)arg)->perform_path_scan(); @@ -398,7 +398,7 @@ void* StorageEngine::_tablet_checkpoint_callback(void* arg) { ProfilerRegisterThread(); #endif LOG(INFO) << "try to start tablet meta checkpoint thread!"; - while (!_bg_worker_stopped) { + while (!_bg_worker_stopped.load(std::memory_order_consume)) { LOG(INFO) << "begin to do tablet meta checkpoint:" << ((DataDir*)arg)->path(); int64_t start_time = UnixMillis(); _tablet_manager->do_tablet_meta_checkpoint((DataDir*)arg); diff --git a/be/src/storage/storage_engine.cpp b/be/src/storage/storage_engine.cpp index 87886825ca1092..249533438f4675 100644 --- a/be/src/storage/storage_engine.cpp +++ b/be/src/storage/storage_engine.cpp @@ -470,29 +470,51 @@ void StorageEngine::stop() { } _bg_worker_stopped.store(true, std::memory_order_release); - _update_cache_expire_thread.join(); - _unused_rowset_monitor_thread.join(); - _garbage_sweeper_thread.join(); - _disk_stat_monitor_thread.join(); + if (_update_cache_expire_thread.joinable()) { + _update_cache_expire_thread.join(); + } + if (_unused_rowset_monitor_thread.joinable()) { + _unused_rowset_monitor_thread.join(); + } + if (_garbage_sweeper_thread.joinable()) { + _garbage_sweeper_thread.join(); + } + if (_disk_stat_monitor_thread.joinable()) { + _disk_stat_monitor_thread.join(); + } for (auto& thread : _base_compaction_threads) { - thread.join(); + if (thread.joinable()) { + thread.join(); + } } for (auto& thread : _cumulative_compaction_threads) { - thread.join(); + if (thread.joinable()) { + thread.join(); + } } for (auto& thread : _update_compaction_threads) { - thread.join(); + if (thread.joinable()) { + thread.join(); + } } for (auto& thread : _tablet_checkpoint_threads) { - thread.join(); + if (thread.joinable()) { + thread.join(); + } + } + if (_fd_cache_clean_thread.joinable()) { + _fd_cache_clean_thread.join(); } - _fd_cache_clean_thread.join(); if (config::path_gc_check) { for (auto& thread : _path_scan_threads) { - thread.join(); + if (thread.joinable()) { + thread.join(); + } } for (auto& thread : _path_gc_threads) { - thread.join(); + if (thread.joinable()) { + thread.join(); + } } } diff --git a/be/test/test_main.cpp b/be/test/test_main.cpp index 5fa615a12a5e57..3f6ed1a5e38311 100644 --- a/be/test/test_main.cpp +++ b/be/test/test_main.cpp @@ -77,6 +77,9 @@ int main(int argc, char** argv) { starrocks::StorageEngine::instance()->update_manager()->clear_cache(); (void)butil::DeleteFile(storage_root, true); starrocks::vectorized::TEST_clear_all_columns_this_thread(); + // delete engine + engine->stop(); + delete engine; // destroy exec env starrocks::tls_thread_status.set_mem_tracker(nullptr); starrocks::ExecEnv::destroy(exec_env);