diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 46fcfbb9e26ba..2ad2d04af5704 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected case class Keyword(str: String) protected implicit def asParser(k: Keyword): Parser[String] = - allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) - - protected class SqlLexical extends StdLexical { - case class FloatLit(chars: String) extends Token { - override def toString = chars - } - override lazy val token: Parser[Token] = ( - identChar ~ rep( identChar | digit ) ^^ - { case first ~ rest => processIdent(first :: rest mkString "") } - | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ { - case i ~ None => NumericLit(i mkString "") - case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString("")) - } - | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^ - { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") } - | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^ - { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") } - | EofCh ^^^ EOF - | '\'' ~> failure("unclosed string literal") - | '\"' ~> failure("unclosed string literal") - | delim - | failure("illegal character") - ) - - override def identChar = letter | elem('.') | elem('_') - - override def whitespace: Parser[Any] = rep( - whitespaceChar - | '/' ~ '*' ~ comment - | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') ) - | '#' ~ rep( chrExcept(EofCh, '\n') ) - | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') ) - | '/' ~ '*' ~ failure("unclosed comment") - ) - } - - override val lexical = new SqlLexical + lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) protected val ALL = Keyword("ALL") protected val AND = Keyword("AND") @@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers { this.getClass .getMethods .filter(_.getReturnType == classOf[Keyword]) - .map(_.invoke(this).asInstanceOf[Keyword]) - - /** Generate all variations of upper and lower case of a given string */ - private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = { - if (s == "") { - Stream(prefix) - } else { - allCaseVersions(s.tail, prefix + s.head.toLower) ++ - allCaseVersions(s.tail, prefix + s.head.toUpper) - } - } + .map(_.invoke(this).asInstanceOf[Keyword].str) - lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str)) - - lexical.delimiters += ( - "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")", - ",", ";", "%", "{", "}", ":", "[", "]" - ) + override val lexical = new SqlLexical(reservedWords) protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = { exprs.zipWithIndex.map { @@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars) protected lazy val baseExpression: PackratParser[Expression] = - expression ~ "[" ~ expression <~ "]" ^^ { + expression ~ "[" ~ expression <~ "]" ^^ { case base ~ _ ~ ordinal => GetItem(base, ordinal) } | TRUE ^^^ Literal(true, BooleanType) | @@ -399,3 +348,55 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val dataType: Parser[DataType] = STRING ^^^ StringType } + +class SqlLexical(val keywords: Seq[String]) 
extends StdLexical { + case class FloatLit(chars: String) extends Token { + override def toString = chars + } + + reserved ++= keywords.flatMap(w => allCaseVersions(w)) + + delimiters += ( + "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")", + ",", ";", "%", "{", "}", ":", "[", "]" + ) + + override lazy val token: Parser[Token] = ( + identChar ~ rep( identChar | digit ) ^^ + { case first ~ rest => processIdent(first :: rest mkString "") } + | rep1(digit) ~ opt('.' ~> rep(digit)) ^^ { + case i ~ None => NumericLit(i mkString "") + case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString("")) + } + | '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^ + { case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") } + | '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^ + { case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") } + | EofCh ^^^ EOF + | '\'' ~> failure("unclosed string literal") + | '\"' ~> failure("unclosed string literal") + | delim + | failure("illegal character") + ) + + override def identChar = letter | elem('_') | elem('.') + + override def whitespace: Parser[Any] = rep( + whitespaceChar + | '/' ~ '*' ~ comment + | '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') ) + | '#' ~ rep( chrExcept(EofCh, '\n') ) + | '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') ) + | '/' ~ '*' ~ failure("unclosed comment") + ) + + /** Generate all variations of upper and lower case of a given string */ + def allCaseVersions(s: String, prefix: String = ""): Stream[String] = { + if (s == "") { + Stream(prefix) + } else { + allCaseVersions(s.tail, prefix + s.head.toLower) ++ + allCaseVersions(s.tail, prefix + s.head.toUpper) + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala index b6aeae92f8bec..5d3bb25ad568c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala @@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression { null } else { if (child.dataType.isInstanceOf[ArrayType]) { + // TODO: consider using Array[_] for ArrayType child to avoid + // boxing of primitives val baseValue = value.asInstanceOf[Seq[_]] val o = key.asInstanceOf[Int] if (o >= baseValue.size || o < 0) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index da34bd3a21503..bb77bccf86176 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,9 +19,71 @@ package org.apache.spark.sql.catalyst.types import java.sql.Timestamp -import scala.reflect.runtime.universe.{typeTag, TypeTag} +import scala.util.parsing.combinator.RegexParsers -import org.apache.spark.sql.catalyst.expressions.Expression +import scala.reflect.ClassTag +import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror} + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.util.Utils + +/** + * + */ +object DataType extends RegexParsers { + protected lazy val primitiveType: Parser[DataType] = + "StringType" ^^^ StringType | + "FloatType" ^^^ FloatType | + "IntegerType" ^^^ IntegerType | + 
"ByteType" ^^^ ByteType | + "ShortType" ^^^ ShortType | + "DoubleType" ^^^ DoubleType | + "LongType" ^^^ LongType | + "BinaryType" ^^^ BinaryType | + "BooleanType" ^^^ BooleanType | + "DecimalType" ^^^ DecimalType | + "TimestampType" ^^^ TimestampType + + protected lazy val arrayType: Parser[DataType] = + "ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType + + protected lazy val mapType: Parser[DataType] = + "MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ { + case t1 ~ _ ~ t2 => MapType(t1, t2) + } + + protected lazy val structField: Parser[StructField] = + ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ { + case name ~ tpe ~ nullable => + StructField(name, tpe, nullable = nullable) + } + + protected lazy val boolVal: Parser[Boolean] = + "true" ^^^ true | + "false" ^^^ false + + + protected lazy val structType: Parser[DataType] = + "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ { + case fields => new StructType(fields) + } + + protected lazy val dataType: Parser[DataType] = + arrayType | + mapType | + structType | + primitiveType + + /** + * Parses a string representation of a DataType. + * + * TODO: Generate parser as pickler... + */ + def apply(asString: String): DataType = parseAll(dataType, asString) match { + case Success(result, _) => result + case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure") + } +} abstract class DataType { /** Matches any expression that evaluates to this DataType */ @@ -29,25 +91,36 @@ abstract class DataType { case e: Expression if e.dataType == this => true case _ => false } + + def isPrimitive: Boolean = false } case object NullType extends DataType +trait PrimitiveType extends DataType { + override def isPrimitive = true +} + abstract class NativeType extends DataType { type JvmType @transient val tag: TypeTag[JvmType] val ordering: Ordering[JvmType] + + @transient val classTag = { + val mirror = runtimeMirror(Utils.getSparkClassLoader) + ClassTag[JvmType](mirror.runtimeClass(tag.tpe)) + } } -case object StringType extends NativeType { +case object StringType extends NativeType with PrimitiveType { type JvmType = String @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] } -case object BinaryType extends DataType { +case object BinaryType extends DataType with PrimitiveType { type JvmType = Array[Byte] } -case object BooleanType extends NativeType { +case object BooleanType extends NativeType with PrimitiveType { type JvmType = Boolean @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] @@ -63,7 +136,7 @@ case object TimestampType extends NativeType { } } -abstract class NumericType extends NativeType { +abstract class NumericType extends NativeType with PrimitiveType { // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a // type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). 
This gets @@ -154,6 +227,17 @@ case object FloatType extends FractionalType { case class ArrayType(elementType: DataType) extends DataType case class StructField(name: String, dataType: DataType, nullable: Boolean) -case class StructType(fields: Seq[StructField]) extends DataType + +object StructType { + def fromAttributes(attributes: Seq[Attribute]): StructType = { + StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable))) + } + + // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq) +} + +case class StructType(fields: Seq[StructField]) extends DataType { + def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)()) +} case class MapType(keyType: DataType, valueType: DataType) extends DataType diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1617ec717b2e0..ab376e5504d35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext) * @group userf */ def parquetFile(path: String): SchemaRDD = - new SchemaRDD(this, parquet.ParquetRelation(path)) + new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration))) /** * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]]. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index ff9842267ffe0..ff6deeda2394d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) { * Loads a parquet file, returning the result as a [[JavaSchemaRDD]]. */ def parquetFile(path: String): JavaSchemaRDD = - new JavaSchemaRDD(sqlContext, ParquetRelation(path)) + new JavaSchemaRDD( + sqlContext, + ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration))) /** * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]]. 
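
The schema-as-string machinery introduced above (the RegexParsers-based `DataType.apply` plus the new `StructType.fromAttributes` / `toAttributes` helpers) is what later lets the Parquet read/write support stash a Catalyst schema in a Hadoop `Configuration` as plain text. A minimal sketch of the intended round trip, illustrative only and not part of this patch, assuming the case-class `toString` output is the representation `DataType.apply` consumes:

    import org.apache.spark.sql.catalyst.types._

    // Build a schema with one nested (array) field.
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("scores", ArrayType(IntegerType), nullable = false)))

    // Case-class toString gives e.g.
    // "StructType(List(StructField(name,StringType,true), StructField(scores,ArrayType(IntegerType),false)))"
    // and DataType.apply parses that text back into a DataType.
    val reparsed: DataType = DataType(schema.toString)
    assert(reparsed == schema)

    // The new helpers convert between attributes (as used by logical plans) and StructType.
    val attrs = schema.toAttributes                      // Seq of AttributeReference
    val backToStruct = StructType.fromAttributes(attrs)  // StructType again
    assert(backToStruct == schema)
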
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index feb280d1d1411..4694f25d6d630 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -154,7 +154,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.WriteToFile(path, child) => val relation = ParquetRelation.create(path, child, sparkContext.hadoopConfiguration) - InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil + // Note: overwrite=false because otherwise the metadata we just created will be deleted + InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala new file mode 100644 index 0000000000000..889a408e3c393 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -0,0 +1,667 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap} + +import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter} +import parquet.schema.MessageType + +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.expressions.{GenericRow, Row, Attribute} +import org.apache.spark.sql.parquet.CatalystConverter.FieldType + +/** + * Collection of converters of Parquet types (group and primitive types) that + * model arrays and maps. The conversions are partly based on the AvroParquet + * converters that are part of Parquet in order to be able to process these + * types. + * + * There are several types of converters: + * + */ + +private[sql] object CatalystConverter { + // The type internally used for fields + type FieldType = StructField + + // This is mostly Parquet convention (see, e.g., `ConversionPatterns`). + // Note that "array" for the array elements is chosen by ParquetAvro. + // Using a different value will result in Parquet silently dropping columns. 
+ val ARRAY_ELEMENTS_SCHEMA_NAME = "array" + val MAP_KEY_SCHEMA_NAME = "key" + val MAP_VALUE_SCHEMA_NAME = "value" + val MAP_SCHEMA_NAME = "map" + + // TODO: consider using Array[T] for arrays to avoid boxing of primitive types + type ArrayScalaType[T] = Seq[T] + type StructScalaType[T] = Seq[T] + type MapScalaType[K, V] = Map[K, V] + + protected[parquet] def createConverter( + field: FieldType, + fieldIndex: Int, + parent: CatalystConverter): Converter = { + val fieldType: DataType = field.dataType + fieldType match { + // For native JVM types we use a converter with native arrays + case ArrayType(elementType: NativeType) => { + new CatalystNativeArrayConverter(elementType, fieldIndex, parent) + } + // This is for other types of arrays, including those with nested fields + case ArrayType(elementType: DataType) => { + new CatalystArrayConverter(elementType, fieldIndex, parent) + } + case StructType(fields: Seq[StructField]) => { + new CatalystStructConverter(fields.toArray, fieldIndex, parent) + } + case MapType(keyType: DataType, valueType: DataType) => { + new CatalystMapConverter( + Array( + new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false), + new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)), + fieldIndex, + parent) + } + // Strings, Shorts and Bytes do not have a corresponding type in Parquet + // so we need to treat them separately + case StringType => { + new CatalystPrimitiveConverter(parent, fieldIndex) { + override def addBinary(value: Binary): Unit = + parent.updateString(fieldIndex, value) + } + } + case ShortType => { + new CatalystPrimitiveConverter(parent, fieldIndex) { + override def addInt(value: Int): Unit = + parent.updateShort(fieldIndex, value.asInstanceOf[ShortType.JvmType]) + } + } + case ByteType => { + new CatalystPrimitiveConverter(parent, fieldIndex) { + override def addInt(value: Int): Unit = + parent.updateByte(fieldIndex, value.asInstanceOf[ByteType.JvmType]) + } + } + // All other primitive types use the default converter + case ctype: NativeType => { // note: need the type tag here! + new CatalystPrimitiveConverter(parent, fieldIndex) + } + case _ => throw new RuntimeException( + s"unable to convert datatype ${field.dataType.toString} in CatalystConverter") + } + } + + protected[parquet] def createRootConverter( + parquetSchema: MessageType, + attributes: Seq[Attribute]): CatalystConverter = { + // For non-nested types we use the optimized Row converter + if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) { + new CatalystPrimitiveRowConverter(attributes.toArray) + } else { + new CatalystGroupConverter(attributes.toArray) + } + } +} + +private[parquet] abstract class CatalystConverter extends GroupConverter { + /** + * The number of fields this group has + */ + protected[parquet] val size: Int + + /** + * The index of this converter in the parent + */ + protected[parquet] val index: Int + + /** + * The parent converter + */ + protected[parquet] val parent: CatalystConverter + + /** + * Called by child converters to update their value in its parent (this). + * Note that if possible the more specific update methods below should be used + * to avoid auto-boxing of native JVM types. 
+ * + * @param fieldIndex + * @param value + */ + protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit + + protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = + updateField(fieldIndex, value) + + protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = + updateField(fieldIndex, value) + + protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = + updateField(fieldIndex, value) + + protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = + updateField(fieldIndex, value) + + protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = + updateField(fieldIndex, value) + + protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = + updateField(fieldIndex, value) + + protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = + updateField(fieldIndex, value) + + protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = + updateField(fieldIndex, value.getBytes) + + protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = + updateField(fieldIndex, value.toStringUsingUTF8) + + protected[parquet] def isRootConverter: Boolean = parent == null + + protected[parquet] def clearBuffer(): Unit + + /** + * Should only be called in the root (group) converter! + * + * @return + */ + def getCurrentRecord: Row = throw new UnsupportedOperationException +} + +/** + * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record + * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. + * + * @param schema The corresponding Catalyst schema in the form of a list of attributes. + */ +private[parquet] class CatalystGroupConverter( + protected[parquet] val schema: Array[FieldType], + protected[parquet] val index: Int, + protected[parquet] val parent: CatalystConverter, + protected[parquet] var current: ArrayBuffer[Any], + protected[parquet] var buffer: ArrayBuffer[Row]) + extends CatalystConverter { + + def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) = + this( + schema, + index, + parent, + current=null, + buffer=new ArrayBuffer[Row]( + CatalystArrayConverter.INITIAL_ARRAY_SIZE)) + + /** + * This constructor is used for the root converter only! + */ + def this(attributes: Array[Attribute]) = + this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null) + + protected [parquet] val converters: Array[Converter] = + schema.map(field => + CatalystConverter.createConverter(field, schema.indexOf(field), this)) + .toArray + + override val size = schema.size + + override def getCurrentRecord: Row = { + assert(isRootConverter, "getCurrentRecord should only be called in root group converter!") + // TODO: use iterators if possible + // Note: this will ever only be called in the root converter when the record has been + // fully processed. Therefore it will be difficult to use mutable rows instead, since + // any non-root converter never would be sure when it would be safe to re-use the buffer. 
+ new GenericRow(current.toArray) + } + + override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + + // for child converters to update upstream values + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + current.update(fieldIndex, value) + } + + override protected[parquet] def clearBuffer(): Unit = buffer.clear() + + override def start(): Unit = { + current = ArrayBuffer.fill(size)(null) + converters.foreach { + converter => if (!converter.isPrimitive) { + converter.asInstanceOf[CatalystConverter].clearBuffer + } + } + } + + override def end(): Unit = { + if (!isRootConverter) { + assert(current!=null) // there should be no empty groups + buffer.append(new GenericRow(current.toArray)) + parent.updateField(index, new GenericRow(buffer.toArray.asInstanceOf[Array[Any]])) + } + } +} + +/** + * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record + * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that his + * converter is optimized for rows of primitive types (non-nested records). + */ +private[parquet] class CatalystPrimitiveRowConverter( + protected[parquet] val schema: Array[FieldType], + protected[parquet] var current: ParquetRelation.RowType) + extends CatalystConverter { + + // This constructor is used for the root converter only + def this(attributes: Array[Attribute]) = + this( + attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), + new ParquetRelation.RowType(attributes.length)) + + protected [parquet] val converters: Array[Converter] = + schema.map(field => + CatalystConverter.createConverter(field, schema.indexOf(field), this)) + .toArray + + override val size = schema.size + + override val index = 0 + + override val parent = null + + // Should be only called in root group converter! 
+ override def getCurrentRecord: ParquetRelation.RowType = current + + override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + + // for child converters to update upstream values + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + throw new UnsupportedOperationException // child converters should use the + // specific update methods below + } + + override protected[parquet] def clearBuffer(): Unit = {} + + override def start(): Unit = { + var i = 0 + while (i < size) { + current.setNullAt(i) + i = i + 1 + } + } + + override def end(): Unit = {} + + // Overriden here to avoid auto-boxing for primitive types + override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = + current.setBoolean(fieldIndex, value) + + override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = + current.setInt(fieldIndex, value) + + override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = + current.setLong(fieldIndex, value) + + override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = + current.setShort(fieldIndex, value) + + override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = + current.setByte(fieldIndex, value) + + override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = + current.setDouble(fieldIndex, value) + + override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = + current.setFloat(fieldIndex, value) + + override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = + current.update(fieldIndex, value.getBytes) + + override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = + current.setString(fieldIndex, value.toStringUsingUTF8) +} + +/** + * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types. + * + * @param parent The parent group converter. + * @param fieldIndex The index inside the record. + */ +private[parquet] class CatalystPrimitiveConverter( + parent: CatalystConverter, + fieldIndex: Int) extends PrimitiveConverter { + override def addBinary(value: Binary): Unit = + parent.updateBinary(fieldIndex, value) + + override def addBoolean(value: Boolean): Unit = + parent.updateBoolean(fieldIndex, value) + + override def addDouble(value: Double): Unit = + parent.updateDouble(fieldIndex, value) + + override def addFloat(value: Float): Unit = + parent.updateFloat(fieldIndex, value) + + override def addInt(value: Int): Unit = + parent.updateInt(fieldIndex, value) + + override def addLong(value: Long): Unit = + parent.updateLong(fieldIndex, value) +} + +object CatalystArrayConverter { + val INITIAL_ARRAY_SIZE = 20 +} + +/** + * A `parquet.io.api.GroupConverter` that converts a single-element groups that + * match the characteristics of an array (see + * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an + * [[org.apache.spark.sql.catalyst.types.ArrayType]]. 
+ * + * @param elementType The type of the array elements (complex or primitive) + * @param index The position of this (array) field inside its parent converter + * @param parent The parent converter + * @param buffer A data buffer + */ +private[parquet] class CatalystArrayConverter( + val elementType: DataType, + val index: Int, + protected[parquet] val parent: CatalystConverter, + protected[parquet] var buffer: Buffer[Any]) + extends CatalystConverter { + + def this(elementType: DataType, index: Int, parent: CatalystConverter) = + this( + elementType, + index, + parent, + new ArrayBuffer[Any](CatalystArrayConverter.INITIAL_ARRAY_SIZE)) + + protected[parquet] val converter: Converter = CatalystConverter.createConverter( + new CatalystConverter.FieldType( + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + elementType, + false), + fieldIndex=0, + parent=this) + + override def getConverter(fieldIndex: Int): Converter = converter + + // arrays have only one (repeated) field, which is its elements + override val size = 1 + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + // fieldIndex is ignored (assumed to be zero but not checked) + if(value == null) { + throw new IllegalArgumentException("Null values inside Parquet arrays are not supported!") + } + buffer += value + } + + override protected[parquet] def clearBuffer(): Unit = { + buffer.clear() + } + + override def start(): Unit = { + if (!converter.isPrimitive) { + converter.asInstanceOf[CatalystConverter].clearBuffer + } + } + + override def end(): Unit = { + assert(parent != null) + // here we need to make sure to use ArrayScalaType + parent.updateField(index, buffer.toArray.toSeq) + clearBuffer() + } +} + +/** + * A `parquet.io.api.GroupConverter` that converts a single-element groups that + * match the characteristics of an array (see + * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an + * [[org.apache.spark.sql.catalyst.types.ArrayType]]. 
+ * + * @param elementType The type of the array elements (native) + * @param index The position of this (array) field inside its parent converter + * @param parent The parent converter + * @param capacity The (initial) capacity of the buffer + */ +private[parquet] class CatalystNativeArrayConverter( + val elementType: NativeType, + val index: Int, + protected[parquet] val parent: CatalystConverter, + protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE) + extends CatalystConverter { + + type NativeType = elementType.JvmType + + private var buffer: Array[NativeType] = elementType.classTag.newArray(capacity) + + private var elements: Int = 0 + + protected[parquet] val converter: Converter = CatalystConverter.createConverter( + new CatalystConverter.FieldType( + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + elementType, + false), + fieldIndex=0, + parent=this) + + override def getConverter(fieldIndex: Int): Converter = converter + + // arrays have only one (repeated) field, which is its elements + override val size = 1 + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = + throw new UnsupportedOperationException + + // Overriden here to avoid auto-boxing for primitive types + override protected[parquet] def updateBoolean(fieldIndex: Int, value: Boolean): Unit = { + checkGrowBuffer() + buffer(elements) = value.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateInt(fieldIndex: Int, value: Int): Unit = { + checkGrowBuffer() + buffer(elements) = value.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateShort(fieldIndex: Int, value: Short): Unit = { + checkGrowBuffer() + buffer(elements) = value.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateByte(fieldIndex: Int, value: Byte): Unit = { + checkGrowBuffer() + buffer(elements) = value.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateLong(fieldIndex: Int, value: Long): Unit = { + checkGrowBuffer() + buffer(elements) = value.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateDouble(fieldIndex: Int, value: Double): Unit = { + checkGrowBuffer() + buffer(elements) = value.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateFloat(fieldIndex: Int, value: Float): Unit = { + checkGrowBuffer() + buffer(elements) = value.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateBinary(fieldIndex: Int, value: Binary): Unit = { + checkGrowBuffer() + buffer(elements) = value.getBytes.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def updateString(fieldIndex: Int, value: Binary): Unit = { + checkGrowBuffer() + buffer(elements) = value.toStringUsingUTF8.asInstanceOf[NativeType] + elements += 1 + } + + override protected[parquet] def clearBuffer(): Unit = { + elements = 0 + } + + override def start(): Unit = {} + + override def end(): Unit = { + assert(parent != null) + // here we need to make sure to use ArrayScalaType + parent.updateField( + index, + buffer.slice(0, elements).toSeq) + clearBuffer() + } + + private def checkGrowBuffer(): Unit = { + if (elements >= capacity) { + val newCapacity = 2 * capacity + val tmp: Array[NativeType] = elementType.classTag.newArray(newCapacity) + Array.copy(buffer, 0, tmp, 0, capacity) + buffer = tmp + capacity = newCapacity + } + } +} + +/** + * This converter is for multi-element groups of 
primitive or complex types + * that have repetition level optional or required (so struct fields). + * + * @param schema The corresponding Catalyst schema in the form of a list of + * attributes. + * @param index + * @param parent + */ +private[parquet] class CatalystStructConverter( + override protected[parquet] val schema: Array[FieldType], + override protected[parquet] val index: Int, + override protected[parquet] val parent: CatalystConverter) + extends CatalystGroupConverter(schema, index, parent) { + + override protected[parquet] def clearBuffer(): Unit = {} + + // TODO: think about reusing the buffer + override def end(): Unit = { + assert(!isRootConverter) + // here we need to make sure to use StructScalaType + // Note: we need to actually make a copy of the array since we + // may be in a nested field + parent.updateField(index, new GenericRow(current.toArray)) + } +} + +/** + * A `parquet.io.api.GroupConverter` that converts two-element groups that + * match the characteristics of a map (see + * [[org.apache.spark.sql.parquet.ParquetTypesConverter]]) into an + * [[org.apache.spark.sql.catalyst.types.MapType]]. + * + * @param schema + * @param index + * @param parent + */ +private[parquet] class CatalystMapConverter( + protected[parquet] val schema: Array[FieldType], + override protected[parquet] val index: Int, + override protected[parquet] val parent: CatalystConverter) + extends CatalystConverter { + + private val map = new HashMap[Any, Any]() + + private val keyValueConverter = new CatalystConverter { + private var currentKey: Any = null + private var currentValue: Any = null + val keyConverter = CatalystConverter.createConverter(schema(0), 0, this) + val valueConverter = CatalystConverter.createConverter(schema(1), 1, this) + + override def getConverter(fieldIndex: Int): Converter = { + if (fieldIndex == 0) keyConverter else valueConverter + } + + override def end(): Unit = CatalystMapConverter.this.map += currentKey -> currentValue + + override def start(): Unit = { + currentKey = null + currentValue = null + } + + override protected[parquet] val size: Int = 2 + override protected[parquet] val index: Int = 0 + override protected[parquet] val parent: CatalystConverter = CatalystMapConverter.this + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = { + fieldIndex match { + case 0 => + currentKey = value + case 1 => + currentValue = value + case _ => + new RuntimePermission(s"trying to update Map with fieldIndex $fieldIndex") + } + } + + override protected[parquet] def clearBuffer(): Unit = {} + } + + override protected[parquet] val size: Int = 1 + + override protected[parquet] def clearBuffer(): Unit = {} + + override def start(): Unit = { + map.clear() + } + + override def end(): Unit = { + // here we need to make sure to use MapScalaType + parent.updateField(index, map.toMap) + } + + override def getConverter(fieldIndex: Int): Converter = keyValueConverter + + override protected[parquet] def updateField(fieldIndex: Int, value: Any): Unit = + throw new UnsupportedOperationException +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 32813a66de3c3..96c131a7f8af1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -20,25 +20,16 @@ package org.apache.spark.sql.parquet import java.io.IOException import 
org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.permission.FsAction -import org.apache.hadoop.mapreduce.Job -import parquet.hadoop.util.ContextUtil -import parquet.hadoop.{ParquetOutputFormat, Footer, ParquetFileWriter, ParquetFileReader} -import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} -import parquet.io.api.{Binary, RecordConsumer} -import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser} -import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName} -import parquet.schema.Type.Repetition +import parquet.hadoop.ParquetOutputFormat +import parquet.hadoop.metadata.CompressionCodecName +import parquet.schema.MessageType import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode} -import org.apache.spark.sql.catalyst.types._ - -// Implicits -import scala.collection.JavaConversions._ /** * Relation that consists of data stored in a Parquet columnar format. @@ -52,21 +43,20 @@ import scala.collection.JavaConversions._ * * @param path The path to the Parquet file. */ -private[sql] case class ParquetRelation(val path: String) - extends LeafNode with MultiInstanceRelation { +private[sql] case class ParquetRelation( + val path: String, + @transient val conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation { self: Product => /** Schema derived from ParquetFile */ def parquetSchema: MessageType = ParquetTypesConverter - .readMetaData(new Path(path)) + .readMetaData(new Path(path), conf) .getFileMetaData .getSchema /** Attributes */ - override val output = - ParquetTypesConverter - .convertToAttributes(parquetSchema) + override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf) override def newInstance = ParquetRelation(path).asInstanceOf[this.type] @@ -141,7 +131,9 @@ private[sql] object ParquetRelation { } ParquetRelation.enableLogForwarding() ParquetTypesConverter.writeMetaData(attributes, path, conf) - new ParquetRelation(path.toString) + new ParquetRelation(path.toString, Some(conf)) { + override val output = attributes + } } private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = { @@ -170,151 +162,3 @@ private[sql] object ParquetRelation { path } } - -private[parquet] object ParquetTypesConverter { - def toDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match { - // for now map binary to string type - // TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema - case ParquetPrimitiveTypeName.BINARY => StringType - case ParquetPrimitiveTypeName.BOOLEAN => BooleanType - case ParquetPrimitiveTypeName.DOUBLE => DoubleType - case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType) - case ParquetPrimitiveTypeName.FLOAT => FloatType - case ParquetPrimitiveTypeName.INT32 => IntegerType - case ParquetPrimitiveTypeName.INT64 => LongType - case ParquetPrimitiveTypeName.INT96 => - // TODO: add BigInteger type? TODO(andre) use DecimalType instead???? 
- sys.error("Warning: potential loss of precision: converting INT96 to long") - LongType - case _ => sys.error( - s"Unsupported parquet datatype $parquetType") - } - - def fromDataType(ctype: DataType): ParquetPrimitiveTypeName = ctype match { - case StringType => ParquetPrimitiveTypeName.BINARY - case BooleanType => ParquetPrimitiveTypeName.BOOLEAN - case DoubleType => ParquetPrimitiveTypeName.DOUBLE - case ArrayType(ByteType) => ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY - case FloatType => ParquetPrimitiveTypeName.FLOAT - case IntegerType => ParquetPrimitiveTypeName.INT32 - case LongType => ParquetPrimitiveTypeName.INT64 - case _ => sys.error(s"Unsupported datatype $ctype") - } - - def consumeType(consumer: RecordConsumer, ctype: DataType, record: Row, index: Int): Unit = { - ctype match { - case StringType => consumer.addBinary( - Binary.fromByteArray( - record(index).asInstanceOf[String].getBytes("utf-8") - ) - ) - case IntegerType => consumer.addInteger(record.getInt(index)) - case LongType => consumer.addLong(record.getLong(index)) - case DoubleType => consumer.addDouble(record.getDouble(index)) - case FloatType => consumer.addFloat(record.getFloat(index)) - case BooleanType => consumer.addBoolean(record.getBoolean(index)) - case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer") - } - } - - def getSchema(schemaString : String) : MessageType = - MessageTypeParser.parseMessageType(schemaString) - - def convertToAttributes(parquetSchema: MessageType) : Seq[Attribute] = { - parquetSchema.getColumns.map { - case (desc) => - val ctype = toDataType(desc.getType) - val name: String = desc.getPath.mkString(".") - new AttributeReference(name, ctype, false)() - } - } - - // TODO: allow nesting? - def convertFromAttributes(attributes: Seq[Attribute]): MessageType = { - val fields: Seq[ParquetType] = attributes.map { - a => new ParquetPrimitiveType(Repetition.OPTIONAL, fromDataType(a.dataType), a.name) - } - new MessageType("root", fields) - } - - def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration) { - if (origPath == null) { - throw new IllegalArgumentException("Unable to write Parquet metadata: path is null") - } - val fs = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException( - s"Unable to write Parquet metadata: path $origPath is incorrectly formatted") - } - val path = origPath.makeQualified(fs) - if (fs.exists(path) && !fs.getFileStatus(path).isDir) { - throw new IllegalArgumentException(s"Expected to write to directory $path but found file") - } - val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) - if (fs.exists(metadataPath)) { - try { - fs.delete(metadataPath, true) - } catch { - case e: IOException => - throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath") - } - } - val extraMetadata = new java.util.HashMap[String, String]() - extraMetadata.put("path", path.toString) - // TODO: add extra data, e.g., table name, date, etc.? - - val parquetSchema: MessageType = - ParquetTypesConverter.convertFromAttributes(attributes) - val metaData: FileMetaData = new FileMetaData( - parquetSchema, - extraMetadata, - "Spark") - - ParquetRelation.enableLogForwarding() - ParquetFileWriter.writeMetadataFile( - conf, - path, - new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil) - } - - /** - * Try to read Parquet metadata at the given Path. We first see if there is a summary file - * in the parent directory. If so, this is used. 
Else we read the actual footer at the given - * location. - * @param origPath The path at which we expect one (or more) Parquet files. - * @return The `ParquetMetadata` containing among other things the schema. - */ - def readMetaData(origPath: Path): ParquetMetadata = { - if (origPath == null) { - throw new IllegalArgumentException("Unable to read Parquet metadata: path is null") - } - val job = new Job() - // TODO: since this is called from ParquetRelation (LogicalPlan) we don't have access - // to SparkContext's hadoopConfig; in principle the default FileSystem may be different(?!) - val conf = ContextUtil.getConfiguration(job) - val fs: FileSystem = origPath.getFileSystem(conf) - if (fs == null) { - throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath") - } - val path = origPath.makeQualified(fs) - if (!fs.getFileStatus(path).isDir) { - throw new IllegalArgumentException( - s"Expected $path for be a directory with Parquet files/metadata") - } - ParquetRelation.enableLogForwarding() - val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) - // if this is a new table that was just created we will find only the metadata file - if (fs.exists(metadataPath) && fs.isFile(metadataPath)) { - ParquetFileReader.readFooter(conf, metadataPath) - } else { - // there may be one or more Parquet files in the given directory - val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path)) - // TODO: for now we assume that all footers (if there is more than one) have identical - // metadata; we may want to add a check here at some point - if (footers.size() == 0) { - throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path") - } - footers(0).getParquetMetadata - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 65ba1246fbf9a..624f2e2fa13f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -36,6 +36,7 @@ import parquet.schema.MessageType import org.apache.spark.{Logging, SerializableWritable, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} +import org.apache.spark.sql.catalyst.types.StructType import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} /** @@ -64,10 +65,13 @@ case class ParquetTableScan( NewFileInputFormat.addInputPath(job, path) } - // Store Parquet schema in `Configuration` + // Store both requested and original schema in `Configuration` conf.set( - RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, - ParquetTypesConverter.convertFromAttributes(output).toString) + RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + ParquetTypesConverter.convertToString(output)) + conf.set( + RowWriteSupport.SPARK_ROW_SCHEMA, + ParquetTypesConverter.convertToString(relation.output)) // Store record filtering predicate in `Configuration` // Note 1: the input format ignores all predicates that cannot be expressed @@ -166,13 +170,18 @@ case class InsertIntoParquetTable( val job = new Job(sc.hadoopConfiguration) - ParquetOutputFormat.setWriteSupportClass( - job, - classOf[org.apache.spark.sql.parquet.RowWriteSupport]) + val writeSupport = + if (child.output.map(_.dataType).forall(_.isPrimitive)) { + logger.debug("Initializing MutableRowWriteSupport") + 
classOf[org.apache.spark.sql.parquet.MutableRowWriteSupport] + } else { + classOf[org.apache.spark.sql.parquet.RowWriteSupport] + } + + ParquetOutputFormat.setWriteSupportClass(job, writeSupport) - // TODO: move that to function in object val conf = ContextUtil.getConfiguration(job) - conf.set(RowWriteSupport.PARQUET_ROW_SCHEMA, relation.parquetSchema.toString) + RowWriteSupport.setSchema(relation.output, conf) val fspath = new Path(relation.path) val fs = fspath.getFileSystem(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 71ba0fecce47a..bfcbdeb34a92f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -29,21 +29,23 @@ import parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.SparkSqlSerializer +import com.google.common.io.BaseEncoding /** * A `parquet.io.api.RecordMaterializer` for Rows. * *@param root The root group converter for the record. */ -private[parquet] class RowRecordMaterializer(root: CatalystGroupConverter) +private[parquet] class RowRecordMaterializer(root: CatalystConverter) extends RecordMaterializer[Row] { - def this(parquetSchema: MessageType) = - this(new CatalystGroupConverter(ParquetTypesConverter.convertToAttributes(parquetSchema))) + def this(parquetSchema: MessageType, attributes: Seq[Attribute]) = + this(CatalystConverter.createRootConverter(parquetSchema, attributes)) override def getCurrentRecord: Row = root.getCurrentRecord - override def getRootConverter: GroupConverter = root + override def getRootConverter: GroupConverter = root.asInstanceOf[GroupConverter] } /** @@ -56,68 +58,94 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { stringMap: java.util.Map[String, String], fileSchema: MessageType, readContext: ReadContext): RecordMaterializer[Row] = { - log.debug(s"preparing for read with file schema $fileSchema") - new RowRecordMaterializer(readContext.getRequestedSchema) + log.debug(s"preparing for read with Parquet file schema $fileSchema") + // Note: this very much imitates AvroParquet + val parquetSchema = readContext.getRequestedSchema + var schema: Seq[Attribute] = null + + if (readContext.getReadSupportMetadata != null) { + // first try to find the read schema inside the metadata (can result from projections) + if ( + readContext + .getReadSupportMetadata + .get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA) != null) { + schema = ParquetTypesConverter.convertFromString( + readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA)) + } else { + // if unavailable, try the schema that was read originally from the file or provided + // during the creation of the Parquet relation + if (readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) { + schema = ParquetTypesConverter.convertFromString( + readContext.getReadSupportMetadata.get(RowReadSupport.SPARK_METADATA_KEY)) + } + } + } + // if both unavailable, fall back to deducing the schema from the given Parquet schema + if (schema == null) { + log.debug("falling back to Parquet read schema") + schema = ParquetTypesConverter.convertToAttributes(parquetSchema) + } + log.debug(s"list of attributes 
that will be read: $schema") + new RowRecordMaterializer(parquetSchema, schema) } override def init( configuration: Configuration, keyValueMetaData: java.util.Map[String, String], fileSchema: MessageType): ReadContext = { - val requested_schema_string = - configuration.get(RowReadSupport.PARQUET_ROW_REQUESTED_SCHEMA, fileSchema.toString) - val requested_schema = - MessageTypeParser.parseMessageType(requested_schema_string) - log.debug(s"read support initialized for requested schema $requested_schema") - ParquetRelation.enableLogForwarding() - new ReadContext(requested_schema, keyValueMetaData) + var parquetSchema: MessageType = fileSchema + var metadata: java.util.Map[String, String] = new java.util.HashMap[String, String]() + val requestedAttributes = RowReadSupport.getRequestedSchema(configuration) + + if (requestedAttributes != null) { + parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes) + metadata.put( + RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + ParquetTypesConverter.convertToString(requestedAttributes)) + } + + val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) + if (origAttributesStr != null) { + metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr) + } + + return new ReadSupport.ReadContext(parquetSchema, metadata) } } private[parquet] object RowReadSupport { - val PARQUET_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" + val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema" + val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata" + + private def getRequestedSchema(configuration: Configuration): Seq[Attribute] = { + val schemaString = configuration.get(RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA) + if (schemaString == null) null else ParquetTypesConverter.convertFromString(schemaString) + } } /** * A `parquet.hadoop.api.WriteSupport` for Row ojects. */ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { - def setSchema(schema: MessageType, configuration: Configuration) { - // for testing - this.schema = schema - // TODO: could use Attributes themselves instead of Parquet schema? 
- configuration.set( - RowWriteSupport.PARQUET_ROW_SCHEMA, - schema.toString) - configuration.set( - ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.WriterVersion.PARQUET_1_0.toString) - } - - def getSchema(configuration: Configuration): MessageType = { - MessageTypeParser.parseMessageType(configuration.get(RowWriteSupport.PARQUET_ROW_SCHEMA)) - } - private var schema: MessageType = null - private var writer: RecordConsumer = null - private var attributes: Seq[Attribute] = null + private[parquet] var writer: RecordConsumer = null + private[parquet] var attributes: Seq[Attribute] = null override def init(configuration: Configuration): WriteSupport.WriteContext = { - schema = if (schema == null) getSchema(configuration) else schema - attributes = ParquetTypesConverter.convertToAttributes(schema) - log.debug(s"write support initialized for requested schema $schema") + attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes + + log.debug(s"write support initialized for requested schema $attributes") ParquetRelation.enableLogForwarding() new WriteSupport.WriteContext( - schema, + ParquetTypesConverter.convertFromAttributes(attributes), new java.util.HashMap[java.lang.String, java.lang.String]()) } override def prepareForWrite(recordConsumer: RecordConsumer): Unit = { writer = recordConsumer - log.debug(s"preparing for write with schema $schema") + log.debug(s"preparing for write with schema $attributes") } - // TODO: add groups (nested fields) override def write(record: Row): Unit = { if (attributes.size > record.size) { throw new IndexOutOfBoundsException( @@ -130,98 +158,176 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { // null values indicate optional fields but we do not check currently if (record(index) != null && record(index) != Nil) { writer.startField(attributes(index).name, index) - ParquetTypesConverter.consumeType(writer, attributes(index).dataType, record, index) + writeValue(attributes(index).dataType, record(index)) writer.endField(attributes(index).name, index) } index = index + 1 } writer.endMessage() } -} -private[parquet] object RowWriteSupport { - val PARQUET_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.schema" -} - -/** - * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record to a `Row` object. - * - * @param schema The corresponding Catalyst schema in the form of a list of attributes. 
- */ -private[parquet] class CatalystGroupConverter( - schema: Seq[Attribute], - protected[parquet] val current: ParquetRelation.RowType) extends GroupConverter { - - def this(schema: Seq[Attribute]) = this(schema, new ParquetRelation.RowType(schema.length)) - - val converters: Array[Converter] = schema.map { - a => a.dataType match { - case ctype: NativeType => - // note: for some reason matching for StringType fails so use this ugly if instead - if (ctype == StringType) { - new CatalystPrimitiveStringConverter(this, schema.indexOf(a)) - } else { - new CatalystPrimitiveConverter(this, schema.indexOf(a)) - } - case _ => throw new RuntimeException( - s"unable to convert datatype ${a.dataType.toString} in CatalystGroupConverter") + private[parquet] def writeValue(schema: DataType, value: Any): Unit = { + if (value != null && value != Nil) { + schema match { + case t @ ArrayType(_) => writeArray( + t, + value.asInstanceOf[CatalystConverter.ArrayScalaType[_]]) + case t @ MapType(_, _) => writeMap( + t, + value.asInstanceOf[CatalystConverter.MapScalaType[_, _]]) + case t @ StructType(_) => writeStruct( + t, + value.asInstanceOf[CatalystConverter.StructScalaType[_]]) + case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value) + } } - }.toArray + } - override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) + private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = { + if (value != null && value != Nil) { + schema match { + case StringType => writer.addBinary( + Binary.fromByteArray( + value.asInstanceOf[String].getBytes("utf-8") + ) + ) + case IntegerType => writer.addInteger(value.asInstanceOf[Int]) + case ShortType => writer.addInteger(value.asInstanceOf[Int]) + case LongType => writer.addLong(value.asInstanceOf[Long]) + case ByteType => writer.addInteger(value.asInstanceOf[Int]) + case DoubleType => writer.addDouble(value.asInstanceOf[Double]) + case FloatType => writer.addFloat(value.asInstanceOf[Float]) + case BooleanType => writer.addBoolean(value.asInstanceOf[Boolean]) + case _ => sys.error(s"Do not know how to writer $schema to consumer") + } + } + } - private[parquet] def getCurrentRecord: ParquetRelation.RowType = current + private[parquet] def writeStruct( + schema: StructType, + struct: CatalystConverter.StructScalaType[_]): Unit = { + if (struct != null && struct != Nil) { + val fields = schema.fields.toArray + writer.startGroup() + var i = 0 + while(i < fields.size) { + if (struct(i) != null && struct(i) != Nil) { + writer.startField(fields(i).name, i) + writeValue(fields(i).dataType, struct(i)) + writer.endField(fields(i).name, i) + } + i = i + 1 + } + writer.endGroup() + } + } - override def start(): Unit = { - var i = 0 - while (i < schema.length) { - current.setNullAt(i) - i = i + 1 + // TODO: support null values, see + // https://issues.apache.org/jira/browse/SPARK-1649 + private[parquet] def writeArray( + schema: ArrayType, + array: CatalystConverter.ArrayScalaType[_]): Unit = { + val elementType = schema.elementType + writer.startGroup() + if (array.size > 0) { + writer.startField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) + var i = 0 + while(i < array.size) { + writeValue(elementType, array(i)) + i = i + 1 + } + writer.endField(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 0) } + writer.endGroup() } - override def end(): Unit = {} + // TODO: support null values, see + // https://issues.apache.org/jira/browse/SPARK-1649 + private[parquet] def writeMap( + schema: MapType, + map: CatalystConverter.MapScalaType[_, _]): 
Unit = { + writer.startGroup() + if (map.size > 0) { + writer.startField(CatalystConverter.MAP_SCHEMA_NAME, 0) + writer.startGroup() + writer.startField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) + for(key <- map.keys) { + writeValue(schema.keyType, key) + } + writer.endField(CatalystConverter.MAP_KEY_SCHEMA_NAME, 0) + writer.startField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) + for(value <- map.values) { + writeValue(schema.valueType, value) + } + writer.endField(CatalystConverter.MAP_VALUE_SCHEMA_NAME, 1) + writer.endGroup() + writer.endField(CatalystConverter.MAP_SCHEMA_NAME, 0) + } + writer.endGroup() + } } -/** - * A `parquet.io.api.PrimitiveConverter` that converts Parquet types to Catalyst types. - * - * @param parent The parent group converter. - * @param fieldIndex The index inside the record. - */ -private[parquet] class CatalystPrimitiveConverter( - parent: CatalystGroupConverter, - fieldIndex: Int) extends PrimitiveConverter { - // TODO: consider refactoring these together with ParquetTypesConverter - override def addBinary(value: Binary): Unit = - parent.getCurrentRecord.update(fieldIndex, value.getBytes) +// Optimized for non-nested rows +private[parquet] class MutableRowWriteSupport extends RowWriteSupport { + override def write(record: Row): Unit = { + if (attributes.size > record.size) { + throw new IndexOutOfBoundsException( + s"Trying to write more fields than contained in row (${attributes.size}>${record.size})") + } - override def addBoolean(value: Boolean): Unit = - parent.getCurrentRecord.setBoolean(fieldIndex, value) + var index = 0 + writer.startMessage() + while(index < attributes.size) { + // null values indicate optional fields but we do not check currently + if (record(index) != null && record(index) != Nil) { + writer.startField(attributes(index).name, index) + consumeType(attributes(index).dataType, record, index) + writer.endField(attributes(index).name, index) + } + index = index + 1 + } + writer.endMessage() + } - override def addDouble(value: Double): Unit = - parent.getCurrentRecord.setDouble(fieldIndex, value) + private def consumeType( + ctype: DataType, + record: Row, + index: Int): Unit = { + ctype match { + case StringType => writer.addBinary( + Binary.fromByteArray( + record(index).asInstanceOf[String].getBytes("utf-8") + ) + ) + case IntegerType => writer.addInteger(record.getInt(index)) + case ShortType => writer.addInteger(record.getShort(index)) + case LongType => writer.addLong(record.getLong(index)) + case ByteType => writer.addInteger(record.getByte(index)) + case DoubleType => writer.addDouble(record.getDouble(index)) + case FloatType => writer.addFloat(record.getFloat(index)) + case BooleanType => writer.addBoolean(record.getBoolean(index)) + case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer") + } + } +} - override def addFloat(value: Float): Unit = - parent.getCurrentRecord.setFloat(fieldIndex, value) +private[parquet] object RowWriteSupport { + val SPARK_ROW_SCHEMA: String = "org.apache.spark.sql.parquet.row.attributes" - override def addInt(value: Int): Unit = - parent.getCurrentRecord.setInt(fieldIndex, value) + def getSchema(configuration: Configuration): Seq[Attribute] = { + val schemaString = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA) + if (schemaString == null) { + throw new RuntimeException("Missing schema!") + } + ParquetTypesConverter.convertFromString(schemaString) + } - override def addLong(value: Long): Unit = - parent.getCurrentRecord.setLong(fieldIndex, value) + def setSchema(schema: 
Seq[Attribute], configuration: Configuration) { + val encoded = ParquetTypesConverter.convertToString(schema) + configuration.set(SPARK_ROW_SCHEMA, encoded) + configuration.set( + ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_1_0.toString) + } } -/** - * A `parquet.io.api.PrimitiveConverter` that converts Parquet strings (fixed-length byte arrays) - * into Catalyst Strings. - * - * @param parent The parent group converter. - * @param fieldIndex The index inside the record. - */ -private[parquet] class CatalystPrimitiveStringConverter( - parent: CatalystGroupConverter, - fieldIndex: Int) extends CatalystPrimitiveConverter(parent, fieldIndex) { - override def addBinary(value: Binary): Unit = - parent.getCurrentRecord.setString(fieldIndex, value.toStringUsingUTF8) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala index 46c7172985642..1dc58633a2a68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala @@ -17,14 +17,19 @@ package org.apache.spark.sql.parquet +import java.io.File + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.mapreduce.Job import parquet.example.data.{GroupWriter, Group} import parquet.example.data.simple.SimpleGroup -import parquet.hadoop.ParquetWriter +import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter} import parquet.hadoop.api.WriteSupport import parquet.hadoop.api.WriteSupport.WriteContext +import parquet.hadoop.example.GroupReadSupport +import parquet.hadoop.util.ContextUtil import parquet.io.api.RecordConsumer import parquet.schema.{MessageType, MessageTypeParser} @@ -51,13 +56,13 @@ private[sql] object ParquetTestData { val testSchema = """message myrecord { - |optional boolean myboolean; - |optional int32 myint; - |optional binary mystring; - |optional int64 mylong; - |optional float myfloat; - |optional double mydouble; - |}""".stripMargin + optional boolean myboolean; + optional int32 myint; + optional binary mystring; + optional int64 mylong; + optional float myfloat; + optional double mydouble; + }""" // field names for test assertion error messages val testSchemaFieldNames = Seq( @@ -71,23 +76,23 @@ private[sql] object ParquetTestData { val subTestSchema = """ - |message myrecord { - |optional boolean myboolean; - |optional int64 mylong; - |} - """.stripMargin + message myrecord { + optional boolean myboolean; + optional int64 mylong; + } + """ val testFilterSchema = """ - |message myrecord { - |required boolean myboolean; - |required int32 myint; - |required binary mystring; - |required int64 mylong; - |required float myfloat; - |required double mydouble; - |} - """.stripMargin + message myrecord { + required boolean myboolean; + required int32 myint; + required binary mystring; + required int64 mylong; + required float myfloat; + required double mydouble; + } + """ // field names for test assertion error messages val subTestSchemaFieldNames = Seq( @@ -100,9 +105,110 @@ private[sql] object ParquetTestData { lazy val testData = new ParquetRelation(testDir.toURI.toString) + val testNestedSchema1 = + // based on blogpost example, source: + // https://blog.twitter.com/2013/dremel-made-simple-with-parquet + // note: instead of string we have to use binary (?) 
otherwise + // Parquet gives us: + // IllegalArgumentException: expected one of [INT64, INT32, BOOLEAN, + // BINARY, FLOAT, DOUBLE, INT96, FIXED_LEN_BYTE_ARRAY] + // Also repeated primitives seem tricky to convert (AvroParquet + // only uses them in arrays?) so only use at most one in each group + // and nothing else in that group (-> is mapped to array)! + // The "values" inside ownerPhoneNumbers is a keyword currently + // so that array types can be translated correctly. + """ + message AddressBook { + required binary owner; + optional group ownerPhoneNumbers { + repeated binary array; + } + optional group contacts { + repeated group array { + required binary name; + optional binary phoneNumber; + } + } + } + """ + + + val testNestedSchema2 = + """ + message TestNested2 { + required int32 firstInt; + optional int32 secondInt; + optional group longs { + repeated int64 array; + } + required group entries { + repeated group array { + required double value; + optional boolean truth; + } + } + optional group outerouter { + repeated group array { + repeated group array { + repeated int32 array; + } + } + } + } + """ + + val testNestedSchema3 = + """ + message TestNested3 { + required int32 x; + optional group booleanNumberPairs { + repeated group array { + required int32 key; + optional group value { + repeated group array { + required double nestedValue; + optional boolean truth; + } + } + } + } + } + """ + + val testNestedSchema4 = + """ + message TestNested4 { + required int32 x; + optional group data1 { + repeated group map { + required binary key; + required int32 value; + } + } + required group data2 { + repeated group map { + required binary key; + required group value { + required int64 payload1; + optional binary payload2; + } + } + } + } + """ + + val testNestedDir1 = Utils.createTempDir() + val testNestedDir2 = Utils.createTempDir() + val testNestedDir3 = Utils.createTempDir() + val testNestedDir4 = Utils.createTempDir() + + lazy val testNestedData1 = new ParquetRelation(testNestedDir1.toURI.toString) + lazy val testNestedData2 = new ParquetRelation(testNestedDir2.toURI.toString) + def writeFile() = { - testDir.delete + testDir.delete() val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet")) + val job = new Job() val schema: MessageType = MessageTypeParser.parseMessageType(testSchema) val writeSupport = new TestGroupWriteSupport(schema) val writer = new ParquetWriter[Group](path, writeSupport) @@ -150,5 +256,149 @@ private[sql] object ParquetTestData { } writer.close() } + + def writeNestedFile1() { + // example data from https://blog.twitter.com/2013/dremel-made-simple-with-parquet + testNestedDir1.delete() + val path: Path = new Path(new Path(testNestedDir1.toURI), new Path("part-r-0.parquet")) + val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema1) + + val r1 = new SimpleGroup(schema) + r1.add(0, "Julien Le Dem") + r1.addGroup(1) + .append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "555 123 4567") + .append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "555 666 1337") + .append(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, "XXX XXX XXXX") + val contacts = r1.addGroup(2) + contacts.addGroup(0) + .append("name", "Dmitriy Ryaboy") + .append("phoneNumber", "555 987 6543") + contacts.addGroup(0) + .append("name", "Chris Aniszczyk") + + val r2 = new SimpleGroup(schema) + r2.add(0, "A. 
Nonymous") + + val writeSupport = new TestGroupWriteSupport(schema) + val writer = new ParquetWriter[Group](path, writeSupport) + writer.write(r1) + writer.write(r2) + writer.close() + } + + def writeNestedFile2() { + testNestedDir2.delete() + val path: Path = new Path(new Path(testNestedDir2.toURI), new Path("part-r-0.parquet")) + val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema2) + + val r1 = new SimpleGroup(schema) + r1.add(0, 1) + r1.add(1, 7) + val longs = r1.addGroup(2) + longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME , 1.toLong << 32) + longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 33) + longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 34) + val booleanNumberPair = r1.addGroup(3).addGroup(0) + booleanNumberPair.add("value", 2.5) + booleanNumberPair.add("truth", false) + val top_level = r1.addGroup(4) + val second_level_a = top_level.addGroup(0) + val second_level_b = top_level.addGroup(0) + val third_level_aa = second_level_a.addGroup(0) + val third_level_ab = second_level_a.addGroup(0) + val third_level_c = second_level_b.addGroup(0) + third_level_aa.add( + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + 7) + third_level_ab.add( + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + 8) + third_level_c.add( + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + 9) + + val writeSupport = new TestGroupWriteSupport(schema) + val writer = new ParquetWriter[Group](path, writeSupport) + writer.write(r1) + writer.close() + } + + def writeNestedFile3() { + testNestedDir3.delete() + val path: Path = new Path(new Path(testNestedDir3.toURI), new Path("part-r-0.parquet")) + val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema3) + + val r1 = new SimpleGroup(schema) + r1.add(0, 1) + val booleanNumberPairs = r1.addGroup(1) + val g1 = booleanNumberPairs.addGroup(0) + g1.add(0, 1) + val nested1 = g1.addGroup(1) + val ng1 = nested1.addGroup(0) + ng1.add(0, 1.5) + ng1.add(1, false) + val ng2 = nested1.addGroup(0) + ng2.add(0, 2.5) + ng2.add(1, true) + val g2 = booleanNumberPairs.addGroup(0) + g2.add(0, 2) + val ng3 = g2.addGroup(1) + .addGroup(0) + ng3.add(0, 3.5) + ng3.add(1, false) + + val writeSupport = new TestGroupWriteSupport(schema) + val writer = new ParquetWriter[Group](path, writeSupport) + writer.write(r1) + writer.close() + } + + def writeNestedFile4() { + testNestedDir4.delete() + val path: Path = new Path(new Path(testNestedDir4.toURI), new Path("part-r-0.parquet")) + val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema4) + + val r1 = new SimpleGroup(schema) + r1.add(0, 7) + val map1 = r1.addGroup(1) + val keyValue1 = map1.addGroup(0) + keyValue1.add(0, "key1") + keyValue1.add(1, 1) + val keyValue2 = map1.addGroup(0) + keyValue2.add(0, "key2") + keyValue2.add(1, 2) + val map2 = r1.addGroup(2) + val keyValue3 = map2.addGroup(0) + // TODO: currently only string key type supported + keyValue3.add(0, "seven") + val valueGroup1 = keyValue3.addGroup(1) + valueGroup1.add(0, 42.toLong) + valueGroup1.add(1, "the answer") + val keyValue4 = map2.addGroup(0) + // TODO: currently only string key type supported + keyValue4.add(0, "eight") + val valueGroup2 = keyValue4.addGroup(1) + valueGroup2.add(0, 49.toLong) + + val writeSupport = new TestGroupWriteSupport(schema) + val writer = new ParquetWriter[Group](path, writeSupport) + writer.write(r1) + writer.close() + } + + // TODO: this is not actually used anywhere but useful for debugging + /* def readNestedFile(file: File, schemaString: 
String): Unit = { + val configuration = new Configuration() + val path = new Path(new Path(file.toURI), new Path("part-r-0.parquet")) + val fs: FileSystem = path.getFileSystem(configuration) + val schema: MessageType = MessageTypeParser.parseMessageType(schemaString) + assert(schema != null) + val outputStatus: FileStatus = fs.getFileStatus(new Path(path.toString)) + val footers = ParquetFileReader.readFooter(configuration, outputStatus) + assert(footers != null) + val reader = new ParquetReader(new Path(path.toString), new GroupReadSupport()) + val first = reader.read() + assert(first != null) + } */ } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala new file mode 100644 index 0000000000000..f9046368e7ced --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.parquet + +import java.io.IOException + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.mapreduce.Job + +import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter} +import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData} +import parquet.hadoop.util.ContextUtil +import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType} +import parquet.schema.{GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns} +import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName} +import parquet.schema.Type.Repetition + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute} +import org.apache.spark.sql.catalyst.types._ + +// Implicits +import scala.collection.JavaConversions._ + +private[parquet] object ParquetTypesConverter extends Logging { + def isPrimitiveType(ctype: DataType): Boolean = + classOf[PrimitiveType] isAssignableFrom ctype.getClass + + def toPrimitiveDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match { + case ParquetPrimitiveTypeName.BINARY => StringType + case ParquetPrimitiveTypeName.BOOLEAN => BooleanType + case ParquetPrimitiveTypeName.DOUBLE => DoubleType + case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY => ArrayType(ByteType) + case ParquetPrimitiveTypeName.FLOAT => FloatType + case ParquetPrimitiveTypeName.INT32 => IntegerType + case ParquetPrimitiveTypeName.INT64 => LongType + case ParquetPrimitiveTypeName.INT96 => + // TODO: add BigInteger type? TODO(andre) use DecimalType instead???? 
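+      // (INT96 is used by some tools, e.g. Impala, to store timestamps; failing fast here + // avoids silently losing precision.)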
+ sys.error("Potential loss of precision: cannot convert INT96") + case _ => sys.error( + s"Unsupported parquet datatype $parquetType") + } + + /** + * Converts a given Parquet `Type` into the corresponding + * [[org.apache.spark.sql.catalyst.types.DataType]]. + * + * We apply the following conversion rules: + * + * Note that fields are determined to be `nullable` if and only if their Parquet repetition + * level is not `REQUIRED`. + * + * @param parquetType The type to convert. + * @return The corresponding Catalyst type. + */ + def toDataType(parquetType: ParquetType): DataType = { + def correspondsToMap(groupType: ParquetGroupType): Boolean = { + if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) { + false + } else { + // This mostly follows the convention in ``parquet.schema.ConversionPatterns`` + val keyValueGroup = groupType.getFields.apply(0).asGroupType() + keyValueGroup.getRepetition == Repetition.REPEATED && + keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME && + keyValueGroup.getFieldCount == 2 && + keyValueGroup.getFields.apply(0).getName == CatalystConverter.MAP_KEY_SCHEMA_NAME && + keyValueGroup.getFields.apply(1).getName == CatalystConverter.MAP_VALUE_SCHEMA_NAME + } + } + + def correspondsToArray(groupType: ParquetGroupType): Boolean = { + groupType.getFieldCount == 1 && + groupType.getFieldName(0) == CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME && + groupType.getFields.apply(0).getRepetition == Repetition.REPEATED + } + + if (parquetType.isPrimitive) { + toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName) + } else { + val groupType = parquetType.asGroupType() + parquetType.getOriginalType match { + // if the schema was constructed programmatically there may be hints how to convert + // it inside the metadata via the OriginalType field + case ParquetOriginalType.LIST => { // TODO: check enums! + assert(groupType.getFieldCount == 1) + val field = groupType.getFields.apply(0) + new ArrayType(toDataType(field)) + } + case ParquetOriginalType.MAP => { + assert( + !groupType.getFields.apply(0).isPrimitive, + "Parquet Map type malformatted: expected nested group for map!") + val keyValueGroup = groupType.getFields.apply(0).asGroupType() + assert( + keyValueGroup.getFieldCount == 2, + "Parquet Map type malformatted: nested group should have 2 (key, value) fields!") + val keyType = toDataType(keyValueGroup.getFields.apply(0)) + assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) + val valueType = toDataType(keyValueGroup.getFields.apply(1)) + assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) + new MapType(keyType, valueType) + } + case _ => { + // Note: the order of these checks is important! 
+ if (correspondsToMap(groupType)) { // MapType + val keyValueGroup = groupType.getFields.apply(0).asGroupType() + val keyType = toDataType(keyValueGroup.getFields.apply(0)) + assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) + val valueType = toDataType(keyValueGroup.getFields.apply(1)) + assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) + new MapType(keyType, valueType) + } else if (correspondsToArray(groupType)) { // ArrayType + val elementType = toDataType(groupType.getFields.apply(0)) + new ArrayType(elementType) + } else { // everything else: StructType + val fields = groupType + .getFields + .map(ptype => new StructField( + ptype.getName, + toDataType(ptype), + ptype.getRepetition != Repetition.REQUIRED)) + new StructType(fields) + } + } + } + } + } + + /** + * For a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] return + * the name of the corresponding Parquet primitive type or None if the given type + * is not primitive. + * + * @param ctype The type to convert + * @return The name of the corresponding Parquet primitive type + */ + def fromPrimitiveDataType(ctype: DataType): + Option[ParquetPrimitiveTypeName] = ctype match { + case StringType => Some(ParquetPrimitiveTypeName.BINARY) + case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN) + case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE) + case ArrayType(ByteType) => + Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) + case FloatType => Some(ParquetPrimitiveTypeName.FLOAT) + case IntegerType => Some(ParquetPrimitiveTypeName.INT32) + // There is no type for Byte or Short so we promote them to INT32. + case ShortType => Some(ParquetPrimitiveTypeName.INT32) + case ByteType => Some(ParquetPrimitiveTypeName.INT32) + case LongType => Some(ParquetPrimitiveTypeName.INT64) + case _ => None + } + + /** + * Converts a given Catalyst [[org.apache.spark.sql.catalyst.types.DataType]] into + * the corresponding Parquet `Type`. + * + * The conversion follows the rules below: + *  - Primitive types are converted into the corresponding Parquet primitive type + *    (see `fromPrimitiveDataType`); Byte and Short are stored as INT32. + *  - `ArrayType` becomes a group containing a single repeated field for the element type. + *  - `MapType` becomes a group containing a repeated key/value group + *    (via `ConversionPatterns.mapType`). + *  - `StructType` becomes a Parquet group with one (recursively converted) field per + *    struct field. + * + * Parquet's repetition level is generally set according to the following rule: an attribute + * nested inside an array is `REPEATED`; otherwise it is `OPTIONAL` if it is nullable and + * `REQUIRED` if it is not. + * + * @param ctype The type to convert + * @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]] + * whose type is converted + * @param nullable When true indicates that the attribute is nullable + * @param inArray When true indicates that this is a nested attribute inside an array. + * @return The corresponding Parquet type.
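+   * For example, an ArrayType(LongType) attribute named "longs" is converted into a group with + * a single repeated element field, much like "optional group longs { repeated int64 array; }" + * in ParquetTestData.testNestedSchema2.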
+ */ + def fromDataType( + ctype: DataType, + name: String, + nullable: Boolean = true, + inArray: Boolean = false): ParquetType = { + val repetition = + if (inArray) { + Repetition.REPEATED + } else { + if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED + } + val primitiveType = fromPrimitiveDataType(ctype) + if (primitiveType.isDefined) { + new ParquetPrimitiveType(repetition, primitiveType.get, name) + } else { + ctype match { + case ArrayType(elementType) => { + val parquetElementType = fromDataType( + elementType, + CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, + nullable = false, + inArray = true) + ConversionPatterns.listType(repetition, name, parquetElementType) + } + case StructType(structFields) => { + val fields = structFields.map { + field => fromDataType(field.dataType, field.name, field.nullable, inArray = false) + } + new ParquetGroupType(repetition, name, fields) + } + case MapType(keyType, valueType) => { + val parquetKeyType = + fromDataType( + keyType, + CatalystConverter.MAP_KEY_SCHEMA_NAME, + nullable = false, + inArray = false) + val parquetValueType = + fromDataType( + valueType, + CatalystConverter.MAP_VALUE_SCHEMA_NAME, + nullable = false, + inArray = false) + ConversionPatterns.mapType( + repetition, + name, + parquetKeyType, + parquetValueType) + } + case _ => sys.error(s"Unsupported datatype $ctype") + } + } + } + + def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = { + parquetSchema + .asGroupType() + .getFields + .map( + field => + new AttributeReference( + field.getName, + toDataType(field), + field.getRepetition != Repetition.REQUIRED)()) + } + + def convertFromAttributes(attributes: Seq[Attribute]): MessageType = { + val fields = attributes.map( + attribute => + fromDataType(attribute.dataType, attribute.name, attribute.nullable)) + new MessageType("root", fields) + } + + def convertFromString(string: String): Seq[Attribute] = { + DataType(string) match { + case s: StructType => s.toAttributes + case other => sys.error(s"Cannot convert $string to row") + } + } + + def convertToString(schema: Seq[Attribute]): String = { + StructType.fromAttributes(schema).toString + } + + def writeMetaData(attributes: Seq[Attribute], origPath: Path, conf: Configuration): Unit = { + if (origPath == null) { + throw new IllegalArgumentException("Unable to write Parquet metadata: path is null") + } + val fs = origPath.getFileSystem(conf) + if (fs == null) { + throw new IllegalArgumentException( + s"Unable to write Parquet metadata: path $origPath is incorrectly formatted") + } + val path = origPath.makeQualified(fs) + if (fs.exists(path) && !fs.getFileStatus(path).isDir) { + throw new IllegalArgumentException(s"Expected to write to directory $path but found a file") + } + val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) + if (fs.exists(metadataPath)) { + try { + fs.delete(metadataPath, true) + } catch { + case e: IOException => + throw new IOException(s"Unable to delete previous PARQUET_METADATA_FILE at $metadataPath") + } + } + val extraMetadata = new java.util.HashMap[String, String]() + extraMetadata.put( + RowReadSupport.SPARK_METADATA_KEY, + ParquetTypesConverter.convertToString(attributes)) + // TODO: add extra data, e.g., table name, date, etc.?
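+    // The schema string stored under RowReadSupport.SPARK_METADATA_KEY above is what + // readSchemaFromFile below uses to recover the original Catalyst attribute types (e.g. byte + // and short), which would otherwise be widened when converting back from the Parquet schema.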
+ + val parquetSchema: MessageType = + ParquetTypesConverter.convertFromAttributes(attributes) + val metaData: FileMetaData = new FileMetaData( + parquetSchema, + extraMetadata, + "Spark") + + ParquetRelation.enableLogForwarding() + ParquetFileWriter.writeMetadataFile( + conf, + path, + new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil) + } + + /** + * Try to read Parquet metadata at the given Path. We first see if there is a summary file + * (the Parquet metadata file) in the given directory. If so, this is used. Otherwise we read + * the footer of one of the actual Parquet files at that location. + * @param origPath The path at which we expect one (or more) Parquet files. + * @param configuration The Hadoop configuration to use. + * @return The `ParquetMetadata` containing among other things the schema. + */ + def readMetaData(origPath: Path, configuration: Option[Configuration]): ParquetMetadata = { + if (origPath == null) { + throw new IllegalArgumentException("Unable to read Parquet metadata: path is null") + } + val job = new Job() + val conf = configuration.getOrElse(ContextUtil.getConfiguration(job)) + val fs: FileSystem = origPath.getFileSystem(conf) + if (fs == null) { + throw new IllegalArgumentException(s"Incorrectly formatted Parquet metadata path $origPath") + } + val path = origPath.makeQualified(fs) + if (!fs.getFileStatus(path).isDir) { + throw new IllegalArgumentException( + s"Expected $path to be a directory with Parquet files/metadata") + } + ParquetRelation.enableLogForwarding() + val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE) + // if this is a new table that was just created we will find only the metadata file + if (fs.exists(metadataPath) && fs.isFile(metadataPath)) { + ParquetFileReader.readFooter(conf, metadataPath) + } else { + // there may be one or more Parquet files in the given directory + val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path)) + // TODO: for now we assume that all footers (if there is more than one) have identical + // metadata; we may want to add a check here at some point + if (footers.size() == 0) { + throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path") + } + footers(0).getParquetMetadata + } + } + + /** + * Reads in Parquet Metadata from the given path and tries to extract the schema + * (Catalyst attributes) from the application-specific key-value map. If this + * is empty it falls back to converting from the Parquet file schema which + * may lead to an upcast of types (e.g., {byte, short} to int). + * + * @param origPath The path at which we expect one (or more) Parquet files. + * @param conf The Hadoop configuration to use. + * @return A list of attributes that make up the schema.
+ */ + def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = { + val keyValueMetadata: java.util.Map[String, String] = + readMetaData(origPath, conf) + .getFileMetaData + .getKeyValueMetaData + if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) { + convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY)) + } else { + val attributes = convertToAttributes( + readMetaData(origPath, conf).getFileMetaData.getSchema) + log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes") + attributes + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 9810520bb9ae6..0c239d00b199b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -26,15 +26,16 @@ import parquet.hadoop.ParquetFileWriter import parquet.hadoop.util.ContextUtil import parquet.schema.MessageTypeParser +import org.apache.spark.SparkContext import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.util.getTempFilePath import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType} import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.TestData import org.apache.spark.sql.SchemaRDD -import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.catalyst.expressions.Equals -import org.apache.spark.sql.catalyst.types.IntegerType +import org.apache.spark.sql.catalyst.util.getTempFilePath +import org.apache.spark.sql.catalyst.{SqlLexical, SqlParser} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, Star} import org.apache.spark.util.Utils // Implicits @@ -56,15 +57,37 @@ case class OptionalReflectData( doubleField: Option[Double], booleanField: Option[Boolean]) +case class Nested(i: Int, s: String) + +case class Data(array: Seq[Int], nested: Nested) + +case class AllDataTypes( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean) + class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { import TestData._ TestData // Load test data tables. 
var testRDD: SchemaRDD = null + // TODO: remove this once SqlParser can parse nested select statements + var nestedParserSqlContext: NestedParserSQLContext = null + override def beforeAll() { + nestedParserSqlContext = new NestedParserSQLContext(TestSQLContext.sparkContext) ParquetTestData.writeFile() ParquetTestData.writeFilterFile() + ParquetTestData.writeNestedFile1() + ParquetTestData.writeNestedFile2() + ParquetTestData.writeNestedFile3() + ParquetTestData.writeNestedFile4() testRDD = parquetFile(ParquetTestData.testDir.toString) testRDD.registerAsTable("testsource") parquetFile(ParquetTestData.testFilterDir.toString) @@ -74,9 +97,33 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA override def afterAll() { Utils.deleteRecursively(ParquetTestData.testDir) Utils.deleteRecursively(ParquetTestData.testFilterDir) + Utils.deleteRecursively(ParquetTestData.testNestedDir1) + Utils.deleteRecursively(ParquetTestData.testNestedDir2) + Utils.deleteRecursively(ParquetTestData.testNestedDir3) + Utils.deleteRecursively(ParquetTestData.testNestedDir4) // here we should also unregister the table?? } + test("Read/Write All Types") { + val tempDir = getTempFilePath("parquetTest").getCanonicalPath + val range = (0 to 255) + TestSQLContext.sparkContext.parallelize(range) + .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)) + .saveAsParquetFile(tempDir) + val result = parquetFile(tempDir).collect() + range.foreach { + i => + assert(result(i).getString(0) == s"$i", s"row $i String field did not match, got ${result(i).getString(0)}") + assert(result(i).getInt(1) === i) + assert(result(i).getLong(2) === i.toLong) + assert(result(i).getFloat(3) === i.toFloat) + assert(result(i).getDouble(4) === i.toDouble) + assert(result(i).getShort(5) === i.toShort) + assert(result(i).getByte(6) === i.toByte) + assert(result(i).getBoolean(7) === (i % 2 == 0)) + } + } + test("self-join parquet files") { val x = ParquetTestData.testData.as('x) val y = ParquetTestData.testData.as('y) @@ -154,7 +201,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA path, TestSQLContext.sparkContext.hadoopConfiguration) assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE))) - val metaData = ParquetTypesConverter.readMetaData(path) + val metaData = ParquetTypesConverter.readMetaData(path, Some(ContextUtil.getConfiguration(job))) assert(metaData != null) ParquetTestData .testData @@ -197,10 +244,37 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i") } Utils.deleteRecursively(file) - assert(true) } - test("insert (appending) to same table via Scala API") { + test("Insert (overwrite) via Scala API") { + val dirname = Utils.createTempDir() + val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + source_rdd.registerAsTable("source") + val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString) + dest_rdd.registerAsTable("dest") + sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect() + val rdd_copy1 = sql("SELECT * FROM dest").collect() + assert(rdd_copy1.size === 100) + assert(rdd_copy1(0).apply(0) === 1) + assert(rdd_copy1(0).apply(1) === "val_1") + // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is + // executed twice otherwise?! 
+ sql("INSERT INTO dest SELECT * FROM source") + val rdd_copy2 = sql("SELECT * FROM dest").collect() + assert(rdd_copy2.size === 200) + assert(rdd_copy2(0).apply(0) === 1) + assert(rdd_copy2(0).apply(1) === "val_1") + assert(rdd_copy2(99).apply(0) === 100) + assert(rdd_copy2(99).apply(1) === "val_100") + assert(rdd_copy2(100).apply(0) === 1) + assert(rdd_copy2(100).apply(1) === "val_1") + Utils.deleteRecursively(dirname) + } + + test("Insert (appending) to same table via Scala API") { + // TODO: why does collecting break things? It seems InsertIntoParquet::execute() is + // executed twice otherwise?! sql("INSERT INTO testsource SELECT * FROM testsource") val double_rdd = sql("SELECT * FROM testsource").collect() assert(double_rdd != null) @@ -363,4 +437,272 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10") assert(query.collect().size === 10) } + + test("Importing nested Parquet file (Addressbook)") { + val result = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + .collect() + assert(result != null) + assert(result.size === 2) + val first_record = result(0) + val second_record = result(1) + assert(first_record != null) + assert(second_record != null) + assert(first_record.size === 3) + assert(second_record(1) === null) + assert(second_record(2) === null) + assert(second_record(0) === "A. Nonymous") + assert(first_record(0) === "Julien Le Dem") + val first_owner_numbers = first_record(1) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + val first_contacts = first_record(2) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(first_owner_numbers != null) + assert(first_owner_numbers(0) === "555 123 4567") + assert(first_owner_numbers(2) === "XXX XXX XXXX") + assert(first_contacts(0) + .asInstanceOf[CatalystConverter.StructScalaType[_]].size === 2) + val first_contacts_entry_one = first_contacts(0) + .asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(first_contacts_entry_one(0) === "Dmitriy Ryaboy") + assert(first_contacts_entry_one(1) === "555 987 6543") + val first_contacts_entry_two = first_contacts(1) + .asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(first_contacts_entry_two(0) === "Chris Aniszczyk") + } + + test("Importing nested Parquet file (nested numbers)") { + val result = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir2.toString) + .toSchemaRDD + .collect() + assert(result.size === 1, "number of top-level rows incorrect") + assert(result(0).size === 5, "number of fields in row incorrect") + assert(result(0)(0) === 1) + assert(result(0)(1) === 7) + val subresult1 = result(0)(2).asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult1.size === 3) + assert(subresult1(0) === (1.toLong << 32)) + assert(subresult1(1) === (1.toLong << 33)) + assert(subresult1(2) === (1.toLong << 34)) + val subresult2 = result(0)(3) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(subresult2.size === 2) + assert(subresult2(0) === 2.5) + assert(subresult2(1) === false) + val subresult3 = result(0)(4) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult3.size === 2) + assert(subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 2) + val subresult4 = subresult3(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult4(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7) + 
assert(subresult4(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8) + assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]].size === 1) + assert(subresult3(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9) + } + + test("Simple query on addressbook") { + val data = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + val tmp = data.where('owner === "Julien Le Dem").select('owner as 'a, 'contacts as 'c).collect() + assert(tmp.size === 1) + assert(tmp(0)(0) === "Julien Le Dem") + } + + test("Projection in addressbook") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + data.registerAsTable("data") + val query = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM data") + val tmp = query.collect() + assert(tmp.size === 2) + assert(tmp(0).size === 2) + assert(tmp(0)(0) === "Julien Le Dem") + assert(tmp(0)(1) === "Chris Aniszczyk") + assert(tmp(1)(0) === "A. Nonymous") + assert(tmp(1)(1) === null) + } + + test("Simple query on nested int data") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir2.toString) + .toSchemaRDD + data.registerAsTable("data") + val result1 = nestedParserSqlContext.sql("SELECT entries[0].value FROM data").collect() + assert(result1.size === 1) + assert(result1(0).size === 1) + assert(result1(0)(0) === 2.5) + val result2 = nestedParserSqlContext.sql("SELECT entries[0] FROM data").collect() + assert(result2.size === 1) + val subresult1 = result2(0)(0).asInstanceOf[CatalystConverter.StructScalaType[_]] + assert(subresult1.size === 2) + assert(subresult1(0) === 2.5) + assert(subresult1(1) === false) + val result3 = nestedParserSqlContext.sql("SELECT outerouter FROM data").collect() + val subresult2 = result3(0)(0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]] + assert(subresult2(0).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 7) + assert(subresult2(1).asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 8) + assert(result3(0)(0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](1) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) + .asInstanceOf[CatalystConverter.ArrayScalaType[_]](0) === 9) + } + + test("nested structs") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir3.toString) + .toSchemaRDD + data.registerAsTable("data") + val result1 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[0].truth FROM data").collect() + assert(result1.size === 1) + assert(result1(0).size === 1) + assert(result1(0)(0) === false) + val result2 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[0].value[1].truth FROM data").collect() + assert(result2.size === 1) + assert(result2(0).size === 1) + assert(result2(0)(0) === true) + val result3 = nestedParserSqlContext.sql("SELECT booleanNumberPairs[1].value[0].truth FROM data").collect() + assert(result3.size === 1) + assert(result3(0).size === 1) + assert(result3(0)(0) === false) + } + + test("simple map") { + val data = TestSQLContext + .parquetFile(ParquetTestData.testNestedDir4.toString) + .toSchemaRDD + data.registerAsTable("mapTable") + val result1 = sql("SELECT data1 FROM mapTable").collect() + assert(result1.size === 1) + assert(result1(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, _]] + .getOrElse("key1", 0) === 1) + assert(result1(0)(0) + 
.asInstanceOf[CatalystConverter.MapScalaType[String, _]] + .getOrElse("key2", 0) === 2) + val result2 = sql("""SELECT data1["key1"] FROM mapTable""").collect() + assert(result2(0)(0) === 1) + } + + test("map with struct values") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir4.toString) + .toSchemaRDD + data.registerAsTable("mapTable") + val result1 = nestedParserSqlContext.sql("SELECT data2 FROM mapTable").collect() + assert(result1.size === 1) + val entry1 = result1(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("seven", null) + assert(entry1 != null) + assert(entry1(0) === 42) + assert(entry1(1) === "the answer") + val entry2 = result1(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("eight", null) + assert(entry2 != null) + assert(entry2(0) === 49) + assert(entry2(1) === null) + val result2 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM mapTable""").collect() + assert(result2.size === 1) + assert(result2(0)(0) === 42.toLong) + assert(result2(0)(1) === "the answer") + } + + test("Writing out Addressbook and reading it back in") { + // TODO: find out why CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME + // has no effect in this test case + val tmpdir = Utils.createTempDir() + Utils.deleteRecursively(tmpdir) + val result = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir1.toString) + .toSchemaRDD + result.saveAsParquetFile(tmpdir.toString) + nestedParserSqlContext + .parquetFile(tmpdir.toString) + .toSchemaRDD + .registerAsTable("tmpcopy") + val tmpdata = nestedParserSqlContext.sql("SELECT owner, contacts[1].name FROM tmpcopy").collect() + assert(tmpdata.size === 2) + assert(tmpdata(0).size === 2) + assert(tmpdata(0)(0) === "Julien Le Dem") + assert(tmpdata(0)(1) === "Chris Aniszczyk") + assert(tmpdata(1)(0) === "A. 
Nonymous") + assert(tmpdata(1)(1) === null) + Utils.deleteRecursively(tmpdir) + } + + test("Writing out Map and reading it back in") { + val data = nestedParserSqlContext + .parquetFile(ParquetTestData.testNestedDir4.toString) + .toSchemaRDD + val tmpdir = Utils.createTempDir() + Utils.deleteRecursively(tmpdir) + data.saveAsParquetFile(tmpdir.toString) + nestedParserSqlContext + .parquetFile(tmpdir.toString) + .toSchemaRDD + .registerAsTable("tmpmapcopy") + val result1 = nestedParserSqlContext.sql("""SELECT data1["key2"] FROM tmpmapcopy""").collect() + assert(result1.size === 1) + assert(result1(0)(0) === 2) + val result2 = nestedParserSqlContext.sql("SELECT data2 FROM tmpmapcopy").collect() + assert(result2.size === 1) + val entry1 = result2(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("seven", null) + assert(entry1 != null) + assert(entry1(0) === 42) + assert(entry1(1) === "the answer") + val entry2 = result2(0)(0) + .asInstanceOf[CatalystConverter.MapScalaType[String, CatalystConverter.StructScalaType[_]]] + .getOrElse("eight", null) + assert(entry2 != null) + assert(entry2(0) === 49) + assert(entry2(1) === null) + val result3 = nestedParserSqlContext.sql("""SELECT data2["seven"].payload1, data2["seven"].payload2 FROM tmpmapcopy""").collect() + assert(result3.size === 1) + assert(result3(0)(0) === 42.toLong) + assert(result3(0)(1) === "the answer") + Utils.deleteRecursively(tmpdir) + } +} + +// TODO: the code below is needed temporarily until the standard parser is able to parse +// nested field expressions correctly +class NestedParserSQLContext(@transient override val sparkContext: SparkContext) extends SQLContext(sparkContext) { + override protected[sql] val parser = new NestedSqlParser() +} + +class NestedSqlLexical(override val keywords: Seq[String]) extends SqlLexical(keywords) { + override def identChar = letter | elem('_') + delimiters += (".") +} + +class NestedSqlParser extends SqlParser { + override val lexical = new NestedSqlLexical(reservedWords) + + override protected lazy val baseExpression: PackratParser[Expression] = + expression ~ "[" ~ expression <~ "]" ^^ { + case base ~ _ ~ ordinal => GetItem(base, ordinal) + } | + expression ~ "." ~ ident ^^ { + case base ~ _ ~ fieldName => GetField(base, fieldName) + } | + TRUE ^^^ Literal(true, BooleanType) | + FALSE ^^^ Literal(false, BooleanType) | + cast | + "(" ~> expression <~ ")" | + function | + "-" ~> literal ^^ UnaryMinus | + ident ^^ UnresolvedAttribute | + "*" ^^^ Star(None) | + literal } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 68284344afd55..f923d68932f83 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -208,7 +208,9 @@ object HiveMetastoreTypes extends RegexParsers { } protected lazy val structType: Parser[DataType] = - "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ StructType + "struct" ~> "<" ~> repsep(structField,",") <~ ">" ^^ { + case fields => new StructType(fields) + } protected lazy val dataType: Parser[DataType] = arrayType |