From 9c9548baf2a02deb1e94e6c6436ba06502035e72 Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 21 May 2024 14:52:39 -0700 Subject: [PATCH] Account for PartitionedFileUtil.splitFiles signature change. Fixes #10299. In Apache Spark 4.0, the signature of `PartitionedFileUtil.splitFiles` was changed to remove unused parameters (apache/spark@eabea643c74). This causes the Spark RAPIDS plugin build to break with Spark 4.0. This commit introduces a shim to account for the signature change. Signed-off-by: MithunR --- .../shims/PartitionedFileUtilShim.scala | 42 +++++++++++++++++++ .../execution/rapids/shims/SplitFiles.scala | 5 +-- 2 files changed, 44 insertions(+), 3 deletions(-) create mode 100644 sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/PartitionedFileUtilShim.scala diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/PartitionedFileUtilShim.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/PartitionedFileUtilShim.scala new file mode 100644 index 000000000000..b488ffb5daad --- /dev/null +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/PartitionedFileUtilShim.scala @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "341db"} +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.execution.rapids.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile} + +object PartitionedFileUtilShim { + + // In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter. + // This pre-Spark-4.0 shim keeps the `sparkSession` parameter. + def splitFiles(sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles(sparkSession, file, isSplitable, maxSplitBytes, partitionValues) + } + +} // object PartitionFileUtilShim; diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala index 3b94d5a52018..95bb46fe0ea6 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory, PartitionedFile} trait SplitFiles { @@ -49,7 +48,7 @@ trait SplitFiles { selectedPartitions.flatMap { partition => partition.files.flatMap { f => - PartitionedFileUtil.splitFiles( + PartitionedFileUtilShim.splitFiles( sparkSession, f, isSplitable = canBeSplit(f.getPath, hadoopConf), @@ -71,7 +70,7 @@ trait SplitFiles { val filePath = file.getPath val isSplitable = relation.fileFormat.isSplitable( relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( + PartitionedFileUtilShim.splitFiles( sparkSession = relation.sparkSession, file = file, isSplitable = isSplitable,