Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Dec 15, 2023
1 parent 2018ab2 commit b11254b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 18 deletions.
34 changes: 16 additions & 18 deletions be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ Status VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
return Status::OK();
}

void VDataStreamRecvr::SenderQueue::try_set_dep_ready_without_lock() {
if (!_dependency) {
return;
}
const bool should_wait = !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 0;
if (!should_wait) {
_dependency->set_ready();
}
}

Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done) {
Expand Down Expand Up @@ -178,9 +188,7 @@ Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_num

if (!empty) {
_block_queue.emplace_back(std::move(block), block_byte_size);
if (_dependency) {
_dependency->set_ready();
}
try_set_dep_ready_without_lock();
}
// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
Expand Down Expand Up @@ -234,9 +242,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {

if (!empty) {
_block_queue.emplace_back(std::move(nblock), block_mem_size);
if (_dependency) {
_dependency->set_ready();
}
try_set_dep_ready_without_lock();
_data_arrival_cv.notify_one();
}

Expand Down Expand Up @@ -273,9 +279,7 @@ void VDataStreamRecvr::SenderQueue::decrement_senders(int be_number) {
<< print_id(_recvr->fragment_instance_id()) << " node_id=" << _recvr->dest_node_id()
<< " #senders=" << _num_remaining_senders;
if (_num_remaining_senders == 0) {
if (_dependency) {
_dependency->set_ready();
}
try_set_dep_ready_without_lock();
_data_arrival_cv.notify_one();
}
}
Expand All @@ -288,9 +292,7 @@ void VDataStreamRecvr::SenderQueue::cancel(Status cancel_status) {
}
_is_cancelled = true;
_cancel_status = cancel_status;
if (_dependency) {
_dependency->set_ready();
}
try_set_dep_ready_without_lock();
VLOG_QUERY << "cancelled stream: _fragment_instance_id="
<< print_id(_recvr->fragment_instance_id())
<< " node_id=" << _recvr->dest_node_id();
Expand Down Expand Up @@ -318,9 +320,7 @@ void VDataStreamRecvr::SenderQueue::close() {
// is clear will be memory leak
std::lock_guard<std::mutex> l(_lock);
_is_cancelled = true;
if (_dependency) {
_dependency->set_ready();
}
try_set_dep_ready_without_lock();

for (auto closure_pair : _pending_closures) {
closure_pair.first->Run();
Expand Down Expand Up @@ -561,9 +561,7 @@ void VDataStreamRecvr::PipSenderQueue::add_block(Block* block, bool use_move) {
return;
}
_block_queue.emplace_back(std::move(nblock), block_mem_size);
if (_dependency) {
_dependency->set_ready();
}
try_set_dep_ready_without_lock();
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_mem_size);
_recvr->update_blocks_memory_usage(block_mem_size);
_data_arrival_cv.notify_one();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/runtime/vdata_stream_recvr.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class VDataStreamRecvr::SenderQueue {
friend struct pipeline::ExchangeDataDependency;
Status _inner_get_batch_without_lock(Block* block, bool* eos);

void try_set_dep_ready_without_lock();

// Not managed by this class
VDataStreamRecvr* _recvr = nullptr;
std::mutex _lock;
Expand Down

0 comments on commit b11254b

Please sign in to comment.