Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](scan) fix core dump due to store_path_map #23084

Merged
merged 1 commit into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class ExecEnv {
BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }

const std::vector<StorePath>& store_paths() const { return _store_paths; }
size_t store_path_to_index(const std::string& path) { return _store_path_map[path]; }

StorageEngine* storage_engine() { return _storage_engine; }
void set_storage_engine(StorageEngine* storage_engine) { _storage_engine = storage_engine; }

Expand Down Expand Up @@ -199,8 +199,7 @@ class ExecEnv {

bool _is_init;
std::vector<StorePath> _store_paths;
// path => store index
std::map<std::string, size_t> _store_path_map;

// Leave protected so that subclasses can override
ExternalScanContextMgr* _external_scan_context_mgr = nullptr;
doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr;
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
return Status::OK();
}
_store_paths = store_paths;
// path_name => path_index
for (int i = 0; i < store_paths.size(); i++) {
_store_path_map[store_paths[i].path] = i;
}

_external_scan_context_mgr = new ExternalScanContextMgr(this);
_vstream_mgr = new doris::vectorized::VDataStreamMgr();
Expand Down
148 changes: 0 additions & 148 deletions be/src/util/priority_work_stealing_thread_pool.hpp

This file was deleted.

5 changes: 2 additions & 3 deletions be/src/util/work_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class WorkThreadPool {
public:
int priority;
WorkFunction work_function;
int queue_id;
bool operator<(const Task& o) const { return priority < o.priority; }

Task& operator++() {
Expand Down Expand Up @@ -88,12 +87,12 @@ class WorkThreadPool {
virtual bool offer(Task task) { return _work_queue.blocking_put(task); }

virtual bool offer(WorkFunction func) {
WorkThreadPool::Task task = {0, func, 0};
WorkThreadPool::Task task = {0, func};
return _work_queue.blocking_put(task);
}

virtual bool try_offer(WorkFunction func) {
WorkThreadPool::Task task = {0, func, 0};
WorkThreadPool::Task task = {0, func};
return _work_queue.try_put(task);
}

Expand Down
8 changes: 3 additions & 5 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
#include "util/defer_op.h"
#include "util/priority_work_stealing_thread_pool.hpp"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
Expand Down Expand Up @@ -101,9 +100,9 @@ Status ScannerScheduler::init(ExecEnv* env) {
}

// 2. local scan thread pool
_local_scan_thread_pool.reset(new PriorityWorkStealingThreadPool(
config::doris_scanner_thread_pool_thread_num, env->store_paths().size(),
config::doris_scanner_thread_pool_queue_size, "local_scan"));
_local_scan_thread_pool.reset(
new PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size, "local_scan"));

// 3. remote scan thread pool
ThreadPoolBuilder("RemoteScanThreadPool")
Expand Down Expand Up @@ -243,7 +242,6 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
this->_scanner_scan(this, ctx, scanner);
};
task.priority = nice;
task.queue_id = (*iter)->queue_id();
ret = _local_scan_thread_pool->offer(task);
}
} else {
Expand Down
7 changes: 2 additions & 5 deletions be/src/vec/exec/scan/vscanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class VScanner {

void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); }

int64_t get_scanner_wait_worker_timer() { return _scanner_wait_worker_timer; }
int64_t get_scanner_wait_worker_timer() const { return _scanner_wait_worker_timer; }

void update_scan_cpu_timer() { _scan_cpu_timer += _cpu_watch.elapsed_time(); }

Expand All @@ -117,13 +117,11 @@ class VScanner {
bool is_open() { return _is_open; }
void set_opened() { _is_open = true; }

int queue_id() { return _state->exec_env()->store_path_to_index("xxx"); }

virtual doris::TabletStorageType get_storage_type() {
return doris::TabletStorageType::STORAGE_TYPE_REMOTE;
}

bool need_to_close() { return _need_to_close; }
bool need_to_close() const { return _need_to_close; }

void mark_to_need_to_close() {
// If the scanner is failed during init or open, then not need update counters
Expand Down Expand Up @@ -156,7 +154,6 @@ class VScanner {
_conjuncts.clear();
}

protected:
RuntimeState* _state;
VScanNode* _parent;
// Set if scan node has sort limit info
Expand Down