Skip to content

Commit 099bd42

Browse files
committed
[native] Derive TableScan stream type as FIXED
In native a fixed number of drivers is created for a pipeline containing TableScan
1 parent 924d309 commit 099bd42

File tree

17 files changed

+127
-55
lines changed

17 files changed

+127
-55
lines changed

presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizers.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,7 @@ public PlanOptimizers(
891891
// MergeJoinForSortedInputOptimizer can avoid the local exchange for a join operation
892892
// Should be placed after AddExchanges, but before AddLocalExchange
893893
// To replace the JoinNode to MergeJoin ahead of AddLocalExchange to avoid adding extra local exchange
894-
builder.add(new MergeJoinForSortedInputOptimizer(metadata));
894+
builder.add(new MergeJoinForSortedInputOptimizer(metadata, featuresConfig.isNativeExecutionEnabled()));
895895

896896
// Optimizers above this don't understand local exchanges, so be careful moving this.
897897
builder.add(new AddLocalExchanges(metadata, featuresConfig.isNativeExecutionEnabled()));
@@ -960,7 +960,7 @@ public PlanOptimizers(
960960
statsCalculator,
961961
costCalculator,
962962
ImmutableList.of(),
963-
ImmutableSet.of(new RuntimeReorderJoinSides(metadata))));
963+
ImmutableSet.of(new RuntimeReorderJoinSides(metadata, featuresConfig.isNativeExecutionEnabled()))));
964964
this.runtimeOptimizers = runtimeBuilder.build();
965965
}
966966

presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/JoinSwappingUtils.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
6464
Metadata metadata,
6565
Lookup lookup,
6666
Session session,
67-
PlanNodeIdAllocator idAllocator)
67+
PlanNodeIdAllocator idAllocator,
68+
boolean nativeExecution)
6869
{
6970
JoinNode swapped = joinNode.flipChildren();
7071

@@ -76,7 +77,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
7677
PlanNode resolvedSwappedLeft = lookup.resolve(newLeft);
7778
if (resolvedSwappedLeft instanceof ExchangeNode && resolvedSwappedLeft.getSources().size() == 1) {
7879
// Ensure the new probe after skipping the local exchange will satisfy the required probe side property
79-
if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, lookup, session)) {
80+
if (checkProbeSidePropertySatisfied(resolvedSwappedLeft.getSources().get(0), metadata, lookup, session, nativeExecution)) {
8081
newLeft = resolvedSwappedLeft.getSources().get(0);
8182
// The HashGenerationOptimizer will generate hashVariables and append to the output layout of the nodes following the same order. Therefore,
8283
// we use the index of the old hashVariable in the ExchangeNode output layout to retrieve the hashVariable from the new left node, and feed
@@ -100,7 +101,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
100101
.map(EquiJoinClause::getRight)
101102
.collect(toImmutableList());
102103
PlanNode newRight = swapped.getRight();
103-
if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, lookup, session)) {
104+
if (!checkBuildSidePropertySatisfied(swapped.getRight(), buildJoinVariables, metadata, lookup, session, nativeExecution)) {
104105
if (getTaskConcurrency(session) > 1) {
105106
newRight = systemPartitionedExchange(
106107
idAllocator.getNextId(),
@@ -132,7 +133,7 @@ public static Optional<JoinNode> createRuntimeSwappedJoinNode(
132133
}
133134

134135
// Check if the new probe side after removing unnecessary local exchange is valid.
135-
public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, Lookup lookup, Session session)
136+
public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata metadata, Lookup lookup, Session session, boolean nativeExecution)
136137
{
137138
StreamPreferredProperties requiredProbeProperty;
138139
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
@@ -141,7 +142,7 @@ public static boolean checkProbeSidePropertySatisfied(PlanNode node, Metadata me
141142
else {
142143
requiredProbeProperty = defaultParallelism(session);
143144
}
144-
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session);
145+
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session, nativeExecution);
145146
return requiredProbeProperty.isSatisfiedBy(nodeProperty);
146147
}
147148

@@ -151,7 +152,8 @@ private static boolean checkBuildSidePropertySatisfied(
151152
List<VariableReferenceExpression> partitioningColumns,
152153
Metadata metadata,
153154
Lookup lookup,
154-
Session session)
155+
Session session,
156+
boolean nativeExecution)
155157
{
156158
StreamPreferredProperties requiredBuildProperty;
157159
if (getTaskConcurrency(session) > 1) {
@@ -160,21 +162,22 @@ private static boolean checkBuildSidePropertySatisfied(
160162
else {
161163
requiredBuildProperty = singleStream();
162164
}
163-
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session);
165+
StreamPropertyDerivations.StreamProperties nodeProperty = derivePropertiesRecursively(node, metadata, lookup, session, nativeExecution);
164166
return requiredBuildProperty.isSatisfiedBy(nodeProperty);
165167
}
166168

167169
private static StreamPropertyDerivations.StreamProperties derivePropertiesRecursively(
168170
PlanNode node,
169171
Metadata metadata,
170172
Lookup lookup,
171-
Session session)
173+
Session session,
174+
boolean nativeExecution)
172175
{
173176
PlanNode actual = lookup.resolve(node);
174177
List<StreamPropertyDerivations.StreamProperties> inputProperties = actual.getSources().stream()
175-
.map(source -> derivePropertiesRecursively(source, metadata, lookup, session))
178+
.map(source -> derivePropertiesRecursively(source, metadata, lookup, session, nativeExecution))
176179
.collect(toImmutableList());
177-
return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session);
180+
return StreamPropertyDerivations.deriveProperties(actual, inputProperties, metadata, session, nativeExecution);
178181
}
179182

