Skip to content

Commit

Permalink
[fix](local exchange) Fix local exchange blocked by a huge data block (
Browse files Browse the repository at this point in the history
…#38657)

If a huge block is pushed into the local exchanger, it may be blocked due to
a concurrency problem. This PR uses a unique lock to resolve it.
  • Loading branch information
Gabriel39 authored Aug 1, 2024
1 parent 7e8415f commit bb1a9c9
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 67 deletions.
2 changes: 2 additions & 0 deletions be/src/pipeline/local_exchange/local_exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState<LocalEx
friend class PassToOneExchanger;
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
template <typename BlockType>
friend class Exchanger;

ExchangerBase* _exchanger = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
"_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}",
"_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}, "
"data queue info: {}",
Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
_exchanger->_running_sink_operators, _exchanger->_running_source_operators,
_shared_state->mem_usage.load());
_shared_state->mem_usage.load(),
_exchanger->data_queue_debug_string(_channel_id));
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_tracker : _shared_state->mem_trackers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState<LocalExch
friend class PassToOneExchanger;
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
template <typename BlockType>
friend class Exchanger;

ExchangerBase* _exchanger = nullptr;
int _channel_id;
Expand Down
115 changes: 50 additions & 65 deletions be/src/pipeline/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,37 @@

namespace doris::pipeline {

// Enqueue `block` into the channel's queue and, on success, mark the channel
// readable for the source side. The mutex is held across BOTH steps so a
// concurrent _dequeue_data() cannot observe "queue empty" after the enqueue
// but before set_ready_to_read(), which would leave the source blocked on a
// dependency that is never signalled (the bug this helper fixes).
// Returns true if the queue accepted the block, false otherwise (caller is
// responsible for rolling back any bookkeeping such as mem usage).
template <typename BlockType>
bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
                                                       LocalExchangeSinkLocalState& local_state,
                                                       BlockType&& block) {
    std::unique_lock l(_m);
    const bool accepted = _data_queue[channel_id].enqueue(std::move(block));
    if (accepted) {
        local_state._shared_state->set_ready_to_read(channel_id);
    }
    return accepted;
}

