Skip to content

Commit

Permalink
[CARBONDATA-584]added validation for table is not empty This closes #511
Browse files Browse the repository at this point in the history
  • Loading branch information
jackylk committed Jan 14, 2017
2 parents b581465 + bb0f83d commit 392bc29
Showing 1 changed file with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.spark.sql

import java.io.File

import scala.language.implicitConversions

import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
Expand All @@ -32,38 +31,41 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

/**
* Carbon relation provider compliant to data source api.
* Creates carbon relations
*/
class CarbonSource extends CreatableRelationProvider
with SchemaRelationProvider with DataSourceRegister {
with SchemaRelationProvider with DataSourceRegister {

override def shortName(): String = "carbondata"

// called by any write operation like INSERT INTO DDL or DataFrame.write API
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
CarbonEnv.init(sqlContext.sparkSession)
// User should not specify path since only one store is supported in carbon currently,
// after we support multi-store, we can remove this limitation
require(!parameters.contains("path"), "'path' should not be specified, " +
"the path to store carbon file is the 'storePath' specified when creating CarbonContext")
"the path to store carbon file is the 'storePath' " +
"specified when creating CarbonContext")

val options = new CarbonOption(parameters)
val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
.exists(tablePath)
.exists(tablePath)
val (doSave, doAppend) = (mode, isExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"ErrorIfExists mode, path $storePath already exists.")
case (SaveMode.Overwrite, true) =>
sqlContext.sparkSession.sql(s"DROP TABLE IF EXISTS ${options.dbName}.${options.tableName}")
sqlContext.sparkSession
.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
Expand All @@ -85,9 +87,9 @@ class CarbonSource extends CreatableRelationProvider

// called by DDL operation with a USING clause
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
CarbonEnv.init(sqlContext.sparkSession)
addLateDecodeOptimization(sqlContext.sparkSession)
val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
Expand All @@ -104,10 +106,13 @@ class CarbonSource extends CreatableRelationProvider
}

private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
dataSchema: StructType): String = {
dataSchema: StructType): String = {

val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
val tableName: String = parameters.getOrElse("tableName", "default_table")
if (StringUtils.isBlank(tableName)) {
throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
}
val options = new CarbonOption(parameters)
try {
CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
Expand Down

0 comments on commit 392bc29

Please sign in to comment.