From 131ed678d3b90fd911ad8963e94d7a1b7c01142c Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 6 Dec 2023 10:12:58 +0800
Subject: [PATCH 1/5] [pipelineX](improvement) enable local shuffle by default

---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 6 +++---
 be/src/pipeline/exec/aggregation_sink_operator.h   | 3 ++-
 gensrc/thrift/PlanNodes.thrift                     | 1 +
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index a07d19f2c9a161..cc4328b8c08e0c 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -729,13 +729,13 @@ AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
           _output_tuple_desc(nullptr),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_merge(false),
+          _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
           _pool(pool),
           _limit(tnode.limit),
           _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
           _is_streaming(is_streaming),
-          _partition_exprs(tnode.agg_node.grouping_exprs) {
-    _is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
-}
+          _partition_exprs(tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {}
 
 template <typename LocalStateType>
 Status AggSinkOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState* state) {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index a6fa439f2f524a..c871e64893277a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -396,7 +396,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
 
     bool _needs_finalize;
     bool _is_merge;
-    bool _is_first_phase;
+    const bool _is_first_phase;
 
     size_t _align_aggregate_states = 1;
     /// The offset to the n-th aggregate function in a row of aggregate functions.
@@ -415,6 +415,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
 
     const bool _is_streaming;
     const std::vector<TExpr> _partition_exprs;
+    const bool _is_colocate;
 };
 
 } // namespace pipeline
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index b32259d35ce3be..b78bd90b4e6393 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -854,6 +854,7 @@ struct TAggregationNode {
   6: optional bool use_streaming_preaggregation
   7: optional list<TSortInfo> agg_sort_infos
   8: optional bool is_first_phase
+  9: optional bool is_colocate
   // 9: optional bool use_fixed_length_serialization_opt
 }
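A note on the `__isset` guard in the constructor above: `is_colocate` is an optional Thrift field, so the BE treats it as set only when the generated `__isset` flag confirms the FE actually populated it; an unset field falls back to false, which keeps old-FE/new-BE combinations on the old behavior. A minimal standalone sketch of that pattern, using simplified stand-in types rather than the real generated Thrift classes:

#include <iostream>

// Simplified stand-ins for the generated Thrift structures; the real types
// live in gensrc and carry many more fields.
struct TAggregationNodeIsset {
    bool is_first_phase = false;
    bool is_colocate = false;
};
struct TAggregationNode {
    TAggregationNodeIsset __isset;
    bool is_first_phase = false;
    bool is_colocate = false;
};

// The pattern the patch uses: an optional field only counts when its
// __isset flag says the sender actually populated it.
bool read_is_colocate(const TAggregationNode& n) {
    return n.__isset.is_colocate && n.is_colocate;
}

int main() {
    TAggregationNode node;
    node.is_colocate = true;                     // value set, __isset flag not set
    std::cout << read_is_colocate(node) << '\n'; // 0: treated as unset
    node.__isset.is_colocate = true;
    std::cout << read_is_colocate(node) << '\n'; // 1: now honored
}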
From 19da93148b9b66d36fab59f054b2dba4546a160f Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 6 Dec 2023 11:38:19 +0800
Subject: [PATCH 2/5] update

---
 be/src/pipeline/exec/aggregation_sink_operator.h             | 2 +-
 .../nereids/glue/translator/PhysicalPlanTranslator.java      | 1 +
 .../main/java/org/apache/doris/planner/AggregationNode.java  | 6 ++++++
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index c871e64893277a..cd85390dce0a34 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -371,7 +371,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
         if (_probe_expr_ctxs.empty()) {
             return _needs_finalize ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP;
         }
-        return ExchangeType::HASH_SHUFFLE;
+        return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
     }
 
     using DataSinkOperatorX<LocalStateType>::id;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 72e56d09c4ec31..f53781a0b4e91b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -860,6 +860,7 @@ public PlanFragment visitPhysicalHashAggregate(
                 && inputPlanFragment.getDataPartition().getType() != TPartitionType.RANDOM
                 && aggregate.getAggregateParam().aggMode != AggMode.INPUT_TO_BUFFER) {
             inputPlanFragment.setHasColocatePlanNode(true);
+            aggregationNode.setColocate(true);
         }
         setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
         if (aggregate.getStats() != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 5d00144f05c605..f0e31157d3f957 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -59,6 +59,7 @@ public class AggregationNode extends PlanNode {
     // Set to true if this aggregation node needs to run the Finalize step. This
     // node is the root node of a distributed aggregation.
     private boolean needsFinalize;
+    private boolean isColocate = false;
 
     // If true, use streaming preaggregation algorithm. Not valid if this is a merge agg.
     private boolean useStreamingPreagg;
@@ -277,6 +278,7 @@ protected void toThrift(TPlanNode msg) {
         msg.agg_node.setAggSortInfos(aggSortInfos);
         msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
         msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
+        msg.agg_node.setIsColocate(isColocate);
         List<Expr> groupingExprs = aggInfo.getGroupingExprs();
         if (groupingExprs != null) {
             msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
@@ -375,4 +377,8 @@ public void finalize(Analyzer analyzer) throws UserException {
             aggInfo.getOutputTupleDesc().computeMemLayout();
         }
     }
+
+    public void setColocate(boolean colocate) {
+        isColocate = colocate;
+    }
 }
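Patch 2 is where the flag takes effect: `get_local_exchange_type()` now chooses between a bucket-aware local shuffle and a plain hash shuffle. A compilable mirror of that decision, with the operator's member state reduced to plain parameters (simplified names, not the actual Doris types):

#include <cassert>

// Simplified mirror of the decision in AggSinkOperatorX::get_local_exchange_type;
// the real method reads _probe_expr_ctxs, _needs_finalize and _is_colocate.
enum class ExchangeType { NOOP, PASSTHROUGH, HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

ExchangeType agg_exchange_type(bool has_group_by, bool needs_finalize, bool is_colocate) {
    if (!has_group_by) {
        // No grouping keys: a finalizing agg still wants its input spread across
        // instances (PASSTHROUGH); otherwise no local exchange is needed at all.
        return needs_finalize ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP;
    }
    // Grouped agg: a colocate plan can reuse the existing bucket distribution,
    // any other plan needs a plain hash shuffle on the grouping expressions.
    return is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
}

int main() {
    assert(agg_exchange_type(false, true, false) == ExchangeType::PASSTHROUGH);
    assert(agg_exchange_type(false, false, false) == ExchangeType::NOOP);
    assert(agg_exchange_type(true, true, true) == ExchangeType::BUCKET_HASH_SHUFFLE);
    assert(agg_exchange_type(true, true, false) == ExchangeType::HASH_SHUFFLE);
}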
From 221fd16e89a4897563a6823c7aca0a667f75bcf3 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 6 Dec 2023 11:39:24 +0800
Subject: [PATCH 3/5] update

---
 .../src/main/java/org/apache/doris/qe/SessionVariable.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index bcd3a5980f557d..217ae8ac205cd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -771,7 +771,7 @@ public class SessionVariable implements Serializable, Writable {
     private boolean enableSharedScan = false;
 
     @VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
-    private boolean enableLocalShuffle = false;
+    private boolean enableLocalShuffle = true;
 
     @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
     public boolean enableAggState = false;
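Patch 3 only flips the session-variable default; the option still has to reach the BE through the forwarded query options, and patch 4 below clears it when the original (non-Nereids) planner is used. A hedged sketch of that gate; the struct and function names here are assumptions for illustration, not the exact Doris API:

#include <iostream>

// Hypothetical illustration only: this stand-in mimics the forwarded query
// option, it is not the real generated TQueryOptions type.
struct TQueryOptions {
    bool enable_local_shuffle = false;
};

// The coordinator clears the flag for the original planner (see patch 4), so
// local exchange planning effectively runs only for Nereids + pipelineX queries.
bool should_plan_local_exchange(const TQueryOptions& opts, bool is_pipeline_x_engine) {
    return is_pipeline_x_engine && opts.enable_local_shuffle;
}

int main() {
    TQueryOptions opts;
    opts.enable_local_shuffle = true;                        // the new default
    std::cout << should_plan_local_exchange(opts, true) << '\n';  // 1
    std::cout << should_plan_local_exchange(opts, false) << '\n'; // 0
}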
From 59acedf1b874d593434fdbc0500e64a4f658f2f9 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 6 Dec 2023 13:46:44 +0800
Subject: [PATCH 4/5] update

---
 .../pipeline_x_fragment_context.cpp                   | 22 +++++++++++++------
 .../pipeline_x/pipeline_x_fragment_context.h          |  8 +++----
 .../translator/PhysicalPlanTranslator.java            |  2 ++
 .../java/org/apache/doris/qe/Coordinator.java         |  1 +
 .../org/apache/doris/qe/SessionVariable.java          |  5 ++++-
 5 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index e6949e66c349c2..294b9fc109f979 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -165,8 +165,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     }
     _num_instances = request.local_params.size();
     _runtime_profile.reset(new RuntimeProfile("PipelineContext"));
-    _start_timer = ADD_TIMER(_runtime_profile, "StartTime");
-    COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time());
     _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
     SCOPED_TIMER(_prepare_timer);
 
@@ -231,7 +229,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
     static_cast<void>(root_pipeline->set_sink(_sink));
 
-    RETURN_IF_ERROR(_plan_local_shuffle(request.num_buckets, request.bucket_seq_to_instance_idx));
+    RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));
 
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
@@ -250,7 +248,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
     return Status::OK();
 }
 
-Status PipelineXFragmentContext::_plan_local_shuffle(
+Status PipelineXFragmentContext::_plan_local_exchange(
         int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) {
     for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
         _pipelines[pip_idx]->init_need_to_local_shuffle_by_source();
@@ -266,13 +264,13 @@ Status PipelineXFragmentContext::_plan_local_shuffle(
             }
         }
 
-        RETURN_IF_ERROR(_plan_local_shuffle(num_buckets, pip_idx, _pipelines[pip_idx],
-                                            bucket_seq_to_instance_idx));
+        RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
+                                             bucket_seq_to_instance_idx));
     }
     return Status::OK();
 }
 
-Status PipelineXFragmentContext::_plan_local_shuffle(
+Status PipelineXFragmentContext::_plan_local_exchange(
         int num_buckets, int pip_idx, PipelinePtr pip,
         const std::map<int, int>& bucket_seq_to_instance_idx) {
     int idx = 0;
@@ -280,6 +278,7 @@ Status PipelineXFragmentContext::_plan_local_shuffle(
     do {
         auto& ops = pip->operator_xs();
         do_local_exchange = false;
+        // Plan a local exchange for each operator.
        for (; idx < ops.size();) {
            if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP) {
                RETURN_IF_ERROR(_add_local_exchange(
                        &do_local_exchange, num_buckets, bucket_seq_to_instance_idx));
            }
            if (do_local_exchange) {
+                // If a local exchange is needed for the current operator, this pipeline is split
+                // into two pipelines connected by a local exchange sink/source pair. The remaining
+                // operators still need to be processed, so idx is set to 2 (0 is the local exchange
+                // source, 1 is the current operator, already handled) and planning continues.
                idx = 2;
                break;
            }
@@ -682,16 +685,21 @@ Status PipelineXFragmentContext::_add_local_exchange(
     switch (exchange_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances);
+        // If a HASH_SHUFFLE local exchanger is planned, data is guaranteed to be hash-distributed
+        // afterwards, so no further shuffle local exchange is needed in the rest of this pipeline.
        new_pip->set_need_to_local_shuffle(false);
        cur_pipe->set_need_to_local_shuffle(false);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger =
                BucketShuffleExchanger::create_unique(_num_instances, num_buckets);
+        // Same as ExchangeType::HASH_SHUFFLE.
        new_pip->set_need_to_local_shuffle(false);
        cur_pipe->set_need_to_local_shuffle(false);
        break;
    case ExchangeType::PASSTHROUGH:
+        // If a PASSTHROUGH local exchanger is planned, data is split randomly, so the remaining
+        // operators must use a local shuffle to restore the required data distribution.
        shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances);
        new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle());
        cur_pipe->set_need_to_local_shuffle(true);
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index f78cf2aa8576d4..c8b042bd39d63e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -154,10 +154,10 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
                            const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
                            RuntimeState* state, DescriptorTbl& desc_tbl,
                            PipelineId cur_pipeline_id);
-    Status _plan_local_shuffle(int num_buckets,
-                               const std::map<int, int>& bucket_seq_to_instance_idx);
-    Status _plan_local_shuffle(int num_buckets, int pip_idx, PipelinePtr pip,
-                               const std::map<int, int>& bucket_seq_to_instance_idx);
+    Status _plan_local_exchange(int num_buckets,
+                                const std::map<int, int>& bucket_seq_to_instance_idx);
+    Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
+                                const std::map<int, int>& bucket_seq_to_instance_idx);
 
     bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index f53781a0b4e91b..46e42dbe2c60ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -860,6 +860,8 @@ public PlanFragment visitPhysicalHashAggregate(
                 && inputPlanFragment.getDataPartition().getType() != TPartitionType.RANDOM
                 && aggregate.getAggregateParam().aggMode != AggMode.INPUT_TO_BUFFER) {
             inputPlanFragment.setHasColocatePlanNode(true);
+            // Mark the agg node as colocate. This is a hint for local shuffle planning to decide
+            // which type of local exchanger will be used.
             aggregationNode.setColocate(true);
         }
         setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 30a746d36e861b..7b779b7cfafcc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -302,6 +302,7 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
         initQueryOptions(context);
 
         if (planner instanceof OriginalPlanner) {
+            // Local shuffle on the pipelineX engine is enabled only when the Nereids planner is used.
             queryOptions.setEnableLocalShuffle(false);
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 217ae8ac205cd1..c39dfd6ee284ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -770,7 +770,10 @@ public class SessionVariable implements Serializable, Writable {
             needForward = true)
     private boolean enableSharedScan = false;
 
-    @VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
+    @VariableMgr.VarAttr(
+            name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+            description = {"是否在pipelineX引擎上开启local shuffle优化",
+                    "Whether to enable local shuffle on pipelineX engine."})
     private boolean enableLocalShuffle = true;
 
     @VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
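The core of patch 4 is the renamed `_plan_local_exchange` pass: walk each pipeline's operators and, whenever one requests a local exchange, cut the pipeline in two around a local exchange sink/source pair, then keep planning from the operator after the cut. A toy model of just that control flow (operator names are made up; the real code also threads exchangers, bucket maps, and runtime profiles):

#include <iostream>
#include <string>
#include <vector>

// Toy model of the splitting loop in _plan_local_exchange: scan the operators
// of a pipeline and cut it into a new pipeline wherever an operator asks for a
// local exchange. Only the control flow is shown here.
struct Op {
    std::string name;
    bool wants_exchange; // stand-in for get_local_exchange_type() != NOOP
};

std::vector<std::vector<std::string>> split_pipeline(const std::vector<Op>& ops) {
    std::vector<std::vector<std::string>> pipelines(1);
    for (const Op& op : ops) {
        if (op.wants_exchange && !pipelines.back().empty()) {
            // Close the current pipeline with a local exchange sink and start
            // a new one whose source is the matching local exchange source.
            pipelines.back().push_back("LocalExchangeSink");
            pipelines.push_back({"LocalExchangeSource"});
        }
        pipelines.back().push_back(op.name);
    }
    return pipelines;
}

int main() {
    auto pipes = split_pipeline({{"Scan", false}, {"AggSink", true}});
    for (const auto& p : pipes) {
        for (const auto& op : p) std::cout << op << ' ';
        std::cout << '\n';
    }
    // Prints: "Scan LocalExchangeSink" then "LocalExchangeSource AggSink",
    // mirroring how one pipeline becomes two around the exchange point.
}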
From 5259537c845ede7c15b2647d74fbe7f8d23e03d0 Mon Sep 17 00:00:00 2001
From: Gabriel
Date: Wed, 6 Dec 2023 15:22:30 +0800
Subject: [PATCH 5/5] update

---
 be/src/pipeline/exec/exchange_sink_operator.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 8143e71a4ada93..f8c3e392184104 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -26,6 +26,7 @@
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
 #include "vec/columns/column_const.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/sink/vdata_stream_sender.h"
@@ -212,7 +213,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
     if (p._part_type == TPartitionType::HASH_PARTITIONED) {
         _partition_count = channels.size();
         _partitioner.reset(
-                new vectorized::XXHashPartitioner<ShuffleChannelIds>(channels.size()));
+                new vectorized::XXHashPartitioner<LocalExchangeChannelIds>(channels.size()));
         RETURN_IF_ERROR(_partitioner->init(p._texprs));
         RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
         _profile->add_info_string("Partitioner",