Use ErrorClass to Throw AnalysisException [databricks] #10830

Merged: 19 commits, merged on Jun 4, 2024
Changes from 8 commits
File: GpuDataWritingCommand.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker
-import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration

@@ -84,10 +84,9 @@ object GpuDataWritingCommand {
     if (fs.exists(filePath) &&
         fs.getFileStatus(filePath).isDirectory &&
         fs.listStatus(filePath).length != 0) {
-      TrampolineUtil.throwAnalysisException(
-        s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
-          s"${tablePath} . To allow overwriting the existing non-empty directory, " +
-          s"set '$allowNonEmptyLocationInCTASKey' to true.")
+      throw RapidsErrorUtils.
+        createTableAsSelectWithNonEmptyDirectoryError(tablePath.toString,
+          allowNonEmptyLocationInCTASKey)
     }
   }
 }
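Both this file and GpuRunnableCommand below replace an inline, free-form message with a named factory on the per-Spark-version RapidsErrorUtils shim; the factory body itself is not shown in this view. A minimal sketch of one plausible shape, assuming the factory simply rebuilds the old message text; the method name comes from the diff, while `CtasErrorFactorySketch` and its body are illustrative only (the real shim may build the exception from a Spark error class instead):

import org.apache.spark.sql.AnalysisException

// Illustrative sketch, not the PR's actual implementation.
object CtasErrorFactorySketch {
  // Returns (rather than throws) so call sites keep an explicit `throw`,
  // which is the calling convention visible in the diff above.
  def createTableAsSelectWithNonEmptyDirectoryError(
      tablePath: String,
      config: String): Throwable = {
    new AnalysisException(
      s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty " +
        s"directory $tablePath. To allow overwriting the existing non-empty " +
        s"directory, set '$config' to true.")
  }
}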
File: GpuRunnableCommand.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker
-import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.SerializableConfiguration

@@ -82,10 +82,9 @@ object GpuRunnableCommand {
     if (fs.exists(filePath) &&
         fs.getFileStatus(filePath).isDirectory &&
         fs.listStatus(filePath).length != 0) {
-      TrampolineUtil.throwAnalysisException(
-        s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
-          s"${tablePath} . To allow overwriting the existing non-empty directory, " +
-          s"set '$allowNonEmptyLocationInCTASKey' to true.")
+      throw RapidsErrorUtils.
+        createTableAsSelectWithNonEmptyDirectoryError(tablePath.toString,
+          allowNonEmptyLocationInCTASKey)
     }
   }
 }
File: RapidsHiveErrors.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2023, NVIDIA CORPORATION.
+ * Copyright (c) 2023-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,9 +19,9 @@ package org.apache.spark.sql.hive.rapids
 import org.apache.hadoop.fs.Path

 import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
 import org.apache.spark.sql.types.{DataType, DoubleType, FloatType, StringType}

 object RapidsHiveErrors {
@@ -53,8 +53,7 @@ object RapidsHiveErrors {
   }

   def cannotResolveAttributeError(name: String, outputStr: String): Throwable = {
-    new AnalysisException(
-      s"Unable to resolve $name given [$outputStr]")
+    throw RapidsErrorUtils.cannotResolveAttributeError(name, outputStr)
   }

   def writePartitionExceedConfigSizeWhenDynamicPartitionError(
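A small wrinkle in this hunk: `cannotResolveAttributeError` still declares a `Throwable` return type, but its new body throws instead of returning. This compiles because a `throw` expression has type `Nothing` in Scala, which conforms to any return type; callers that already wrap the call in their own `throw` simply never reach that outer `throw`.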
File: GpuDataSourceBase.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.rapids.shims.SchemaUtilsShims
+import org.apache.spark.sql.rapids.shims.{RapidsErrorUtils, SchemaUtilsShims}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
@@ -144,8 +144,8 @@ abstract class GpuDataSourceBase(
         }
         inferredOpt
       }.getOrElse {
-        throw new AnalysisException(s"Failed to resolve the schema for $format for " +
-          s"the partition column: $partitionColumn. It must be specified manually.")
+        throw RapidsErrorUtils.
+          partitionColumnNotSpecifiedError(format.toString, partitionColumn)
       }
     }
     StructType(partitionFields)
@@ -162,8 +162,7 @@
         caseInsensitiveOptions - "path",
         SparkShimImpl.filesFromFileIndex(tempFileIndex))
     }.getOrElse {
-      throw new AnalysisException(
-        s"Unable to infer schema for $format. It must be specified manually.")
+      throw RapidsErrorUtils.dataSchemaNotSpecifiedError(format.toString)
     }

     // We just print a waring message if the data schema and partition schema have the duplicate
@@ -201,17 +200,13 @@
       case (dataSource: RelationProvider, None) =>
         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
       case (_: SchemaRelationProvider, None) =>
-        throw new AnalysisException(s"A schema needs to be specified when using $className.")
+        throw RapidsErrorUtils.schemaNotSpecifiedForSchemaRelationProviderError(className)
       case (dataSource: RelationProvider, Some(schema)) =>
         val baseRelation =
           dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
         if (!DataType.equalsIgnoreCompatibleNullability(baseRelation.schema, schema)) {
-          throw new AnalysisException(
-            "The user-specified schema doesn't match the actual schema: " +
-            s"user-specified: ${schema.toDDL}, actual: ${baseRelation.schema.toDDL}. If " +
-            "you're using DataFrameReader.schema API or creating a table, please do not " +
-            "specify the schema. Or if you're scanning an existed table, please drop " +
-            "it and re-create it.")
+          throw RapidsErrorUtils.userSpecifiedSchemaMismatchActualSchemaError(schema,
+            baseRelation.schema)
         }
         baseRelation

@@ -233,9 +228,8 @@
         caseInsensitiveOptions - "path",
         SparkShimImpl.filesFromFileIndex(fileCatalog))
     }.getOrElse {
-      throw new AnalysisException(
-        s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
-        "It must be specified manually")
+      throw RapidsErrorUtils.
+        dataSchemaNotSpecifiedError(format.toString, fileCatalog.allFiles().mkString(","))
     }

     HadoopFsRelation(
@@ -276,8 +270,7 @@
         caseInsensitiveOptions)(sparkSession)

       case _ =>
-        throw new AnalysisException(
-          s"$className is not a valid Spark SQL Data Source.")
+        throw RapidsErrorUtils.invalidDataSourceError(className)
     }

     relation match {
@@ -411,22 +404,13 @@ object GpuDataSourceBase extends Logging {
         dataSource
       case Failure(error) =>
         if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
-          throw new AnalysisException(
-            "Hive built-in ORC data source must be used with Hive support enabled. " +
-            "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
-            "'native'")
+          throw RapidsErrorUtils.orcNotUsedWithHiveEnabledError()
         } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
           provider1 == "com.databricks.spark.avro" ||
          provider1 == "org.apache.spark.sql.avro") {
-          throw new AnalysisException(
-            s"Failed to find data source: $provider1. Avro is built-in but external data " +
-            "source module since Spark 2.4. Please deploy the application as per " +
-            "the deployment section of \"Apache Avro Data Source Guide\".")
+          throw RapidsErrorUtils.failedToFindAvroDataSourceError(provider1)
         } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
-          throw new AnalysisException(
-            s"Failed to find data source: $provider1. Please deploy the application as " +
-            "per the deployment section of " +
-            "\"Structured Streaming + Kafka Integration Guide\".")
+          throw RapidsErrorUtils.failedToFindKafkaDataSourceError(provider1)
         } else {
           throw new ClassNotFoundException(
             s"Failed to find data source: $provider1. Please find packages at " +
@@ -459,8 +443,7 @@
           s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
         internalSources.head.getClass
       } else {
-        throw new AnalysisException(s"Multiple sources found for $provider1 " +
-          s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
+        throw RapidsErrorUtils.findMultipleDataSourceError(provider1, sourceNames)
       }
     }
   } catch {
@@ -513,7 +496,7 @@
     }

     if (checkEmptyGlobPath && globResult.isEmpty) {
-      throw new AnalysisException(s"Path does not exist: $globPath")
+      throw RapidsErrorUtils.dataPathNotExistError(globPath.toString)
     }

     globResult
@@ -527,7 +510,7 @@
     ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path =>
       val fs = path.getFileSystem(hadoopConf)
       if (!fs.exists(path)) {
-        throw new AnalysisException(s"Path does not exist: $path")
+        throw RapidsErrorUtils.dataPathNotExistError(path.toString)
       }
     }
   } catch {
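This file carries most of the churn: each inline `new AnalysisException(...)` becomes a named factory call. The one- and two-argument uses of `dataSchemaNotSpecifiedError` above imply an overloaded pair. A minimal sketch, with a hypothetical object name and bodies reconstructed from the removed messages (the actual shim may route through Spark error classes instead):

import org.apache.spark.sql.AnalysisException

// Hypothetical object name; bodies rebuilt from the messages removed above.
object DataSourceErrorsSketch {
  def dataSchemaNotSpecifiedError(format: String): Throwable =
    new AnalysisException(
      s"Unable to infer schema for $format. It must be specified manually.")

  // Overload for the call site that also reports which files were scanned.
  def dataSchemaNotSpecifiedError(format: String, fileCatalog: String): Throwable =
    new AnalysisException(
      s"Unable to infer schema for $format at $fileCatalog. " +
        "It must be specified manually")

  def dataPathNotExistError(path: String): Throwable =
    new AnalysisException(s"Path does not exist: $path")
}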
File: GpuInsertIntoHadoopFsRelationCommand.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019-2022, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.{ColumnarFileFormat, GpuDataWritingCommand}
 import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.internal.io.FileCommitProtocol
-import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionPathString
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, CommandUtils}
 import org.apache.spark.sql.execution.datasources.{FileFormatWriter, FileIndex, PartitioningUtils}
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
+import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.rapids.shims.SchemaUtilsShims
 import org.apache.spark.sql.vectorized.ColumnarBatch

@@ -121,7 +122,7 @@ case class GpuInsertIntoHadoopFsRelationCommand(
     val pathExists = fs.exists(qualifiedOutputPath)
     (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
-        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
+        TrampolineUtil.throwAnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
         if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
           false
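Note the direction of this change is the opposite of the rest of the PR: a direct `new AnalysisException(...)` becomes `TrampolineUtil.throwAnalysisException(...)` rather than a `RapidsErrorUtils` factory. The GpuRand change below follows the same pattern; the trampoline is likely used where no named error factory exists for the message, while still avoiding direct construction of AnalysisException outside Spark's packages.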
File: GpuRand expression
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -23,8 +23,8 @@ import com.nvidia.spark.rapids.Arm.withResource
 import com.nvidia.spark.rapids.shims.ShimUnaryExpression

 import org.apache.spark.TaskContext
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionWithRandomSeed}
+import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
@@ -52,7 +52,7 @@ case class GpuRand(child: Expression) extends ShimUnaryExpression with GpuExpres
   @transient protected lazy val seed: Long = child match {
     case GpuLiteral(s, IntegerType) => s.asInstanceOf[Int]
     case GpuLiteral(s, LongType) => s.asInstanceOf[Long]
-    case _ => throw new AnalysisException(
+    case _ => TrampolineUtil.throwAnalysisException(
       s"Input argument to $prettyName must be an integer, long or null literal.")
   }
File: TrampolineUtil.scala
@@ -235,4 +235,6 @@ object TrampolineUtil {
   def postEvent(sc: SparkContext, sparkEvent: SparkListenerEvent): Unit = {
     sc.listenerBus.post(sparkEvent)
   }
+
+  class RapidsAnalysisException(msg: String) extends AnalysisException(msg)
 }
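This is the one non-mechanical change in the PR: TrampolineUtil gains a RapidsAnalysisException subtype. Because TrampolineUtil lives under `org.apache.spark.sql`, it can subclass AnalysisException regardless of which constructors a particular Spark build exposes to outside packages, giving plugin code one concrete exception type to construct. A hypothetical call site (`requirePositive` and its object wrapper are illustrative only):

import org.apache.spark.sql.rapids.execution.TrampolineUtil

object RapidsAnalysisExceptionUsageSketch {
  def requirePositive(n: Int): Unit = {
    if (n <= 0) {
      // RapidsAnalysisException is nested in the TrampolineUtil object,
      // so it is constructed through that path.
      throw new TrampolineUtil.RapidsAnalysisException(
        s"expected a positive value, got $n")
    }
  }
}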
File: ParquetSchemaClipShims.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
 import org.apache.spark.sql.types._

 object ParquetSchemaClipShims {
@@ -67,10 +68,10 @@ object ParquetSchemaClipShims {
       TrampolineUtil.throwAnalysisException(s"Parquet type not supported: $typeString")

     def typeNotImplemented() =
-      TrampolineUtil.throwAnalysisException(s"Parquet type not yet supported: $typeString")
+      throw RapidsErrorUtils.parquetTypeUnsupportedYetError(typeString)

     def illegalType() =
-      TrampolineUtil.throwAnalysisException(s"Illegal Parquet type: $typeString")
+      throw RapidsErrorUtils.illegalParquetTypeError(typeString)

     // When maxPrecision = -1, we skip precision range check, and always respect the precision
     // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored
@@ -80,8 +81,7 @@
     val scale = field.getDecimalMetadata.getScale

     if (!(maxPrecision == -1 || 1 <= precision && precision <= maxPrecision)) {
-      TrampolineUtil.throwAnalysisException(
-        s"Invalid decimal precision: $typeName " +
+      TrampolineUtil.throwAnalysisException(s"Invalid decimal precision: $typeName " +
         s"cannot store $precision digits (max $maxPrecision)")
     }

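Note that `typeNotSupported()` just above still routes through `TrampolineUtil.throwAnalysisException`, while its two siblings move to shim factories. Judging from the removed messages, those two factories plausibly reduce to the following sketch (hypothetical object name, illustrative bodies):

import org.apache.spark.sql.AnalysisException

// Sketch of the two factories named in the diff; bodies rebuilt from the
// removed messages and not taken from the PR.
object ParquetErrorsSketch {
  def parquetTypeUnsupportedYetError(parquetType: String): Throwable =
    new AnalysisException(s"Parquet type not yet supported: $parquetType")

  def illegalParquetTypeError(parquetType: String): Throwable =
    new AnalysisException(s"Illegal Parquet type: $parquetType")
}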
File: GpuInsertIntoHiveTable.scala
@@ -45,7 +45,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc

 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils, ExternalCatalogWithListener}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -58,6 +58,7 @@ import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.client.hive._
 import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
 import org.apache.spark.sql.hive.rapids.{GpuHiveTextFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors}
+import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
 import org.apache.spark.sql.vectorized.ColumnarBatch

 final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable,
@@ -192,7 +193,7 @@ case class GpuInsertIntoHiveTable(
       // Report error if any static partition appears after a dynamic partition
       val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
       if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        throw RapidsErrorUtils.dynamicPartitionParentError
       }
     }
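The replaced Hive message came from Hive's ErrorMsg enum, and the new call site has no parentheses, so `dynamicPartitionParentError` is presumably a parameterless def. Assuming the shim keeps wrapping that enum (only the method name comes from the diff; the body is a guess):

import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.spark.sql.AnalysisException

object HiveErrorsSketch {
  // Parameterless, matching the parenthesis-free call site in the diff.
  def dynamicPartitionParentError: Throwable =
    new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
}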
File: RapidsErrorUtils.scala (a version-specific shim)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.types.{DataType, Decimal, DecimalType}

-object RapidsErrorUtils {
+object RapidsErrorUtils extends RapidsQueryErrorUtils {
   def invalidArrayIndexError(index: Int, numElements: Int,
       isElementAtF: Boolean = false): ArrayIndexOutOfBoundsException = {
     // Follow the Spark string format before 3.3.0
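This last hunk is the structural core of the PR: each per-Spark-version RapidsErrorUtils now mixes in a shared RapidsQueryErrorUtils, so the factory methods called throughout the diff can be written once and inherited by every shim. A minimal sketch of the pattern, with `*Sketch` names marking what is illustrative rather than taken from the PR:

import org.apache.spark.sql.AnalysisException

// Shared once across shims...
trait RapidsQueryErrorUtilsSketch {
  def invalidDataSourceError(className: String): Throwable =
    new AnalysisException(s"$className is not a valid Spark SQL Data Source.")
}

// ...and mixed into each per-version object, which keeps only its
// version-specific helpers (e.g. invalidArrayIndexError above).
object RapidsErrorUtilsSketch extends RapidsQueryErrorUtilsSketch

The apparent payoff is that a version-specific shim (Databricks included) can override an individual factory without touching the many call sites changed above.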