Added Timestamp specific in-memory columnar representation
liancheng committed Jul 21, 2014
1 parent db56f2d commit 45dd05d
Showing 124 changed files with 255 additions and 65 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -19,6 +19,7 @@ conf/spark-env.sh
conf/streaming-env.sh
conf/log4j.properties
conf/spark-defaults.conf
conf/hive-site.xml
docs/_site
docs/api
target/
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.catalyst.types._

@@ -41,6 +42,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
// UDFToString
private[this] def castToString: Any => Any = child.dataType match {
case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
case TimestampType => buildCast[Timestamp](_, timestampToString)
case _ => buildCast[Any](_, _.toString)
}

@@ -126,6 +128,18 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
ts.getTime / 1000 + ts.getNanos.toDouble / 1000000000
}

// Converts Timestamp to string according to Hive TimestampWritable convention
private[this] def timestampToString(ts: Timestamp): String = {
val timestampString = ts.toString
val formatted = Cast.simpleDateFormat.format(ts)

if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
formatted + timestampString.substring(19)
} else {
formatted
}
}

private[this] def castToLong: Any => Any = child.dataType match {
case StringType =>
buildCast[String](_, s => try s.toLong catch {
@@ -249,3 +263,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression {
if (evaluated == null) null else cast(evaluated)
}
}

object Cast {
private[sql] val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}
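The helper above mirrors Hive's TimestampWritable string convention: format the wall-clock part with the shared SimpleDateFormat and append the fractional seconds only when Timestamp.toString carries something other than the default ".0". Note that the companion object shares a single SimpleDateFormat instance, which is not thread-safe, so this relies on casts not being evaluated concurrently. A minimal standalone sketch of that behaviour, using only the JDK (the object and value names below are local to the example, not part of the patch):

import java.sql.Timestamp
import java.text.SimpleDateFormat

object TimestampToStringExample extends App {
  // Same pattern as Cast.simpleDateFormat above.
  private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def timestampToString(ts: Timestamp): String = {
    val timestampString = ts.toString   // always carries a fractional part, e.g. ".0" or ".1234"
    val formatted = format.format(ts)   // seconds precision only
    // Keep the fractional part only when it is something other than ".0",
    // which is how Hive's TimestampWritable renders timestamps.
    if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
      formatted + timestampString.substring(19)
    } else {
      formatted
    }
  }

  val whole = new Timestamp(0L)              // nanos == 0
  val fractional = new Timestamp(0L)
  fractional.setNanos(123400000)             // 0.1234 seconds

  println(timestampToString(whole))          // e.g. "1970-01-01 00:00:00" when the JVM runs in UTC
  println(timestampToString(fractional))     // e.g. "1970-01-01 00:00:00.1234"
}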
@@ -90,6 +90,9 @@ private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
private[sql] class StringColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, STRING)

private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, TIMESTAMP)

private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
with NullableColumnAccessor
@@ -105,16 +108,17 @@ private[sql] object ColumnAccessor {
val columnTypeId = dup.getInt()

columnTypeId match {
case INT.typeId => new IntColumnAccessor(dup)
case LONG.typeId => new LongColumnAccessor(dup)
case FLOAT.typeId => new FloatColumnAccessor(dup)
case DOUBLE.typeId => new DoubleColumnAccessor(dup)
case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
case BYTE.typeId => new ByteColumnAccessor(dup)
case SHORT.typeId => new ShortColumnAccessor(dup)
case STRING.typeId => new StringColumnAccessor(dup)
case BINARY.typeId => new BinaryColumnAccessor(dup)
case GENERIC.typeId => new GenericColumnAccessor(dup)
case INT.typeId => new IntColumnAccessor(dup)
case LONG.typeId => new LongColumnAccessor(dup)
case FLOAT.typeId => new FloatColumnAccessor(dup)
case DOUBLE.typeId => new DoubleColumnAccessor(dup)
case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
case BYTE.typeId => new ByteColumnAccessor(dup)
case SHORT.typeId => new ShortColumnAccessor(dup)
case STRING.typeId => new StringColumnAccessor(dup)
case TIMESTAMP.typeId => new TimestampColumnAccessor(dup)
case BINARY.typeId => new BinaryColumnAccessor(dup)
case GENERIC.typeId => new GenericColumnAccessor(dup)
}
}
}
@@ -109,6 +109,9 @@ private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColum

private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)

private[sql] class TimestampColumnBuilder
extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)

private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)

// TODO (lian) Add support for array, struct and map
@@ -344,21 +344,52 @@ private[sql] class StringColumnStats extends BasicColumnStats(STRING) {
}

override def contains(row: Row, ordinal: Int) = {
!(upperBound eq null) && {
(upperBound ne null) && {
val field = columnType.getField(row, ordinal)
lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
}
}

override def isAbove(row: Row, ordinal: Int) = {
!(upperBound eq null) && {
(upperBound ne null) && {
val field = columnType.getField(row, ordinal)
field.compareTo(upperBound) < 0
}
}

override def isBelow(row: Row, ordinal: Int) = {
!(lowerBound eq null) && {
(lowerBound ne null) && {
val field = columnType.getField(row, ordinal)
lowerBound.compareTo(field) < 0
}
}
}

private[sql] class TimestampColumnStats extends BasicColumnStats(TIMESTAMP) {
override def initialBounds = (null, null)

override def gatherStats(row: Row, ordinal: Int) {
val field = columnType.getField(row, ordinal)
if ((upperBound eq null) || field.compareTo(upperBound) > 0) _upper = field
if ((lowerBound eq null) || field.compareTo(lowerBound) < 0) _lower = field
}

override def contains(row: Row, ordinal: Int) = {
(upperBound ne null) && {
val field = columnType.getField(row, ordinal)
lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
}
}

override def isAbove(row: Row, ordinal: Int) = {
(lowerBound ne null) && {
val field = columnType.getField(row, ordinal)
field.compareTo(upperBound) < 0
}
}

override def isBelow(row: Row, ordinal: Int) = {
(lowerBound ne null) && {
val field = columnType.getField(row, ordinal)
lowerBound.compareTo(field) < 0
}
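TimestampColumnStats tracks its bounds with Timestamp.compareTo, which orders by the millisecond value first and then by the nanosecond field, so the min/max gathered here keep full sub-millisecond precision. A small JDK-only check of that ordering (illustrative only, not part of the patch):

import java.sql.Timestamp

// Two timestamps sharing the same millisecond but differing in nanoseconds.
val a = new Timestamp(1405900800000L)
a.setNanos(1000)
val b = new Timestamp(1405900800000L)
b.setNanos(2000)

assert(a.compareTo(b) < 0)   // the nanosecond difference is visible to compareTo
assert(b.compareTo(a) > 0)   // so gatherStats would keep `a` as lower bound and `b` as upper bound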
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer

import scala.reflect.runtime.universe.TypeTag

import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.catalyst.types._
@@ -221,6 +223,26 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
override def getField(row: Row, ordinal: Int) = row.getString(ordinal)
}

private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 8, 12) {
override def extract(buffer: ByteBuffer) = {
val timestamp = new Timestamp(buffer.getLong())
timestamp.setNanos(buffer.getInt())
timestamp
}

override def append(v: Timestamp, buffer: ByteBuffer) {
buffer.putLong(v.getTime).putInt(v.getNanos)
}

override def getField(row: Row, ordinal: Int) = {
row(ordinal).asInstanceOf[Timestamp]
}

override def setField(row: MutableRow, ordinal: Int, value: Timestamp) {
row(ordinal) = value
}
}

private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
typeId: Int,
defaultSize: Int)
@@ -240,7 +262,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
}
}

private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](9, 16) {
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
row(ordinal) = value
}
@@ -251,7 +273,7 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
// Used to process generic objects (all types other than those listed above). Objects should be
// serialized before being appended to the column `ByteBuffer`, and are also extracted as
// serialized byte arrays.
private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
private[sql] object GENERIC extends ByteArrayColumnType[DataType](10, 16) {
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
row(ordinal) = SparkSqlSerializer.deserialize[Any](value)
}
@@ -262,16 +284,17 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
private[sql] object ColumnType {
def apply(dataType: DataType): ColumnType[_, _] = {
dataType match {
case IntegerType => INT
case LongType => LONG
case FloatType => FLOAT
case DoubleType => DOUBLE
case BooleanType => BOOLEAN
case ByteType => BYTE
case ShortType => SHORT
case StringType => STRING
case BinaryType => BINARY
case _ => GENERIC
case IntegerType => INT
case LongType => LONG
case FloatType => FLOAT
case DoubleType => DOUBLE
case BooleanType => BOOLEAN
case ByteType => BYTE
case ShortType => SHORT
case StringType => STRING
case BinaryType => BINARY
case TimestampType => TIMESTAMP
case _ => GENERIC
}
}
}
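TIMESTAMP takes type id 8 and a fixed width of 12 bytes: an 8-byte epoch-millisecond value followed by the 4-byte nanosecond field. That is why BINARY and GENERIC move to ids 9 and 10, and why ColumnType(TimestampType) now resolves to TIMESTAMP instead of falling through to GENERIC with its serialization overhead. A self-contained sketch of the same encode/decode round trip, using a bare ByteBuffer in place of a real column buffer (illustrative only, not part of the patch):

import java.nio.ByteBuffer
import java.sql.Timestamp

val ts = new Timestamp(1405900800000L)
ts.setNanos(123456789)

// Encode the way TIMESTAMP.append does: 8-byte millis followed by 4-byte nanos.
val buffer = ByteBuffer.allocate(12)
buffer.putLong(ts.getTime).putInt(ts.getNanos)
buffer.flip()

// Decode the way TIMESTAMP.extract does.
val decoded = new Timestamp(buffer.getLong())
decoded.setNanos(buffer.getInt())

assert(decoded == ts)   // full nanosecond precision survives the round trip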
@@ -22,14 +22,15 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types._

class ColumnStatsSuite extends FunSuite {
testColumnStats(classOf[BooleanColumnStats], BOOLEAN)
testColumnStats(classOf[ByteColumnStats], BYTE)
testColumnStats(classOf[ShortColumnStats], SHORT)
testColumnStats(classOf[IntColumnStats], INT)
testColumnStats(classOf[LongColumnStats], LONG)
testColumnStats(classOf[FloatColumnStats], FLOAT)
testColumnStats(classOf[DoubleColumnStats], DOUBLE)
testColumnStats(classOf[StringColumnStats], STRING)
testColumnStats(classOf[BooleanColumnStats], BOOLEAN)
testColumnStats(classOf[ByteColumnStats], BYTE)
testColumnStats(classOf[ShortColumnStats], SHORT)
testColumnStats(classOf[IntColumnStats], INT)
testColumnStats(classOf[LongColumnStats], LONG)
testColumnStats(classOf[FloatColumnStats], FLOAT)
testColumnStats(classOf[DoubleColumnStats], DOUBLE)
testColumnStats(classOf[StringColumnStats], STRING)
testColumnStats(classOf[TimestampColumnStats], TIMESTAMP)

def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]](
columnStatsClass: Class[U],
@@ -18,6 +18,7 @@
package org.apache.spark.sql.columnar

import java.nio.ByteBuffer
import java.sql.Timestamp

import org.scalatest.FunSuite

@@ -32,7 +33,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
test("defaultSize") {
val checks = Map(
INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
BOOLEAN -> 1, STRING -> 8, BINARY -> 16, GENERIC -> 16)
BOOLEAN -> 1, STRING -> 8, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)

checks.foreach { case (columnType, expectedSize) =>
assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
@@ -52,14 +53,15 @@ class ColumnTypeSuite extends FunSuite with Logging {
}
}

checkActualSize(INT, Int.MaxValue, 4)
checkActualSize(SHORT, Short.MaxValue, 2)
checkActualSize(LONG, Long.MaxValue, 8)
checkActualSize(BYTE, Byte.MaxValue, 1)
checkActualSize(DOUBLE, Double.MaxValue, 8)
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(BOOLEAN, true, 1)
checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
checkActualSize(INT, Int.MaxValue, 4)
checkActualSize(SHORT, Short.MaxValue, 2)
checkActualSize(LONG, Long.MaxValue, 8)
checkActualSize(BYTE, Byte.MaxValue, 1)
checkActualSize(DOUBLE, Double.MaxValue, 8)
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(BOOLEAN, true, 1)
checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
checkActualSize(TIMESTAMP, new Timestamp(0L), 12)

val binary = Array.fill[Byte](4)(0: Byte)
checkActualSize(BINARY, binary, 4 + 4)
@@ -188,17 +190,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
}

private def hexDump(value: Any): String = {
if (value.isInstanceOf[String]) {
val sb = new StringBuilder()
for (ch <- value.asInstanceOf[String].toCharArray) {
sb.append(Integer.toHexString(ch & 0xffff)).append(' ')
}
if (! sb.isEmpty) sb.setLength(sb.length - 1)
sb.toString()
} else {
// for now ..
hexDump(value.toString)
}
value.toString.map(ch => Integer.toHexString(ch & 0xffff)).mkString(" ")
}

private def dumpBuffer(buff: ByteBuffer): Any = {
@@ -207,7 +199,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
val b = buff.get()
sb.append(Integer.toHexString(b & 0xff)).append(' ')
}
if (! sb.isEmpty) sb.setLength(sb.length - 1)
if (sb.nonEmpty) sb.setLength(sb.length - 1)
sb.toString()
}
}
@@ -20,6 +20,8 @@ package org.apache.spark.sql.columnar
import scala.collection.immutable.HashSet
import scala.util.Random

import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.{DataType, NativeType}
@@ -39,15 +41,19 @@ object ColumnarTestUtils {
}

(columnType match {
case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
case INT => Random.nextInt()
case LONG => Random.nextLong()
case FLOAT => Random.nextFloat()
case DOUBLE => Random.nextDouble()
case STRING => Random.nextString(Random.nextInt(32))
case BOOLEAN => Random.nextBoolean()
case BINARY => randomBytes(Random.nextInt(32))
case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
case INT => Random.nextInt()
case LONG => Random.nextLong()
case FLOAT => Random.nextFloat()
case DOUBLE => Random.nextDouble()
case STRING => Random.nextString(Random.nextInt(32))
case BOOLEAN => Random.nextBoolean()
case BINARY => randomBytes(Random.nextInt(32))
case TIMESTAMP =>
val timestamp = new Timestamp(Random.nextLong())
timestamp.setNanos(Random.nextInt(999999999))
timestamp
case _ =>
// Using a random one-element map instead of an arbitrary object
Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
@@ -96,5 +102,4 @@ object ColumnarTestUtils {

(values, rows)
}

}
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive

import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
import java.sql.Timestamp
import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde2.io.TimestampWritable

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -266,6 +268,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
}.toSeq.sorted.mkString("{", ",", "}")
case (null, _) => "NULL"
case (t: Timestamp, TimestampType) => new TimestampWritable(t).toString
case (other, tpe) if primitiveTypes contains tpe => other.toString
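The added case routes timestamps through Hive's own TimestampWritable when result rows are rendered as strings, so HiveContext output stays compatible with Hive, including the optional fractional seconds. A minimal sketch of that call (it assumes the Hive serde2 classes are on the classpath, as they are wherever HiveContext runs; the printed string is an expectation, not something asserted by the patch):

import java.sql.Timestamp
import org.apache.hadoop.hive.serde2.io.TimestampWritable

val t = new Timestamp(0L)
t.setNanos(500000000)   // half a second

// Same call as the new case clause above; expected to print the Hive-style
// string "1970-01-01 00:00:00.5" when the JVM runs in UTC.
println(new TimestampWritable(t).toString)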
}

@@ -0,0 +1 @@
1.293814861E9
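This new golden answer file holds a timestamp that was cast to a double using the conversion shown earlier in Cast: whole seconds since the epoch plus a fractional nanosecond part. As a worked check of that arithmetic (not part of the patch), a timestamp whose getTime is 1293814861000 with zero nanoseconds reproduces exactly the value above:

import java.sql.Timestamp

val ts = new Timestamp(1293814861000L)
// Same expression as castToDouble in Cast.scala.
val asDouble = ts.getTime / 1000 + ts.getNanos.toDouble / 1000000000
println(asDouble)   // 1.293814861E9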