Skip to content

Commit

Permalink
[pipelineX](improvement) enable local shuffle by default (#28046)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Dec 6, 2023
1 parent fa5096f commit 2881799
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 19 deletions.
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,13 @@ AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
_output_tuple_desc(nullptr),
_needs_finalize(tnode.agg_node.need_finalize),
_is_merge(false),
_is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
_pool(pool),
_limit(tnode.limit),
_have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
_is_streaming(is_streaming),
_partition_exprs(tnode.agg_node.grouping_exprs) {
_is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
}
_partition_exprs(tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {}

template <typename LocalStateType>
Status AggSinkOperatorX<LocalStateType>::init(const TPlanNode& tnode, RuntimeState* state) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP;
}
return ExchangeType::HASH_SHUFFLE;
return _is_colocate ? ExchangeType::BUCKET_HASH_SHUFFLE : ExchangeType::HASH_SHUFFLE;
}

using DataSinkOperatorX<LocalStateType>::id;
Expand All @@ -396,7 +396,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {

bool _needs_finalize;
bool _is_merge;
bool _is_first_phase;
const bool _is_first_phase;

size_t _align_aggregate_states = 1;
/// The offset to the n-th aggregate function in a row of aggregate functions.
Expand All @@ -415,6 +415,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
const bool _is_streaming;

const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
};

} // namespace pipeline
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"
#include "exchange_sink_buffer.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
#include "vec/columns/column_const.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/vdata_stream_sender.h"
Expand Down Expand Up @@ -212,7 +213,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner.reset(
new vectorized::XXHashPartitioner<vectorized::ShuffleChannelIds>(channels.size()));
new vectorized::XXHashPartitioner<LocalExchangeChannelIds>(channels.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
Expand Down
22 changes: 15 additions & 7 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}
_num_instances = request.local_params.size();
_runtime_profile.reset(new RuntimeProfile("PipelineContext"));
_start_timer = ADD_TIMER(_runtime_profile, "StartTime");
COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time());
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);

Expand Down Expand Up @@ -231,7 +229,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
static_cast<void>(root_pipeline->set_sink(_sink));

RETURN_IF_ERROR(_plan_local_shuffle(request.num_buckets, request.bucket_seq_to_instance_idx));
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets, request.bucket_seq_to_instance_idx));

// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
Expand All @@ -250,7 +248,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
return Status::OK();
}

Status PipelineXFragmentContext::_plan_local_shuffle(
Status PipelineXFragmentContext::_plan_local_exchange(
int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx) {
for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
_pipelines[pip_idx]->init_need_to_local_shuffle_by_source();
Expand All @@ -266,20 +264,21 @@ Status PipelineXFragmentContext::_plan_local_shuffle(
}
}

RETURN_IF_ERROR(_plan_local_shuffle(num_buckets, pip_idx, _pipelines[pip_idx],
bucket_seq_to_instance_idx));
RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx],
bucket_seq_to_instance_idx));
}
return Status::OK();
}

