Merge pull request apache#22 from QiangCai/wfmaster
carbon api for spark2
scwf authored Nov 30, 2016
2 parents b462960 + 554fc33 commit 1a649f3
Showing 8 changed files with 382 additions and 35 deletions.
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.command

import java.text.SimpleDateFormat
import java.util
import java.util.UUID

@@ -322,7 +323,7 @@ import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil,
*
* @param alterTableModel
*/
private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
case class AlterTableCompaction(alterTableModel: AlterTableModel) extends
RunnableCommand {

def run(sparkSession: SparkSession): Seq[Row] = {
@@ -439,7 +440,7 @@ case class CreateTable(cm: TableModel) extends RunnableCommand {
}
}

private[sql] case class DeleteLoadsById(
case class DeleteLoadsById(
loadids: Seq[String],
databaseNameOp: Option[String],
tableName: String) extends RunnableCommand {
@@ -501,7 +502,7 @@ private[sql] case class DeleteLoadsById(
}
}

private[sql] case class DeleteLoadsByLoadDate(
case class DeleteLoadsByLoadDate(
databaseNameOp: Option[String],
tableName: String,
dateField: String,
@@ -916,7 +917,7 @@ private[sql] case class DeleteLoadByDate(

}

private[sql] case class CleanFiles(
case class CleanFiles(
databaseNameOp: Option[String],
tableName: String) extends RunnableCommand {

@@ -955,3 +956,64 @@ private[sql] case class CleanFiles(
Seq.empty
}
}

case class ShowLoads(
databaseNameOp: Option[String],
tableName: String,
limit: Option[String],
override val output: Seq[Attribute]) extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val databaseName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
val tableUniqueName = databaseName + "_" + tableName
// Here using checkSchemasModifiedTimeAndReloadTables in tableExists to reload metadata if
// schema is changed by another process, so that tableInfoMap would be refilled.
val tableExists = CarbonEnv.get.carbonMetastore
.tableExists(TableIdentifier(tableName, databaseNameOp))(sparkSession)
if (!tableExists) {
sys.error(s"$databaseName.$tableName is not found")
}
val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
.getCarbonTable(tableUniqueName)
if (carbonTable == null) {
sys.error(s"$databaseName.$tableName is not found")
}
val path = carbonTable.getMetaDataFilepath
val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
if (loadMetadataDetailsArray.nonEmpty) {

val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)

var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith(
(l1, l2) => java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double
.parseDouble(l2.getLoadName)
)


if (limit.isDefined) {
loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray
.filter(load => load.getVisibility.equalsIgnoreCase("true"))
val limitLoads = limit.get
try {
val lim = Integer.parseInt(limitLoads)
loadMetadataDetailsSortedArray = loadMetadataDetailsSortedArray.slice(0, lim)
} catch {
case ex: NumberFormatException => sys.error("Entered limit is not a valid number")
}

}

loadMetadataDetailsSortedArray.filter(load => load.getVisibility.equalsIgnoreCase("true"))
.map(load =>
Row(
load.getLoadName,
load.getLoadStatus,
new java.sql.Timestamp(parser.parse(load.getLoadStartTime).getTime),
new java.sql.Timestamp(parser.parse(load.getTimestamp).getTime))).toSeq
} else {
Seq.empty

}
}

}
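
To make the ordering semantics above concrete, here is a minimal, self-contained sketch of the sort / visibility-filter / limit steps, using made-up segment names; it mirrors the ShowLoads logic without touching any Carbon metadata.

object ShowLoadsOrderingSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical segment (load) names and visibility flags
    val loadNames = Seq("0", "2", "1", "0.1", "3")
    val visible = Map("0" -> true, "2" -> true, "1" -> false, "0.1" -> true, "3" -> true)
    val limit: Option[String] = Some("2")

    // newest first: sort by numeric load name in descending order, drop hidden segments
    var sorted = loadNames
      .sortWith((l1, l2) => java.lang.Double.parseDouble(l1) > java.lang.Double.parseDouble(l2))
      .filter(visible(_))
    // apply the optional limit to keep only the most recent entries
    limit.foreach(l => sorted = sorted.slice(0, Integer.parseInt(l)))

    println(sorted.mkString(", ")) // prints: 3, 2
  }
}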
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import org.apache.spark.sql.execution.command.{CleanFiles => TableCleanFiles}
import org.apache.spark.sql.{CarbonEnv, SparkSession}

/**
* clean files api
*/
object CleanFiles {

def cleanFiles(spark: SparkSession, dbName: Option[String], tableName: String): Unit = {
TableCleanFiles(dbName, tableName).run(spark)
}

def main(args: Array[String]): Unit = {

if (args.length < 2) {
System.err.println("Usage: TableCleanFiles <store path> <table name>");
System.exit(1)
}

val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val spark = TableAPIUtil.spark(storePath, s"TableCleanFiles: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
cleanFiles(spark, Option(dbName), tableName)
}
}
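
A minimal usage sketch of this entry point; the store path, database name and table name below are placeholders, and the session is prepared the same way main does above.

import org.apache.spark.sql.CarbonEnv
import org.apache.spark.util.{CleanFiles, TableAPIUtil}

object CleanFilesExample {
  def main(args: Array[String]): Unit = {
    // placeholder store path, database and table names
    val spark = TableAPIUtil.spark("hdfs://localhost:9000/carbon/store", "CleanFilesExample")
    CarbonEnv.init(spark.sqlContext)
    CleanFiles.cleanFiles(spark, Some("default"), "sales")
  }
}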
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableModel}

/**
* table compaction api
*/
object Compaction {

def compaction(spark: SparkSession, dbName: Option[String], tableName: String,
compactionType: String): Unit = {
AlterTableCompaction(AlterTableModel(dbName, tableName, compactionType, "")).run(spark)
}

def main(args: Array[String]): Unit = {
if (args.length < 3) {
System.err.println("Usage: TableCompaction <store path> <table name> <major|minor>");
System.exit(1)
}

val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val compactionType = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"TableCompaction: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
compaction(spark, Option(dbName), tableName, compactionType)
}
}
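
A usage sketch, e.g. from a spark-shell session with the CarbonData assembly on the classpath, assuming a SparkSession `spark` prepared as in the CleanFiles example (CarbonEnv initialised); the table name is a placeholder and the type strings follow the <major|minor> usage hint above.

import org.apache.spark.util.Compaction

Compaction.compaction(spark, Some("default"), "sales", "minor") // minor compaction
Compaction.compaction(spark, Some("default"), "sales", "major") // major compaction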
@@ -0,0 +1,30 @@
package org.apache.spark.util

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.execution.command.DeleteLoadsByLoadDate

/**
* delete segments before some date
*/
object DeleteSegmentByDate {

def deleteSegmentByDate(spark: SparkSession, dbName: Option[String], tableName: String,
dateValue: String): Unit = {
DeleteLoadsByLoadDate(dbName, tableName, "", dateValue).run(spark)
}

def main(args: Array[String]): Unit = {
if (args.length < 3) {
System.err.println(
"Usage: TableDeleteSegmentByDate <store path> <table name> <before date value>");
System.exit(1)
}

val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val dateValue = TableAPIUtil.escape(args(2))
val spark = TableAPIUtil.spark(storePath, s"TableDeleteSegmentByDate: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
deleteSegmentByDate(spark, Option(dbName), tableName, dateValue)
}
}
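
A usage sketch under the same assumptions as the earlier examples; the date literal is illustrative only and must be in the format DeleteLoadsByLoadDate expects.

import org.apache.spark.util.DeleteSegmentByDate

// deletes segments loaded before the given date (illustrative date literal)
DeleteSegmentByDate.deleteSegmentByDate(spark, Some("default"), "sales", "2016-11-30 00:00:00")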
@@ -0,0 +1,35 @@
package org.apache.spark.util

import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.execution.command.DeleteLoadsById

/**
* delete segments by id list
*/
object DeleteSegmentById {

def extractSegmentIds(segmentIds: String): Seq[String] = {
segmentIds.split(",").toSeq
}

def deleteSegmentById(spark: SparkSession, dbName: Option[String], tableName: String,
segmentIds: Seq[String]): Unit = {
DeleteLoadsById(segmentIds, dbName, tableName).run(spark)
}

def main(args: Array[String]): Unit = {

if (args.length < 3) {
System.err.println(
"Usage: TableDeleteSegmentByID <store path> <table name> <segment id list>");
System.exit(1)
}

val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))
val segmentIds = extractSegmentIds(TableAPIUtil.escape(args(2)))
val spark = TableAPIUtil.spark(storePath, s"TableDeleteSegmentById: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
deleteSegmentById(spark, Option(dbName), tableName, segmentIds)
}
}
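
A usage sketch under the same assumptions; segment ids are given as a comma-separated list, mirroring the command-line usage above.

import org.apache.spark.util.DeleteSegmentById

val ids = DeleteSegmentById.extractSegmentIds("0,1,2") // Seq("0", "1", "2")
DeleteSegmentById.deleteSegmentById(spark, Some("default"), "sales", ids)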
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import java.text.SimpleDateFormat

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.command.ShowLoads
import org.apache.spark.sql.types.{StringType, TimestampType}
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}

object ShowSegments {

def showSegments(spark: SparkSession, dbName: Option[String], tableName: String,
limit: Option[String]): Seq[Row] = {
val output = Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(),
AttributeReference("Status", StringType, nullable = false)(),
AttributeReference("Load Start Time", TimestampType, nullable = false)(),
AttributeReference("Load End Time", TimestampType, nullable = false)())
ShowLoads(dbName, tableName, limit, output).run(spark)
}

def showString(rows: Seq[Row]): String = {
val sb = new StringBuilder
sb.append("+-----------------+---------------+---------------------+---------------------+\n")
.append("|SegmentSequenceId|Status |Load Start Time |Load End Time |\n")
.append("+-----------------+---------------+---------------------+---------------------+\n")
rows.foreach{row =>
sb.append("|")
.append(StringUtils.rightPad(row.getString(0), 17))
.append("|")
.append(StringUtils.rightPad(row.getString(1).take(15), 15))
.append("|")
.append(StringUtils.rightPad(
  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(row.getAs[java.sql.Timestamp](2)), 21))
.append("|")
.append(StringUtils.rightPad(
  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(row.getAs[java.sql.Timestamp](3)), 21))
.append("|\n")
}
sb.append("+-----------------+---------------+---------------------+---------------------+\n")
sb.toString
}

def parseLimit(limit: String): Int = {
Integer.parseInt(limit)
}

def main(args: Array[String]): Unit = {

if (args.length < 2) {
System.err.println("Usage: ShowSegments <store path> <table name> [limit]");
System.exit(1)
}

val storePath = TableAPIUtil.escape(args(0))
val (dbName, tableName) = TableAPIUtil.parseSchemaName(TableAPIUtil.escape(args(1)))

val limit = if (args.length >= 3 ) {
Some(TableAPIUtil.escape(args(2)))
} else {
None
}
val spark = TableAPIUtil.spark(storePath, s"ShowSegments: $dbName.$tableName")
CarbonEnv.init(spark.sqlContext)
val rows = showSegments(spark, Option(dbName), tableName, limit)
System.out.println(showString(rows))
}
}
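
A usage sketch under the same assumptions; it lists up to ten segments of a placeholder table and prints the fixed-width table built by showString.

import org.apache.spark.util.ShowSegments

val rows = ShowSegments.showSegments(spark, Some("default"), "sales", Some("10"))
System.out.println(ShowSegments.showString(rows))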