[SPARK-50614][SQL] Add Variant shredding support for Parquet #49234
Conversation
  this.schema
} else {
  val v = StructType.fromString(shreddedSchemaString)
  // A bit awkwardly, the schema string doesn't include metadata to identify which struct
I'm open to suggestions, but from reading the code, it didn't look like the metadata could be passed through the DDL representation, and I couldn't think of an alternative approach to identify the shredding for each Variant field. Technically, for this PR I guess I could have gotten away with constructing the shredded schema in this code directly from the SQLConf, but in the future, I think we'll want to do it based on table properties and/or buffered data in the task, and I don't think it would make sense for that to go here.
Shall we use the data type JSON string?
Oh, thanks. I had looked at the `json` method in StructType, and forgot that the metadata is in StructField, not StructType, so I missed that it is writing the metadata there. Let me clean this up.
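For context on why the JSON representation solves this, here is a minimal, self-contained sketch (not code from the PR; the marker key name is purely illustrative): field metadata survives a `StructType.json` / `DataType.fromJson` round trip but is dropped by the DDL string.

```scala
import org.apache.spark.sql.types._

object SchemaMetadataRoundTrip {
  def main(args: Array[String]): Unit = {
    // Tag a field with marker metadata, similar in spirit to the shredding key
    // discussed in this thread (the key name here is only for illustration).
    val marker = new MetadataBuilder()
      .putBoolean("__EXAMPLE_SHREDDING_MARKER", true)
      .build()
    val schema = StructType(Seq(
      StructField("v", StringType, nullable = true, metadata = marker)))

    // The DDL string drops arbitrary field metadata...
    val fromDdl = StructType.fromDDL(schema.toDDL)
    println(fromDdl.head.metadata.contains("__EXAMPLE_SHREDDING_MARKER")) // false

    // ...while the JSON representation round-trips it via StructField.
    val fromJson = DataType.fromJson(schema.json).asInstanceOf[StructType]
    println(fromJson.head.metadata.contains("__EXAMPLE_SHREDDING_MARKER")) // true
  }
}
```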
I updated the PR to clean this up. I also moved the code to add/check the metadata into `SparkShreddingUtils`.
@cashmand Thanks for this feature!
LGTM
val VARIANT_WRITE_SHREDDING_ENABLED =
  buildConf("spark.sql.variant.writeShredding.enabled")
    .internal()
    .doc("When true, the Parquet reader is allowed to write shredded variant. ")
Does this mean Delta is controlled some other way?
The intent of this conf is to be a global kill switch for the write shredding feature; we'll pass a write-specific conf to actually control how we shred - e.g. have the shredding determined by sampling data in the task, or specify a shredding schema for each Variant column.
Right now, the conf only has an effect if `VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST` is also set to a non-empty value. I could remove it if we don't think there's any value to having a global kill switch like this. @cloud-fan, any opinion?
It's ok to have a kill switch.
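To make the interplay of the two confs concrete, a hypothetical spark-shell session is sketched below. Only `spark.sql.variant.writeShredding.enabled` appears in this thread; the forced-schema conf key and its value format are assumptions made for illustration.

```scala
// Hypothetical: enable the global kill switch, then force a shredding schema
// for testing. The second conf key below is an assumed name, not confirmed
// by this thread.
spark.conf.set("spark.sql.variant.writeShredding.enabled", "true")
spark.conf.set("spark.sql.variant.forceShreddingSchemaForTest", "a INT, b STRING")

// With both set, writing a variant column would be allowed to produce a
// shredded Parquet layout.
spark.sql("""SELECT parse_json('{"a": 1, "b": "hello"}') AS v""")
  .write.mode("overwrite").parquet("/tmp/shredded_variant_example")
```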
val VARIANT_WRITE_SHREDDING_KEY: String = "__VARIANT_WRITE_SHREDDING_KEY"

def isVariantShreddingStruct(s: StructType): Boolean = {
  s.fields.length > 0 && s.fields.forall(_.metadata.contains(VARIANT_WRITE_SHREDDING_KEY))
Every field contains the shredding key? What happens if only some have it?
I put it on all or none in `updateSchemaForVariantShredding`, so that shouldn't really happen. We could check and fail here if there's an inconsistency. I could also just put it on the first field in the struct, if you prefer. I don't think there's any real need to put it on all of them.
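To illustrate the invariant described above, here is a standalone sketch (not the PR's code; only the key name is taken from this thread) that tags every field of a shredding struct and then applies the quoted check:

```scala
import org.apache.spark.sql.types._

object ShreddingMarkerSketch {
  val VARIANT_WRITE_SHREDDING_KEY: String = "__VARIANT_WRITE_SHREDDING_KEY"

  // Tag every field, mirroring the "all or none" behaviour described above.
  def markAsShreddingStruct(s: StructType): StructType =
    StructType(s.fields.map { f =>
      val md = new MetadataBuilder()
        .withMetadata(f.metadata)
        .putNull(VARIANT_WRITE_SHREDDING_KEY)
        .build()
      f.copy(metadata = md)
    })

  // The check quoted from the diff: non-empty and every field carries the key.
  def isVariantShreddingStruct(s: StructType): Boolean =
    s.fields.length > 0 && s.fields.forall(_.metadata.contains(VARIANT_WRITE_SHREDDING_KEY))

  def main(args: Array[String]): Unit = {
    val shredded = StructType(Seq(
      StructField("value", BinaryType),
      StructField("typed_value", LongType)))
    println(isVariantShreddingStruct(shredded))                        // false
    println(isVariantShreddingStruct(markAsShreddingStruct(shredded))) // true
  }
}
```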
val VARIANT_WRITE_SHREDDING_ENABLED =
  buildConf("spark.sql.variant.writeShredding.enabled")
    .internal()
    .doc("When true, the Parquet reader is allowed to write shredded variant. ")
.doc("When true, the Parquet reader is allowed to write shredded variant. ") | |
.doc("When true, the Parquet writer is allowed to write shredded variant. ") |
case (field, shreddingField) =>
  field.dataType match {
    case s: StructType =>
      field.copy(dataType = replaceVariantTypes(s, shreddingSchema))
Have you considered adding a property like `shreddedSchema` in `VariantType`? This way, we wouldn't need to determine where this struct type came from. Currently, it seems that this is judged through `__VARIANT_WRITE_SHREDDING_KEY`.
Hi @Zouxxyy, that's actually what I tried initially. I could go back to that approach if there's a consensus that it's better, but I felt that it had some downsides. Specifically, the approach I took was to add an optional `shreddingSchema` property to the VariantType class, but leave the case object `VariantType` to represent a VariantType with no shredding (similar to what's been done for StringType with default collation), to avoid having to modify all of the existing code that pattern matches on `VariantType`.
I didn't think this approach added much value - it's not very clear what it means for the type to have a shredding schema outside of the specific context of the couple of writer classes that care about it, and it felt like it was just pushing writer-specific code into the core of the type system. E.g. when serializing the type to a string, should we include the shredding schema? I think the answer is that we'd want to when sending it to ParquetWriteSupport, but not when writing it to the parquet file (since it's not part of the Spark type when reading it back from Parquet). It seemed better to resolve these ambiguities explicitly in the relevant writer code, rather than making them part of the data type definition.
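For readers unfamiliar with the rejected alternative, a standalone sketch of its shape follows (names are suffixed with `Sketch` to avoid clashing with Spark's real `VariantType`; this is not code from the PR):

```scala
import org.apache.spark.sql.types.StructType

// The type itself carries an optional shredding schema, and a singleton stands
// in for "plain variant" so existing matches on the singleton keep working,
// analogous to StringType with default collation.
class VariantTypeSketch(val shreddingSchema: Option[StructType] = None) {
  def withShredding(schema: StructType): VariantTypeSketch =
    new VariantTypeSketch(Some(schema))
}

object VariantTypeSketch extends VariantTypeSketch(None)

object VariantTypeSketchDemo {
  // Code that matches on the singleton (the "no shredding" case) keeps working,
  // which is the main appeal of this pattern.
  def describe(t: VariantTypeSketch): String = t match {
    case VariantTypeSketch => "plain variant"
    case other => s"variant shredded as ${other.shreddingSchema}"
  }
}
```

The downside, as noted above, is that writer-specific questions (such as whether a serialized type string should include the shredding schema) then leak into the type system.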
@cashmand Thank you for your reply. We have also implemented the shredding logic internally in Apache Paimon. Specifically, we use `VariantType` along with an optional `ShreddSchema`. In particular, on the write side, we calculate the `ShreddSchema` based on the table's shredding props and then set it into `VariantType`, which is passed to our writer (currently Parquet, but it could be ORC in the future, etc.).
On the read side, we calculate the required schema based on the scan filter and projection, which is also written into the optional attribute of `VariantType` and passed to our reader. Combined with the schema of the Parquet variant, we generate the trimmed read `ShreddSchema`. I think that shredding is an extension of the variant type; when our writer sees that a `ShreddSchema` exists, it immediately knows that shredding needs to be performed.
Shredding only matters for read/write, and I think it's weird to put the info in the `VariantType`, which can also be consumed by SQL functions/operators. The current approach looks cleaner to me.
Hi @Zouxxyy, I tend to agree with @cloud-fan's comment. My preference would be to stick with the current approach for now. I think it should be entirely contained within the Parquet writer code, so if we decide to extend the VariantType later, I don't think it would be hard to change this part of the code to use a different approach.
Got it, it does look much more concise for now. Looking forward to its completion!
thanks, merging to master!
### What changes were proposed in this pull request?
A debug message added in #49234 was missing a brace. As a result, the message was printing the non-pretty string representation of the struct, followed by the string `.prettyJson}`. This PR fixes it.

### Why are the changes needed?
Cleaner debug messages.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually verified in spark-shell that the message looks wrong without the change, and correct with the change.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #49345 from cashmand/fix_debug_msg.

Authored-by: cashmand <david.cashman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
#49234 changed the `setSchema` method of ParquetWriteSupport to add a shredding schema parameter. To avoid the risk of breaking libraries that call `setSchema`, this PR instead creates a separate `setShreddingSchema` method to set the shredding schema. If not called, shredding will not be used.

### Why are the changes needed?
Avoid breaking API.

### Does this PR introduce _any_ user-facing change?
No, the feature has not been released yet.

### How was this patch tested?
Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #49353 from cashmand/fix_set_schema.

Authored-by: cashmand <david.cashman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
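As a sketch of the compatibility pattern this follow-up describes, keeping the existing entry point untouched and adding a separate, optional setter might look roughly like the following. The config keys and method bodies here are illustrative assumptions, not the actual ParquetWriteSupport internals:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.types.StructType

object WriteSupportSketch {
  // Illustrative config keys, not the real ones used by ParquetWriteSupport.
  private val ROW_SCHEMA_KEY = "example.parquet.row.schema"
  private val SHREDDING_SCHEMA_KEY = "example.parquet.variant.shredding.schema"

  // Existing entry point keeps its signature, so current callers are untouched.
  def setSchema(schema: StructType, conf: Configuration): Unit =
    conf.set(ROW_SCHEMA_KEY, schema.json)

  // New optional setter: if it is never called, no shredding schema is present
  // and the writer falls back to writing unshredded variants.
  def setShreddingSchema(shreddingSchema: StructType, conf: Configuration): Unit =
    conf.set(SHREDDING_SCHEMA_KEY, shreddingSchema.json)
}
```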
What changes were proposed in this pull request?
Adds support for shredding in the Parquet writer code. Currently, the only way to enable shredding is through a SQLConf that provides the schema to use for shredding. This doesn't make sense as a user API, and is added only for testing. The exact API for Spark to determine a shredding schema is still TBD, but likely candidates are to infer it at the task level by inspecting the first few rows of data, or to add an API to specify the schema for a given column. Either way, the code in this PR would basically be unchanged; it would just use a different mechanism to provide the schema.
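For orientation, the kind of shredded layout a writer produces for a single variant column shredded as `a: LONG, b: STRING` might look roughly like the struct below. This sketch follows the Parquet variant shredding spec; the PR's exact output schema (field nullability, ordering, etc.) may differ:

```scala
import org.apache.spark.sql.types._

val shreddedVariantColumn = StructType(Seq(
  // Variant metadata dictionary, always present.
  StructField("metadata", BinaryType, nullable = false),
  // Residual variant value for anything not captured by typed_value.
  StructField("value", BinaryType, nullable = true),
  // Shredded object fields, each with its own value/typed_value pair.
  StructField("typed_value", StructType(Seq(
    StructField("a", StructType(Seq(
      StructField("value", BinaryType, nullable = true),
      StructField("typed_value", LongType, nullable = true)))),
    StructField("b", StructType(Seq(
      StructField("value", BinaryType, nullable = true),
      StructField("typed_value", StringType, nullable = true))))
  )), nullable = true)
))
```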
Why are the changes needed?
Needed for Variant shredding support.
Does this PR introduce any user-facing change?
No. The feature is new in Spark 4.0, is currently disabled, and is only usable as a test feature.
How was this patch tested?
Added a unit test suite.
Was this patch authored or co-authored using generative AI tooling?
No.