From 68c032c246bb091b25d80e436b9288cca9245265 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Thu, 5 Nov 2020 22:00:45 -0800
Subject: [PATCH] [SPARK-33364][SQL] Introduce the "purge" option in
 TableCatalog.dropTable for v2 catalog

### What changes were proposed in this pull request?

This PR proposes to introduce the `purge` option in `TableCatalog.dropTable` so that v2 catalogs can use the option if needed.

Related discussion: https://github.com/apache/spark/pull/30079#discussion_r510594110

### Why are the changes needed?

Spark DDL supports passing the purge option to the `DROP TABLE` command. However, the option is currently ignored for v2 catalogs.

### Does this PR introduce _any_ user-facing change?

Yes. This PR introduces a new API in `TableCatalog`.

### How was this patch tested?

Added a test.

Closes #30267 from imback82/purge_table.

Authored-by: Terry Kim
Signed-off-by: Dongjoon Hyun
---
 .../sql/connector/catalog/TableCatalog.java   | 23 +++++++++++++++++++
 .../datasources/v2/DataSourceV2Strategy.scala |  4 ++--
 .../datasources/v2/DropTableExec.scala        |  9 +++++---
 .../sql/connector/DataSourceV2SQLSuite.scala  | 11 +++++++++
 4 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index b818515adf9c0..92079d127b1e3 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -162,6 +162,29 @@ Table alterTable(
    */
   boolean dropTable(Identifier ident);
 
+  /**
+   * Drop a table in the catalog with an option to purge.
+   * <p>
+   * If the catalog supports views and contains a view for the identifier and not a table, this
+   * must not drop the view and must return false.
+   * <p>
+   * If the catalog supports the option to purge a table, this method must be overridden.
+   * The default implementation falls back to {@link #dropTable(Identifier) dropTable} if the
+   * purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}.
+   *
+   * @param ident a table identifier
+   * @param purge whether a table should be purged
+   * @return true if a table was deleted, false if no table exists for the identifier
+   *
+   * @since 3.1.0
+   */
+  default boolean dropTable(Identifier ident, boolean purge) {
+    if (purge) {
+      throw new UnsupportedOperationException("Purge option is not supported.");
+    }
+    return dropTable(ident);
+  }
+
   /**
    * Renames a table in the catalog.
    * <p>
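For illustration, a v2 catalog that does support purging would override the new two-argument overload instead of relying on the default. A minimal Scala sketch, not part of this patch: `PurgingCatalog` and its `purgeData` hook are hypothetical, and the remaining `TableCatalog` methods are left abstract to keep the example focused.

```scala
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

// Sketch only: `PurgingCatalog` and `purgeData` are hypothetical and not
// part of this patch; all other TableCatalog methods stay abstract here.
abstract class PurgingCatalog extends TableCatalog {

  // Hypothetical hook that deletes the table's underlying data immediately,
  // bypassing any trash/soft-delete mechanism the catalog may use.
  protected def purgeData(ident: Identifier): Unit

  override def dropTable(ident: Identifier, purge: Boolean): Boolean = {
    if (purge) {
      purgeData(ident)
    }
    // Delegate the metadata drop to the single-argument overload.
    dropTable(ident)
  }
}
```

Catalogs that cannot purge simply inherit the default shown above, so `DROP TABLE ... PURGE` fails fast rather than silently ignoring the option.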
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 4bb58142b3d19..648929eaa33ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -228,8 +228,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case DescribeColumn(_: ResolvedTable, _, _) =>
       throw new AnalysisException("Describing columns is not supported for v2 tables.")
 
-    case DropTable(r: ResolvedTable, ifExists, _) =>
-      DropTableExec(r.catalog, r.identifier, ifExists) :: Nil
+    case DropTable(r: ResolvedTable, ifExists, purge) =>
+      DropTableExec(r.catalog, r.identifier, ifExists, purge) :: Nil
 
     case _: NoopDropTable => LocalTableScanExec(Nil, Nil) :: Nil
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index 967613f77577c..1fd0cd177478b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -25,12 +25,15 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 /**
  * Physical plan node for dropping a table.
  */
-case class DropTableExec(catalog: TableCatalog, ident: Identifier, ifExists: Boolean)
-  extends V2CommandExec {
+case class DropTableExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    ifExists: Boolean,
+    purge: Boolean) extends V2CommandExec {
 
   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
-      catalog.dropTable(ident)
+      catalog.dropTable(ident, purge)
     } else if (!ifExists) {
       throw new NoSuchTableException(ident)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 893ee5f130cda..444daf8233c67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -751,6 +751,17 @@ class DataSourceV2SQLSuite
     sql("DROP TABLE IF EXISTS testcat.db.notbl")
   }
 
+  test("DropTable: purge option") {
+    withTable("testcat.ns.t") {
+      sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")
+      val ex = intercept[UnsupportedOperationException] {
+        sql("DROP TABLE testcat.ns.t PURGE")
+      }
+      // The default TableCatalog.dropTable implementation doesn't support the purge option.
+      assert(ex.getMessage.contains("Purge option is not supported"))
+    }
+  }
+
   test("SPARK-33174: DROP TABLE should resolve to a temporary view first") {
     withTable("testcat.ns.t") {
       withTempView("t") {
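For context, the end-to-end behavior mirrors the test above. A minimal sketch, assuming a v2 catalog registered under the name `testcat` (as in this suite) that keeps the default implementation and does not override the new overload:

```scala
// Create a table in the assumed "testcat" v2 catalog.
spark.sql("CREATE TABLE testcat.ns.t (id bigint) USING foo")

// PURGE is now routed through DropTableExec to dropTable(ident, purge = true);
// the default implementation rejects it with
// java.lang.UnsupportedOperationException: Purge option is not supported.
spark.sql("DROP TABLE testcat.ns.t PURGE")

// Without PURGE, the default falls back to dropTable(ident) and the drop succeeds.
spark.sql("DROP TABLE testcat.ns.t")
```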