-
Notifications
You must be signed in to change notification settings - Fork 28.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SPARK-22389][SQL] data source v2 partitioning reporting interface
## What changes were proposed in this pull request? A new interface which allows a data source to report its partitioning and avoid a shuffle on the Spark side. The design closely resembles the internal distribution/partitioning framework: Spark defines a `Distribution` interface and several concrete implementations, and asks the data source to report a `Partitioning`; the `Partitioning` should tell Spark whether it can satisfy a given `Distribution` or not. ## How was this patch tested? New test. Author: Wenchen Fan <wenchen@databricks.com> Closes #20201 from cloud-fan/partition-reporting. (cherry picked from commit 51eb750) Signed-off-by: gatorsmile <gatorsmile@gmail.com>
- Loading branch information
1 parent
566ef93
commit 7241556
Showing
9 changed files
with
411 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ClusteredDistribution.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.sources.v2.reader; | ||
|
||
import org.apache.spark.annotation.InterfaceStability; | ||
|
||
/** | ||
* A concrete implementation of {@link Distribution}. Represents a distribution where records that | ||
* share the same values for the {@link #clusteredColumns} will be produced by the same | ||
* {@link ReadTask}. | ||
*/ | ||
@InterfaceStability.Evolving | ||
public class ClusteredDistribution implements Distribution { | ||
|
||
/** | ||
* The names of the clustered columns. Note that they are order insensitive. | ||
*/ | ||
public final String[] clusteredColumns; | ||
|
||
public ClusteredDistribution(String[] clusteredColumns) { | ||
this.clusteredColumns = clusteredColumns; | ||
} | ||
} |
39 changes: 39 additions & 0 deletions
39
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Distribution.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.sources.v2.reader; | ||
|
||
import org.apache.spark.annotation.InterfaceStability; | ||
|
||
/** | ||
* An interface to represent data distribution requirement, which specifies how the records should | ||
* be distributed among the {@link ReadTask}s that are returned by | ||
* {@link DataSourceV2Reader#createReadTasks()}. Note that this interface has nothing to do with | ||
* the data ordering inside one partition(the output records of a single {@link ReadTask}). | ||
* | ||
* The instance of this interface is created and provided by Spark, then consumed by | ||
* {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to | ||
* implement this interface, but need to catch as more concrete implementations of this interface | ||
* as possible in {@link Partitioning#satisfy(Distribution)}. | ||
* | ||
* Concrete implementations until now: | ||
* <ul> | ||
* <li>{@link ClusteredDistribution}</li> | ||
* </ul> | ||
*/ | ||
@InterfaceStability.Evolving | ||
public interface Distribution {} |
46 changes: 46 additions & 0 deletions
46
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Partitioning.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.sources.v2.reader; | ||
|
||
import org.apache.spark.annotation.InterfaceStability; | ||
|
||
/** | ||
* An interface to represent the output data partitioning for a data source, which is returned by | ||
* {@link SupportsReportPartitioning#outputPartitioning()}. Note that this should work like a | ||
* snapshot. Once created, it should be deterministic and always report the same number of | ||
* partitions and the same "satisfy" result for a certain distribution. | ||
*/ | ||
@InterfaceStability.Evolving | ||
public interface Partitioning { | ||
|
||
/** | ||
* Returns the number of partitions(i.e., {@link ReadTask}s) the data source outputs. | ||
*/ | ||
int numPartitions(); | ||
|
||
/** | ||
* Returns true if this partitioning can satisfy the given distribution, which means Spark does | ||
* not need to shuffle the output data of this data source for some certain operations. | ||
* | ||
* Note that, Spark may add new concrete implementations of {@link Distribution} in new releases. | ||
* This method should be aware of it and always return false for unrecognized distributions. It's | ||
* recommended to check every Spark new release and support new distributions if possible, to | ||
* avoid shuffle at Spark side for more cases. | ||
*/ | ||
boolean satisfy(Distribution distribution); | ||
} |
33 changes: 33 additions & 0 deletions
33
...core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.sources.v2.reader; | ||
|
||
import org.apache.spark.annotation.InterfaceStability; | ||
|
||
/** | ||
* A mix in interface for {@link DataSourceV2Reader}. Data source readers can implement this | ||
* interface to report data partitioning and try to avoid shuffle at Spark side. | ||
*/ | ||
@InterfaceStability.Evolving | ||
public interface SupportsReportPartitioning { | ||
|
||
/** | ||
* Returns the output data partitioning that this reader guarantees. | ||
*/ | ||
Partitioning outputPartitioning(); | ||
} |
56 changes: 56 additions & 0 deletions
56
...src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution.datasources.v2 | ||
|
||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression} | ||
import org.apache.spark.sql.catalyst.plans.physical | ||
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning} | ||
|
||
/**
 * Adapts a public data source [[Partitioning]] to catalyst's internal `physical.Partitioning`.
 *
 * @param partitioning the partitioning reported by the data source
 * @param colNames lookup from an attribute to the column name handed to the data source
 */
