Skip to content

Commit

Permalink
Merge branch 'master' into fix-bvar-1
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Dec 21, 2023
2 parents 52eb0da + 4ee6612 commit 7447535
Show file tree
Hide file tree
Showing 96 changed files with 2,924 additions and 2,351 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,8 @@ DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
DEFINE_Int32(load_stream_messages_in_batch, "128");
// brpc streaming StreamWait seconds on EAGAIN
DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
// max tasks per flush token in load stream
DEFINE_Int32(load_stream_flush_token_max_tasks, "2");

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,8 @@ DECLARE_Int64(load_stream_max_buf_size);
DECLARE_Int32(load_stream_messages_in_batch);
// brpc streaming StreamWait seconds on EAGAIN
DECLARE_Int32(load_stream_eagain_wait_seconds);
// max tasks per flush token in load stream
DECLARE_Int32(load_stream_flush_token_max_tasks);

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,14 +1230,15 @@ void StorageEngine::notify_listeners() {
}

bool StorageEngine::notify_listener(std::string_view name) {
bool found = false;
std::lock_guard<std::mutex> l(_report_mtx);
for (auto& listener : _report_listeners) {
if (listener->name() == name) {
listener->notify();
return true;
found = true;
}
}
return false;
return found;
}

// check whether any unused rowsets's id equal to rowset_id
Expand Down
27 changes: 20 additions & 7 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Status EnginePublishVersionTask::execute() {
}
#endif

