From 822ad9bc5a10288e2a1a7af54d9c6f61f9d6e151 Mon Sep 17 00:00:00 2001 From: MithunR Date: Fri, 31 May 2024 00:19:35 -0700 Subject: [PATCH] [Spark 4.0] Account for `PartitionedFileUtil.splitFiles` signature change. (#10857) * Account for PartitionedFileUtil.splitFiles signature change. Fixes #10299. In Apache Spark 4.0, the signature of `PartitionedFileUtil.splitFiles` was changed to remove unused parameters (apache/spark@eabea643c74). This causes the Spark RAPIDS plugin build to break with Spark 4.0. This commit introduces a shim to account for the signature change. Signed-off-by: MithunR * Common base for PartitionFileUtilsShims. Signed-off-by: MithunR * Reusing existing PartitionedFileUtilsShims. * More refactor, for pre-3.5 compile. * Updated Copyright date. * Fixed style error. * Re-fixed the copyright year. * Added missing import. --------- Signed-off-by: MithunR --- .../shims/PartitionedFileUtilsShim.scala | 22 +-------- .../shims/PartitionedFileUtilsShimBase.scala | 45 +++++++++++++++++++ .../shims/PartitionedFileUtilsShim.scala | 16 ++++++- .../execution/rapids/shims/SplitFiles.scala | 6 +-- .../shims/PartitionedFileUtilsShim.scala | 38 ++++++++++++++++ .../shims/PartitionedFileUtilsShim.scala | 40 +++++++++++++++++ 6 files changed, 141 insertions(+), 26 deletions(-) create mode 100644 sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShimBase.scala create mode 100644 sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala create mode 100644 sql-plugin/src/main/spark400/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala index ca2fa215892..62fe32ae8db 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala +++ 
b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -19,27 +19,7 @@ {"spark": "341"} {"spark": "342"} {"spark": "343"} -{"spark": "350"} -{"spark": "351"} -{"spark": "400"} spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims -import org.apache.spark.paths.SparkPath -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile - -object PartitionedFileUtilsShim { - // Wrapper for case class constructor so Java code can access - // the default values across Spark versions. - def newPartitionedFile( - partitionValues: InternalRow, - filePath: String, - start: Long, - length: Long): PartitionedFile = PartitionedFile(partitionValues, - SparkPath.fromPathString(filePath), start, length) - - def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = { - pf.copy(locations = locations.toArray) - } -} +object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShimBase.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShimBase.scala new file mode 100644 index 00000000000..a94c76dc083 --- /dev/null +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShimBase.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "340"} +{"spark": "341"} +{"spark": "342"} +{"spark": "343"} +{"spark": "350"} +{"spark": "351"} +{"spark": "400"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile + +trait PartitionedFileUtilsShimBase { + + // Wrapper for case class constructor so Java code can access + // the default values across Spark versions. + def newPartitionedFile(partitionValues: InternalRow, + filePath: String, + start: Long, + length: Long): PartitionedFile = PartitionedFile(partitionValues, + SparkPath.fromPathString(filePath), start, length) + + def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = { + pf.copy(locations = locations.toArray) + } +} diff --git a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala index 249502f1b49..0f1bdafde7a 100644 --- a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala +++ b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,8 +20,10 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile} object PartitionedFileUtilsShim { // Wrapper for case class constructor so Java code can access @@ -37,4 +39,14 @@ object PartitionedFileUtilsShim { def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = { pf.copy(locations = locations) } + + // In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter. + // This pre-Spark-4.0 shim keeps the `sparkSession` parameter. + def splitFiles(sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles(sparkSession, file, isSplitable, maxSplitBytes, partitionValues) + } } diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala index 3b94d5a5201..1934cb6af9f 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala @@ -23,12 +23,12 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.execution.rapids.shims +import com.nvidia.spark.rapids.shims.PartitionedFileUtilsShim import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.spark.sql.SparkSession -import 
org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory, PartitionedFile} trait SplitFiles { @@ -49,7 +49,7 @@ trait SplitFiles { selectedPartitions.flatMap { partition => partition.files.flatMap { f => - PartitionedFileUtil.splitFiles( + PartitionedFileUtilsShim.splitFiles( sparkSession, f, isSplitable = canBeSplit(f.getPath, hadoopConf), @@ -71,7 +71,7 @@ trait SplitFiles { val filePath = file.getPath val isSplitable = relation.fileFormat.isSplitable( relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( + PartitionedFileUtilsShim.splitFiles( sparkSession = relation.sparkSession, file = file, isSplitable = isSplitable, diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala new file mode 100644 index 00000000000..71ad5ae1a0f --- /dev/null +++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile} + +object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase { + // In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter. + // This pre-Spark-4.0 shim keeps the `sparkSession` parameter. + def splitFiles(sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles(sparkSession, file, isSplitable, maxSplitBytes, partitionValues) + } +} diff --git a/sql-plugin/src/main/spark400/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark400/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala new file mode 100644 index 00000000000..de8e98962a7 --- /dev/null +++ b/sql-plugin/src/main/spark400/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "400"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile} + +object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase { + + // In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter. + // This Spark-4.0+ shim ignores the `sparkSession` parameter. + def splitFiles(sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles(file, isSplitable, maxSplitBytes, partitionValues) + } + +} // object PartitionedFileUtilsShim