Skip to content

Commit

Permalink
[pipelineX](refactor) rename functions (apache#28846)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and stephen committed Dec 28, 2023
1 parent 67c67fa commit e8955b4
Show file tree
Hide file tree
Showing 17 changed files with 25 additions and 25 deletions.
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,12 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<LocalStateType>::_child_x
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<LocalStateType>::get_local_exchange_type();
: DataSinkOperatorX<LocalStateType>::required_data_distribution();
}
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return DataSinkOperatorX<AnalyticSinkLocalState>::get_local_exchange_type();
return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
}

private:
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class AssertNumRowsOperatorX final : public StreamingOperatorX<AssertNumRowsLoca

[[nodiscard]] bool is_source() const override { return false; }

DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
return {ExchangeType::PASSTHROUGH};
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
return _sub_plan_query_statistics_recvr;
}

DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
return {ExchangeType::NOOP};
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class HashJoinBuildSinkOperatorX final
._should_build_hash_table;
}

DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
SourceState& source_state) const override;

bool need_more_input_data(RuntimeState* state) const override;
DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class NestedLoopJoinBuildSinkOperatorX final
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class NestedLoopJoinProbeOperatorX final
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
}

DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
return DataSinkOperatorX<PartitionSortSinkLocalState>::get_local_exchange_type();
return DataSinkOperatorX<PartitionSortSinkLocalState>::required_data_distribution();
}
return {ExchangeType::PASSTHROUGH};
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ class ScanOperatorX : public OperatorX<LocalStateType> {

TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }

DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_col_distribute_ids.empty() || OperatorX<LocalStateType>::ignore_data_distribution()) {
// 1. `_col_distribute_ids` is empty means storage distribution is not effective, so we prefer to do local shuffle.
// 2. `ignore_data_distribution()` returns true means we ignore the distribution.
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/set_probe_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX<SetProbeSinkLocalSt

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/set_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int

Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
if (_merge_by_exchange) {
// The current sort node is used for the ORDER BY
return {ExchangeType::PASSTHROUGH};
}
return DataSinkOperatorX<SortSinkLocalState>::get_local_exchange_type();
return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}

private:
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/streaming_aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class StreamingAggSinkOperatorX final : public AggSinkOperatorX<StreamingAggSink
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
DataDistribution get_local_exchange_type() const override {
DataDistribution required_data_distribution() const override {
return {ExchangeType::PASSTHROUGH};
}
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
}
}
void init_data_distribution() {
set_data_distribution(operatorXs.front()->get_local_exchange_type());
set_data_distribution(operatorXs.front()->required_data_distribution());
}
void set_data_distribution(const DataDistribution& data_distribution) {
_data_distribution = data_distribution;
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class OperatorXBase : public OperatorBase {
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
[[nodiscard]] virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
[[nodiscard]] virtual DataDistribution get_local_exchange_type() const {
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
Expand Down Expand Up @@ -481,7 +481,7 @@ class DataSinkOperatorXBase : public OperatorBase {
}

virtual void get_dependency(std::vector<DependencySPtr>& dependency, QueryContext* ctx) = 0;
virtual DataDistribution get_local_exchange_type() const {
virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
do_local_exchange = false;
// Plan local exchange for each operator.
for (; idx < ops.size();) {
if (ops[idx]->get_local_exchange_type().need_local_exchange()) {
if (ops[idx]->required_data_distribution().need_local_exchange()) {
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip,
ops[idx]->get_local_exchange_type(), &do_local_exchange, num_buckets,
ops[idx]->required_data_distribution(), &do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, ignore_data_hash_distribution));
}
if (do_local_exchange) {
Expand All @@ -305,10 +305,10 @@ Status PipelineXFragmentContext::_plan_local_exchange(
idx++;
}
} while (do_local_exchange);
if (pip->sink_x()->get_local_exchange_type().need_local_exchange()) {
if (pip->sink_x()->required_data_distribution().need_local_exchange()) {
RETURN_IF_ERROR(_add_local_exchange(
pip_idx, idx, pip->sink_x()->node_id(), _runtime_state->obj_pool(), pip,
pip->sink_x()->get_local_exchange_type(), &do_local_exchange, num_buckets,
pip->sink_x()->required_data_distribution(), &do_local_exchange, num_buckets,
bucket_seq_to_instance_idx, ignore_data_hash_distribution));
}
return Status::OK();
Expand Down

0 comments on commit e8955b4

Please sign in to comment.