Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Jan 22, 2018
1 parent ea4719f commit 28987a7
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
*/
@InterfaceStability.Evolving
public class ClusteredDistribution implements Distribution {
public String[] clusteredColumns;

/**
* The names of the clustered columns. Note that they are order insensitive.
*/
public final String[] clusteredColumns;

public ClusteredDistribution(String[] clusteredColumns) {
this.clusteredColumns = clusteredColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
* the data ordering inside one partition (the output records of a single {@link ReadTask}).
*
* The instance of this interface is created and provided by Spark, then consumed by
* {@link Partitioning#satisfy(Distribution)}. This means users don't need to implement
* this interface, but need to catch as more concrete implementations of this interface as
* possible in {@link Partitioning#satisfy(Distribution)}.
* {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to
* implement this interface, but need to catch as many concrete implementations of this interface
* as possible in {@link Partitioning#satisfy(Distribution)}.
*
* Concrete implementations until now:
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.apache.spark.annotation.InterfaceStability;

/**
* An interface to represent output data partitioning for a data source, which is returned by
* An interface to represent the output data partitioning for a data source, which is returned by
* {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a
* snapshot, once created, it should be deterministic and always report same number of partitions
* snapshot. Once created, it should be deterministic and always report the same number of
* partitions and the same "satisfy" result for a certain distribution.
*/
@InterfaceStability.Evolving
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning}

/**
* An adapter from public data source partitioning to catalyst internal partitioning.
* An adapter from public data source partitioning to catalyst internal `Partitioning`.
*/
class DataSourcePartitioning(
partitioning: Partitioning,
Expand All @@ -33,7 +33,7 @@ class DataSourcePartitioning(
override def satisfies(required: physical.Distribution): Boolean = {
super.satisfies(required) || {
required match {
case d: physical.ClusteredDistribution if d.clustering.forall(_.isInstanceOf[Attribute]) =>
case d: physical.ClusteredDistribution if isCandidate(d.clustering) =>
val attrs = d.clustering.map(_.asInstanceOf[Attribute])
partitioning.satisfy(
new ClusteredDistribution(attrs.map { a =>
Expand All @@ -46,4 +46,11 @@ class DataSourcePartitioning(
}
}
}

/**
 * Checks whether the given clustering expressions can be considered at all: every expression
 * must be an `Attribute`, and each such attribute must be contained in `colNames`.
 * An empty `clustering` yields true.
 */
private def isCandidate(clustering: Seq[Expression]): Boolean = {
  // A single disqualifying expression (a non-attribute, or an attribute missing from
  // `colNames`) is enough to reject the whole clustering.
  val hasDisqualifying = clustering.exists {
    case attr: Attribute => !colNames.contains(attr)
    case _ => true
  }
  !hasDisqualifying
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
assert(groupByColB.queryExecution.executedPlan.collectFirst {
case e: ShuffleExchangeExec => e
}.isDefined)

val groupByAPlusB = df.groupBy('a + 'b).agg(count("*"))
checkAnswer(groupByAPlusB, Seq(Row(5, 2), Row(6, 2), Row(8, 1), Row(9, 1)))
assert(groupByAPlusB.queryExecution.executedPlan.collectFirst {
case e: ShuffleExchangeExec => e
}.isDefined)
}
}
}
Expand Down

0 comments on commit 28987a7

Please sign in to comment.