diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index d03c90c92b7..65fc266d497 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.sql
 
-import java.io.File
-
 import scala.language.implicitConversions
 
+import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
@@ -32,38 +31,41 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
  */
 class CarbonSource extends CreatableRelationProvider
-  with SchemaRelationProvider with DataSourceRegister {
+    with SchemaRelationProvider with DataSourceRegister {
 
   override def shortName(): String = "carbondata"
 
   // called by any write operation like INSERT INTO DDL or DataFrame.write API
   override def createRelation(
-      sqlContext: SQLContext,
-      mode: SaveMode,
-      parameters: Map[String, String],
-      data: DataFrame): BaseRelation = {
+    sqlContext: SQLContext,
+    mode: SaveMode,
+    parameters: Map[String, String],
+    data: DataFrame): BaseRelation = {
     CarbonEnv.init(sqlContext.sparkSession)
     // User should not specify path since only one store is supported in carbon currently,
     // after we support multi-store, we can remove this limitation
     require(!parameters.contains("path"), "'path' should not be specified, " +
-      "the path to store carbon file is the 'storePath' specified when creating CarbonContext")
+      "the path to store carbon file is the 'storePath' " +
+      "specified when creating CarbonContext")
     val options = new CarbonOption(parameters)
     val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-      .exists(tablePath)
+        .exists(tablePath)
     val (doSave, doAppend) = (mode, isExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"ErrorIfExists mode, path $storePath already exists.")
       case (SaveMode.Overwrite, true) =>
-        sqlContext.sparkSession.sql(s"DROP TABLE IF EXISTS ${options.dbName}.${options.tableName}")
+        sqlContext.sparkSession
+          .sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)
@@ -85,9 +87,9 @@ class CarbonSource extends CreatableRelationProvider
 
   // called by DDL operation with a USING clause
   override def createRelation(
-      sqlContext: SQLContext,
-      parameters: Map[String, String],
-      dataSchema: StructType): BaseRelation = {
+    sqlContext: SQLContext,
+    parameters: Map[String, String],
+    dataSchema: StructType): BaseRelation = {
     CarbonEnv.init(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
     val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
@@ -104,10 +106,13 @@ class CarbonSource extends CreatableRelationProvider
   }
 
   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
-      dataSchema: StructType): String = {
+    dataSchema: StructType): String = {
 
     val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
     val tableName: String = parameters.getOrElse("tableName", "default_table")
+    if (StringUtils.isBlank(tableName)) {
+      throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
+    }
     val options = new CarbonOption(parameters)
     try {
       CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
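
For context, the guard added in createTableIfNotExists relies on Commons Lang's StringUtils.isBlank, which is null-safe and also rejects whitespace-only strings, unlike a plain tableName.isEmpty check. A minimal standalone sketch of which table names the new validation catches (the object name BlankNameCheck is illustrative, not part of this patch):

import org.apache.commons.lang.StringUtils

// Illustrative only: shows the inputs that would now raise
// MalformedCarbonCommandException in createTableIfNotExists.
object BlankNameCheck {
  def main(args: Array[String]): Unit = {
    Seq(null, "", "   ", "\t", "my_table").foreach { name =>
      // isBlank(null) == true, isBlank("   ") == true, isBlank("my_table") == false
      println(s"isBlank($name) = ${StringUtils.isBlank(name)}")
    }
  }
}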