modify tables.scala
nfx committed Sep 18, 2023
1 parent 25475e8 commit 427e5ea
Showing 2 changed files with 32 additions and 40 deletions.
70 changes: 31 additions & 39 deletions src/databricks/labs/ucx/hive_metastore/tables.scala
@@ -1,50 +1,42 @@
-import spark.implicits._
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.DataFrame

-val PERSIST_SCHEMA = "ucx"
-val DEBUG = true
-
 // must follow the same structure as databricks.labs.ucx.hive_metastore.tables.Table
 case class TableDetails(catalog: String, database: String, name: String, object_type: String,
                         table_format: String, location: String, view_text: String)

-// Get metadata for a given table and map it to a TableDetails case class object
-def getTableDetails(db: String, t: String): Option[TableDetails] = {
-  try {
-    val table: CatalogTable = spark.sharedState.externalCatalog.getTable(db = db, table = t)
-    if (table == null) {
-      println(s"getTable('${db}.${t}') returned null")
-      None
-    } else {
-      Some(TableDetails("hive_metastore", db, t, table.tableType.name, table.provider.getOrElse("Unknown"),
-        table.storage.locationUri.getOrElse("None").toString, table.viewText.getOrElse("None"), "", // TBD: Set WS ID
-        table.createTime, table.lastAccessTime))
-    }
-  } catch {
-    case err: Throwable =>
-      println(s"Got some other kind of Throwable exception, ignoring for ${db}.${t}")
-      println(s"Error: ${err}")
-      None
-  }
-}
-
-// Retrieve metadata for a database and map it to a list of TableDetails case class objects
-def getDbTables(db: String): Seq[TableDetails] = {
-  val tables = spark.sharedState.externalCatalog.listTables(db)
-  if (tables == null) {
-    println(s"listTable('${db}') returned null")
-    Seq()
-  } else {
-    tables.par.map(getTableDetails(db, _)).toList.collect { case Some(x) => x }
-  }
-}
-
-val dbs = spark.sharedState.externalCatalog.listDatabases()
-
-// create schema if not available
-spark.sql(s"CREATE SCHEMA IF NOT EXISTS ${PERSIST_SCHEMA}")
-
-val df = dbs.par.flatMap(getDbTables).toList.toDF
-
-// write rows to table
-df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(s"${PERSIST_SCHEMA}.tables")
+def metadataForAllTables(databases: Seq[String]): DataFrame = {
+  import spark.implicits._
+
+  val externalCatalog = spark.sharedState.externalCatalog
+  databases.par.flatMap(databaseName => {
+    val tables = externalCatalog.listTables(databaseName)
+    if (tables == null) {
+      println(s"[WARN][${databaseName}] listTables returned null")
+      Seq()
+    } else {
+      tables.par.map(tableName => try {
+        val table = externalCatalog.getTable(databaseName, tableName)
+        if (table == null) {
+          println(s"[WARN][${databaseName}.${tableName}] result is null")
+          None
+        } else {
+          Some(TableDetails("hive_metastore", databaseName, tableName, table.tableType.name, table.provider.orNull,
+            table.storage.locationUri.map(_.toString).orNull, table.viewText.orNull))
+        }
+      } catch {
+        case err: Throwable =>
+          println(s"[ERROR][${databaseName}.${tableName}] ignoring table because of ${err}")
+          None
+      }).toList.collect {
+        case Some(x) => x
+      }
+    }
+  }).toList.toDF
+}
+
+dbutils.widgets.text("inventory_database", "ucx")
+val inventoryDatabase = dbutils.widgets.get("inventory_database")
+
+val df = metadataForAllTables(spark.sharedState.externalCatalog.listDatabases())
+df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(s"$inventoryDatabase.tables")
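Note on the rewrite above: `metadataForAllTables` leans on one idiom throughout — map over a parallel collection, turn each per-element failure into `None` instead of letting it abort the whole scan, then `collect` the successes. Below is a minimal, self-contained sketch of that idiom in plain Scala 2.12 (the version Databricks runtimes shipped at the time, where `.par` needs no extra import; on Scala 2.13+ add `import scala.collection.parallel.CollectionConverters._`). The object name and toy inputs are hypothetical; parsing stands in for `getTable`.

// Hypothetical sketch of the parallel-map-with-error-isolation idiom used in metadataForAllTables.
object ParScanSketch extends App {
  val inputs = Seq("1", "2", "oops", "4") // stand-ins for table names

  val results = inputs.par.map { s =>
    try Some(s.toInt) // a failure here affects only this element
    catch {
      case err: Throwable =>
        println(s"[ERROR][$s] ignoring input because of $err") // logged, not rethrown
        None
    }
  }.toList.collect { case Some(x) => x } // keep only the successes

  println(results) // List(1, 2, 4)
}

A related detail of the same diff: the `getOrElse("Unknown")` / `getOrElse("None")` placeholders become `orNull`, so a missing provider, location, or view text lands in the inventory table as a real SQL NULL rather than a sentinel string.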
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/install.py
@@ -246,7 +246,7 @@ def _job_settings(self, step_name: str, dbfs_path: str):
             "tasks": [self._job_task(task, dbfs_path) for task in tasks],
         }

-    def _job_task(self, task: Task, dbfs_path: str):
+    def _job_task(self, task: Task, dbfs_path: str) -> jobs.Task:
         jobs_task = jobs.Task(
             task_key=task.name,
             job_cluster_key=task.job_cluster,
