From cd021a9f1a6a98d6771c3ff21ac621b58f33b2ca Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Mon, 26 Aug 2024 14:52:34 +0200 Subject: [PATCH 1/3] Add tests for implicit casts in batch insert --- .../DeltaInsertIntoImplicitCastSuite.scala | 193 +++++++++++++ .../spark/sql/delta/DeltaInsertIntoTest.scala | 272 ++++++++++++++++++ ...peWideningInsertSchemaEvolutionSuite.scala | 252 ++-------------- 3 files changed, 495 insertions(+), 222 deletions(-) create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala new file mode 100644 index 0000000000..7e3fd5ea6e --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala @@ -0,0 +1,193 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.sources.DeltaSQLConf + +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.types._ + +/** + * Test suite covering implicit casting in INSERT operations when the type of the data to insert + * doesn't match the type in Delta table. + * + * The casting behavior is (unfortunately) dependent on the API used to run the INSERT, e.g. + * Dataframe V1 insertInto() vs V2 saveAsTable() or using SQL. + * This suite intends to exhaustively cover all the ways INSERT can be run on a Delta table. See + * [[DeltaInsertIntoTest]] for a list of these INSERT operations covered. + */ +class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { + + for (schemaEvolution <- BOOLEAN_DOMAIN) { + testInserts("insert with implicit up and down cast on top-level fields, " + + s"schemaEvolution=$schemaEvolution")( + initialSchemaDDL = "a long, b int", + initialJsonData = Seq("""{ "a": 1, "b": 2 }"""), + partitionBy = Seq("a"), + overwriteWhere = "a" -> 1, + insertSchemaDDL = "a int, b long", + insertJsonData = Seq("""{ "a": 1, "b": 4 }"""), + expectedSchema = StructType(new StructType() + .add("a", LongType) + .add("b", IntegerType)), + // The following insert operations don't implicitly cast the data but fail instead - see + // following test covering failure for these cases. We should change this to offer consistent + // behavior across all inserts. + excludeInserts = Seq( + DFv1SaveAsTable(SaveMode.Append), + DFv1SaveAsTable(SaveMode.Overwrite), + DFv2Append, + DFv2Overwrite, + DFv2OverwritePartition + ), + confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString) + ) + + testInserts("insert with implicit up and down cast on top-level fields, " + + s"schemaEvolution=$schemaEvolution")( + initialSchemaDDL = "a long, b int", + initialJsonData = Seq("""{ "a": 1, "b": 2 }"""), + partitionBy = Seq("a"), + overwriteWhere = "a" -> 1, + insertSchemaDDL = "a int, b long", + insertJsonData = Seq("""{ "a": 1, "b": 4 }"""), + checkError = ex => { + checkError( + ex, + errorClass = "DELTA_FAILED_TO_MERGE_FIELDS", + parameters = Map( + "currentField" -> "a", + "updateField" -> "a" + )) + }, + includeInserts = Seq( + DFv1SaveAsTable(SaveMode.Append), + DFv1SaveAsTable(SaveMode.Overwrite), + DFv2Append, + DFv2Overwrite, + DFv2OverwritePartition + ), + confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString) + ) + + testInserts("insert with implicit up and down cast on fields nested in array, " + + s"schemaEvolution=$schemaEvolution")( + initialSchemaDDL = "key int, a array>", + initialJsonData = Seq("""{ "key": 1, "a": [ { "x": 1, "y": 2 } ] }"""), + partitionBy = Seq("key"), + overwriteWhere = "key" -> 1, + insertSchemaDDL = "key int, a array>", + insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""), + expectedSchema = new StructType() + .add("key", IntegerType) + .add("a", ArrayType(new StructType() + .add("x", LongType) + .add("y", IntegerType, nullable = true))), + // The following insert operations don't implicitly cast the data but fail instead - see + // following test covering failure for these cases. We should change this to offer consistent + // behavior across all inserts. + excludeInserts = Seq( + DFv1SaveAsTable(SaveMode.Append), + DFv1SaveAsTable(SaveMode.Overwrite), + DFv2Append, + DFv2Overwrite, + DFv2OverwritePartition + ), + confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString) + ) + + testInserts("insert with implicit up and down cast on fields nested in array, " + + s"schemaEvolution=$schemaEvolution")( + initialSchemaDDL = "key int, a array>", + initialJsonData = Seq("""{ "key": 1, "a": [ { "x": 1, "y": 2 } ] }"""), + partitionBy = Seq("key"), + overwriteWhere = "key" -> 1, + insertSchemaDDL = "key int, a array>", + insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""), + checkError = ex => { + checkError( + ex, + errorClass = "DELTA_FAILED_TO_MERGE_FIELDS", + parameters = Map( + "currentField" -> "a", + "updateField" -> "a" + )) + }, + includeInserts = Seq( + DFv1SaveAsTable(SaveMode.Append), + DFv1SaveAsTable(SaveMode.Overwrite), + DFv2Append, + DFv2Overwrite, + DFv2OverwritePartition + ), + confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString) + ) + + testInserts("insert with implicit up and down cast on fields nested in map, " + + s"schemaEvolution=$schemaEvolution")( + initialSchemaDDL = "key int, m map>", + initialJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }"""), + partitionBy = Seq("key"), + overwriteWhere = "key" -> 1, + insertSchemaDDL = "key int, m map>", + insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), + expectedSchema = new StructType() + .add("key", IntegerType) + .add("m", MapType(StringType, new StructType() + .add("x", LongType) + .add("y", IntegerType))), + // The following insert operations don't implicitly cast the data but fail instead - see + // following test covering failure for these cases. We should change this to offer consistent + // behavior across all inserts. + excludeInserts = Seq( + DFv1SaveAsTable(SaveMode.Append), + DFv1SaveAsTable(SaveMode.Overwrite), + DFv2Append, + DFv2Overwrite, + DFv2OverwritePartition + ), + confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString) + ) + + testInserts("insert with implicit up and down cast on fields nested in map, " + + s"schemaEvolution=$schemaEvolution")( + initialSchemaDDL = "key int, m map>", + initialJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }"""), + partitionBy = Seq("key"), + overwriteWhere = "key" -> 1, + insertSchemaDDL = "key int, m map>", + insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), + checkError = ex => { + checkError( + ex, + errorClass = "DELTA_FAILED_TO_MERGE_FIELDS", + parameters = Map( + "currentField" -> "m", + "updateField" -> "m" + )) + }, + includeInserts = Seq( + DFv1SaveAsTable(SaveMode.Append), + DFv1SaveAsTable(SaveMode.Overwrite), + DFv2Append, + DFv2Overwrite, + DFv2OverwritePartition + ), + confs = Seq(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> schemaEvolution.toString) + ) + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala new file mode 100644 index 0000000000..544c535646 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala @@ -0,0 +1,272 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + +import org.apache.spark.{DebugFilesystem, SparkThrowable} +import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.types.StructType + +/** + * There are **many** different ways to run an insert: + * - Using SQL, the dataframe v1 and v2 APIs or the streaming API. + * - Append vs. Overwrite / Partition overwrite. + * - Position-based vs. name-based resolution. + * + * Each take a unique path through analysis. The abstractions below captures these different + * inserts to allow more easily running tests with all or a subset of them. + */ +trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQLCommandTest { + + /** + * Represents one way of inserting data into a Delta table. + * @param mode Append or Overwrite. This dictates in particular what the expected result after the + * insert should be. + * @param name A human-readable name for the insert type displayed in the test names. + */ + trait Insert { + val mode: SaveMode + val name: String + + /** + * The method that tests will call to run the insert. Each type of insert must implement its + * specific way to run insert. + */ + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit + + /** SQL keyword for this type of insert. */ + def intoOrOverwrite: String = if (mode == SaveMode.Append) "INTO" else "OVERWRITE" + + /** The expected content of the table after the insert. */ + def expectedResult(initialDF: DataFrame, insertedDF: DataFrame): DataFrame = + if (mode == SaveMode.Overwrite) insertedDF + else initialDF.unionByName(insertedDF, allowMissingColumns = true) + } + + /** INSERT INTO/OVERWRITE */ + case class SQLInsertByPosition(mode: SaveMode) extends Insert { + val name: String = s"INSERT $intoOrOverwrite" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = + sql(s"INSERT $intoOrOverwrite target SELECT * FROM source") + } + + /** INSERT INTO/OVERWRITE (a, b) */ + case class SQLInsertColList(mode: SaveMode) extends Insert { + val name: String = s"INSERT $intoOrOverwrite (columns) - $mode" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + val colList = columns.mkString(", ") + sql(s"INSERT $intoOrOverwrite target ($colList) SELECT $colList FROM source") + } + } + + /** INSERT INTO/OVERWRITE BY NAME */ + case class SQLInsertByName(mode: SaveMode) extends Insert { + val name: String = s"INSERT $intoOrOverwrite BY NAME - $mode" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = + sql(s"INSERT $intoOrOverwrite target SELECT ${columns.mkString(", ")} FROM source") + } + + /** INSERT INTO REPLACE WHERE */ + object SQLInsertOverwriteReplaceWhere extends Insert { + val mode: SaveMode = SaveMode.Overwrite + val name: String = s"INSERT INTO REPLACE WHERE" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = + sql(s"INSERT INTO target REPLACE WHERE $whereCol = $whereValue " + + s"SELECT ${columns.mkString(", ")} FROM source") + } + + /** INSERT OVERWRITE PARTITION (part = 1) */ + object SQLInsertOverwritePartitionByPosition extends Insert { + val mode: SaveMode = SaveMode.Overwrite + val name: String = s"INSERT OVERWRITE PARTITION (partition)" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + val assignments = columns.filterNot(_ == whereCol).mkString(", ") + sql(s"INSERT OVERWRITE target PARTITION ($whereCol = $whereValue) " + + s"SELECT $assignments FROM source") + } + } + + /** INSERT OVERWRITE PARTITION (part = 1) (a, b) */ + object SQLInsertOverwritePartitionColList extends Insert { + val mode: SaveMode = SaveMode.Overwrite + val name: String = s"INSERT OVERWRITE PARTITION (partition) (columns)" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + val assignments = columns.filterNot(_ == whereCol).mkString(", ") + sql(s"INSERT OVERWRITE target PARTITION ($whereCol = $whereValue) ($assignments) " + + s"SELECT $assignments FROM source") + } + } + + /** df.write.mode(mode).insertInto() */ + case class DFv1InsertInto(mode: SaveMode) extends Insert { + val name: String = s"DFv1 insertInto() - $mode" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = + spark.read.table("source").write.mode(mode).insertInto("target") + } + + /** df.write.mode(mode).saveAsTable() */ + case class DFv1SaveAsTable(mode: SaveMode) extends Insert { + val name: String = s"DFv1 saveAsTable() - $mode" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + spark.read.table("source").write.mode(mode).format("delta").saveAsTable("target") + } + } + + /** df.writeTo.append() */ + object DFv2Append extends Insert { self: Insert => + val mode: SaveMode = SaveMode.Append + val name: String = "DFv2 append()" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + spark.read.table("source").writeTo("target").append() + } + } + + /** df.writeTo.overwrite() */ + object DFv2Overwrite extends Insert { self: Insert => + val mode: SaveMode = SaveMode.Overwrite + val name: String = s"DFv2 overwrite()" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + spark.read.table("source").writeTo("target").overwrite(col(whereCol) === lit(whereValue)) + } + } + + /** df.writeTo.overwritePartitions() */ + object DFv2OverwritePartition extends Insert { self: Insert => + override val mode: SaveMode = SaveMode.Overwrite + val name: String = s"DFv2 overwritePartitions()" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + spark.read.table("source").writeTo("target").overwritePartitions() + } + } + + /** df.writeStream.toTable() */ + object StreamingInsert extends Insert { self: Insert => + override val mode: SaveMode = SaveMode.Append + val name: String = s"Streaming toTable()" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + val tablePath = DeltaLog.forTable(spark, TableIdentifier("target")).dataPath + val query = spark.readStream + .table("source") + .writeStream + .option("checkpointLocation", tablePath.toString) + .format("delta") + .toTable("target") + query.processAllAvailable() + } + } + + /** Collects all the types of insert previously defined. */ + protected lazy val allInsertTypes: Seq[Insert] = Seq( + SQLInsertOverwriteReplaceWhere, + SQLInsertOverwritePartitionByPosition, + SQLInsertOverwritePartitionColList, + DFv2Append, + DFv2Overwrite, + DFv2OverwritePartition, + StreamingInsert + ) ++ (for { + mode: SaveMode <- Seq(SaveMode.Append, SaveMode.Overwrite) + insert: Insert <- Seq( + SQLInsertByPosition(mode), + SQLInsertColList(mode), + SQLInsertByName(mode), + DFv1InsertInto(mode), + DFv1SaveAsTable(mode) + ) + } yield insert) + + // scalastyle:off argcount + /** + * Test runner to cover INSERT operations defined above. + * @param name Test name + * @param initialSchemaDDL Initial schema of the table to be inserted into (as a DDL string). + * @param initialJsonData Initial data present in the table to be inserted into (as a JSON + * string). + * @param partitionBy Partition columns for the initial table. + * @param insertSchemaDDL Schema of the data to be inserted (as a DDL string). + * @param insertJsonData Data to be inserted (as a JSON string) + * @param overwriteWhere Where clause for overwrite PARTITION / REPLACE WHERE (as + * colName -> value) + * @param expectedSchema Expected schema of the table after the insert. Only set this parameter + * if the insert is expected to succeed. + * @param checkError A check to run on the exception thrown by the insert operation. Only + * set this parameter if the insert is expected to fail. + * @param includeInserts List of insert types to run the test with. Defaults to all inserts. + * @param excludeInserts List of insert types to exclude when running the test. Defaults to no + * inserts excluded. + * @param confs Custom spark confs to set before running the insert operation. + */ + def testInserts(name: String)( + initialSchemaDDL: String, + initialJsonData: Seq[String], + partitionBy: Seq[String] = Seq.empty, + insertSchemaDDL: String, + insertJsonData: Seq[String], + overwriteWhere: (String, Int), + expectedSchema: StructType = null, + checkError: SparkThrowable => Unit = null, + includeInserts: Seq[Insert] = allInsertTypes, + excludeInserts: Seq[Insert] = Seq.empty, + confs: Seq[(String, String)] = Seq.empty): Unit = { + for (insert <- includeInserts.filterNot(excludeInserts.toSet)) { + test(s"${insert.name} - $name") { + withTable("source", "target") { + val initialDF = readFromJSON(initialJsonData, StructType.fromDDL(initialSchemaDDL)) + val writer = initialDF.write.format("delta") + if (partitionBy.nonEmpty) { + writer.partitionBy(partitionBy: _*) + } + writer.saveAsTable("target") + // Write the data to insert to a table so that we can use it in both SQL and dataframe + // writer inserts. + val insertDF = readFromJSON(insertJsonData, StructType.fromDDL(insertSchemaDDL)) + insertDF.write.format("delta").saveAsTable("source") + + withSQLConf(confs: _*) { + if (checkError == null) { + assert(expectedSchema != null, "You must provide either `expectedSchema` or " + + "`checkError` in test method `testInserts`.") + insert.runInsert( + columns = insertDF.schema.map(_.name), + whereCol = overwriteWhere._1, + whereValue = overwriteWhere._2 + ) + val target = spark.read.table("target") + assert(target.schema === expectedSchema) + checkAnswer(target, insert.expectedResult(initialDF, insertDF)) + } else { + assert(expectedSchema == null, "You can't provide both `expectedSchema` and " + + "`checkError` in test method `testInserts`.") + val ex = intercept[SparkThrowable] { + insert.runInsert( + columns = insertDF.schema.map(_.name), + whereCol = overwriteWhere._1, + whereValue = overwriteWhere._2 + ) + } + checkError(ex) + } + } + } + } + } + } + // scalastyle:on argcount +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala index e97bf55f1a..c313754dd1 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.types._ @@ -51,7 +50,9 @@ class TypeWideningInsertSchemaEvolutionSuite /** * Tests covering type widening during schema evolution in INSERT. */ -trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { +trait TypeWideningInsertSchemaEvolutionTests + extends DeltaInsertIntoTest + with TypeWideningTestCases { self: QueryTest with TypeWideningTestMixin with DeltaDMLTestUtils => import testImplicits._ @@ -160,209 +161,7 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { checkAnswer(readDeltaTable(tempPath), Row(1)) } - - /** - * There are **many** different ways to run an insert: - * - Using SQL or the dataframe v1 and v2 APIs. - * - Append vs. Overwrite / Partition overwrite. - * - Position-based vs. name-based resolution. - * - * Each take a unique path through analysis. The abstractions below captures these different - * inserts to allow more easily running tests with all or a subset of them. - * - * @param mode Append or Overwrite. This dictates in particular what the expected result after the - * insert should be. - * @param name A human-readable name for the insert type displayed in the test names. - */ - trait Insert { - val mode: SaveMode - val name: String - - /** - * The method that tests will call to run the insert. Each type of insert must implement its - * sepcific way to run insert. - */ - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit - - /** SQL keyword for this type of insert. */ - def intoOrOverwrite: String = if (mode == SaveMode.Append) "INTO" else "OVERWRITE" - - /** The expected content of the table after the insert. */ - def expectedResult(initialDF: DataFrame, insertedDF: DataFrame): DataFrame = - if (mode == SaveMode.Overwrite) insertedDF - else initialDF.unionByName(insertedDF, allowMissingColumns = true) - } - - /** INSERT INTO/OVERWRITE */ - case class SQLInsertByPosition(mode: SaveMode) extends Insert { - val name: String = s"INSERT $intoOrOverwrite" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = - sql(s"INSERT $intoOrOverwrite target SELECT * FROM source") - } - - /** INSERT INTO/OVERWRITE (a, b) */ - case class SQLInsertColList(mode: SaveMode) extends Insert { - val name: String = s"INSERT $intoOrOverwrite (columns) - $mode" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { - val colList = columns.mkString(", ") - sql(s"INSERT $intoOrOverwrite target ($colList) SELECT $colList FROM source") - } - } - - /** INSERT INTO/OVERWRITE BY NAME */ - case class SQLInsertByName(mode: SaveMode) extends Insert { - val name: String = s"INSERT $intoOrOverwrite BY NAME - $mode" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = - sql(s"INSERT $intoOrOverwrite target SELECT ${columns.mkString(", ")} FROM source") - } - - /** INSERT INTO REPLACE WHERE */ - object SQLInsertOverwriteReplaceWhere extends Insert { - val mode: SaveMode = SaveMode.Overwrite - val name: String = s"INSERT INTO REPLACE WHERE" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = - sql(s"INSERT INTO target REPLACE WHERE $whereCol = $whereValue " + - s"SELECT ${columns.mkString(", ")} FROM source") - } - - /** INSERT OVERWRITE PARTITION (part = 1) */ - object SQLInsertOverwritePartitionByPosition extends Insert { - val mode: SaveMode = SaveMode.Overwrite - val name: String = s"INSERT OVERWRITE PARTITION (partition)" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { - val assignments = columns.filterNot(_ == whereCol).mkString(", ") - sql(s"INSERT OVERWRITE target PARTITION ($whereCol = $whereValue) " + - s"SELECT $assignments FROM source") - } - } - - /** INSERT OVERWRITE PARTITION (part = 1) (a, b) */ - object SQLInsertOverwritePartitionColList extends Insert { - val mode: SaveMode = SaveMode.Overwrite - val name: String = s"INSERT OVERWRITE PARTITION (partition) (columns)" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { - val assignments = columns.filterNot(_ == whereCol).mkString(", ") - sql(s"INSERT OVERWRITE target PARTITION ($whereCol = $whereValue) ($assignments) " + - s"SELECT $assignments FROM source") - } - } - - /** df.write.mode(mode).insertInto() */ - case class DFv1InsertInto(mode: SaveMode) extends Insert { - val name: String = s"DFv1 insertInto() - $mode" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = - spark.read.table("source").write.mode(mode).insertInto("target") - } - - /** df.write.mode(mode).saveAsTable() */ - case class DFv1SaveAsTable(mode: SaveMode) extends Insert { - val name: String = s"DFv1 saveAsTable() - $mode" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { - spark.read.table("source").write.mode(mode).format("delta").saveAsTable("target") - } - } - - /** df.writeTo.append() */ - object DFv2Append extends Insert { self: Insert => - val mode: SaveMode = SaveMode.Append - val name: String = "DFv2 append()" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { - spark.read.table("source").writeTo("target").append() - } - } - - /** df.writeTo.overwrite() */ - object DFv2Overwrite extends Insert { self: Insert => - val mode: SaveMode = SaveMode.Overwrite - val name: String = s"DFv2 overwrite()" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { - spark.read.table("source").writeTo("target").overwrite(col(whereCol) === lit(whereValue)) - } - } - - /** df.writeTo.overwritePartitions() */ - object DFv2OverwritePartition extends Insert { self: Insert => - override val mode: SaveMode = SaveMode.Overwrite - val name: String = s"DFv2 overwritePartitions()" - def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { - spark.read.table("source").writeTo("target").overwritePartitions() - } - } - - /** Collects all the types of insert previously defined. */ - protected lazy val allInsertTypes: Seq[Insert] = Seq( - SQLInsertOverwriteReplaceWhere, - SQLInsertOverwritePartitionByPosition, - SQLInsertOverwritePartitionColList, - DFv2Append, - DFv2Overwrite, - DFv2OverwritePartition - ) ++ (for { - mode: SaveMode <- Seq(SaveMode.Append, SaveMode.Overwrite) - insert: Insert <- Seq( - SQLInsertByPosition(mode), - SQLInsertColList(mode), - SQLInsertByName(mode), - DFv1InsertInto(mode), - DFv1SaveAsTable(mode) - ) - } yield insert) - - /** - * Test runner for type evolution in INSERT. - * @param name Test name - * @param initialSchemaDDL Initial schema of the table to be inserted into (as a DDL string). - * @param initialJsonData Initial data present in the table to be inserted into (as a JSON - * string). - * @param partitionBy Partition columns for the initial table. - * @param insertSchemaDDL Schema of the data to be inserted (as a DDL string). - * @param insertJsonData Data to be inserted (as a JSON string) - * @param overwriteWhere Where clause for overwrite PARTITION / REPLACE WHERE (as - * colName -> value) - * @param expectedSchema Expected schema of the table after the insert. - * @param includeInserts List of insert types to run the test with. Defaults to all inserts. - * @param excludeInserts List of insert types to exclude when running the test. Defaults to no - * inserts excluded. - */ - def testInsertTypeEvolution(name: String)( - initialSchemaDDL: String, - initialJsonData: Seq[String], - partitionBy: Seq[String] = Seq.empty, - insertSchemaDDL: String, - insertJsonData: Seq[String], - overwriteWhere: (String, Int), - expectedSchema: StructType, - includeInserts: Seq[Insert] = allInsertTypes, - excludeInserts: Seq[Insert] = Seq.empty): Unit = { - for (insert <- includeInserts.filterNot(excludeInserts.toSet)) { - test(s"${insert.name} - $name") { - withTable("source", "target") { - val initialDF = readFromJSON(initialJsonData, StructType.fromDDL(initialSchemaDDL)) - val writer = initialDF.write.format("delta") - if (partitionBy.nonEmpty) { - writer.partitionBy(partitionBy: _*) - } - writer.saveAsTable("target") - // Write the data to insert to a table so that we can use it in both SQL and dataframe - // writer inserts. - val insertDF = readFromJSON(insertJsonData, StructType.fromDDL(insertSchemaDDL)) - insertDF.write.format("delta").saveAsTable("source") - - insert.runInsert( - columns = insertDF.schema.map(_.name), - whereCol = overwriteWhere._1, - whereValue = overwriteWhere._2 - ) - - val target = spark.read.table("target") - assert(target.schema === expectedSchema) - checkAnswer(target, insert.expectedResult(initialDF, insertDF)) - } - } - } - } - - testInsertTypeEvolution("top-level type evolution")( + testInserts("top-level type evolution")( initialSchemaDDL = "a int, b short", initialJsonData = Seq("""{ "a": 1, "b": 2 }"""), partitionBy = Seq("a"), @@ -372,10 +171,11 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { expectedSchema = StructType(new StructType() .add("a", IntegerType) .add("b", IntegerType, nullable = true, - metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))) + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))), + excludeInserts = Seq(StreamingInsert) ) - testInsertTypeEvolution("top-level type evolution with column upcast")( + testInserts("top-level type evolution with column upcast")( initialSchemaDDL = "a int, b short, c int", initialJsonData = Seq("""{ "a": 1, "b": 2, "c": 3 }"""), partitionBy = Seq("a"), @@ -386,10 +186,11 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { .add("a", IntegerType) .add("b", IntegerType, nullable = true, metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) - .add("c", IntegerType) + .add("c", IntegerType), + excludeInserts = Seq(StreamingInsert) ) - testInsertTypeEvolution("top-level type evolution with schema evolution")( + testInserts("top-level type evolution with schema evolution")( initialSchemaDDL = "a int, b short", initialJsonData = Seq("""{ "a": 1, "b": 2 }"""), partitionBy = Seq("a"), @@ -405,11 +206,12 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { excludeInserts = Seq( SQLInsertColList(SaveMode.Append), SQLInsertColList(SaveMode.Overwrite), - SQLInsertOverwritePartitionColList) + SQLInsertOverwritePartitionColList, + StreamingInsert) ) - testInsertTypeEvolution("nested type evolution by position")( + testInserts("nested type evolution by position")( initialSchemaDDL = "key int, s struct, m map, a array", initialJsonData = Seq("""{ "key": 1, "s": { "x": 1, "y": 2 }, "m": { "p": 3 }, "a": [4] }"""), @@ -434,11 +236,12 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { version = 1, from = ShortType, to = IntegerType, - path = Seq("element"))) + path = Seq("element"))), + excludeInserts = Seq(StreamingInsert) ) - testInsertTypeEvolution("nested type evolution with struct evolution by position")( + testInserts("nested type evolution with struct evolution by position")( initialSchemaDDL = "key int, s struct, m map, a array", initialJsonData = Seq("""{ "key": 1, "s": { "x": 1, "y": 2 }, "m": { "p": 3 }, "a": [4] }"""), @@ -466,11 +269,12 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { version = 1, from = ShortType, to = IntegerType, - path = Seq("element"))) + path = Seq("element"))), + excludeInserts = Seq(StreamingInsert) ) - testInsertTypeEvolution("nested struct type evolution with field upcast")( + testInserts("nested struct type evolution with field upcast")( initialSchemaDDL = "key int, s struct", initialJsonData = Seq("""{ "key": 1, "s": { "x": 1, "y": 2 } }"""), partitionBy = Seq("key"), @@ -482,13 +286,14 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { .add("s", new StructType() .add("x", IntegerType) .add("y", IntegerType, nullable = true, - metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))) + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))), + excludeInserts = Seq(StreamingInsert) ) // Interestingly, we introduced a special case to handle schema evolution / casting for structs // directly nested into an array. This doesn't always work with maps or with elements that // aren't a struct (see other tests). - testInsertTypeEvolution("nested struct type evolution with field upcast in array")( + testInserts("nested struct type evolution with field upcast in array")( initialSchemaDDL = "key int, a array>", initialJsonData = Seq("""{ "key": 1, "a": [ { "x": 1, "y": 2 } ] }"""), partitionBy = Seq("key"), @@ -500,12 +305,13 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { .add("a", ArrayType(new StructType() .add("x", IntegerType) .add("y", IntegerType, nullable = true, - metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))) + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))), + excludeInserts = Seq(StreamingInsert) ) // The next two tests document inconsistencies when handling maps. Using SQL doesn't allow type // evolution but using the dataframe API does. - testInsertTypeEvolution("nested struct type evolution with field upcast in map")( + testInserts("nested struct type evolution with field upcast in map")( initialSchemaDDL = "key int, m map>", initialJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }"""), partitionBy = Seq("key"), @@ -523,11 +329,12 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { DFv1SaveAsTable(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, - DFv2OverwritePartition + DFv2OverwritePartition, + StreamingInsert ) ) - testInsertTypeEvolution("nested struct type evolution with field upcast in map")( + testInserts("nested struct type evolution with field upcast in map")( initialSchemaDDL = "key int, m map>", initialJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 1, "y": 2 } } }"""), partitionBy = Seq("key"), @@ -546,7 +353,8 @@ trait TypeWideningInsertSchemaEvolutionTests extends TypeWideningTestCases { DFv1SaveAsTable(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, - DFv2OverwritePartition + DFv2OverwritePartition, + StreamingInsert ) ) } From 5dd84845ba1ed9b89d22a5cc85d13c27af29dbfb Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Mon, 2 Sep 2024 10:53:30 +0200 Subject: [PATCH 2/3] Address comments --- .../DeltaInsertIntoImplicitCastSuite.scala | 53 ++++++++----- .../spark/sql/delta/DeltaInsertIntoTest.scala | 79 ++++++++++++------- ...peWideningInsertSchemaEvolutionSuite.scala | 39 ++++----- 3 files changed, 104 insertions(+), 67 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala index 7e3fd5ea6e..9cd5e92147 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala @@ -41,15 +41,18 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { overwriteWhere = "a" -> 1, insertSchemaDDL = "a int, b long", insertJsonData = Seq("""{ "a": 1, "b": 4 }"""), - expectedSchema = StructType(new StructType() - .add("a", LongType) - .add("b", IntegerType)), + expectedResult = ExpectedResult.Success( + expectedSchema = new StructType() + .add("a", LongType) + .add("b", IntegerType)), // The following insert operations don't implicitly cast the data but fail instead - see // following test covering failure for these cases. We should change this to offer consistent // behavior across all inserts. excludeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, DFv2OverwritePartition @@ -65,7 +68,7 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { overwriteWhere = "a" -> 1, insertSchemaDDL = "a int, b long", insertJsonData = Seq("""{ "a": 1, "b": 4 }"""), - checkError = ex => { + expectedResult = ExpectedResult.Failure(ex => { checkError( ex, errorClass = "DELTA_FAILED_TO_MERGE_FIELDS", @@ -73,10 +76,12 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { "currentField" -> "a", "updateField" -> "a" )) - }, + }), includeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, DFv2OverwritePartition @@ -92,17 +97,20 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, a array>", insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""), - expectedSchema = new StructType() - .add("key", IntegerType) - .add("a", ArrayType(new StructType() - .add("x", LongType) - .add("y", IntegerType, nullable = true))), + expectedResult = ExpectedResult.Success( + expectedSchema = new StructType() + .add("key", IntegerType) + .add("a", ArrayType(new StructType() + .add("x", LongType) + .add("y", IntegerType, nullable = true)))), // The following insert operations don't implicitly cast the data but fail instead - see // following test covering failure for these cases. We should change this to offer consistent // behavior across all inserts. excludeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, DFv2OverwritePartition @@ -118,7 +126,7 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, a array>", insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""), - checkError = ex => { + expectedResult = ExpectedResult.Failure(ex => { checkError( ex, errorClass = "DELTA_FAILED_TO_MERGE_FIELDS", @@ -126,10 +134,12 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { "currentField" -> "a", "updateField" -> "a" )) - }, + }), includeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, DFv2OverwritePartition @@ -145,17 +155,20 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, m map>", insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), - expectedSchema = new StructType() - .add("key", IntegerType) - .add("m", MapType(StringType, new StructType() - .add("x", LongType) - .add("y", IntegerType))), + expectedResult = ExpectedResult.Success( + expectedSchema = new StructType() + .add("key", IntegerType) + .add("m", MapType(StringType, new StructType() + .add("x", LongType) + .add("y", IntegerType)))), // The following insert operations don't implicitly cast the data but fail instead - see // following test covering failure for these cases. We should change this to offer consistent // behavior across all inserts. excludeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, DFv2OverwritePartition @@ -171,7 +184,7 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, m map>", insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), - checkError = ex => { + expectedResult = ExpectedResult.Failure(ex => { checkError( ex, errorClass = "DELTA_FAILED_TO_MERGE_FIELDS", @@ -179,10 +192,12 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest { "currentField" -> "m", "updateField" -> "m" )) - }, + }), includeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, DFv2OverwritePartition diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala index 544c535646..3145ef6f8c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala @@ -22,6 +22,7 @@ import org.apache.spark.{DebugFilesystem, SparkThrowable} import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.sql.types.StructType /** @@ -129,6 +130,15 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL } } + /** df.write.mode(mode).save() */ + case class DFv1Save(mode: SaveMode) extends Insert { + val name: String = s"DFv1 save() - $mode" + def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("target")) + spark.read.table("source").write.mode(mode).format("delta").save(deltaLog.dataPath.toString) + } + } + /** df.writeTo.append() */ object DFv2Append extends Insert { self: Insert => val mode: SaveMode = SaveMode.Append @@ -167,6 +177,7 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL .writeStream .option("checkpointLocation", tablePath.toString) .format("delta") + .trigger(Trigger.AvailableNow()) .toTable("target") query.processAllAvailable() } @@ -188,11 +199,26 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL SQLInsertColList(mode), SQLInsertByName(mode), DFv1InsertInto(mode), - DFv1SaveAsTable(mode) + DFv1SaveAsTable(mode), + DFv1Save(mode) ) } yield insert) - // scalastyle:off argcount + /** + * Represents the expected result after running an insert operation in `testInserts()` below. + * Either: + * - Success: the table schema after the operation is checked against the expected schema. + * `testInserts()` also validates the data, though it's able to infer the expected data from the + * test inputs. + * - Failure: an exception is thrown and the caller passes a function to check that it matches an + * expected error. + */ + type ExpectedResult = Either[StructType, SparkThrowable => Unit] + object ExpectedResult { + def Success(expectedSchema: StructType): ExpectedResult = Left(expectedSchema) + def Failure(checkError: SparkThrowable => Unit): ExpectedResult = Right(checkError) + } + /** * Test runner to cover INSERT operations defined above. * @param name Test name @@ -204,15 +230,13 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL * @param insertJsonData Data to be inserted (as a JSON string) * @param overwriteWhere Where clause for overwrite PARTITION / REPLACE WHERE (as * colName -> value) - * @param expectedSchema Expected schema of the table after the insert. Only set this parameter - * if the insert is expected to succeed. - * @param checkError A check to run on the exception thrown by the insert operation. Only - * set this parameter if the insert is expected to fail. + * @param expectedResult Expected result, see [[ExpectedResult]] above. * @param includeInserts List of insert types to run the test with. Defaults to all inserts. * @param excludeInserts List of insert types to exclude when running the test. Defaults to no * inserts excluded. * @param confs Custom spark confs to set before running the insert operation. */ + // scalastyle:off argcount def testInserts(name: String)( initialSchemaDDL: String, initialJsonData: Seq[String], @@ -220,8 +244,7 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL insertSchemaDDL: String, insertJsonData: Seq[String], overwriteWhere: (String, Int), - expectedSchema: StructType = null, - checkError: SparkThrowable => Unit = null, + expectedResult: ExpectedResult, includeInserts: Seq[Insert] = allInsertTypes, excludeInserts: Seq[Insert] = Seq.empty, confs: Seq[(String, String)] = Seq.empty): Unit = { @@ -239,29 +262,25 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL val insertDF = readFromJSON(insertJsonData, StructType.fromDDL(insertSchemaDDL)) insertDF.write.format("delta").saveAsTable("source") + def runInsert(): Unit = + insert.runInsert( + columns = insertDF.schema.map(_.name), + whereCol = overwriteWhere._1, + whereValue = overwriteWhere._2 + ) + withSQLConf(confs: _*) { - if (checkError == null) { - assert(expectedSchema != null, "You must provide either `expectedSchema` or " + - "`checkError` in test method `testInserts`.") - insert.runInsert( - columns = insertDF.schema.map(_.name), - whereCol = overwriteWhere._1, - whereValue = overwriteWhere._2 - ) - val target = spark.read.table("target") - assert(target.schema === expectedSchema) - checkAnswer(target, insert.expectedResult(initialDF, insertDF)) - } else { - assert(expectedSchema == null, "You can't provide both `expectedSchema` and " + - "`checkError` in test method `testInserts`.") - val ex = intercept[SparkThrowable] { - insert.runInsert( - columns = insertDF.schema.map(_.name), - whereCol = overwriteWhere._1, - whereValue = overwriteWhere._2 - ) - } - checkError(ex) + expectedResult match { + case Left(expectedSchema) => + runInsert() + val target = spark.read.table("target") + assert(target.schema === expectedSchema) + checkAnswer(target, insert.expectedResult(initialDF, insertDF)) + case Right(checkError) => + val ex = intercept[SparkThrowable] { + runInsert() + } + checkError(ex) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala index c313754dd1..a1b5028b6d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala @@ -168,7 +168,7 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "a" -> 1, insertSchemaDDL = "a int, b int", insertJsonData = Seq("""{ "a": 1, "b": 4 }"""), - expectedSchema = StructType(new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("a", IntegerType) .add("b", IntegerType, nullable = true, metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))), @@ -182,11 +182,11 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "a" -> 1, insertSchemaDDL = "a int, b int, c short", insertJsonData = Seq("""{ "a": 1, "b": 5, "c": 6 }"""), - expectedSchema = new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("a", IntegerType) .add("b", IntegerType, nullable = true, metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) - .add("c", IntegerType), + .add("c", IntegerType)), excludeInserts = Seq(StreamingInsert) ) @@ -197,11 +197,11 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "a" -> 1, insertSchemaDDL = "a int, b int, c int", insertJsonData = Seq("""{ "a": 1, "b": 4, "c": 5 }"""), - expectedSchema = new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("a", IntegerType) .add("b", IntegerType, nullable = true, metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) - .add("c", IntegerType), + .add("c", IntegerType)), // INSERT INTO/OVERWRITE (a, b) VALUES doesn't support schema evolution. excludeInserts = Seq( SQLInsertColList(SaveMode.Append), @@ -219,7 +219,7 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, s struct, m map, a array", insertJsonData = Seq("""{ "key": 1, "s": { "x": 4, "y": 5 }, "m": { "p": 6 }, "a": [7] }"""), - expectedSchema = new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("key", IntegerType) .add("s", new StructType() .add("x", ShortType) @@ -236,7 +236,7 @@ trait TypeWideningInsertSchemaEvolutionTests version = 1, from = ShortType, to = IntegerType, - path = Seq("element"))), + path = Seq("element")))), excludeInserts = Seq(StreamingInsert) ) @@ -251,7 +251,7 @@ trait TypeWideningInsertSchemaEvolutionTests "key int, s struct, m map, a array", insertJsonData = Seq("""{ "key": 1, "s": { "x": 4, "y": 5, "z": 8 }, "m": { "p": 6 }, "a": [7] }"""), - expectedSchema = new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("key", IntegerType) .add("s", new StructType() .add("x", ShortType) @@ -269,7 +269,7 @@ trait TypeWideningInsertSchemaEvolutionTests version = 1, from = ShortType, to = IntegerType, - path = Seq("element"))), + path = Seq("element")))), excludeInserts = Seq(StreamingInsert) ) @@ -281,12 +281,12 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, s struct", insertJsonData = Seq("""{ "key": 1, "s": { "x": 4, "y": 5 } }"""), - expectedSchema = new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("key", IntegerType) .add("s", new StructType() .add("x", IntegerType) .add("y", IntegerType, nullable = true, - metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))), + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))), excludeInserts = Seq(StreamingInsert) ) @@ -300,12 +300,12 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, a array>", insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""), - expectedSchema = new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("key", IntegerType) .add("a", ArrayType(new StructType() .add("x", IntegerType) .add("y", IntegerType, nullable = true, - metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)))), + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))))), excludeInserts = Seq(StreamingInsert) ) @@ -318,15 +318,17 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, m map>", insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), - expectedSchema = new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("key", IntegerType) // Type evolution wasn't applied in the map. .add("m", MapType(StringType, new StructType() .add("x", IntegerType) - .add("y", ShortType))), + .add("y", ShortType)))), excludeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, DFv2OverwritePartition, @@ -341,7 +343,7 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, m map>", insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), - expectedSchema = StructType(new StructType() + expectedResult = ExpectedResult.Success(expectedSchema = new StructType() .add("key", IntegerType) // Type evolution was applied in the map. .add("m", MapType(StringType, new StructType() @@ -351,10 +353,11 @@ trait TypeWideningInsertSchemaEvolutionTests includeInserts = Seq( DFv1SaveAsTable(SaveMode.Append), DFv1SaveAsTable(SaveMode.Overwrite), + DFv1Save(SaveMode.Append), + DFv1Save(SaveMode.Overwrite), DFv2Append, DFv2Overwrite, - DFv2OverwritePartition, - StreamingInsert + DFv2OverwritePartition ) ) } From c73830852cedefe27f4bb74413121879da47d2d8 Mon Sep 17 00:00:00 2001 From: Johan Lasperas Date: Mon, 2 Sep 2024 13:53:29 +0200 Subject: [PATCH 3/3] Empty commit to retrigger CI