std::vector<std::shared_ptr<TabletPublishTxnTask>> tablet_tasks;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
Expand Down Expand Up @@ -242,6 +243,7 @@ Status EnginePublishVersionTask::execute() {

auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
tablet_tasks.push_back(tablet_publish_txn_ptr);
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
#ifndef NDEBUG
LOG(INFO) << "transaction_id: " << transaction_id << ", partition id: " << partition_id
Expand All @@ -254,6 +256,15 @@ Status EnginePublishVersionTask::execute() {
}
token->wait();

if (res.ok()) {
for (const auto& tablet_task : tablet_tasks) {
res = tablet_task->result();
if (!res.ok()) {
break;
}
}
}

_succ_tablets->clear();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -343,24 +354,24 @@ void TabletPublishTxnTask::handle() {
rowset_update_lock.lock();
}
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_result = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version, &_stats);
if (!publish_status.ok()) {
if (!_result.ok()) {
LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}

// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
publish_status = _tablet->add_inc_rowset(_rowset);
_result = _tablet->add_inc_rowset(_rowset);
_stats.add_inc_rowset_us = MonotonicMicros() - t1;
if (!publish_status.ok() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
if (!_result.ok() && !_result.is<PUSH_VERSION_ALREADY_EXIST>()) {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
Expand All @@ -370,9 +381,11 @@ void TabletPublishTxnTask::handle() {
LOG(INFO) << "publish version successfully on tablet"
<< ", table_id=" << _tablet->table_id() << ", tablet=" << _tablet->tablet_id()
<< ", transaction_id=" << _transaction_id << ", version=" << _version.first
<< ", num_rows=" << _rowset->num_rows() << ", res=" << publish_status
<< ", num_rows=" << _rowset->num_rows() << ", res=" << _result
<< ", cost: " << cost_us << "(us) "
<< (cost_us > 500 * 1000 ? _stats.to_string() : "");

_result = Status::OK();
}

void AsyncTabletPublishTask::handle() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TabletPublishTxnTask {
~TabletPublishTxnTask() = default;

void handle();
Status result() { return _result; }

private:
EnginePublishVersionTask* _engine_publish_version_task = nullptr;
Expand All @@ -80,6 +81,7 @@ class TabletPublishTxnTask {
Version _version;
TabletInfo _tablet_info;
TabletPublishStatistics _stats;
Status _result;
};

class EnginePublishVersionTask : public EngineTask {
Expand Down
37 changes: 31 additions & 6 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool
return Status::OK();
}

Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, bool* eos) {
Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block, bool* eos) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
Expand All @@ -65,22 +66,40 @@ Status LoadBlockQueue::get_block(vectorized::Block* block, bool* find_block, boo
need_commit = true;
}
}
while (status.ok() && _block_queue.empty() &&
while (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
(!need_commit || (need_commit && !_load_ids.empty()))) {
CHECK_EQ(_single_block_queue_bytes->load(), 0);
auto left_milliseconds = _group_commit_interval_ms;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (!need_commit) {
left_milliseconds = _group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
left_milliseconds = _group_commit_interval_ms - duration;
if (left_milliseconds <= 0) {
need_commit = true;
break;
}
} else {
if (duration >= 10 * _group_commit_interval_ms) {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids) {
ss << id.to_string() << ", ";
}
ss << "]";
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << ss.str()
<< ", runtime_state=" << runtime_state;
}
}
_get_cond.wait_for(l, std::chrono::milliseconds(left_milliseconds));
}
if (runtime_state->is_cancelled()) {
auto st = Status::Cancelled(runtime_state->cancel_reason());
_cancel_without_lock(st);
return st;
}
if (!_block_queue.empty()) {
auto fblock = _block_queue.front();
block->swap(*fblock.get());
Expand Down Expand Up @@ -120,6 +139,12 @@ Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
void LoadBlockQueue::cancel(const Status& st) {
DCHECK(!st.ok());
std::unique_lock l(mutex);
_cancel_without_lock(st);
}

void LoadBlockQueue::_cancel_without_lock(const Status& st) {
LOG(INFO) << "cancel group_commit, instance_id=" << load_instance_id << ", label=" << label
<< ", status=" << st.to_string();
status = st;
while (!_block_queue.empty()) {
{
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class LoadBlockQueue {
};

Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
Status get_block(vectorized::Block* block, bool* find_block, bool* eos);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
bool* eos);
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
Expand All @@ -72,6 +73,7 @@ class LoadBlockQueue {
Status status = Status::OK();

private:
void _cancel_without_lock(const Status& st);
std::chrono::steady_clock::time_point _start_time;

std::condition_variable _put_cond;
Expand Down
13 changes: 11 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "runtime/load_stream.h"

#include <brpc/stream.h>
#include <bthread/bthread.h>
#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <olap/rowset/rowset_factory.h>
Expand Down Expand Up @@ -136,7 +137,11 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
LOG(INFO) << "write data failed " << *this;
}
};
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(flush_func);
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) {
bthread_usleep(10 * 1000); // 10ms
}
return flush_token->submit_func(flush_func);
}

Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data) {
Expand Down Expand Up @@ -170,7 +175,11 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
LOG(INFO) << "add segment failed " << *this;
}
};
return _flush_tokens[new_segid % _flush_tokens.size()]->submit_func(add_segment_func);
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) {
bthread_usleep(10 * 1000); // 10ms
}
return flush_token->submit_func(add_segment_func);
}

