Skip to content

Commit

Permalink
Rename forceSingleNode to noExchange
Browse files Browse the repository at this point in the history
forceSingleNode can be confusing since it hints fragmenting / scheduling mode
However this is mainly a plan optimizers decision to not generate distributed plan
  • Loading branch information
kewang1024 authored and arhimondr committed Dec 5, 2024
1 parent fc52b76 commit e535f7c
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public GroupedExecutionTagger.GroupedExecutionProperties visitJoin(JoinNode node
GroupedExecutionTagger.GroupedExecutionProperties right = node.getRight().accept(this, null);

if (!node.getDistributionType().isPresent() || !groupedExecutionEnabled) {
// This is possible when the optimizers is invoked with `forceSingleNode` set to true.
// This is possible when the optimizers is invoked with `noExchange` set to true.
return GroupedExecutionTagger.GroupedExecutionProperties.notCapable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,19 @@ public PlanFragmenter(Metadata metadata, NodePartitioningManager nodePartitionin
this.singleNodePlanChecker = new PlanChecker(requireNonNull(featuresConfig, "featuresConfig is null"), true, planCheckerProviderManager);
}

public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
{
VariableAllocator variableAllocator = new VariableAllocator(plan.getTypes().allVariables());
return createSubPlans(session, plan, forceSingleNode, idAllocator, variableAllocator, warningCollector);
return createSubPlans(session, plan, noExchange, idAllocator, variableAllocator, warningCollector);
}

public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, WarningCollector warningCollector)
public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange, PlanNodeIdAllocator idAllocator, VariableAllocator variableAllocator, WarningCollector warningCollector)
{
Fragmenter fragmenter = new Fragmenter(
session,
metadata,
plan.getStatsAndCosts(),
forceSingleNode ? singleNodePlanChecker : distributedPlanChecker,
noExchange ? singleNodePlanChecker : distributedPlanChecker,
warningCollector,
idAllocator,
variableAllocator,
Expand All @@ -85,13 +85,13 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod
FragmentProperties properties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
plan.getRoot().getOutputVariables()));
if (forceSingleNode || isForceSingleNodeOutput(session)) {
if (noExchange || isForceSingleNodeOutput(session)) {
properties = properties.setSingleNodeDistribution();
}
PlanNode root = SimplePlanRewriter.rewriteWith(fragmenter, plan.getRoot(), properties);

SubPlan subPlan = fragmenter.buildRootFragment(root, properties);
return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, forceSingleNode, warningCollector, subPlan.getFragment().getPartitioning());
return finalizeSubPlan(subPlan, config, metadata, nodePartitioningManager, session, noExchange, warningCollector, subPlan.getFragment().getPartitioning());
}