// Try to pop one block for this source's channel.
// Returns true and fills `block` on success; returns false otherwise, setting
// *eos when all sink operators have finished (no more data will ever arrive).
//
// Fast path: a lock-free try_dequeue with no mutex held. Slow path: if that
// fails, retry under _m before blocking the dependency — this closes the race
// with _enqueue_data_and_set_ready(), which enqueues and signals readiness
// under the same mutex, so we can never miss a wakeup between the failed
// dequeue and _dependency->block().
template <typename BlockType>
bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& local_state,
                                         BlockType& block, bool* eos) {
    // Read the sink count BEFORE the first dequeue attempt: if sinks finish
    // concurrently, any block they enqueued is still drained before we report
    // eos on a later call.
    bool all_finished = _running_sink_operators == 0;
    if (_data_queue[local_state._channel_id].try_dequeue(block)) {
        return true;
    } else if (all_finished) {
        *eos = true;
    } else {
        // Slow path: re-check under the mutex so a concurrent enqueue either
        // happens-before this dequeue (we get the block) or happens-after
        // (its set_ready_to_read() will unblock the dependency we set below).
        std::unique_lock l(_m);
        if (_data_queue[local_state._channel_id].try_dequeue(block)) {
            return true;
        }
        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
        local_state._dependency->block();
    }
    return false;
}

Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
LocalExchangeSinkLocalState& local_state) {
{
Expand Down Expand Up @@ -74,25 +105,18 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block
return Status::OK();
};

bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
if (_dequeue_data(local_state, partitioned_block, eos)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
RETURN_IF_ERROR(get_data(block));
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}

Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
{
Expand Down Expand Up @@ -135,9 +159,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(it.second);
} else {

if (!_enqueue_data_and_set_ready(it.second, local_state,
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->sub_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
Expand All @@ -154,10 +178,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false);
if (data_queue[i % _num_sources].enqueue(
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(i % _num_sources);
} else {
if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->sub_mem_usage(
i % _num_sources, new_block_wrapper->data_block.allocated_bytes(),
false);
Expand All @@ -177,9 +199,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(map[i]);
} else {
if (!_enqueue_data_and_set_ready(map[i], local_state,
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->sub_mem_usage(
map[i], new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
Expand All @@ -203,9 +224,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
auto channel_id = (local_state._channel_id++) % _num_partitions;
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
} else {
if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) {
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}

Expand All @@ -224,19 +243,13 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
block->swap(next_block);
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand All @@ -245,9 +258,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block(in_block->clone_empty());
new_block.swap(*in_block);
if (_data_queue[0].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
}
_enqueue_data_and_set_ready(0, local_state, std::move(new_block));

return Status::OK();
}
Expand All @@ -259,14 +270,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
return Status::OK();
}
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[0].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
*block = std::move(next_block);
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand All @@ -283,9 +288,7 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
size_t memory_usage = new_block.allocated_bytes();
add_mem_usage(local_state, memory_usage);

if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(0);
} else {
if (!_enqueue_data_and_set_ready(local_state._channel_id, local_state, std::move(new_block))) {
sub_mem_usage(local_state, memory_usage);
}
if (eos) {
Expand Down Expand Up @@ -402,9 +405,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
for (size_t i = 0; i < _num_partitions; i++) {
auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows()));
if (_data_queue[i].enqueue(mutable_block->to_block())) {
local_state._shared_state->set_ready_to_read(i);
}
_enqueue_data_and_set_ready(i, local_state, mutable_block->to_block());
}

return Status::OK();
Expand All @@ -421,14 +422,8 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
*block = std::move(next_block);
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand All @@ -444,9 +439,8 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
auto channel_id = (local_state._channel_id++) % _num_partitions;
size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
} else {

if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) {
local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}

Expand Down Expand Up @@ -477,7 +471,6 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict channel_ids,
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
Expand Down Expand Up @@ -506,9 +499,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,

size_t memory_usage = new_block.allocated_bytes();
local_state._shared_state->add_mem_usage(i, memory_usage);
if (data_queue[i].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(i);
} else {
if (!_enqueue_data_and_set_ready(i, local_state, std::move(new_block))) {
local_state._shared_state->sub_mem_usage(i, memory_usage);
}
}
Expand All @@ -532,19 +523,13 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
bool* eos,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
bool all_finished = _running_sink_operators == 0;
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
if (_dequeue_data(local_state, next_block, eos)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
} else if (all_finished) {
*eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
}
return Status::OK();
}
Expand Down
14 changes: 14 additions & 0 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class ExchangerBase {
virtual std::vector<Dependency*> local_sink_state_dependency(int channel_id) { return {}; }
virtual std::vector<Dependency*> local_state_dependency(int channel_id) { return {}; }

virtual std::string data_queue_debug_string(int i) = 0;

protected:
friend struct LocalExchangeSharedState;
friend struct ShuffleBlockWrapper;
Expand Down Expand Up @@ -115,9 +117,21 @@ class Exchanger : public ExchangerBase {
: ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
}
~Exchanger() override = default;
// Render a one-line human-readable summary of channel i's queue state
// (approximate element count and eos flag) for debug_string() output.
std::string data_queue_debug_string(int i) override {
    const auto& queue = _data_queue[i];
    return fmt::format("Data Queue {}: [size approx = {}, eos = {}]",
                       queue.data_queue.size_approx(), queue.eos);
}

protected:
bool _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state,
BlockType&& block);
bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos);
std::vector<BlockQueue<BlockType>> _data_queue;

private:
std::mutex _m;
};

class LocalExchangeSourceLocalState;
Expand Down

0 comments on commit bb1a9c9

Please sign in to comment.