Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve cte scheduling #22205

Merged
merged 2 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ protected QueryRunner createQueryRunner()
Optional.empty());
}

@Test
public void testCteExecutionWhereOneCteRemovedBySimplifyEmptyInputRule()
{
String sql = "WITH t as(select orderkey, count(*) as count from (select orderkey from orders where false) group by orderkey)," +
"t1 as (SELECT * FROM orders)," +
" b AS ((SELECT orderkey FROM t) UNION (SELECT orderkey FROM t1)) " +
"SELECT * FROM b";
QueryRunner queryRunner = getQueryRunner();
compareResults(queryRunner.execute(getMaterializedSession(),
sql),
queryRunner.execute(getSession(),
sql));
}

@Test
public void testSimplePersistentCte()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.plan.ValuesNode;
Expand All @@ -38,6 +37,7 @@
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SortNode;
import com.facebook.presto.sql.planner.plan.SpatialJoinNode;
import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import com.facebook.presto.spi.plan.IntersectNode;
import com.facebook.presto.spi.plan.JoinDistributionType;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.sql.planner.iterative.GroupReference;
import com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType;
import com.facebook.presto.sql.planner.iterative.rule.ReorderJoins;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SpatialJoinNode;

import javax.annotation.concurrent.ThreadSafe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import com.facebook.presto.Session;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.iterative.Lookup;
import com.facebook.presto.sql.planner.plan.SequenceNode;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.spi.relation.RowExpression;
Expand All @@ -41,6 +40,7 @@
import com.facebook.presto.sql.planner.plan.MetadataDeleteNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
import com.facebook.presto.sql.planner.plan.TableFinishNode;
Expand Down Expand Up @@ -204,25 +204,33 @@ public PlanNode visitTableFinish(TableFinishNode node, RewriteContext<FragmentPr
@Override
public PlanNode visitSequence(SequenceNode node, RewriteContext<FragmentProperties> context)
{
// Since this is topologically sorted by the LogicalCtePlanner, need to make sure that execution order follows
// Can be optimized further to avoid non dependents from getting blocked
int cteProducerCount = node.getCteProducers().size();
checkArgument(cteProducerCount >= 1, "Sequence Node has 0 CTE producers");
PlanNode source = node.getCteProducers().get(cteProducerCount - 1);
FragmentProperties childProperties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
source.getOutputVariables()));
SubPlan lastSubPlan = buildSubPlan(source, childProperties, context);

