From 1c7b79c0578c76629ac68a7e180f33e40aa380d8 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 21 Dec 2020 04:58:56 +0000 Subject: [PATCH] [SPARK-33856][SQL] Migrate ALTER TABLE ... RENAME TO PARTITION to use UnresolvedTable to resolve the identifier ### What changes were proposed in this pull request? This PR proposes to migrate `ALTER TABLE ... RENAME TO PARTITION` to use `UnresolvedTable` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing). Note that `ALTER TABLE ... RENAME TO PARTITION` is not supported for v2 tables. ### Why are the changes needed? The PR makes the resolution consistent behavior consistent. For example, ``` sql("CREATE DATABASE test") sql("CREATE TABLE spark_catalog.test.t (id bigint, val string) USING csv PARTITIONED BY (id)") sql("CREATE TEMPORARY VIEW t AS SELECT 2") sql("USE spark_catalog.test") sql("ALTER TABLE t PARTITION (id=1) RENAME TO PARTITION (id=2)") // works fine assuming id=1 exists. ``` , but after this PR: ``` sql("ALTER TABLE t PARTITION (id=1) RENAME TO PARTITION (id=2)") org.apache.spark.sql.AnalysisException: t is a temp view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table; line 1 pos 0 ``` , which is the consistent behavior with other commands. ### Does this PR introduce _any_ user-facing change? After this PR, `ALTER TABLE` in the above example is resolved to a temp view `t` first instead of `spark_catalog.test.t`. ### How was this patch tested? Updated existing tests. Closes #30862 from imback82/alter_table_rename_partition_v2. Authored-by: Terry Kim Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/CheckAnalysis.scala | 3 +++ .../analysis/ResolvePartitionSpec.scala | 11 +++++++++- .../sql/catalyst/parser/AstBuilder.scala | 10 ++++++---- .../catalyst/plans/logical/statements.scala | 8 -------- .../catalyst/plans/logical/v2Commands.scala | 13 ++++++++++++ .../sql/catalyst/parser/DDLParserSuite.scala | 14 ++++++------- .../analysis/ResolveSessionCatalog.scala | 6 +++--- .../datasources/v2/DataSourceV2Strategy.scala | 4 ++++ .../AlterTablePartitionV2SQLSuite.scala | 20 +++++++++++++------ .../spark/sql/execution/SQLViewSuite.scala | 4 +++- .../sql/execution/command/DDLSuite.scala | 5 +++-- .../sql/hive/execution/HiveDDLSuite.scala | 5 +++-- 12 files changed, 69 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index c0cdcdf2d9577..472de096b2f22 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -587,6 +587,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { case AlterTableDropPartition(ResolvedTable(_, _, table), parts, _, _) => checkAlterTablePartition(table, parts) + case AlterTableRenamePartition(ResolvedTable(_, _, table), from, _) => + checkAlterTablePartition(table, Seq(from)) + case showPartitions: ShowPartitions => checkShowPartitions(showPartitions) case _ => // Falls back to the following checks diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala index 35e4820cd710b..2c2bea6f89d49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} -import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan, ShowPartitions} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, AlterTableRenamePartition, LogicalPlan, ShowPartitions} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement @@ -51,6 +51,15 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] { partitionSchema, requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames))) + case r @ AlterTableRenamePartition( + ResolvedTable(_, _, table: SupportsPartitionManagement), from, _) => + val partitionSchema = table.partitionSchema() + r.copy(from = resolvePartitionSpecs( + table.name, + Seq(from), + partitionSchema, + requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)).head) + case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) => r.copy(pattern = resolvePartitionSpecs( table.name, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 94589688953d7..9c265544f3227 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -3761,7 +3761,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg } /** - * Create an [[AlterTableRenamePartitionStatement]] + * Create an [[AlterTableRenamePartition]] * * For example: * {{{ @@ -3770,9 +3770,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg */ override def visitRenameTablePartition( ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) { - AlterTableRenamePartitionStatement( - visitMultipartIdentifier(ctx.multipartIdentifier), - visitNonOptionalPartitionSpec(ctx.from), + AlterTableRenamePartition( + UnresolvedTable( + visitMultipartIdentifier(ctx.multipartIdentifier), + "ALTER TABLE ... RENAME TO PARTITION"), + UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(ctx.from)), visitNonOptionalPartitionSpec(ctx.to)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 59239f6e041a5..f6d141ded384a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -292,14 +292,6 @@ case class AlterTableSetLocationStatement( partitionSpec: Option[TablePartitionSpec], location: String) extends ParsedStatement -/** - * ALTER TABLE ... RENAME PARTITION command, as parsed from SQL. - */ -case class AlterTableRenamePartitionStatement( - tableName: Seq[String], - from: TablePartitionSpec, - to: TablePartitionSpec) extends ParsedStatement - /** * An INSERT INTO statement, as parsed from SQL. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index fa67d311c39c3..87d81d5330574 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -673,6 +673,19 @@ case class AlterTableDropPartition( override def children: Seq[LogicalPlan] = child :: Nil } +/** + * The logical plan of the ALTER TABLE ... RENAME TO PARTITION command. + */ +case class AlterTableRenamePartition( + child: LogicalPlan, + from: PartitionSpec, + to: TablePartitionSpec) extends Command { + override lazy val resolved: Boolean = + childrenResolved && from.isInstanceOf[ResolvedPartitionSpec] + + override def children: Seq[LogicalPlan] = child :: Nil +} + /** * The logical plan of the ALTER TABLE ... RECOVER PARTITIONS command. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 5eb0c9a39f1e6..330a01be4bfb3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser import java.util.Locale import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView} import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource} import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal} import org.apache.spark.sql.catalyst.plans.logical._ @@ -2106,9 +2106,9 @@ class DDLParserSuite extends AnalysisTest { |RENAME TO PARTITION (dt='2008-09-09', country='uk') """.stripMargin val parsed1 = parsePlan(sql1) - val expected1 = AlterTableRenamePartitionStatement( - Seq("table_name"), - Map("dt" -> "2008-08-08", "country" -> "us"), + val expected1 = AlterTableRenamePartition( + UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME TO PARTITION"), + UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us")), Map("dt" -> "2008-09-09", "country" -> "uk")) comparePlans(parsed1, expected1) @@ -2118,9 +2118,9 @@ class DDLParserSuite extends AnalysisTest { |RENAME TO PARTITION (ds='2018-06-10') """.stripMargin val parsed2 = parsePlan(sql2) - val expected2 = AlterTableRenamePartitionStatement( - Seq("a", "b", "c"), - Map("ds" -> "2017-06-10"), + val expected2 = AlterTableRenamePartition( + UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... RENAME TO PARTITION"), + UnresolvedPartitionSpec(Map("ds" -> "2017-06-10")), Map("ds" -> "2018-06-10")) comparePlans(parsed2, expected2) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 723647a4a9207..66d1c406a5603 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -445,10 +445,10 @@ class ResolveSessionCatalog( partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)), ifNotExists) - case AlterTableRenamePartitionStatement(tbl, from, to) => - val v1TableName = parseV1Table(tbl, "ALTER TABLE RENAME PARTITION") + case AlterTableRenamePartition( + ResolvedV1TableIdentifier(ident), UnresolvedPartitionSpec(from, _), to) => AlterTableRenamePartitionCommand( - v1TableName.asTableIdentifier, + ident.asTableIdentifier, from, to) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 50bcf81f1ba2d..635117a9932ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -342,6 +342,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat AlterTableDropPartitionExec( table, parts.asResolvedPartitionSpecs, ignoreIfNotExists) :: Nil + case AlterTableRenamePartition(_: ResolvedTable, _: ResolvedPartitionSpec, _) => + throw new AnalysisException( + "ALTER TABLE ... RENAME TO PARTITION is not supported for v2 tables.") + case AlterTableRecoverPartitions(_: ResolvedTable) => throw new AnalysisException( "ALTER TABLE ... RECOVER PARTITIONS is not supported for v2 tables.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala index ac4d055eb0e60..bdf2fa5b7ac96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala @@ -33,13 +33,21 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase { } test("ALTER TABLE RENAME PARTITION") { - val t = "testcat.ns1.ns2.tbl" - withTable(t) { - spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)") - val e = intercept[AnalysisException] { - sql(s"ALTER TABLE $t PARTITION (id=1) RENAME TO PARTITION (id=2)") + val nonPartTbl = "testcat.ns1.ns2.tbl" + val partTbl = "testpart.ns1.ns2.tbl" + withTable(nonPartTbl, partTbl) { + spark.sql(s"CREATE TABLE $nonPartTbl (id bigint, data string) USING foo PARTITIONED BY (id)") + val e1 = intercept[AnalysisException] { + sql(s"ALTER TABLE $nonPartTbl PARTITION (id=1) RENAME TO PARTITION (id=2)") + } + assert(e1.message.contains(s"Table $nonPartTbl can not alter partitions")) + + spark.sql(s"CREATE TABLE $partTbl (id bigint, data string) USING foo PARTITIONED BY (id)") + val e2 = intercept[AnalysisException] { + sql(s"ALTER TABLE $partTbl PARTITION (id=1) RENAME TO PARTITION (id=2)") } - assert(e.message.contains("ALTER TABLE RENAME PARTITION is only supported with v1 tables")) + assert(e2.message.contains( + "ALTER TABLE ... RENAME TO PARTITION is not supported for v2 tables.")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 6d65fddb1be62..9b84e0fe4bcb7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -149,7 +149,9 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { assertAnalysisError( s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')", s"$viewName is a temp view. 'ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]' expects a table") - assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')") + assertAnalysisError( + s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')", + s"$viewName is a temp view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table") assertAnalysisError( s"ALTER TABLE $viewName RECOVER PARTITIONS", s"$viewName is a temp view. 'ALTER TABLE ... RECOVER PARTITIONS' expects a table") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d6474ae7d5f00..7a6076d6d9576 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.config import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} +import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER @@ -1642,9 +1642,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p"))) // table to alter does not exist - intercept[NoSuchTableException] { + val e = intercept[AnalysisException] { sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')") } + assert(e.getMessage.contains("Table not found: does_not_exist")) // partition to rename does not exist intercept[NoSuchPartitionException] { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 34f127bade95b..e55b2d390a5d9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -896,8 +896,9 @@ class HiveDDLSuite s"ALTER TABLE $oldViewName RECOVER PARTITIONS", s"$oldViewName is a view. 'ALTER TABLE ... RECOVER PARTITIONS' expects a table.") - assertErrorForAlterTableOnView( - s"ALTER TABLE $oldViewName PARTITION (a='1') RENAME TO PARTITION (a='100')") + assertAnalysisError( + s"ALTER TABLE $oldViewName PARTITION (a='1') RENAME TO PARTITION (a='100')", + s"$oldViewName is a view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table.") assertAnalysisError( s"ALTER TABLE $oldViewName ADD IF NOT EXISTS PARTITION (a='4', b='8')",