Status TabletStream::close() {
Expand Down
16 changes: 14 additions & 2 deletions be/src/vec/columns/column_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/get_least_supertype.h"

#ifdef __AVX2__
#include "util/jsonb_parser_simd.h"
#else
#include "util/jsonb_parser.h"
#endif

namespace doris::vectorized {
namespace {

Expand Down Expand Up @@ -1155,8 +1161,14 @@ void ColumnObject::merge_sparse_to_root_column() {
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
root.Accept(writer);
bool res = parser.parse(buffer.GetString(), buffer.GetSize());
CHECK(res) << "buffer:" << std::string(buffer.GetString(), buffer.GetSize())
<< ", row_num:" << i;
if (!res) {
throw Exception(ErrorCode::INVALID_ARGUMENT,
"parse json failed, doc: {}"
", row_num:{}"
", error:{}",
std::string(buffer.GetString(), buffer.GetSize()), i,
JsonbErrMsg::getErrMsg(parser.getErrorCode()));
}
result_column_ptr->insert_data(parser.getWriter().getOutput()->getBuffer(),
parser.getWriter().getOutput()->getSize());
result_column_nullable->get_null_map_data().push_back(0);
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
#include "common/status.h"
#include "exprs/json_functions.h"
#include "runtime/jsonb_value.h"

#ifdef __AVX2__
#include "util/jsonb_parser_simd.h"
#else
#include "util/jsonb_parser.h"
#endif
namespace doris {
namespace vectorized {

Expand Down
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,11 @@ std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int
}

Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_io_ctx && _io_ctx->should_stop) {
*eof = true;
*read_rows = 0;
return Status::OK();
}
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
auto rows = std::min(get_remaining_rows(), (int64_t)_batch_size);

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/group_commit_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ GroupCommitScanNode::GroupCommitScanNode(ObjectPool* pool, const TPlanNode& tnod
Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
bool find_node = false;
while (!find_node && !*eos) {
RETURN_IF_ERROR(load_block_queue->get_block(block, &find_node, eos));
RETURN_IF_ERROR(load_block_queue->get_block(state, block, &find_node, eos));
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class PipScannerContext : public vectorized::ScannerContext {
{
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
if (_blocks_queues[id].empty()) {
*eos = _is_finished || _should_stop;
*eos = done();
return Status::OK();
}
if (_process_status.is<ErrorCode::CANCELLED>()) {
Expand Down
15 changes: 8 additions & 7 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
int num_running_scanners = _num_running_scanners;

bool is_scheduled = false;
if (to_be_schedule && _num_running_scanners == 0) {
if (!done() && to_be_schedule && _num_running_scanners == 0) {
is_scheduled = true;
auto state = _scanner_scheduler->submit(shared_from_this());
if (state.ok()) {
Expand All @@ -287,8 +287,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
if (wait) {
// scanner batch wait time
SCOPED_TIMER(_scanner_wait_batch_timer);
while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
state->is_cancelled())) {
while (!(!_blocks_queue.empty() || done() || !status().ok() || state->is_cancelled())) {
if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
<< ", serving_blocks_num " << serving_blocks_num
Expand Down Expand Up @@ -330,7 +329,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
}
} else {
*eos = _is_finished;
*eos = done();
}
}

Expand Down Expand Up @@ -400,8 +399,7 @@ void ScannerContext::dec_num_scheduling_ctx() {

void ScannerContext::set_ready_to_finish() {
// `_should_stop == true` means this task has already ended and wait for pending finish now.
if (_finish_dependency && _should_stop && _num_running_scanners == 0 &&
_num_scheduling_ctx == 0) {
if (_finish_dependency && done() && _num_running_scanners == 0 && _num_scheduling_ctx == 0) {
_finish_dependency->set_ready();
}
}
Expand Down Expand Up @@ -524,6 +522,9 @@ std::string ScannerContext::debug_string() {

void ScannerContext::reschedule_scanner_ctx() {
std::lock_guard l(_transfer_lock);
if (done()) {
return;
}
auto state = _scanner_scheduler->submit(shared_from_this());
//todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
if (state.ok()) {
Expand All @@ -546,7 +547,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
_num_running_scanners--;
set_ready_to_finish();

if (should_be_scheduled()) {
if (!done() && should_be_scheduled()) {
auto state = _scanner_scheduler->submit(shared_from_this());
if (state.ok()) {
_num_scheduling_ctx++;
Expand Down
6 changes: 2 additions & 4 deletions be/src/vec/exec/scan/vscanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,10 @@ Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
if (state->is_cancelled()) {
return Status::Cancelled("cancelled");
}

*eof = *eof || _should_stop;
// set eof to true if per scanner limit is reached
// currently for query: ORDER BY key LIMIT n
if (_limit > 0 && _num_rows_return >= _limit) {
*eof = true;
}
*eof = *eof || (_limit > 0 && _num_rows_return >= _limit);

return Status::OK();
}
Expand Down
Loading

0 comments on commit 7447535

Please sign in to comment.