Skip to content

Commit

Permalink
This commit contains three changes:
Browse files Browse the repository at this point in the history
* Expose `DataType`s in the sql package (internal details are private to sql).
* Introduce `createSchemaRDD` to create a `SchemaRDD` from an `RDD` with a provided schema (represented by a `StructType`) and a provided function to construct `Row`,
* Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.
  • Loading branch information
yhuai committed Jul 9, 2014
1 parent b520b64 commit 16be3e5
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ object Row {
* }}}
*/
def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)

/**
* Construct a [[Row]] with the given values.
*/
def apply(values: Any*): Row = new GenericRow(values.toArray)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import scala.language.dynamics

import org.apache.spark.sql.catalyst.types.DataType

case object DynamicType extends DataType
case object DynamicType extends DataType {
def simpleString: String = "dynamic"
}

case class WrapDynamic(children: Seq[Attribute]) extends Expression {
type EvaluatedType = DynamicRow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,52 +125,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
}.toSeq
}

protected def generateSchemaString(schema: Seq[Attribute]): String = {
val builder = new StringBuilder
builder.append("root\n")
val prefix = " |"
schema.foreach { attribute =>
val name = attribute.name
val dataType = attribute.dataType
dataType match {
case fields: StructType =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaString(fields, s"$prefix |", builder)
case ArrayType(fields: StructType) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaString(fields, s"$prefix |", builder)
case ArrayType(elementType: DataType) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case _ => builder.append(s"$prefix-- $name: $dataType\n")
}
}

builder.toString()
}

protected def generateSchemaString(
schema: StructType,
prefix: String,
builder: StringBuilder): StringBuilder = {
schema.fields.foreach {
case StructField(name, fields: StructType, _) =>
builder.append(s"$prefix-- $name: $StructType\n")
generateSchemaString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(fields: StructType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
generateSchemaString(fields, s"$prefix |", builder)
case StructField(name, ArrayType(elementType: DataType), _) =>
builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
case StructField(name, fieldType: DataType, _) =>
builder.append(s"$prefix-- $name: $fieldType\n")
}

builder
}
def schema: StructType = StructType.fromAttributes(output)

/** Returns the output schema in the tree format. */
def schemaString: String = generateSchemaString(output)
def formattedSchemaString: String = schema.formattedSchemaString

/** Prints out the schema in the tree format */
def printSchema(): Unit = println(schemaString)
def printSchema(): Unit = println(formattedSchemaString)
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ object DataType extends RegexParsers {
"true" ^^^ true |
"false" ^^^ false


protected lazy val structType: Parser[DataType] =
"StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
case fields => new StructType(fields)
Expand Down Expand Up @@ -93,47 +92,56 @@ abstract class DataType {
}

def isPrimitive: Boolean = false

def simpleString: String
}

case object NullType extends DataType
case object NullType extends DataType {
def simpleString: String = "null"
}

trait PrimitiveType extends DataType {
override def isPrimitive = true
}

abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]
private[sql] type JvmType
@transient private[sql] val tag: TypeTag[JvmType]
private[sql] val ordering: Ordering[JvmType]

@transient val classTag = {
@transient private[sql] val classTag = {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
}

case object StringType extends NativeType with PrimitiveType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = String
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "string"
}
case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
private[sql] type JvmType = Array[Byte]
def simpleString: String = "binary"
}
case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Boolean
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "boolean"
}

case object TimestampType extends NativeType {
type JvmType = Timestamp
private[sql] type JvmType = Timestamp

@transient lazy val tag = typeTag[JvmType]
@transient private[sql] lazy val tag = typeTag[JvmType]

val ordering = new Ordering[JvmType] {
private[sql] val ordering = new Ordering[JvmType] {
def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
}

def simpleString: String = "timestamp"
}

abstract class NumericType extends NativeType with PrimitiveType {
Expand All @@ -142,7 +150,7 @@ abstract class NumericType extends NativeType with PrimitiveType {
// type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
// desugared by the compiler into an argument to the objects constructor. This means there is no
// longer an no argument constructor and thus the JVM cannot serialize the object anymore.
val numeric: Numeric[JvmType]
private[sql] val numeric: Numeric[JvmType]
}

/** Matcher for any expressions that evaluate to [[IntegralType]]s */
Expand All @@ -154,39 +162,43 @@ object IntegralType {
}

abstract class IntegralType extends NumericType {
val integral: Integral[JvmType]
private[sql] val integral: Integral[JvmType]
}

case object LongType extends IntegralType {
type JvmType = Long
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Long]]
val integral = implicitly[Integral[Long]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Long
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Long]]
private[sql] val integral = implicitly[Integral[Long]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "long"
}

