Skip to content

Commit

Permalink
added validation that the table name is not empty and does not contain any spaces
Browse files Browse the repository at this point in the history
reformatted the code

replace validation with string utils

removed java style error

removed unnecessary code

removed java style error

removed unused import

changed the exception message
  • Loading branch information
anubhav100 authored and jackylk committed Jan 14, 2017
1 parent b581465 commit bb0f83d
Showing 1 changed file with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.spark.sql

import java.io.File

import scala.language.implicitConversions

import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
Expand All @@ -32,38 +31,41 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

/**
* Carbon relation provider compliant to data source api.
* Creates carbon relations
*/
class CarbonSource extends CreatableRelationProvider
with SchemaRelationProvider with DataSourceRegister {
with SchemaRelationProvider with DataSourceRegister {

override def shortName(): String = "carbondata"

// called by any write operation like INSERT INTO DDL or DataFrame.write API
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
CarbonEnv.init(sqlContext.sparkSession)
// User should not specify path since only one store is supported in carbon currently,
// after we support multi-store, we can remove this limitation
require(!parameters.contains("path"), "'path' should not be specified, " +
"the path to store carbon file is the 'storePath' specified when creating CarbonContext")
"the path to store carbon file is the 'storePath' " +
"specified when creating CarbonContext")

val options = new CarbonOption(parameters)
val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
.exists(tablePath)
.exists(tablePath)
val (doSave, doAppend) = (mode, isExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"ErrorIfExists mode, path $storePath already exists.")
case (SaveMode.Overwrite, true) =>
sqlContext.sparkSession.sql(s"DROP TABLE IF EXISTS ${options.dbName}.${options.tableName}")
sqlContext.sparkSession
.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
Expand All @@ -85,9 +87,9 @@ class CarbonSource extends CreatableRelationProvider

// called by DDL operation with a USING clause
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
CarbonEnv.init(sqlContext.sparkSession)
addLateDecodeOptimization(sqlContext.sparkSession)
val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
Expand All @@ -104,10 +106,13 @@ class CarbonSource extends CreatableRelationProvider
}

private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
dataSchema: StructType): String = {
dataSchema: StructType): String = {

val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
val tableName: String = parameters.getOrElse("tableName", "default_table")
if (StringUtils.isBlank(tableName)) {
throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
}
val options = new CarbonOption(parameters)
try {
CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
Expand Down

0 comments on commit bb0f83d

Please sign in to comment.