[SPARK-33856][SQL] Migrate ALTER TABLE ... RENAME TO PARTITION to use UnresolvedTable to resolve the identifier

### What changes were proposed in this pull request?

This PR proposes to migrate `ALTER TABLE ... RENAME TO PARTITION` to use `UnresolvedTable` to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in [JIRA](https://issues.apache.org/jira/browse/SPARK-29900) or [proposal doc](https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing).
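
Concretely, the parser now produces a v2-style plan with an `UnresolvedTable` child instead of the old `AlterTableRenamePartitionStatement`. A simplified sketch of the parsed plan, using the same example as the updated `DDLParserSuite` test in the diff below:
```
// ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us')
// RENAME TO PARTITION (dt='2008-09-09', country='uk')
AlterTableRenamePartition(
  UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME TO PARTITION"),
  UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us")),
  Map("dt" -> "2008-09-09", "country" -> "uk"))
```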

Note that `ALTER TABLE ... RENAME TO PARTITION` is not supported for v2 tables.
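
For a v2 table, the command now fails in `DataSourceV2Strategy` with an explicit error; a sketch mirroring the updated `AlterTablePartitionV2SQLSuite` test below (the `testpart.ns1.ns2.tbl` name comes from that test):
```
sql("ALTER TABLE testpart.ns1.ns2.tbl PARTITION (id=1) RENAME TO PARTITION (id=2)")
org.apache.spark.sql.AnalysisException: ALTER TABLE ... RENAME TO PARTITION is not supported for v2 tables.
```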

### Why are the changes needed?

The PR makes the resolution behavior consistent. For example, the following works before this PR:
```
sql("CREATE DATABASE test")
sql("CREATE TABLE spark_catalog.test.t (id bigint, val string) USING csv PARTITIONED BY (id)")
sql("CREATE TEMPORARY VIEW t AS SELECT 2")
sql("USE spark_catalog.test")
sql("ALTER TABLE t PARTITION (id=1) RENAME TO PARTITION (id=2)") // works fine assuming id=1 exists.
```
After this PR, the same statement fails:
```
sql("ALTER TABLE t PARTITION (id=1) RENAME TO PARTITION (id=2)")
org.apache.spark.sql.AnalysisException: t is a temp view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table; line 1 pos 0
```
This is consistent with the behavior of other commands.

### Does this PR introduce _any_ user-facing change?

After this PR, `ALTER TABLE` in the above example is resolved to a temp view `t` first instead of `spark_catalog.test.t`.
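
The persistent table can still be targeted by qualifying the identifier; a minimal illustration, assuming the setup from the example above:
```
sql("ALTER TABLE spark_catalog.test.t PARTITION (id=1) RENAME TO PARTITION (id=2)")
```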

### How was this patch tested?

Updated existing tests.

Closes #30862 from imback82/alter_table_rename_partition_v2.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
imback82 authored and cloud-fan committed Dec 21, 2020
1 parent 8e26339 commit 1c7b79c
Showing 12 changed files with 69 additions and 34 deletions.
@@ -587,6 +587,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case AlterTableDropPartition(ResolvedTable(_, _, table), parts, _, _) =>
checkAlterTablePartition(table, parts)

case AlterTableRenamePartition(ResolvedTable(_, _, table), from, _) =>
checkAlterTablePartition(table, Seq(from))

case showPartitions: ShowPartitions => checkShowPartitions(showPartitions)

case _ => // Falls back to the following checks
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, LogicalPlan, ShowPartitions}
import org.apache.spark.sql.catalyst.plans.logical.{AlterTableAddPartition, AlterTableDropPartition, AlterTableRenamePartition, LogicalPlan, ShowPartitions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
@@ -51,6 +51,15 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
partitionSchema,
requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))

case r @ AlterTableRenamePartition(
ResolvedTable(_, _, table: SupportsPartitionManagement), from, _) =>
val partitionSchema = table.partitionSchema()
r.copy(from = resolvePartitionSpecs(
table.name,
Seq(from),
partitionSchema,
requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)).head)

case r @ ShowPartitions(ResolvedTable(_, _, table: SupportsPartitionManagement), partSpecs) =>
r.copy(pattern = resolvePartitionSpecs(
table.name,
@@ -3761,7 +3761,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Create an [[AlterTableRenamePartitionStatement]]
* Create an [[AlterTableRenamePartition]]
*
* For example:
* {{{
@@ -3770,9 +3770,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
*/
override def visitRenameTablePartition(
ctx: RenameTablePartitionContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenamePartitionStatement(
visitMultipartIdentifier(ctx.multipartIdentifier),
visitNonOptionalPartitionSpec(ctx.from),
AlterTableRenamePartition(
UnresolvedTable(
visitMultipartIdentifier(ctx.multipartIdentifier),
"ALTER TABLE ... RENAME TO PARTITION"),
UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(ctx.from)),
visitNonOptionalPartitionSpec(ctx.to))
}

@@ -292,14 +292,6 @@ case class AlterTableSetLocationStatement(
partitionSpec: Option[TablePartitionSpec],
location: String) extends ParsedStatement

/**
* ALTER TABLE ... RENAME PARTITION command, as parsed from SQL.
*/
case class AlterTableRenamePartitionStatement(
tableName: Seq[String],
from: TablePartitionSpec,
to: TablePartitionSpec) extends ParsedStatement

/**
* An INSERT INTO statement, as parsed from SQL.
*
@@ -673,6 +673,19 @@ case class AlterTableDropPartition(
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the ALTER TABLE ... RENAME TO PARTITION command.
*/
case class AlterTableRenamePartition(
child: LogicalPlan,
from: PartitionSpec,
to: TablePartitionSpec) extends Command {
override lazy val resolved: Boolean =
childrenResolved && from.isInstanceOf[ResolvedPartitionSpec]

override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the ALTER TABLE ... RECOVER PARTITIONS command.
*/
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedNamespace, UnresolvedPartitionSpec, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -2106,9 +2106,9 @@ class DDLParserSuite extends AnalysisTest {
|RENAME TO PARTITION (dt='2008-09-09', country='uk')
""".stripMargin
val parsed1 = parsePlan(sql1)
val expected1 = AlterTableRenamePartitionStatement(
Seq("table_name"),
Map("dt" -> "2008-08-08", "country" -> "us"),
val expected1 = AlterTableRenamePartition(
UnresolvedTable(Seq("table_name"), "ALTER TABLE ... RENAME TO PARTITION"),
UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us")),
Map("dt" -> "2008-09-09", "country" -> "uk"))
comparePlans(parsed1, expected1)

@@ -2118,9 +2118,9 @@ class DDLParserSuite extends AnalysisTest {
|RENAME TO PARTITION (ds='2018-06-10')
""".stripMargin
val parsed2 = parsePlan(sql2)
val expected2 = AlterTableRenamePartitionStatement(
Seq("a", "b", "c"),
Map("ds" -> "2017-06-10"),
val expected2 = AlterTableRenamePartition(
UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... RENAME TO PARTITION"),
UnresolvedPartitionSpec(Map("ds" -> "2017-06-10")),
Map("ds" -> "2018-06-10"))
comparePlans(parsed2, expected2)
}
@@ -445,10 +445,10 @@ class ResolveSessionCatalog(
partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)),
ifNotExists)

case AlterTableRenamePartitionStatement(tbl, from, to) =>
val v1TableName = parseV1Table(tbl, "ALTER TABLE RENAME PARTITION")
case AlterTableRenamePartition(
ResolvedV1TableIdentifier(ident), UnresolvedPartitionSpec(from, _), to) =>
AlterTableRenamePartitionCommand(
v1TableName.asTableIdentifier,
ident.asTableIdentifier,
from,
to)

@@ -342,6 +342,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
AlterTableDropPartitionExec(
table, parts.asResolvedPartitionSpecs, ignoreIfNotExists) :: Nil

case AlterTableRenamePartition(_: ResolvedTable, _: ResolvedPartitionSpec, _) =>
throw new AnalysisException(
"ALTER TABLE ... RENAME TO PARTITION is not supported for v2 tables.")

case AlterTableRecoverPartitions(_: ResolvedTable) =>
throw new AnalysisException(
"ALTER TABLE ... RECOVER PARTITIONS is not supported for v2 tables.")
@@ -33,13 +33,21 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
}

test("ALTER TABLE RENAME PARTITION") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $t PARTITION (id=1) RENAME TO PARTITION (id=2)")
val nonPartTbl = "testcat.ns1.ns2.tbl"
val partTbl = "testpart.ns1.ns2.tbl"
withTable(nonPartTbl, partTbl) {
spark.sql(s"CREATE TABLE $nonPartTbl (id bigint, data string) USING foo PARTITIONED BY (id)")
val e1 = intercept[AnalysisException] {
sql(s"ALTER TABLE $nonPartTbl PARTITION (id=1) RENAME TO PARTITION (id=2)")
}
assert(e1.message.contains(s"Table $nonPartTbl can not alter partitions"))

spark.sql(s"CREATE TABLE $partTbl (id bigint, data string) USING foo PARTITIONED BY (id)")
val e2 = intercept[AnalysisException] {
sql(s"ALTER TABLE $partTbl PARTITION (id=1) RENAME TO PARTITION (id=2)")
}
assert(e.message.contains("ALTER TABLE RENAME PARTITION is only supported with v1 tables"))
assert(e2.message.contains(
"ALTER TABLE ... RENAME TO PARTITION is not supported for v2 tables."))
}
}
}
@@ -149,7 +149,9 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
assertAnalysisError(
s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')",
s"$viewName is a temp view. 'ALTER TABLE ... SET [SERDE|SERDEPROPERTIES]' expects a table")
assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')")
assertAnalysisError(
s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')",
s"$viewName is a temp view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table")
assertAnalysisError(
s"ALTER TABLE $viewName RECOVER PARTITIONS",
s"$viewName is a temp view. 'ALTER TABLE ... RECOVER PARTITIONS' expects a table")
@@ -29,7 +29,7 @@ import org.apache.spark.internal.config
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchFunctionException, NoSuchPartitionException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
Expand Down Expand Up @@ -1642,9 +1642,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
Set(Map("a" -> "10", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))

// table to alter does not exist
intercept[NoSuchTableException] {
val e = intercept[AnalysisException] {
sql("ALTER TABLE does_not_exist PARTITION (c='3') RENAME TO PARTITION (c='333')")
}
assert(e.getMessage.contains("Table not found: does_not_exist"))

// partition to rename does not exist
intercept[NoSuchPartitionException] {
@@ -896,8 +896,9 @@ class HiveDDLSuite
s"ALTER TABLE $oldViewName RECOVER PARTITIONS",
s"$oldViewName is a view. 'ALTER TABLE ... RECOVER PARTITIONS' expects a table.")

assertErrorForAlterTableOnView(
s"ALTER TABLE $oldViewName PARTITION (a='1') RENAME TO PARTITION (a='100')")
assertAnalysisError(
s"ALTER TABLE $oldViewName PARTITION (a='1') RENAME TO PARTITION (a='100')",
s"$oldViewName is a view. 'ALTER TABLE ... RENAME TO PARTITION' expects a table.")

assertAnalysisError(
s"ALTER TABLE $oldViewName ADD IF NOT EXISTS PARTITION (a='4', b='8')",