Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-584]added validation for table is not empty #511

Closed
wants to merge 8 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@

package org.apache.spark.sql

import java.io.File

import scala.language.implicitConversions

import org.apache.commons.lang.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
Expand All @@ -32,38 +31,41 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

/**
* Carbon relation provider compliant to data source api.
* Creates carbon relations
*/
class CarbonSource extends CreatableRelationProvider
with SchemaRelationProvider with DataSourceRegister {
with SchemaRelationProvider with DataSourceRegister {

override def shortName(): String = "carbondata"

// called by any write operation like INSERT INTO DDL or DataFrame.write API
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
CarbonEnv.init(sqlContext.sparkSession)
// User should not specify path since only one store is supported in carbon currently,
// after we support multi-store, we can remove this limitation
require(!parameters.contains("path"), "'path' should not be specified, " +
"the path to store carbon file is the 'storePath' specified when creating CarbonContext")
"the path to store carbon file is the 'storePath' " +
"specified when creating CarbonContext")

val options = new CarbonOption(parameters)
val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
.exists(tablePath)
.exists(tablePath)
val (doSave, doAppend) = (mode, isExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"ErrorIfExists mode, path $storePath already exists.")
case (SaveMode.Overwrite, true) =>
sqlContext.sparkSession.sql(s"DROP TABLE IF EXISTS ${options.dbName}.${options.tableName}")
sqlContext.sparkSession
.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
Expand All @@ -85,9 +87,9 @@ class CarbonSource extends CreatableRelationProvider

// called by DDL operation with a USING clause
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
sqlContext: SQLContext,
parameters: Map[String, String],
dataSchema: StructType): BaseRelation = {
CarbonEnv.init(sqlContext.sparkSession)
addLateDecodeOptimization(sqlContext.sparkSession)
val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
Expand All @@ -104,10 +106,13 @@ class CarbonSource extends CreatableRelationProvider
}

private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
dataSchema: StructType): String = {
dataSchema: StructType): String = {

val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
val tableName: String = parameters.getOrElse("tableName", "default_table")
if (StringUtils.isBlank(tableName)) {
throw new MalformedCarbonCommandException("The Specified Table Name is Blank")
}
val options = new CarbonOption(parameters)
try {
CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
Expand Down