
Commit

Remove the metadata used to mark optional columns for merged Parquet schema.
viirya committed Jan 31, 2017
1 parent 26a4cba commit ec2bbbf
Showing 6 changed files with 22 additions and 137 deletions.
Analyzer.scala
@@ -575,9 +575,9 @@ class Analyzer(
// |- view2 (defaultDatabase = db2)
// |- view3 (defaultDatabase = db3)
// |- view4 (defaultDatabase = db4)
// In this case, the view `view1` is a nested view, it directly references `table2``view2`
// In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
// and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
// relations `table2``view2``view4` using the default database `db1`, and look up the
// relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
// relation `view3` using the default database `db2`.
//
// Note this is compatible with the views defined by older versions of Spark(before 2.2), which
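For readers skimming this hunk: the comment above describes how each nested view is resolved against the database that was current when that view was defined. A minimal sketch with the public SQL API, assuming an active SparkSession `spark` and illustrative names (`db1`, `db2`, `t` are placeholders, not part of this commit):

// view2 is defined while db2 is current, so its unqualified reference to
// view3 is later resolved against db2 even when queried from db1.
spark.sql("CREATE DATABASE IF NOT EXISTS db1")
spark.sql("CREATE DATABASE IF NOT EXISTS db2")
spark.sql("USE db2")
spark.sql("CREATE TABLE IF NOT EXISTS t (id INT) USING parquet")
spark.sql("CREATE VIEW view3 AS SELECT id FROM t")
spark.sql("CREATE VIEW view2 AS SELECT id FROM view3")
spark.sql("USE db1")
spark.sql("SELECT * FROM db2.view2").show()  // view3 still resolves via db2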
StructType.scala
@@ -402,13 +402,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
@InterfaceStability.Stable
object StructType extends AbstractDataType {

/**
* A key used in field metadata to indicate that the field comes from the result of merging
* two different StructTypes that do not always contain the field. That is to say, the field
* might be missing (optional) from one of the StructTypes.
*/
private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"

override private[sql] def defaultConcreteType: DataType = new StructType

override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -463,8 +456,6 @@ object StructType extends AbstractDataType {

case (StructType(leftFields), StructType(rightFields)) =>
val newFields = ArrayBuffer.empty[StructField]
// This metadata will record the fields that only exist in one of two StructTypes
val optionalMeta = new MetadataBuilder()

val rightMapped = fieldsMap(rightFields)
leftFields.foreach {
@@ -476,8 +467,7 @@
nullable = leftNullable || rightNullable)
}
.orElse {
optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
Some(leftField.copy(metadata = optionalMeta.build()))
Some(leftField)
}
.foreach(newFields += _)
}
@@ -486,8 +476,7 @@
rightFields
.filterNot(f => leftMapped.get(f.name).nonEmpty)
.foreach { f =>
optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
newFields += f.copy(metadata = optionalMeta.build())
newFields += f
}

StructType(newFields)
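Since `StructType.merge` is `private[sql]`, here is a standalone sketch of the simplified behavior in this hunk — not the actual method, and with type-conflict handling omitted. The name `mergeMissingFields` is this note's own. The point: a field present on only one side is now carried over unchanged instead of being copied with `_OPTIONAL_` metadata.

import org.apache.spark.sql.types._

// Illustrative re-implementation of the missing-field branch above.
def mergeMissingFields(left: StructType, right: StructType): StructType = {
  val rightByName = right.fields.map(f => f.name -> f).toMap
  val merged = left.fields.map { lf =>
    rightByName.get(lf.name) match {
      case Some(rf) => lf.copy(nullable = lf.nullable || rf.nullable)
      case None => lf  // before this commit: lf.copy(metadata = optionalMeta.build())
    }
  } ++ right.fields.filterNot(rf => left.fieldNames.contains(rf.name))
  StructType(merged)
}

val left = StructType(StructField("a", LongType) :: Nil)
val right = StructType(StructField("c", LongType) :: Nil)
assert(mergeMissingFields(left, right).fieldNames.sameElements(Array("a", "c")))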
DataTypeSuite.scala
@@ -132,55 +132,6 @@ class DataTypeSuite extends SparkFunSuite {
assert(mapped === expected)
}

test("merge where right is empty") {
val left = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val right = StructType(List())
val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where left is empty") {

val left = StructType(List())

val right = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where both are non-empty") {
val left = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val right = StructType(
StructField("c", LongType) :: Nil)

val expected = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) ::
StructField("c", LongType) :: Nil)

val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where right contains type conflict") {
val left = StructType(
StructField("a", LongType) ::
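The three deleted tests existed mainly to assert the `_OPTIONAL_` marker, which no longer exists. A hedged sketch of a post-change equivalent, written in DataTypeSuite style (inside the sql package, where the `private[sql]` `merge` is accessible):

test("merge where right is empty keeps left fields as-is") {
  val left = StructType(
    StructField("a", LongType) ::
    StructField("b", FloatType) :: Nil)

  val merged = left.merge(StructType(Nil))

  assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
  // No optional-field metadata is attached anymore; fields missing from one
  // side are simply carried over.
  assert(merged.forall(_.metadata == Metadata.empty))
}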
ParquetFileFormat.scala
@@ -109,9 +109,7 @@ class ParquetFileFormat

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
ParquetWriteSupport.setSchema(dataSchema, conf)

// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -307,11 +305,7 @@ class ParquetFileFormat
ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(requiredSchema).json)

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
requiredSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)

// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
ParquetFilters.scala
@@ -169,23 +169,14 @@
}

/**
* Returns a map from name of the column to the data type, if predicate push down applies
* (i.e. not an optional field).
*
* SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
* These fields only exist in one side of merged schemas. Due to that, we can't push down filters
* using such fields, otherwise Parquet library will throw exception (PARQUET-389).
* Here we filter out such fields.
* Returns a map from name of the column to the data type, if predicate push down applies.
*/
private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
}.map(f => f.name -> f.dataType).toMap
fields.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
}

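After this hunk, `getFieldMap` is a plain projection over root-level fields. A self-contained sketch of the same logic (the name `fieldMap` is this note's own, not Spark's):

import org.apache.spark.sql.types._

// Standalone equivalent of the simplified getFieldMap: nested fields are not
// flattened, because filters are neither pushed down nor created for them.
def fieldMap(dataType: DataType): Map[String, DataType] = dataType match {
  case StructType(fields) => fields.map(f => f.name -> f.dataType).toMap
  case _ => Map.empty[String, DataType]
}

val schema = new StructType()
  .add("a", LongType)
  .add("s", new StructType().add("c", IntegerType))
assert(fieldMap(schema).keySet == Set("a", "s"))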
ParquetFilterSuite.scala
@@ -368,76 +368,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}


test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
test("Filter applied on merged Parquet schema with new column should work") {
import testImplicits._
Seq("true", "false").map { vectorized =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
val path1 = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
val path2 = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(path2)

// No matter "c = 1" gets pushed down or not, this query should work without exception.
val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val path3 = s"${dir.getCanonicalPath}/table3"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
dfStruct.select(struct("a").as("s")).write.parquet(path3)

val pathFive = s"${dir.getCanonicalPath}/table5"
val path4 = s"${dir.getCanonicalPath}/table4"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
dfStruct2.select(struct("c").as("s")).write.parquet(path4)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
// No matter "s.c = 1" gets pushed down or not, this query should work without exception.
val dfStruct3 = spark.read.parquet(path3, path4).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
}
}
}
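Outside the test harness, the fixed scenario can be exercised with the public API alone. A minimal sketch, assuming a local SparkSession and a writable placeholder path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("merged-schema-filter").getOrCreate()
import spark.implicits._

val base = "/tmp/merged-parquet-demo"  // placeholder path
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(s"$base/t1")
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(s"$base/t2")

// Filtering on a column that exists in only one file should now succeed with
// schema merging enabled, whether or not "c = 1" is pushed down to Parquet.
val df = spark.read
  .option("mergeSchema", "true")
  .parquet(s"$base/t1", s"$base/t2")
  .filter("c = 1")
df.show()  // one row: c = 1, b = "1", a = null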