180183
public static boolean isBelowBroadcastLimit(PlanNode planNode, Rule.Context context)

presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/RuntimeReorderJoinSides.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ public class RuntimeReorderJoinSides
4444
private static final Pattern<JoinNode> PATTERN = join();
4545

4646
private final Metadata metadata;
47+
private final boolean nativeExecution;
4748

48-
public RuntimeReorderJoinSides(Metadata metadata)
49+
public RuntimeReorderJoinSides(Metadata metadata, boolean nativeExecution)
4950
{
5051
this.metadata = requireNonNull(metadata, "metadata is null");
52+
this.nativeExecution = nativeExecution;
5153
}
5254

5355
@Override
@@ -97,7 +99,7 @@ public Result apply(JoinNode joinNode, Captures captures, Context context)
9799
return Result.empty();
98100
}
99101

100-
Optional<JoinNode> rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, context.getLookup(), context.getSession(), context.getIdAllocator());
102+
Optional<JoinNode> rewrittenNode = createRuntimeSwappedJoinNode(joinNode, metadata, context.getLookup(), context.getSession(), context.getIdAllocator(), nativeExecution);
101103
if (rewrittenNode.isPresent()) {
102104
log.debug(format("Probe size: %.2f is smaller than Build size: %.2f => invoke runtime join swapping on JoinNode ID: %s.", leftOutputSizeInBytes, rightOutputSizeInBytes, joinNode.getId()));
103105
return Result.ofPlanNode(rewrittenNode.get());

presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,7 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, StreamPreferredProp
887887
parentPreferences.constrainTo(node.getProbeSource().getOutputVariables()).withDefaultParallelism(session));
888888

889889
// index source does not support local parallel and must produce a single stream
890-
StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session);
890+
StreamProperties indexStreamProperties = derivePropertiesRecursively(node.getIndexSource(), metadata, session, nativeExecution);
891891
checkArgument(indexStreamProperties.getDistribution() == SINGLE, "index source must be single stream");
892892
PlanWithProperties index = new PlanWithProperties(node.getIndexSource(), indexStreamProperties);
893893

@@ -983,12 +983,12 @@ private PlanWithProperties rebaseAndDeriveProperties(PlanNode node, List<PlanWit
983983

984984
private PlanWithProperties deriveProperties(PlanNode result, StreamProperties inputProperties)
985985
{
986-
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session));
986+
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, nativeExecution));
987987
}
988988

989989
private PlanWithProperties deriveProperties(PlanNode result, List<StreamProperties> inputProperties)
990990
{
991-
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session));
991+
return new PlanWithProperties(result, StreamPropertyDerivations.deriveProperties(result, inputProperties, metadata, session, nativeExecution));
992992
}
993993

994994
private PlanWithProperties accept(PlanNode node, StreamPreferredProperties context)

presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergeJoinForSortedInputOptimizer.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ public class MergeJoinForSortedInputOptimizer
4040
implements PlanOptimizer
4141
{
4242
private final Metadata metadata;
43+
private final boolean nativeExecution;
4344
private boolean isEnabledForTesting;
4445

45-
public MergeJoinForSortedInputOptimizer(Metadata metadata)
46+
public MergeJoinForSortedInputOptimizer(Metadata metadata, boolean nativeExecution)
4647
{
4748
this.metadata = requireNonNull(metadata, "metadata is null");
49+
this.nativeExecution = nativeExecution;
4850
}
4951

5052
@Override
@@ -139,8 +141,8 @@ public PlanNode visitJoin(JoinNode node, RewriteContext<Void> context)
139141
private boolean meetsDataRequirement(PlanNode left, PlanNode right, JoinNode node)
140142
{
141143
// Acquire data properties for both left and right side
142-
StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session);
143-
StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session);
144+
StreamPropertyDerivations.StreamProperties leftProperties = StreamPropertyDerivations.derivePropertiesRecursively(left, metadata, session, nativeExecution);
145+
StreamPropertyDerivations.StreamProperties rightProperties = StreamPropertyDerivations.derivePropertiesRecursively(right, metadata, session, nativeExecution);
144146

