feat(execute): add method to parallelize aggregate transformations #4851
execute/aggregate.go
Outdated
done, err := t.mergeState(key, value)
if err != nil || !done {
	return err
}

// We are done. Retrieve the state.
var state interface{}
if merged, ok := t.d.Delete(key); ok {
	state = merged.(*aggregateParallelState).state
}
return t.computeFor(key, state)
Nit: This looks almost identical to a section in `flushKey`. Maybe factor it out into a function?
The original version of this didn't have these do the same thing. I've refactored this so `mergeState` can call `computeFor`, so the `done` parameter isn't needed anymore.
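For readers following the thread, the shape of that refactor might look roughly like the sketch below. It is not the code from the PR: the `merger` type, its fields, the use of a plain running sum as the state, and the "count how many parents have reported" rule for deciding when a key is complete are all assumptions made for illustration. Only the names `mergeState` and `computeFor` come from the discussion.

```go
package main

import "fmt"

// parallelState is a hypothetical per-key record: the merged partial
// state plus a count of how many parents have reported for the key.
type parallelState struct {
	state    float64
	finished int
}

// merger is a hypothetical stand-in for the transformation in the PR.
type merger struct {
	parents int
	pending map[string]*parallelState
	results map[string]float64
}

func newMerger(parents int) *merger {
	return &merger{
		parents: parents,
		pending: make(map[string]*parallelState),
		results: make(map[string]float64),
	}
}

// mergeState folds one parent's partial state into the state for key.
// Once every parent has reported (an assumption of this sketch), it
// removes the pending state and calls computeFor itself, so callers
// do not need a separate "done" flag.
func (m *merger) mergeState(key string, partial float64) error {
	s, ok := m.pending[key]
	if !ok {
		s = &parallelState{}
		m.pending[key] = s
	}
	s.state += partial
	s.finished++
	if s.finished < m.parents {
		return nil // still waiting on other parents
	}
	delete(m.pending, key)
	return m.computeFor(key, s.state)
}

// computeFor records the final result for a key.
func (m *merger) computeFor(key string, state float64) error {
	if _, dup := m.results[key]; dup {
		return fmt.Errorf("key %q computed twice", key)
	}
	m.results[key] = state
	return nil
}

func main() {
	m := newMerger(2)
	_ = m.mergeState("a", 1) // first parent: nothing computed yet
	_ = m.mergeState("a", 2) // second parent: triggers computeFor
	fmt.Println(m.results["a"])
}
```

With the completion check inside `mergeState`, the caller only has to propagate the returned error.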
tr, _, err := execute.NewAggregateParallelTransformation(
	executetest.RandomDatasetID(),
	parents,
	&mock.AggregateParallelTransformation{
Nit: This `mock.AggregateParallelTransformation{` seems to be the same in all(?) the added tests. Refactor it into something reusable?
parents  map[DatasetID]*RandomAccessGroupLookup
finished int
err      error
mu       sync.RWMutex
Would it be possible to make a separate `parallelAggregateTransformation`? The mixing of parallel and sequential concerns in the methods here makes the code more difficult to read than necessary (IMO).
Done. I think it's a bit cleaner now.
Aggregate transformations can now be given multiple parents and will work in parallel. To define an aggregate transformation as parallelizable, a `Merge()` method must be implemented on the transformation. Parallel execution only happens when a planner rule modifies the plan so that a node which normally has one parent has multiple parents instead. This allows aggregate transformations that implement this interface to act as merge nodes.
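As a rough illustration of the idea, the sketch below shows an aggregate that exposes a merge step so partial states from several parents can be combined. The `ParallelAggregate` interface, its method signatures, `sumAggregate`, and `RunParallel` are hypothetical names invented for this example; they are not the interface this PR adds to the `execute` package. Only the notion of a `Merge()` method comes from the description above.

```go
package main

import (
	"fmt"
	"sync"
)

// ParallelAggregate is a hypothetical interface for this sketch only.
type ParallelAggregate interface {
	// Aggregate folds a chunk of values into a partial state.
	Aggregate(values []float64, state interface{}) interface{}
	// Merge combines two partial states from different parents.
	Merge(into, from interface{}) interface{}
	// Compute turns the fully merged state into the final result.
	Compute(state interface{}) float64
}

// sumAggregate is a sum whose partial states are plain float64 values.
type sumAggregate struct{}

func (sumAggregate) Aggregate(values []float64, state interface{}) interface{} {
	sum, _ := state.(float64) // a nil state starts at zero
	for _, v := range values {
		sum += v
	}
	return sum
}

func (sumAggregate) Merge(into, from interface{}) interface{} {
	a, _ := into.(float64)
	b, _ := from.(float64)
	return a + b
}

func (sumAggregate) Compute(state interface{}) float64 {
	sum, _ := state.(float64)
	return sum
}

// RunParallel aggregates each parent's data in its own goroutine, then
// merges the partial states and computes the final value.
func RunParallel(agg ParallelAggregate, parents [][]float64) float64 {
	partials := make([]interface{}, len(parents))
	var wg sync.WaitGroup
	for i, data := range parents {
		wg.Add(1)
		go func(i int, data []float64) {
			defer wg.Done()
			partials[i] = agg.Aggregate(data, nil)
		}(i, data)
	}
	wg.Wait()

	var merged interface{}
	for _, p := range partials {
		merged = agg.Merge(merged, p)
	}
	return agg.Compute(merged)
}

func main() {
	parents := [][]float64{{1, 2}, {3, 4}, {5}}
	fmt.Println(RunParallel(sumAggregate{}, parents)) // prints 15
}
```

An aggregate such as sum or count merges trivially. One such as mean would need to carry both a sum and a count in its partial state so that `Merge` stays correct.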