Commit

Merge branch 'databrickslabs:main' into main

rportilla-databricks authored Sep 27, 2024
2 parents d3a0e39 + 97b9996 commit 4d2e9a6

Showing 18 changed files with 475 additions and 278 deletions.
38 changes: 38 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

19 changes: 4 additions & 15 deletions docs/table_persistence.md
@@ -14,6 +14,8 @@ Table Utilization:
| permissions | RW | | RW | RO | | RO | | |
| jobs | RW | RW | | | RO | | | |
| clusters | RW | RW | | | | | | |
| directfs_in_paths | RW | | | | | | | RW |
| directfs_in_queries | RW | | | | | | | RW |
| external_locations | RW | | | RO | | | | |
| workspace | RW | | RO | | RO | | | |
| workspace_objects | RW | | | | | | | |
@@ -22,11 +24,11 @@
| pipelines | RW | RW | | | | | | |
| groups | RW | | RO | | | | | |
| table_size | RW | | | | | | | |
| table_failures | RW | | | | | | | |
| submit_runs | RW | | | | | | | |
| policies | RW | RW | | | | | | |
| migration_status | | RW | | RW | | RW | | |
| workflow_problems | | | | | | | | RW |
| query_problems | RW | | | | | | | RW |
| workflow_problems | RW | | | | | | | RW |
| udfs | RW | RW | RO | | | | | |
| logs | RW | | RW | RW | | RW | RW | |
| recon_results | | | | | | | RW | |
@@ -53,19 +55,6 @@ Holds Inventory of all tables in all databases and their relevant metadata.

<br/>

#### _$inventory_.table_failures

Holds failures that occurred during crawling HMS tables

| Column | Datatype | Description | Comments |
|----------|----------|----------------------------------------------------------|----------|
| catalog | string | Original catalog of the table. hive_metastore by default |
| database | string | Original schema of the table |
| name | string | Name of the table |
| failures | string | Exception message context |

<br/>

#### _$inventory_.grants

Inventory of all Table ACLs for tables indexed in tables table.
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/__about__.py
@@ -1,2 +1,2 @@
# DO NOT MODIFY THIS FILE
__version__ = "0.37.0"
__version__ = "0.38.0"
10 changes: 6 additions & 4 deletions src/databricks/labs/ucx/assessment/workflows.py
@@ -11,12 +11,14 @@ class Assessment(Workflow):
def __init__(self):
super().__init__('assessment')

@job_task(notebook="hive_metastore/tables.scala")
@job_task
def crawl_tables(self, ctx: RuntimeContext):
"""Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
as _database name_, _table name_, _table type_, _table location_, etc., in the table named
`$inventory_database.tables`. The metadata stored is then used in the subsequent tasks and workflows to, for
example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog."""
as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
`$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
cannot easily be migrated to Unity Catalog."""
ctx.tables_crawler.snapshot()

@job_task
def crawl_udfs(self, ctx: RuntimeContext):
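The snapshot written by `crawl_tables` can then be inspected directly. A minimal sketch, assuming an inventory database named `ucx` (the real name comes from the UCX configuration file) and the column names used by the crawler's `Table` rows:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed values: `inventory_database` is set in the UCX config file (config.yml)
inventory_database = "ucx"
tables = spark.table(f"hive_metastore.{inventory_database}.tables")

# Count crawled tables per source database to sanity-check the snapshot
tables.groupBy("database").count().orderBy("count", ascending=False).show()
```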
5 changes: 5 additions & 0 deletions src/databricks/labs/ucx/contexts/workflow_task.py
@@ -14,6 +14,7 @@
from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.hive_metastore import TablesInMounts
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder


@@ -81,6 +82,10 @@ def policies_crawler(self):
def global_init_scripts_crawler(self):
return GlobalInitScriptCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def tables_crawler(self):
return FasterTableScanCrawler(self.sql_backend, self.inventory_database)

@cached_property
def tables_in_mounts(self):
return TablesInMounts(
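The new `tables_crawler` property follows the context's lazy-wiring convention: `cached_property` builds the crawler on first access and reuses it for every task in the same run. A minimal sketch of that pattern, with an illustrative context class rather than the actual UCX `RuntimeContext`:

```python
from functools import cached_property

from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler


class ExampleContext:
    """Illustrative stand-in for the UCX runtime context."""

    def __init__(self, sql_backend, inventory_database: str):
        self._sql_backend = sql_backend
        self._inventory_database = inventory_database

    @cached_property
    def tables_crawler(self) -> FasterTableScanCrawler:
        # Built once per context instance, then cached for subsequent accesses
        return FasterTableScanCrawler(self._sql_backend, self._inventory_database)
```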
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/hive_metastore/mapping.py
@@ -113,7 +113,7 @@ def load(self) -> list[Rule]:
try:
return self._installation.load(list[Rule], filename=self.FILENAME)
except NotFound:
msg = "Please run: databricks labs ucx table-mapping"
msg = "Please run: databricks labs ucx create-table-mapping"
raise ValueError(msg) from None

def skip_table_or_view(self, schema_name: str, table_name: str, load_table: Callable[[str, str], Table | None]):
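The change above only corrects the hint text, but the surrounding pattern is worth noting: a missing mapping file is translated into an actionable error naming the CLI command that creates it. A hedged sketch of that pattern; `installation.load` and the `mapping.csv` filename mirror the UCX code but are assumptions here:

```python
from databricks.sdk.errors import NotFound


def load_table_mapping(installation, rule_type, filename: str = "mapping.csv"):
    """Load migration rules, or fail with the command that generates them."""
    try:
        return installation.load(list[rule_type], filename=filename)
    except NotFound:
        # Point the user at the current CLI command name, not the old one
        raise ValueError("Please run: databricks labs ucx create-table-mapping") from None
```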
51 changes: 40 additions & 11 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -411,12 +411,14 @@ def _crawl(self) -> Iterable[Table]:
After performing initial scan of all tables, starts making parallel
DESCRIBE TABLE EXTENDED queries for every table.
Production tasks would most likely be executed through `tables.scala`
Production tasks would most likely be executed through FasterTableScanCrawler
within `crawl_tables` task due to `spark.sharedState.externalCatalog`
lower-level APIs not requiring a roundtrip to storage, which is not
possible for Azure storage with credentials supplied through Spark
conf (see https://github.com/databrickslabs/ucx/issues/249).
FasterTableScanCrawler uses the _jsparkSession to utilize faster scanning with Scala APIs.
See also https://github.com/databrickslabs/ucx/issues/247
"""
tasks = []
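A hedged sketch of what the docstring above refers to: reading metadata through Spark's JVM-side external catalog via `_jsparkSession`, rather than per-table SQL round trips. These are internal Spark APIs reached over the py4j bridge, so the exact attribute names may differ between runtime versions:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Internal API: the JVM ExternalCatalog backed by the Hive Metastore
external_catalog = spark._jsparkSession.sharedState().externalCatalog()

# Scala collections come back as JVM objects; walk them with their own iterator
databases = external_catalog.listDatabases()
it = databases.iterator()
while it.hasNext():
    print(it.next())
```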
@@ -475,10 +477,14 @@ def _describe(self, catalog: str, database: str, table: str) -> Table | None:


class FasterTableScanCrawler(CrawlerBase):
def _try_fetch(self) -> Iterable[Table]:
"""Tries to load table information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
yield Table(*row)
"""
FasterTableScanCrawler is a specialized version of TablesCrawler that uses spark._jsparkSession to utilize
faster scanning with Scala APIs.
For integration testing, FasterTableScanCrawler is tested using the larger assessment test rather than
just the class. Testing the class individually would require utilizing a remote spark connection with the
Databricks workspace.
"""

def __init__(self, backend: SqlBackend, schema, include_databases: list[str] | None = None):
self._backend = backend
@@ -504,17 +510,26 @@ def _option_as_python(scala_option: typing.Any):
return scala_option.get() if scala_option.isDefined() else None

def _all_databases(self) -> list[str]:
if not self._include_database:
return list(self._iterator(self._external_catalog.listDatabases()))
return self._include_database
try:
if not self._include_database:
return list(self._iterator(self._external_catalog.listDatabases()))
return self._include_database
except Exception as err: # pylint: disable=broad-exception-caught
logger.error(f"failed-table-crawl: listing databases -> catalog : {err}", exc_info=True)
return []

def _list_tables(self, database: str) -> list[str]:
try:
return list(self._iterator(self._external_catalog.listTables(database)))
except Exception as err: # pylint: disable=broad-exception-caught
logger.warning(f"Failed to list tables in {database}: {err}")
logger.warning(f"failed-table-crawl: listing tables from database -> {database} : {err}", exc_info=True)
return []

def _try_fetch(self) -> Iterable[Table]:
"""Tries to load table information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
yield Table(*row)

@staticmethod
def _format_properties_list(properties_list: list) -> str:
if len(properties_list) == 0:
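The listing helpers above deliberately swallow failures: a database that cannot be listed is logged with a `failed-table-crawl` marker and contributes an empty result, so the rest of the crawl proceeds. A minimal sketch of that behaviour with illustrative names:

```python
import logging

logger = logging.getLogger(__name__)


def list_tables_safely(external_catalog, database: str) -> list[str]:
    """Return table names for one database, or an empty list if listing fails."""
    try:
        names = []
        it = external_catalog.listTables(database).iterator()
        while it.hasNext():
            names.append(it.next())
        return names
    except Exception as err:  # broad on purpose: one bad database must not stop the crawl
        logger.warning(f"failed-table-crawl: listing tables from database -> {database} : {err}", exc_info=True)
        return []
```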
@@ -555,8 +570,8 @@ def _describe(self, catalog, database, table) -> Table | None:
view_text = self._option_as_python(raw_table.viewText())
table_properties = list(self._iterator(raw_table.properties()))
formatted_table_properties = self._format_properties_list(table_properties)
except Exception: # pylint: disable=broad-exception-caught
logger.warning(f"Couldn't fetch information for table: {full_name}", exc_info=True)
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning(f"failed-table-crawl: describing table -> {full_name}: {e}", exc_info=True)
return None
return Table(
catalog=catalog,
@@ -583,16 +598,30 @@ def _crawl(self) -> Iterable[Table]:
catalog_tables, errors = Threads.gather("describing tables in ", tasks)
if len(errors) > 0:
logger.warning(f"Detected {len(errors)} errors while scanning tables in ")

logger.info(f"Finished scanning {len(catalog_tables)} tables")
return catalog_tables

def _get_table_names(self, database: str) -> list[str]:
"""
Lists tables names in the specified database.
:param database:
:return: list of table names
"""
table_names = []
table_names_batches = Threads.strict('listing tables', [partial(self._list_tables, database)])
for table_batch in table_names_batches:
table_names.extend(table_batch)
return table_names

def _create_describe_tasks(self, catalog: str, database: str, table_names: list[str]) -> list[partial]:
"""
Creates a list of partial functions for describing tables.
:param catalog:
:param database:
:param table_names:
:return: list of partial functions
"""
tasks = []
for table in table_names:
tasks.append(partial(self._describe, catalog, database, table))
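The `_crawl`/`_create_describe_tasks` pair fans the per-table describe work out over a thread pool via `partial` tasks. A stand-in sketch using only the standard library rather than the `Threads` helper UCX actually uses; `describe` is a placeholder for `_describe(catalog, database, table)`:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def describe(catalog: str, database: str, table: str) -> dict | None:
    # Placeholder: the real crawler returns a Table row, or None when the lookup fails
    return {"catalog": catalog, "database": database, "name": table}


def crawl(catalog: str, database: str, table_names: list[str]) -> list[dict]:
    tasks = [partial(describe, catalog, database, name) for name in table_names]
    with ThreadPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(lambda task: task(), tasks))
    # Drop tables whose describe failed; the real crawler also logs a warning count
    return [row for row in results if row is not None]


if __name__ == "__main__":
    print(crawl("hive_metastore", "default", ["orders", "customers"]))
```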
123 changes: 0 additions & 123 deletions src/databricks/labs/ucx/hive_metastore/tables.scala

This file was deleted.

