diff --git a/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala b/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala index ada8b7ec1f0..ea4dadc6788 100644 --- a/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala +++ b/spark/src/main/scala/io/delta/tables/DeltaMergeBuilder.scala @@ -323,7 +323,7 @@ class DeltaMergeBuilder private( // Resolve UpCast expressions that `PreprocessTableMerge` may have introduced. mergeIntoCommand = PostHocResolveUpCast(sparkSession).apply(mergeIntoCommand) sparkSession.sessionState.analyzer.checkAnalysis(mergeIntoCommand) - mergeIntoCommand.asInstanceOf[MergeIntoCommand].run(sparkSession) + toDataset(sparkSession, mergeIntoCommand) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala index 973f2117003..0600bc61851 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala @@ -23,6 +23,7 @@ import java.util.Locale import scala.language.implicitConversions import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord} +import org.apache.spark.sql.delta.commands.MergeIntoCommand import org.apache.spark.sql.delta.commands.merge.MergeStats import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLTestUtils @@ -3362,6 +3363,25 @@ abstract class MergeIntoSuiteBase "delta.dml.merge.findTouchedFiles", "delta.dml.merge.writeInsertsOnlyWhenNoMatches", "delta.dml.merge") + + test("merge execution is recorded with QueryExecutionListener") { + withKeyValueData( + source = (0, 0) :: (1, 10) :: Nil, + target = (1, 1) :: (2, 2) :: Nil) { case (sourceName, targetName) => + val plans = withLogicalPlansCaptured(spark, optimizedPlan = false) { + executeMerge( + tgt = s"$targetName t", + src = s"$sourceName s", + cond = "s.key = t.key", + update(set = "*")) + } + val mergeCommands = plans.collect { + case m: MergeIntoCommand => m + } + assert(mergeCommands.size === 1, + "Merge command wasn't properly recorded by QueryExecutionListener") + } + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SchemaValidationSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SchemaValidationSuite.scala index 10997cc1139..0e9e1c47515 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SchemaValidationSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SchemaValidationSuite.scala @@ -26,21 +26,15 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSparkSession -trait SchemaValidationSuiteBase extends QueryTest with SharedSparkSession with DeltaSQLCommandTest { - - def checkMergeException(e: Exception, col: String): Unit = { - assert(e.isInstanceOf[MetadataChangedException]) - assert(e.getMessage.contains( - "The metadata of the Delta table has been changed by a concurrent update")) - } -} - /** * This Suite tests the behavior of Delta commands when a schema altering commit is run after the * command completes analysis but before the command starts the transaction. We want to make sure * That we do not corrupt tables. */ -class SchemaValidationSuite extends SchemaValidationSuiteBase { +class SchemaValidationSuite + extends QueryTest + with SharedSparkSession + with DeltaSQLCommandTest { class BlockingRule( blockActionLatch: CountDownLatch, @@ -331,7 +325,7 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase { /** * Concurrently drop column in merge condition. Merge command detects the schema change while - * resolving the target and throws an AnalysisException + * resolving the target and throws a DeltaAnalysisException */ testConcurrentChange("merge - remove a column in merge condition concurrently")( createTable = (spark: SparkSession, tblPath: String) => { @@ -343,7 +337,7 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase { actionToTest = (spark: SparkSession, tblPath: String) => { val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tblPath) val sourceDf = spark.range(10).withColumn("col2", lit(2)) - val e = intercept[Exception] { + val e = intercept[DeltaAnalysisException] { deltaTable.as("t1") .merge(sourceDf.as("t2"), "t1.id == t2.id") .whenNotMatched() @@ -352,14 +346,22 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase { .updateAll() .execute() } - checkMergeException(e, "id") + + checkErrorMatchPVals( + exception = e, + errorClass = "DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS", + parameters = Map( + "schemaDiff" -> ".*id.*", + "legacyFlagMessage" -> "" + ) + ) }, concurrentChange = dropColFromSampleTable("id") ) /** * Concurrently drop column not in merge condition but in target. Merge command detects the schema - * change while resolving the target and throws an AnalysisException + * change while resolving the target and throws a DeltaAnalysisException */ testConcurrentChange("merge - remove a column not in merge condition concurrently")( createTable = (spark: SparkSession, tblPath: String) => { @@ -371,7 +373,7 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase { actionToTest = (spark: SparkSession, tblPath: String) => { val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tblPath) val sourceDf = spark.range(10).withColumn("col2", lit(2)) - val e = intercept[Exception] { + val e = intercept[DeltaAnalysisException] { deltaTable.as("t1") .merge(sourceDf.as("t2"), "t1.id == t2.id") .whenNotMatched() @@ -380,7 +382,14 @@ class SchemaValidationSuite extends SchemaValidationSuiteBase { .updateAll() .execute() } - checkMergeException(e, "col2") + checkErrorMatchPVals( + exception = e, + errorClass = "DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS", + parameters = Map( + "schemaDiff" -> ".*col2.*", + "legacyFlagMessage" -> "" + ) + ) }, concurrentChange = dropColFromSampleTable("col2") )