Status PipelineXFragmentContext::_plan_local_shuffle(
Status PipelineXFragmentContext::_plan_local_exchange(
int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx) {
int idx = 0;
bool do_local_exchange = false;
do {
auto& ops = pip->operator_xs();
do_local_exchange = false;
// Plan local exchange for each operator.
for (; idx < ops.size();) {
if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP) {
RETURN_IF_ERROR(_add_local_exchange(
Expand All @@ -288,6 +287,10 @@ Status PipelineXFragmentContext::_plan_local_shuffle(
&do_local_exchange, num_buckets, bucket_seq_to_instance_idx));
}
if (do_local_exchange) {
// If a local exchange is needed for the current operator, this pipeline is split into
// two pipelines by a local exchange sink/source. The remaining operators in this
// pipeline still need to be processed, so we set idx to 2 (index 0 is the local exchange
// source and index 1 is the current operator, which was already processed) and continue
// planning local exchanges.
idx = 2;
break;
}
Expand Down Expand Up @@ -682,16 +685,21 @@ Status PipelineXFragmentContext::_add_local_exchange(
switch (exchange_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances);
// If a HASH_SHUFFLE local exchanger is planned, data will always be hash-distributed, so we
// do not need to plan another shuffle local exchange in the rest of the current pipeline.
new_pip->set_need_to_local_shuffle(false);
cur_pipe->set_need_to_local_shuffle(false);
break;
case ExchangeType::BUCKET_HASH_SHUFFLE:
shared_state->exchanger =
BucketShuffleExchanger::create_unique(_num_instances, num_buckets);
// Same as ExchangeType::HASH_SHUFFLE.
new_pip->set_need_to_local_shuffle(false);
cur_pipe->set_need_to_local_shuffle(false);
break;
case ExchangeType::PASSTHROUGH:
// If a PASSTHROUGH local exchanger is planned, data will be split randomly, so we must make
// sure the remaining operators use local shuffle to restore the correct data distribution.
shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances);
new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle());
cur_pipe->set_need_to_local_shuffle(true);
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
RuntimeState* state, DescriptorTbl& desc_tbl,
PipelineId cur_pipeline_id);
Status _plan_local_shuffle(int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx);
Status _plan_local_shuffle(int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx);
Status _plan_local_exchange(int num_buckets,
const std::map<int, int>& bucket_seq_to_instance_idx);
Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
const std::map<int, int>& bucket_seq_to_instance_idx);

bool _has_inverted_index_or_partial_update(TOlapTableSink sink);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,9 @@ public PlanFragment visitPhysicalHashAggregate(
&& inputPlanFragment.getDataPartition().getType() != TPartitionType.RANDOM
&& aggregate.getAggregateParam().aggMode != AggMode.INPUT_TO_BUFFER) {
inputPlanFragment.setHasColocatePlanNode(true);
// Set colocate info in agg node. This is a hint for local shuffling to decide which type of
// local exchanger will be used.
aggregationNode.setColocate(true);
}
setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
if (aggregate.getStats() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class AggregationNode extends PlanNode {
// Set to true if this aggregation node needs to run the Finalize step. This
// node is the root node of a distributed aggregation.
private boolean needsFinalize;
private boolean isColocate = false;

// If true, use streaming preaggregation algorithm. Not valid if this is a merge agg.
private boolean useStreamingPreagg;
Expand Down Expand Up @@ -277,6 +278,7 @@ protected void toThrift(TPlanNode msg) {
msg.agg_node.setAggSortInfos(aggSortInfos);
msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
msg.agg_node.setIsColocate(isColocate);
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
Expand Down Expand Up @@ -375,4 +377,8 @@ public void finalize(Analyzer analyzer) throws UserException {
aggInfo.getOutputTupleDesc().computeMemLayout();
}
}

/**
 * Stores the colocate flag for this aggregation node.
 *
 * The stored value is what toThrift() forwards through
 * msg.agg_node.setIsColocate(...).
 *
 * @param colocate new value for the isColocate field
 */
public void setColocate(boolean colocate) {
    this.isColocate = colocate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {

initQueryOptions(context);
if (planner instanceof OriginalPlanner) {
// Enable local shuffle on pipelineX engine only if Nereids planner is applied.
queryOptions.setEnableLocalShuffle(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,11 @@ public class SessionVariable implements Serializable, Writable {
needForward = true)
private boolean enableSharedScan = false;

@VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
private boolean enableLocalShuffle = false;
@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
"Whether to enable local shuffle on pipelineX engine."})
private boolean enableLocalShuffle = true;

@VariableMgr.VarAttr(name = ENABLE_AGG_STATE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
public boolean enableAggState = false;
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ struct TAggregationNode {
6: optional bool use_streaming_preaggregation
7: optional list<TSortInfo> agg_sort_infos
8: optional bool is_first_phase
9: optional bool is_colocate
// 9: optional bool use_fixed_length_serialization_opt
}

Expand Down

0 comments on commit 2881799

Please sign in to comment.