case object IntegerType extends IntegralType {
type JvmType = Int
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Int]]
val integral = implicitly[Integral[Int]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Int
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Int]]
private[sql] val integral = implicitly[Integral[Int]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "integer"
}

case object ShortType extends IntegralType {
type JvmType = Short
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Short]]
val integral = implicitly[Integral[Short]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Short
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Short]]
private[sql] val integral = implicitly[Integral[Short]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "short"
}

case object ByteType extends IntegralType {
type JvmType = Byte
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Byte]]
val integral = implicitly[Integral[Byte]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Byte
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Byte]]
private[sql] val integral = implicitly[Integral[Byte]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "byte"
}

/** Matcher for any expressions that evaluate to [[FractionalType]]s */
Expand All @@ -197,47 +209,127 @@ object FractionalType {
}
}
abstract class FractionalType extends NumericType {
val fractional: Fractional[JvmType]
private[sql] val fractional: Fractional[JvmType]
}

case object DecimalType extends FractionalType {
type JvmType = BigDecimal
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[BigDecimal]]
val fractional = implicitly[Fractional[BigDecimal]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = BigDecimal
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[BigDecimal]]
private[sql] val fractional = implicitly[Fractional[BigDecimal]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "decimal"
}

case object DoubleType extends FractionalType {
type JvmType = Double
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Double]]
val fractional = implicitly[Fractional[Double]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Double
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Double]]
private[sql] val fractional = implicitly[Fractional[Double]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "double"
}

case object FloatType extends FractionalType {
type JvmType = Float
@transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Float]]
val fractional = implicitly[Fractional[Float]]
val ordering = implicitly[Ordering[JvmType]]
private[sql] type JvmType = Float
@transient private[sql] lazy val tag = typeTag[JvmType]
private[sql] val numeric = implicitly[Numeric[Float]]
private[sql] val fractional = implicitly[Fractional[Float]]
private[sql] val ordering = implicitly[Ordering[JvmType]]
def simpleString: String = "float"
}

case class ArrayType(elementType: DataType) extends DataType
case class ArrayType(elementType: DataType) extends DataType {
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"${prefix}-- element: ${elementType.simpleString}\n")
elementType match {
case array: ArrayType =>
array.buildFormattedString(s"$prefix |", builder)
case struct: StructType =>
struct.buildFormattedString(s"$prefix |", builder)
case map: MapType =>
map.buildFormattedString(s"$prefix |", builder)
case _ =>
}
}

case class StructField(name: String, dataType: DataType, nullable: Boolean)
def simpleString: String = "array"
}

case class StructField(name: String, dataType: DataType, nullable: Boolean) {

private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")
dataType match {
case array: ArrayType =>
array.buildFormattedString(s"$prefix |", builder)
case struct: StructType =>
struct.buildFormattedString(s"$prefix |", builder)
case map: MapType =>
map.buildFormattedString(s"$prefix |", builder)
case _ =>
}
}
}

object StructType {
def fromAttributes(attributes: Seq[Attribute]): StructType = {
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
}

private def validateFields(fields: Seq[StructField]): Boolean =
fields.map(field => field.name).distinct.size == fields.size

// def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
}

case class StructType(fields: Seq[StructField]) extends DataType {
require(StructType.validateFields(fields), "Found fields with the same name.")

def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())

def formattedSchemaString: String = {
val builder = new StringBuilder
builder.append("root\n")
val prefix = " |"
fields.foreach(field => field.buildFormattedString(prefix, builder))

builder.toString()
}

def printSchema(): Unit = println(formattedSchemaString)

private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
fields.foreach(field => field.buildFormattedString(prefix, builder))
}

def simpleString: String = "struct"
}

case class MapType(keyType: DataType, valueType: DataType) extends DataType
case class MapType(keyType: DataType, valueType: DataType) extends DataType {
private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
builder.append(s"${prefix}-- key: ${keyType.simpleString}\n")
keyType match {
case array: ArrayType =>
array.buildFormattedString(s"$prefix |", builder)
case struct: StructType =>
struct.buildFormattedString(s"$prefix |", builder)
case map: MapType =>
map.buildFormattedString(s"$prefix |", builder)
case _ =>
}

builder.append(s"${prefix}-- value: ${valueType.simpleString}\n")
valueType match {
case array: ArrayType =>
array.buildFormattedString(s"$prefix |", builder)
case struct: StructType =>
struct.buildFormattedString(s"$prefix |", builder)
case map: MapType =>
map.buildFormattedString(s"$prefix |", builder)
case _ =>
}
}

def simpleString: String = "map"
}
Loading

0 comments on commit 16be3e5

Please sign in to comment.