
[SPARK-50644][SQL] Read variant struct in Parquet reader. #49263

Closed

Conversation

chenhao-db (Contributor):

What changes were proposed in this pull request?

This PR adds support for variant struct in the Parquet reader. The concept of a variant struct was introduced in #49235: it contains all the fields that the query extracts from a variant column.

Why are the changes needed?

By producing the variant struct directly in the Parquet reader, we can avoid reading and rebuilding the full variant value, which makes variant processing more efficient.
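
For concreteness, a hedged illustration (not from the PR; the table path, column names, and the SparkSession named spark are all made-up assumptions): a query that extracts only two paths from a variant column v. With the Parquet reader producing the variant struct, such a query does not need the full variant value rebuilt for every row.

// Illustration only: variant_get is Spark's variant extraction expression;
// the table path and field paths below are hypothetical.
val df = spark.read.parquet("/path/to/events")
  .selectExpr(
    "variant_get(v, '$.user.id', 'int') AS user_id",
    "variant_get(v, '$.event.name', 'string') AS event_name")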

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

github-actions bot added the SQL label on Dec 21, 2024
HyukjinKwon changed the title from "[SPARK-50644] Read variant struct in Parquet reader." to "[SPARK-50644][SQL] Read variant struct in Parquet reader." on Dec 23, 2024
@@ -188,6 +570,32 @@ case object SparkShreddingUtils {
scalarSchema, objectSchema, arraySchema)
}

// Convert a scalar variant schema into a Spark scalar type.
def scalarSchemaToSparkType(scalar: VariantSchema.ScalarType): DataType = scalar match {
case _: VariantSchema.StringType => StringType
Member:

I wonder if we can have a util that lists the supported types here... otherwise, it's very likely we'll miss updating this place when we happen to support more types.

Contributor Author (chenhao-db):

I think we can just keep it as it is because:

  1. Essentially, this function only needs to process the VariantSchema returned by buildVariantSchema, which we control and will know about when it changes.
  2. I cannot think of an easy way to list all supported types; decimal types alone can have many precision-scale combinations (see the toy sketch below).
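
A toy Scala sketch of the second point (illustrative only; these are not the PR's VariantSchema types): a schema family with a parameterized member such as decimal cannot be enumerated as a fixed list of supported types.

sealed trait ToyScalarSchema
case object ToyStringSchema extends ToyScalarSchema
final case class ToyDecimalSchema(precision: Int, scale: Int) extends ToyScalarSchema

// A single match arm has to cover an unbounded number of precision/scale combinations,
// so there is no finite list of "supported types" to enumerate.
def toTypeName(s: ToyScalarSchema): String = s match {
  case ToyStringSchema => "string"
  case ToyDecimalSchema(p, sc) => s"decimal($p,$sc)"
}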

chenhao-db (Contributor Author):

@cloud-fan @cashmand @gene-db could you help review? Thanks!

@@ -390,6 +394,11 @@ object ParquetReadSupport extends Logging {
.named(parquetRecord.getName)
}

private def clipVariantSchema(parquetType: GroupType, variantStruct: StructType): GroupType = {
// TODO(SHREDDING): clip `parquetType` to retain the necessary columns.
Contributor:

So does this require a new Parquet version to support column pruning for variant?

Contributor:

I don't think it requires a new Parquet version - it should be possible to clip it in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala in the same way that unused struct fields are clipped. The logic of deciding which fields can be clipped is more complicated, though.

chenhao-db (Contributor Author), Dec 23, 2024:

It doesn't. In this function, we will have custom logic to clip parquetType to retain only the columns needed to read variantStruct. That part will come in a future PR to avoid making this single PR too big.
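
A hedged sketch only (not the future PR's logic): the mechanical part of clipping a Parquet GroupType down to a set of requested field names with parquet-mr's Types builder, similar to how unused struct fields are clipped elsewhere in ParquetReadSupport. The harder part discussed above, deciding which shredded sub-columns a requested variant path actually needs, is not shown; the helper name is hypothetical.

import org.apache.parquet.schema.{GroupType, Types}
import scala.jdk.CollectionConverters._

def clipToRequestedFields(parquetType: GroupType, requested: Set[String]): GroupType = {
  // Keep only the fields whose names the query actually requested.
  val kept = parquetType.getFields.asScala.filter(f => requested.contains(f.getName))
  Types
    .buildGroup(parquetType.getRepetition)
    .addFields(kept.toSeq: _*)
    .named(parquetType.getName)
}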

cloud-fan (Contributor):

thanks, merging to master!

cloud-fan closed this in 2c1c4d2 on Dec 24, 2024
cashmand (Contributor) left a comment:

One question for a possible future follow-up.

override def readFromTyped(row: InternalRow, topLevelMetadata: Array[Byte]): Any = {
if (castProject == null) {
return if (targetType.isInstanceOf[StringType]) {
UTF8String.fromString(rebuildVariant(row, topLevelMetadata).toJson(castArgs.zoneId))
Contributor:

In the case where the target type is string and the typed_value type is also string, would this add a lot of overhead? Is it worth specializing that case, since it seems like one that's likely to be common? I guess more generally, is rebuildVariant a heavier-than-necessary hammer if typed_value is any scalar?

Contributor Author (chenhao-db):

I think there is a misunderstanding. If the target type is string and the typed_value type is also string, castProject will not be null, and the code will not take the rebuild path. I also measured the cost of castProject, and it turns out to be small. For string -> string specifically, if I replace the whole readFromTyped with row.getUTF8String(schema.typedIdx), the performance improvement is <10%.
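
To make the two paths concrete, a toy sketch of the dispatch described above (simplified names, not the PR code): the rebuild-and-toJson branch is only the fallback used when no cast projection exists.

sealed trait TypedReadPath
case object UseCastProject extends TypedReadPath      // e.g. shredded string read as a string target
case object RebuildThenToJson extends TypedReadPath   // fallback when the target type is string
case object RebuildThenCast extends TypedReadPath     // fallback for other target types

def choosePath(hasCastProject: Boolean, targetIsString: Boolean): TypedReadPath =
  if (hasCastProject) UseCastProject
  else if (targetIsString) RebuildThenToJson
  else RebuildThenCast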

Contributor:

Oh, got it, I missed that we only do this in the castProject == null case. Thanks for measuring the performance impact of readFromTyped.
