[SPARK-19411][SQL] Remove the metadata used to mark optional columns in merged Parquet schema for filter predicate pushdown #16756
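For context, here is a minimal, standalone sketch of the merged-schema scenario the updated ParquetFilterSuite test exercises: two Parquet files whose schemas only partially overlap, read with schema merging enabled and filtered on a column that exists in only one file. The object name, paths, and session setup are illustrative assumptions, not part of this change.

import org.apache.spark.sql.SparkSession

// Illustrative sketch only; paths and session setup are assumptions, not part of this PR.
object MergedSchemaFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.parquet.filterPushdown", "true")
      .config("spark.sql.parquet.mergeSchema", "true")
      .getOrCreate()
    import spark.implicits._

    // Two Parquet files with partially overlapping schemas: (a, b) and (c, b).
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b")
      .write.mode("overwrite").parquet("/tmp/merge_test/table1")
    (1 to 3).map(i => (i, i.toString)).toDF("c", "b")
      .write.mode("overwrite").parquet("/tmp/merge_test/table2")

    // Column "c" exists in only one file. Whether or not the "c = 1" predicate is
    // pushed down to Parquet, the query should succeed and return (1, "1", null).
    spark.read.parquet("/tmp/merge_test/table1", "/tmp/merge_test/table2")
      .filter("c = 1")
      .selectExpr("c", "b", "a")
      .show()

    spark.stop()
  }
}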

@@ -575,9 +575,9 @@ class Analyzer(
// |- view2 (defaultDatabase = db2)
// |- view3 (defaultDatabase = db3)
// |- view4 (defaultDatabase = db4)
// In this case, the view `view1` is a nested view, it directly references `table2``view2`
// In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
// and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
// relations `table2``view2``view4` using the default database `db1`, and look up the
// relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
// relation `view3` using the default database `db2`.
//
// Note this is compatible with the views defined by older versions of Spark(before 2.2), which
@@ -402,13 +402,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
@InterfaceStability.Stable
object StructType extends AbstractDataType {

/**
* A key used in field metadata to indicate that the field comes from the result of merging
* two different StructTypes that do not always contain the field. That is to say, the field
* might be missing (optional) from one of the StructTypes.
*/
private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"

override private[sql] def defaultConcreteType: DataType = new StructType

override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -463,8 +456,6 @@ object StructType extends AbstractDataType {

case (StructType(leftFields), StructType(rightFields)) =>
val newFields = ArrayBuffer.empty[StructField]
// This metadata will record the fields that only exist in one of two StructTypes
val optionalMeta = new MetadataBuilder()

val rightMapped = fieldsMap(rightFields)
leftFields.foreach {
@@ -476,8 +467,7 @@
nullable = leftNullable || rightNullable)
}
.orElse {
optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
Some(leftField.copy(metadata = optionalMeta.build()))
Some(leftField)
}
.foreach(newFields += _)
}
@@ -486,8 +476,7 @@
rightFields
.filterNot(f => leftMapped.get(f.name).nonEmpty)
.foreach { f =>
optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
newFields += f.copy(metadata = optionalMeta.build())
newFields += f
}

StructType(newFields)
@@ -132,55 +132,6 @@ class DataTypeSuite extends SparkFunSuite {
assert(mapped === expected)
}

test("merge where right is empty") {
val left = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val right = StructType(List())
val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where left is empty") {

val left = StructType(List())

val right = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where both are non-empty") {
val left = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) :: Nil)

val right = StructType(
StructField("c", LongType) :: Nil)

val expected = StructType(
StructField("a", LongType) ::
StructField("b", FloatType) ::
StructField("c", LongType) :: Nil)

val merged = left.merge(right)

assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
}

test("merge where right contains type conflict") {
val left = StructType(
StructField("a", LongType) ::
@@ -109,9 +109,7 @@ class ParquetFileFormat

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
ParquetWriteSupport.setSchema(dataSchema, conf)

// Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
// and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -307,11 +305,7 @@ class ParquetFileFormat
ParquetWriteSupport.SPARK_ROW_SCHEMA,
ParquetSchemaConverter.checkFieldNames(requiredSchema).json)

// We want to clear this temporary metadata from saving into Parquet file.
// This metadata is only useful for detecting optional columns when pushdowning filters.
val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
requiredSchema).asInstanceOf[StructType]
ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)

// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
@@ -169,23 +169,14 @@ private[parquet] object ParquetFilters {
}

/**
* Returns a map from name of the column to the data type, if predicate push down applies
* (i.e. not an optional field).
*
* SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
* These fields only exist in one side of merged schemas. Due to that, we can't push down filters
* using such fields, otherwise Parquet library will throw exception (PARQUET-389).
* Here we filter out such fields.
* Returns a map from name of the column to the data type, if predicate push down applies.
*/
private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
case StructType(fields) =>
// Here we don't flatten the fields in the nested schema but just look up through
// root fields. Currently, accessing to nested fields does not push down filters
// and it does not support to create filters for them.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
}.map(f => f.name -> f.dataType).toMap
fields.map(f => f.name -> f.dataType).toMap
case _ => Map.empty[String, DataType]
}

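For reference, a standalone sketch (not part of this diff) of what the simplified lookup above produces, e.g. when pasted into spark-shell: every top-level column becomes a pushdown candidate, while nested fields are still not flattened. The schema below is an illustrative assumption.

import org.apache.spark.sql.types._

// Illustrative copy of the simplified lookup above.
def fieldMap(dataType: DataType): Map[String, DataType] = dataType match {
  case StructType(fields) => fields.map(f => f.name -> f.dataType).toMap
  case _ => Map.empty[String, DataType]
}

val schema = new StructType()
  .add("a", LongType)
  .add("b", StringType)
  .add("s", new StructType().add("c", LongType))

// Prints Map(a -> LongType, b -> StringType, s -> StructType(...)); the nested
// field "s.c" is not a key, so no Parquet filter is created for it.
println(fieldMap(schema))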
@@ -368,76 +368,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}


test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
test("Filter applied on merged Parquet schema with new column should work") {
import testImplicits._
Seq("true", "false").map { vectorized =>
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
val pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)

// If the "c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits. This is a Parquet issue (PARQUET-389).
val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
val path1 = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
val path2 = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(path2)

// No matter "c = 1" gets pushed down or not, this query should work without exception.
val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
checkAnswer(
df,
Row(1, "1", null))

// The fields "a" and "c" only exist in one Parquet file.
assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathThree = s"${dir.getCanonicalPath}/table3"
df.write.parquet(pathThree)

// We will remove the temporary metadata when writing Parquet file.
val schema = spark.read.parquet(pathThree).schema
assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

val pathFour = s"${dir.getCanonicalPath}/table4"
val path3 = s"${dir.getCanonicalPath}/table3"
val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
dfStruct.select(struct("a").as("s")).write.parquet(path3)

val pathFive = s"${dir.getCanonicalPath}/table5"
val path4 = s"${dir.getCanonicalPath}/table4"
val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
dfStruct2.select(struct("c").as("s")).write.parquet(path4)

// If the "s.c = 1" filter gets pushed down, this query will throw an exception which
// Parquet emits.
val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
// No matter "s.c = 1" gets pushed down or not, this query should work without exception.
val dfStruct3 = spark.read.parquet(path3, path4).filter("s.c = 1")
.selectExpr("s")
checkAnswer(dfStruct3, Row(Row(null, 1)))

// The fields "s.a" and "s.c" only exist in one Parquet file.
val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))

val pathSix = s"${dir.getCanonicalPath}/table6"
dfStruct3.write.parquet(pathSix)

// We will remove the temporary metadata when writing Parquet file.
val forPathSix = spark.read.parquet(pathSix).schema
assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))

// sanity test: make sure optional metadata field is not wrongly set.
val pathSeven = s"${dir.getCanonicalPath}/table7"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
val pathEight = s"${dir.getCanonicalPath}/table8"
(4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)

val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
checkAnswer(
df2,
Row(1, "1"))

// The fields "a" and "b" exist in both two Parquet files. No metadata is set.
assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
}
}
}