diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9e6dbf3344064..4913dccf4b179 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -575,9 +575,9 @@ class Analyzer(
     //     |- view2 (defaultDatabase = db2)
     //        |- view3 (defaultDatabase = db3)
     //   |- view4 (defaultDatabase = db4)
-    // In this case, the view `view1` is a nested view, it directly references `table2`、`view2`
+    // In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
     // and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
-    // relations `table2`、`view2`、`view4` using the default database `db1`, and look up the
+    // relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
     // relation `view3` using the default database `db2`.
     //
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index ca0000a465c97..8d8b5b86d5aa1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -402,13 +402,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 @InterfaceStability.Stable
 object StructType extends AbstractDataType {
 
-  /**
-   * A key used in field metadata to indicate that the field comes from the result of merging
-   * two different StructTypes that do not always contain the field. That is to say, the field
-   * might be missing (optional) from one of the StructTypes.
-   */
-  private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
-
   override private[sql] def defaultConcreteType: DataType = new StructType
 
   override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -463,8 +456,6 @@ object StructType extends AbstractDataType {
 
       case (StructType(leftFields), StructType(rightFields)) =>
         val newFields = ArrayBuffer.empty[StructField]
-        // This metadata will record the fields that only exist in one of two StructTypes
-        val optionalMeta = new MetadataBuilder()
 
         val rightMapped = fieldsMap(rightFields)
         leftFields.foreach {
@@ -476,8 +467,7 @@ object StructType extends AbstractDataType {
                   nullable = leftNullable || rightNullable)
               }
               .orElse {
-                optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
-                Some(leftField.copy(metadata = optionalMeta.build()))
+                Some(leftField)
               }
               .foreach(newFields += _)
         }
@@ -486,8 +476,7 @@ object StructType extends AbstractDataType {
         rightFields
           .filterNot(f => leftMapped.get(f.name).nonEmpty)
           .foreach { f =>
-            optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
-            newFields += f.copy(metadata = optionalMeta.build())
+            newFields += f
           }
 
         StructType(newFields)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 12d2c00dc9c49..61e1ec7c7ab35 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -132,55 +132,6 @@ class DataTypeSuite extends SparkFunSuite {
     assert(mapped === expected)
   }
 
-  test("merge where right is empty") {
-    val left = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val right = StructType(List())
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
-  test("merge where left is empty") {
-
-    val left = StructType(List())
-
-    val right = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
-  test("merge where both are non-empty") {
-    val left = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val right = StructType(
-      StructField("c", LongType) :: Nil)
-
-    val expected = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) ::
-      StructField("c", LongType) :: Nil)
-
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
   test("merge where right contains type conflict") {
     val left = StructType(
       StructField("a", LongType) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d9831c5d5faf2..828949eddc8ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -109,9 +109,7 @@ class ParquetFileFormat
 
     // We want to clear this temporary metadata from saving into Parquet file.
    // This metadata is only useful for detecting optional columns when pushdowning filters.
-    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchema, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -307,11 +305,7 @@ class ParquetFileFormat
       ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
-    // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushdowning filters.
-    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      requiredSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7730d1fccb0b9..2efeb807a5a6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -169,23 +169,14 @@ private[parquet] object ParquetFilters {
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies
-   * (i.e. not an optional field).
-   *
-   * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
-   * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
-   * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
-   * Here we filter out such fields.
+   * Returns a map from name of the column to the data type, if predicate push down applies.
    */
   private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
     case StructType(fields) =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
       // and it does not support to create filters for them.
-      fields.filter { f =>
-        !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
-          !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType).toMap
+      fields.map(f => f.name -> f.dataType).toMap
     case _ => Map.empty[String, DataType]
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index a0d57d79f045a..fa046c808ef47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -368,76 +368,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 
-  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+  test("Filter applied on merged Parquet schema with new column should work") {
     import testImplicits._
     Seq("true", "false").map { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
         withTempPath { dir =>
-          val pathOne = s"${dir.getCanonicalPath}/table1"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
-          val pathTwo = s"${dir.getCanonicalPath}/table2"
-          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
-          // If the "c = 1" filter gets pushed down, this query will throw an exception which
-          // Parquet emits. This is a Parquet issue (PARQUET-389).
-          val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
+          val path1 = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
+          val path2 = s"${dir.getCanonicalPath}/table2"
+          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(path2)
+
+          // Whether or not "c = 1" gets pushed down, this query should work without exception.
+          val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
           checkAnswer(
             df,
             Row(1, "1", null))
 
-          // The fields "a" and "c" only exist in one Parquet file.
-          assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-          assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-          val pathThree = s"${dir.getCanonicalPath}/table3"
-          df.write.parquet(pathThree)
-
-          // We will remove the temporary metadata when writing Parquet file.
-          val schema = spark.read.parquet(pathThree).schema
-          assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-          val pathFour = s"${dir.getCanonicalPath}/table4"
+          val path3 = s"${dir.getCanonicalPath}/table3"
           val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-          dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+          dfStruct.select(struct("a").as("s")).write.parquet(path3)
 
-          val pathFive = s"${dir.getCanonicalPath}/table5"
+          val path4 = s"${dir.getCanonicalPath}/table4"
           val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-          dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+          dfStruct2.select(struct("c").as("s")).write.parquet(path4)
 
-          // If the "s.c = 1" filter gets pushed down, this query will throw an exception which
-          // Parquet emits.
-          val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 1")
+          // Whether or not "s.c = 1" gets pushed down, this query should work without exception.
+          val dfStruct3 = spark.read.parquet(path3, path4).filter("s.c = 1")
             .selectExpr("s")
           checkAnswer(dfStruct3, Row(Row(null, 1)))
-
-          // The fields "s.a" and "s.c" only exist in one Parquet file.
-          val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
-          assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-          assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-          val pathSix = s"${dir.getCanonicalPath}/table6"
-          dfStruct3.write.parquet(pathSix)
-
-          // We will remove the temporary metadata when writing Parquet file.
-          val forPathSix = spark.read.parquet(pathSix).schema
-          assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-          // sanity test: make sure optional metadata field is not wrongly set.
-          val pathSeven = s"${dir.getCanonicalPath}/table7"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
-          val pathEight = s"${dir.getCanonicalPath}/table8"
-          (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
-          val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
-          checkAnswer(
-            df2,
-            Row(1, "1"))
-
-          // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
-          assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
-          assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
         }
       }
     }
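
Note: the following standalone Scala sketch is not part of the patch. It only illustrates, end to end, the behaviour the rewritten test asserts: with schema merging enabled, filtering on a column that exists in only one of the Parquet directories should return rows (with nulls for the missing column) instead of failing, whether or not the predicate is pushed down. The application name, master and output path below are illustrative assumptions.

import org.apache.spark.sql.SparkSession

object MergedSchemaFilterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("MergedSchemaFilterExample")  // illustrative app name
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val base = "/tmp/merged-schema-example"  // hypothetical output location
    // Two Parquet directories with overlapping but different columns: (a, b) and (c, b).
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.mode("overwrite").parquet(s"$base/t1")
    (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.mode("overwrite").parquet(s"$base/t2")

    // Column "c" exists only under t2; with mergeSchema the combined schema is (a, b, c) and the
    // filter still works, yielding null for a column that a source file does not contain.
    val df = spark.read
      .option("mergeSchema", "true")
      .parquet(s"$base/t1", s"$base/t2")
      .filter("c = 1")
      .select("c", "b", "a")

    df.show()  // expected: a single row (1, "1", null)

    spark.stop()
  }
}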
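Note: another small sketch, also not part of the patch, mirroring the simplified push-down eligibility logic in ParquetFilters above. After this change the name-to-type map is built from every root-level field without consulting field metadata; the helper name fieldMap and the example schema are hypothetical.

import org.apache.spark.sql.types._

object FieldMapSketch {
  // Hypothetical helper mirroring the simplified getFieldMap: every root-level field becomes a
  // push-down candidate; no metadata check filters any of them out.
  def fieldMap(dataType: DataType): Map[String, DataType] = dataType match {
    case StructType(fields) => fields.map(f => f.name -> f.dataType).toMap
    case _ => Map.empty[String, DataType]
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("c", IntegerType)))
    // Prints Map(a -> IntegerType, b -> StringType, c -> IntegerType); fields that would exist in
    // only one side of a merged schema are no longer excluded.
    println(fieldMap(schema))
  }
}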