Skip to content

Commit

Permalink
Account for PartitionedFileUtil.splitFiles signature change.
Browse files Browse the repository at this point in the history
Fixes NVIDIA#10299.

In Apache Spark 4.0, the signature of `PartitionedFileUtil.splitFiles` was changed
to remove unused parameters (apache/spark@eabea643c74).  This causes the Spark RAPIDS
plugin build to break with Spark 4.0.

This commit introduces a shim to account for the signature change.

Signed-off-by: MithunR <mithunr@nvidia.com>
  • Loading branch information
mythrocks committed May 21, 2024
1 parent 188eb39 commit 9c9548b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*** spark-rapids-shim-json-lines
{"spark": "341db"}
{"spark": "350"}
{"spark": "351"}
spark-rapids-shim-json-lines ***/
package org.apache.spark.sql.execution.rapids.shims

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile}

object PartitionedFileUtilShim {

  /**
   * Pre-Spark-4.0 shim for `PartitionedFileUtil.splitFiles`.
   *
   * Spark 4.0 removed the `sparkSession` parameter from
   * `PartitionedFileUtil.splitFiles` (apache/spark@eabea643c74); this shim
   * preserves the older five-argument signature and simply delegates to the
   * underlying Spark utility.
   *
   * @param sparkSession    the active SparkSession (unused by Spark 4.0+,
   *                        retained here for the pre-4.0 signature)
   * @param file            the file (with metadata) to split
   * @param isSplitable     whether the file may be split into multiple chunks
   * @param maxSplitBytes   the maximum size, in bytes, of each split
   * @param partitionValues partition column values associated with the file
   * @return the resulting sequence of `PartitionedFile` splits
   */
  def splitFiles(sparkSession: SparkSession,
      file: FileStatusWithMetadata,
      isSplitable: Boolean,
      maxSplitBytes: Long,
      partitionValues: InternalRow): Seq[PartitionedFile] =
    PartitionedFileUtil.splitFiles(
      sparkSession = sparkSession,
      file = file,
      isSplitable = isSplitable,
      maxSplitBytes = maxSplitBytes,
      partitionValues = partitionValues)

} // object PartitionedFileUtilShim
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory, PartitionedFile}

trait SplitFiles {
Expand All @@ -49,7 +48,7 @@ trait SplitFiles {

selectedPartitions.flatMap { partition =>
partition.files.flatMap { f =>
PartitionedFileUtil.splitFiles(
PartitionedFileUtilShim.splitFiles(
sparkSession,
f,
isSplitable = canBeSplit(f.getPath, hadoopConf),
Expand All @@ -71,7 +70,7 @@ trait SplitFiles {
val filePath = file.getPath
val isSplitable = relation.fileFormat.isSplitable(
relation.sparkSession, relation.options, filePath)
PartitionedFileUtil.splitFiles(
PartitionedFileUtilShim.splitFiles(
sparkSession = relation.sparkSession,
file = file,
isSplitable = isSplitable,
Expand Down

0 comments on commit 9c9548b

Please sign in to comment.