[SPARK-29572][SQL] add v1 read fallback API in DS v2 #26231
Conversation
@@ -97,17 +97,14 @@ trait DataSourceScanExec extends LeafExecNode {

/** Physical plan node for scanning data from a relation. */
case class RowDataSourceScanExec(
    fullOutput: Seq[Attribute],
    requiredColumnsIndex: Seq[Int],
    output: Seq[Attribute],
Since I need to use `RowDataSourceScanExec` in the new read fallback code path, I simplified it a little bit to make it easier to use.
Test build #112548 has finished for PR 26231 at commit
Retest this please.
Test build #112622 has finished for PR 26231 at commit
Force-pushed 034bc07 to bb195f3.
Test build #112771 has started for PR 26231 at commit
@@ -141,7 +138,8 @@ case class RowDataSourceScanExec(
  // Don't care about `rdd` and `tableIdentifier` when canonicalizing.
  override def doCanonicalize(): SparkPlan =
    copy(
      fullOutput.map(QueryPlan.normalizeExpressions(_, fullOutput)),
      // Only the required column names matter when checking equality.
      output.map(a => a.withExprId(ExprId(-1))),
Why does this not use `normalizeExpressions`? It seems odd to use a special-case fixed ID here.
override def readSchema(): StructType = requiredSchema
override def toV1Relation(): BaseRelation = {
  new BaseRelation with TableScan {
    override def sqlContext: SQLContext = SparkSession.active.sqlContext
Can SQLContext be passed in when converting?
}

class TableWithV1ReadFallback extends Table with SupportsRead {
  override def name(): String = "v1-read-fallback"
This should return the string version of the identifier it was loaded with.
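A hypothetical sketch of what this suggestion looks like: the table keeps the `Identifier` it was loaded with and returns its string form from `name()`. The constructor parameter and the schema below are illustrative, not the PR's actual test code.

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical: the catalog passes in the identifier it used to load the table.
class TableWithV1ReadFallback(ident: Identifier) extends Table with SupportsRead {

  // Return the string version of the identifier the table was loaded with.
  override def name(): String = ident.toString

  override def schema(): StructType = new StructType().add("i", "int")

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
    ??? // would return the V1-fallback ScanBuilder discussed later in this thread
}
```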
Force-pushed bb195f3 to f5cb61f.
Test build #112924 has finished for PR 26231 at commit
Force-pushed f5cb61f to e8d718d.
@@ -27,7 +26,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy
import org.apache.spark.sql.internal.SQLConf

class SparkPlanner(
    val sparkContext: SparkContext,
    val session: SparkSession,
Wow. Finally. :)
@@ -42,7 +43,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
 */
private def pushFilters(
    scanBuilder: ScanBuilder,
    filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
    filters: Seq[Expression]): (Seq[Filter], Seq[Expression]) = {
cc @dbtsai since this is related to his ongoing nested column filter work.
Test build #112951 has finished for PR 26231 at commit
Force-pushed e8d718d to 87fc70a.
@@ -51,6 +54,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
      """.stripMargin)

    val scanRelation = DataSourceV2ScanRelation(relation.table, scan, output)
    scanRelation.setTagValue(PUSHED_FILTERS_TAG, pushedFilters)
It will be convenient if `Scan` can report pushed filters itself, but I'm not sure how to design the API to make it work. Here I just store the pushed filters in the `DataSourceV2ScanRelation`, so that I can use them later when creating the v1 physical scan node, which needs `pushedFilters` to do the equality check.
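A minimal sketch of the tag-based approach described here, assuming Spark's `TreeNodeTag` mechanism; the object and method names are illustrative, not the PR's exact code.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.sources.Filter

object PushedFiltersTagSketch {
  // Tag key used to attach the pushed filters to the scan relation.
  val PUSHED_FILTERS_TAG = TreeNodeTag[Seq[Filter]]("pushedFilters")

  // Called by the push-down rule once it knows which filters were pushed.
  def record(scanRelation: LogicalPlan, pushed: Seq[Filter]): Unit =
    scanRelation.setTagValue(PUSHED_FILTERS_TAG, pushed)

  // Called later by the planner when it builds the v1 physical scan node.
  def read(scanRelation: LogicalPlan): Seq[Filter] =
    scanRelation.getTagValue(PUSHED_FILTERS_TAG).getOrElse(Nil)
}
```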
Can the `pushedFilters` be just a parameter of `DataSourceV2ScanRelation`?
Test build #113092 has finished for PR 26231 at commit
Test build #113094 has finished for PR 26231 at commit
Test build #115512 has finished for PR 26231 at commit
case s: TableScan => s.buildScan()
case _ =>
  throw new IllegalArgumentException(
    "`V1Scan.toV1Relation` must return a `TableScan` instance.")
If it must return `TableScan`, why not rename the API to `toV1TableScan`?
Sorry, I meant rename it to `toV1TableScan` and also change the return type to `TableScan`.
`TableScan` is just a mixin; what we expect is `BaseRelation with TableScan`, but that doesn't work in Java.
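A small sketch of the typing issue under discussion: Scala can express the intersection directly, while the Java interface has to fall back to a generic bound. The trait name is illustrative.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, TableScan}

// What the API conceptually wants to return (expressible directly in Scala only):
trait V1ScanScalaSketch {
  def toV1TableScan(context: SQLContext): BaseRelation with TableScan
}

// The Java-friendly equivalent uses a generic bound instead, roughly:
//   <T extends BaseRelation & TableScan> T toV1TableScan(SQLContext context);
```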
Test build #115561 has finished for PR 26231 at commit
import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  // projection and filters were already pushed down in the optimizer.
  // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
  // not support columnar, a projection is added to convert the rows to UnsafeRow.
  case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
How about we match the V1Scan here? Thus we can simplify the code below.
Resolved review thread on sql/core/src/test/scala/org/apache/spark/sql/connector/V1ReadFallbackSuite.scala.
Force-pushed cd923ee to f786fa8.
Outdated review thread on ...core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala.
import DataSourceV2Implicits._

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
  case PhysicalOperation(project, filters,
      relation @ DataSourceV2ScanRelation(table, v1Scan: V1Scan, output)) =>
`table` is not used.
Test build #116182 has finished for PR 26231 at commit
Force-pushed eccadc7 to 51bd0d7.
Test build #116309 has finished for PR 26231 at commit
I have some questions around V1Scan. I wonder if we can have the V1Scan output be a bit different, such that pushed filters and schema pruning are pushed down into the scan even for V1 relations, and maybe produce an RDD[Row]. Let me know what you think.
/**
 * Creates an `BaseRelation` that can scan data from DataSource v1 to RDD[Row]. The returned
 * relation must be a `TableScan` instance.
Why does it need to be a `TableScan`? Can't it be a HadoopFsRelation? Can't it be a PrunedFilteredScan?
public interface V1Scan extends Scan {

  /**
   * Creates an `BaseRelation` with `TableScan` that can scan data from DataSource v1 to RDD[Row].
nit: Create a BaseRelation
   *
   * @since 3.0.0
   */
  <T extends BaseRelation & TableScan> T toV1TableScan(SQLContext context);
It kind of seems weird to me that we're introducing new APIs that use deprecated APIs.
We haven't marked `SQLContext` as deprecated yet.
 * @since 3.0.0
 */
@Unstable
public interface V1Scan extends Scan {
Can we not push filters and schema pruning down to this scan? We support these in the V1 APIs. Then you can avoid the pushed-filters tag.
The idea is the same as the v1 write fallback API, which also relies on the v2 API to configure the write. It's better to leverage the v2 infra as much as we can; e.g. we may improve the v2 pushdown to push more operators that v1 doesn't support.
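To make that split concrete, here is a hedged sketch of what such a source might look like: pushdown is negotiated through the v2 `ScanBuilder` mixins, and the v1 fallback only supplies a plain `TableScan` that reads the already-pruned, already-filtered data. The class name and the read logic are illustrative, not taken from the PR.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns, V1Scan}
import org.apache.spark.sql.sources.{BaseRelation, Filter, TableScan}
import org.apache.spark.sql.types.StructType

// Hypothetical scan builder: pushdown happens via the v2 mixins; the v1 part is
// only responsible for producing an RDD[Row] for the final scan.
class ExampleV1FallbackScanBuilder(fullSchema: StructType)
  extends ScanBuilder with SupportsPushDownRequiredColumns with SupportsPushDownFilters {

  private var requiredSchema: StructType = fullSchema
  private var pushed: Array[Filter] = Array.empty

  override def pruneColumns(requiredSchema: StructType): Unit =
    this.requiredSchema = requiredSchema

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters  // pretend the source can evaluate every filter
    Array.empty       // so Spark needs no post-scan filters
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = new V1Scan {
    override def readSchema(): StructType = requiredSchema

    override def toV1TableScan[T <: BaseRelation with TableScan](context: SQLContext): T = {
      // The fallback relation only needs to be a TableScan: column pruning and
      // filter pushdown were already negotiated through the v2 API above.
      new BaseRelation with TableScan {
        override def sqlContext: SQLContext = context
        override def schema: StructType = requiredSchema
        override def buildScan(): RDD[Row] =
          context.sparkContext.emptyRDD[Row] // placeholder for the real v1 read path
      }.asInstanceOf[T]
    }
  }
}
```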
val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
val originalOutputNames = relation.table.schema().map(_.name)
val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf)
val dsScan = RowDataSourceScanExec(
How about an alternate constructor?
Similar to the v1 write fallback API, which uses the v2 API to configure the write, I think it makes more sense to use the v2 API to do operator pushdown. That's why the relation returned by the V1 scan should be a `TableScan`.
Test build #116600 has finished for PR 26231 at commit
Retest this please.
Test build #116617 has finished for PR 26231 at commit
One of the filter sets provided is wrong. I think we can do something even better by some code reorganization. In `V2ScanRelationPushDown`, if we change the ordering of `pruneColumns` and `pushFilters`, I think you can create an API that simply wraps `PrunedScan` and `PrunedFilteredScan`. Imagine it as follows:

- First you prune columns. The V1ScanBuilder will check if the relation is `PrunedScan` or `PrunedFilteredScan`. If it is `PrunedScan`, it will eagerly build the RDD.
- If it is a `PrunedFilteredScan`, then it will wait. Once `SupportsPushDownFilters` kicks in, you call into `PrunedFilteredScan` with the pruned schema and filters. Then you get the RDD, and the `BaseRelation` gives you the unhandled filters.
- You wrap the returned RDD in a `TableScan`, and you're done. If the relation was neither, you just call `TableScan.buildScan()`.

What do you think?
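A rough sketch of the wrapper proposed above, in the suggested ordering (prune columns first, then push filters). The class name and the eager/lazy RDD handling are hypothetical; the PR did not end up adopting this design.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan, PrunedScan}
import org.apache.spark.sql.types.StructType

// Hypothetical builder that wraps an existing v1 BaseRelation.
class V1WrappingScanBuilder(relation: BaseRelation)
  extends ScanBuilder with SupportsPushDownRequiredColumns with SupportsPushDownFilters {

  private var prunedSchema: StructType = relation.schema
  private var builtRdd: Option[RDD[Row]] = None
  private var pushed: Array[Filter] = Array.empty

  override def pruneColumns(requiredSchema: StructType): Unit = {
    prunedSchema = requiredSchema
    relation match {
      case p: PrunedScan =>
        // Only column pruning applies, so the RDD can be built eagerly.
        builtRdd = Some(p.buildScan(requiredSchema.fieldNames))
      case _ => // A PrunedFilteredScan waits for the filters.
    }
  }

  override def pushFilters(filters: Array[Filter]): Array[Filter] = relation match {
    case pf: PrunedFilteredScan =>
      pushed = filters
      builtRdd = Some(pf.buildScan(prunedSchema.fieldNames, filters))
      relation.unhandledFilters(filters) // Spark must still evaluate these after the scan
    case _ =>
      filters // nothing was pushed; all filters stay as post-scan filters
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan =
    ??? // wrap `builtRdd` (or a plain TableScan.buildScan()) into the v2 Scan
}
```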
val dsScan = RowDataSourceScanExec(
  output,
  requiredColumnsIndex,
  pushedFilters.toSet,
This is incorrect, right? There were other filters that weren't handled. As I understand it, this should be the entire set of filters.
Yes, it should be the entire set of filters, but it's not a big deal. `RowDataSourceScanExec.filters` is only used in `toString`, to let people know which filters were pushed but not accepted by the source. Anyway, we can retain the full filter set like the pushed filters; I'll fix it.
class V1TableScan(
    context: SQLContext,
    requiredSchema: StructType,
    filters: Array[Filter]) extends BaseRelation with TableScan {
What about `unhandledFilters`?
The implementation doesn't need to track the unhandled filters. The implementation tells Spark the unhandled filters (a.k.a. post-scan filters) at https://github.com/apache/spark/pull/26231/files#diff-e65f6ba43960e865ba29530572696f56R150, and then only needs to track the pushed filters to evaluate them when scanning.
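A small sketch of the contract described here, assuming the v2 `SupportsPushDownFilters` interface: the return value of `pushFilters` is the set Spark must still evaluate after the scan, while `pushedFilters` reports what the source evaluates itself. The trait and the GreaterThan-only rule are illustrative.

```scala
import org.apache.spark.sql.connector.read.SupportsPushDownFilters
import org.apache.spark.sql.sources.{Filter, GreaterThan}

trait ExamplePushDownContract extends SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Pretend the source only understands GreaterThan; everything else is unhandled.
    val (supported, unsupported) = filters.partition(_.isInstanceOf[GreaterThan])
    pushed = supported
    unsupported // post-scan filters: Spark evaluates these after the scan
  }

  // Only the pushed filters need to be tracked; the source evaluates them while scanning.
  override def pushedFilters(): Array[Filter] = pushed
}
```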
Force-pushed ff2d410 to 92b04c2.
Force-pushed 92b04c2 to a48e7bb.
Test build #116828 has finished for PR 26231 at commit
Test build #116834 has finished for PR 26231 at commit
Retest this please.
Test build #116858 has finished for PR 26231 at commit
LGTM
Thanks for the review, merging to master!
What changes were proposed in this pull request?
Add a `V1Scan` interface, so that data source v1 implementations can migrate to DS v2 much more easily.
Why are the changes needed?
It's a lot of work to migrate v1 sources to DS v2. The new API added here allows v1 sources to go through v2 code paths without implementing all the Batch, Stream, PartitionReaderFactory, ... stuff.
We already have a v1 write fallback API after #25348.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New test suite.