diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index dd0070fba695d..4b81f7e3a0e04 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -332,6 +332,7 @@ public final class SystemSessionProperties private static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path"; private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments"; public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled"; + public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding"; private final List> sessionProperties; @@ -1824,6 +1825,11 @@ public SystemSessionProperties( booleanProperty(INLINE_PROJECTIONS_ON_VALUES, "Whether to evaluate project node on values node", featuresConfig.getInlineProjectionsOnValues(), + false), + integerProperty( + NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, + "Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. 
Native execution only", + queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(), false)); } @@ -3100,4 +3106,9 @@ public static boolean isInlineProjectionsOnValues(Session session) { return session.getSystemProperty(INLINE_PROJECTIONS_ON_VALUES, Boolean.class); } + + public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session session) + { + return session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index 4a60332718bb6..1ce3742f48c2a 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -100,6 +100,8 @@ public class QueryManagerConfig private int rateLimiterCacheLimit = 1000; private int rateLimiterCacheWindowMinutes = 5; + private int minColumnarEncodingChannelsToPreferRowWiseEncoding = 1000; + @Min(1) public int getScheduleSplitBatchSize() { @@ -738,6 +740,19 @@ public QueryManagerConfig setEnableWorkerIsolation(boolean enableWorkerIsolation return this; } + public int getMinColumnarEncodingChannelsToPreferRowWiseEncoding() + { + return minColumnarEncodingChannelsToPreferRowWiseEncoding; + } + + @Config("min-columnar-encoding-channels-to-prefer-row-wise-encoding") + @ConfigDescription("Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. 
Native execution only") + public QueryManagerConfig setMinColumnarEncodingChannelsToPreferRowWiseEncoding(int minColumnarEncodingChannelsToPreferRowWiseEncoding) + { + this.minColumnarEncodingChannelsToPreferRowWiseEncoding = minColumnarEncodingChannelsToPreferRowWiseEncoding; + return this; + } + public enum ExchangeMaterializationStrategy { NONE, diff --git a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java index 9b5d0c72cabba..2c8cec755b598 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java @@ -64,6 +64,7 @@ import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; @@ -284,6 +285,7 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges( outputs, Optional.empty(), false, + COLUMNAR, Optional.empty()); ExchangeNode writerRemoteSource = new ExchangeNode( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 938bc146dae03..dea97b8d6fd21 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -288,6 +288,8 @@ public class FeaturesConfig private boolean eagerPlanValidationEnabled; private int 
eagerPlanValidationThreadPoolSize = 20; + private boolean prestoSparkExecutionEnvironment; + public enum PartitioningPrecisionStrategy { // Let Presto decide when to repartition @@ -2846,4 +2848,16 @@ public int getEagerPlanValidationThreadPoolSize() { return this.eagerPlanValidationThreadPoolSize; } + + public boolean isPrestoSparkExecutionEnvironment() + { + return prestoSparkExecutionEnvironment; + } + + @Config("presto-spark-execution-environment") + public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExecutionEnvironment) + { + this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index eb3d4e7ff872f..a3da01b778bf4 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -305,7 +305,16 @@ private PlanNode createRemoteStreamingExchange(ExchangeNode exchange, RewriteCon .map(PlanFragment::getId) .collect(toImmutableList()); - return new RemoteSourceNode(exchange.getSourceLocation(), exchange.getId(), exchange.getStatsEquivalentPlanNode(), childrenIds, exchange.getOutputVariables(), exchange.isEnsureSourceOrdering(), exchange.getOrderingScheme(), exchange.getType()); + return new RemoteSourceNode( + exchange.getSourceLocation(), + exchange.getId(), + exchange.getStatsEquivalentPlanNode(), + childrenIds, + exchange.getOutputVariables(), + exchange.isEnsureSourceOrdering(), + exchange.getOrderingScheme(), + exchange.getType(), + exchange.getPartitioningScheme().getEncoding()); } protected void setDistributionForExchange(ExchangeNode.Type exchangeType, PartitioningScheme partitioningScheme, RewriteContext context) diff --git 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index f232d9a7c528f..3b511ecf2ac31 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -324,11 +324,6 @@ import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.createSymbolReference; import static com.facebook.presto.sql.gen.LambdaBytecodeGenerator.compileLambdaProvider; import static com.facebook.presto.sql.planner.RowExpressionInterpreter.rowExpressionInterpreter; -import static com.facebook.presto.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION; -import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; -import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION; -import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION; -import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.AssignmentUtils.identityAssignments; import static com.facebook.presto.sql.relational.Expressions.constant; import static com.facebook.presto.sql.tree.SortItem.Ordering.ASCENDING; @@ -539,11 +534,7 @@ public LocalExecutionPlan plan( private OutputFactory createOutputFactory(TaskContext taskContext, PartitioningScheme partitioningScheme, OutputBuffer outputBuffer) { - if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) || - partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) || - partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) || - partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) || - 
partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) { + if (partitioningScheme.isSingleOrBroadcastOrArbitrary()) { return new TaskOutputFactory(outputBuffer); } @@ -557,11 +548,7 @@ private OutputFactory createOutputFactory(TaskContext taskContext, PartitioningS private Optional createOutputPartitioning(TaskContext taskContext, PartitioningScheme partitioningScheme) { - if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) || - partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) || - partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) || - partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) || - partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) { + if (partitioningScheme.isSingleOrBroadcastOrArbitrary()) { return Optional.empty(); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index bfceaabb6e908..5570b14859cb2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -229,6 +229,7 @@ private static SubPlan reassignPartitioningHandleIfNecessaryHelper(Metadata meta outputPartitioningScheme.getOutputLayout(), outputPartitioningScheme.getHashColumn(), outputPartitioningScheme.isReplicateNullsAndAny(), + outputPartitioningScheme.getEncoding(), outputPartitioningScheme.getBucketToPartition()), fragment.getStageExecutionDescriptor(), fragment.isOutputTableWriterFragment(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java index 32184097720da..fd4dec41e7777 100644 --- 
a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java @@ -35,6 +35,7 @@ import com.facebook.presto.sql.planner.iterative.rule.CrossJoinWithOrFilterToInnerJoin; import com.facebook.presto.sql.planner.iterative.rule.DesugarLambdaExpression; import com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType; +import com.facebook.presto.sql.planner.iterative.rule.DetermineRemotePartitionedExchangeEncoding; import com.facebook.presto.sql.planner.iterative.rule.DetermineSemiJoinDistributionType; import com.facebook.presto.sql.planner.iterative.rule.EliminateCrossJoins; import com.facebook.presto.sql.planner.iterative.rule.EvaluateZeroLimit; @@ -929,7 +930,14 @@ public PlanOptimizers( // Precomputed hashes - this assumes that partitioning will not change builder.add(new HashGenerationOptimizer(metadata.getFunctionAndTypeManager())); - + builder.add(new IterativeOptimizer( + metadata, + ruleStats, + statsCalculator, + costCalculator, + ImmutableSet.of(new DetermineRemotePartitionedExchangeEncoding( + featuresConfig.isNativeExecutionEnabled(), + featuresConfig.isPrestoSparkExecutionEnvironment())))); builder.add(new MetadataDeleteOptimizer(metadata)); // TODO: consider adding a formal final plan sanitization optimizer that prepares the plan for transmission/execution/logging diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java index d7e7b8ee73100..8a4174c9cefe7 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java @@ -101,7 +101,7 @@ public SystemPartitionFunction getFunction() @Override public boolean isSingleNode() { - return partitioning == 
SystemPartitioning.COORDINATOR_ONLY || partitioning == SystemPartitioning.SINGLE; + return function == SystemPartitionFunction.SINGLE; } @Override @@ -110,6 +110,18 @@ public boolean isCoordinatorOnly() return partitioning == SystemPartitioning.COORDINATOR_ONLY; } + @Override + public boolean isBroadcast() + { + return function == SystemPartitionFunction.BROADCAST; + } + + @Override + public boolean isArbitrary() + { + return function == SystemPartitionFunction.ROUND_ROBIN; + } + @Override public boolean equals(Object o) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineRemotePartitionedExchangeEncoding.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineRemotePartitionedExchangeEncoding.java new file mode 100644 index 0000000000000..0162e39e8efba --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/DetermineRemotePartitionedExchangeEncoding.java @@ -0,0 +1,137 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.sql.planner.iterative.rule; + +import com.facebook.presto.Session; +import com.facebook.presto.common.type.FixedWidthType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.planner.iterative.Rule; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.google.common.annotations.VisibleForTesting; + +import static com.facebook.presto.SystemSessionProperties.getMinColumnarEncodingChannelsToPreferRowWiseEncoding; +import static com.facebook.presto.spi.plan.ExchangeEncoding.ROW_WISE; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; +import static com.facebook.presto.sql.planner.plan.Patterns.Exchange.scope; +import static com.facebook.presto.sql.planner.plan.Patterns.Exchange.type; +import static com.facebook.presto.sql.planner.plan.Patterns.exchange; + +public class DetermineRemotePartitionedExchangeEncoding + implements Rule +{ + private static final Pattern PATTERN = exchange() + .with(scope().equalTo(REMOTE_STREAMING)) + .with(type().equalTo(REPARTITION)); + + private final boolean nativeExecution; + private final boolean prestoSparkExecutionEnvironment; + + public DetermineRemotePartitionedExchangeEncoding(boolean nativeExecution, boolean prestoSparkExecutionEnvironment) + { + this.nativeExecution = nativeExecution; + this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment; + } + + @Override + public Pattern getPattern() + { + return PATTERN; + } + + @Override + public boolean isEnabled(Session session) + { + return nativeExecution || prestoSparkExecutionEnvironment; + } + + @Override + public Result apply(ExchangeNode node, Captures captures, Context context) + { + if 
(prestoSparkExecutionEnvironment) { + // In Presto on Spark, row-wise encoding is always used for non-special shuffles (i.e., excluding broadcast, single, and arbitrary shuffles). + // To accurately reflect this in the plan, the exchange encoding is set here. + // Presto on Spark does not check the ExchangeEncoding specified in the plan. + return determineForPrestoOnSpark(node); + } + if (nativeExecution) { + return determineForNativeExecution(context.getSession(), node); + } + // Presto Java runtime does not support row-wise encoding + return Result.empty(); + } + + private Result determineForPrestoOnSpark(ExchangeNode node) + { + // keep columnar for special cases + if (node.getPartitioningScheme().isSingleOrBroadcastOrArbitrary()) { + return Result.empty(); + } + if (node.getPartitioningScheme().getEncoding() == ROW_WISE) { + // leave untouched if already visited + return Result.empty(); + } + // otherwise switch to row-wise + return Result.ofPlanNode(node.withRowWiseEncoding()); + } + + private Result determineForNativeExecution(Session session, ExchangeNode node) + { + // keep columnar for special cases + if (node.getPartitioningScheme().isSingleOrBroadcastOrArbitrary()) { + return Result.empty(); + } + if (node.getPartitioningScheme().getEncoding() == ROW_WISE) { + // leave untouched if already visited + return Result.empty(); + } + int minChannelsToPreferRowWiseEncoding = getMinColumnarEncodingChannelsToPreferRowWiseEncoding(session); + if (estimateNumberOfOutputColumnarChannels(node) >= minChannelsToPreferRowWiseEncoding) { + return Result.ofPlanNode(node.withRowWiseEncoding()); + } + return Result.empty(); + } + + @VisibleForTesting + static long estimateNumberOfOutputColumnarChannels(ExchangeNode node) + { + return node.getOutputVariables().stream() + .map(VariableReferenceExpression::getType) + .mapToLong(DetermineRemotePartitionedExchangeEncoding::estimateNumberOfColumnarChannels) + .sum(); + } + + @VisibleForTesting + static long 
estimateNumberOfColumnarChannels(Type type) + { + if (type instanceof FixedWidthType) { + // nulls and values + return 2; + } + if (!type.getTypeParameters().isEmpty()) { + // complex type + // nulls and offsets + long result = 2; + for (Type parameter : type.getTypeParameters()) { + result += estimateNumberOfColumnarChannels(parameter); + } + return result; + } + // nulls, offsets, values + return 3; + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java index 36ac4334d73db..d3bc4c8b562ba 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java @@ -233,6 +233,7 @@ private PlanNode pushPartial(AggregationNode aggregation, ExchangeNode exchange, aggregationOutputs, exchange.getPartitioningScheme().getHashColumn(), exchange.getPartitioningScheme().isReplicateNullsAndAny(), + exchange.getPartitioningScheme().getEncoding(), exchange.getPartitioningScheme().getBucketToPartition()); return new ExchangeNode( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java index 52ca3baaf998c..cc568f94a01ee 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java @@ -146,6 +146,7 @@ public Result apply(ProjectNode project, Captures captures, Context context) outputBuilder.build(), exchange.getPartitioningScheme().getHashColumn(), 
exchange.getPartitioningScheme().isReplicateNullsAndAny(), + exchange.getPartitioningScheme().getEncoding(), exchange.getPartitioningScheme().getBucketToPartition()); PlanNode result = new ExchangeNode( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java index 84e01e63c38eb..c98ddfe1c3e22 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java @@ -81,6 +81,7 @@ public Result apply(ExchangeNode node, Captures captures, Context context) removeVariable(partitioningScheme.getOutputLayout(), assignUniqueId.getIdVariable()), partitioningScheme.getHashColumn(), partitioningScheme.isReplicateNullsAndAny(), + partitioningScheme.getEncoding(), partitioningScheme.getBucketToPartition()), ImmutableList.of(assignUniqueId.getSource()), ImmutableList.of(removeVariable(getOnlyElement(node.getInputs()), assignUniqueId.getIdVariable())), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java index 6d2bced948cbb..085dcc44f2f6f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java @@ -152,6 +152,7 @@ public Result apply(ExchangeNode node, Captures captures, Context context) outputLayout, partitioningScheme.getHashColumn(), partitioningScheme.isReplicateNullsAndAny(), + partitioningScheme.getEncoding(), 
partitioningScheme.getBucketToPartition()), ImmutableList.of(groupIdNode.getSource()), ImmutableList.of(inputs), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 9d677ac23247a..d06d3561aba5a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -120,6 +120,7 @@ import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.spi.plan.LimitNode.Step.PARTIAL; import static com.facebook.presto.sql.planner.FragmentTableScanCounter.getNumberOfTableScans; import static com.facebook.presto.sql.planner.FragmentTableScanCounter.hasMultipleTableScans; @@ -1034,6 +1035,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p filteringSource.getNode().getOutputVariables(), Optional.empty(), true, + COLUMNAR, Optional.empty())), filteringSource.getProperties()); } @@ -1075,6 +1077,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p filteringSource.getNode().getOutputVariables(), Optional.empty(), true, + COLUMNAR, Optional.empty())), filteringSource.getProperties()); } @@ -1250,6 +1253,7 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties parentP source.getNode().getOutputVariables(), Optional.empty(), nullsAndAnyReplicated, + COLUMNAR, Optional.empty())), source.getProperties()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java 
b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java index 8f24422d9f26c..ca693e3301ce4 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java @@ -587,6 +587,7 @@ public PlanWithProperties visitExchange(ExchangeNode node, HashComputationSet pa .build(), partitionVariables.map(newHashVariables::get), partitioningScheme.isReplicateNullsAndAny(), + partitioningScheme.getEncoding(), partitioningScheme.getBucketToPartition()); // add hash variables to sources diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java index f872704c7057e..8db400fb2b980 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java @@ -356,6 +356,7 @@ public PlanNode visitExchange(ExchangeNode node, RewriteContext context children.get(children.size() - 1).getOutputVariables(), node.getPartitioningScheme().getHashColumn(), node.getPartitioningScheme().isReplicateNullsAndAny(), + node.getPartitioningScheme().getEncoding(), node.getPartitioningScheme().getBucketToPartition()); return new ExchangeNode( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java index 73d228eab1b73..4cbd60d20c475 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java +++ 
b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java @@ -344,7 +344,7 @@ public static PartitioningScheme translateOutputLayout(PartitioningScheme partit .map(oldOutputLayout::indexOf) .map(newOutputLayout::get); - return new PartitioningScheme(newPartitioning, newOutputLayout, newHashSymbol, partitioningScheme.isReplicateNullsAndAny(), partitioningScheme.getBucketToPartition()); + return new PartitioningScheme(newPartitioning, newOutputLayout, newHashSymbol, partitioningScheme.isReplicateNullsAndAny(), partitioningScheme.getEncoding(), partitioningScheme.getBucketToPartition()); } // Translates VariableReferenceExpression in arguments according to translator, keeps other arguments unchanged. diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java index 08ea868f8d566..09f4b2b2373a5 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java @@ -169,6 +169,7 @@ public PlanNode visitExchange(ExchangeNode node, RewriteContext rewrittenSources = ImmutableList.builder(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java index 5e7331b566e48..cca820e193c3e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java @@ -305,6 +305,7 @@ private PartitioningScheme canonicalize(PartitioningScheme scheme, PlanNode sour mapAndDistinctVariable(source.getOutputVariables()), scheme.getHashColumn().map(this::map), 
scheme.isReplicateNullsAndAny(), + scheme.getEncoding(), scheme.getBucketToPartition()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java index e6dc7e4c3020e..cc81a53c45d60 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -334,6 +334,7 @@ public PlanNode visitExchange(ExchangeNode node, RewriteContext context) outputs.build(), canonicalize(node.getPartitioningScheme().getHashColumn()), node.getPartitioningScheme().isReplicateNullsAndAny(), + node.getPartitioningScheme().getEncoding(), node.getPartitioningScheme().getBucketToPartition()); Optional orderingScheme = node.getOrderingScheme().map(this::canonicalizeAndDistinct); @@ -398,7 +399,8 @@ public PlanNode visitRemoteSource(RemoteSourceNode node, RewriteContext co canonicalizeAndDistinct(node.getOutputVariables()), node.isEnsureSourceOrdering(), node.getOrderingScheme().map(this::canonicalizeAndDistinct), - node.getExchangeType()); + node.getExchangeType(), + node.getEncoding()); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java index 237257a5e7e72..12928728b6d86 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java @@ -30,12 +30,14 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; import static 
com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPLICATE; @@ -137,6 +139,7 @@ public ExchangeNode( checkArgument(!scope.isLocal() || partitioningScheme.getPartitioning().getArguments().stream().allMatch(VariableReferenceExpression.class::isInstance), "local exchanges do not support constant partition function arguments"); + checkArgument(scope == REMOTE_STREAMING || partitioningScheme.getEncoding() == COLUMNAR, "Only REMOTE_STREAMING can be ROW_WISE: %s", partitioningScheme.getEncoding()); checkArgument(!scope.isRemote() || type == REPARTITION || !partitioningScheme.isReplicateNullsAndAny(), "Only REPARTITION can replicate remotely"); checkArgument(scope != REMOTE_MATERIALIZED || type == REPARTITION, "Only REPARTITION can be REMOTE_MATERIALIZED: %s", type); @@ -189,6 +192,7 @@ public static ExchangeNode partitionedExchange(PlanNodeId id, Scope scope, PlanN child.getOutputVariables(), hashColumn, replicateNullsAndAny, + COLUMNAR, Optional.empty())); } @@ -337,4 +341,19 @@ public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalent { return new ExchangeNode(getSourceLocation(), getId(), statsEquivalentPlanNode, type, scope, partitioningScheme, sources, inputs, ensureSourceOrdering, orderingScheme); } + + public ExchangeNode 
withRowWiseEncoding() + { + return new ExchangeNode( + getSourceLocation(), + getId(), + getStatsEquivalentPlanNode(), + type, + scope, + partitioningScheme.withRowWiseEncoding(), + sources, + inputs, + ensureSourceOrdering, + orderingScheme); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java index e60ab4f29a128..0c3e2afc9052a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/Patterns.java @@ -316,4 +316,17 @@ public static Property>> rows() return property("rows", ValuesNode::getRows); } } + + public static class Exchange + { + public static Property scope() + { + return property("scope", ExchangeNode::getScope); + } + + public static Property type() + { + return property("type", ExchangeNode::getType); + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java index 4bb79dc901d98..f5e980a4470b7 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/RemoteSourceNode.java @@ -14,6 +14,7 @@ package com.facebook.presto.sql.planner.plan; import com.facebook.presto.spi.SourceLocation; +import com.facebook.presto.spi.plan.ExchangeEncoding; import com.facebook.presto.spi.plan.OrderingScheme; import com.facebook.presto.spi.plan.PlanFragmentId; import com.facebook.presto.spi.plan.PlanNode; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -40,6 +42,7 @@ public class RemoteSourceNode private final 
boolean ensureSourceOrdering; private final Optional orderingScheme; private final ExchangeNode.Type exchangeType; // This is needed to "unfragment" to compute stats correctly. + private final ExchangeEncoding encoding; public RemoteSourceNode( Optional sourceLocation, @@ -49,7 +52,8 @@ public RemoteSourceNode( List outputVariables, boolean ensureSourceOrdering, Optional orderingScheme, - ExchangeNode.Type exchangeType) + ExchangeNode.Type exchangeType, + ExchangeEncoding encoding) { super(sourceLocation, id, statsEquivalentPlanNode); @@ -58,6 +62,7 @@ public RemoteSourceNode( this.ensureSourceOrdering = ensureSourceOrdering; this.orderingScheme = requireNonNull(orderingScheme, "orderingScheme is null"); this.exchangeType = requireNonNull(exchangeType, "exchangeType is null"); + this.encoding = requireNonNull(encoding, "encoding is null"); } @JsonCreator @@ -68,9 +73,10 @@ public RemoteSourceNode( @JsonProperty("outputVariables") List outputVariables, @JsonProperty("ensureSourceOrdering") boolean ensureSourceOrdering, @JsonProperty("orderingScheme") Optional orderingScheme, - @JsonProperty("exchangeType") ExchangeNode.Type exchangeType) + @JsonProperty("exchangeType") ExchangeNode.Type exchangeType, + @JsonProperty("encoding") ExchangeEncoding encoding) { - this(sourceLocation, id, Optional.empty(), sourceFragmentIds, outputVariables, ensureSourceOrdering, orderingScheme, exchangeType); + this(sourceLocation, id, Optional.empty(), sourceFragmentIds, outputVariables, ensureSourceOrdering, orderingScheme, exchangeType, encoding); } public RemoteSourceNode( @@ -82,7 +88,7 @@ public RemoteSourceNode( Optional orderingScheme, ExchangeNode.Type exchangeType) { - this(sourceLocation, id, ImmutableList.of(sourceFragmentId), outputVariables, ensureSourceOrdering, orderingScheme, exchangeType); + this(sourceLocation, id, ImmutableList.of(sourceFragmentId), outputVariables, ensureSourceOrdering, orderingScheme, exchangeType, COLUMNAR); } @Override @@ -122,6 +128,12 @@ public 
ExchangeNode.Type getExchangeType() return exchangeType; } + @JsonProperty + public ExchangeEncoding getEncoding() + { + return encoding; + } + @Override public R accept(InternalPlanVisitor visitor, C context) { @@ -138,6 +150,6 @@ public PlanNode replaceChildren(List newChildren) @Override public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalentPlanNode) { - return new RemoteSourceNode(getSourceLocation(), getId(), statsEquivalentPlanNode, sourceFragmentIds, outputVariables, ensureSourceOrdering, orderingScheme, exchangeType); + return new RemoteSourceNode(getSourceLocation(), getId(), statsEquivalentPlanNode, sourceFragmentIds, outputVariables, ensureSourceOrdering, orderingScheme, exchangeType, encoding); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index 77dff87b9b330..a5caacd845e7e 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -426,6 +426,7 @@ private static String formatFragment( Joiner.on(", ").join(partitioningScheme.getPartitioning().getArguments()), formatHash(partitioningScheme.getHashColumn()))); } + builder.append(indentString(1)).append(format("Output encoding: %s%n", fragment.getPartitioningScheme().getEncoding())); builder.append(indentString(1)).append(format("Stage Execution Strategy: %s%n", fragment.getStageExecutionDescriptor().getStageExecutionStrategy())); TypeProvider typeProvider = TypeProvider.fromVariables(fragment.getVariables()); @@ -1220,8 +1221,9 @@ else if (node.getScope().isLocal()) { else { addNode(node, format("%sExchange", UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, node.getScope().toString())), - format("[%s%s]%s", + format("[%s - %s%s]%s", node.getType(), + node.getPartitioningScheme().getEncoding(), 
node.getPartitioningScheme().isReplicateNullsAndAny() ? " - REPLICATE NULLS AND ANY" : "", formatHash(node.getPartitioningScheme().getHashColumn()))); } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index 4ef563ad42406..f4f92c137538a 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -83,7 +83,8 @@ public void testDefaults() .setRateLimiterBucketMaxSize(100) .setRateLimiterCacheLimit(1000) .setRateLimiterCacheWindowMinutes(5) - .setEnableWorkerIsolation(false)); + .setEnableWorkerIsolation(false) + .setMinColumnarEncodingChannelsToPreferRowWiseEncoding(1000)); } @Test @@ -137,7 +138,7 @@ public void testExplicitPropertyMappings() .put("query-manager.rate-limiter-cache-window-minutes", "60") .put("query.cte-partitioning-provider-catalog", "hive") .put("query-manager.enable-worker-isolation", "true") - + .put("min-columnar-encoding-channels-to-prefer-row-wise-encoding", "123") .build(); QueryManagerConfig expected = new QueryManagerConfig() @@ -188,7 +189,8 @@ public void testExplicitPropertyMappings() .setRateLimiterCacheLimit(10000) .setRateLimiterCacheWindowMinutes(60) .setCtePartitioningProviderCatalog("hive") - .setEnableWorkerIsolation(true); + .setEnableWorkerIsolation(true) + .setMinColumnarEncodingChannelsToPreferRowWiseEncoding(123); ConfigAssertions.assertFullMapping(properties, expected); } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlStageExecution.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlStageExecution.java index 9017fef3e165e..00c8d0afd4d7c 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlStageExecution.java +++ 
b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlStageExecution.java @@ -50,6 +50,7 @@ import static com.facebook.presto.execution.SqlStageExecution.createSqlStageExecution; import static com.facebook.presto.execution.buffer.OutputBuffers.BufferType.ARBITRARY; import static com.facebook.presto.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; @@ -163,7 +164,8 @@ private static PlanFragment createExchangePlanFragment() ImmutableList.of(new VariableReferenceExpression(Optional.empty(), "column", VARCHAR)), false, Optional.empty(), - REPARTITION); + REPARTITION, + COLUMNAR); return new PlanFragment( new PlanFragmentId(0), diff --git a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestAdaptivePhasedExecutionPolicy.java b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestAdaptivePhasedExecutionPolicy.java index ff7754e1a9950..34a707792fccd 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestAdaptivePhasedExecutionPolicy.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestAdaptivePhasedExecutionPolicy.java @@ -66,6 +66,7 @@ import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.execution.SqlStageExecution.createSqlStageExecution; import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static 
com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; @@ -183,7 +184,8 @@ private static PlanNode getRemoteSourcePlanNode(PlanFragmentId fragmentId) ImmutableList.of(new VariableReferenceExpression(Optional.empty(), "column", VARCHAR)), false, Optional.empty(), - REPARTITION); + REPARTITION, + COLUMNAR); return planNode; } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java index bcc885a4a6b8c..6af221cd777d7 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestPhasedExecutionSchedule.java @@ -45,6 +45,7 @@ import java.util.stream.Stream; import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED; import static com.facebook.presto.spi.plan.JoinType.INNER; import static com.facebook.presto.spi.plan.JoinType.RIGHT; @@ -164,7 +165,8 @@ private static PlanFragment createExchangePlanFragment(String name, PlanFragment fragments[0].getPartitioningScheme().getOutputLayout(), false, Optional.empty(), - REPARTITION); + REPARTITION, + COLUMNAR); return createFragment(planNode); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 332ebeac14234..86f9263c1c9d7 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -246,7 +246,8 @@ public void testDefaults() 
.setUseHistograms(false) .setInlineProjectionsOnValues(false) .setEagerPlanValidationEnabled(false) - .setEagerPlanValidationThreadPoolSize(20)); + .setEagerPlanValidationThreadPoolSize(20) + .setPrestoSparkExecutionEnvironment(false)); } @Test @@ -442,6 +443,7 @@ public void testExplicitPropertyMappings() .put("optimizer.inline-projections-on-values", "true") .put("eager-plan-validation-enabled", "true") .put("eager-plan-validation-thread-pool-size", "2") + .put("presto-spark-execution-environment", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -634,7 +636,8 @@ public void testExplicitPropertyMappings() .setUseHistograms(true) .setInlineProjectionsOnValues(true) .setEagerPlanValidationEnabled(true) - .setEagerPlanValidationThreadPoolSize(2); + .setEagerPlanValidationThreadPoolSize(2) + .setPrestoSparkExecutionEnvironment(true); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestCanonicalPlanGenerator.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestCanonicalPlanGenerator.java index 360a9d421e6a6..ca678636d3abe 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/TestCanonicalPlanGenerator.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/TestCanonicalPlanGenerator.java @@ -346,7 +346,7 @@ public void testCanonicalPartitioningScheme() .filter(f -> !f.isSynthetic()) .map(Field::getName) .collect(toImmutableSet()), - ImmutableSet.of("partitioning", "outputLayout", "hashColumn", "replicateNullsAndAny", "bucketToPartition")); + ImmutableSet.of("partitioning", "outputLayout", "hashColumn", "replicateNullsAndAny", "encoding", "bucketToPartition")); assertEquals( Arrays.stream(Partitioning.class.getDeclaredFields()) .filter(f -> !f.isSynthetic()) diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineRemotePartitionedExchangeEncoding.java 
b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineRemotePartitionedExchangeEncoding.java new file mode 100644 index 0000000000000..8452b176950a0 --- /dev/null +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestDetermineRemotePartitionedExchangeEncoding.java @@ -0,0 +1,240 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.sql.planner.iterative.rule; + +import com.facebook.presto.Session; +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.cost.StatsProvider; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.plan.ExchangeEncoding; +import com.facebook.presto.spi.plan.Partitioning; +import com.facebook.presto.spi.plan.PartitioningHandle; +import com.facebook.presto.spi.plan.PartitioningScheme; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeId; +import com.facebook.presto.spi.plan.ValuesNode; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.planner.assertions.MatchResult; +import com.facebook.presto.sql.planner.assertions.Matcher; +import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; +import com.facebook.presto.sql.planner.assertions.SymbolAliases; +import 
com.facebook.presto.sql.planner.iterative.rule.test.RuleAssert; +import com.facebook.presto.sql.planner.iterative.rule.test.RuleTester; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.google.common.collect.ImmutableList; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Optional; +import java.util.stream.IntStream; + +import static com.facebook.presto.SystemSessionProperties.NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING; +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.DecimalType.createDecimalType; +import static com.facebook.presto.common.type.RealType.REAL; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.common.type.VarcharType.createVarcharType; +import static com.facebook.presto.spi.plan.ExchangeEncoding.ROW_WISE; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; +import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; +import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH; +import static com.facebook.presto.sql.planner.assertions.MatchResult.match; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; +import static com.facebook.presto.sql.planner.iterative.rule.DetermineRemotePartitionedExchangeEncoding.estimateNumberOfColumnarChannels; +import static com.facebook.presto.sql.planner.iterative.rule.DetermineRemotePartitionedExchangeEncoding.estimateNumberOfOutputColumnarChannels; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.partitionedExchange; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static 
java.util.Objects.requireNonNull; +import static org.testng.Assert.assertEquals; + +public class TestDetermineRemotePartitionedExchangeEncoding +{ + private static final int MIN_COLUMNAR_STREAMS = 100; + + private RuleTester tester; + + @BeforeClass + public void setUp() + { + tester = new RuleTester(); + } + + @AfterClass(alwaysRun = true) + public void tearDown() + { + tester.close(); + tester = null; + } + + @Test + public void testPrestoOnSpark() + { + // special exchanges are always columnar + assertForPrestoOnSpark() + .on(p -> createExchange(FIXED_ARBITRARY_DISTRIBUTION, MIN_COLUMNAR_STREAMS)) + .doesNotFire(); + // do not fire twice + assertForPrestoOnSpark() + .on(p -> createExchange(FIXED_HASH_DISTRIBUTION, MIN_COLUMNAR_STREAMS).withRowWiseEncoding()) + .doesNotFire(); + // hash based exchanges are always row wise in Presto on Spark + assertForPrestoOnSpark() + .on(p -> createExchange(FIXED_HASH_DISTRIBUTION, MIN_COLUMNAR_STREAMS - 1)) + .matches(exchangeEncoding(ROW_WISE)); + } + + @Test + public void testPresto() + { + // exchanges are always columnar in Presto + assertForPresto() + .on(p -> createExchange(FIXED_ARBITRARY_DISTRIBUTION, MIN_COLUMNAR_STREAMS)) + .doesNotFire(); + assertForPresto() + .on(p -> createExchange(FIXED_HASH_DISTRIBUTION, MIN_COLUMNAR_STREAMS - 1)) + .doesNotFire(); + assertForPresto() + .on(p -> createExchange(FIXED_HASH_DISTRIBUTION, MIN_COLUMNAR_STREAMS + 1)) + .doesNotFire(); + } + + @Test + public void testNative() + { + // special exchanges are always columnar + assertForNative() + .on(p -> createExchange(FIXED_ARBITRARY_DISTRIBUTION, MIN_COLUMNAR_STREAMS)) + .doesNotFire(); + // hash based exchange with the total number of output columnar streams lower than threshold is columnar + assertForNative() + .on(p -> createExchange(FIXED_HASH_DISTRIBUTION, MIN_COLUMNAR_STREAMS - 1)) + .doesNotFire(); + // otherwise row wise + assertForNative() + .on(p -> createExchange(FIXED_HASH_DISTRIBUTION, MIN_COLUMNAR_STREAMS)) + 
.matches(exchangeEncoding(ROW_WISE)); + } + + private RuleAssert assertForPrestoOnSpark() + { + return createAssert(false, true); + } + + private RuleAssert assertForNative() + { + return createAssert(true, false); + } + + private RuleAssert assertForPresto() + { + return createAssert(false, false); + } + + private RuleAssert createAssert(boolean nativeExecution, boolean prestoSparkExecutionEnvironment) + { + return tester.assertThat(new DetermineRemotePartitionedExchangeEncoding(nativeExecution, prestoSparkExecutionEnvironment)) + .setSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, MIN_COLUMNAR_STREAMS + ""); + } + + private static ExchangeNode createExchange(PartitioningHandle handle, int numberOfOutputColumnarStreams) + { + int numberOfBigintColumns = numberOfOutputColumnarStreams / 2; + List types = IntStream.range(0, numberOfBigintColumns) + .mapToObj(i -> BIGINT) + .collect(toImmutableList()); + ExchangeNode exchangeNode = createExchangeNode(handle, types, types); + assertEquals(estimateNumberOfOutputColumnarChannels(exchangeNode), numberOfBigintColumns * 2); + return exchangeNode; + } + + private static PlanMatchPattern exchangeEncoding(ExchangeEncoding encoding) + { + return node(ExchangeNode.class, node(ValuesNode.class)).with(new ExchangeEncodingMatcher(encoding)); + } + + @Test + public void testEstimateNumberOfOutputColumnarChannels() + { + assertEquals(estimateNumberOfOutputColumnarChannels(createExchangeNode(FIXED_HASH_DISTRIBUTION, ImmutableList.of(BIGINT), ImmutableList.of(BIGINT))), 2); + assertEquals(estimateNumberOfOutputColumnarChannels(createExchangeNode(FIXED_HASH_DISTRIBUTION, ImmutableList.of(BIGINT, VARCHAR), ImmutableList.of(BIGINT))), 2); + } + + private static ExchangeNode createExchangeNode(PartitioningHandle handle, List inputTypes, List outputTypes) + { + return partitionedExchange( + new PlanNodeId("exchange"), + REMOTE_STREAMING, + new ValuesNode( + Optional.empty(), + new PlanNodeId("values"), + 
createExpressions(inputTypes), + ImmutableList.of(), + Optional.empty()), + new PartitioningScheme( + Partitioning.create(handle, ImmutableList.of()), + createExpressions(outputTypes))); + } + + private static List createExpressions(List types) + { + ImmutableList.Builder result = ImmutableList.builder(); + for (int i = 0; i < types.size(); i++) { + result.add(new VariableReferenceExpression(Optional.empty(), "exp_" + i, types.get(i))); + } + return result.build(); + } + + @Test + public void testEstimateNumberOfColumnarChannels() + { + assertEquals(estimateNumberOfColumnarChannels(BIGINT), 2); + assertEquals(estimateNumberOfColumnarChannels(REAL), 2); + assertEquals(estimateNumberOfColumnarChannels(VARCHAR), 3); + assertEquals(estimateNumberOfColumnarChannels(createVarcharType(10)), 3); + assertEquals(estimateNumberOfColumnarChannels(createDecimalType(3, 2)), 2); + assertEquals(estimateNumberOfColumnarChannels(createDecimalType(30, 2)), 2); + assertEquals(estimateNumberOfColumnarChannels(new ArrayType(BIGINT)), 4); + assertEquals(estimateNumberOfColumnarChannels(new ArrayType(VARCHAR)), 5); + assertEquals(estimateNumberOfColumnarChannels(RowType.anonymous(ImmutableList.of(BIGINT, VARCHAR))), 7); + } + + private static class ExchangeEncodingMatcher + implements Matcher + { + private final ExchangeEncoding encoding; + + private ExchangeEncodingMatcher(ExchangeEncoding encoding) + { + this.encoding = requireNonNull(encoding, "encoding is null"); + } + + @Override + public boolean shapeMatches(PlanNode node) + { + return node instanceof ExchangeNode; + } + + @Override + public MatchResult detailMatches(PlanNode node, StatsProvider stats, Session session, Metadata metadata, SymbolAliases symbolAliases) + { + ExchangeNode exchangeNode = (ExchangeNode) node; + return exchangeNode.getPartitioningScheme().getEncoding() == encoding ? 
match() : NO_MATCH; + } + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java index bf06694b8006e..7568d1e170494 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/iterative/rule/test/PlanBuilder.java @@ -110,6 +110,7 @@ import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.UnknownType.UNKNOWN; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; import static com.facebook.presto.spi.plan.LimitNode.Step.FINAL; import static com.facebook.presto.sql.analyzer.ExpressionAnalyzer.getExpressionTypes; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; @@ -331,7 +332,7 @@ public RemoteSourceNode remoteSource(List sourceFragmentIds) public RemoteSourceNode remoteSource(PlanNodeId planNodeId, List sourceFragmentIds, List outputVariables) { - return new RemoteSourceNode(Optional.empty(), planNodeId, sourceFragmentIds, outputVariables, false, Optional.empty(), REPARTITION); + return new RemoteSourceNode(Optional.empty(), planNodeId, sourceFragmentIds, outputVariables, false, Optional.empty(), REPARTITION, COLUMNAR); } public RemoteSourceNode remoteSource(List sourceFragmentIds, PlanNode statsEquivalentPlanNode) @@ -343,7 +344,8 @@ public RemoteSourceNode remoteSource(List sourceFragmentIds, Pla sourceFragmentIds, ImmutableList.of(), false, Optional.empty(), - REPARTITION); + REPARTITION, + COLUMNAR); } public CallExpression binaryOperation(OperatorType operatorType, RowExpression left, RowExpression right) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp 
b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp index e74b3111b9421..94b7788496e8b 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp @@ -332,6 +332,16 @@ core::LocalPartitionNode::Type toLocalExchangeType( } } +VectorSerde::Kind toVeloxSerdeKind(protocol::ExchangeEncoding encoding) { + switch (encoding) { + case protocol::ExchangeEncoding::COLUMNAR: + return VectorSerde::Kind::kPresto; + case protocol::ExchangeEncoding::ROW_WISE: + return VectorSerde::Kind::kCompactRow; + } + VELOX_UNSUPPORTED("Unsupported encoding: {}.", fmt::underlying(encoding)); +} + std::shared_ptr buildLocalSystemPartitionNode( const std::shared_ptr& node, core::LocalPartitionNode::Type type, @@ -1830,7 +1840,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( planFragment.planNode = core::PartitionedOutputNode::single( partitionedOutputNodeId, outputType, - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; case protocol::SystemPartitioning::FIXED: { @@ -1842,7 +1852,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( planFragment.planNode = core::PartitionedOutputNode::single( partitionedOutputNodeId, outputType, - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; } @@ -1855,7 +1865,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( partitioningScheme.replicateNullsAndAny, std::make_shared(), outputType, - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; } @@ -1866,7 +1876,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( planFragment.planNode = core::PartitionedOutputNode::single( partitionedOutputNodeId, outputType, - velox::VectorSerde::Kind::kPresto, + 
toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; } @@ -1880,7 +1890,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( std::make_shared( inputType, keyChannels, constValues), outputType, - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; } @@ -1889,7 +1899,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( partitionedOutputNodeId, 1, outputType, - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; } @@ -1908,7 +1918,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( planFragment.planNode = core::PartitionedOutputNode::arbitrary( partitionedOutputNodeId, std::move(outputType), - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), std::move(sourceNode)); return planFragment; } @@ -1927,7 +1937,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( planFragment.planNode = core::PartitionedOutputNode::single( partitionedOutputNodeId, outputType, - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; } @@ -1943,7 +1953,7 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan( partitioningScheme.replicateNullsAndAny, std::shared_ptr(std::move(spec)), toRowType(partitioningScheme.outputLayout, typeParser_), - velox::VectorSerde::Kind::kPresto, + toVeloxSerdeKind((partitioningScheme.encoding)), sourceNode); return planFragment; } @@ -1980,10 +1990,10 @@ velox::core::PlanNodePtr VeloxInteractiveQueryPlanConverter::toVeloxQueryPlan( rowType, sortingKeys, sortingOrders, - velox::VectorSerde::Kind::kPresto); + toVeloxSerdeKind(node->encoding)); } return std::make_shared( - node->id, rowType, velox::VectorSerde::Kind::kPresto); + node->id, rowType, toVeloxSerdeKind(node->encoding)); } velox::connector::CommitStrategy 
diff --git a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp index f6ec94bd1dd1b..715780befa84a 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp +++ b/presto-native-execution/presto_cpp/main/types/tests/PlanConverterTest.cpp @@ -144,6 +144,7 @@ TEST_F(PlanConverterTest, partitionedOutput) { ASSERT_EQ(keys.size(), 2); ASSERT_EQ(keys[0]->toString(), "1 elements starting at 0 {cluster_label_v2}"); ASSERT_EQ(keys[1]->toString(), "\"expr_181\""); + ASSERT_EQ(partitionedOutput->serdeKind(), VectorSerde::Kind::kCompactRow); } // Final Agg stage plan for select regionkey, sum(1) from nation group by 1 diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/FinalAgg.json b/presto-native-execution/presto_cpp/main/types/tests/data/FinalAgg.json index 236d5956eb66f..66ac079326a62 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/FinalAgg.json +++ b/presto-native-execution/presto_cpp/main/types/tests/data/FinalAgg.json @@ -47,7 +47,8 @@ "name":"$hashvalue", "type":"bigint" }, - "replicateNullsAndAny":false + "replicateNullsAndAny":false, + "encoding":"COLUMNAR" }, "sources":[ { @@ -74,7 +75,8 @@ } ], "ensureSourceOrdering":false, - "exchangeType":"REPARTITION" + "exchangeType":"REPARTITION", + "encoding":"COLUMNAR" } ], "inputs":[ @@ -239,6 +241,7 @@ } ], "replicateNullsAndAny":false, + "encoding":"COLUMNAR", "bucketToPartition":[ 0 ] diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/OffsetLimit.json b/presto-native-execution/presto_cpp/main/types/tests/data/OffsetLimit.json index bb61d99d580b1..517bffe31002b 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/OffsetLimit.json +++ b/presto-native-execution/presto_cpp/main/types/tests/data/OffsetLimit.json @@ -51,7 +51,8 @@ "type":"bigint" } ], - "replicateNullsAndAny":false + "replicateNullsAndAny":false, + 
"encoding":"COLUMNAR" }, "sources":[ { @@ -102,7 +103,8 @@ "type":"bigint" } ], - "replicateNullsAndAny":false + "replicateNullsAndAny":false, + "encoding":"COLUMNAR" }, "sources":[ { @@ -153,7 +155,8 @@ "type":"bigint" } ], - "replicateNullsAndAny":false + "replicateNullsAndAny":false, + "encoding":"COLUMNAR" }, "sources":[ { @@ -200,7 +203,8 @@ } ] }, - "exchangeType":"GATHER" + "exchangeType":"GATHER", + "encoding":"COLUMNAR" }, "partitionBy":[ @@ -476,6 +480,7 @@ } ], "replicateNullsAndAny":false, + "encoding":"COLUMNAR", "bucketToPartition":[ 0 ] diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/Output.json b/presto-native-execution/presto_cpp/main/types/tests/data/Output.json index 897371b84f2ee..10f6452605c62 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/Output.json +++ b/presto-native-execution/presto_cpp/main/types/tests/data/Output.json @@ -22,7 +22,8 @@ } ], "ensureSourceOrdering":false, - "exchangeType":"GATHER" + "exchangeType":"GATHER", + "encoding":"COLUMNAR" }, "columnNames":[ "regionkey", @@ -89,6 +90,7 @@ } ], "replicateNullsAndAny":false, + "encoding":"COLUMNAR", "bucketToPartition":[ 0 ] diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/PartitionedOutput.json b/presto-native-execution/presto_cpp/main/types/tests/data/PartitionedOutput.json index 453926f5c9e43..19bc5f3756223 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/PartitionedOutput.json +++ b/presto-native-execution/presto_cpp/main/types/tests/data/PartitionedOutput.json @@ -463,7 +463,8 @@ 1, 2, 3 - ] + ], + "encoding":"ROW_WISE" }, "stageExecutionDescriptor":{ "stageExecutionStrategy":"UNGROUPED_EXECUTION", diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/ScanAgg.json b/presto-native-execution/presto_cpp/main/types/tests/data/ScanAgg.json index cdda8bdb383d2..20a54c15041be 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/ScanAgg.json +++ 
b/presto-native-execution/presto_cpp/main/types/tests/data/ScanAgg.json @@ -320,6 +320,7 @@ } ], "replicateNullsAndAny":false, + "encoding":"COLUMNAR", "bucketToPartition":[ 0 ] diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggBatch.json b/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggBatch.json index e1d2670f7497e..c7c257398ff3d 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggBatch.json +++ b/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggBatch.json @@ -299,6 +299,7 @@ } ], "replicateNullsAndAny":false, + "encoding":"COLUMNAR", "bucketToPartition":[ 0, 1, diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggCustomConnectorId.json b/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggCustomConnectorId.json index abdb01bcf446d..d6f4496daae44 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggCustomConnectorId.json +++ b/presto-native-execution/presto_cpp/main/types/tests/data/ScanAggCustomConnectorId.json @@ -266,6 +266,7 @@ } ], "replicateNullsAndAny":false, + "encoding":"COLUMNAR", "bucketToPartition":[ 0 ] diff --git a/presto-native-execution/presto_cpp/main/types/tests/data/ValuesPipeTest.json b/presto-native-execution/presto_cpp/main/types/tests/data/ValuesPipeTest.json index 82145395b3829..c1d4e930f42d1 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/data/ValuesPipeTest.json +++ b/presto-native-execution/presto_cpp/main/types/tests/data/ValuesPipeTest.json @@ -34,7 +34,8 @@ "type": "varchar(1)" } ], - "replicateNullsAndAny": false + "replicateNullsAndAny": false, + "encoding":"COLUMNAR" }, "sources": [ { @@ -202,6 +203,7 @@ } ], "replicateNullsAndAny": false, + "encoding":"COLUMNAR", "bucketToPartition": [ 0 ] diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp index 
3f9a3f1564beb..38acd7d33419b 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp @@ -4635,6 +4635,45 @@ void from_json(const json& j, ExchangeNodeType& e) { ->first; } } // namespace facebook::presto::protocol +namespace facebook::presto::protocol { +// Loosely copied this here from NLOHMANN_JSON_SERIALIZE_ENUM() + +// NOLINTNEXTLINE: cppcoreguidelines-avoid-c-arrays +static const std::pair<ExchangeEncoding, json> ExchangeEncoding_enum_table[] = + { // NOLINT: cert-err58-cpp + {ExchangeEncoding::COLUMNAR, "COLUMNAR"}, + {ExchangeEncoding::ROW_WISE, "ROW_WISE"}}; +void to_json(json& j, const ExchangeEncoding& e) { + static_assert( + std::is_enum<ExchangeEncoding>::value, + "ExchangeEncoding must be an enum!"); + const auto* it = std::find_if( + std::begin(ExchangeEncoding_enum_table), + std::end(ExchangeEncoding_enum_table), + [e](const std::pair<ExchangeEncoding, json>& ej_pair) -> bool { + return ej_pair.first == e; + }); + j = ((it != std::end(ExchangeEncoding_enum_table)) + ? it + : std::begin(ExchangeEncoding_enum_table)) + ->second; +} +void from_json(const json& j, ExchangeEncoding& e) { + static_assert( + std::is_enum<ExchangeEncoding>::value, + "ExchangeEncoding must be an enum!"); + const auto* it = std::find_if( + std::begin(ExchangeEncoding_enum_table), + std::end(ExchangeEncoding_enum_table), + [&j](const std::pair<ExchangeEncoding, json>& ej_pair) -> bool { + return ej_pair.second == j; + }); + e = ((it != std::end(ExchangeEncoding_enum_table)) + ? it + : std::begin(ExchangeEncoding_enum_table)) + ->first; +} +} // namespace facebook::presto::protocol /* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -4789,6 +4828,13 @@ void to_json(json& j, const PartitioningScheme& p) { "PartitioningScheme", "bool", "replicateNullsAndAny"); + to_json_key( + j, + "encoding", + p.encoding, + "PartitioningScheme", + "ExchangeEncoding", + "encoding"); to_json_key( j, "bucketToPartition", @@ -4827,6 +4873,13 @@ void from_json(const json& j, PartitioningScheme& p) { "PartitioningScheme", "bool", "replicateNullsAndAny"); + from_json_key( + j, + "encoding", + p.encoding, + "PartitioningScheme", + "ExchangeEncoding", + "encoding"); from_json_key( j, "bucketToPartition", @@ -7997,6 +8050,13 @@ void to_json(json& j, const RemoteSourceNode& p) { "RemoteSourceNode", "ExchangeNodeType", "exchangeType"); + to_json_key( + j, + "encoding", + p.encoding, + "RemoteSourceNode", + "ExchangeEncoding", + "encoding"); } void from_json(const json& j, RemoteSourceNode& p) { @@ -8037,6 +8097,13 @@ void from_json(const json& j, RemoteSourceNode& p) { "RemoteSourceNode", "ExchangeNodeType", "exchangeType"); + from_json_key( + j, + "encoding", + p.encoding, + "RemoteSourceNode", + "ExchangeEncoding", + "encoding"); } } // namespace facebook::presto::protocol namespace facebook::presto::protocol { diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h index e2fdc6063d55a..cbce83539ca17 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h @@ -1281,6 +1281,11 @@ extern void to_json(json& j, const ExchangeNodeType& e); extern void from_json(const json& j, ExchangeNodeType& e); } // namespace facebook::presto::protocol namespace facebook::presto::protocol { +enum class ExchangeEncoding { COLUMNAR, ROW_WISE }; +extern void to_json(json& j, const ExchangeEncoding& e); +extern void from_json(const json& j, ExchangeEncoding& e); +} // namespace facebook::presto::protocol 
+namespace facebook::presto::protocol { struct PartitioningHandle { std::shared_ptr connectorId = {}; std::shared_ptr transactionHandle = {}; @@ -1303,6 +1308,7 @@ struct PartitioningScheme { List outputLayout = {}; std::shared_ptr hashColumn = {}; bool replicateNullsAndAny = {}; + ExchangeEncoding encoding = {}; std::shared_ptr> bucketToPartition = {}; }; void to_json(json& j, const PartitioningScheme& p); @@ -1889,6 +1895,7 @@ struct RemoteSourceNode : public PlanNode { bool ensureSourceOrdering = {}; std::shared_ptr orderingScheme = {}; ExchangeNodeType exchangeType = {}; + ExchangeEncoding encoding = {}; RemoteSourceNode() noexcept; }; diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml index d1cb2bd690a95..27cf79acf4632 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml @@ -17,6 +17,7 @@ EnumMap: ExchangeNode: Type: ExchangeNodeType Scope: ExchangeNodeScope + Encoding: ExchangeEncoding LimitNode: Step: LimitNodeStep @@ -314,3 +315,4 @@ JavaClasses: - presto-main/src/main/java/com/facebook/presto/connector/system/SystemTransactionHandle.java - presto-spi/src/main/java/com/facebook/presto/spi/function/AggregationFunctionMetadata.java - presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/json/JsonBasedUdfFunctionMetadata.java + - presto-spi/src/main/java/com/facebook/presto/spi/plan/ExchangeEncoding.java diff --git a/presto-native-execution/presto_cpp/presto_protocol/tests/data/ExchangeNode.json b/presto-native-execution/presto_cpp/presto_protocol/tests/data/ExchangeNode.json index 0a558a3411b02..cabec3d73bf9f 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/tests/data/ExchangeNode.json +++ 
b/presto-native-execution/presto_cpp/presto_protocol/tests/data/ExchangeNode.json @@ -26,7 +26,8 @@ "type": "varchar(1)" } ], - "replicateNullsAndAny": false + "replicateNullsAndAny": false, + "encoding":"COLUMNAR" }, "sources": [], "inputs": [ diff --git a/presto-native-execution/presto_cpp/presto_protocol/tests/data/FilterNode.json b/presto-native-execution/presto_cpp/presto_protocol/tests/data/FilterNode.json index 6e66e5d4c62a5..7da48e2fc6bea 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/tests/data/FilterNode.json +++ b/presto-native-execution/presto_cpp/presto_protocol/tests/data/FilterNode.json @@ -29,7 +29,8 @@ "type": "varchar(1)" } ], - "replicateNullsAndAny": false + "replicateNullsAndAny": false, + "encoding":"COLUMNAR" }, "sources": [], "inputs": [ diff --git a/presto-native-execution/presto_cpp/presto_protocol/tests/data/OutputNode.json b/presto-native-execution/presto_cpp/presto_protocol/tests/data/OutputNode.json index ce9e7378bffd0..b2811c1227e43 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/tests/data/OutputNode.json +++ b/presto-native-execution/presto_cpp/presto_protocol/tests/data/OutputNode.json @@ -29,7 +29,8 @@ "type": "varchar(1)" } ], - "replicateNullsAndAny": false + "replicateNullsAndAny": false, + "encoding":"COLUMNAR" }, "sources": [], "inputs": [ diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeAggregations.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeAggregations.java index 3729247bc9aed..676fa449b31e0 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeAggregations.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeAggregations.java @@ -16,8 +16,10 @@ import com.facebook.presto.Session; import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.tests.AbstractTestQueryFramework; 
+import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static com.facebook.presto.SystemSessionProperties.NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING; import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_DISTINCT_AGGREGATIONS; import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitem; import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNation; @@ -42,87 +44,87 @@ protected void createTables() createRegion(queryRunner); } - @Test - public void testAggregations() + @Test(dataProvider = "exchangeEncodingProvider") + public void testAggregations(String exchangeEncoding) { - assertQuery("SELECT count(*) FROM nation"); - assertQuery("SELECT regionkey, count(*) FROM nation GROUP BY regionkey"); + assertQuery(getSession(exchangeEncoding), "SELECT count(*) FROM nation"); + assertQuery(getSession(exchangeEncoding), "SELECT regionkey, count(*) FROM nation GROUP BY regionkey"); - assertQuery("SELECT avg(discount), avg(quantity) FROM lineitem"); - assertQuery( + assertQuery(getSession(exchangeEncoding), "SELECT avg(discount), avg(quantity) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT linenumber, avg(discount), avg(quantity) FROM lineitem GROUP BY linenumber"); - assertQuery("SELECT sum(totalprice) FROM orders"); - assertQuery("SELECT orderpriority, sum(totalprice) FROM orders GROUP BY orderpriority"); + assertQuery(getSession(exchangeEncoding), "SELECT sum(totalprice) FROM orders"); + assertQuery(getSession(exchangeEncoding), "SELECT orderpriority, sum(totalprice) FROM orders GROUP BY orderpriority"); - assertQuery("SELECT custkey, min(totalprice), max(orderkey) FROM orders GROUP BY custkey"); + assertQuery(getSession(exchangeEncoding), "SELECT custkey, min(totalprice), max(orderkey) FROM orders GROUP BY custkey"); - assertQuery("SELECT bitwise_and_agg(orderkey), bitwise_and_agg(suppkey), bitwise_or_agg(partkey), 
bitwise_or_agg(linenumber) FROM lineitem"); - assertQuery("SELECT orderkey, bitwise_and_agg(orderkey), bitwise_and_agg(suppkey) FROM lineitem GROUP BY orderkey"); - assertQuery("SELECT bitwise_and_agg(custkey), bitwise_or_agg(orderkey) FROM orders"); - assertQuery("SELECT shippriority, bitwise_and_agg(orderkey), bitwise_or_agg(custkey) FROM orders GROUP BY shippriority"); + assertQuery(getSession(exchangeEncoding), "SELECT bitwise_and_agg(orderkey), bitwise_and_agg(suppkey), bitwise_or_agg(partkey), bitwise_or_agg(linenumber) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, bitwise_and_agg(orderkey), bitwise_and_agg(suppkey) FROM lineitem GROUP BY orderkey"); + assertQuery(getSession(exchangeEncoding), "SELECT bitwise_and_agg(custkey), bitwise_or_agg(orderkey) FROM orders"); + assertQuery(getSession(exchangeEncoding), "SELECT shippriority, bitwise_and_agg(orderkey), bitwise_or_agg(custkey) FROM orders GROUP BY shippriority"); - assertQuery("SELECT sum(custkey), clerk FROM orders GROUP BY clerk HAVING sum(custkey) > 10000"); + assertQuery(getSession(exchangeEncoding), "SELECT sum(custkey), clerk FROM orders GROUP BY clerk HAVING sum(custkey) > 10000"); - assertQuery("SELECT orderkey, array_sort(array_agg(linenumber)) FROM lineitem GROUP BY 1"); - assertQuery("SELECT orderkey, map_agg(linenumber, discount) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, array_sort(array_agg(linenumber)) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, map_agg(linenumber, discount) FROM lineitem GROUP BY 1"); - assertQuery("SELECT array_agg(nationkey ORDER BY name) FROM nation"); - assertQuery("SELECT orderkey, array_agg(quantity ORDER BY linenumber DESC) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT array_agg(nationkey ORDER BY name) FROM nation"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, array_agg(quantity ORDER BY 
linenumber DESC) FROM lineitem GROUP BY 1"); - assertQuery("SELECT array_sort(map_keys(map_union(quantity_by_linenumber))) FROM orders_ex"); + assertQuery(getSession(exchangeEncoding), "SELECT array_sort(map_keys(map_union(quantity_by_linenumber))) FROM orders_ex"); - assertQuery("SELECT orderkey, count_if(linenumber % 2 > 0) FROM lineitem GROUP BY 1"); - assertQuery("SELECT orderkey, bool_and(linenumber % 2 = 1) FROM lineitem GROUP BY 1"); - assertQuery("SELECT orderkey, bool_or(linenumber % 2 = 0) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, count_if(linenumber % 2 > 0) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, bool_and(linenumber % 2 = 1) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, bool_or(linenumber % 2 = 0) FROM lineitem GROUP BY 1"); - assertQuery("SELECT linenumber = 2 AND quantity > 10, sum(quantity / 7) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT linenumber = 2 AND quantity > 10, sum(quantity / 7) FROM lineitem GROUP BY 1"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, 0.25) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, orderkey, 0.25) FROM orders"); - assertQuerySucceeds("SELECT clerk, approx_percentile(totalprice, 0.25) FROM orders GROUP BY 1"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, 0.25, 0.005) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, orderkey, 0.25, 0.005) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, 0.25), approx_percentile(totalprice, 0.5) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, orderkey, 0.25), approx_percentile(totalprice, orderkey, 0.5) FROM orders"); - assertQuerySucceeds("SELECT clerk, approx_percentile(totalprice, 0.25), approx_percentile(totalprice, 0.5) FROM orders GROUP BY 1"); - assertQuerySucceeds("SELECT 
approx_percentile(totalprice, 0.25, 0.005), approx_percentile(totalprice, 0.5, 0.005) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, orderkey, 0.25, 0.005), approx_percentile(totalprice, orderkey, 0.5, 0.005) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, ARRAY[0.25, 0.5]) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, orderkey, ARRAY[0.25, 0.5]) FROM orders"); - assertQuerySucceeds("SELECT clerk, approx_percentile(totalprice, ARRAY[0.25, 0.5]) FROM orders GROUP BY 1"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, ARRAY[0.25, 0.5], 0.005) FROM orders"); - assertQuerySucceeds("SELECT approx_percentile(totalprice, orderkey, ARRAY[0.25, 0.5], 0.005) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, 0.25) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, orderkey, 0.25) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT clerk, approx_percentile(totalprice, 0.25) FROM orders GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, 0.25, 0.005) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, orderkey, 0.25, 0.005) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, 0.25), approx_percentile(totalprice, 0.5) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, orderkey, 0.25), approx_percentile(totalprice, orderkey, 0.5) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT clerk, approx_percentile(totalprice, 0.25), approx_percentile(totalprice, 0.5) FROM orders GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, 0.25, 0.005), approx_percentile(totalprice, 0.5, 
0.005) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, orderkey, 0.25, 0.005), approx_percentile(totalprice, orderkey, 0.5, 0.005) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, ARRAY[0.25, 0.5]) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, orderkey, ARRAY[0.25, 0.5]) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT clerk, approx_percentile(totalprice, ARRAY[0.25, 0.5]) FROM orders GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, ARRAY[0.25, 0.5], 0.005) FROM orders"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_percentile(totalprice, orderkey, ARRAY[0.25, 0.5], 0.005) FROM orders"); // count is not using any channel or mask. // sum1 and sum3 are using different channels, but the same mask. // sum2 and sum1 are using the same channel, but different masks. 
- assertQuery("SELECT count(1), sum(IF(linenumber = 7, partkey)), sum(IF(linenumber = 5, partkey)), sum(IF(linenumber = 7, orderkey)) FROM lineitem"); - assertQuery("SELECT count(1), sum(partkey) FILTER (where linenumber = 7), sum(partkey) FILTER (where linenumber = 5), sum(orderkey) FILTER (where linenumber = 7) FROM lineitem"); - assertQuery("SELECT shipmode, count(1), sum(IF(linenumber = 7, partkey)), sum(IF(linenumber = 5, partkey)), sum(IF(linenumber = 7, orderkey)) FROM lineitem group by 1"); - assertQuery("SELECT shipmode, count(1), sum(partkey) FILTER (where linenumber = 7), sum(partkey) FILTER (where linenumber = 5), sum(orderkey) FILTER (where linenumber = 7) FROM lineitem group by 1"); + assertQuery(getSession(exchangeEncoding), "SELECT count(1), sum(IF(linenumber = 7, partkey)), sum(IF(linenumber = 5, partkey)), sum(IF(linenumber = 7, orderkey)) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT count(1), sum(partkey) FILTER (where linenumber = 7), sum(partkey) FILTER (where linenumber = 5), sum(orderkey) FILTER (where linenumber = 7) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT shipmode, count(1), sum(IF(linenumber = 7, partkey)), sum(IF(linenumber = 5, partkey)), sum(IF(linenumber = 7, orderkey)) FROM lineitem group by 1"); + assertQuery(getSession(exchangeEncoding), "SELECT shipmode, count(1), sum(partkey) FILTER (where linenumber = 7), sum(partkey) FILTER (where linenumber = 5), sum(orderkey) FILTER (where linenumber = 7) FROM lineitem group by 1"); // distinct limit assertQueryResultCount("SELECT orderkey FROM lineitem GROUP BY 1 LIMIT 17", 17); // aggregation with no grouping keys and no aggregates - assertQuery("with a as (select sum(nationkey) from nation) select x from a, unnest(array[1, 2,3]) as t(x)"); + assertQuery(getSession(exchangeEncoding), "with a as (select sum(nationkey) from nation) select x from a, unnest(array[1, 2,3]) as t(x)"); } - @Test - public void testGroupingSets() + 
@Test(dataProvider = "exchangeEncodingProvider") + public void testGroupingSets(String exchangeEncoding) { - assertQuery("SELECT orderstatus, orderpriority, count(1), min(orderkey) FROM orders GROUP BY GROUPING SETS ((orderstatus), (orderpriority))"); - assertQuery("SELECT orderstatus, orderpriority, count(1), min(orderkey) FROM orders GROUP BY CUBE (orderstatus, orderpriority)"); - assertQuery("SELECT orderstatus, orderpriority, count(1), min(orderkey) FROM orders GROUP BY ROLLUP (orderstatus, orderpriority)"); + assertQuery(getSession(exchangeEncoding), "SELECT orderstatus, orderpriority, count(1), min(orderkey) FROM orders GROUP BY GROUPING SETS ((orderstatus), (orderpriority))"); + assertQuery(getSession(exchangeEncoding), "SELECT orderstatus, orderpriority, count(1), min(orderkey) FROM orders GROUP BY CUBE (orderstatus, orderpriority)"); + assertQuery(getSession(exchangeEncoding), "SELECT orderstatus, orderpriority, count(1), min(orderkey) FROM orders GROUP BY ROLLUP (orderstatus, orderpriority)"); // With grouping expression. - assertQuery("SELECT orderstatus, orderpriority, grouping(orderstatus), grouping(orderpriority), grouping(orderstatus, orderpriority), count(1), min(orderkey) FROM orders GROUP BY GROUPING SETS ((orderstatus), (orderpriority))"); - assertQuery("SELECT orderstatus, orderpriority, grouping(orderstatus), grouping(orderpriority), grouping(orderstatus, orderpriority), count(1), min(orderkey) FROM orders GROUP BY CUBE (orderstatus, orderpriority)"); - assertQuery("SELECT orderstatus, orderpriority, grouping(orderstatus), grouping(orderpriority), grouping(orderstatus, orderpriority), count(1), min(orderkey) FROM orders GROUP BY ROLLUP (orderstatus, orderpriority)"); + assertQuery(getSession(exchangeEncoding), "SELECT orderstatus, orderpriority, grouping(orderstatus), grouping(orderpriority), grouping(orderstatus, orderpriority), count(1), min(orderkey) FROM orders GROUP BY GROUPING SETS ((orderstatus), (orderpriority))"); + assertQuery(getSession(exchangeEncoding), "SELECT orderstatus, orderpriority, grouping(orderstatus), grouping(orderpriority), grouping(orderstatus, orderpriority), count(1), 
min(orderkey) FROM orders GROUP BY CUBE (orderstatus, orderpriority)"); + assertQuery(getSession(exchangeEncoding), "SELECT orderstatus, orderpriority, grouping(orderstatus), grouping(orderpriority), grouping(orderstatus, orderpriority), count(1), min(orderkey) FROM orders GROUP BY ROLLUP (orderstatus, orderpriority)"); // With aliased columns. - assertQuery("SELECT lna, lnb, SUM(quantity) FROM (SELECT linenumber lna, linenumber lnb, CAST(quantity AS BIGINT) quantity FROM lineitem) GROUP BY GROUPING SETS ((lna, lnb), (lna), (lnb), ())"); + assertQuery(getSession(exchangeEncoding), "SELECT lna, lnb, SUM(quantity) FROM (SELECT linenumber lna, linenumber lnb, CAST(quantity AS BIGINT) quantity FROM lineitem) GROUP BY GROUPING SETS ((lna, lnb), (lna), (lnb), ())"); } @Test @@ -135,27 +137,27 @@ public void testMixedDistinctAggregations() assertQuery(session, "SELECT max(orderstatus), COUNT(orderkey), sum(DISTINCT orderkey) FROM orders"); } - @Test - public void testEmptyGroupingSets() + @Test(dataProvider = "exchangeEncodingProvider") + public void testEmptyGroupingSets(String exchangeEncoding) { // Returns a single row with the global aggregation. - assertQuery("SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY GROUPING SETS (())"); + assertQuery(getSession(exchangeEncoding), "SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY GROUPING SETS (())"); // Returns 2 rows with global aggregation for the global grouping sets. - assertQuery("SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY GROUPING SETS ((), ())"); + assertQuery(getSession(exchangeEncoding), "SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY GROUPING SETS ((), ())"); // Returns a single row with the global aggregation. There are no rows for the orderkey group. 
- assertQuery("SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY GROUPING SETS ((orderkey), ())"); + assertQuery(getSession(exchangeEncoding), "SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY GROUPING SETS ((orderkey), ())"); // This is a shorthand for the above query. Returns a single row with the global aggregation. - assertQuery("SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY CUBE (orderkey)"); + assertQuery(getSession(exchangeEncoding), "SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY CUBE (orderkey)"); - assertQuery("SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY ROLLUP (orderkey)"); + assertQuery(getSession(exchangeEncoding), "SELECT count(orderkey) FROM orders WHERE orderkey < 0 GROUP BY ROLLUP (orderkey)"); // Returns a single row with NULL orderkey. - assertQuery("SELECT orderkey FROM orders WHERE orderkey < 0 GROUP BY CUBE (orderkey)"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey FROM orders WHERE orderkey < 0 GROUP BY CUBE (orderkey)"); - assertQuery("SELECT orderkey FROM orders WHERE orderkey < 0 GROUP BY ROLLUP (orderkey)"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey FROM orders WHERE orderkey < 0 GROUP BY ROLLUP (orderkey)"); } @Test @@ -164,33 +166,33 @@ public void testStreamingAggregation() assertQuery("SELECT name, (SELECT max(name) FROM region WHERE regionkey = nation.regionkey AND length(name) > length(nation.name)) FROM nation"); } - @Test - public void testApproxDistinct() + @Test(dataProvider = "exchangeEncodingProvider") + public void testApproxDistinct(String exchangeEncoding) { // low cardinality -> expect exact results - assertQuery("SELECT approx_distinct(linenumber) FROM lineitem"); - assertQuery("SELECT orderkey, approx_distinct(linenumber) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT approx_distinct(linenumber) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, 
approx_distinct(linenumber) FROM lineitem GROUP BY 1"); // high cardinality -> results may not be exact - assertQuerySucceeds("SELECT approx_distinct(orderkey) FROM lineitem"); - assertQuerySucceeds("SELECT linenumber, approx_distinct(orderkey) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_distinct(orderkey) FROM lineitem"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT linenumber, approx_distinct(orderkey) FROM lineitem GROUP BY 1"); // approx_set + cardinality - assertQuery("SELECT cardinality(approx_set(linenumber)) FROM lineitem"); - assertQuery("SELECT orderkey, cardinality(approx_set(linenumber)) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT cardinality(approx_set(linenumber)) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, cardinality(approx_set(linenumber)) FROM lineitem GROUP BY 1"); // Verify that Velox can read HLL binaries written by Java Presto. - assertQuery("SELECT cardinality(cast(hll as hyperloglog)) FROM orders_hll"); - assertQuery("SELECT cardinality(merge(cast(hll as hyperloglog))) FROM orders_hll"); + assertQuery(getSession(exchangeEncoding), "SELECT cardinality(cast(hll as hyperloglog)) FROM orders_hll"); + assertQuery(getSession(exchangeEncoding), "SELECT cardinality(merge(cast(hll as hyperloglog))) FROM orders_hll"); } - @Test - public void testApproxMostFrequent() + @Test(dataProvider = "exchangeEncodingProvider") + public void testApproxMostFrequent(String exchangeEncoding) { - assertQuery("SELECT approx_most_frequent(3, linenumber, 1000) FROM lineitem"); - assertQuerySucceeds("SELECT orderkey, approx_most_frequent(3, linenumber, 10) FROM lineitem GROUP BY 1"); - assertQuerySucceeds("SELECT approx_most_frequent(3, orderkey, 1000) FROM lineitem"); - assertQuerySucceeds("SELECT linenumber, approx_most_frequent(3, orderkey, 10) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT 
approx_most_frequent(3, linenumber, 1000) FROM lineitem"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT orderkey, approx_most_frequent(3, linenumber, 10) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT approx_most_frequent(3, orderkey, 1000) FROM lineitem"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT linenumber, approx_most_frequent(3, orderkey, 10) FROM lineitem GROUP BY 1"); } @Test @@ -242,23 +244,23 @@ public void testMinMax() ".*Aggregate function signature is not supported.*"); } - @Test - public void testMinMaxBy() + @Test(dataProvider = "exchangeEncodingProvider") + public void testMinMaxBy(String exchangeEncoding) { // We use filters to make queries deterministic. - assertQuery("SELECT max_by(partkey, orderkey), max_by(quantity, orderkey), max_by(tax_as_real, orderkey) FROM lineitem where shipmode='MAIL'"); - assertQuery("SELECT min_by(partkey, orderkey), min_by(quantity, orderkey), min_by(tax_as_real, orderkey) FROM lineitem where shipmode='MAIL'"); + assertQuery(getSession(exchangeEncoding), "SELECT max_by(partkey, orderkey), max_by(quantity, orderkey), max_by(tax_as_real, orderkey) FROM lineitem where shipmode='MAIL'"); + assertQuery(getSession(exchangeEncoding), "SELECT min_by(partkey, orderkey), min_by(quantity, orderkey), min_by(tax_as_real, orderkey) FROM lineitem where shipmode='MAIL'"); - assertQuery("SELECT max_by(orderkey, extendedprice), max_by(orderkey, cast(extendedprice as REAL)) FROM lineitem"); - assertQuery("SELECT min_by(orderkey, extendedprice), min_by(orderkey, cast(extendedprice as REAL)) FROM lineitem where shipmode='MAIL'"); + assertQuery(getSession(exchangeEncoding), "SELECT max_by(orderkey, extendedprice), max_by(orderkey, cast(extendedprice as REAL)) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT min_by(orderkey, extendedprice), min_by(orderkey, cast(extendedprice as REAL)) FROM lineitem where shipmode='MAIL'"); // 3 argument variant of 
max_by, min_by - assertQuery("SELECT max_by(orderkey, linenumber, 5), min_by(orderkey, linenumber, 5) FROM lineitem GROUP BY orderkey"); + assertQuery(getSession(exchangeEncoding), "SELECT max_by(orderkey, linenumber, 5), min_by(orderkey, linenumber, 5) FROM lineitem GROUP BY orderkey"); // Non-numeric arguments - assertQuery("SELECT max_by(row(orderkey, custkey), orderkey, 5), min_by(row(orderkey, custkey), orderkey, 5) FROM orders"); - assertQuery("SELECT max_by(row(orderkey, linenumber), linenumber, 5), min_by(row(orderkey, linenumber), linenumber, 5) FROM lineitem GROUP BY orderkey"); - assertQuery("SELECT orderkey, MAX_BY(v, c, 5), MIN_BY(v, c, 5) FROM " + + assertQuery(getSession(exchangeEncoding), "SELECT max_by(row(orderkey, custkey), orderkey, 5), min_by(row(orderkey, custkey), orderkey, 5) FROM orders"); + assertQuery(getSession(exchangeEncoding), "SELECT max_by(row(orderkey, linenumber), linenumber, 5), min_by(row(orderkey, linenumber), linenumber, 5) FROM lineitem GROUP BY orderkey"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, MAX_BY(v, c, 5), MIN_BY(v, c, 5) FROM " + "(SELECT orderkey, 'This is a long line ' || CAST(orderkey AS VARCHAR) AS v, 'This is also a really long line ' || CAST(linenumber AS VARCHAR) AS c FROM lineitem) " + "GROUP BY 1"); } @@ -334,16 +336,16 @@ public void testChecksum() assertQuery("SELECT checksum(a), checksum(b) FROM (VALUES (CAST('999999999999999999' AS DECIMAL(18, 0)), CAST('99999999999999999999999999999999999999' AS DECIMAL(38, 0))), (CAST('-999999999999999999' as DECIMAL(18, 0)), CAST('-99999999999999999999999999999999999999' AS DECIMAL(38, 0)))) AS t(a, b)"); } - @Test - public void testArbitrary() + @Test(dataProvider = "exchangeEncodingProvider") + public void testArbitrary(String exchangeEncoding) { // Non-deterministic queries - assertQuerySucceeds("SELECT orderkey, any_value(comment) FROM lineitem GROUP BY 1"); - assertQuerySucceeds("SELECT orderkey, arbitrary(discount) FROM lineitem GROUP BY 
1"); - assertQuerySucceeds("SELECT orderkey, any_value(linenumber) FROM lineitem GROUP BY 1"); - assertQuerySucceeds("SELECT orderkey, arbitrary(linenumber_as_smallint) FROM lineitem GROUP BY 1"); - assertQuerySucceeds("SELECT orderkey, any_value(linenumber_as_tinyint) FROM lineitem GROUP BY 1"); - assertQuerySucceeds("SELECT orderkey, arbitrary(tax_as_real) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT orderkey, any_value(comment) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT orderkey, arbitrary(discount) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT orderkey, any_value(linenumber) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT orderkey, arbitrary(linenumber_as_smallint) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT orderkey, any_value(linenumber_as_tinyint) FROM lineitem GROUP BY 1"); + assertQuerySucceeds(getSession(exchangeEncoding), "SELECT orderkey, arbitrary(tax_as_real) FROM lineitem GROUP BY 1"); } @Test @@ -352,11 +354,11 @@ public void testMultiMapAgg() assertQuery("SELECT orderkey, multimap_agg(linenumber % 3, discount) FROM lineitem GROUP BY 1"); } - @Test - public void testMarkDistinct() + @Test(dataProvider = "exchangeEncodingProvider") + public void testMarkDistinct(String exchangeEncoding) { - assertQuery("SELECT count(distinct orderkey), count(distinct linenumber) FROM lineitem"); - assertQuery("SELECT orderkey, count(distinct comment), sum(distinct linenumber) FROM lineitem GROUP BY 1"); + assertQuery(getSession(exchangeEncoding), "SELECT count(distinct orderkey), count(distinct linenumber) FROM lineitem"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, count(distinct comment), sum(distinct linenumber) FROM lineitem GROUP BY 1"); } @Test @@ -371,12 +373,12 @@ public void testDistinct() ".*Aggregations over sorted unique values 
are not supported yet"); } - @Test - public void testReduceAgg() + @Test(dataProvider = "exchangeEncodingProvider") + public void testReduceAgg(String exchangeEncoding) { - assertQuery("SELECT reduce_agg(orderkey, 0, (x, y) -> x + y, (x, y) -> x + y) FROM orders"); - assertQuery("SELECT orderkey, reduce_agg(linenumber, 0, (x, y) -> x + y, (x, y) -> x + y) FROM lineitem GROUP BY orderkey"); - assertQuery("SELECT orderkey, array_sort(reduce_agg(linenumber, array[], (s, x) -> s || x, (s, s2) -> s || s2)) FROM lineitem GROUP BY orderkey"); + assertQuery(getSession(exchangeEncoding), "SELECT reduce_agg(orderkey, 0, (x, y) -> x + y, (x, y) -> x + y) FROM orders"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, reduce_agg(linenumber, 0, (x, y) -> x + y, (x, y) -> x + y) FROM lineitem GROUP BY orderkey"); + assertQuery(getSession(exchangeEncoding), "SELECT orderkey, array_sort(reduce_agg(linenumber, array[], (s, x) -> s || x, (s, s2) -> s || s2)) FROM lineitem GROUP BY orderkey"); } @Test @@ -398,4 +400,20 @@ private void assertQueryResultCount(String sql, int expectedResultCount) { assertEquals(getQueryRunner().execute(sql).getRowCount(), expectedResultCount); } + + @DataProvider(name = "exchangeEncodingProvider") + public Object[][] exchangeEncodingProvider() + { + return new Object[][] { + {"with_columnar_exchange_encoding"}, + {"with_row_wise_exchange_encoding"}, + }; + } + + private Session getSession(String encoding) + { + return Session.builder(getSession()) + .setSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, "with_row_wise_exchange_encoding".equals(encoding) ? 
"1" : "1000") + .build(); + } } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java index 646795365133a..6c2ed80ac9fc4 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/AbstractTestNativeGeneralQueries.java @@ -19,6 +19,7 @@ import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.testing.MaterializedResult; import com.facebook.presto.testing.MaterializedRow; import com.facebook.presto.testing.QueryRunner; @@ -38,6 +39,7 @@ import static com.facebook.presto.SystemSessionProperties.INLINE_SQL_FUNCTIONS; import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE; import static com.facebook.presto.SystemSessionProperties.KEY_BASED_SAMPLING_ENABLED; +import static com.facebook.presto.SystemSessionProperties.NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.hive.HiveStorageFormat.DWRF; @@ -61,6 +63,8 @@ import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createSupplier; import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createTableToTestHiddenColumns; import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; +import static com.facebook.presto.spi.plan.ExchangeEncoding.ROW_WISE; import static 
com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; @@ -71,6 +75,7 @@ import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.project; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; +import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER; @@ -78,6 +83,7 @@ import static com.facebook.presto.transaction.TransactionBuilder.transaction; import static java.lang.String.format; import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -1851,6 +1857,48 @@ public void testUnicodeInJson() } } + @Test + public void testRowWiseExchange() + { + Session session = Session.builder(getSession()) + .setSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, "10") + .build(); + + assertQuery(session, "SELECT orderkey, count(*) FROM orders GROUP BY orderkey", plan -> { + searchFrom(plan.getRoot()) + .where(node -> node instanceof ExchangeNode + && ((ExchangeNode) node).getScope() == REMOTE_STREAMING + && ((ExchangeNode) node).getPartitioningScheme().isSingleOrBroadcastOrArbitrary() + && ((ExchangeNode) node).getPartitioningScheme().getEncoding() == COLUMNAR) + .findOnlyElement(); + searchFrom(plan.getRoot()) + .where(node -> node instanceof ExchangeNode + && ((ExchangeNode) node).getScope() == REMOTE_STREAMING + && 
!((ExchangeNode) node).getPartitioningScheme().isSingleOrBroadcastOrArbitrary() + && ((ExchangeNode) node).getPartitioningScheme().getEncoding() == COLUMNAR) + .findOnlyElement(); + }); + + String wideAggregation = "SELECT orderkey, max(orderdate), max(comment), min(comment), sum(totalprice), max(totalprice) FROM orders GROUP BY orderkey"; + assertQuery(session, wideAggregation, plan -> { + searchFrom(plan.getRoot()) + .where(node -> node instanceof ExchangeNode + && ((ExchangeNode) node).getScope() == REMOTE_STREAMING + && ((ExchangeNode) node).getPartitioningScheme().isSingleOrBroadcastOrArbitrary() + && ((ExchangeNode) node).getPartitioningScheme().getEncoding() == COLUMNAR) + .findOnlyElement(); + searchFrom(plan.getRoot()) + .where(node -> node instanceof ExchangeNode + && ((ExchangeNode) node).getScope() == REMOTE_STREAMING + && !((ExchangeNode) node).getPartitioningScheme().isSingleOrBroadcastOrArbitrary() + && ((ExchangeNode) node).getPartitioningScheme().getEncoding() == ROW_WISE) + .findOnlyElement(); + }); + + assertThat(getQueryRunner().execute(session, "EXPLAIN (TYPE DISTRIBUTED) " + wideAggregation).getOnlyValue().toString()) + .contains("Output encoding: ROW_WISE"); + } + private void assertQueryResultCount(String sql, int expectedResultCount) { assertEquals(getQueryRunner().execute(sql).getRowCount(), expectedResultCount); diff --git a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeGeneralQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeGeneralQueries.java index fba01435cd8cf..57131119a44e2 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeGeneralQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeGeneralQueries.java @@ -67,4 +67,8 @@ public void testShowSessionWithoutJavaSessionProperties() {} @Override @Ignore public void 
testSetSessionJavaWorkerSessionProperty() {} + + @Override + @Ignore + public void testRowWiseExchange() {} } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java index 2e0c402e1082f..b64e908fb4327 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkSettingsRequirements.java @@ -97,6 +97,7 @@ public static void setDefaults(FeaturesConfig config) config.setEnforceFixedDistributionForOutputOperator(true); config.setPrestoSparkAssignBucketToPartitionForPartitionedTableWriteEnabled(true); config.setTrackPartialAggregationHistory(false); + config.setPrestoSparkExecutionEnvironment(true); } public static void setDefaults(QueryManagerConfig config) diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPartitioningHandle.java b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPartitioningHandle.java index fdeba8b6e2c4f..5c4c2524734d7 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPartitioningHandle.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/connector/ConnectorPartitioningHandle.java @@ -24,4 +24,14 @@ default boolean isCoordinatorOnly() { return false; } + + default boolean isBroadcast() + { + return false; + } + + default boolean isArbitrary() + { + return false; + } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/ExchangeEncoding.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/ExchangeEncoding.java new file mode 100644 index 0000000000000..2aa2cd105787f --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/ExchangeEncoding.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file 
except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.plan; + +public enum ExchangeEncoding +{ + COLUMNAR, + ROW_WISE, +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/Partitioning.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/Partitioning.java index 52d0bb26fd535..f8b3ea457eccf 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/Partitioning.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/Partitioning.java @@ -72,6 +72,11 @@ public List getArguments() return arguments; } + public boolean isSingleOrBroadcastOrArbitrary() + { + return handle.isSingleOrBroadcastOrArbitrary(); + } + public Set getVariableReferences() { return arguments.stream() diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningHandle.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningHandle.java index e3667f5996a81..f7572ea4ac55b 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningHandle.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningHandle.java @@ -71,6 +71,21 @@ public boolean isCoordinatorOnly() return connectorHandle.isCoordinatorOnly(); } + public boolean isBroadcast() + { + return connectorHandle.isBroadcast(); + } + + public boolean isArbitrary() + { + return connectorHandle.isArbitrary(); + } + + public boolean isSingleOrBroadcastOrArbitrary() + { + return isSingleNode() || isBroadcast() || isArbitrary(); + } + @Override public boolean equals(Object o) { diff --git 
a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java index e01572fb1273f..244772fd50b8c 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java @@ -24,6 +24,8 @@ import java.util.Set; import static com.facebook.presto.common.Utils.checkArgument; +import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; +import static com.facebook.presto.spi.plan.ExchangeEncoding.ROW_WISE; import static java.lang.String.format; import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableSet; @@ -35,6 +37,7 @@ public class PartitioningScheme private final List outputLayout; private final Optional hashColumn; private final boolean replicateNullsAndAny; + private final ExchangeEncoding encoding; private final Optional bucketToPartition; public PartitioningScheme(Partitioning partitioning, List outputLayout) @@ -44,6 +47,7 @@ public PartitioningScheme(Partitioning partitioning, List outputLayout, @JsonProperty("hashColumn") Optional hashColumn, @JsonProperty("replicateNullsAndAny") boolean replicateNullsAndAny, + @JsonProperty("encoding") ExchangeEncoding encoding, @JsonProperty("bucketToPartition") Optional bucketToPartition) { this.partitioning = requireNonNull(partitioning, "partitioning is null"); @@ -79,6 +85,7 @@ public PartitioningScheme( checkArgument(!replicateNullsAndAny || columns.size() <= 1, "Must have at most one partitioning column when nullPartition is REPLICATE."); this.replicateNullsAndAny = replicateNullsAndAny; + this.encoding = requireNonNull(encoding, "encoding is null"); this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null"); } @@ -106,6 +113,12 @@ public boolean isReplicateNullsAndAny() return replicateNullsAndAny; } + @JsonProperty + public 
ExchangeEncoding getEncoding() + { + return encoding; + } + @JsonProperty public Optional getBucketToPartition() { @@ -114,7 +127,17 @@ public Optional getBucketToPartition() public PartitioningScheme withBucketToPartition(Optional bucketToPartition) { - return new PartitioningScheme(partitioning, outputLayout, hashColumn, replicateNullsAndAny, bucketToPartition); + return new PartitioningScheme(partitioning, outputLayout, hashColumn, replicateNullsAndAny, encoding, bucketToPartition); + } + + public PartitioningScheme withRowWiseEncoding() + { + return new PartitioningScheme(partitioning, outputLayout, hashColumn, replicateNullsAndAny, ROW_WISE, bucketToPartition); + } + + public boolean isSingleOrBroadcastOrArbitrary() + { + return partitioning.isSingleOrBroadcastOrArbitrary(); } @Override @@ -130,13 +153,14 @@ public boolean equals(Object o) return Objects.equals(partitioning, that.partitioning) && Objects.equals(outputLayout, that.outputLayout) && replicateNullsAndAny == that.replicateNullsAndAny && + encoding == that.encoding && Objects.equals(bucketToPartition, that.bucketToPartition); } @Override public int hashCode() { - return Objects.hash(partitioning, outputLayout, replicateNullsAndAny, bucketToPartition); + return Objects.hash(partitioning, outputLayout, replicateNullsAndAny, encoding, bucketToPartition); } @Override @@ -146,6 +170,7 @@ public String toString() ", outputLayout=" + outputLayout + ", hashChannel=" + hashColumn + ", replicateNullsAndAny=" + replicateNullsAndAny + + ", encoding=" + encoding + ", bucketToPartition=" + bucketToPartition + '}'; return sb;