Improve cte scheduling #22205

Merged: 2 commits merged on Apr 5, 2024

Conversation

jaystarshot
Member

@jaystarshot jaystarshot commented Mar 14, 2024

Description

Fixes #21639

Previously, each materialized CTE was scheduled one by one so that dependent CTEs were handled safely. This PR reduces latency by scheduling multiple subgraphs of dependent CTEs independently.
This PR has these major changes:

Commit 1 - link

  1. Adding dependency graph in SequenceNode
  2. Using the dependency graph to schedule subgraphs in basePlanFragmenter
  3. Updating optimizers, especially SimplifyPlanWithEmptyInput.java, to properly process this dependency graph

Commit 2 link

  1. Moving SequenceNode to presto-main since the graph dependency is not available in presto-spi

Motivation and Context

Impact

Test Plan

Tested with prod queries and confirmed from the splits page that the behavior was as expected for independent ctes

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Improved latency of materialized ctes by scheduling multiple dependent subgraphs independently.


@jaystarshot
Member Author

jaystarshot commented Mar 15, 2024

This works, but the ultimate latency improvement would be to block only the CTE consumers until their producers complete.

I did not do this because it needs a lot of additional refactoring in the scheduling layer, and it would not be that effective: the query would still be blocked until the CTE is written, wasting resources, which is not always ideal.

@jaystarshot jaystarshot changed the title Improve Independent cte scheduling Improve cte scheduling Mar 15, 2024
@jaystarshot jaystarshot force-pushed the oss-cte-scheduling branch 7 times, most recently from a491dca to 6115223 Compare April 3, 2024 05:51
@jaystarshot jaystarshot marked this pull request as ready for review April 3, 2024 05:57
@jaystarshot jaystarshot requested a review from a team as a code owner April 3, 2024 05:57
@jaystarshot jaystarshot force-pushed the oss-cte-scheduling branch 3 times, most recently from c0b5982 to edd771f Compare April 3, 2024 06:38
@rschlussel
Contributor

I'm having trouble understanding why we need to keep track of the whole CTE graph vs. just the direct dependencies of the sequence node.
I think an example of the plan for dependent CTEs would help me, e.g. what would the plan look like after logical optimization for a query like
WITH cte1 AS (SELECT * FROM table), cte2 AS (SELECT * FROM cte1 WHERE a > b) SELECT * FROM cte1 JOIN cte2 ON cte1.x = cte2.x?

(Also, for the commits: the commit that moves the sequence node out of the SPI should be first, since the other one depends on it. In the other commit, take out the uber specific references and maybe add more information to the body.)

@jaystarshot
Member Author

jaystarshot commented Apr 3, 2024

After logical planning, the sequence node stores the CTE producers in a topologically sorted list.
In your example, the graph would be cte2->cte1 (since the write of cte1 must be executed before the write of cte2).
So the plan would be:

Sequence 
   CteProducers:     1. cte2
                     2. cte1
   PrimarySource: (SELECT * FROM cte2 JOIN cte1 ON..)

We need to keep track of the whole graph because, in this example, cte1 must be executed before cte2: cte1 is a dependency of cte2.

Also consider cases like Graph (cte3->cte4, cte2->cte1, cte5). Here the three subgraphs are independent and can be scheduled independently. The same holds for (cte1, cte2, cte3), where all are independent.
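
A minimal sketch of how the independent subgraphs in the example above could be identified, assuming the dependency graph is given as plain edge pairs. It uses only java.util rather than com.google.common.graph, and `CteSubgraphs` and `independentSubgraphs` are hypothetical names for illustration, not the code in this PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Presto.
final class CteSubgraphs
{
    private CteSubgraphs() {}

    // Groups CTE names into weakly connected components: two CTEs end up in the
    // same group if a chain of dependency edges links them, ignoring direction.
    static List<Set<String>> independentSubgraphs(List<String> ctes, List<String[]> edges)
    {
        Map<String, Set<String>> neighbors = new LinkedHashMap<>();
        for (String cte : ctes) {
            neighbors.put(cte, new LinkedHashSet<>());
        }
        for (String[] edge : edges) {
            neighbors.computeIfAbsent(edge[0], k -> new LinkedHashSet<>()).add(edge[1]);
            neighbors.computeIfAbsent(edge[1], k -> new LinkedHashSet<>()).add(edge[0]);
        }

        List<Set<String>> subgraphs = new ArrayList<>();
        Set<String> visited = new LinkedHashSet<>();
        for (String start : ctes) {
            if (!visited.add(start)) {
                continue; // already assigned to an earlier subgraph
            }
            Set<String> component = new TreeSet<>();
            Deque<String> pending = new ArrayDeque<>();
            pending.push(start);
            while (!pending.isEmpty()) {
                String current = pending.pop();
                component.add(current);
                for (String next : neighbors.get(current)) {
                    if (visited.add(next)) {
                        pending.push(next);
                    }
                }
            }
            subgraphs.add(component);
        }
        return subgraphs;
    }

    public static void main(String[] args)
    {
        List<String> ctes = List.of("cte1", "cte2", "cte3", "cte4", "cte5");
        List<String[]> edges = List.of(new String[] {"cte3", "cte4"}, new String[] {"cte2", "cte1"});
        // prints [[cte1, cte2], [cte3, cte4], [cte5]]
        System.out.println(independentSubgraphs(ctes, edges));
    }
}
```

Each returned group still has to respect its internal ordering (cte1 before cte2 in the example), but the groups themselves share no edges, which is what makes it safe to schedule them independently.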

Moved SequenceNode to presto-main since a dependency on com.google.common.graph was needed, which is not available in presto-spi, and previous discussions have suggested not adding that dependency to the SPI.
@jaystarshot jaystarshot force-pushed the oss-cte-scheduling branch 2 times, most recently from 09b13cc to 1367d3c Compare April 3, 2024 17:39

github-actions bot commented Apr 3, 2024

Codenotify: Notifying subscribers in CODENOTIFY files for diff c5674d6...ca02943.

No notifications.

@rschlussel
Contributor

rschlussel commented Apr 3, 2024

But does cte2 have its own sequence node where it depends on cte1? Something like below:

Sequence
   CteProducers:
      1. Sequence (cte2)
            CteProducers:
               1. cte1
            PrimarySource: `SELECT * from cte1 where a > b`
      2. cte1
   PrimarySource: (SELECT * FROM cte2 JOIN cte1 ON..)

I'm basically trying to understand why the graph needs to be specified explicitly vs. derived from the plan structure.

@jaystarshot
Member Author

Ahh I see. No, currently there is only one sequence node per query, and it is responsible for maintaining all these dependencies. The reasoning behind this was ease of other optimizations, plus cases that are difficult to represent in the plan structure because there is a common node, e.g. graph (a->b, c->b) and graph (a->b, a->c).

// This approach creates subgraphs sequentially, enhancing control over the execution flow. However, there are optimization opportunities:
// 1. Can consider blocking only the CTEConsumer stages that are in a reading state.
// This approach sounds good on paper but may not be ideal, as it can block the entire query, leading to resource wastage since no progress can be made until the writing operations are complete.
// 2. TODO: Another improvement would be to schedule the execution of subgraphs based on their order in the overall execution plan instead of the topological sorting done here
Contributor

General question again, as I'm still trying to understand the basics: can you draw out what a fragmented plan with dependent CTEs would look like before and after this change?

Member Author

@jaystarshot jaystarshot Apr 4, 2024

Sure, let's take an example case of Graph(a, b), or in SQL:
WITH t AS (SELECT * FROM orders), t1 AS (SELECT * FROM customer) SELECT * FROM t JOIN t1 ON true

Let's say, for simplicity, that the plan is the one below.

...
   Exchange
      Sequence:
         cteProducer1
         cteProducer2
         PrimarySource:
            Exchange
               Join
                  ....

Let's say the primary source creates the subplan (subPrimary1 -> subPrimary2)
and the cteProducers create a list of subplans (subCteProducer1 and subCteProducer2).
The code is a bit trickier here, but in visitSequence we add the CTE producers to the context created by the parent exchange (this is actually where the parent exchange creates the new properties or context).

So before this change the subplans look like:

SubParent -> subPrimary1 -> subPrimary2
          -> subCteProducer1 -> subCteProducer2

This code is where we were adding subCteProducer2 as a child of subCteProducer1.

Now we add subCteProducer2 at the same level as subCteProducer1:

SubParent -> subPrimary1 -> subPrimary2
          -> subCteProducer1
          -> subCteProducer2

We can look at subCteProducer1 and subCteProducer2 as independent subgraphs; the same applies to the general case of independent subgraphs.
In both cases a similar structure is represented in the sectionedPlan.
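
To make the two layouts concrete, here is a small sketch that builds both shapes from the same producer list. `SubPlanLayout` and its `Node` type are hypothetical stand-ins with only a name and children; they are not Presto's SubPlan or fragmenter classes, and the sketch only models the shape of the tree, not how it is scheduled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the subplan tree shape, not Presto code.
final class SubPlanLayout
{
    private SubPlanLayout() {}

    // A subplan reduced to a name plus its child subplans.
    static final class Node
    {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(String name)
        {
            this.name = name;
        }

        Node child(Node child)
        {
            children.add(child);
            return this;
        }

        String render()
        {
            if (children.isEmpty()) {
                return name;
            }
            List<String> parts = new ArrayList<>();
            for (Node child : children) {
                parts.add(child.render());
            }
            return name + "(" + String.join(", ", parts) + ")";
        }
    }

    private static Node parentWithPrimary()
    {
        return new Node("SubParent").child(new Node("subPrimary1").child(new Node("subPrimary2")));
    }

    // Before: each producer is attached as a child of the previous producer.
    static Node chained(List<String> producers)
    {
        Node parent = parentWithPrimary();
        Node attachTo = parent;
        for (String producer : producers) {
            Node node = new Node(producer);
            attachTo.child(node);
            attachTo = node;
        }
        return parent;
    }

    // After: independent producers are attached at the same level under the parent.
    static Node siblings(List<String> producers)
    {
        Node parent = parentWithPrimary();
        for (String producer : producers) {
            parent.child(new Node(producer));
        }
        return parent;
    }

    public static void main(String[] args)
    {
        List<String> producers = List.of("subCteProducer1", "subCteProducer2");
        // prints SubParent(subPrimary1(subPrimary2), subCteProducer1(subCteProducer2))
        System.out.println(chained(producers).render());
        // prints SubParent(subPrimary1(subPrimary2), subCteProducer1, subCteProducer2)
        System.out.println(siblings(producers).render());
    }
}
```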

Optional<String> parentCte = peekActiveCte();
parentCte.ifPresent(s -> graph.putEdge(currentCte, s));
// (current -> parentCte) this indicates that currentCte must be processed first
Member Author

Fixed wrong comments about the dependency graph and added explanations
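
For readers following the edge direction, a minimal sketch of the pattern in the snippet under review: a stack of active CTEs, with an edge recorded from the CTE being entered to the CTE that references it. `CteDependencyTracker`, `enterCte`, `exitCte`, and `edges` are hypothetical names, and a plain map stands in for the Guava graph; this is an illustration under those assumptions, not the PR's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical tracker, not part of Presto.
final class CteDependencyTracker
{
    private final Deque<String> activeCtes = new ArrayDeque<>();
    // successors.get(x) holds every CTE that references x, i.e. edges x -> parent.
    private final Map<String, Set<String>> successors = new LinkedHashMap<>();

    // Called when planning starts expanding a reference to currentCte.
    void enterCte(String currentCte)
    {
        successors.computeIfAbsent(currentCte, k -> new LinkedHashSet<>());
        Optional<String> parentCte = Optional.ofNullable(activeCtes.peek());
        // (current -> parentCte): currentCte must be processed first
        parentCte.ifPresent(parent -> successors.get(currentCte).add(parent));
        activeCtes.push(currentCte);
    }

    // Called when the reference has been fully expanded.
    void exitCte()
    {
        activeCtes.pop();
    }

    Map<String, Set<String>> edges()
    {
        return successors;
    }

    public static void main(String[] args)
    {
        // Main query references cte1 directly, then cte2, which itself reads cte1.
        CteDependencyTracker tracker = new CteDependencyTracker();
        tracker.enterCte("cte1");
        tracker.exitCte();
        tracker.enterCte("cte2");
        tracker.enterCte("cte1");
        tracker.exitCte();
        tracker.exitCte();
        // prints {cte1=[cte2], cte2=[]}
        System.out.println(tracker.edges());
    }
}
```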

Member Author

I could reverse the graph to match the comments and then add a reverse in the topological ordering, but then I would have to update some tests (since a topological ordering is not unique). It's easier to update the comments.

@@ -342,9 +349,29 @@ public boolean isComplexCte(String cteId)
     public List<PlanNode> getTopologicalOrdering()
     {
         ImmutableList.Builder<PlanNode> topSortedCteProducerListBuilder = ImmutableList.builder();
-        Traverser.forGraph(graph).depthFirstPostOrder(graph.nodes())
+        Traverser.forGraph(cteDependencyGraph).depthFirstPostOrder(cteDependencyGraph.nodes())
                 .forEach(cteName -> topSortedCteProducerListBuilder.add(cteProducerMap.get(cteName)));
Member Author

Here we are not reversing the list, and hence the topological ordering is correct. If we reverse the graph, we should reverse the list here and in SequenceNode.
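
A rough sketch of what a depth-first post-order over such a graph produces, written with java.util only instead of Guava's Traverser. `PostOrderSketch` is a hypothetical name, the successor map is assumed to be acyclic, and the edge direction (dependency -> dependent) is taken from the `(current -> parentCte)` comment in the snippet under review.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the traversal, not Presto or Guava code.
final class PostOrderSketch
{
    private PostOrderSketch() {}

    // Emits each node only after all of its successors have been emitted.
    // Assumes the graph has no cycles.
    static List<String> depthFirstPostOrder(Map<String, List<String>> successors)
    {
        List<String> ordered = new ArrayList<>();
        Set<String> visited = new LinkedHashSet<>();
        for (String node : successors.keySet()) {
            visit(node, successors, visited, ordered);
        }
        return ordered;
    }

    private static void visit(String node, Map<String, List<String>> successors, Set<String> visited, List<String> ordered)
    {
        if (!visited.add(node)) {
            return; // already emitted through another path
        }
        for (String next : successors.getOrDefault(node, List.of())) {
            visit(next, successors, visited, ordered);
        }
        ordered.add(node);
    }

    public static void main(String[] args)
    {
        // Edge cte1 -> cte2: cte1 is read by cte2, so cte1 must be written first.
        Map<String, List<String>> successors = new LinkedHashMap<>();
        successors.put("cte1", List.of("cte2"));
        successors.put("cte2", List.of());
        // prints [cte2, cte1]
        System.out.println(depthFirstPostOrder(successors));
    }
}
```

With this edge direction the dependent CTE comes out before its dependency, which matches the producer listing earlier in the thread (1. cte2, 2. cte1). Flipping the edge direction would flip the list, which is why the graph and the list would have to be reversed together.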

rschlussel
rschlussel previously approved these changes Apr 5, 2024
@@ -531,7 +532,7 @@ public void testSequence()
                 Optional.empty(),
                 new PlanNodeId("sequence"),
                 ImmutableList.of(cteProducerNode1, cteProducerNode2),
-                joinNode);
+                joinNode, GraphBuilder.directed().build());
Contributor

put this on its own line

This improves latency of cte materialized queries by scheduling subgraphs independently
@jaystarshot jaystarshot merged commit 5e0825b into prestodb:master Apr 5, 2024
55 of 56 checks passed
@wanglinsong wanglinsong mentioned this pull request May 1, 2024
48 tasks

Successfully merging this pull request may close these issues.

Improve scheduling of independent ctes in CTE Materialization
2 participants