Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[CARBONDATA-537][SPARK2] Fix bug for DICTIONARY_EXCLUDE option in spark2 #440

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,9 @@ class CarbonGlobalDictionaryGenerateRDD(
model.primDimensions(split.index).getColName
}")
} else {
sys
.error(s"Dictionary file ${
model.primDimensions(split.index).getColName
} is locked for updation. Please try after some time")
sys.error(s"Dictionary file lock($dictLock) for " +
s"${model.primDimensions(split.index).getColName} is locked." +
s" Please try after some time or remove this lock file")
}
val t2 = System.currentTimeMillis
val fileType = FileFactory.getFileType(model.dictFilePaths(split.index))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
}

/**
* detects whether double or decimal column is part of dictionary_exclude
* dictionary_exclude is supported for string, timestamp only
*/
def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
val dataTypes = Array("string", "timestamp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class CarbonSource extends CreatableRelationProvider
}
f
}
val map = scala.collection.mutable.Map[String, String]();
val map = scala.collection.mutable.Map[String, String]()
parameters.foreach { x => map.put(x._1, x._2) }
val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map)
CreateTable(cm).run(sparkSession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ object TableCreator {

// detects whether complex dimension is part of dictionary_exclude
def isComplexDimDictionaryExclude(dimensionDataType: String): Boolean = {
val dimensionType = Array("array", "struct")
val dimensionType = Array("arraytype", "structtype")
dimensionType.exists(x => x.equalsIgnoreCase(dimensionDataType))
}

// detects whether a column's data type is eligible for dictionary_exclude
// dictionary_exclude is only supported for string, timestamp and date data types
def isStringAndTimestampColDictionaryExclude(columnDataType: String): Boolean = {
val dataTypes = Array("string", "timestamp", "date", "stringtype", "timestamptype", "datetype")
val dataTypes = Array("stringtype", "timestamptype", "datetype")
dataTypes.exists(x => x.equalsIgnoreCase(columnDataType))
}

Expand All @@ -62,33 +62,33 @@ object TableCreator {
// All excluded cols should be there in create table cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
dictExcludeCols =
tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
dictExcludeCols
.map { dictExcludeCol =>
if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
" does not exist in table. Please check create table statement."
tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
dictExcludeCols.foreach { dictExcludeCol =>
if (!fields.exists(x => x.column.equalsIgnoreCase(dictExcludeCol))) {
val errormsg = "DICTIONARY_EXCLUDE column: " + dictExcludeCol +
" does not exist in table. Please check create table statement."
throw new MalformedCarbonCommandException(errormsg)
} else {
val dataType = fields.find { x =>
x.column.equalsIgnoreCase(dictExcludeCol)
}.get.dataType.get
if (isComplexDimDictionaryExclude(dataType)) {
val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
dictExcludeCol
throw new MalformedCarbonCommandException(errormsg)
} else {
val dataType = fields.find(x =>
x.column.equalsIgnoreCase(dictExcludeCol)).get.dataType.get
if (isComplexDimDictionaryExclude(dataType)) {
val errormsg = "DICTIONARY_EXCLUDE is unsupported for complex datatype column: " +
dictExcludeCol
throw new MalformedCarbonCommandException(errormsg)
} else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
" data type column: " + dictExcludeCol
throw new MalformedCarbonCommandException(errorMsg)
}
} else if (!isStringAndTimestampColDictionaryExclude(dataType)) {
val errorMsg = "DICTIONARY_EXCLUDE is unsupported for " + dataType.toLowerCase() +
" data type column: " + dictExcludeCol
throw new MalformedCarbonCommandException(errorMsg)
}
}
}
}
// All included cols should be there in create table cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
dictIncludeCols =
tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(",").map(_.trim)
dictIncludeCols.map { distIncludeCol =>
tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(",").map(_.trim)
dictIncludeCols.foreach { distIncludeCol =>
if (!fields.exists(x => x.column.equalsIgnoreCase(distIncludeCol.trim))) {
val errormsg = "DICTIONARY_INCLUDE column: " + distIncludeCol.trim +
" does not exist in table. Please check create table statement."
Expand Down Expand Up @@ -117,9 +117,9 @@ object TableCreator {
}
dimFields += field
} else if (dictIncludeCols.exists(x => x.equalsIgnoreCase(field.column))) {
dimFields += (field)
dimFields += field
} else if (isDetectAsDimentionDatatype(field.dataType.get)) {
dimFields += (field)
dimFields += field
}
}
)
Expand All @@ -143,13 +143,13 @@ object TableCreator {
// get all included cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).isDefined) {
dictIncludedCols =
tableProperties.get(CarbonCommonConstants.DICTIONARY_INCLUDE).get.split(',').map(_.trim)
tableProperties(CarbonCommonConstants.DICTIONARY_INCLUDE).split(',').map(_.trim)
}

// get all excluded cols
if (tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).isDefined) {
dictExcludedCols =
tableProperties.get(CarbonCommonConstants.DICTIONARY_EXCLUDE).get.split(',').map(_.trim)
tableProperties(CarbonCommonConstants.DICTIONARY_EXCLUDE).split(',').map(_.trim)
}

// by default consider all non string cols as msrs. consider all include/ exclude cols as dims
Expand Down Expand Up @@ -264,7 +264,7 @@ object TableCreator {
if (tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).isDefined) {

var splittedColGrps: Seq[String] = Seq[String]()
val nonSplitCols: String = tableProperties.get(CarbonCommonConstants.COLUMN_GROUPS).get
val nonSplitCols: String = tableProperties(CarbonCommonConstants.COLUMN_GROUPS)

// row groups will be specified in table properties like -> "(col1,col2),(col3,col4)"
// here first splitting the value by () . so that the above will be splitted into 2 strings.
Expand Down Expand Up @@ -313,9 +313,8 @@ object TableCreator {

if (tableProperties.get("NO_INVERTED_INDEX").isDefined) {
noInvertedIdxColsProps =
tableProperties.get("NO_INVERTED_INDEX").get.split(',').map(_.trim)
noInvertedIdxColsProps
.map { noInvertedIdxColProp =>
tableProperties("NO_INVERTED_INDEX").split(',').map(_.trim)
noInvertedIdxColsProps.foreach { noInvertedIdxColProp =>
if (!fields.exists(x => x.column.equalsIgnoreCase(noInvertedIdxColProp))) {
val errormsg = "NO_INVERTED_INDEX column: " + noInvertedIdxColProp +
" does not exist in table. Please check create table statement."
Expand Down Expand Up @@ -357,11 +356,11 @@ object TableCreator {
field.storeType
)
case "array" => Field(field.column, Some("Array"), field.name,
field.children.map(f => f.map(normalizeType(_))),
field.children.map(f => f.map(normalizeType)),
field.parent, field.storeType
)
case "struct" => Field(field.column, Some("Struct"), field.name,
field.children.map(f => f.map(normalizeType(_))),
field.children.map(f => f.map(normalizeType)),
field.parent, field.storeType
)
case "bigint" => Field(field.column, Some("BigInt"), field.name, Some(null), field.parent,
Expand All @@ -372,7 +371,7 @@ object TableCreator {
// checking if the nested data type contains the child type as decimal(10,0),
// if it is present then extracting the precision and scale. resetting the data type
// with Decimal.
case _ if (dataType.startsWith("decimal")) =>
case _ if dataType.startsWith("decimal") =>
val (precision, scale) = getScaleAndPrecision(dataType)
Field(field.column,
Some("Decimal"),
Expand Down Expand Up @@ -443,22 +442,17 @@ object TableCreator {
def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
, tableName: String, fields: Seq[Field],
partitionCols: Seq[PartitionerField],
tableProperties: Map[String, String]): TableModel
= {
tableProperties: Map[String, String]): TableModel = {

val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
fields, tableProperties)
if (dims.isEmpty) {
throw new MalformedCarbonCommandException(s"Table ${
dbName.getOrElse(
CarbonCommonConstants.DATABASE_DEFAULT_NAME)
}.$tableName"
+
" can not be created without key columns. Please " +
"use DICTIONARY_INCLUDE or " +
"DICTIONARY_EXCLUDE to set at least one key " +
"column " +
"if all specified columns are numeric types")
val errorMsg =
s"Table ${dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME)}.$tableName " +
"can not be created without key columns. " +
"Please use DICTIONARY_INCLUDE or DICTIONARY_EXCLUDE to set at least one key " +
"column if all specified columns are numeric types"
throw new MalformedCarbonCommandException(errorMsg)
}
val msrs: Seq[Field] = extractMsrColsFromFields(fields, tableProperties)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,13 @@ case class LoadTable(


val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
val kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
var kettleHomePath = ""

// TODO It will be removed after kettle is removed.
val useKettle = options.get("use_kettle") match {
case Some(value) => value.toBoolean
case Some(value) =>
kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
value.toBoolean
case _ =>
val useKettleLocal = System.getProperty("use.kettle")
if (useKettleLocal == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,22 @@ class CarbonDataSourceSuite extends FunSuite with BeforeAndAfterAll {
""".stripMargin)
}

test("dictionary_exclude option") {
  // Creating a datasource table with DICTIONARY_EXCLUDE on string columns must
  // succeed. No try/catch is needed: ScalaTest already reports any uncaught
  // exception as a test failure, and catching Exception to call fail() would
  // only wrap the message and discard the original stack trace.
  spark.sql(
    """
      |create table test_option(i int, s1 string, s2 string, s3 string)
      |using org.apache.spark.sql.CarbonSource
      |options (dictionary_exclude 's1, s2, s3')
    """.stripMargin)
  // Clean up so the test leaves no state behind for later suites.
  spark.sql(
    """
      |drop table test_option
    """.stripMargin)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,8 @@ public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
return true;
}

@Override
public String toString() {
  // Identify this lock and its HDFS location in log/error messages
  // (e.g. the "Dictionary file lock(...)" error raised on lock contention).
  return String.format("HdfsFileLock(%s)", location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,8 @@ public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
return status;
}

@Override
public String toString() {
  // Identify this lock and the local lock-file path in log/error messages.
  return String.format("LocalFileLock(%s)", lockFilePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,9 @@ private void createRecursivly(String path) throws KeeperException, InterruptedEx
return true;
}

@Override
public String toString() {
  // Identify this lock and its ZooKeeper node path in log/error messages.
  return String.format("ZooKeeperLocking(%s)", lockPath);
}

}