Skip to content

Commit

Permalink
Added Py4j implementation of tables crawler to retrieve a list of HMS…
Browse files Browse the repository at this point in the history
… tables in the assessment workflow (#2579)

## Changes
Tests, fixes and uses the new FasterTableScanCrawler in the Assessment
Job. The feature flag will determine whether the assessment uses the old
Scala code or the new Python implementation (using py4j) for better logging
during table scans.

### Linked issues
Fix #2190 

### Functionality
- [x] modified existing workflow: `assessment.crawl_tables` now uses new
py4j crawler over the scala one

### Tests
- [x] added integration tests

---------

Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
  • Loading branch information
pritishpai and nfx authored Sep 26, 2024
1 parent 0b9c4d9 commit 143c637
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 218 deletions.
14 changes: 0 additions & 14 deletions docs/table_persistence.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ Table Utilization:
| pipelines | RW | RW | | | | | | |
| groups | RW | | RO | | | | | |
| table_size | RW | | | | | | | |
| table_failures | RW | | | | | | | |
| submit_runs | RW | | | | | | | |
| policies | RW | RW | | | | | | |
| migration_status | | RW | | RW | | RW | | |
Expand Down Expand Up @@ -53,19 +52,6 @@ Holds Inventory of all tables in all databases and their relevant metadata.

<br/>

#### _$inventory_.table_failures

Holds failures that occurred during crawling HMS tables

| Column | Datatype | Description | Comments |
|----------|----------|----------------------------------------------------------|----------|
| catalog | string | Original catalog of the table. hive_metastore by default |
| database | string | Original schema of the table |
| name | string | Name of the table |
| failures | string | Exception message context |

<br/>

#### _$inventory_.grants

Inventory of all Table ACLs for tables indexed in tables table.
Expand Down
10 changes: 6 additions & 4 deletions src/databricks/labs/ucx/assessment/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ class Assessment(Workflow):
def __init__(self):
super().__init__('assessment')

@job_task(notebook="hive_metastore/tables.scala")
@job_task
def crawl_tables(self, ctx: RuntimeContext):
"""Iterates over all tables in the Hive Metastore of the current workspace and persists their metadata, such
as _database name_, _table name_, _table type_, _table location_, etc., in the table named
`$inventory_database.tables`. The metadata stored is then used in the subsequent tasks and workflows to, for
example, find all Hive Metastore tables that cannot easily be migrated to Unity Catalog."""
as _database name_, _table name_, _table type_, _table location_, etc., in the Delta table named
`$inventory_database.tables`. Note that the `inventory_database` is set in the configuration file. The metadata
stored is then used in the subsequent tasks and workflows to, for example, find all Hive Metastore tables that
cannot easily be migrated to Unity Catalog."""
ctx.tables_crawler.snapshot()

@job_task
def crawl_udfs(self, ctx: RuntimeContext):
Expand Down
5 changes: 5 additions & 0 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.hive_metastore import TablesInMounts
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder


Expand Down Expand Up @@ -81,6 +82,10 @@ def policies_crawler(self):
def global_init_scripts_crawler(self):
return GlobalInitScriptCrawler(self.workspace_client, self.sql_backend, self.inventory_database)

@cached_property
def tables_crawler(self):
return FasterTableScanCrawler(self.sql_backend, self.inventory_database)

@cached_property
def tables_in_mounts(self):
return TablesInMounts(
Expand Down
51 changes: 40 additions & 11 deletions src/databricks/labs/ucx/hive_metastore/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,14 @@ def _crawl(self) -> Iterable[Table]:
After performing initial scan of all tables, starts making parallel
DESCRIBE TABLE EXTENDED queries for every table.
Production tasks would most likely be executed through `tables.scala`
Production tasks would most likely be executed through FasterTableScanCrawler
within `crawl_tables` task due to `spark.sharedState.externalCatalog`
lower-level APIs not requiring a roundtrip to storage, which is not
possible for Azure storage with credentials supplied through Spark
conf (see https://github.com/databrickslabs/ucx/issues/249).
FasterTableScanCrawler uses the _jsparkSession to utilize faster scanning with Scala APIs.
See also https://github.com/databrickslabs/ucx/issues/247
"""
tasks = []
Expand Down Expand Up @@ -475,10 +477,14 @@ def _describe(self, catalog: str, database: str, table: str) -> Table | None:


class FasterTableScanCrawler(CrawlerBase):
def _try_fetch(self) -> Iterable[Table]:
"""Tries to load table information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
yield Table(*row)
"""
FasterTableScanCrawler is a specialized version of TablesCrawler that uses spark._jsparkSession to utilize
faster scanning with Scala APIs.
For integration testing, FasterTableScanCrawler is tested using the larger assessment test rather than
just the class. Testing the class individually would require utilizing a remote spark connection with the
Databricks workspace.
"""

def __init__(self, backend: SqlBackend, schema, include_databases: list[str] | None = None):
self._backend = backend
Expand All @@ -504,17 +510,26 @@ def _option_as_python(scala_option: typing.Any):
return scala_option.get() if scala_option.isDefined() else None

def _all_databases(self) -> list[str]:
if not self._include_database:
return list(self._iterator(self._external_catalog.listDatabases()))
return self._include_database
try:
if not self._include_database:
return list(self._iterator(self._external_catalog.listDatabases()))
return self._include_database
except Exception as err: # pylint: disable=broad-exception-caught
logger.error(f"failed-table-crawl: listing databases -> catalog : {err}", exc_info=True)
return []

def _list_tables(self, database: str) -> list[str]:
try:
return list(self._iterator(self._external_catalog.listTables(database)))
except Exception as err: # pylint: disable=broad-exception-caught
logger.warning(f"Failed to list tables in {database}: {err}")
logger.warning(f"failed-table-crawl: listing tables from database -> {database} : {err}", exc_info=True)
return []

def _try_fetch(self) -> Iterable[Table]:
"""Tries to load table information from the database or throws TABLE_OR_VIEW_NOT_FOUND error"""
for row in self._fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"):
yield Table(*row)

@staticmethod
def _format_properties_list(properties_list: list) -> str:
if len(properties_list) == 0:
Expand Down Expand Up @@ -555,8 +570,8 @@ def _describe(self, catalog, database, table) -> Table | None:
view_text = self._option_as_python(raw_table.viewText())
table_properties = list(self._iterator(raw_table.properties()))
formatted_table_properties = self._format_properties_list(table_properties)
except Exception: # pylint: disable=broad-exception-caught
logger.warning(f"Couldn't fetch information for table: {full_name}", exc_info=True)
except Exception as e: # pylint: disable=broad-exception-caught
logger.warning(f"failed-table-crawl: describing table -> {full_name}: {e}", exc_info=True)
return None
return Table(
catalog=catalog,
Expand All @@ -583,16 +598,30 @@ def _crawl(self) -> Iterable[Table]:
catalog_tables, errors = Threads.gather("describing tables in ", tasks)
if len(errors) > 0:
logger.warning(f"Detected {len(errors)} errors while scanning tables in ")

logger.info(f"Finished scanning {len(catalog_tables)} tables")
return catalog_tables

def _get_table_names(self, database: str) -> list[str]:
"""
Lists tables names in the specified database.
:param database:
:return: list of table names
"""
table_names = []
table_names_batches = Threads.strict('listing tables', [partial(self._list_tables, database)])
for table_batch in table_names_batches:
table_names.extend(table_batch)
return table_names

def _create_describe_tasks(self, catalog: str, database: str, table_names: list[str]) -> list[partial]:
"""
Creates a list of partial functions for describing tables.
:param catalog:
:param database:
:param table_names:
:return: list of partial functions
"""
tasks = []
for table in table_names:
tasks.append(partial(self._describe, catalog, database, table))
Expand Down
123 changes: 0 additions & 123 deletions src/databricks/labs/ucx/hive_metastore/tables.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
--title 'Table Crawl Failures'
--height 4
--width 4
*/
WITH latest_job_runs AS (
SELECT
timestamp,
job_id,
job_run_id
FROM (
SELECT
CAST(timestamp AS TIMESTAMP) AS timestamp,
job_id,
job_run_id,
ROW_NUMBER() OVER (PARTITION BY job_id ORDER BY CAST(timestamp AS TIMESTAMP) DESC) = 1 AS latest_run_of_job
FROM inventory.logs
)
WHERE
latest_run_of_job
), logs_latest_job_runs AS (
SELECT
CAST(logs.timestamp AS TIMESTAMP) AS timestamp,
message,
job_run_id,
job_id,
workflow_name,
task_name
FROM inventory.logs
JOIN latest_job_runs
USING (job_id, job_run_id)
WHERE
workflow_name IN ('assessment')
), table_crawl_failures AS (
SELECT
timestamp,
REGEXP_EXTRACT(message, '^failed-table-crawl: (.+?) -> (.+?): (.+)$', 1) AS error_reason,
REGEXP_EXTRACT(message, '^failed-table-crawl: (.+?) -> (.+?): (.+)$', 2) AS error_entity,
REGEXP_EXTRACT(message, '^failed-table-crawl: (.+?) -> (.+?): (.+)$', 3) AS error_message,
job_run_id,
job_id,
workflow_name,
task_name
FROM logs_latest_job_runs
WHERE
STARTSWITH(message, 'failed-table-crawl: ')
)
SELECT
timestamp,
error_reason,
error_entity,
error_message,
job_run_id,
job_id,
workflow_name,
task_name
FROM table_crawl_failures
ORDER BY
1
Loading

0 comments on commit 143c637

Please sign in to comment.