145147
List<VariableReferenceExpression> leftJoinColumns = node.getCriteria().stream().map(EquiJoinClause::getLeft).collect(toImmutableList());
146148
List<VariableReferenceExpression> rightJoinColumns = node.getCriteria().stream()

presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java

+16-13
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.facebook.presto.spi.plan.WindowNode;
4242
import com.facebook.presto.spi.relation.RowExpression;
4343
import com.facebook.presto.spi.relation.VariableReferenceExpression;
44+
import com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution;
4445
import com.facebook.presto.sql.planner.plan.ApplyNode;
4546
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
4647
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
@@ -66,8 +67,6 @@
6667
import com.google.common.collect.ImmutableSet;
6768
import com.google.common.collect.Iterables;
6869

69-
import javax.annotation.concurrent.Immutable;
70-
7170
import java.util.HashMap;
7271
import java.util.HashSet;
7372
import java.util.List;
@@ -97,20 +96,20 @@ public final class StreamPropertyDerivations
9796
{
9897
private StreamPropertyDerivations() {}
9998

100-
public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session)
99+
public static StreamProperties derivePropertiesRecursively(PlanNode node, Metadata metadata, Session session, boolean nativeExecution)
101100
{
102101
List<StreamProperties> inputProperties = node.getSources().stream()
103-
.map(source -> derivePropertiesRecursively(source, metadata, session))
102+
.map(source -> derivePropertiesRecursively(source, metadata, session, nativeExecution))
104103
.collect(toImmutableList());
105-
return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session);
104+
return StreamPropertyDerivations.deriveProperties(node, inputProperties, metadata, session, nativeExecution);
106105
}
107106

108-
public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session)
107+
public static StreamProperties deriveProperties(PlanNode node, StreamProperties inputProperties, Metadata metadata, Session session, boolean nativeExecution)
109108
{
110-
return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session);
109+
return deriveProperties(node, ImmutableList.of(inputProperties), metadata, session, nativeExecution);
111110
}
112111

