diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a14dafb881840..95a869d2e71c5 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -208,7 +208,7 @@ statement
     | SHOW CREATE TABLE multipartIdentifier (AS SERDE)?                   #showCreateTable
     | SHOW CURRENT NAMESPACE                                              #showCurrentNamespace
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName               #describeFunction
-    | (DESC | DESCRIBE) namespace EXTENDED?
+    | (DESC | DESCRIBE) namespace EXTENDED? STORAGE? CLEAR?
         multipartIdentifier                                               #describeNamespace
     | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
         multipartIdentifier partitionSpec? describeColName?               #describeRelation
@@ -1286,6 +1286,7 @@ ansiNonReserved
     | SHOW
     | SKEWED
     | SORT
+    | STORAGE
     | SORTED
     | START
     | STATISTICS
@@ -1564,6 +1565,7 @@ nonReserved
     | SORTED
     | START
     | STATISTICS
+    | STORAGE
     | STORED
     | STRATIFY
     | STRUCT
@@ -1843,6 +1845,7 @@ SORT: 'SORT';
 SORTED: 'SORTED';
 START: 'START';
 STATISTICS: 'STATISTICS';
+STORAGE: 'STORAGE';
 STORED: 'STORED';
 STRATIFY: 'STRATIFY';
 STRUCT: 'STRUCT';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b4f954f6167b9..abd9728998747 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2957,7 +2957,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     withOrigin(ctx) {
       DescribeNamespace(
         UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier())),
-        ctx.EXTENDED != null)
+        ctx.EXTENDED != null, ctx.STORAGE != null, ctx.CLEAR != null)
     }
 
   def cleanTableProperties(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index eca88f7090d54..dde8bebb3e0be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -262,7 +262,9 @@ case class DropNamespace(
  */
 case class DescribeNamespace(
     namespace: LogicalPlan,
-    extended: Boolean) extends Command {
+    extended: Boolean,
+    storage: Boolean = false,
+    clear: Boolean = false) extends Command {
   override def children: Seq[LogicalPlan] = Seq(namespace)
 
   override def output: Seq[Attribute] = Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 185152a5935f3..f47ab2f336944 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -194,12 +194,12 @@ class ResolveSessionCatalog(
     case AlterViewUnsetPropertiesStatement(SessionCatalogAndTable(_, tbl), keys, ifExists) =>
       AlterTableUnsetPropertiesCommand(tbl.asTableIdentifier, keys, ifExists, isView = true)
 
-    case d @ DescribeNamespace(SessionCatalogAndNamespace(_, ns), _) =>
+    case d @ DescribeNamespace(SessionCatalogAndNamespace(_, ns), _, _, _) =>
      if (ns.length != 1) {
        throw new AnalysisException(
          s"The database name is not valid: ${ns.quoted}")
      }
-      DescribeDatabaseCommand(ns.head, d.extended)
+      DescribeDatabaseCommand(ns.head, d.extended, d.storage, d.clear)
 
     case AlterNamespaceSetProperties(SessionCatalogAndNamespace(_, ns), properties) =>
       if (ns.length != 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 24efd884bd044..5c8fa20c0acf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.execution.command
 
 import java.net.URI
-import java.util.Locale
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
 import java.util.concurrent.TimeUnit._
 
 import scala.collection.{GenMap, GenSeq}
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.collection.parallel.immutable.ParVector
 import scala.util.control.NonFatal
@@ -29,6 +31,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix
 
 import org.apache.spark.CarmelConcurrentModifiedException
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
@@ -193,7 +196,9 @@ case class AlterDatabaseSetLocationCommand(databaseName: String, location: Strin
  */
 case class DescribeDatabaseCommand(
     databaseName: String,
-    extended: Boolean)
+    extended: Boolean,
+    storage: Boolean = false,
+    clear: Boolean = false)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -206,7 +211,7 @@ case class DescribeDatabaseCommand(
         Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri))::
         Row("Owner", allDbProperties.getOrElse(PROP_OWNER, "")) :: Nil
 
-    if (extended) {
+    val extendedResult: Seq[Row] = if (extended) {
       val properties = allDbProperties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
       val propertiesStr =
         if (properties.isEmpty) {
@@ -218,12 +223,129 @@ case class DescribeDatabaseCommand(
     } else {
       result
     }
+    if (storage) {
+      extendedResult ++ getDatabaseStorage(sparkSession, dbMetadata)
+    } else {
+      extendedResult
+    }
   }
 
   override val output: Seq[Attribute] = {
     AttributeReference("database_description_item", StringType, nullable = false)() ::
       AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
   }
+
+  private def getDatabaseStorage(sparkSession: SparkSession,
+      dbMetadata: CatalogDatabase): Seq[Row] = {
+    val result = new ArrayBuffer[Row]
+    val catalog = sparkSession.sessionState.catalog
+    val dbLocation = dbMetadata.locationUri.getPath
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    hadoopConf.set("fs.viewfs.rename.strategy", "SAME_FILESYSTEM_ACROSS_MOUNTPOINT")
+    val databasePath = new Path(dbLocation)
+    val fs = databasePath.getFileSystem(hadoopConf)
+    val dbPath = fs.makeQualified(databasePath)
+    result += Row("## Database Storage Summary", "")
+    val summary = fs.getContentSummary(dbPath)
+    val subStatus = fs.listStatus(dbPath).sortWith {
+      (fss1, fss2) => getLen(fss1, fs) > getLen(fss2, fs)
+    }
+    val (spaceQuotaStr, spaceQuotaRemStr) = if (summary.getSpaceQuota > 0L) {
+      (formatSize(summary.getSpaceQuota),
+        formatSize(summary.getSpaceQuota - summary.getSpaceConsumed))
+    } else {
("none", "inf") + } + result += Row("space quota", spaceQuotaStr) + result += Row("remaining space quota", spaceQuotaRemStr) + result += Row("total tables", String.valueOf(subStatus.length)) + + result += Row("", "") + val k = Math.min(15, subStatus.length) + result += Row(s"## Top $k Directories", "Size") + for (i <- 0 until k) { + val len = getLen(subStatus(i), fs) + result += Row(subStatus(i).getPath.toString, formatSize(len)) + } + + val tableLocations = catalog.listTables(databaseName).map { t => + val tp = new Path(catalog.getTableMetadata(t).location.getPath) + fs.makeQualified(tp).toString + }.toSet + + val leakStatus = subStatus.filterNot { s => + tableLocations.contains(fs.makeQualified(s.getPath).toString) + } + + result += Row("", "") + val message = if (clear) { + "try to clear" + } else if (leakStatus.nonEmpty) { + s"please contact Carmel-Oncall through carmel-support " + + s"slack channel to clear these ${leakStatus.length} storage leak paths" + } else { + "empty" + } + result += Row(s"## Suspicious Storage Leak ($message)", "Size") + + val baseTrash = sparkSession.sessionState.conf.getConfString( + "spark.sql.storage.leak.trash", "/tmp/spark/storage_leak_manual") + val baseTrashPath = fs.makeQualified(new Path(baseTrash)) + val deleteDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date) + val trashPath = new Path(baseTrashPath, deleteDate) + // Ensure the temporary exists, e.g., /tmp/spark/storage_leak_manual/2021-08-01 + if (!fs.exists(trashPath)) fs.mkdirs(trashPath) + + if (leakStatus.nonEmpty) { + leakStatus.foreach { ls => + val lp = ls.getPath.toString + var state = lp + if (clear) { + val ok = performFakeDelete(lp, trashPath, fs) + val prefix = if (ok) "success" else "failure" + state = s"($prefix) " + state + } + result += Row(state, formatSize(getLen(ls, fs))) + } + } + result + } + + private def getLen(fss: FileStatus, fs: FileSystem): Long = { + if (fss.isFile) { + return fss.getLen + } + var dirLength = 0L + try { + dirLength = fs.getContentSummary(fss.getPath).getLength + } catch { + case e: Exception => + logWarning(s"Exception when calculating directory size ${fss.getPath}: ${e.toString}") + } + dirLength + } + + private def performFakeDelete(leakPath: String, trashPath: Path, fs: FileSystem): Boolean = { + val dest = leakPath.replace(":", "__").replace("//", "__").replace("/", "__"); + val destPath = new Path(trashPath, dest) + try { + val res = fs.rename(new Path(leakPath), destPath) + if (res) { + logInfo("Successfully moved " + leakPath + " to " + destPath) + } else { + logInfo("Failed to move " + leakPath + " to " + destPath) + } + res + } catch { + case e: Exception => + logWarning("Exception when renaming " + leakPath + " to " + destPath + ": " + e) + false + } + } + + private def formatSize(size: Long): String = { + TraditionalBinaryPrefix.long2String(size, "", 1) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 20fde62059ded..cb46a057df377 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -266,7 +266,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat Nil } - case desc @ DescribeNamespace(ResolvedNamespace(catalog, ns), extended) => + case desc @ 
+    case desc @ DescribeNamespace(ResolvedNamespace(catalog, ns), extended, _, _) =>
       DescribeNamespaceExec(desc.output, catalog.asNamespaceCatalog, ns, extended) :: Nil
 
     case desc @ DescribeRelation(r: ResolvedTable, partitionSpec, isExtended) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 209c258dca44d..c70cb1b11f45c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -3309,6 +3309,22 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("CARMEL-5915: Add command to detect HDM storage leak") {
+    withDatabase("leakDb") {
+      sql("create database leakDb")
+      withTable("leakDb.tab1") {
+        sql("create table leakDb.tab1(id int, name string) using parquet")
+        val res = sql("desc database storage leakDb").collect()
+        assert(res.exists(_.getString(0).equals("## Database Storage Summary")))
+        assert(res.exists(_.getString(0).equals("## Top 1 Directories")))
+        assert(res.exists(_.getString(0).equals("## Suspicious Storage Leak (empty)")))
+
+        val res2 = sql("desc database storage clear leakDb").collect()
+        assert(res2.exists(_.getString(0).equals("## Suspicious Storage Leak (try to clear)")))
+      }
+    }
+  }
 }
 
 object FakeLocalFsFileSystem {
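
Reviewer note, not part of the patch: the DDLSuite test above already exercises the new syntax end to end. For an interactive check, a spark-shell session along the following lines should work once the patch is applied; the database name "demoDb" is only an illustrative placeholder, and the row labels come from DescribeDatabaseCommand.getDatabaseStorage above.

  // Hypothetical spark-shell session against a build that includes this patch.
  spark.sql("CREATE DATABASE IF NOT EXISTS demoDb")
  // Summarizes space quota usage and the largest sub-directories under the database location.
  spark.sql("DESC DATABASE STORAGE demoDb").show(truncate = false)
  // Additionally renames suspected leak paths into the trash directory configured by
  // spark.sql.storage.leak.trash (default /tmp/spark/storage_leak_manual/<yyyy-MM-dd>).
  spark.sql("DESC DATABASE STORAGE CLEAR demoDb").show(truncate = false)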