From 3ecb2908dea68b96a0ec8a00ffd69ad872a6d42a Mon Sep 17 00:00:00 2001
From: cashmand
Date: Mon, 6 Jan 2025 10:32:54 +0800
Subject: [PATCH] [SPARK-50614][SQL] Revert API change for shredding

### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/49234 changed the `setSchema` method of `ParquetWriteSupport` to add a shredding schema parameter. To avoid the risk of breaking libraries that call `setSchema`, this PR instead creates a separate `setShreddingSchema` method to set the shredding schema. If it is not called, shredding will not be used.

### Why are the changes needed?

Avoid breaking the API.

### Does this PR introduce _any_ user-facing change?

No, the feature has not been released yet.

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49353 from cashmand/fix_set_schema.

Authored-by: cashmand
Signed-off-by: Wenchen Fan
---
 .../execution/datasources/parquet/ParquetUtils.scala       |  3 ++-
 .../datasources/parquet/ParquetWriteSupport.scala          | 10 +++++-----
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index a609a4e0a25f3..663182d8d1820 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -485,7 +485,8 @@ object ParquetUtils extends Logging {
     }
 
     // This metadata is useful for keeping UDTs like Vector/Matrix.
-    ParquetWriteSupport.setSchema(dataSchema, conf, shreddingSchema)
+    ParquetWriteSupport.setSchema(dataSchema, conf)
+    shreddingSchema.foreach(ParquetWriteSupport.setShreddingSchema(_, conf))
 
     // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
     // schema and writes actual rows to Parquet files.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 02b432f98d7d8..35eb57a2e4fb2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -539,14 +539,14 @@ object ParquetWriteSupport {
   val SPARK_VARIANT_SHREDDING_SCHEMA: String =
     "org.apache.spark.sql.parquet.variant.shredding.attributes"
 
-  def setSchema(schema: StructType, configuration: Configuration,
-      shreddingSchema: Option[StructType]): Unit = {
+  def setSchema(schema: StructType, configuration: Configuration): Unit = {
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
     configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-    shreddingSchema.foreach { s =>
-      configuration.set(SPARK_VARIANT_SHREDDING_SCHEMA, s.json)
-    }
+  }
+
+  def setShreddingSchema(shreddingSchema: StructType, configuration: Configuration): Unit = {
+    configuration.set(SPARK_VARIANT_SHREDDING_SCHEMA, shreddingSchema.json)
   }
 }
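
For illustration only (not part of the patch): a minimal sketch of how a caller that previously passed the shredding schema to `setSchema` would adapt to the split API. Only the two `ParquetWriteSupport` calls are taken from the diff above; the object name and the data/shredding schemas below are hypothetical placeholders, and the sketch assumes the caller can reference the internal `ParquetWriteSupport` object, as the libraries mentioned in the description do.

```scala
// Caller-side sketch of the new API. The schemas here are hypothetical
// placeholders; only the two ParquetWriteSupport calls come from the patch.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ShreddingConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val dataSchema = StructType(Seq(StructField("v", StringType)))
    // A shredding schema may or may not be present for a given write.
    val shreddingSchema: Option[StructType] =
      Some(StructType(Seq(StructField("typed_value", StringType))))

    // Old API (before this patch):
    //   ParquetWriteSupport.setSchema(dataSchema, conf, shreddingSchema)
    // New API: the data schema and the optional shredding schema are set
    // separately, so existing callers of setSchema keep compiling unchanged.
    ParquetWriteSupport.setSchema(dataSchema, conf)
    shreddingSchema.foreach(ParquetWriteSupport.setShreddingSchema(_, conf))
  }
}
```

Keeping `setSchema` at its original two-argument signature and exposing the shredding schema through an additional, optional call is what makes the change source-compatible for external callers.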