113-
public static StreamProperties deriveProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session)
112+
public static StreamProperties deriveProperties(PlanNode node, List<StreamProperties> inputProperties, Metadata metadata, Session session, boolean nativeExecution)
114113
{
115114
requireNonNull(node, "node is null");
116115
requireNonNull(inputProperties, "inputProperties is null");
@@ -128,7 +127,7 @@ public static StreamProperties deriveProperties(PlanNode node, List<StreamProper
128127
metadata,
129128
session);
130129

131-
StreamProperties result = node.accept(new Visitor(metadata, session), inputProperties)
130+
StreamProperties result = node.accept(new Visitor(metadata, session, nativeExecution), inputProperties)
132131
.withOtherActualProperties(otherProperties);
133132

134133
result.getPartitioningColumns().ifPresent(columns ->
@@ -148,11 +147,13 @@ private static class Visitor
148147
{
149148
private final Metadata metadata;
150149
private final Session session;
150+
private final boolean nativeExecution;
151151

152-
private Visitor(Metadata metadata, Session session)
152+
private Visitor(Metadata metadata, Session session, boolean nativeExecution)
153153
{
154154
this.metadata = metadata;
155155
this.session = session;
156+
this.nativeExecution = nativeExecution;
156157
}
157158

158159
@Override
@@ -292,13 +293,16 @@ public StreamProperties visitTableScan(TableScanNode node, List<StreamProperties
292293
Optional<Set<VariableReferenceExpression>> streamPartitionSymbols = layout.getStreamPartitioningColumns()
293294
.flatMap(columns -> getNonConstantVariables(columns, assignments, constants));
294295

296+
// Native execution creates a fixed number of drivers for TableScan pipelines
297+
StreamDistribution streamDistribution = nativeExecution ? FIXED : MULTIPLE;
298+
295299
// if we are partitioned on empty set, we must say multiple of unknown partitioning, because
296300
// the connector does not guarantee a single split in this case (since it might not understand
297301
// that the value is a constant).
298302
if (streamPartitionSymbols.isPresent() && streamPartitionSymbols.get().isEmpty()) {
299-
return new StreamProperties(MULTIPLE, Optional.empty(), false);
303+
return new StreamProperties(streamDistribution, Optional.empty(), false);
300304
}
301-
return new StreamProperties(MULTIPLE, streamPartitionSymbols, false);
305+
return new StreamProperties(streamDistribution, streamPartitionSymbols, false);
302306
}
303307

304308
private Optional<Set<VariableReferenceExpression>> getNonConstantVariables(Set<ColumnHandle> columnHandles, Map<ColumnHandle, VariableReferenceExpression> assignments, Set<ColumnHandle> globalConstants)
@@ -641,7 +645,6 @@ public StreamProperties visitRemoteSource(RemoteSourceNode node, List<StreamProp
641645
}
642646
}
643647

644-
@Immutable
645648
public static final class StreamProperties
646649
{
647650
public enum StreamDistribution

presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/PlanChecker.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean noExchange, PlanChecke
6969
new TypeValidator(),
7070
new VerifyOnlyOneOutputNode(),
7171
new VerifyNoFilteredAggregations(),
72-
new ValidateAggregationsWithDefaultValues(noExchange),
73-
new ValidateStreamingAggregations(),
72+
new ValidateAggregationsWithDefaultValues(noExchange, featuresConfig.isNativeExecutionEnabled()),
73+
new ValidateStreamingAggregations(featuresConfig.isNativeExecutionEnabled()),
7474
new VerifyNoIntermediateFormExpression(),
7575
new VerifyProjectionLocality(),
7676
new DynamicFiltersChecker(),

presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateAggregationsWithDefaultValues.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@ public class ValidateAggregationsWithDefaultValues
5252
implements Checker
5353
{
5454
private final boolean noExchange;
55+
private final boolean nativeExecution;
5556

56-
public ValidateAggregationsWithDefaultValues(boolean noExchange)
57+
public ValidateAggregationsWithDefaultValues(boolean noExchange, boolean nativeExecution)
5758
{
5859
this.noExchange = noExchange;
60+
this.nativeExecution = nativeExecution;
5961
}
6062

6163
@Override
@@ -121,7 +123,7 @@ public Optional<SeenExchanges> visitAggregation(AggregationNode node, Void conte
121123
if (!seenExchanges.localRepartitionExchange) {
122124
// No local repartition exchange between final and partial aggregation.
123125
// Make sure that final aggregation operators are executed by single thread.
124-
StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session);
126+
StreamProperties localProperties = StreamPropertyDerivations.derivePropertiesRecursively(node, metadata, session, nativeExecution);
125127
checkArgument(localProperties.isSingleStream(),
126128
"Final aggregation with default value not separated from partial aggregation by local hash exchange");
127129
}

presto-main/src/main/java/com/facebook/presto/sql/planner/sanity/ValidateStreamingAggregations.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,31 @@
4141
public class ValidateStreamingAggregations
4242
implements Checker
4343
{
44+
private final boolean nativeExecution;
45+
46+
public ValidateStreamingAggregations(boolean nativeExecution)
47+
{
48+
this.nativeExecution = nativeExecution;
49+
}
50+
4451
@Override
4552
public void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
4653
{
47-
planNode.accept(new Visitor(session, metadata), null);
54+
planNode.accept(new Visitor(session, metadata, nativeExecution), null);
4855
}
4956

5057
private static final class Visitor
5158
extends InternalPlanVisitor<Void, Void>
5259
{
5360
private final Session session;
5461
private final Metadata metadata;
62+
private final boolean nativeExecution;
5563

56-
private Visitor(Session session, Metadata metadata)
64+
private Visitor(Session session, Metadata metadata, boolean nativeExecution)
5765
{
5866
this.session = session;
5967
this.metadata = metadata;
68+
this.nativeExecution = nativeExecution;
6069
}
6170

6271
@Override
@@ -73,7 +82,7 @@ public Void visitAggregation(AggregationNode node, Void context)
7382
return null;
7483
}
7584

76-
StreamProperties properties = derivePropertiesRecursively(node.getSource(), metadata, session);
85+
StreamProperties properties = derivePropertiesRecursively(node.getSource(), metadata, session, nativeExecution);
7786

7887
List<LocalProperty<VariableReferenceExpression>> desiredProperties = ImmutableList.of(new GroupingProperty<>(node.getPreGroupedVariables()));
7988
Iterator<Optional<LocalProperty<VariableReferenceExpression>>> matchIterator = LocalProperties.match(properties.getLocalProperties(), desiredProperties).iterator();

0 commit comments

Comments
 (0)