[SPARK-50615][SQL] Push variant into scan. #49235
Conversation
@cloud-fan @gene-db @cashmand Please help review, thanks!
@@ -95,7 +95,8 @@ class SparkOptimizer(
        EliminateLimits,
        ConstantFolding),
      Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*),
-     Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition)))
+     Batch("Replace CTE with Repartition", Once, ReplaceCTERefWithRepartition),
+     Batch("Push Variant Into Scan", Once, PushVariantIntoScan)))
Why does this rule need to be at the end of the optimizer?
def addVariantFields(attrId: ExprId, dataType: DataType, defaultValue: Any,
    path: Seq[Int]): Unit = {
Suggested change:
-def addVariantFields(attrId: ExprId, dataType: DataType, defaultValue: Any,
-    path: Seq[Int]): Unit = {
+def addVariantFields(
+    attrId: ExprId,
+    dataType: DataType,
+    defaultValue: Any,
+    path: Seq[Int]): Unit = {
private def addField(map: HashMap[RequestedVariantField, Int],
    field: RequestedVariantField): Unit = {
Suggested change:
-private def addField(map: HashMap[RequestedVariantField, Int],
-    field: RequestedVariantField): Unit = {
+private def addField(
+    map: HashMap[RequestedVariantField, Int],
+    field: RequestedVariantField): Unit = {
  }
}

private def rewritePlan(originalPlan: LogicalPlan, |
Ditto, fix indentation please.
// - Filter [v.0 = 1]
// - Relation [v: struct<0: int, 1: string, 2: variant>]
// The struct fields are annotated with `VariantMetadata` to indicate the extraction path.
object PushVariantIntoScan extends Rule[LogicalPlan] { |
This rule matches a plan pattern similar to SchemaPruning; shall we also put this rule in the earlyScanPushDownRules of SparkOptimizer?
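For context, a minimal sketch of what that placement could look like. This assumes the current shape of earlyScanPushDownRules in SparkOptimizer; the surrounding rule names are recalled from the Spark codebase and may differ by version:

// Sketch only: registering PushVariantIntoScan among the early scan pushdown
// rules, next to SchemaPruning, instead of in a one-off batch at the end.
override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] = Seq(
  SchemaPruning,
  PushVariantIntoScan,
  V1Writes,
  V2ScanRelationPushDown,
  V2ScanPartitioningAndOrdering,
  V2Writes,
  PruneFileSourcePartitions)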
@cloud-fan Thanks! I have made the changes you recommended.
thanks, merging to master!
A follow-up commit (#49263) referenced in this conversation:

What changes were proposed in this pull request?
It adds support for the variant struct in the Parquet reader. The concept of a variant struct was introduced in #49235. It includes all the fields extracted from a variant column that the query requests.

Why are the changes needed?
By producing the variant struct in the Parquet reader, we can avoid reading and rebuilding the full variant and achieve more efficient variant processing.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Unit test.

Was this patch authored or co-authored using generative AI tooling?
No.

Closes #49263 from chenhao-db/spark_variant_struct_reader.
Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
It adds an optimizer rule that pushes variant extraction into the scan: the variant type is rewritten as a struct type containing all the requested fields, and the variant extraction expressions are rewritten as struct field accesses. This will be the foundation of the variant shredding reader. The rule is disabled at this point because the scan is not yet able to recognize the special struct.
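To illustrate the rewrite, here is a sketch based on the plan shape shown in the rule's comment earlier in this thread; the column name v and the extraction paths are examples only, not taken verbatim from the PR:

Before the rule:
// - Filter [variant_get(v, '$.a', 'int') = 1]
// - Relation [v: variant]
After the rule:
// - Filter [v.0 = 1]
// - Relation [v: struct<0: int, 1: string, 2: variant>]

Each struct field is annotated with VariantMetadata recording its extraction path, so the scan can produce exactly the requested fields instead of the full variant value.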
Why are the changes needed?
It is necessary for the performance of reading shredded variants. With this rule (and the reader implemented), the scan only needs to fetch the shredded columns that the plan requires.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test.
Was this patch authored or co-authored using generative AI tooling?
No.
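For readers who want to try the surrounding APIs end to end, here is a minimal, self-contained sketch. It is not part of the PR: the object, table, and column names are made up, and since the rule only rewrites file-based scans (and is disabled at this point), the temp-view plan below would not actually show the rewrite.

import org.apache.spark.sql.SparkSession

object VariantPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("variant-pushdown-sketch")
      .getOrCreate()

    // Build a tiny table with a variant column via parse_json.
    spark.sql("""SELECT parse_json('{"a": 1, "b": "hello"}') AS v""")
      .createOrReplaceTempView("t")

    // These extractions are what the rule rewrites into struct field accesses
    // once the variant column in the scan is replaced by the special struct.
    val df = spark.sql(
      """SELECT variant_get(v, '$.b', 'string')
        |FROM t
        |WHERE variant_get(v, '$.a', 'int') = 1""".stripMargin)

    // Inspect the optimized plan; with the rule enabled and a file-based scan,
    // the variant column would appear as a struct of the requested fields.
    println(df.queryExecution.optimizedPlan)
    df.show()

    spark.stop()
  }
}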