[SPARK-22389][SQL] data source v2 partitioning reporting interface #20201
Conversation
Test build #85852 has finished for PR 20201.

This looks very exciting to me.
Force-pushed from be14e3b to ff5b650.
Test build #86174 has started for PR 20201.
Force-pushed from ff5b650 to 713140a.
Test build #86177 has started for PR 20201.

retest this please
Test build #86248 has finished for PR 20201.

retest this please

Test build #86255 has finished for PR 20201.
```scala
case e: ShuffleExchangeExec => e
}.isEmpty)
```

```scala
val groupByColAB = df.groupBy('a, 'b).agg(count("*"))
```
Try `df.groupBy('a + 'b).agg(count("*")).show()`. At least, it should not fail, even if we do not support complex `ClusteredDistribution` expressions.
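For concreteness, a minimal sketch combining the quoted test fragment with the check suggested above. It assumes a test suite where `df` reads from the partition-reporting source with integer columns `a` and `b`, and where `spark.implicits._` is in scope for the `'a` symbol syntax; the scaffolding is hypothetical, only the v2 behavior under test comes from this PR:

```scala
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// The source reports clustering on (a, b), so this aggregation should
// need no exchange in the physical plan:
val groupByColAB = df.groupBy('a, 'b).agg(count("*"))
assert(groupByColAB.queryExecution.executedPlan.collect {
  case e: ShuffleExchangeExec => e
}.isEmpty)

// A complex grouping expression is not supported by ClusteredDistribution,
// so Spark may insert a shuffle here, but the query must still succeed:
df.groupBy('a + 'b).agg(count("*")).show()
```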
```java
 */
@InterfaceStability.Evolving
public class ClusteredDistribution implements Distribution {
  public String[] clusteredColumns;
```
Need to emphasize that these columns are order-insensitive.
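To illustrate, a sketch of a `Partitioning` implementation that treats the clustered columns as order-insensitive by comparing them as sets; the class and its constructor are hypothetical, while the interfaces and method signatures match the ones quoted in this PR:

```scala
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Distribution, Partitioning}

// Hypothetical: a source that hash-clusters its output rows by `columns`.
class HashClusteredPartitioning(columns: Array[String], parts: Int)
    extends Partitioning {

  override def numPartitions(): Int = parts

  // Order-insensitive comparison: clustering on (a, b) satisfies a
  // required clustering on (b, a) just as well.
  override def satisfy(distribution: Distribution): Boolean = distribution match {
    case c: ClusteredDistribution => columns.toSet == c.clusteredColumns.toSet
    case _ => false
  }
}
```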
```scala
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning}

/**
 * An adapter from public data source partitioning to catalyst internal partitioning.
```
Partitioning
```java
 * the data ordering inside one partition (the output records of a single {@link ReadTask}).
 *
 * The instance of this interface is created and provided by Spark, then consumed by
 * {@link Partitioning#satisfy(Distribution)}. This means users don't need to implement
```
`users` -> `data source developers`
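In other words, only `Partitioning` is meant to be implemented by data source developers; `Distribution` instances come from Spark. A hypothetical illustration of that direction of the contract, where `reportedPartitioning` stands for whatever the reader returned from `outputPartitioning()` and the array constructor of `ClusteredDistribution` is assumed from the class shown above:

```scala
import org.apache.spark.sql.sources.v2.reader.ClusteredDistribution

// Spark, not the data source, builds the required Distribution and asks
// the reported Partitioning whether the shuffle can be skipped:
val required = new ClusteredDistribution(Array("a", "b"))
val needsShuffle = !reportedPartitioning.satisfy(required)
```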
```java
import org.apache.spark.annotation.InterfaceStability;

/**
 * An interface to represent output data partitioning for a data source, which is returned by
```
`output` -> `the output`
```java
/**
 * An interface to represent output data partitioning for a data source, which is returned by
 * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a
 * snapshot, once created, it should be deterministic and always report same number of partitions
```
`, once` -> `. Once`
Force-pushed from 713140a to 28987a7.
Test build #86483 has finished for PR 20201.
```java
 * An interface to represent the output data partitioning for a data source, which is returned by
 * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a
 * snapshot. Once created, it should be deterministic and always report same number of partitions
 * and same "satisfy" result for a certain distribution.
```
`same number` -> `the same number`

`and same "satisfy" result` -> `and the same "satisfy" result`
```java
public interface Partitioning {

  /**
   * Returns the number of partitions/{@link ReadTask}s the data source outputs.
```
`Returns the number of partitions (i.e., {@link ReadTask}s) that the data source outputs.`
```java
 * recommended to check every Spark new release and support new distributions if possible, to
 * avoid shuffle at Spark side for more cases.
 */
boolean satisfy(Distribution d);
```
`d` -> `distribution`
```java
}

@Override
public boolean satisfy(Distribution d) {
```
ditto
LGTM except a few minor comments.
Test build #86489 has finished for PR 20201.
Thanks! Merged to master/2.3.
## What changes were proposed in this pull request?

A new interface which allows a data source to report its partitioning and avoid a shuffle on the Spark side. The design closely mirrors the internal distribution/partitioning framework: Spark defines a `Distribution` interface and several concrete implementations, and asks the data source to report a `Partitioning`; the `Partitioning` tells Spark whether it can satisfy a given `Distribution` or not.

## How was this patch tested?

New test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20201 from cloud-fan/partition-reporting.

(cherry picked from commit 51eb750)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@cloud-fan, please ping me to review PRs for DataSourceV2. Our new table format uses it and we're preparing some changes, so I want to make sure we're heading in the same direction for this. |
Ah, sorry, I missed this, but it's not too late for post-hoc reviews; any comments are welcome!
What changes were proposed in this pull request?

A new interface which allows a data source to report its partitioning and avoid a shuffle on the Spark side. The design closely mirrors the internal distribution/partitioning framework: Spark defines a `Distribution` interface and several concrete implementations, and asks the data source to report a `Partitioning`; the `Partitioning` tells Spark whether it can satisfy a given `Distribution` or not.

How was this patch tested?

New test.
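Putting the pieces together, a sketch of a reader opting into the new interface, using the reader-side names as they existed at the time of this PR (`DataSourceV2Reader`, `ReadTask`; these may differ in later releases). The concrete class, schema, and data layout are hypothetical:

```scala
import java.util.{ArrayList, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType

// Hypothetical reader whose underlying store is clustered by column `a`.
class ClusteredReader extends DataSourceV2Reader with SupportsReportPartitioning {

  override def readSchema(): StructType =
    new StructType().add("a", "int").add("b", "int")

  // One ReadTask per physical partition of the underlying store; the
  // layout guarantees that rows with the same `a` value land in the
  // same task, which is what outputPartitioning() promises below.
  override def createReadTasks(): JList[ReadTask[Row]] = {
    val tasks = new ArrayList[ReadTask[Row]]()
    // ... add one ReadTask per partition of the underlying store ...
    tasks
  }

  override def outputPartitioning(): Partitioning = new Partitioning {
    override def numPartitions(): Int = 2

    // Data clustered by `a` also satisfies any required clustering whose
    // column set includes `a`, so Spark can skip the shuffle in those cases.
    override def satisfy(distribution: Distribution): Boolean =
      distribution match {
        case c: ClusteredDistribution => c.clusteredColumns.contains("a")
        case _ => false
      }
  }
}
```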