private static class Fragmenter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private PlanFragmenterUtils() {}
* @param metadata
* @param nodePartitioningManager
* @param session
* @param forceSingleNode
* @param noExchange
* @param warningCollector
* @return the final SubPlan for execution
*/
Expand All @@ -89,12 +89,12 @@ public static SubPlan finalizeSubPlan(
Metadata metadata,
NodePartitioningManager nodePartitioningManager,
Session session,
boolean forceSingleNode,
boolean noExchange,
WarningCollector warningCollector,
PartitioningHandle partitioningHandle)
{
subPlan = reassignPartitioningHandleIfNecessary(metadata, session, subPlan, partitioningHandle);
if (!forceSingleNode) {
if (!noExchange) {
// grouped execution is not supported for SINGLE_DISTRIBUTION
subPlan = analyzeGroupedExecution(session, subPlan, false, metadata, nodePartitioningManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void destroy()
public PlanOptimizers(
Metadata metadata,
SqlParser sqlParser,
boolean forceSingleNode,
boolean noExchange,
MBeanExporter exporter,
SplitManager splitManager,
ConnectorPlanOptimizerManager planOptimizerManager,
Expand Down Expand Up @@ -813,7 +813,7 @@ public PlanOptimizers(
costCalculator,
ImmutableSet.of(new ScaledWriterRule())));

if (!forceSingleNode) {
if (!noExchange) {
builder.add(new ReplicateSemiJoinInDelete()); // Must run before AddExchanges
builder.add(new IterativeOptimizer(
metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public PlanChecker(FeaturesConfig featuresConfig, PlanCheckerProviderManager pla
this(featuresConfig, false, planCheckerProviderManager);
}

public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanCheckerProviderManager planCheckerProviderManager)
public PlanChecker(FeaturesConfig featuresConfig, boolean noExchange, PlanCheckerProviderManager planCheckerProviderManager)
{
this.planCheckerProviderManager = requireNonNull(planCheckerProviderManager, "planCheckerProviderManager is null");
ImmutableListMultimap.Builder<Stage, Checker> builder = ImmutableListMultimap.builder();
Expand All @@ -69,7 +69,7 @@ public PlanChecker(FeaturesConfig featuresConfig, boolean forceSingleNode, PlanC
new TypeValidator(),
new VerifyOnlyOneOutputNode(),
new VerifyNoFilteredAggregations(),
new ValidateAggregationsWithDefaultValues(forceSingleNode),
new ValidateAggregationsWithDefaultValues(noExchange),
new ValidateStreamingAggregations(),
new VerifyNoIntermediateFormExpression(),
new VerifyProjectionLocality(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@
public class ValidateAggregationsWithDefaultValues
implements Checker
{
private final boolean forceSingleNode;
private final boolean noExchange;

public ValidateAggregationsWithDefaultValues(boolean forceSingleNode)
public ValidateAggregationsWithDefaultValues(boolean noExchange)
{
this.forceSingleNode = forceSingleNode;
this.noExchange = noExchange;
}

@Override
Expand Down Expand Up @@ -114,7 +114,7 @@ public Optional<SeenExchanges> visitAggregation(AggregationNode node, Void conte
// No remote repartition exchange between final and partial aggregation.
// Make sure that final aggregation operators are executed on a single node.
ActualProperties globalProperties = PropertyDerivations.derivePropertiesRecursively(node, metadata, session);
checkArgument(forceSingleNode || globalProperties.isSingleNode(),
checkArgument(noExchange || globalProperties.isSingleNode(),
"Final aggregation with default value not separated from partial aggregation by remote hash exchange");

if (!seenExchanges.localRepartitionExchange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,12 +1062,12 @@ private static SplitSchedulingStrategy getSplitSchedulingStrategy(StageExecution
return UNGROUPED_SCHEDULING;
}

public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNode)
public SubPlan createSubPlans(Session session, Plan plan, boolean noExchange)
{
return planFragmenter.createSubPlans(
session,
plan,
forceSingleNode,
noExchange,
new PlanNodeIdAllocator()
{
@Override
Expand All @@ -1090,21 +1090,21 @@ public Plan createPlan(Session session, @Language("SQL") String sql, Optimizer.P
return createPlan(session, sql, stage, true, warningCollector);
}

public Plan createPlan(Session session, @Language("SQL") String sql, Optimizer.PlanStage stage, boolean forceSingleNode, WarningCollector warningCollector)
public Plan createPlan(Session session, @Language("SQL") String sql, Optimizer.PlanStage stage, boolean noExchange, WarningCollector warningCollector)
{
AnalyzerOptions analyzerOptions = createAnalyzerOptions(session, warningCollector);
BuiltInPreparedQuery preparedQuery = new BuiltInQueryPreparer(sqlParser).prepareQuery(analyzerOptions, sql, session.getPreparedStatements(), warningCollector);
assertFormattedSql(sqlParser, createParsingOptions(session), preparedQuery.getStatement());

return createPlan(session, sql, getPlanOptimizers(forceSingleNode), stage, warningCollector);
return createPlan(session, sql, getPlanOptimizers(noExchange), stage, warningCollector);
}

public void setAdditionalOptimizer(List<PlanOptimizer> additionalOptimizer)
{
this.additionalOptimizer = additionalOptimizer;
}

public List<PlanOptimizer> getPlanOptimizers(boolean forceSingleNode)
public List<PlanOptimizer> getPlanOptimizers(boolean noExchange)
{
FeaturesConfig featuresConfig = new FeaturesConfig()
.setDistributedIndexJoinsEnabled(false)
Expand All @@ -1116,7 +1116,7 @@ public List<PlanOptimizer> getPlanOptimizers(boolean forceSingleNode)
planOptimizers.addAll(new PlanOptimizers(
metadata,
sqlParser,
forceSingleNode,
noExchange,
new MBeanExporter(new TestingMBeanServer()),
splitManager,
planOptimizerManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ protected void assertPlan(String sql, Session session, PlanMatchPattern pattern)
assertPlan(sql, session, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, pattern, queryRunner.getPlanOptimizers(true));
}

protected void assertPlan(String sql, Session session, PlanMatchPattern pattern, boolean forceSingleNode)
protected void assertPlan(String sql, Session session, PlanMatchPattern pattern, boolean noExchange)
{
assertPlan(sql, session, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, pattern, queryRunner.getPlanOptimizers(forceSingleNode));
assertPlan(sql, session, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, pattern, queryRunner.getPlanOptimizers(noExchange));
}

protected void assertPlan(String sql, Optimizer.PlanStage stage, PlanMatchPattern pattern)
Expand Down Expand Up @@ -248,29 +248,29 @@ protected void assertMinimallyOptimizedPlanDoesNotMatch(@Language("SQL") String
assertPlanDoesNotMatch(sql, queryRunner.getDefaultSession(), Optimizer.PlanStage.OPTIMIZED, pattern, optimizers);
}

protected void assertPlanWithSession(@Language("SQL") String sql, Session session, boolean forceSingleNode, PlanMatchPattern pattern)
protected void assertPlanWithSession(@Language("SQL") String sql, Session session, boolean noExchange, PlanMatchPattern pattern)
{
queryRunner.inTransaction(session, transactionSession -> {
Plan actualPlan = queryRunner.createPlan(transactionSession, sql, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, forceSingleNode, WarningCollector.NOOP);
Plan actualPlan = queryRunner.createPlan(transactionSession, sql, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, noExchange, WarningCollector.NOOP);
PlanAssert.assertPlan(transactionSession, queryRunner.getMetadata(), queryRunner.getStatsCalculator(), actualPlan, pattern);
return null;
});
}

protected void assertPlanWithSession(@Language("SQL") String sql, Session session, boolean forceSingleNode, PlanMatchPattern pattern, Consumer<Plan> planValidator)
protected void assertPlanWithSession(@Language("SQL") String sql, Session session, boolean noExchange, PlanMatchPattern pattern, Consumer<Plan> planValidator)
{
queryRunner.inTransaction(session, transactionSession -> {
Plan actualPlan = queryRunner.createPlan(transactionSession, sql, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, forceSingleNode, WarningCollector.NOOP);
Plan actualPlan = queryRunner.createPlan(transactionSession, sql, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, noExchange, WarningCollector.NOOP);
PlanAssert.assertPlan(transactionSession, queryRunner.getMetadata(), queryRunner.getStatsCalculator(), actualPlan, pattern);
planValidator.accept(actualPlan);
return null;
});
}

protected void assertPlanValidatorWithSession(@Language("SQL") String sql, Session session, boolean forceSingleNode, Consumer<Plan> planValidator)
protected void assertPlanValidatorWithSession(@Language("SQL") String sql, Session session, boolean noExchange, Consumer<Plan> planValidator)
{
queryRunner.inTransaction(session, transactionSession -> {
Plan actualPlan = queryRunner.createPlan(transactionSession, sql, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, forceSingleNode, WarningCollector.NOOP);
Plan actualPlan = queryRunner.createPlan(transactionSession, sql, Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED, noExchange, WarningCollector.NOOP);
planValidator.accept(actualPlan);
return null;
});
Expand Down Expand Up @@ -309,15 +309,15 @@ protected Plan plan(String sql, Optimizer.PlanStage stage)
return plan(sql, stage, true);
}

protected Plan plan(String sql, Optimizer.PlanStage stage, boolean forceSingleNode)
protected Plan plan(String sql, Optimizer.PlanStage stage, boolean noExchange)
{
return plan(queryRunner.getDefaultSession(), sql, stage, forceSingleNode);
return plan(queryRunner.getDefaultSession(), sql, stage, noExchange);
}

protected Plan plan(Session session, String sql, Optimizer.PlanStage stage, boolean forceSingleNode)
protected Plan plan(Session session, String sql, Optimizer.PlanStage stage, boolean noExchange)
{
try {
return queryRunner.inTransaction(session, transactionSession -> queryRunner.createPlan(transactionSession, sql, stage, forceSingleNode, WarningCollector.NOOP));
return queryRunner.inTransaction(session, transactionSession -> queryRunner.createPlan(transactionSession, sql, stage, noExchange, WarningCollector.NOOP));
}
catch (RuntimeException e) {
throw new AssertionError("Planning failed for SQL: " + sql, e);
Expand All @@ -329,27 +329,27 @@ protected Plan plan(String sql, Optimizer.PlanStage stage, Session session)
return plan(sql, stage, true, session);
}

protected Plan plan(String sql, Optimizer.PlanStage stage, boolean forceSingleNode, Session session)
protected Plan plan(String sql, Optimizer.PlanStage stage, boolean noExchange, Session session)
{
try {
return queryRunner.inTransaction(session, transactionSession -> queryRunner.createPlan(transactionSession, sql, stage, forceSingleNode, WarningCollector.NOOP));
return queryRunner.inTransaction(session, transactionSession -> queryRunner.createPlan(transactionSession, sql, stage, noExchange, WarningCollector.NOOP));
}
catch (RuntimeException e) {
throw new AssertionError("Planning failed for SQL: " + sql, e);
}
}

protected SubPlan subplan(String sql, Optimizer.PlanStage stage, boolean forceSingleNode)
protected SubPlan subplan(String sql, Optimizer.PlanStage stage, boolean noExchange)
{
return subplan(sql, stage, forceSingleNode, getQueryRunner().getDefaultSession());
return subplan(sql, stage, noExchange, getQueryRunner().getDefaultSession());
}

protected SubPlan subplan(String sql, Optimizer.PlanStage stage, boolean forceSingleNode, Session session)
protected SubPlan subplan(String sql, Optimizer.PlanStage stage, boolean noExchange, Session session)
{
try {
return queryRunner.inTransaction(session, transactionSession -> {
Plan plan = queryRunner.createPlan(transactionSession, sql, stage, forceSingleNode, WarningCollector.NOOP);
return queryRunner.createSubPlans(transactionSession, plan, forceSingleNode);
Plan plan = queryRunner.createPlan(transactionSession, sql, stage, noExchange, WarningCollector.NOOP);
return queryRunner.createSubPlans(transactionSession, plan, noExchange);
});
}
catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ public void testWithPartialAggregationBelowJoinWithoutSeparatingExchange()
validatePlan(root, true);
}

private void validatePlan(PlanNode root, boolean forceSingleNode)
private void validatePlan(PlanNode root, boolean noExchange)
{
getQueryRunner().inTransaction(session -> {
// metadata.getCatalogHandle() registers the catalog for the transaction
session.getCatalog().ifPresent(catalog -> metadata.getCatalogHandle(session, catalog));
new ValidateAggregationsWithDefaultValues(forceSingleNode).validate(root, session, metadata, WarningCollector.NOOP);
new ValidateAggregationsWithDefaultValues(noExchange).validate(root, session, metadata, WarningCollector.NOOP);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,21 +226,21 @@ public PrestoSparkAdaptiveQueryExecution(

private IterativePlanFragmenter createIterativePlanFragmenter(PlanCheckerProviderManager planCheckerProviderManager)
{
boolean forceSingleNode = false;
boolean noExchange = false;
Function<PlanFragmentId, Boolean> isFragmentFinished = this.executedFragments::contains;

// TODO Create the IterativePlanFragmenter by injection (it has to become stateless first--check PR 18811).
return new IterativePlanFragmenter(
this.planAndMore.getPlan(),
isFragmentFinished,
this.metadata,
new PlanChecker(this.featuresConfig, forceSingleNode, planCheckerProviderManager),
new PlanChecker(this.featuresConfig, noExchange, planCheckerProviderManager),
this.idAllocator,
new PrestoSparkNodePartitioningManager(this.partitioningProviderManager),
this.queryManagerConfig,
this.session,
this.warningCollector,
forceSingleNode);
noExchange);
}

@Override
Expand Down
Loading

0 comments on commit e535f7c

Please sign in to comment.