From bb1a9c98d42c224fefb1140158a7c52ec651c1c3 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 1 Aug 2024 16:05:53 +0800 Subject: [PATCH] [fix](local exchange) Fix local exchange blocked by a huge data block (#38657) If a huge block is push into local exchanger, it will be blocked due to concurrent problems. This PR use a unique lock to resolve it . --- .../local_exchange_sink_operator.h | 2 + .../local_exchange_source_operator.cpp | 6 +- .../local_exchange_source_operator.h | 2 + .../local_exchange/local_exchanger.cpp | 115 ++++++++---------- .../pipeline/local_exchange/local_exchanger.h | 14 +++ 5 files changed, 72 insertions(+), 67 deletions(-) diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index 0ff1df260012b7..faa48d209f4b1e 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -56,6 +56,8 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState + friend class Exchanger; ExchangerBase* _exchanger = nullptr; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 56f0a157cdee8d..6b0cca2d71a969 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -75,11 +75,13 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}", + "_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}, " + "data queue info: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, _exchanger->_running_sink_operators, _exchanger->_running_source_operators, - _shared_state->mem_usage.load()); + _shared_state->mem_usage.load(), + _exchanger->data_queue_debug_string(_channel_id)); size_t i = 0; fmt::format_to(debug_string_buffer, ", MemTrackers: "); for (auto* mem_tracker : _shared_state->mem_trackers) { diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h index f9fa4cfa4edfe3..d2f68d4ebaca31 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h @@ -51,6 +51,8 @@ class LocalExchangeSourceLocalState final : public PipelineXLocalState + friend class Exchanger; ExchangerBase* _exchanger = nullptr; int _channel_id; diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 27b7fc7e7fd3f7..647ddcfba2d87e 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -26,6 +26,37 @@ namespace doris::pipeline { +template +bool Exchanger::_enqueue_data_and_set_ready(int channel_id, + LocalExchangeSinkLocalState& local_state, + BlockType&& block) { + std::unique_lock l(_m); + if (_data_queue[channel_id].enqueue(std::move(block))) { + local_state._shared_state->set_ready_to_read(channel_id); + return true; + } + return false; +} + +template +bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState& local_state, + BlockType& block, bool* eos) { + bool all_finished = _running_sink_operators == 0; + if (_data_queue[local_state._channel_id].try_dequeue(block)) { + return true; + } else if (all_finished) { + *eos = true; + } else { + std::unique_lock l(_m); + if (_data_queue[local_state._channel_id].try_dequeue(block)) { + return true; + } + COUNTER_UPDATE(local_state._get_block_failed_counter, 1); + local_state._dependency->block(); + } + return false; +} + Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) { { @@ -74,17 +105,11 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block return Status::OK(); }; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + if (_dequeue_data(local_state, partitioned_block, eos)) { SCOPED_TIMER(local_state._copy_data_timer); mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block( block, partitioned_block.first->data_block); RETURN_IF_ERROR(get_data(block)); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -92,7 +117,6 @@ Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state) { - auto& data_queue = _data_queue; const auto rows = block->rows(); auto row_idx = std::make_shared>(rows); { @@ -135,9 +159,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( it.second, new_block_wrapper->data_block.allocated_bytes(), false); - if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->set_ready_to_read(it.second); - } else { + + if (!_enqueue_data_and_set_ready(it.second, local_state, + {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->sub_mem_usage( it.second, new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); @@ -154,10 +178,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); - if (data_queue[i % _num_sources].enqueue( - {new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->set_ready_to_read(i % _num_sources); - } else { + if (!_enqueue_data_and_set_ready(i % _num_sources, local_state, + {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->sub_mem_usage( i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false); @@ -177,9 +199,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest if (size > 0) { local_state._shared_state->add_mem_usage( map[i], new_block_wrapper->data_block.allocated_bytes(), false); - if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->set_ready_to_read(map[i]); - } else { + if (!_enqueue_data_and_set_ready(map[i], local_state, + {new_block_wrapper, {row_idx, start, size}})) { local_state._shared_state->sub_mem_usage( map[i], new_block_wrapper->data_block.allocated_bytes(), false); new_block_wrapper->unref(local_state._shared_state); @@ -203,9 +224,7 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo auto channel_id = (local_state._channel_id++) % _num_partitions; size_t memory_usage = new_block.allocated_bytes(); local_state._shared_state->add_mem_usage(channel_id, memory_usage); - if (_data_queue[channel_id].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(channel_id); - } else { + if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) { local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } @@ -224,19 +243,13 @@ void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_dequeue_data(local_state, next_block, eos)) { block->swap(next_block); local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -245,9 +258,7 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block LocalExchangeSinkLocalState& local_state) { vectorized::Block new_block(in_block->clone_empty()); new_block.swap(*in_block); - if (_data_queue[0].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(0); - } + _enqueue_data_and_set_ready(0, local_state, std::move(new_block)); return Status::OK(); } @@ -259,14 +270,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo return Status::OK(); } vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[0].try_dequeue(next_block)) { + if (_dequeue_data(local_state, next_block, eos)) { *block = std::move(next_block); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -283,9 +288,7 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_ size_t memory_usage = new_block.allocated_bytes(); add_mem_usage(local_state, memory_usage); - if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(0); - } else { + if (!_enqueue_data_and_set_ready(local_state._channel_id, local_state, std::move(new_block))) { sub_mem_usage(local_state, memory_usage); } if (eos) { @@ -402,9 +405,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block for (size_t i = 0; i < _num_partitions; i++) { auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty()); RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows())); - if (_data_queue[i].enqueue(mutable_block->to_block())) { - local_state._shared_state->set_ready_to_read(i); - } + _enqueue_data_and_set_ready(i, local_state, mutable_block->to_block()); } return Status::OK(); @@ -421,14 +422,8 @@ void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_dequeue_data(local_state, next_block, eos)) { *block = std::move(next_block); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } @@ -444,9 +439,8 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state, auto channel_id = (local_state._channel_id++) % _num_partitions; size_t memory_usage = new_block.allocated_bytes(); local_state._shared_state->add_mem_usage(channel_id, memory_usage); - if (_data_queue[channel_id].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(channel_id); - } else { + + if (!_enqueue_data_and_set_ready(channel_id, local_state, std::move(new_block))) { local_state._shared_state->sub_mem_usage(channel_id, memory_usage); } @@ -477,7 +471,6 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids, vectorized::Block* block, bool eos, LocalExchangeSinkLocalState& local_state) { - auto& data_queue = _data_queue; const auto rows = block->rows(); auto row_idx = std::make_shared>(rows); { @@ -506,9 +499,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state, size_t memory_usage = new_block.allocated_bytes(); local_state._shared_state->add_mem_usage(i, memory_usage); - if (data_queue[i].enqueue(std::move(new_block))) { - local_state._shared_state->set_ready_to_read(i); - } else { + if (!_enqueue_data_and_set_ready(i, local_state, std::move(new_block))) { local_state._shared_state->sub_mem_usage(i, memory_usage); } } @@ -532,19 +523,13 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized:: bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; - bool all_finished = _running_sink_operators == 0; - if (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + if (_dequeue_data(local_state, next_block, eos)) { block->swap(next_block); if (_free_block_limit == 0 || _free_blocks.size_approx() < _free_block_limit * _num_sources) { _free_blocks.enqueue(std::move(next_block)); } local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes()); - } else if (all_finished) { - *eos = true; - } else { - COUNTER_UPDATE(local_state._get_block_failed_counter, 1); - local_state._dependency->block(); } return Status::OK(); } diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 2c4f8f5b78509e..6cd64126069051 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -55,6 +55,8 @@ class ExchangerBase { virtual std::vector local_sink_state_dependency(int channel_id) { return {}; } virtual std::vector local_state_dependency(int channel_id) { return {}; } + virtual std::string data_queue_debug_string(int i) = 0; + protected: friend struct LocalExchangeSharedState; friend struct ShuffleBlockWrapper; @@ -115,9 +117,21 @@ class Exchanger : public ExchangerBase { : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) { } ~Exchanger() override = default; + std::string data_queue_debug_string(int i) override { + fmt::memory_buffer debug_string_buffer; + fmt::format_to(debug_string_buffer, "Data Queue {}: [size approx = {}, eos = {}]", + _data_queue[i].data_queue.size_approx(), _data_queue[i].eos); + return fmt::to_string(debug_string_buffer); + } protected: + bool _enqueue_data_and_set_ready(int channel_id, LocalExchangeSinkLocalState& local_state, + BlockType&& block); + bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& block, bool* eos); std::vector> _data_queue; + +private: + std::mutex _m; }; class LocalExchangeSourceLocalState;