[SPARK-50614][SQL] Revert API change for shredding
### What changes were proposed in this pull request?

#49234 changed the `setSchema` method of `ParquetWriteSupport` to add a shredding schema parameter. To avoid the risk of breaking libraries that call `setSchema`, this PR restores the original two-argument signature and adds a separate `setShreddingSchema` method to set the shredding schema. If `setShreddingSchema` is not called, shredding is not used.
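
The resulting call pattern can be sketched as follows. This is a minimal, self-contained illustration and not the Spark code itself: `FakeConf` is a hypothetical stand-in for Hadoop's `Configuration`, schemas are passed as plain JSON strings instead of `StructType`, and `RowSchemaKey` is a placeholder key chosen for the sketch. Only the shredding key string is taken from the diff.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.hadoop.conf.Configuration, so the
// sketch runs without Hadoop or Spark on the classpath.
final class FakeConf {
  private val entries = mutable.Map.empty[String, String]
  def set(key: String, value: String): Unit = entries.update(key, value)
  def get(key: String): Option[String] = entries.get(key)
}

object WriteSupportSketch {
  // Key string as it appears in the diff (SPARK_VARIANT_SHREDDING_SCHEMA).
  val ShreddingSchemaKey: String =
    "org.apache.spark.sql.parquet.variant.shredding.attributes"

  // Placeholder key for this sketch only; the real constant is SPARK_ROW_SCHEMA.
  val RowSchemaKey: String = "sketch.row.schema"

  // Two-argument form: existing callers keep working unchanged.
  def setSchema(schemaJson: String, conf: FakeConf): Unit =
    conf.set(RowSchemaKey, schemaJson)

  // Separate opt-in setter: shredding is configured only if this is called.
  def setShreddingSchema(shreddingSchemaJson: String, conf: FakeConf): Unit =
    conf.set(ShreddingSchemaKey, shreddingSchemaJson)

  // Mirrors the call site: always set the row schema, and set the
  // shredding schema only when one is present.
  def configure(schemaJson: String, shreddingJson: Option[String]): FakeConf = {
    val conf = new FakeConf
    setSchema(schemaJson, conf)
    shreddingJson.foreach(setShreddingSchema(_, conf))
    conf
  }

  def main(args: Array[String]): Unit = {
    val plain = configure("""{"type":"struct"}""", None)
    val shredded = configure("""{"type":"struct"}""", Some("""{"type":"struct"}"""))
    println(plain.get(ShreddingSchemaKey).isDefined)    // false: key never set
    println(shredded.get(ShreddingSchemaKey).isDefined) // true: opt-in setter ran
  }
}
```

Because `Option.foreach` does nothing for `None`, a caller with no shredding schema leaves the shredding key unset, which matches the "if not called, shredding is not used" behavior described above.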

### Why are the changes needed?

To avoid breaking the existing `ParquetWriteSupport.setSchema` API.

### Does this PR introduce _any_ user-facing change?

No, the feature has not been released yet.

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49353 from cashmand/fix_set_schema.

Authored-by: cashmand <david.cashman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cashmand authored and cloud-fan committed Jan 6, 2025
1 parent b613b22 commit 3ecb290
Showing 2 changed files with 7 additions and 6 deletions.
@@ -485,7 +485,8 @@ object ParquetUtils extends Logging {
     }
 
     // This metadata is useful for keeping UDTs like Vector/Matrix.
-    ParquetWriteSupport.setSchema(dataSchema, conf, shreddingSchema)
+    ParquetWriteSupport.setSchema(dataSchema, conf)
+    shreddingSchema.foreach(ParquetWriteSupport.setShreddingSchema(_, conf))
 
     // Sets flags for `ParquetWriteSupport`, which converts Catalyst schema to Parquet
     // schema and writes actual rows to Parquet files.
@@ -539,14 +539,14 @@ object ParquetWriteSupport {
   val SPARK_VARIANT_SHREDDING_SCHEMA: String =
     "org.apache.spark.sql.parquet.variant.shredding.attributes"
 
-  def setSchema(schema: StructType, configuration: Configuration,
-      shreddingSchema: Option[StructType]): Unit = {
+  def setSchema(schema: StructType, configuration: Configuration): Unit = {
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
     configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
-    shreddingSchema.foreach { s =>
-      configuration.set(SPARK_VARIANT_SHREDDING_SCHEMA, s.json)
-    }
   }
+
+  def setShreddingSchema(shreddingSchema: StructType, configuration: Configuration): Unit = {
+    configuration.set(SPARK_VARIANT_SHREDDING_SCHEMA, shreddingSchema.json)
+  }
 }