class DataSourcePartitioning(
    partitioning: Partitioning,
    colNames: AttributeMap[String]) extends physical.Partitioning {

  override val numPartitions: Int = partitioning.numPartitions()

  /**
   * Checks the default rules of `physical.Partitioning` first. If those do not hold and the
   * requirement is a clustered distribution over attributes known to `colNames`, the question is
   * delegated to the data source partitioning using the corresponding column names. Every other
   * requirement is reported as not satisfied.
   */
  override def satisfies(required: physical.Distribution): Boolean = {
    if (super.satisfies(required)) {
      true
    } else {
      required match {
        case clustered: physical.ClusteredDistribution if isCandidate(clustered.clustering) =>
          // isCandidate has verified that every expression is an Attribute present in colNames,
          // so the cast and the lookup below are expected to succeed.
          val columnNames = clustered.clustering.map { expr =>
            val attr = expr.asInstanceOf[Attribute]
            val resolved = colNames.get(attr)
            assert(
              resolved.isDefined,
              s"Attribute ${attr.name} is not found in the data source output")
            resolved.get
          }
          partitioning.satisfy(new ClusteredDistribution(columnNames.toArray))

        case _ => false
      }
    }
  }

  /**
   * Returns true only if every clustering expression is an attribute contained in `colNames`.
   */
  private def isCandidate(clustering: Seq[Expression]): Boolean = {
    clustering.forall {
      case attr: Attribute => colNames.contains(attr)
      case _ => false
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
110 changes: 110 additions & 0 deletions
110
...core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package test.org.apache.spark.sql.sources.v2; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
import org.apache.spark.sql.Row; | ||
import org.apache.spark.sql.catalyst.expressions.GenericRow; | ||
import org.apache.spark.sql.sources.v2.DataSourceV2; | ||
import org.apache.spark.sql.sources.v2.DataSourceV2Options; | ||
import org.apache.spark.sql.sources.v2.ReadSupport; | ||
import org.apache.spark.sql.sources.v2.reader.*; | ||
import org.apache.spark.sql.types.StructType; | ||
|
||
public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport { | ||
|
||
class Reader implements DataSourceV2Reader, SupportsReportPartitioning { | ||
private final StructType schema = new StructType().add("a", "int").add("b", "int"); | ||
|
||
@Override | ||
public StructType readSchema() { | ||
return schema; | ||
} | ||
|
||
@Override | ||
public List<ReadTask<Row>> createReadTasks() { | ||
return java.util.Arrays.asList( | ||
new SpecificReadTask(new int[]{1, 1, 3}, new int[]{4, 4, 6}), | ||
new SpecificReadTask(new int[]{2, 4, 4}, new int[]{6, 2, 2})); | ||
} | ||
|
||
@Override | ||
public Partitioning outputPartitioning() { | ||
return new MyPartitioning(); | ||
} | ||
} | ||
|
||
static class MyPartitioning implements Partitioning { | ||
|
||
@Override | ||
public int numPartitions() { | ||
return 2; | ||
} | ||
|
||
@Override | ||
public boolean satisfy(Distribution distribution) { | ||
if (distribution instanceof ClusteredDistribution) { | ||
String[] clusteredCols = ((ClusteredDistribution) distribution).clusteredColumns; | ||
return Arrays.asList(clusteredCols).contains("a"); | ||
} | ||
|
||
return false; | ||
} | ||
} | ||
|
||
static class SpecificReadTask implements ReadTask<Row>, DataReader<Row> { | ||
private int[] i; | ||
private int[] j; | ||
private int current = -1; | ||
|
||
SpecificReadTask(int[] i, int[] j) { | ||
assert i.length == j.length; | ||
this.i = i; | ||
this.j = j; | ||
} | ||
|
||
@Override | ||
public boolean next() throws IOException { | ||
current += 1; | ||
return current < i.length; | ||
} | ||
|
||
@Override | ||
public Row get() { | ||
return new GenericRow(new Object[] {i[current], j[current]}); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
|
||
} | ||
|
||
@Override | ||
public DataReader<Row> createDataReader() { | ||
return this; | ||
} | ||
} | ||
|
||
@Override | ||
public DataSourceV2Reader createReader(DataSourceV2Options options) { | ||
return new Reader(); | ||
} | ||
} |
Oops, something went wrong.