diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala index ca2fa215892..62fe32ae8db 100644 --- a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -19,27 +19,7 @@ {"spark": "341"} {"spark": "342"} {"spark": "343"} -{"spark": "350"} -{"spark": "351"} -{"spark": "400"} spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims -import org.apache.spark.paths.SparkPath -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile - -object PartitionedFileUtilsShim { - // Wrapper for case class constructor so Java code can access - // the default values across Spark versions. - def newPartitionedFile( - partitionValues: InternalRow, - filePath: String, - start: Long, - length: Long): PartitionedFile = PartitionedFile(partitionValues, - SparkPath.fromPathString(filePath), start, length) - - def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = { - pf.copy(locations = locations.toArray) - } -} +object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase diff --git a/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShimBase.scala b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShimBase.scala new file mode 100644 index 00000000000..a94c76dc083 --- /dev/null +++ b/sql-plugin/src/main/spark340/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShimBase.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "340"} +{"spark": "341"} +{"spark": "342"} +{"spark": "343"} +{"spark": "350"} +{"spark": "351"} +{"spark": "400"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile + +trait PartitionedFileUtilsShimBase { + + // Wrapper for case class constructor so Java code can access + // the default values across Spark versions. + def newPartitionedFile(partitionValues: InternalRow, + filePath: String, + start: Long, + length: Long): PartitionedFile = PartitionedFile(partitionValues, + SparkPath.fromPathString(filePath), start, length) + + def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = { + pf.copy(locations = locations.toArray) + } +} diff --git a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala index 249502f1b49..0f1bdafde7a 100644 --- a/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala +++ b/sql-plugin/src/main/spark341db/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,10 @@ spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims import org.apache.spark.paths.SparkPath +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile} object PartitionedFileUtilsShim { // Wrapper for case class constructor so Java code can access @@ -37,4 +39,14 @@ object PartitionedFileUtilsShim { def withNewLocations(pf: PartitionedFile, locations: Seq[String]): PartitionedFile = { pf.copy(locations = locations) } + + // In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter. + // This pre-Spark-4.0 shim keeps the `sparkSession` parameter. + def splitFiles(sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles(sparkSession, file, isSplitable, maxSplitBytes, partitionValues) + } } diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala index 3b94d5a5201..1934cb6af9f 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/execution/rapids/shims/SplitFiles.scala @@ -23,12 +23,12 @@ spark-rapids-shim-json-lines ***/ package org.apache.spark.sql.execution.rapids.shims +import com.nvidia.spark.rapids.shims.PartitionedFileUtilsShim import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import 
org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.PartitionedFileUtil import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory, PartitionedFile} trait SplitFiles { @@ -49,7 +49,7 @@ trait SplitFiles { selectedPartitions.flatMap { partition => partition.files.flatMap { f => - PartitionedFileUtil.splitFiles( + PartitionedFileUtilsShim.splitFiles( sparkSession, f, isSplitable = canBeSplit(f.getPath, hadoopConf), @@ -71,7 +71,7 @@ trait SplitFiles { val filePath = file.getPath val isSplitable = relation.fileFormat.isSplitable( relation.sparkSession, relation.options, filePath) - PartitionedFileUtil.splitFiles( + PartitionedFileUtilsShim.splitFiles( sparkSession = relation.sparkSession, file = file, isSplitable = isSplitable, diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala new file mode 100644 index 00000000000..71ad5ae1a0f --- /dev/null +++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile} + +object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase { + // In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter. + // This pre-Spark-4.0 shim keeps the `sparkSession` parameter. + def splitFiles(sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles(sparkSession, file, isSplitable, maxSplitBytes, partitionValues) + } +} diff --git a/sql-plugin/src/main/spark400/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala b/sql-plugin/src/main/spark400/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala new file mode 100644 index 00000000000..de8e98962a7 --- /dev/null +++ b/sql-plugin/src/main/spark400/scala/com/nvidia/spark/rapids/shims/PartitionedFileUtilsShim.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "400"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.PartitionedFileUtil +import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionedFile} + +object PartitionedFileUtilsShim extends PartitionedFileUtilsShimBase { + + // In Spark 4.0, PartitionedFileUtil.splitFiles lost its `sparkSession` parameter. + // This Spark-4.0+ shim ignores the `sparkSession` parameter. + def splitFiles(sparkSession: SparkSession, + file: FileStatusWithMetadata, + isSplitable: Boolean, + maxSplitBytes: Long, + partitionValues: InternalRow): Seq[PartitionedFile] = { + PartitionedFileUtil.splitFiles(file, isSplitable, maxSplitBytes, partitionValues) + } + +} // object PartitionedFileUtilsShim