Add sort columns option in DataFrame writer
jackylk committed Apr 8, 2017
1 parent 12bf455 commit 4d35b0f
Showing 4 changed files with 49 additions and 18 deletions.
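
In short, the change lets a DataFrame be written into a CarbonData table with an explicit list of sort columns (or none), which the writer forwards into the table's SORT_COLUMNS property. A minimal usage sketch mirroring the new tests below, assuming a SparkSession with CarbonData on the classpath and a DataFrame df whose columns include c2 and c3 (the table name here is illustrative):

import org.apache.spark.sql.SaveMode

// Write df as a CarbonData table sorted on c2 and c3 at load time;
// passing "" instead creates the table without sort columns.
df.write
  .format("carbondata")
  .option("tableName", "sorted_table")
  .option("sort_columns", "c2, c3")
  .mode(SaveMode.Overwrite)
  .save()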

@@ -266,6 +266,7 @@ object CompareTest {
       .option("tempCSV", "false")
       .option("single_pass", "true")
       .option("dictionary_exclude", "id") // id is high cardinality column
+      .option("sort_columns", "")
       .mode(SaveMode.Overwrite)
       .save()
   }

@@ -278,6 +279,7 @@ object CompareTest {
     val loadParquetTime = loadParquetTable(spark, df)
     val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3")
     println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time")
+    df.unpersist()
     spark.read.parquet(parquetTableName).registerTempTable(parquetTableName)
   }

@@ -103,6 +103,34 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     )
   }
 
+  test("test load dataframe without sort") {
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon3")
+      .option("sort_columns", "")
+      .mode(SaveMode.Overwrite)
+      .save()
+    sql("select count(*) from carbon3 where c3 > 400").show
+    df.registerTempTable("temp")
+    sql("select count(*) from temp where c3 > 400").show
+    //sql("select * from carbon3 where c3 > 500").show
+    checkAnswer(
+      sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+    )
+  }
+
+  test("test load dataframe using sort_columns") {
+    df.write
+      .format("carbondata")
+      .option("tableName", "carbon3")
+      .option("sort_columns", "c2, c3")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(
+      sql("select count(*) from carbon3 where c3 > 500"), Row(500)
+    )
+  }
+
   test("test decimal values for dataframe load"){
     dataFrame.write
       .format("carbondata")

@@ -57,5 +57,7 @@ class CarbonOption(options: Map[String, String]) {
   def isBucketingEnabled: Boolean = options.contains("bucketcolumns") &&
     options.contains("bucketnumber")
 
+  def sortColumns: Option[String] = options.get("sort_columns")
+
   def toMap: Map[String, String] = options
 }
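
The new accessor only exposes the raw option string; building the SORT_COLUMNS property happens later in CarbonDataFrameWriter. A small illustrative sketch of the behaviour (the option maps are made up for the example):

// CarbonOption simply wraps the writer's option map.
val withSort    = new CarbonOption(Map("tableName" -> "t1", "sort_columns" -> "c2, c3"))
val withoutSort = new CarbonOption(Map("tableName" -> "t1"))

withSort.sortColumns     // Some("c2, c3") -- forwarded verbatim into SORT_COLUMNS
withoutSort.sortColumns  // None -- the writer emits no SORT_COLUMNS property

Note that an empty string, as in the "without sort" test above, still yields Some(""), so the table is created with an empty SORT_COLUMNS list rather than with the property omitted.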

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.execution.command.LoadTable

@@ -33,8 +35,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
 
   def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
     // create a new table using dataframe's schema and write its content into the table
-    sqlContext.sparkSession.sql(
-      makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)))
+    val sqlString = makeCreateTableString(dataFrame.schema, new CarbonOption(parameters))
+    sqlContext.sparkSession.sql(sqlString)
     writeToCarbonFile(parameters)
   }
 

@@ -84,7 +86,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
 
       try {
-        sqlContext.sql(makeLoadString(tempCSVFolder, options))
+        val sqlString = makeLoadString(tempCSVFolder, options)
+        sqlContext.sql(sqlString)
       } finally {
         fs.delete(tempCSVPath, true)
       }

@@ -162,26 +165,22 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     val carbonSchema = schema.map { field =>
       s"${ field.name } ${ convertToCarbonType(field.dataType) }"
     }
-    val property = new StringBuilder
-    property.append(
-      if (options.dictionaryInclude.isDefined) {
-        s"'DICTIONARY_INCLUDE' = '${options.dictionaryInclude.get}' ,"
-      } else {
-        ""
-      }
-    ).append(
-      if (options.dictionaryExclude.isDefined) {
-        s"'DICTIONARY_EXCLUDE' = '${options.dictionaryExclude.get}'"
-      } else {
-        ""
-      }
-    )
+    val property = mutable.Set[String]()
+    if (options.dictionaryInclude.isDefined) {
+      property += s"'DICTIONARY_INCLUDE' = '${options.dictionaryInclude.get}'"
+    }
+    if (options.dictionaryExclude.isDefined) {
+      property += s"'DICTIONARY_EXCLUDE' = '${options.dictionaryExclude.get}'"
+    }
+    if (options.sortColumns.isDefined) {
+      property += s"'SORT_COLUMNS' = '${options.sortColumns.get}'"
+    }
 
     s"""
        | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
        | (${ carbonSchema.mkString(", ") })
        | STORED BY 'carbondata'
-       | ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" }
+       | ${ if (property.nonEmpty) "TBLPROPERTIES (" + property.mkString(",") + ")" else "" }
     """.stripMargin
   }
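
Putting it together: with sort_columns set to "c2, c3" as in the second new test, the property set built above gains a SORT_COLUMNS entry, so the statement produced by makeCreateTableString looks roughly like the following (the database name and column types are illustrative, since they come from writer defaults and the DataFrame schema rather than from this diff):

  CREATE TABLE IF NOT EXISTS default.carbon3
  (c1 string, c2 string, c3 int)
  STORED BY 'carbondata'
  TBLPROPERTIES ('SORT_COLUMNS' = 'c2, c3')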