for (int sourceIndex = cteProducerCount - 2; sourceIndex >= 0; sourceIndex--) {
source = node.getCteProducers().get(sourceIndex);
childProperties = new FragmentProperties(new PartitioningScheme(
// To ensure that the execution order is maintained, we use an independent dependency graph.
// This approach creates subgraphs sequentially, enhancing control over the execution flow. However, there are optimization opportunities:
// 1. Can consider blocking only the CTEConsumer stages that are in a reading state.
// This approach sounds good on paper may not be ideal as it can block the entire query, leading to resource wastage since no progress can be made until the writing operations are complete.
// 2. ToDo: Another improvement will be to schedule the execution of subgraphs based on their order in the overall execution plan instead of a topological sorting done here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

general question again, but still trying to understand the basics. can you draw out what a fragmented plan with dependent ctes would look like before and after this change?

Copy link
Member Author

@jaystarshot jaystarshot Apr 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, lets take an example case of Graph(a, b) or sql
WITH t as (SELECT * FROM orders), t1 as (Select * FROM customer) SELECT * FROM t JOIN t1 ON true

Lets say that the Plan would be the below for simplicity.

...
     Exchange
        Sequence:
                           cteProducer1
                           cteProducer2
       PrimarySource:
                      Exchange
                                  Join 
                                     ....

Lets say the primary source creates subplan (subPrimary1->subPrimary2)
and creates a list of subplans for the cteProducers as (subCteProducerT1 and subCteProducerT2)
The code is a bit tricker here but in the visitSequence we add the the cteproducers to the context created by the parent exchange. (here is actually where the parent exchange creates new properties or context)

so before the subplans look like

SubParent -> subPrimary1 -> subPrimary2
          -> subCteProducer1 -> subCteProducer2

code is where we were adding the subCteProducer2 as a child of subCteProducer1

Now we would add the subCteProducer2 to the same level as the subCteProducer1

SubParent ->  subPrimary1 -> subPrimary2
           -> subCteProducer1 
           -> subCteProducer2

We can look at the subCteProducer1 and subCteProducer2 as independent subgraphs for the general case of independent subgraphs
In both cases similar structure would be represented in the sectionedPlan

// but that needs change to plan section framework for it to be able to handle the same child planSection.
List<List<PlanNode>> independentCteProducerSubgraphs = node.getIndependentCteProducers();
for (List<PlanNode> cteProducerSubgraph : independentCteProducerSubgraphs) {
int cteProducerCount = cteProducerSubgraph.size();
checkArgument(cteProducerCount >= 1, "CteProducer subgraph has 0 CTE producers");
PlanNode source = cteProducerSubgraph.get(cteProducerCount - 1);
FragmentProperties childProperties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
source.getOutputVariables()));
childProperties.addChildren(ImmutableList.of(lastSubPlan));
lastSubPlan = buildSubPlan(source, childProperties, context);
SubPlan lastSubPlan = buildSubPlan(source, childProperties, context);
for (int sourceIndex = cteProducerCount - 2; sourceIndex >= 0; sourceIndex--) {
source = cteProducerSubgraph.get(sourceIndex);
childProperties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
source.getOutputVariables()));
childProperties.addChildren(ImmutableList.of(lastSubPlan));
lastSubPlan = buildSubPlan(source, childProperties, context);
}
// This makes sure that the sectionedPlans generated in com.facebook.presto.execution.scheduler.StreamingPlanSection
// are independent and thus could be scheduled concurrently
context.get().addChildren(ImmutableList.of(lastSubPlan));
}
context.get().addChildren(ImmutableList.of(lastSubPlan));
return node.getPrimarySource().accept(this, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.TableWriterNode;
import com.google.common.base.VerifyException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.UnionNode;
Expand Down Expand Up @@ -68,6 +67,7 @@
import com.facebook.presto.sql.planner.plan.LateralJoinNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SortNode;
import com.facebook.presto.sql.planner.plan.SpatialJoinNode;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.SpecialFormExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
Expand All @@ -32,6 +31,7 @@
import com.facebook.presto.sql.planner.SimplePlanVisitor;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.VariablesExtractor;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.UnionNode;
import com.facebook.presto.spi.relation.CallExpression;
Expand All @@ -45,6 +44,7 @@
import com.facebook.presto.sql.planner.plan.MergeJoinNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SpatialJoinNode;
import com.facebook.presto.sql.planner.plan.TopNRowNumberNode;
import com.facebook.presto.sql.planner.plan.UnnestNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
import com.facebook.presto.spi.plan.CteReferenceNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.sql.planner.SimplePlanVisitor;
import com.facebook.presto.sql.planner.TypeProvider;
import com.facebook.presto.sql.planner.plan.ApplyNode;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.graph.Graph;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import com.google.common.graph.Traverser;
Expand Down Expand Up @@ -135,8 +136,11 @@ public PlanNode transformPersistentCtes(Session session, PlanNode root)
return transformedCte;
}
isPlanRewritten = true;
SequenceNode sequenceNode = new SequenceNode(root.getSourceLocation(), planNodeIdAllocator.getNextId(), topologicalOrderedList,
transformedCte.getSources().get(0));
SequenceNode sequenceNode = new SequenceNode(root.getSourceLocation(),
planNodeIdAllocator.getNextId(),
topologicalOrderedList,
transformedCte.getSources().get(0),
context.createIndexedGraphFromTopologicallySortedCteProducers(topologicalOrderedList));
return root.replaceChildren(Arrays.asList(sequenceNode));
}

Expand Down Expand Up @@ -271,23 +275,25 @@ public PlanNode visitApply(ApplyNode node, RewriteContext<LogicalCteOptimizerCon
node.getCorrelation(),
node.getOriginSubqueryError(),
node.getMayParticipateInAntiJoin());
}}
}
}

