
Commit 7eceb67

Review feedback

AndreSchumacher committed Jun 19, 2014
1 parent 94eea3a

Showing 2 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -505,7 +505,7 @@ object SparkBuild extends Build {
       "com.twitter" % "parquet-hadoop" % parquetVersion,
       "com.twitter" % "parquet-avro" % parquetVersion % "test",
       // here we need version >= 1.7.5 because of AVRO-1274
-      "org.apache.avro" % "avro" % "1.7.6" % "test"
+      "org.apache.avro" % "avro" % "1.7.6" % "test",
       "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0.
     ),
   initialCommands in console :=
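
The inline comment pins jackson-databind to the version json4s-jackson 3.2.6 compiles against. As a minimal sketch of why the pin matters (Json4sSmokeTest and Metric are hypothetical names, not part of this commit): json4s-jackson delegates to jackson-databind at runtime, so a mismatched databind on the classpath tends to surface as a NoSuchMethodError on the first serialization call.

import org.json4s._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.write

// Hypothetical smoke test exercising the json4s -> jackson-databind path
// that the pinned 2.3.0 version is meant to keep consistent.
case class Metric(name: String, value: Long)

object Json4sSmokeTest {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    println(write(Metric("records", 42L))) // {"name":"records","value":42}
  }
}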
22 changes: 22 additions & 22 deletions
@@ -83,15 +83,15 @@ private[sql] object CatalystConverter {
         new CatalystArrayConverter(elementType, fieldIndex, parent)
       }
       case StructType(fields: Seq[StructField]) => {
-        new CatalystStructConverter(fields, fieldIndex, parent)
+        new CatalystStructConverter(fields.toArray, fieldIndex, parent)
       }
       case MapType(keyType: DataType, valueType: DataType) => {
         new CatalystMapConverter(
-          Seq(
+          Array(
             new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
             new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
-          fieldIndex,
-          parent)
+          fieldIndex,
+          parent)
       }
       // Strings, Shorts and Bytes do not have a corresponding type in Parquet
       // so we need to treat them separately
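
The hunks in this file replace Seq[FieldType] with Array[FieldType] in the converter constructors, converting once at the call sites via .toArray. A minimal sketch of the pattern under assumed names (RowSketch and its FieldType stand-in are illustrative only, not the commit's classes): callers pay a one-time copy at construction, and the per-record hot path then indexes a flat JVM array instead of dispatching through the Seq abstraction.

// Illustrative stand-ins, not the classes from this commit.
case class FieldType(name: String, nullable: Boolean)

class RowSketch(schema: Array[FieldType]) {
  private val current = new Array[Any](schema.length)

  // Called once per record: a counted while-loop over a flat array,
  // with no iterator or closure allocation on the hot path.
  def startRecord(): Unit = {
    var i = 0
    while (i < schema.length) {
      current(i) = null
      i += 1
    }
  }
}

object RowSketch {
  // One-time Seq -> Array copy at the construction boundary.
  def apply(fields: Seq[FieldType]): RowSketch = new RowSketch(fields.toArray)
}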
@@ -127,14 +127,14 @@ private[sql] object CatalystConverter {
       attributes: Seq[Attribute]): CatalystConverter = {
     // For non-nested types we use the optimized Row converter
     if (attributes.forall(a => ParquetTypesConverter.isPrimitiveType(a.dataType))) {
-      new CatalystPrimitiveRowConverter(attributes)
+      new CatalystPrimitiveRowConverter(attributes.toArray)
     } else {
-      new CatalystGroupConverter(attributes)
+      new CatalystGroupConverter(attributes.toArray)
     }
   }
 }

-private[parquet] trait CatalystConverter {
+private[parquet] abstract class CatalystConverter extends GroupConverter {
   /**
    * The number of fields this group has
    */
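
The declaration above turns CatalystConverter from a trait mixed in next to GroupConverter into an abstract class that itself extends Parquet's GroupConverter, so subclasses write "extends CatalystConverter" instead of "extends GroupConverter with CatalystConverter". A minimal sketch of the shape, assuming the pre-Apache parquet.io.api package from parquet-column 1.x (member names abbreviated; not the full contract):

import parquet.io.api.GroupConverter

// Before: trait CatalystConverter { ... }
//   class Foo extends GroupConverter with CatalystConverter { ... }
// After: the shared contract is-a GroupConverter by construction.
abstract class CatalystConverterSketch extends GroupConverter {
  protected val size: Int            // number of fields in this group
  protected def clearBuffer(): Unit  // reset any nested buffers
  def isPrimitive: Boolean = false   // group converters are non-primitive
}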
@@ -206,14 +206,14 @@ private[parquet] trait CatalystConverter {
  * @param schema The corresponding Catalyst schema in the form of a list of attributes.
  */
 private[parquet] class CatalystGroupConverter(
-    protected[parquet] val schema: Seq[FieldType],
+    protected[parquet] val schema: Array[FieldType],
     protected[parquet] val index: Int,
     protected[parquet] val parent: CatalystConverter,
     protected[parquet] var current: ArrayBuffer[Any],
     protected[parquet] var buffer: ArrayBuffer[Row])
-  extends GroupConverter with CatalystConverter {
+  extends CatalystConverter {

-  def this(schema: Seq[FieldType], index: Int, parent: CatalystConverter) =
+  def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) =
     this(
       schema,
       index,
@@ -225,7 +225,7 @@ private[parquet] class CatalystGroupConverter(
   /**
    * This constructor is used for the root converter only!
    */
-  def this(attributes: Seq[Attribute]) =
+  def this(attributes: Array[Attribute]) =
     this(attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)), 0, null)

   protected [parquet] val converters: Array[Converter] =
@@ -254,7 +254,7 @@ private[parquet] class CatalystGroupConverter(
   override protected[parquet] def clearBuffer(): Unit = buffer.clear()

   override def start(): Unit = {
-    current = ArrayBuffer.fill(schema.length)(null)
+    current = ArrayBuffer.fill(size)(null)
     converters.foreach {
       converter => if (!converter.isPrimitive) {
         converter.asInstanceOf[CatalystConverter].clearBuffer
@@ -277,12 +277,12 @@ private[parquet] class CatalystGroupConverter(
  * converter is optimized for rows of primitive types (non-nested records).
  */
 private[parquet] class CatalystPrimitiveRowConverter(
-    protected[parquet] val schema: Seq[FieldType],
+    protected[parquet] val schema: Array[FieldType],
     protected[parquet] var current: ParquetRelation.RowType)
-  extends GroupConverter with CatalystConverter {
+  extends CatalystConverter {

   // This constructor is used for the root converter only
-  def this(attributes: Seq[Attribute]) =
+  def this(attributes: Array[Attribute]) =
     this(
       attributes.map(a => new FieldType(a.name, a.dataType, a.nullable)),
       new ParquetRelation.RowType(attributes.length))
@@ -313,7 +313,7 @@ private[parquet] class CatalystPrimitiveRowConverter(

   override def start(): Unit = {
     var i = 0
-    while (i < schema.length) {
+    while (i < size) {
       current.setNullAt(i)
       i = i + 1
     }
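
In the hunk above, start() nulls out each slot of the reusable row before a record is read, bounded by size now that the bound and the schema come from the same array-backed state. A sketch of the reusable-row idea with a hypothetical MutableRowSketch (a stand-in, not ParquetRelation.RowType):

object RowReset {
  // Hypothetical stand-in for the reusable mutable row the primitive
  // converter writes into; one slot per schema field.
  class MutableRowSketch(val size: Int) {
    private val values = new Array[Any](size)
    def setNullAt(i: Int): Unit = values(i) = null
    def update(i: Int, v: Any): Unit = values(i) = v
    def apply(i: Int): Any = values(i)
  }

  // Per-record reset, mirroring start() above: null every slot so a field
  // absent from this record does not leak the previous record's value.
  def resetRow(row: MutableRowSketch): Unit = {
    var i = 0
    while (i < row.size) {
      row.setNullAt(i)
      i += 1
    }
  }
}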
@@ -398,7 +398,7 @@ private[parquet] class CatalystArrayConverter(
     val index: Int,
     protected[parquet] val parent: CatalystConverter,
     protected[parquet] var buffer: Buffer[Any])
-  extends GroupConverter with CatalystConverter {
+  extends CatalystConverter {

   def this(elementType: DataType, index: Int, parent: CatalystConverter) =
     this(
@@ -462,7 +462,7 @@ private[parquet] class CatalystNativeArrayConverter(
     val index: Int,
     protected[parquet] val parent: CatalystConverter,
     protected[parquet] var capacity: Int = CatalystArrayConverter.INITIAL_ARRAY_SIZE)
-  extends GroupConverter with CatalystConverter {
+  extends CatalystConverter {

   type NativeType = elementType.JvmType

@@ -577,7 +577,7 @@ private[parquet] class CatalystNativeArrayConverter(
  * @param parent
  */
 private[parquet] class CatalystStructConverter(
-    override protected[parquet] val schema: Seq[FieldType],
+    override protected[parquet] val schema: Array[FieldType],
     override protected[parquet] val index: Int,
     override protected[parquet] val parent: CatalystConverter)
   extends CatalystGroupConverter(schema, index, parent) {
@@ -605,14 +605,14 @@ private[parquet] class CatalystStructConverter(
  * @param parent
  */
 private[parquet] class CatalystMapConverter(
-    protected[parquet] val schema: Seq[FieldType],
+    protected[parquet] val schema: Array[FieldType],
     override protected[parquet] val index: Int,
     override protected[parquet] val parent: CatalystConverter)
-  extends GroupConverter with CatalystConverter {
+  extends CatalystConverter {

   private val map = new HashMap[Any, Any]()

-  private val keyValueConverter = new GroupConverter with CatalystConverter {
+  private val keyValueConverter = new CatalystConverter {
     private var currentKey: Any = null
     private var currentValue: Any = null
     val keyConverter = CatalystConverter.createConverter(schema(0), 0, this)
