Skip to content

Commit

Permalink
[code](pipelineX) refine some pipelineX code (apache#28570)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored and HappenLee committed Jan 12, 2024
1 parent f8c90de commit 6daf531
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 82 deletions.
14 changes: 6 additions & 8 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const {
}

Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_sender_id = info.sender_id;
Expand Down Expand Up @@ -174,9 +174,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx());

register_channels(_sink_buffer.get());

_exchange_sink_dependency = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
auto* _exchange_sink_dependency = _dependency;
_queue_dependency = ExchangeSinkQueueDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
Expand Down Expand Up @@ -237,7 +235,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
}

Status ExchangeSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ExchangeSinkOperatorX>();
if (p._part_type == TPartitionType::HASH_PARTITIONED ||
p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
Expand Down Expand Up @@ -522,8 +520,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)

std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
PipelineXSinkLocalState<>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load());
return fmt::to_string(debug_string_buffer);
Expand All @@ -536,6 +533,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
if (_broadcast_dependency) {
COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time());
}
Expand All @@ -545,7 +543,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}
_sink_buffer->update_profile(profile());
_sink_buffer->close();
return PipelineXSinkLocalState<>::close(state, exec_status);
return Base::close(state, exec_status);
}

} // namespace doris::pipeline
16 changes: 11 additions & 5 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,20 +144,25 @@ class LocalExchangeChannelDependency final : public Dependency {
// TODO(gabriel): blocked by memory
};

class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;

public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<>(parent, state),
: Base(parent, state),
current_channel_idx(0),
only_local_exchange(false),
_serializer(this) {}
_serializer(this) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Dependency* dependency() override { return _exchange_sink_dependency.get(); }
Dependency* finishdependency() override { return _finish_dependency.get(); }
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder);
Expand Down Expand Up @@ -231,11 +236,12 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;

std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<AndDependency> _exchange_sink_dependency;
std::shared_ptr<BroadcastDependency> _broadcast_dependency;
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;

std::shared_ptr<Dependency> _finish_dependency;
};

class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {
Expand Down
14 changes: 6 additions & 8 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,11 @@ bool ExchangeSourceOperator::is_pending_finish() const {
}

ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent), num_rows_skipped(0), is_ready(false) {}
: Base(state, parent), num_rows_skipped(0), is_ready(false) {}

std::string ExchangeLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
PipelineXLocalState<>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Queues: (");
const auto& queues = stream_recvr->sender_queues();
for (size_t i = 0; i < queues.size(); i++) {
Expand All @@ -68,15 +67,14 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
}

Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<ExchangeSourceOperatorX>();
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
Expand All @@ -101,7 +99,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
Status ExchangeLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
RETURN_IF_ERROR(Base::open(state));
return Status::OK();
}

Expand Down Expand Up @@ -215,7 +213,7 @@ Status ExchangeLocalState::close(RuntimeState* state) {
if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
vsort_exec_exprs.close(state);
}
return PipelineXLocalState<>::close(state);
return Base::close(state);
}

Status ExchangeSourceOperatorX::close(RuntimeState* state) {
Expand Down
7 changes: 4 additions & 3 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,22 @@ struct ExchangeDataDependency final : public Dependency {
};

class ExchangeSourceOperatorX;
class ExchangeLocalState final : public PipelineXLocalState<> {
class ExchangeLocalState final : public PipelineXLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeLocalState);

public:
using Base = PipelineXLocalState<AndDependency>;
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
Dependency* dependency() override { return source_dependency.get(); }
std::string debug_string(int indentation_level) const override;
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;
bool is_ready;

std::shared_ptr<AndDependency> source_dependency;
std::vector<std::shared_ptr<ExchangeDataDependency>> deps;

std::vector<RuntimeProfile::Counter*> metrics;
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
vectorized::RuntimeFilterConsumer(
static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
};

Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,11 @@ class MultiCastDataStreamSourceLocalState final

friend class MultiCastDataStreamerSourceOperatorX;

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }

private:
vectorized::VExprContextSPtrs _output_expr_contexts;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};

class MultiCastDataStreamerSourceOperatorX final
Expand Down
14 changes: 12 additions & 2 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,14 @@ std::string ScanOperator::debug_string() const {

template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
: ScanLocalStateBase(state, parent) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}

template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
Expand Down Expand Up @@ -1311,6 +1318,9 @@ Status ScanLocalState<Derived>::_init_profile() {

_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);

_wait_for_finish_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");

return Status::OK();
}

Expand Down Expand Up @@ -1442,7 +1452,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
}
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());

COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
return PipelineXLocalState<>::close(state);
}

Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;

RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
};

template <typename LocalStateType>
Expand Down Expand Up @@ -211,6 +213,9 @@ class ScanLocalState : public ScanLocalStateBase {

Dependency* dependency() override { return _scan_dependency.get(); }

RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
Dependency* finishdependency() override { return _finish_dependency.get(); }

protected:
template <typename LocalStateType>
friend class ScanOperatorX;
Expand Down Expand Up @@ -405,6 +410,10 @@ class ScanLocalState : public ScanLocalStateBase {
std::atomic<bool> _eos = false;

std::mutex _block_lock;

std::shared_ptr<RuntimeFilterDependency> _filter_dependency;

std::shared_ptr<Dependency> _finish_dependency;
};

template <typename LocalStateType>
Expand Down
Loading

0 comments on commit 6daf531

Please sign in to comment.