public static class LogicalCteOptimizerContext
{
public Map<String, CteProducerNode> cteProducerMap;

// a -> b indicates that b needs to be processed before a
MutableGraph<String> graph;
public Stack<String> activeCteStack;
// a -> b indicates that a needs to be processed before b
private MutableGraph<String> cteDependencyGraph;

private Stack<String> activeCteStack;

public Set<String> complexCtes;
private Set<String> complexCtes;

public LogicalCteOptimizerContext()
{
cteProducerMap = new HashMap<>();
// The cte graph will never have cycles because sql won't allow it
graph = GraphBuilder.directed().build();
cteDependencyGraph = GraphBuilder.directed().build();
activeCteStack = new Stack<>();
complexCtes = new HashSet<>();
}
Expand Down Expand Up @@ -319,9 +325,10 @@ public Optional<String> peekActiveCte()

public void addDependency(String currentCte)
{
graph.addNode(currentCte);
cteDependencyGraph.addNode(currentCte);
Optional<String> parentCte = peekActiveCte();
parentCte.ifPresent(s -> graph.putEdge(currentCte, s));
// (current -> parentCte) this indicates that currentCte must be processed first
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed wrong comments about the dependency graph and added explanations

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could reverse the graph to be according to the comments and then add a reverse in topological ordering, but then i would have to update some tests (since topological ordering is not unique). Its easier to update the comments

parentCte.ifPresent(s -> cteDependencyGraph.putEdge(currentCte, s));
}

public void addComplexCte(String cteId)
Expand All @@ -342,9 +349,29 @@ public boolean isComplexCte(String cteId)
public List<PlanNode> getTopologicalOrdering()
{
ImmutableList.Builder<PlanNode> topSortedCteProducerListBuilder = ImmutableList.builder();
Traverser.forGraph(graph).depthFirstPostOrder(graph.nodes())
Traverser.forGraph(cteDependencyGraph).depthFirstPostOrder(cteDependencyGraph.nodes())
.forEach(cteName -> topSortedCteProducerListBuilder.add(cteProducerMap.get(cteName)));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we are not reversing the list and hence the topological ordering is correct, if we reverse the graph, we should reverse the list here and in SequenceNode

return topSortedCteProducerListBuilder.build();
}

public Graph<Integer> createIndexedGraphFromTopologicallySortedCteProducers(List<PlanNode> topologicalSortedCteProducerList)
{
Map<String, Integer> cteIdToProducerIndexMap = new HashMap<>();
MutableGraph<Integer> indexGraph = GraphBuilder
.directed()
.expectedNodeCount(topologicalSortedCteProducerList.size())
.build();
for (int i = 0; i < topologicalSortedCteProducerList.size(); i++) {
cteIdToProducerIndexMap.put(((CteProducerNode) topologicalSortedCteProducerList.get(i)).getCteId(), i);
indexGraph.addNode(i);
}

// Populate the new graph with edges based on the index mapping
for (String cteId : cteDependencyGraph.nodes()) {
cteDependencyGraph.successors(cteId).forEach(successor ->
indexGraph.putEdge(cteIdToProducerIndexMap.get(cteId), cteIdToProducerIndexMap.get(successor)));
}
return indexGraph;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.facebook.presto.spi.plan.OutputNode;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TopNNode;
import com.facebook.presto.spi.plan.ValuesNode;
Expand Down Expand Up @@ -63,6 +62,7 @@
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SortNode;
import com.facebook.presto.sql.planner.plan.SpatialJoinNode;
import com.facebook.presto.sql.planner.plan.StatisticsWriterNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.plan.SequenceNode;
import com.facebook.presto.spi.plan.SetOperationNode;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.TopNNode;
Expand All @@ -56,6 +55,7 @@
import com.facebook.presto.sql.planner.plan.LateralJoinNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SequenceNode;
import com.facebook.presto.sql.planner.plan.SimplePlanRewriter;
import com.facebook.presto.sql.planner.plan.SortNode;
import com.facebook.presto.sql.planner.plan.SpatialJoinNode;
Expand Down Expand Up @@ -532,7 +532,7 @@ public PlanNode visitSequence(SequenceNode node, RewriteContext<Set<VariableRefe
.map(leftSource -> context.rewrite(leftSource, leftInputs)).collect(toImmutableList());
Set<VariableReferenceExpression> rightInputs = ImmutableSet.copyOf(node.getPrimarySource().getOutputVariables());
PlanNode primarySource = context.rewrite(node.getPrimarySource(), rightInputs);
return new SequenceNode(node.getSourceLocation(), node.getId(), cteProducers, primarySource);
return new SequenceNode(node.getSourceLocation(), node.getId(), cteProducers, primarySource, node.getCteDependencyGraph());
}

@Override
Expand Down
Loading
Loading