diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala index 5a54d0b2f66..019f9b2e6b0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataWritingCommandExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -84,10 +84,9 @@ object GpuDataWritingCommand { if (fs.exists(filePath) && fs.getFileStatus(filePath).isDirectory && fs.listStatus(filePath).length != 0) { - TrampolineUtil.throwAnalysisException( - s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " + - s"${tablePath} . To allow overwriting the existing non-empty directory, " + - s"set '$allowNonEmptyLocationInCTASKey' to true.") + throw RapidsErrorUtils. + createTableAsSelectWithNonEmptyDirectoryError(tablePath.toString, + allowNonEmptyLocationInCTASKey) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala index e3869960fc4..43bd593c0b5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRunnableCommandExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration @@ -82,10 +82,9 @@ object GpuRunnableCommand { if (fs.exists(filePath) && fs.getFileStatus(filePath).isDirectory && fs.listStatus(filePath).length != 0) { - TrampolineUtil.throwAnalysisException( - s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " + - s"${tablePath} . To allow overwriting the existing non-empty directory, " + - s"set '$allowNonEmptyLocationInCTASKey' to true.") + throw RapidsErrorUtils. 
+ createTableAsSelectWithNonEmptyDirectoryError(tablePath.toString, + allowNonEmptyLocationInCTASKey) } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/RapidsHiveErrors.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/RapidsHiveErrors.scala index 259a04ec318..40cac90680f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/RapidsHiveErrors.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/RapidsHiveErrors.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,9 @@ package org.apache.spark.sql.hive.rapids import org.apache.hadoop.fs.Path import org.apache.spark.SparkException -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.types.{DataType, DoubleType, FloatType, StringType} object RapidsHiveErrors { @@ -53,8 +53,7 @@ object RapidsHiveErrors { } def cannotResolveAttributeError(name: String, outputStr: String): Throwable = { - new AnalysisException( - s"Unable to resolve $name given [$outputStr]") + throw RapidsErrorUtils.cannotResolveAttributeError(name, outputStr) } def writePartitionExceedConfigSizeWhenDynamicPartitionError( diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala index 0ec720733e8..5589bca0435 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceBase.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2 import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.shims.SchemaUtilsShims +import org.apache.spark.sql.rapids.shims.{RapidsErrorUtils, SchemaUtilsShims} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils} @@ -144,8 +144,8 @@ abstract class GpuDataSourceBase( } inferredOpt }.getOrElse { - throw new AnalysisException(s"Failed to resolve the schema for $format for " + - s"the partition column: $partitionColumn. It must be specified manually.") + throw RapidsErrorUtils. + partitionColumnNotSpecifiedError(format.toString, partitionColumn) } } StructType(partitionFields) @@ -162,8 +162,7 @@ abstract class GpuDataSourceBase( caseInsensitiveOptions - "path", SparkShimImpl.filesFromFileIndex(tempFileIndex)) }.getOrElse { - throw new AnalysisException( - s"Unable to infer schema for $format. 
It must be specified manually.") + throw RapidsErrorUtils.dataSchemaNotSpecifiedError(format.toString) } // We just print a waring message if the data schema and partition schema have the duplicate @@ -201,17 +200,13 @@ abstract class GpuDataSourceBase( case (dataSource: RelationProvider, None) => dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) case (_: SchemaRelationProvider, None) => - throw new AnalysisException(s"A schema needs to be specified when using $className.") + throw RapidsErrorUtils.schemaNotSpecifiedForSchemaRelationProviderError(className) case (dataSource: RelationProvider, Some(schema)) => val baseRelation = dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions) if (!DataType.equalsIgnoreCompatibleNullability(baseRelation.schema, schema)) { - throw new AnalysisException( - "The user-specified schema doesn't match the actual schema: " + - s"user-specified: ${schema.toDDL}, actual: ${baseRelation.schema.toDDL}. If " + - "you're using DataFrameReader.schema API or creating a table, please do not " + - "specify the schema. Or if you're scanning an existed table, please drop " + - "it and re-create it.") + throw RapidsErrorUtils.userSpecifiedSchemaMismatchActualSchemaError(schema, + baseRelation.schema) } baseRelation @@ -233,9 +228,8 @@ abstract class GpuDataSourceBase( caseInsensitiveOptions - "path", SparkShimImpl.filesFromFileIndex(fileCatalog)) }.getOrElse { - throw new AnalysisException( - s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " + - "It must be specified manually") + throw RapidsErrorUtils. + dataSchemaNotSpecifiedError(format.toString, fileCatalog.allFiles().mkString(",")) } HadoopFsRelation( @@ -276,8 +270,7 @@ abstract class GpuDataSourceBase( caseInsensitiveOptions)(sparkSession) case _ => - throw new AnalysisException( - s"$className is not a valid Spark SQL Data Source.") + throw RapidsErrorUtils.invalidDataSourceError(className) } relation match { @@ -411,22 +404,13 @@ object GpuDataSourceBase extends Logging { dataSource case Failure(error) => if (provider1.startsWith("org.apache.spark.sql.hive.orc")) { - throw new AnalysisException( - "Hive built-in ORC data source must be used with Hive support enabled. " + - "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " + - "'native'") + throw RapidsErrorUtils.orcNotUsedWithHiveEnabledError() } else if (provider1.toLowerCase(Locale.ROOT) == "avro" || provider1 == "com.databricks.spark.avro" || provider1 == "org.apache.spark.sql.avro") { - throw new AnalysisException( - s"Failed to find data source: $provider1. Avro is built-in but external data " + - "source module since Spark 2.4. Please deploy the application as per " + - "the deployment section of \"Apache Avro Data Source Guide\".") + throw RapidsErrorUtils.failedToFindAvroDataSourceError(provider1) } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") { - throw new AnalysisException( - s"Failed to find data source: $provider1. Please deploy the application as " + - "per the deployment section of " + - "\"Structured Streaming + Kafka Integration Guide\".") + throw RapidsErrorUtils.failedToFindKafkaDataSourceError(provider1) } else { throw new ClassNotFoundException( s"Failed to find data source: $provider1. 
Please find packages at " + @@ -459,8 +443,7 @@ object GpuDataSourceBase extends Logging { s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).") internalSources.head.getClass } else { - throw new AnalysisException(s"Multiple sources found for $provider1 " + - s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.") + throw RapidsErrorUtils.findMultipleDataSourceError(provider1, sourceNames) } } } catch { @@ -513,7 +496,7 @@ object GpuDataSourceBase extends Logging { } if (checkEmptyGlobPath && globResult.isEmpty) { - throw new AnalysisException(s"Path does not exist: $globPath") + throw RapidsErrorUtils.dataPathNotExistError(globPath.toString) } globResult @@ -527,7 +510,7 @@ object GpuDataSourceBase extends Logging { ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path => val fs = path.getFileSystem(hadoopConf) if (!fs.exists(path)) { - throw new AnalysisException(s"Path does not exist: $path") + throw RapidsErrorUtils.dataPathNotExistError(path.toString) } } } catch { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala index 2b7974fd1a6..ece5ef5acf5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2022, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.{ColumnarFileFormat, GpuDataWritingCommand} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.io.FileCommitProtocol -import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} +import org.apache.spark.sql.{SaveMode, SparkSession} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionPathString @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, CommandUtils} import org.apache.spark.sql.execution.datasources.{FileFormatWriter, FileIndex, PartitioningUtils} import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode -import org.apache.spark.sql.rapids.shims.SchemaUtilsShims +import org.apache.spark.sql.rapids.shims.{RapidsErrorUtils, SchemaUtilsShims} import org.apache.spark.sql.vectorized.ColumnarBatch case class GpuInsertIntoHadoopFsRelationCommand( @@ -121,7 +121,7 @@ case class GpuInsertIntoHadoopFsRelationCommand( val pathExists = fs.exists(qualifiedOutputPath) (mode, pathExists) match { case (SaveMode.ErrorIfExists, true) => - throw new AnalysisException(s"path $qualifiedOutputPath already exists.") + throw RapidsErrorUtils.outputPathAlreadyExistsError(qualifiedOutputPath) case (SaveMode.Overwrite, true) => if (ifPartitionNotExists && matchingPartitions.nonEmpty) { false diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala 
b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala index 6675f678f6d..f9d0be81505 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuRandomExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,8 @@ import com.nvidia.spark.rapids.Arm.withResource import com.nvidia.spark.rapids.shims.ShimUnaryExpression import org.apache.spark.TaskContext -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionWithRandomSeed} +import org.apache.spark.sql.rapids.execution.RapidsAnalysisException import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils @@ -52,7 +52,7 @@ case class GpuRand(child: Expression) extends ShimUnaryExpression with GpuExpres @transient protected lazy val seed: Long = child match { case GpuLiteral(s, IntegerType) => s.asInstanceOf[Int] case GpuLiteral(s, LongType) => s.asInstanceOf[Long] - case _ => throw new AnalysisException( + case _ => throw new RapidsAnalysisException( s"Input argument to $prettyName must be an integer, long or null literal.") } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala index 5ffe08348f1..8a88cc4024d 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala @@ -157,9 +157,6 @@ object TrampolineUtil { TaskContext.get.taskMemoryManager() } - /** Throw a Spark analysis exception */ - def throwAnalysisException(msg: String) = throw new AnalysisException(msg) - /** Set the task context for the current thread */ def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc) @@ -241,4 +238,13 @@ object TrampolineUtil { } def getSparkHadoopUtilConf: Configuration = SparkHadoopUtil.get.conf + } + +/** + * This class should only be used for errors specific to the + * RAPIDS Accelerator, or to mirror the rare cases where Spark + * throws a raw AnalysisException directly rather than through an + * error utility class. + */ +class RapidsAnalysisException(msg: String) extends AnalysisException(msg) diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala index fd48b8b6375..4d6d4967a80 100644 --- a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -26,7 +26,8 @@ import org.apache.parquet.schema.OriginalType._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.rapids.execution.RapidsAnalysisException +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.types._ object ParquetSchemaClipShims { @@ -64,13 +65,13 @@ object ParquetSchemaClipShims { if (originalType == null) s"$typeName" else s"$typeName ($originalType)" def typeNotSupported() = - TrampolineUtil.throwAnalysisException(s"Parquet type not supported: $typeString") + throw new RapidsAnalysisException(s"Parquet type not supported: $typeString") def typeNotImplemented() = - TrampolineUtil.throwAnalysisException(s"Parquet type not yet supported: $typeString") + throw RapidsErrorUtils.parquetTypeUnsupportedYetError(typeString) def illegalType() = - TrampolineUtil.throwAnalysisException(s"Illegal Parquet type: $typeString") + throw RapidsErrorUtils.illegalParquetTypeError(typeString) // When maxPrecision = -1, we skip precision range check, and always respect the precision // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored @@ -80,8 +81,7 @@ object ParquetSchemaClipShims { val scale = field.getDecimalMetadata.getScale if (!(maxPrecision == -1 || 1 <= precision && precision <= maxPrecision)) { - TrampolineUtil.throwAnalysisException( - s"Invalid decimal precision: $typeName " + + throw new RapidsAnalysisException(s"Invalid decimal precision: $typeName " + s"cannot store $precision digits (max $maxPrecision)") } @@ -121,7 +121,7 @@ object ParquetSchemaClipShims { case INT96 => if (!SQLConf.get.isParquetINT96AsTimestamp) { - TrampolineUtil.throwAnalysisException( + throw new RapidsAnalysisException( "INT96 is not supported unless it's interpreted as timestamp. 
" + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") } diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala index 6d4ca5da7c3..2ea0301fa2c 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/hive/rapids/shims/GpuInsertIntoHiveTable.scala @@ -45,7 +45,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils, ExternalCatalogWithListener} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -58,6 +58,7 @@ import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.client.hive._ import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.spark.sql.hive.rapids.{GpuHiveFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors} +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.vectorized.ColumnarBatch final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable, @@ -193,7 +194,7 @@ case class GpuInsertIntoHiveTable( // Report error if any static partition appears after a dynamic partition val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { - throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + throw RapidsErrorUtils.dynamicPartitionParentError } } diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala index f23229e0956..7fa269db71a 100644 --- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.types.{DataType, Decimal, DecimalType} -object RapidsErrorUtils { +object RapidsErrorUtils extends RapidsQueryErrorUtils { def invalidArrayIndexError(index: Int, numElements: Int, isElementAtF: Boolean = false): ArrayIndexOutOfBoundsException = { // Follow the Spark string format before 3.3.0 diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/RapidsQueryErrorUtils.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/RapidsQueryErrorUtils.scala new file mode 100644 index 00000000000..266cb4ef54f --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/RapidsQueryErrorUtils.scala @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +spark-rapids-shim-json-lines ***/ + +package org.apache.spark.sql.rapids.shims + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.ErrorMsg + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.rapids.execution.RapidsAnalysisException +import org.apache.spark.sql.types.StructType + +trait RapidsQueryErrorUtils { + + def outputPathAlreadyExistsError(qualifiedOutputPath: Path): Throwable = { + new AnalysisException(s"path $qualifiedOutputPath already exists.") + } + + def createTableAsSelectWithNonEmptyDirectoryError(tablePath: String, conf: String): Throwable = { + new AnalysisException(s"CREATE-TABLE-AS-SELECT cannot create table with location to a " + + s"non-empty directory $tablePath. To allow overwriting the existing non-empty directory, " + + s"set '$conf' to true.") + } + + def cannotResolveAttributeError(name: String, outputStr: String): Throwable = { + new AnalysisException(s"Unable to resolve $name given [$outputStr]") + } + + def partitionColumnNotSpecifiedError(format: String, partitionColumn: String): Throwable = { + new AnalysisException(s"Failed to resolve the schema for $format for the partition column: " + + s"$partitionColumn. It must be specified manually.") + } + + def dataSchemaNotSpecifiedError(format: String): Throwable = { + new AnalysisException(s"Unable to infer schema for $format. It must be specified manually.") + } + + def schemaNotSpecifiedForSchemaRelationProviderError(className: String): Throwable = { + new AnalysisException(s"A schema needs to be specified when using $className.") + } + + def userSpecifiedSchemaMismatchActualSchemaError( + schema: StructType, + actualSchema: StructType): Throwable = { + new AnalysisException("The user-specified schema doesn't match the actual schema: " + + s"user-specified: ${schema.toDDL}, actual: ${actualSchema.toDDL}. If " + + "you're using DataFrameReader.schema API or creating a table, please do not " + + "specify the schema. Or if you're scanning an existed table, please drop " + + "it and re-create it.") + } + + def dataSchemaNotSpecifiedError(format: String, fileCatalog: String): Throwable = { + new AnalysisException(s"Unable to infer schema for $format at $fileCatalog. " + + "It must be specified manually") + } + + def invalidDataSourceError(className: String): Throwable = { + new AnalysisException(s"$className is not a valid Spark SQL Data Source.") + } + + def orcNotUsedWithHiveEnabledError(): Throwable = { + new AnalysisException( + s"Hive built-in ORC data source must be used with Hive support enabled. " + + s"Please use the native ORC data source by setting 'spark.sql.orc.impl' to 'native'.") + } + + def failedToFindAvroDataSourceError(provider: String): Throwable = { + new AnalysisException( + s"Failed to find data source: $provider. Avro is built-in but external data " + + "source module since Spark 2.4. 
Please deploy the application as per " + + "the deployment section of \"Apache Avro Data Source Guide\".") + } + + def failedToFindKafkaDataSourceError(provider: String): Throwable = { + new AnalysisException( + s"Failed to find data source: $provider. Please deploy the application as " + + "per the deployment section of " + + "\"Structured Streaming + Kafka Integration Guide\".") + } + + def findMultipleDataSourceError(provider: String, sourceNames: Seq[String]): Throwable = { + new AnalysisException( + s"Multiple sources found for $provider " + + s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.") + } + + def dataPathNotExistError(path: String): Throwable = { + new AnalysisException(s"Path does not exist: $path") + } + + def dynamicPartitionParentError: Throwable = { + throw new RapidsAnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + } + + def tableOrViewAlreadyExistsError(tableName: String): Throwable = { + new AnalysisException(s"Table $tableName already exists. You need to drop it first.") + } + + def parquetTypeUnsupportedYetError(parquetType: String): Throwable = { + new AnalysisException(s"Parquet type not yet supported: $parquetType.") + } + + def illegalParquetTypeError(parquetType: String): Throwable = { + new AnalysisException(s"Illegal Parquet type: $parquetType.") + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala index c3152a8a235..bba205f267f 100644 --- a/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala +++ b/sql-plugin/src/main/spark320/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala @@ -29,7 +29,8 @@ import org.apache.parquet.schema.LogicalTypeAnnotation._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.rapids.execution.RapidsAnalysisException +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.types._ object ParquetSchemaClipShims { @@ -67,10 +68,10 @@ object ParquetSchemaClipShims { if (typeAnnotation == null) s"$typeName" else s"$typeName ($typeAnnotation)" def typeNotImplemented() = - TrampolineUtil.throwAnalysisException(s"Parquet type not yet supported: $typeString") + throw RapidsErrorUtils.parquetTypeUnsupportedYetError(typeString) def illegalType() = - TrampolineUtil.throwAnalysisException(s"Illegal Parquet type: $typeString") + throw RapidsErrorUtils.illegalParquetTypeError(typeString) // When maxPrecision = -1, we skip precision range check, and always respect the precision // specified in field.getDecimalMetadata. 
This is useful when interpreting decimal types stored @@ -82,7 +83,7 @@ object ParquetSchemaClipShims { val scale = decimalLogicalTypeAnnotation.getScale if (!(maxPrecision == -1 || 1 <= precision && precision <= maxPrecision)) { - TrampolineUtil.throwAnalysisException( + throw new RapidsAnalysisException( s"Invalid decimal precision: $typeName " + s"cannot store $precision digits (max $maxPrecision)") } @@ -143,14 +144,14 @@ object ParquetSchemaClipShims { TimestampType case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.NANOS && ParquetLegacyNanoAsLongShims.legacyParquetNanosAsLong => - TrampolineUtil.throwAnalysisException( + throw new RapidsAnalysisException( "GPU does not support spark.sql.legacy.parquet.nanosAsLong") case _ => illegalType() } case INT96 => if (!SQLConf.get.isParquetINT96AsTimestamp) { - TrampolineUtil.throwAnalysisException( + throw new RapidsAnalysisException( "INT96 is not supported unless it's interpreted as timestamp. " + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") } diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala index b301397255a..68a6ce30569 100644 --- a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{DataType, Decimal, DecimalType} -object RapidsErrorUtils { +object RapidsErrorUtils extends RapidsQueryErrorUtils { def invalidArrayIndexError(index: Int, numElements: Int, isElementAtF: Boolean = false): ArrayIndexOutOfBoundsException = { // Follow the Spark string format before 3.3.0 diff --git a/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/RapidsQueryErrorUtils.scala b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/RapidsQueryErrorUtils.scala new file mode 100644 index 00000000000..dbc4145ee54 --- /dev/null +++ b/sql-plugin/src/main/spark320/scala/org/apache/spark/sql/rapids/shims/RapidsQueryErrorUtils.scala @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "341db"} +{"spark": "342"} +{"spark": "343"} +{"spark": "350"} +{"spark": "351"} +{"spark": "400"} +spark-rapids-shim-json-lines ***/ + +package org.apache.spark.sql.rapids.shims + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.ErrorMsg + +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.rapids.execution.RapidsAnalysisException +import org.apache.spark.sql.types.StructType + +trait RapidsQueryErrorUtils { + + def outputPathAlreadyExistsError(qualifiedOutputPath: Path): Throwable = { + QueryCompilationErrors.outputPathAlreadyExistsError(qualifiedOutputPath) + } + + def createTableAsSelectWithNonEmptyDirectoryError(tablePath: String, conf: String): Throwable = { + QueryCompilationErrors.createTableAsSelectWithNonEmptyDirectoryError(tablePath) + } + + def cannotResolveAttributeError(name: String, outputStr: String): Throwable = { + QueryCompilationErrors.cannotResolveAttributeError(name, outputStr) + } + + def partitionColumnNotSpecifiedError(format: String, partitionColumn: String): Throwable = { + QueryCompilationErrors.partitionColumnNotSpecifiedError(format, partitionColumn) + } + + def dataSchemaNotSpecifiedError(format: String): Throwable = { + QueryCompilationErrors.dataSchemaNotSpecifiedError(format) + } + + def schemaNotSpecifiedForSchemaRelationProviderError(className: String): Throwable = { + QueryCompilationErrors.schemaNotSpecifiedForSchemaRelationProviderError(className) + } + + def userSpecifiedSchemaMismatchActualSchemaError( + schema: StructType, + actualSchema: StructType): Throwable = { + QueryCompilationErrors.userSpecifiedSchemaMismatchActualSchemaError(schema, actualSchema) + } + + def dataSchemaNotSpecifiedError(format: String, fileCatalog: String): Throwable = { + QueryCompilationErrors.dataSchemaNotSpecifiedError(format, fileCatalog) + } + + def invalidDataSourceError(className: String): Throwable = { + QueryCompilationErrors.invalidDataSourceError(className) + } + + def orcNotUsedWithHiveEnabledError(): Throwable = { + QueryCompilationErrors.orcNotUsedWithHiveEnabledError() + } + + def failedToFindAvroDataSourceError(provider: String): Throwable = { + QueryCompilationErrors.failedToFindAvroDataSourceError(provider) + } + + def failedToFindKafkaDataSourceError(provider: String): Throwable = { + QueryCompilationErrors.failedToFindKafkaDataSourceError(provider) + } + + def findMultipleDataSourceError(provider: String, sourceNames: Seq[String]): Throwable = { + QueryCompilationErrors.findMultipleDataSourceError(provider, sourceNames) + } + + def dataPathNotExistError(path: String): Throwable = { + QueryCompilationErrors.dataPathNotExistError(path) + } + + def tableOrViewAlreadyExistsError(tableName: String): Throwable = { + QueryCompilationErrors.tableOrViewAlreadyExistsError(tableName) + } + + def parquetTypeUnsupportedYetError(parquetType: String): Throwable = { + QueryCompilationErrors.parquetTypeUnsupportedYetError(parquetType) + } + + def illegalParquetTypeError(parquetType: String): Throwable = { + QueryCompilationErrors.illegalParquetTypeError(parquetType) + } + + def dynamicPartitionParentError: Throwable = { + throw new 
RapidsAnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala index 56708017a23..8c395274e07 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/ParquetSchemaClipShims.scala @@ -44,7 +44,8 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport.containsFieldIds import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.rapids.execution.RapidsAnalysisException +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.types._ object ParquetSchemaClipShims { @@ -109,10 +110,11 @@ object ParquetSchemaClipShims { if (typeAnnotation == null) s"$typeName" else s"$typeName ($typeAnnotation)" def typeNotImplemented() = - TrampolineUtil.throwAnalysisException(s"Parquet type not yet supported: $typeString") + throw RapidsErrorUtils.parquetTypeUnsupportedYetError(typeString) def illegalType() = - TrampolineUtil.throwAnalysisException(s"Illegal Parquet type: $parquetType") + throw RapidsErrorUtils.illegalParquetTypeError(typeString) + // When maxPrecision = -1, we skip precision range check, and always respect the precision // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored @@ -124,7 +126,7 @@ object ParquetSchemaClipShims { val scale = decimalLogicalTypeAnnotation.getScale if (!(maxPrecision == -1 || 1 <= precision && precision <= maxPrecision)) { - TrampolineUtil.throwAnalysisException(s"Invalid decimal precision: $typeName " + + throw new RapidsAnalysisException(s"Invalid decimal precision: $typeName " + s"cannot store $precision digits (max $maxPrecision)") } @@ -183,14 +185,14 @@ object ParquetSchemaClipShims { ParquetTimestampAnnotationShims.timestampTypeForMillisOrMicros(timestamp) case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.NANOS && ParquetLegacyNanoAsLongShims.legacyParquetNanosAsLong => - TrampolineUtil.throwAnalysisException( + throw new RapidsAnalysisException( "GPU does not support spark.sql.legacy.parquet.nanosAsLong") case _ => illegalType() } case INT96 => if (!SQLConf.get.isParquetINT96AsTimestamp) { - TrampolineUtil.throwAnalysisException( + throw new RapidsAnalysisException( "INT96 is not supported unless it's interpreted as timestamp. " + s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.") } diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala index bb28c370749..e5cdcd43568 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, Decimal, DecimalType} -object RapidsErrorUtils extends RapidsErrorUtilsFor330plus { +object RapidsErrorUtils extends RapidsErrorUtilsFor330plus with RapidsQueryErrorUtils { def mapKeyNotExistError( key: String, diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala index 1012b28d8b7..7e58a54c921 100644 --- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala +++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ package org.apache.spark.sql.rapids.shims import org.apache.spark.sql.errors.QueryExecutionErrors -object RapidsErrorUtils extends RapidsErrorUtilsBase { +object RapidsErrorUtils extends RapidsErrorUtilsBase with RapidsQueryErrorUtils { def sqlArrayIndexNotStartAtOneError(): RuntimeException = { QueryExecutionErrors.elementAtByIndexZeroError(context = null) } diff --git a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala index c8d76f85e5c..42fd5941025 100644 --- a/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala +++ b/sql-plugin/src/main/spark332db/scala/com/nvidia/spark/rapids/shims/GpuInsertIntoHiveTable.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils, ExternalCatalogWithListener} import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -48,6 +48,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.InsertIntoHiveTable import org.apache.spark.sql.hive.rapids.{GpuHiveFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors} +import org.apache.spark.sql.rapids.shims.RapidsErrorUtils import org.apache.spark.sql.vectorized.ColumnarBatch final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable, @@ -182,7 +183,7 @@ case class GpuInsertIntoHiveTable( // Report error if any static partition appears after a dynamic partition val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { - throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + throw RapidsErrorUtils.dynamicPartitionParentError } } diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala 
b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala index 78daa0bf6f1..e7b3561f5fd 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala @@ -42,7 +42,7 @@ import org.apache.spark.{SparkException, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, SortOrder} @@ -51,6 +51,7 @@ import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.{GpuWriteFiles, GpuWriteFilesExec, GpuWriteFilesSpec, WriteTaskResult, WriteTaskStats} import org.apache.spark.sql.execution.datasources.FileFormatWriter.OutputSpec +import org.apache.spark.sql.rapids.execution.RapidsAnalysisException import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -61,7 +62,7 @@ object GpuFileFormatWriter extends Logging { private def verifySchema(format: ColumnarFileFormat, schema: StructType): Unit = { schema.foreach { field => if (!format.supportDataType(field.dataType)) { - throw new AnalysisException( + throw new RapidsAnalysisException( s"$format data source does not support ${field.dataType.catalogString} data type.") } } diff --git a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala index 9e36cf41fad..6308f24c552 100644 --- a/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala +++ b/sql-plugin/src/main/spark332db/scala/org/apache/spark/sql/rapids/shims/GpuCreateDataSourceTableAsSelectCommandShims.scala @@ -64,7 +64,7 @@ case class GpuCreateDataSourceTableAsSelectCommand( s"Expect the table $tableName has been dropped when the save mode is Overwrite") if (mode == SaveMode.ErrorIfExists) { - throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.") + throw RapidsErrorUtils.tableOrViewAlreadyExistsError(tableName) } if (mode == SaveMode.Ignore) { // Since the table already exists and the save mode is Ignore, we will just return. 
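Note on the two changes above: GpuFileFormatWriter.verifySchema and the CTAS shim now surface RapidsAnalysisException (directly, or via RapidsErrorUtils), and callers written against Spark's exception type are unaffected because the new class simply extends AnalysisException. A minimal sketch of that catch-compatibility (assuming spark-sql 3.x on the classpath; the demo object and message below are illustrative, not part of this PR):

```scala
import org.apache.spark.sql.AnalysisException

// Same shape as the class this PR adds alongside TrampolineUtil.
class RapidsAnalysisException(msg: String) extends AnalysisException(msg)

object CatchCompatDemo {
  def main(args: Array[String]): Unit = {
    try {
      // What plugin code such as GpuFileFormatWriter.verifySchema now throws.
      throw new RapidsAnalysisException(
        "Parquet data source does not support void data type.")
    } catch {
      // Existing handlers that match on Spark's type still catch it.
      case e: AnalysisException => println(s"caught AnalysisException: ${e.getMessage}")
    }
  }
}
```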
diff --git a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala index 8ee0485ab36..e6f8886f19c 100644 --- a/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala +++ b/sql-plugin/src/main/spark340/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, Decimal, DecimalType} -object RapidsErrorUtils extends RapidsErrorUtilsFor330plus { +object RapidsErrorUtils extends RapidsErrorUtilsFor330plus with RapidsQueryErrorUtils { def mapKeyNotExistError( key: String, diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala index a0ba17f9bd4..9b800d4e51a 100644 --- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala +++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/shims/RapidsErrorUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ package org.apache.spark.sql.rapids.shims import org.apache.spark.sql.errors.QueryExecutionErrors -object RapidsErrorUtils extends RapidsErrorUtilsBase { +object RapidsErrorUtils extends RapidsErrorUtilsBase with RapidsQueryErrorUtils { def sqlArrayIndexNotStartAtOneError(): RuntimeException = { QueryExecutionErrors.invalidIndexOfZeroError(context = null) }
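Taken as a whole, the PR funnels the previously scattered `new AnalysisException(...)` call sites through a per-Spark-version RapidsErrorUtils object that mixes in a RapidsQueryErrorUtils trait: the spark311 copy hand-builds the legacy message strings, while the spark320+ copy delegates to Spark's QueryCompilationErrors so message text tracks each Spark release. A self-contained sketch of that shim pattern (simplified, hypothetical names; plain Scala with no Spark dependency):

```scala
// Version-neutral surface that common code compiles against
// (standing in for RapidsQueryErrorUtils).
trait QueryErrorShim {
  def dataPathNotExistError(path: String): Throwable
}

// "Old Spark" flavor: builds the message by hand, like the spark311 shim.
object LegacyErrors extends QueryErrorShim {
  def dataPathNotExistError(path: String): Throwable =
    new IllegalArgumentException(s"Path does not exist: $path")
}

// Central error factory, standing in for QueryCompilationErrors.
object ErrorFactory {
  def dataPathNotExistError(path: String): Throwable =
    new IllegalArgumentException(s"[PATH_NOT_FOUND] Path does not exist: $path")
}

// "New Spark" flavor: delegates to the factory, like the spark320+ shim.
object ModernErrors extends QueryErrorShim {
  def dataPathNotExistError(path: String): Throwable =
    ErrorFactory.dataPathNotExistError(path)
}

// Common code never references the version-specific factory directly; at
// build time exactly one shim object is on the source path, selected by the
// spark-rapids-shim-json-lines headers seen in the new files above.
object CallSite {
  def requireExists(exists: Boolean, path: String, errors: QueryErrorShim): Unit =
    if (!exists) throw errors.dataPathNotExistError(path)

  def main(args: Array[String]): Unit =
    try requireExists(exists = false, "/tmp/missing", ModernErrors)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
}
```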