[CARBONDATA-584] Added validation that the table name is not empty #511
Changes from 7 commits
```diff
@@ -17,10 +17,9 @@
 package org.apache.spark.sql

 import java.io.File

 import scala.language.implicitConversions

 import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
```
```diff
@@ -32,38 +31,41 @@ import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException

 /**
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
  */
 class CarbonSource extends CreatableRelationProvider
   with SchemaRelationProvider with DataSourceRegister {

   override def shortName(): String = "carbondata"

   // called by any write operation like INSERT INTO DDL or DataFrame.write API
   override def createRelation(
       sqlContext: SQLContext,
       mode: SaveMode,
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
     CarbonEnv.init(sqlContext.sparkSession)
     // User should not specify path since only one store is supported in carbon currently,
     // after we support multi-store, we can remove this limitation
     require(!parameters.contains("path"), "'path' should not be specified, " +
-      "the path to store carbon file is the 'storePath' specified when creating CarbonContext")
+      "the path to store carbon file is the 'storePath' " +
+      "specified when creating CarbonContext")

     val options = new CarbonOption(parameters)
     val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)
     val (doSave, doAppend) = (mode, isExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"ErrorIfExists mode, path $storePath already exists.")
       case (SaveMode.Overwrite, true) =>
-        sqlContext.sparkSession.sql(s"DROP TABLE IF EXISTS ${options.dbName}.${options.tableName}")
+        sqlContext.sparkSession
+          .sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)
```
```diff
@@ -85,9 +87,9 @@ class CarbonSource extends CreatableRelationProvider

   // called by DDL operation with a USING clause
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String],
       dataSchema: StructType): BaseRelation = {
     CarbonEnv.init(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
     val path = createTableIfNotExists(sqlContext.sparkSession, parameters, dataSchema)
```
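Similarly, a sketch of the DDL path that reaches this second `createRelation`: a `CREATE TABLE ... USING` statement resolved through the registered short name, with the `OPTIONS` map arriving as `parameters` and the column list as `dataSchema`. The table name and schema are made up, and this assumes a `SparkSession` named `spark` (for example the one from the previous sketch) with CarbonData on the classpath.

```scala
// Hypothetical DDL routed to the SchemaRelationProvider createRelation above.
spark.sql(
  """
    |CREATE TABLE sample_table(id INT, name STRING)
    |USING carbondata
    |OPTIONS('dbName'='default', 'tableName'='sample_table')
  """.stripMargin)
```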
```diff
@@ -104,14 +106,17 @@ class CarbonSource extends CreatableRelationProvider
   }

   private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
       dataSchema: StructType): String = {

     val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-    val tableName: String = parameters.getOrElse("tableName", "default_table")
+    val emptyTableName: String = parameters.getOrElse("tableName", "default_table")
+    if (StringUtils.isBlank(emptyTableName)) {
+      throw new MalformedCarbonCommandException("INVALID TABLE NAME")
+    }
     val options = new CarbonOption(parameters)
     try {
-      CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession)
-      CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+      CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), emptyTableName)(sparkSession)
+      CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$emptyTableName"
     } catch {
       case ex: NoSuchTableException =>
         val fields = dataSchema.map { col =>
```

Review comments on the new `MalformedCarbonCommandException("INVALID TABLE NAME")` line:

- "You can say it is empty table name."
- "@jackylk I think emptyTableName will make less sense, because it is not guaranteed that it will be empty."
- "If the if check is true, it means the table name passed by the user is blank."
- "I think there is a misunderstanding. I am suggesting to put 'the specified table name is blank' in the exception message, not to change the variable name."
- "@jackylk Oh, I get that now; that is why I was asking. I am correcting it now."
- "@jackylk Corrected it, sorry for the mistake."
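The new check relies on Apache Commons Lang's `StringUtils.isBlank`, which returns true for null, the empty string, and whitespace-only strings, so all three are rejected before any metastore lookup happens. Below is a minimal, self-contained sketch of that behavior; the helper name is hypothetical, a plain `IllegalArgumentException` stands in for `MalformedCarbonCommandException`, and the message uses the wording suggested in the review.

```scala
import org.apache.commons.lang.StringUtils

// Hypothetical stand-in for the validation in createTableIfNotExists,
// showing which inputs StringUtils.isBlank rejects.
def validateTableName(name: String): String = {
  if (StringUtils.isBlank(name)) {
    // Wording suggested by the reviewer instead of "INVALID TABLE NAME".
    throw new IllegalArgumentException("The specified table name is blank")
  }
  name
}

validateTableName("sample_table") // ok: returns "sample_table"
// All of the following are blank and would throw:
// validateTableName(null)
// validateTableName("")
// validateTableName("   ")
```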
```diff
@@ -140,9 +145,9 @@ class CarbonSource extends CreatableRelationProvider
         }
       }
       val cm = TableCreator.prepareTableModel(false, Option(dbName),
-        tableName, fields, Nil, bucketFields, map)
+        emptyTableName, fields, Nil, bucketFields, map)
       CreateTable(cm, false).run(sparkSession)
-      CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName"
+      CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$emptyTableName"
     case ex: Exception =>
       throw new Exception("do not have dbname and tablename for carbon table", ex)
   }
```
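Putting the pieces together, something along these lines could verify the new behavior end to end: a blank `tableName` option should be rejected before the lookup-or-create fallback in the catch block ever runs. This is only a sketch; it assumes a `SparkSession` named `spark` with CarbonData on the classpath, and the error may surface directly as `MalformedCarbonCommandException` or wrapped by Spark's DDL handling.

```scala
// Hypothetical negative test for the blank-table-name validation.
try {
  spark.sql(
    """
      |CREATE TABLE bad_table(id INT)
      |USING carbondata
      |OPTIONS('tableName'='   ')
    """.stripMargin)
  assert(false, "expected the blank table name to be rejected")
} catch {
  case e: Exception =>
    // Expected: MalformedCarbonCommandException (possibly wrapped by Spark).
    println(s"rejected as expected: ${e.getMessage}")
}
```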
A further reply in the review thread above:

- "@jackylk Sure, I am correcting it."