[CARMEL-5915] Add command to detect HDM storage leak (#904)
fenzhu authored and GitHub Enterprise committed Apr 15, 2022
1 parent f27f1d5 commit 8c2a7dd
Showing 7 changed files with 152 additions and 9 deletions.
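
In brief: the SQL grammar gains optional STORAGE and CLEAR keywords on DESC | DESCRIBE for namespaces, the parser threads two new boolean flags through the DescribeNamespace logical plan into DescribeDatabaseCommand, and that command gains a getDatabaseStorage report that summarizes the database directory (space quota, largest child directories), flags children that no registered table claims as suspected storage leaks, and, when CLEAR is given, moves those paths into a dated trash directory instead of deleting them.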
@@ -208,7 +208,7 @@ statement
     | SHOW CREATE TABLE multipartIdentifier (AS SERDE)? #showCreateTable
     | SHOW CURRENT NAMESPACE #showCurrentNamespace
     | (DESC | DESCRIBE) FUNCTION EXTENDED? describeFuncName #describeFunction
-    | (DESC | DESCRIBE) namespace EXTENDED?
+    | (DESC | DESCRIBE) namespace EXTENDED? STORAGE? CLEAR?
         multipartIdentifier #describeNamespace
     | (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
         multipartIdentifier partitionSpec? describeColName? #describeRelation
@@ -1286,6 +1286,7 @@ ansiNonReserved
     | SHOW
     | SKEWED
     | SORT
+    | STORAGE
     | SORTED
     | START
     | STATISTICS
@@ -1564,6 +1565,7 @@ nonReserved
     | SORTED
     | START
     | STATISTICS
+    | STORAGE
     | STORED
     | STRATIFY
     | STRUCT
@@ -1843,6 +1845,7 @@ SORT: 'SORT';
 SORTED: 'SORTED';
 START: 'START';
 STATISTICS: 'STATISTICS';
+STORAGE: 'STORAGE';
 STORED: 'STORED';
 STRATIFY: 'STRATIFY';
 STRUCT: 'STRUCT';
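
With the grammar changes above, describeNamespace accepts two new optional keywords after EXTENDED. A sketch of the accepted forms, written as they would be issued from Scala (the database name db is illustrative; the forms actually exercised appear in the DDLSuite change below):

  sql("DESC DATABASE db")                // unchanged
  sql("DESC DATABASE EXTENDED db")       // unchanged
  sql("DESC DATABASE STORAGE db")        // new: storage summary plus leak report
  sql("DESC DATABASE STORAGE CLEAR db")  // new: additionally moves leaked paths to trash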
@@ -2957,7 +2957,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
     withOrigin(ctx) {
       DescribeNamespace(
         UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier())),
-        ctx.EXTENDED != null)
+        ctx.EXTENDED != null, ctx.STORAGE != null, ctx.CLEAR != null)
     }

   def cleanTableProperties(
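
The visitor change above maps the new keywords onto the extra DescribeNamespace arguments added in the next file. As a sketch (parameter names taken from that case class), "DESC DATABASE STORAGE CLEAR db" should parse to:

  DescribeNamespace(
    UnresolvedNamespace(Seq("db")),
    extended = false,
    storage = true,
    clear = true)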
@@ -262,7 +262,9 @@ case class DropNamespace(
  */
 case class DescribeNamespace(
     namespace: LogicalPlan,
-    extended: Boolean) extends Command {
+    extended: Boolean,
+    storage: Boolean = false,
+    clear: Boolean = false) extends Command {
   override def children: Seq[LogicalPlan] = Seq(namespace)

   override def output: Seq[Attribute] = Seq(
@@ -194,12 +194,12 @@ class ResolveSessionCatalog(
     case AlterViewUnsetPropertiesStatement(SessionCatalogAndTable(_, tbl), keys, ifExists) =>
       AlterTableUnsetPropertiesCommand(tbl.asTableIdentifier, keys, ifExists, isView = true)

-    case d @ DescribeNamespace(SessionCatalogAndNamespace(_, ns), _) =>
+    case d @ DescribeNamespace(SessionCatalogAndNamespace(_, ns), _, _, _) =>
       if (ns.length != 1) {
         throw new AnalysisException(
           s"The database name is not valid: ${ns.quoted}")
       }
-      DescribeDatabaseCommand(ns.head, d.extended)
+      DescribeDatabaseCommand(ns.head, d.extended, d.storage, d.clear)

     case AlterNamespaceSetProperties(SessionCatalogAndNamespace(_, ns), properties) =>
       if (ns.length != 1) {
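
Only single-part namespaces reach DescribeDatabaseCommand on this path; judging from the check above, a qualified name such as DESC DATABASE STORAGE a.b should fail with "The database name is not valid: a.b".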
@@ -18,17 +18,20 @@
 package org.apache.spark.sql.execution.command

 import java.net.URI
-import java.util.Locale
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
 import java.util.concurrent.TimeUnit._

 import scala.collection.{GenMap, GenSeq}
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.collection.parallel.immutable.ParVector
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix

 import org.apache.spark.CarmelConcurrentModifiedException
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
@@ -193,7 +196,9 @@ case class AlterDatabaseSetLocationCommand(databaseName: String, location: Strin
  */
 case class DescribeDatabaseCommand(
     databaseName: String,
-    extended: Boolean)
+    extended: Boolean,
+    storage: Boolean = false,
+    clear: Boolean = false)
   extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -206,7 +211,7 @@ case class DescribeDatabaseCommand(
       Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)) ::
       Row("Owner", allDbProperties.getOrElse(PROP_OWNER, "")) :: Nil

-    if (extended) {
+    val extendedResult: Seq[Row] = if (extended) {
       val properties = allDbProperties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
       val propertiesStr =
         if (properties.isEmpty) {
@@ -218,12 +223,129 @@
     } else {
       result
     }
+    if (storage) {
+      extendedResult ++ getDatabaseStorage(sparkSession, dbMetadata)
+    } else {
+      extendedResult
+    }
   }

   override val output: Seq[Attribute] = {
     AttributeReference("database_description_item", StringType, nullable = false)() ::
       AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
   }

+  private def getDatabaseStorage(sparkSession: SparkSession,
+      dbMetadata: CatalogDatabase): Seq[Row] = {
+    val result = new ArrayBuffer[Row]
+    val catalog = sparkSession.sessionState.catalog
+    val dbLocation = dbMetadata.locationUri.getPath
+    val hadoopConf = sparkSession.sessionState.newHadoopConf()
+    hadoopConf.set("fs.viewfs.rename.strategy", "SAME_FILESYSTEM_ACROSS_MOUNTPOINT")
+    val databasePath = new Path(dbLocation)
+    val fs = databasePath.getFileSystem(hadoopConf)
+    val dbPath = fs.makeQualified(databasePath)
+    result += Row("## Database Storage Summary", "")
+    val summary = fs.getContentSummary(dbPath)
+    val subStatus = fs.listStatus(dbPath).sortWith {
+      (fss1, fss2) => getLen(fss1, fs) > getLen(fss2, fs)
+    }
+    val (spaceQuotaStr, spaceQuotaRemStr) = if (summary.getSpaceQuota > 0L) {
+      (formatSize(summary.getSpaceQuota),
+        formatSize(summary.getSpaceQuota - summary.getSpaceConsumed))
+    } else {
+      ("none", "inf")
+    }
+    result += Row("space quota", spaceQuotaStr)
+    result += Row("remaining space quota", spaceQuotaRemStr)
+    result += Row("total tables", String.valueOf(subStatus.length))
+
+    result += Row("", "")
+    val k = Math.min(15, subStatus.length)
+    result += Row(s"## Top $k Directories", "Size")
+    for (i <- 0 until k) {
+      val len = getLen(subStatus(i), fs)
+      result += Row(subStatus(i).getPath.toString, formatSize(len))
+    }
+
+    val tableLocations = catalog.listTables(databaseName).map { t =>
+      val tp = new Path(catalog.getTableMetadata(t).location.getPath)
+      fs.makeQualified(tp).toString
+    }.toSet
+
+    val leakStatus = subStatus.filterNot { s =>
+      tableLocations.contains(fs.makeQualified(s.getPath).toString)
+    }
+
+    result += Row("", "")
+    val message = if (clear) {
+      "try to clear"
+    } else if (leakStatus.nonEmpty) {
+      "please contact Carmel-Oncall through carmel-support " +
+        s"slack channel to clear these ${leakStatus.length} storage leak paths"
+    } else {
+      "empty"
+    }
+    result += Row(s"## Suspicious Storage Leak ($message)", "Size")
+
+    val baseTrash = sparkSession.sessionState.conf.getConfString(
+      "spark.sql.storage.leak.trash", "/tmp/spark/storage_leak_manual")
+    val baseTrashPath = fs.makeQualified(new Path(baseTrash))
+    val deleteDate = new SimpleDateFormat("yyyy-MM-dd").format(new Date)
+    val trashPath = new Path(baseTrashPath, deleteDate)
+    // Ensure the dated trash directory exists, e.g. /tmp/spark/storage_leak_manual/2021-08-01
+    if (!fs.exists(trashPath)) fs.mkdirs(trashPath)
+
+    if (leakStatus.nonEmpty) {
+      leakStatus.foreach { ls =>
+        val lp = ls.getPath.toString
+        var state = lp
+        if (clear) {
+          val ok = performFakeDelete(lp, trashPath, fs)
+          val prefix = if (ok) "success" else "failure"
+          state = s"($prefix) " + state
+        }
+        result += Row(state, formatSize(getLen(ls, fs)))
+      }
+    }
+    result
+  }
+
+  private def getLen(fss: FileStatus, fs: FileSystem): Long = {
+    if (fss.isFile) {
+      return fss.getLen
+    }
+    var dirLength = 0L
+    try {
+      dirLength = fs.getContentSummary(fss.getPath).getLength
+    } catch {
+      case e: Exception =>
+        logWarning(s"Exception when calculating directory size ${fss.getPath}: ${e.toString}")
+    }
+    dirLength
+  }
+
+  private def performFakeDelete(leakPath: String, trashPath: Path, fs: FileSystem): Boolean = {
+    val dest = leakPath.replace(":", "__").replace("//", "__").replace("/", "__")
+    val destPath = new Path(trashPath, dest)
+    try {
+      val res = fs.rename(new Path(leakPath), destPath)
+      if (res) {
+        logInfo("Successfully moved " + leakPath + " to " + destPath)
+      } else {
+        logInfo("Failed to move " + leakPath + " to " + destPath)
+      }
+      res
+    } catch {
+      case e: Exception =>
+        logWarning("Exception when renaming " + leakPath + " to " + destPath + ": " + e)
+        false
+    }
+  }
+
+  private def formatSize(size: Long): String = {
+    TraditionalBinaryPrefix.long2String(size, "", 1)
+  }
 }

 /**
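
A few notes on the command implementation above. The leak check is a set difference: the qualified direct children of the database directory, minus the qualified locations of every table registered in the database; whatever remains is reported as suspicious. The fs.viewfs.rename.strategy setting allows renames across viewfs mount points backed by the same underlying filesystem, which the move into the trash directory presumably requires. And CLEAR performs a deliberately reversible "fake" delete: the path is renamed into a dated trash directory rather than removed, so a mistaken clear can be undone. A worked example of the path flattening in performFakeDelete (the path is illustrative; the trash root is the spark.sql.storage.leak.trash default):

  val leak = "hdfs://ns/warehouse/leakdb.db/orphan"
  val dest = leak.replace(":", "__").replace("//", "__").replace("/", "__")
  // dest == "hdfs____ns__warehouse__leakdb.db__orphan"
  // final location: /tmp/spark/storage_leak_manual/<yyyy-MM-dd>/hdfs____ns__warehouse__leakdb.db__orphan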
@@ -266,7 +266,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         Nil
       }

-    case desc @ DescribeNamespace(ResolvedNamespace(catalog, ns), extended) =>
+    case desc @ DescribeNamespace(ResolvedNamespace(catalog, ns), extended, _, _) =>
       DescribeNamespaceExec(desc.output, catalog.asNamespaceCatalog, ns, extended) :: Nil

     case desc @ DescribeRelation(r: ResolvedTable, partitionSpec, isExtended) =>
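
Note that this v2 path only widens the pattern match to the new arity; DescribeNamespaceExec still receives extended alone, so STORAGE and CLEAR take effect only through the session-catalog path resolved earlier.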
@@ -3309,6 +3309,22 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("CARMEL-5915: Add command to detect HDM storage leak") {
+    withDatabase("leakDb") {
+      sql("create database leakDb")
+      withTable("leakDb.tab1") {
+        sql("create table leakDb.tab1(id int, name string) using parquet")
+        val res = sql("desc database storage leakDb").collect()
+        assert(res.exists(_.getString(0).equals("## Database Storage Summary")))
+        assert(res.exists(_.getString(0).equals("## Top 1 Directories")))
+        assert(res.exists(_.getString(0).equals("## Suspicious Storage Leak (empty)")))
+
+        val res2 = sql("desc database storage clear leakDb").collect()
+        assert(res2.exists(_.getString(0).equals("## Suspicious Storage Leak (try to clear)")))
+      }
+    }
+  }
 }

 object FakeLocalFsFileSystem {
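
A possible extension of this test, sketched under the assumption that the suite runs against a local filesystem and that the database location is reachable via SessionCatalog.getDatabaseMetadata: create an orphan directory under the database location and expect it to surface in the leak section.

  val dbLoc = spark.sessionState.catalog.getDatabaseMetadata("leakDb").locationUri
  new java.io.File(dbLoc.getPath, "orphan_dir").mkdirs()
  val res3 = sql("desc database storage leakDb").collect()
  // the orphan directory should be listed under the "## Suspicious Storage Leak" section
  assert(res3.exists(_.getString(0).endsWith("orphan_dir")))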
