From 8178d8a77320db16eacb9406d9f17ad2d952a11b Mon Sep 17 00:00:00 2001
From: S Daniel Zafar
Date: Thu, 24 Oct 2024 10:34:46 -0600
Subject: [PATCH 1/8] Improve scan tables in mounts (#2767)

## Changes
Make the `scan-tables-in-mounts` functionality much more robust and comprehensive, including adding new unit tests and fixing various bugs. We wanted to use this for a current UC migration for a large retailer and when we ran the job originally it skipped most of the directories and found 8 total tables. With my changes we now parse many many tables. I also worked through a number of bugs. I did my best to make minimal revisions to the existing code, but I suggest we refactor this to use `os` to list directories in the future instead of `dbutils` so that we can parallelize the operations making it scalable.

Fixes #2540

### Functionality
- [X] modified existing workflow: scan-tables-in-mounts-experimental

### Tests
- [X] manually tested
- [X] added unit tests
- [ ] added integration tests

---------

Co-authored-by: dan.zafar
---
 .../labs/ucx/hive_metastore/locations.py | 89 ++++--
 .../labs/ucx/source_code/known.json | 2 +-
 tests/unit/hive_metastore/test_locations.py | 267 +++++++++++++++---
 3 files changed, 290 insertions(+), 68 deletions(-)

diff --git a/src/databricks/labs/ucx/hive_metastore/locations.py b/src/databricks/labs/ucx/hive_metastore/locations.py
index ff011fb535..e948442463 100644
--- a/src/databricks/labs/ucx/hive_metastore/locations.py
+++ b/src/databricks/labs/ucx/hive_metastore/locations.py
@@ -20,7 +20,6 @@
 logger = logging.getLogger(__name__)
-
 _EXTERNAL_FILE_LOCATION_SCHEMES = ("s3", "s3a", "s3n", "gcs", "abfss")
@@ -566,46 +565,71 @@ def _find_delta_log_folders(
         if delta_log_folders is None:
             delta_log_folders = {}
         logger.info(f"Listing {root_dir}")
-        file_infos = self._dbutils.fs.ls(root_dir)
+        file_infos = self._dbutils.fs.ls(root_dir) or []
         for file_info in file_infos:
             if self._is_irrelevant(file_info.name) or file_info.path == root_dir:
                 logger.debug(f"Path {file_info.path} is irrelevant")
                 continue
             root_path = os.path.dirname(root_dir)
-            previous_entry = delta_log_folders.get(root_path)
+            parent_entry = delta_log_folders.get(root_path)
             table_in_mount = self._assess_path(file_info)
-            if previous_entry:
-                # Happens when first folder was _delta_log and next folders are partitioned folder
-                if previous_entry.format == "DELTA" and self._is_partitioned(file_info.name):
-                    delta_log_folders[root_path] = TableInMount(format=previous_entry.format, is_partitioned=True)
-                # Happens when previous entries where partitioned folders and the current one is delta_log
-                if previous_entry.is_partitioned and table_in_mount and table_in_mount.format == "DELTA":
-                    delta_log_folders[root_path] = TableInMount(format=table_in_mount.format, is_partitioned=True)
-                continue
+            if parent_entry:
+                # if the root_path was already found to be partitioned, we can break to reduce redundant ops
+                if delta_log_folders[root_path] == TableInMount(format="DELTA", is_partitioned=True):
+                    break
 
-            if self._is_partitioned(file_info.name):
-                partition_format = self._find_partition_file_format(file_info.path)
-                if partition_format:
-                    delta_log_folders[root_path] = partition_format
-                continue
+                if (
+                    # Happens when first folder was _delta_log and next folders are partitioned folder
+                    parent_entry.format == "DELTA"
+                    and self._is_partitioned(file_info.name)
+                ) or (
+                    # Happens when previous entries where partitioned folders and the current one is delta_log
+                    parent_entry.is_partitioned
+                    and table_in_mount
+                    and 
table_in_mount.format == "DELTA" + ): + delta_log_folders[root_path] = TableInMount(format="DELTA", is_partitioned=True) + logger.debug(f"Added DELTA table for {root_path} (partitioned delta)") - if not table_in_mount: - self._find_delta_log_folders(file_info.path, delta_log_folders) - continue + self._recurse_dir_if_needed(file_info, delta_log_folders) + + elif self._is_partitioned(file_info.name): + self._handle_partitioned_file(file_info.path, root_path, delta_log_folders) + + elif not table_in_mount: + self._recurse_dir_if_needed(file_info, delta_log_folders) + + elif table_in_mount.format == "DELTA" and file_info.name == "_delta_log/": + delta_log_folders[root_path] = table_in_mount + logger.debug(f"Added {table_in_mount.format} table for {root_path} (normal delta)") + + else: + delta_log_folders[root_path] = table_in_mount + logger.debug(f"Added {table_in_mount.format} table for {root_path} (general)") + self._recurse_dir_if_needed(file_info, delta_log_folders) - delta_log_folders[root_path] = table_in_mount return delta_log_folders + def _recurse_dir_if_needed(self, file_info: FileInfo, delta_log_folders: dict[str, TableInMount]) -> None: + if self._is_recursible_dir(file_info): + self._find_delta_log_folders(file_info.path, delta_log_folders) + + def _handle_partitioned_file(self, file_path, root_path, delta_log_folders): + partition_format = self._find_partition_file_format(file_path) + if partition_format: + delta_log_folders[root_path] = partition_format + logger.debug(f"Added {partition_format.format} table for {root_path} (partitioned)") + def _find_partition_file_format(self, root_dir: str) -> TableInMount | None: - logger.info(f"Listing {root_dir}") + logger.info(f"Listing partitioned file {root_dir}") file_infos = self._dbutils.fs.ls(root_dir) for file_info in file_infos: path_extension = self._assess_path(file_info) if path_extension: return TableInMount(format=path_extension.format, is_partitioned=True) - if self._is_partitioned(file_info.name): + if self._is_partitioned(file_info.name) and file_info.path != root_dir: return self._find_partition_file_format(file_info.path) return None @@ -624,6 +648,23 @@ def _assess_path(self, file_info: FileInfo) -> TableInMount | None: return TableInMount(format="PARQUET", is_partitioned=False) return None + def _is_recursible_dir(self, file_info: FileInfo) -> bool: + # Rules for recursing into a folder + # - should be size 0 (usually a directory) + # - should not be the _delta_log directory + # - file should not be partitioned + # - file name should not start with 'part-' + # - no brackets, brackets are not allowed in dbutils.fs.ls + # - is not a streaming checkpoint dir + return ( + file_info.size == 0 + and file_info.name != "_delta_log/" + and not self._is_partitioned(file_info.name) + and not file_info.name.startswith("part-") + and not any(char in file_info.name for char in "[]") + and not self._is_streaming_checkpoint_dir(file_info) + ) + @staticmethod def _is_partitioned(file_name: str) -> bool: return '=' in file_name @@ -645,3 +686,7 @@ def _is_json(file_name: str) -> bool: def _is_irrelevant(self, file_name: str) -> bool: return any(pattern in file_name for pattern in self._fiter_paths) + + def _is_streaming_checkpoint_dir(self, file_info: FileInfo) -> bool: + path_dirs = [file.name for file in self._dbutils.fs.ls(file_info.path) if file.size == 0] + return 'commits/' in path_dirs and 'offsets/' in path_dirs diff --git a/src/databricks/labs/ucx/source_code/known.json b/src/databricks/labs/ucx/source_code/known.json index 
dac180305d..7e4faad2f4 100644 --- a/src/databricks/labs/ucx/source_code/known.json +++ b/src/databricks/labs/ucx/source_code/known.json @@ -33641,4 +33641,4 @@ "zipp.compat.py310": [], "zipp.glob": [] } -} \ No newline at end of file +} diff --git a/tests/unit/hive_metastore/test_locations.py b/tests/unit/hive_metastore/test_locations.py index 2e0a6306a4..4623ac5b70 100644 --- a/tests/unit/hive_metastore/test_locations.py +++ b/tests/unit/hive_metastore/test_locations.py @@ -301,12 +301,12 @@ def test_match_table_external_locations(): ] == missing_locations -def test_mount_listing_multiple_folders(): +def test_mount_listing_multiple_folders() -> None: client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", "", "") - second_folder = FileInfo("dbfs:/mnt/test_mount/table2/", "table2/", "", "") - folder_table1 = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/table2/", "table2/", 0, "") + folder_table1 = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", 0, "") folder_table2 = FileInfo("dbfs:/mnt/test_mount/table2/_SUCCESS", "_SUCCESS", "", "") folder_table3 = FileInfo("dbfs:/mnt/test_mount/table2/1.snappy.parquet", "1.snappy.parquet", "", "") @@ -337,10 +337,10 @@ def my_side_effect(path, **_): def test_mount_listing_sub_folders(): client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", "", "") - second_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/", "domain/", "", "") - third_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/", "table1/", "", "") - fourth_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/_delta_log/", "_delta_log/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/", "domain/", 0, "") + third_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/", "table1/", 0, "") + fourth_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/_delta_log/", "_delta_log/", 0, "") fourth_folder_parquet = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/1.parquet", "1.parquet", "", "") delta_log = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/_delta_log/000.json", "000.json", "", "") delta_log_2 = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/_delta_log/001.json", "001.json", "", "") @@ -382,10 +382,10 @@ def my_side_effect(path, **_): def test_partitioned_parquet_layout(): client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", "", "") - first_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/", "xxx=yyy/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + first_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/", "xxx=yyy/", 0, "") first_partition_files = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/1.parquet", "1.parquet", "", "") - second_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/", "xxx=zzz/", "", "") + second_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/", "xxx=zzz/", 0, "") second_partition_files = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/1.parquet", "1.parquet", "", "") def my_side_effect(path, **_): @@ -424,20 +424,20 @@ def my_side_effect(path, **_): def test_partitioned_delta(): client = 
create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", "", "") - first_first_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/", "xxx=yyy/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + first_first_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/", "xxx=yyy/", 0, "") first_first_partition_files = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/1.parquet", "1.parquet", "", "") - first_second_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/", "xxx=zzz/", "", "") + first_second_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/", "xxx=zzz/", 0, "") first_second_partition_files = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/1.parquet", "1.parquet", "", "") - first_delta_log = FileInfo("dbfs:/mnt/test_mount/entity/_delta_log/", "_delta_log/", "", "") + first_delta_log = FileInfo("dbfs:/mnt/test_mount/entity/_delta_log/", "_delta_log/", 0, "") - second_folder = FileInfo("dbfs:/mnt/test_mount/entity_2/", "entity_2/", "", "") - second_first_partition = FileInfo("dbfs:/mnt/test_mount/entity_2/xxx=yyy/", "xxx=yyy/", "", "") - second_second_partition = FileInfo("dbfs:/mnt/test_mount/entity_2/xxx=yyy/aaa=bbb/", "aaa=bbb/", "", "") + second_folder = FileInfo("dbfs:/mnt/test_mount/entity_2/", "entity_2/", 0, "") + second_first_partition = FileInfo("dbfs:/mnt/test_mount/entity_2/xxx=yyy/", "xxx=yyy/", 0, "") + second_second_partition = FileInfo("dbfs:/mnt/test_mount/entity_2/xxx=yyy/aaa=bbb/", "aaa=bbb/", 0, "") second_second_partition_files = FileInfo( "dbfs:/mnt/test_mount/entity_2/xxx=yyy/aaa=bbb/1.parquet", "1.parquet", "", "" ) - second_delta_log = FileInfo("dbfs:/mnt/test_mount/entity_2/_delta_log/", "_delta_log/", "", "") + second_delta_log = FileInfo("dbfs:/mnt/test_mount/entity_2/_delta_log/", "_delta_log/", 0, "") def my_side_effect(path, **_): if path == "/mnt/test_mount": @@ -475,10 +475,10 @@ def my_side_effect(path, **_): def test_filtering_irrelevant_paths(): client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", "", "") - second_folder = FileInfo("dbfs:/mnt/test_mount/$_azuretempfolder/", "$_azuretempfolder/", "", "") - first_folder_delta_log = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", "", "") - second_folder_delta_log = FileInfo("dbfs:/mnt/test_mount/$_azuretempfolder/_delta_log/", "_delta_log/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/$_azuretempfolder/", "$_azuretempfolder/", 0, "") + first_folder_delta_log = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", 0, "") + second_folder_delta_log = FileInfo("dbfs:/mnt/test_mount/$_azuretempfolder/_delta_log/", "_delta_log/", 0, "") def my_side_effect(path, **_): if path == "/mnt/test_mount": @@ -507,10 +507,10 @@ def my_side_effect(path, **_): def test_filter_irrelevant_mounts(): client = create_autospec(WorkspaceClient) - first_folder = FileInfo("/mnt/test_mount/table1/", "table1/", "", "") - second_folder = FileInfo("/mnt/test_mount2/table2/", "table2/", "", "") - first_folder_delta_log = FileInfo("/mnt/test_mount/table1/_delta_log/", "_delta_log/", "", "") - second_folder_delta_log = FileInfo("/mnt/test_mount2/table2/_delta_log/", "_delta_log/", "", "") + first_folder = FileInfo("/mnt/test_mount/table1/", "table1/", 0, "") + second_folder = FileInfo("/mnt/test_mount2/table2/", "table2/", 0, "") + first_folder_delta_log = 
FileInfo("/mnt/test_mount/table1/_delta_log/", "_delta_log/", 0, "") + second_folder_delta_log = FileInfo("/mnt/test_mount2/table2/_delta_log/", "_delta_log/", 0, "") def my_side_effect(path, **_): if path == "/mnt/test_mount": @@ -543,10 +543,10 @@ def my_side_effect(path, **_): def test_historical_data_should_be_overwritten() -> None: client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", "", "") - second_folder = FileInfo("dbfs:/mnt/test_mount2/table2/", "table2/", "", "") - first_folder_delta_log = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", "", "") - second_folder_delta_log = FileInfo("dbfs:/mnt/test_mount2/table2/_delta_log/", "_delta_log/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount2/table2/", "table2/", 0, "") + first_folder_delta_log = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", 0, "") + second_folder_delta_log = FileInfo("dbfs:/mnt/test_mount2/table2/_delta_log/", "_delta_log/", 0, "") def my_side_effect(path, **_): if path == "/mnt/test_mount": @@ -601,9 +601,9 @@ def my_side_effect(path, **_): def test_mount_include_paths(): client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", "", "") - second_folder = FileInfo("dbfs:/mnt/test_mount/table2/", "table2/", "", "") - folder_table1 = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/table2/", "table2/", 0, "") + folder_table1 = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", 0, "") folder_table2 = FileInfo("dbfs:/mnt/test_mount/table2/_SUCCESS", "_SUCCESS", "", "") folder_table3 = FileInfo("dbfs:/mnt/test_mount/table2/1.snappy.parquet", "1.snappy.parquet", "", "") @@ -634,10 +634,10 @@ def my_side_effect(path, **_): def test_mount_listing_csv_json(): client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", "", "") - second_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/", "domain/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/", "domain/", 0, "") second_folder_random_csv = FileInfo("dbfs:/mnt/test_mount/entity/domain/test.csv", "test.csv", "", "") - third_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/", "table1/", "", "") + third_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/", "table1/", 0, "") first_json = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/some_jsons.json", "some_jsons.json", "", "") second_json = FileInfo( "dbfs:/mnt/test_mount/entity/domain/table1/some_other_jsons.json", "some_other_jsons.json", "", "" @@ -647,9 +647,9 @@ def my_side_effect(path, **_): if path == "/mnt/test_mount": return [first_folder] if path == "dbfs:/mnt/test_mount/entity/": - return [second_folder, second_folder_random_csv] + return [second_folder] if path == "dbfs:/mnt/test_mount/entity/domain/": - return [third_folder] + return [third_folder, second_folder_random_csv] if path == "dbfs:/mnt/test_mount/entity/domain/table1/": return [first_json, second_json] return None @@ -675,10 +675,10 @@ def my_side_effect(path, **_): Table( "hive_metastore", "mounted_test_mount", - "entity", + "domain", "EXTERNAL", "CSV", - "adls://bucket/entity", + 
"adls://bucket/entity/domain", ), ] @@ -686,10 +686,10 @@ def my_side_effect(path, **_): def test_mount_listing_seen_tables(): client = create_autospec(WorkspaceClient) - first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", "", "") - folder_table1 = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", "", "") - second_folder = FileInfo("dbfs:/mnt/test_mount/table2/", "table2/", "", "") - second_folder1 = FileInfo("dbfs:/mnt/test_mount/table2/_delta_log/", "_delta_log/", "", "") + first_folder = FileInfo("dbfs:/mnt/test_mount/table1/", "table1/", 0, "") + folder_table1 = FileInfo("dbfs:/mnt/test_mount/table1/_delta_log/", "_delta_log/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/table2/", "table2/", 0, "") + second_folder1 = FileInfo("dbfs:/mnt/test_mount/table2/_delta_log/", "_delta_log/", 0, "") def my_side_effect(path, **_): if path == "/mnt/test_mount": @@ -734,3 +734,180 @@ def test_resolve_dbfs_root_in_hms_federation(): mounts = mounts_crawler.snapshot() assert [Mount("/", 's3://original/bucket/')] == mounts + + +def test_mount_listing_misplaced_flat_file(): + client = create_autospec(WorkspaceClient) + + first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/", "domain/", 0, "") + misplaced_csv = FileInfo("dbfs:/mnt/test_mount/entity/domain/test.csv", "test.csv", "", "") + third_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/", "table1/", 0, "") + first_json = FileInfo("dbfs:/mnt/test_mount/entity/domain/table1/some_jsons.json", "some_jsons.json", "", "") + second_json = FileInfo( + "dbfs:/mnt/test_mount/entity/domain/table1/some_other_jsons.json", "some_other_jsons.json", "", "" + ) + z_dir = FileInfo("dbfs:/mnt/test_mount/entity/domain/z_dir/", "z_dir", 0, "") + z_dir_json = FileInfo("dbfs:/mnt/test_mount/entity/domain/z_dir/some.json", "some.json", "", "") + + def my_side_effect(path, **_): + if path == "/mnt/test_mount": + return [first_folder] + if path == "dbfs:/mnt/test_mount/entity/": + return [second_folder] + if path == "dbfs:/mnt/test_mount/entity/domain/": + return [third_folder, misplaced_csv, z_dir] + if path == "dbfs:/mnt/test_mount/entity/domain/table1/": + return [first_json, second_json] + if path == "dbfs:/mnt/test_mount/entity/domain/z_dir/": + return [z_dir_json] + return None + + client.dbutils.fs.ls.side_effect = my_side_effect + backend = MockBackend( + rows={ + '`hive_metastore`.`test`.`tables`': [], + '`test`.`mounts`': MOUNT_STORAGE[("/mnt/test_mount", "adls://bucket/")], + } + ) + mounts = MountsCrawler(backend, client, "test") + results = TablesInMounts(backend, client, "test", mounts).snapshot(force_refresh=True) + assert results == [ + Table( + "hive_metastore", + "mounted_test_mount", + "table1", + "EXTERNAL", + "JSON", + "adls://bucket/entity/domain/table1", + ), + Table( + "hive_metastore", + "mounted_test_mount", + "domain", + "EXTERNAL", + "CSV", + "adls://bucket/entity/domain", + ), + Table( + "hive_metastore", + "mounted_test_mount", + "z_dir", + "EXTERNAL", + "JSON", + "adls://bucket/entity/domain/z_dir", + ), + ] + + +def test_mount_dont_list_partitions(): + client = create_autospec(WorkspaceClient) + + first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + first_first_partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/", "xxx=yyy/", 0, "") + first_first_partition_files = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/1.parquet", "1.parquet", "", "") + first_second_partition = 
FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/", "xxx=zzz/", 0, "") + first_second_partition_files = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/1.parquet", "1.parquet", "", "") + misplaced_json = FileInfo("dbfs:/mnt/test_mount/entity/xxx=zzz/misplaced.json", "misplaced.json", "", "") + first_delta_log = FileInfo("dbfs:/mnt/test_mount/entity/_delta_log/", "_delta_log/", 0, "") + + def my_side_effect(path, **_): + if path == "/mnt/test_mount": + return [first_folder] + if path == "dbfs:/mnt/test_mount/entity/": + return [first_delta_log, first_first_partition, first_second_partition] + if path == "dbfs:/mnt/test_mount/entity/xxx=yyy/": + return [first_first_partition_files, misplaced_json] + if path == "dbfs:/mnt/test_mount/entity/xxx=zzz/": + return [first_second_partition_files] + return None + + client.dbutils.fs.ls.side_effect = my_side_effect + backend = MockBackend( + rows={ + '`hive_metastore`.`test`.`tables`': [], + '`test`.`mounts`': MOUNT_STORAGE[("/mnt/test_mount", "adls://bucket/")], + } + ) + mounts = MountsCrawler(backend, client, "test") + results = TablesInMounts(backend, client, "test", mounts).snapshot(force_refresh=True) + assert len(results) == 1 + assert results[0].table_format == "DELTA" + assert results[0].is_partitioned + + +def test_mount_invalid_partition_char(): + + client = create_autospec(WorkspaceClient) + + folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + partition = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/", "xxx=yyy/", 0, "") + file = FileInfo("dbfs:/mnt/test_mount/entity/xxx=yyy/document=2023-12-20", "document=2023-12-20", "", "") + + def my_side_effect(path, **_): + if path == "/mnt/test_mount": + return [folder] + if path == "dbfs:/mnt/test_mount/entity/": + return [partition] + if path == "dbfs:/mnt/test_mount/entity/xxx=yyy/": + return [file] + if path == "dbfs:/mnt/test_mount/entity/xxx=yyy/document=2023-12-20": + return [file] + return None + + client.dbutils.fs.ls.side_effect = my_side_effect + backend = MockBackend( + rows={ + '`hive_metastore`.`test`.`tables`': [], + '`test`.`mounts`': MOUNT_STORAGE[("/mnt/test_mount", "adls://bucket/")], + } + ) + + try: + mounts = MountsCrawler(backend, client, "test") + results = TablesInMounts(backend, client, "test", mounts).snapshot(force_refresh=True) + except RecursionError: + pytest.fail("Recursion depth exceeded, possible infinite loop.") + + assert len(results) == 0 + + +def test_mount_exclude_streaming_checkpoint(): + client = create_autospec(WorkspaceClient) + + first_folder = FileInfo("dbfs:/mnt/test_mount/entity/", "entity/", 0, "") + second_folder = FileInfo("dbfs:/mnt/test_mount/entity/domain/", "domain/", 0, "") + csv = FileInfo("dbfs:/mnt/test_mount/entity/domain/test.csv", "test.csv", "", "") + checkpoint_dir = FileInfo( + "dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/", "streaming_checkpoint/", 0, "" + ) + offsets = FileInfo("dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/offsets/", "offsets/", 0, "") + commit = FileInfo("dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/commits/", "commits/", 0, "") + state = FileInfo("dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/state/", "state/", 0, "") + metadata = FileInfo("dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/metadata", "metadata", "", "") + some_json = FileInfo("dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/offsets/a.json", "a.json", "", "") + + def my_side_effect(path, **_): + if path == "/mnt/test_mount": + return [first_folder] + if path == 
"dbfs:/mnt/test_mount/entity/": + return [second_folder] + if path == "dbfs:/mnt/test_mount/entity/domain/": + return [checkpoint_dir, csv] + if path == "dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/": + return [offsets, commit, state, metadata] + if path == "dbfs:/mnt/test_mount/entity/domain/streaming_checkpoint/offsets/": + return [some_json] + return None + + client.dbutils.fs.ls.side_effect = my_side_effect + backend = MockBackend( + rows={ + '`hive_metastore`.`test`.`tables`': [], + '`test`.`mounts`': MOUNT_STORAGE[("/mnt/test_mount", "adls://bucket/")], + } + ) + mounts = MountsCrawler(backend, client, "test") + results = TablesInMounts(backend, client, "test", mounts).snapshot(force_refresh=True) + assert len(results) == 1 + assert results[0].table_format == "CSV" From e8179a1ba2ebd581cbd9c9f054b636549d97d080 Mon Sep 17 00:00:00 2001 From: Eric Vergnaud Date: Thu, 24 Oct 2024 18:37:11 +0200 Subject: [PATCH 2/8] Fix dynamic import issue (#3053) ## Changes Our current implementation doesn't infer import names when calling `importlib.import_module(some_name)` This PR fixes that. ### Linked issues None ### Functionality None ### Tests - [x] added unit tests --------- Co-authored-by: Eric Vergnaud --- .../labs/ucx/source_code/linters/imports.py | 18 ++++++-- .../labs/ucx/source_code/python/python_ast.py | 42 ++++++++++++++++++- tests/integration/source_code/test_graph.py | 19 +++++++++ .../source_code/python/test_python_ast.py | 12 ++++++ .../unit/source_code/samples/import-module.py | 4 ++ 5 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 tests/unit/source_code/samples/import-module.py diff --git a/src/databricks/labs/ucx/source_code/linters/imports.py b/src/databricks/labs/ucx/source_code/linters/imports.py index 906a24cd19..d44b61b70a 100644 --- a/src/databricks/labs/ucx/source_code/linters/imports.py +++ b/src/databricks/labs/ucx/source_code/linters/imports.py @@ -9,7 +9,6 @@ from astroid import ( # type: ignore Attribute, Call, - Const, InferenceError, Import, ImportFrom, @@ -71,9 +70,20 @@ def _make_sources_for_import_call_nodes( problems: list[T], ) -> Iterable[ImportSource]: for node in nodes: - arg = node.args[0] - if isinstance(arg, Const): - yield ImportSource(node, arg.value) + yield from cls._make_sources_for_import_call_node(node, problem_factory, problems) + + @classmethod + def _make_sources_for_import_call_node( + cls, + node: Call, + problem_factory: ProblemFactory, + problems: list[T], + ) -> Iterable[ImportSource]: + if not node.args: + return + for inferred in InferredValue.infer_from_node(node.args[0]): + if inferred.is_inferred(): + yield ImportSource(node, inferred.as_string()) continue problem = problem_factory( 'dependency-not-constant', "Can't check dependency not provided as a constant", node diff --git a/src/databricks/labs/ucx/source_code/python/python_ast.py b/src/databricks/labs/ucx/source_code/python/python_ast.py index be860d8fed..72c315e768 100644 --- a/src/databricks/labs/ucx/source_code/python/python_ast.py +++ b/src/databricks/labs/ucx/source_code/python/python_ast.py @@ -68,7 +68,7 @@ def first_statement(self) -> NodeNG | None: return self.tree.first_statement() -class Tree: +class Tree: # pylint: disable=too-many-public-methods @classmethod def maybe_parse(cls, code: str) -> MaybeTree: @@ -285,6 +285,11 @@ def has_global(self, name: str) -> bool: self_module: Module = cast(Module, self.node) return self_module.globals.get(name, None) is not None + def get_global(self, name: str) -> list[NodeNG]: + if not 
self.has_global(name): + return [] + return cast(Module, self.node).globals.get(name) + def nodes_between(self, first_line: int, last_line: int) -> list[NodeNG]: if not isinstance(self.node, Module): raise NotImplementedError(f"Can't extract nodes from {type(self.node).__name__}") @@ -486,6 +491,7 @@ def __init__(self, node_type: type, match_nodes: list[tuple[str, type]]): self._matched_nodes: list[NodeNG] = [] self._node_type = node_type self._match_nodes = match_nodes + self._imports: dict[str, list[NodeNG]] = {} @property def matched_nodes(self) -> list[NodeNG]: @@ -521,6 +527,7 @@ def _matches(self, node: NodeNG, depth: int) -> bool: if isinstance(node, Call): return self._matches(node.func, depth) name, match_node = self._match_nodes[depth] + node = self._adjust_node_for_import_member(name, match_node, node) if not isinstance(node, match_node): return False next_node: NodeNG | None = None @@ -538,6 +545,39 @@ def _matches(self, node: NodeNG, depth: int) -> bool: return len(self._match_nodes) - 1 == depth return self._matches(next_node, depth + 1) + def _adjust_node_for_import_member(self, name: str, match_node: type, node: NodeNG) -> NodeNG: + if isinstance(node, match_node): + return node + # if we're looking for an attribute, it might be a global name + if match_node != Attribute or not isinstance(node, Name) or node.name != name: + return node + # in which case it could be an import member + module = Tree(Tree(node).root) + if not module.has_global(node.name): + return node + for import_from in module.get_global(node.name): + if not isinstance(import_from, ImportFrom): + continue + parent = Name( + name=import_from.modname, + lineno=import_from.lineno, + col_offset=import_from.col_offset, + end_lineno=import_from.end_lineno, + end_col_offset=import_from.end_col_offset, + parent=import_from.parent, + ) + resolved = Attribute( + attrname=name, + lineno=import_from.lineno, + col_offset=import_from.col_offset, + end_lineno=import_from.end_lineno, + end_col_offset=import_from.end_col_offset, + parent=parent, + ) + resolved.postinit(parent) + return resolved + return node + class NodeBase(ABC): diff --git a/tests/integration/source_code/test_graph.py b/tests/integration/source_code/test_graph.py index 8fb279a286..b66e14a740 100644 --- a/tests/integration/source_code/test_graph.py +++ b/tests/integration/source_code/test_graph.py @@ -38,3 +38,22 @@ def module_compatibility(self, name: str) -> Compatibility: # visit the graph without a 'visited' set roots = graph.root_dependencies assert roots + + +def test_graph_imports_dynamic_import(): + allow_list = KnownList() + library_resolver = PythonLibraryResolver(allow_list) + notebook_resolver = NotebookResolver(NotebookLoader()) + import_resolver = ImportFileResolver(FileLoader(), allow_list) + path_lookup = PathLookup.from_sys_path(Path(__file__).parent) + dependency_resolver = DependencyResolver( + library_resolver, notebook_resolver, import_resolver, import_resolver, path_lookup + ) + root_path = Path(__file__).parent.parent.parent / "unit" / "source_code" / "samples" / "import-module.py" + assert root_path.is_file() + maybe = dependency_resolver.resolve_file(path_lookup, root_path) + assert maybe.dependency + graph = DependencyGraph(maybe.dependency, None, dependency_resolver, path_lookup, CurrentSessionState()) + container = maybe.dependency.load(path_lookup) + problems = container.build_dependency_graph(graph) + assert not problems diff --git a/tests/unit/source_code/python/test_python_ast.py 
b/tests/unit/source_code/python/test_python_ast.py index c23b76c4ef..e65abe4b8e 100644 --- a/tests/unit/source_code/python/test_python_ast.py +++ b/tests/unit/source_code/python/test_python_ast.py @@ -170,6 +170,18 @@ def test_is_from_module() -> None: assert Tree(save_call).is_from_module("spark") +def test_locates_member_import() -> None: + source = """ +from importlib import import_module +module = import_module("xyz") +""" + maybe_tree = Tree.maybe_normalized_parse(source) + assert maybe_tree.tree is not None, maybe_tree.failure + tree = maybe_tree.tree + import_calls = tree.locate(Call, [("import_module", Attribute), ("importlib", Name)]) + assert import_calls + + @pytest.mark.parametrize("source, name, class_name", [("a = 123", "a", "int")]) def test_is_instance_of(source, name, class_name) -> None: maybe_tree = Tree.maybe_normalized_parse(source) diff --git a/tests/unit/source_code/samples/import-module.py b/tests/unit/source_code/samples/import-module.py new file mode 100644 index 0000000000..6e05a636ce --- /dev/null +++ b/tests/unit/source_code/samples/import-module.py @@ -0,0 +1,4 @@ +import importlib + +module_name = "astroid" +module = importlib.import_module(module_name) From 58b1cbab24edad35d9d003e6e1fe331d8060f4c8 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Thu, 24 Oct 2024 18:38:30 +0200 Subject: [PATCH 3/8] Update permissions crawling so that it doesn't fail if a secret scope disappears during crawling (#3070) ## Changes This PR updates the secret scope support so that permissions crawling doesn't fail when a secret scope is listed but disappears before the ACLs can be retrieved. Instead of failing the task we now log a warning and complete normally. ### Functionality - modified existing workflow: `assessment` ### Tests - added unit tests - existing integration tests --- .../labs/ucx/workspace_access/manager.py | 1 + .../labs/ucx/workspace_access/secrets.py | 14 +++++--- tests/unit/workspace_access/test_secrets.py | 33 +++++++++++++++++++ 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/databricks/labs/ucx/workspace_access/manager.py b/src/databricks/labs/ucx/workspace_access/manager.py index 50eba51d95..8376c05bf1 100644 --- a/src/databricks/labs/ucx/workspace_access/manager.py +++ b/src/databricks/labs/ucx/workspace_access/manager.py @@ -31,6 +31,7 @@ def _crawl(self) -> Iterable[Permissions]: logger.debug("Crawling permissions") crawler_tasks = list(self._get_crawler_tasks()) logger.info(f"Starting to crawl permissions. Total tasks: {len(crawler_tasks)}") + # Note: tasks can return Permissions | None, but threads.gather() filters out None results. 
items, errors = Threads.gather("crawl permissions", crawler_tasks) acute_errors = [] for error in errors: diff --git a/src/databricks/labs/ucx/workspace_access/secrets.py b/src/databricks/labs/ucx/workspace_access/secrets.py index f99d2925b5..f405671af1 100644 --- a/src/databricks/labs/ucx/workspace_access/secrets.py +++ b/src/databricks/labs/ucx/workspace_access/secrets.py @@ -6,6 +6,7 @@ from databricks.labs.blueprint.limiter import rate_limited from databricks.sdk import WorkspaceClient +from databricks.sdk.errors import NotFound from databricks.sdk.retries import retried from databricks.sdk.service import workspace from databricks.sdk.service.workspace import AclItem @@ -31,14 +32,19 @@ def __init__( self._verify_timeout = verify_timeout self._include_object_permissions = include_object_permissions - def get_crawler_tasks(self): - def _crawler_task(scope: workspace.SecretScope): + def get_crawler_tasks(self) -> Iterable[Callable[[], Permissions | None]]: + def _crawler_task(scope: workspace.SecretScope) -> Permissions | None: assert scope.name is not None - acl_items = self._ws.secrets.list_acls(scope.name) + try: + acl_items = self._ws.secrets.list_acls(scope.name) + acl_items_raw = [item.as_dict() for item in acl_items] + except NotFound: + logger.warning(f"Secret scope disappeared, cannot assess: {scope.name}") + return None return Permissions( object_id=scope.name, object_type="secrets", - raw=json.dumps([item.as_dict() for item in acl_items]), + raw=json.dumps(acl_items_raw), ) if self._include_object_permissions: diff --git a/tests/unit/workspace_access/test_secrets.py b/tests/unit/workspace_access/test_secrets.py index 6afe28d322..a0d731bf9e 100644 --- a/tests/unit/workspace_access/test_secrets.py +++ b/tests/unit/workspace_access/test_secrets.py @@ -1,10 +1,13 @@ import json +import logging +from collections.abc import Iterator from datetime import timedelta from unittest.mock import call, create_autospec import pytest from databricks.sdk import WorkspaceClient from databricks.sdk.service import workspace +from databricks.sdk.errors import ResourceDoesNotExist from databricks.labs.ucx.workspace_access.groups import MigrationState from databricks.labs.ucx.workspace_access.secrets import ( @@ -42,6 +45,36 @@ def test_secret_scopes_crawler(): assert item.raw == '[{"permission": "MANAGE", "principal": "test"}]' +def test_secret_scopes_disappearing_during_crawl(ws, caplog) -> None: + """Verify that when crawling secret scopes we continue instead of failing when a secret scope disappears.""" + + ws.secrets.list_scopes.return_value = [ + workspace.SecretScope(name="will_remain"), + workspace.SecretScope(name="will_disappear"), + ] + + def mock_list_acls(scope: str) -> Iterator[workspace.AclItem]: + if scope == "will_disappear": + raise ResourceDoesNotExist("Simulated disappearance") + yield workspace.AclItem(principal="a_principal", permission=workspace.AclPermission.MANAGE) + + ws.secrets.list_acls = mock_list_acls + + sup = SecretScopesSupport(ws=ws) + + tasks = list(sup.get_crawler_tasks()) + with caplog.at_level(logging.WARNING): + task_results = [task() for task in tasks] + + assert task_results == [ + Permissions( + object_id="will_remain", object_type="secrets", raw='[{"permission": "MANAGE", "principal": "a_principal"}]' + ), + None, + ] + assert "Secret scope disappeared, cannot assess: will_disappear" in caplog.messages + + def test_secret_scopes_crawler_include(): ws = create_autospec(WorkspaceClient) ws.secrets.list_acls.return_value = [ From 
8e6c33721314bb43e285feb079899fbde2c920bc Mon Sep 17 00:00:00 2001 From: Cor Date: Thu, 24 Oct 2024 18:38:50 +0200 Subject: [PATCH 4/8] Add method to `HistoryLog` for encoding `Historical` (#3065) ## Changes Add method to `HistoryLog` for encoding `Historical`. Some pre-work for the remainder of the issues in 3064. ### Linked issue Progresses #3064 --- src/databricks/labs/ucx/progress/history.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index 1dd5efa68a..597bd657b5 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -2,11 +2,11 @@ import dataclasses import datetime as dt import typing -from enum import Enum, EnumMeta import json import logging +from enum import Enum, EnumMeta from collections.abc import Iterable, Sequence -from typing import ClassVar, Protocol, TypeVar, Generic, Any, get_type_hints +from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final from databricks.labs.lsql.backends import SqlBackend @@ -14,6 +14,7 @@ from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.progress.install import Historical + logger = logging.getLogger(__name__) @@ -275,8 +276,13 @@ def __init__( def full_name(self) -> str: return f"{self._catalog}.{self._schema}.{self._table}" + @final def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: - history_records = [self._encoder.to_historical(record) for record in snapshot] + history_records = [self._encode_record_as_historical(record) for record in snapshot] logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") # This is the only writer, and the mode is 'append'. This is documented as conflict-free. 
self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") + + def _encode_record_as_historical(self, record: Record) -> Historical: + """Encode a snapshot record as a historical log entry.""" + return self._encoder.to_historical(record) From 936a5d6e51355b4eb0832567939e498b0b27c843 Mon Sep 17 00:00:00 2001 From: Serge Smertin <259697+nfx@users.noreply.github.com> Date: Thu, 24 Oct 2024 18:54:45 +0200 Subject: [PATCH 5/8] Consistently use `sql_backend` and `tables_crawler` everywhere for properties and constructor argument names (#3071) --- src/databricks/labs/ucx/assessment/azure.py | 4 +- .../labs/ucx/assessment/clusters.py | 8 +- .../labs/ucx/assessment/init_scripts.py | 4 +- src/databricks/labs/ucx/assessment/jobs.py | 8 +- .../labs/ucx/assessment/pipelines.py | 4 +- src/databricks/labs/ucx/framework/crawlers.py | 12 +-- .../labs/ucx/hive_metastore/grants.py | 30 +++---- .../labs/ucx/hive_metastore/locations.py | 12 +-- .../labs/ucx/hive_metastore/table_migrate.py | 80 ++++++++++--------- .../hive_metastore/table_migration_status.py | 10 +-- .../labs/ucx/hive_metastore/table_move.py | 8 +- .../labs/ucx/hive_metastore/table_size.py | 4 +- .../labs/ucx/hive_metastore/tables.py | 12 +-- .../labs/ucx/hive_metastore/udfs.py | 6 +- .../labs/ucx/recon/migration_recon.py | 4 +- .../labs/ucx/source_code/directfs_access.py | 8 +- .../labs/ucx/source_code/used_table.py | 6 +- .../labs/ucx/workspace_access/generic.py | 2 +- .../labs/ucx/workspace_access/groups.py | 2 +- .../labs/ucx/workspace_access/manager.py | 4 +- tests/integration/assessment/test_clusters.py | 8 +- tests/integration/assessment/test_jobs.py | 4 +- .../integration/assessment/test_pipelines.py | 4 +- tests/unit/framework/test_crawlers.py | 4 +- 24 files changed, 129 insertions(+), 119 deletions(-) diff --git a/src/databricks/labs/ucx/assessment/azure.py b/src/databricks/labs/ucx/assessment/azure.py index 68233d958c..77c7bd8a81 100644 --- a/src/databricks/labs/ucx/assessment/azure.py +++ b/src/databricks/labs/ucx/assessment/azure.py @@ -41,8 +41,8 @@ class ServicePrincipalClusterMapping: class AzureServicePrincipalCrawler(CrawlerBase[AzureServicePrincipalInfo], JobsMixin, SecretsMixin): - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): - super().__init__(sbe, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema): + super().__init__(sql_backend, "hive_metastore", schema, "azure_service_principals", AzureServicePrincipalInfo) self._ws = ws def _try_fetch(self) -> Iterable[AzureServicePrincipalInfo]: diff --git a/src/databricks/labs/ucx/assessment/clusters.py b/src/databricks/labs/ucx/assessment/clusters.py index e31284a1bb..0d605deeb7 100644 --- a/src/databricks/labs/ucx/assessment/clusters.py +++ b/src/databricks/labs/ucx/assessment/clusters.py @@ -147,8 +147,8 @@ def _check_cluster_failures(self, cluster: ClusterDetails, source: str) -> list[ class ClustersCrawler(CrawlerBase[ClusterInfo], CheckClusterMixin): - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str): - super().__init__(sbe, "hive_metastore", schema, "clusters", ClusterInfo) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema: str): + super().__init__(sql_backend, "hive_metastore", schema, "clusters", ClusterInfo) self._ws = ws def _crawl(self) -> Iterable[ClusterInfo]: @@ -210,8 +210,8 @@ class PolicyInfo: class PoliciesCrawler(CrawlerBase[PolicyInfo], 
CheckClusterMixin): - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): - super().__init__(sbe, "hive_metastore", schema, "policies", PolicyInfo) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema): + super().__init__(sql_backend, "hive_metastore", schema, "policies", PolicyInfo) self._ws = ws def _crawl(self) -> Iterable[PolicyInfo]: diff --git a/src/databricks/labs/ucx/assessment/init_scripts.py b/src/databricks/labs/ucx/assessment/init_scripts.py index 909015b678..20e1839c6c 100644 --- a/src/databricks/labs/ucx/assessment/init_scripts.py +++ b/src/databricks/labs/ucx/assessment/init_scripts.py @@ -41,8 +41,8 @@ def check_init_script(self, init_script_data: str | None, source: str) -> list[s class GlobalInitScriptCrawler(CrawlerBase[GlobalInitScriptInfo], CheckInitScriptMixin): - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): - super().__init__(sbe, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema): + super().__init__(sql_backend, "hive_metastore", schema, "global_init_scripts", GlobalInitScriptInfo) self._ws = ws def _crawl(self) -> Iterable[GlobalInitScriptInfo]: diff --git a/src/databricks/labs/ucx/assessment/jobs.py b/src/databricks/labs/ucx/assessment/jobs.py index 924b33bd9f..1f87b26770 100644 --- a/src/databricks/labs/ucx/assessment/jobs.py +++ b/src/databricks/labs/ucx/assessment/jobs.py @@ -82,8 +82,8 @@ def _job_clusters(job: BaseJob) -> Iterable[tuple[BaseJob, ClusterSpec]]: class JobsCrawler(CrawlerBase[JobInfo], JobsMixin, CheckClusterMixin): - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): - super().__init__(sbe, "hive_metastore", schema, "jobs", JobInfo) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema): + super().__init__(sql_backend, "hive_metastore", schema, "jobs", JobInfo) self._ws = ws def _crawl(self) -> Iterable[JobInfo]: @@ -180,8 +180,8 @@ class SubmitRunsCrawler(CrawlerBase[SubmitRunInfo], JobsMixin, CheckClusterMixin "fs.adl", ] - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema: str, num_days_history: int): - super().__init__(sbe, "hive_metastore", schema, "submit_runs", SubmitRunInfo) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema: str, num_days_history: int): + super().__init__(sql_backend, "hive_metastore", schema, "submit_runs", SubmitRunInfo) self._ws = ws self._num_days_history = num_days_history diff --git a/src/databricks/labs/ucx/assessment/pipelines.py b/src/databricks/labs/ucx/assessment/pipelines.py index 40163b7dff..84a591040b 100644 --- a/src/databricks/labs/ucx/assessment/pipelines.py +++ b/src/databricks/labs/ucx/assessment/pipelines.py @@ -29,8 +29,8 @@ class PipelineInfo: class PipelinesCrawler(CrawlerBase[PipelineInfo], CheckClusterMixin): - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): - super().__init__(sbe, "hive_metastore", schema, "pipelines", PipelineInfo) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema): + super().__init__(sql_backend, "hive_metastore", schema, "pipelines", PipelineInfo) self._ws = ws def _crawl(self) -> Iterable[PipelineInfo]: diff --git a/src/databricks/labs/ucx/framework/crawlers.py b/src/databricks/labs/ucx/framework/crawlers.py index 034e738df1..a5f41fa88a 100644 --- a/src/databricks/labs/ucx/framework/crawlers.py +++ b/src/databricks/labs/ucx/framework/crawlers.py @@ -21,12 +21,12 @@ class DataclassInstance(Protocol): class 
CrawlerBase(ABC, Generic[Result]): - def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]) -> None: + def __init__(self, sql_backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]) -> None: """ Initializes a CrawlerBase instance. Args: - backend (SqlBackend): The backend that executes SQL queries: + sql_backend (SqlBackend): The backend that executes SQL queries: Statement Execution API or Databricks Runtime. catalog (str): The catalog name for the inventory persistence. schema: The schema name for the inventory persistence. @@ -35,9 +35,9 @@ def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, k self._catalog = self._valid(catalog) self._schema = self._valid(schema) self._table = self._valid(table) - self._backend = backend - self._fetch = backend.fetch - self._exec = backend.execute + self._sql_backend = sql_backend + self._fetch = sql_backend.fetch + self._exec = sql_backend.execute self._klass = klass @property @@ -161,4 +161,4 @@ def _snapshot(self, fetcher: ResultFn, loader: ResultFn, *, force_refresh: bool) def _update_snapshot(self, items: Sequence[Result], *, mode: Literal["append", "overwrite"]) -> None: logger.debug(f"[{self.full_name}] found {len(items)} new records for {self._table}") - self._backend.save_table(self.full_name, items, self._klass, mode=mode) + self._sql_backend.save_table(self.full_name, items, self._klass, mode=mode) diff --git a/src/databricks/labs/ucx/hive_metastore/grants.py b/src/databricks/labs/ucx/hive_metastore/grants.py index de9106ce40..10b686f833 100644 --- a/src/databricks/labs/ucx/hive_metastore/grants.py +++ b/src/databricks/labs/ucx/hive_metastore/grants.py @@ -215,12 +215,12 @@ def uc_grant_sql(self, object_type: str | None = None, object_key: str | None = class GrantsCrawler(CrawlerBase[Grant]): """Crawler that captures access controls that relate to data and other securable objects.""" - def __init__(self, tc: TablesCrawler, udf: UdfsCrawler, include_databases: list[str] | None = None): - assert tc._backend == udf._backend - assert tc._catalog == udf._catalog - assert tc._schema == udf._schema - super().__init__(tc._backend, tc._catalog, tc._schema, "grants", Grant) - self._tc = tc + def __init__(self, tables_crawler: TablesCrawler, udf: UdfsCrawler, include_databases: list[str] | None = None): + assert tables_crawler._sql_backend == udf._sql_backend + assert tables_crawler._catalog == udf._catalog + assert tables_crawler._schema == udf._schema + super().__init__(tables_crawler._sql_backend, tables_crawler._catalog, tables_crawler._schema, "grants", Grant) + self._tables_crawler = tables_crawler self._udf = udf self._include_databases = include_databases @@ -275,7 +275,7 @@ def _crawl(self) -> Iterable[Grant]: else: for database in self._include_databases: tasks.append(partial(self.grants, catalog=catalog, database=database)) - for table in self._tc.snapshot(): + for table in self._tables_crawler.snapshot(): fn = partial(self.grants, catalog=catalog, database=table.database) # Views are treated as a type of table and enumerated by the table crawler. 
if table.view_text is None: @@ -412,10 +412,10 @@ class AwsACL: def __init__( self, ws: WorkspaceClient, - backend: SqlBackend, + sql_backend: SqlBackend, installation: Installation, ): - self._backend = backend + self._sql_backend = sql_backend self._ws = ws self._installation = installation @@ -521,12 +521,12 @@ class AzureACL: def __init__( self, ws: WorkspaceClient, - backend: SqlBackend, + sql_backend: SqlBackend, spn_crawler: AzureServicePrincipalCrawler, installation: Installation, ): - self._backend = backend self._ws = ws + self._sql_backend = sql_backend self._spn_crawler = spn_crawler self._installation = installation @@ -604,13 +604,13 @@ class PrincipalACL: def __init__( self, ws: WorkspaceClient, - backend: SqlBackend, + sql_backend: SqlBackend, installation: Installation, tables_crawler: TablesCrawler, external_locations: ExternalLocations, cluster_locations: Callable[[], list[ComputeLocations]], ): - self._backend = backend + self._sql_backend = sql_backend self._ws = ws self._installation = installation self._tables_crawler = tables_crawler @@ -836,14 +836,14 @@ def __init__( migration_status_refresher: TableMigrationStatusRefresher, migrate_grants: MigrateGrants, ): - self._table_crawler = tables_crawler + self._tables_crawler = tables_crawler self._workspace_info = workspace_info self._migration_status_refresher = migration_status_refresher self._migrate_grants = migrate_grants def migrate_acls(self, *, target_catalog: str | None = None, hms_fed: bool = False) -> None: workspace_name = self._workspace_info.current() - tables = list(self._table_crawler.snapshot()) + tables = list(self._tables_crawler.snapshot()) if not tables: logger.info("No tables found to acl") return diff --git a/src/databricks/labs/ucx/hive_metastore/locations.py b/src/databricks/labs/ucx/hive_metastore/locations.py index e948442463..73fc15d6bd 100644 --- a/src/databricks/labs/ucx/hive_metastore/locations.py +++ b/src/databricks/labs/ucx/hive_metastore/locations.py @@ -150,12 +150,12 @@ class ExternalLocations(CrawlerBase[ExternalLocation]): def __init__( self, ws: WorkspaceClient, - sbe: SqlBackend, + sql_backend: SqlBackend, schema: str, tables_crawler: TablesCrawler, mounts_crawler: 'MountsCrawler', ): - super().__init__(sbe, "hive_metastore", schema, "external_locations", ExternalLocation) + super().__init__(sql_backend, "hive_metastore", schema, "external_locations", ExternalLocation) self._ws = ws self._tables_crawler = tables_crawler self._mounts_crawler = mounts_crawler @@ -344,12 +344,12 @@ def as_scheme_prefix(self) -> str: class MountsCrawler(CrawlerBase[Mount]): def __init__( self, - backend: SqlBackend, + sql_backend: SqlBackend, ws: WorkspaceClient, inventory_database: str, enable_hms_federation: bool = False, ): - super().__init__(backend, "hive_metastore", inventory_database, "mounts", Mount) + super().__init__(sql_backend, "hive_metastore", inventory_database, "mounts", Mount) self._dbutils = ws.dbutils self._enable_hms_federation = enable_hms_federation @@ -445,7 +445,7 @@ class TablesInMounts(CrawlerBase[Table]): def __init__( self, - backend: SqlBackend, + sql_backend: SqlBackend, ws: WorkspaceClient, inventory_database: str, mounts_crawler: MountsCrawler, @@ -453,7 +453,7 @@ def __init__( exclude_paths_in_mount: list[str] | None = None, include_paths_in_mount: list[str] | None = None, ): - super().__init__(backend, "hive_metastore", inventory_database, "tables", Table) + super().__init__(sql_backend, "hive_metastore", inventory_database, "tables", Table) self._dbutils = 
ws.dbutils self._mounts_crawler = mounts_crawler self._include_mounts = include_mounts diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 5173ec70f1..51c369a692 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -37,19 +37,19 @@ class TablesMigrator: def __init__( self, - table_crawler: TablesCrawler, + tables_crawler: TablesCrawler, ws: WorkspaceClient, - backend: SqlBackend, + sql_backend: SqlBackend, table_mapping: TableMapping, migration_status_refresher: TableMigrationStatusRefresher, migrate_grants: MigrateGrants, external_locations: ExternalLocations, ): - self._tc = table_crawler - self._backend = backend + self._tables_crawler = tables_crawler + self._sql_backend = sql_backend self._ws = ws - self._tm = table_mapping + self._table_mapping = table_mapping self._migration_status_refresher = migration_status_refresher self._seen_tables: dict[str, str] = {} self._migrate_grants = migrate_grants @@ -58,7 +58,7 @@ def __init__( def get_remaining_tables(self) -> list[Table]: self.index(force_refresh=True) table_rows = [] - for crawled_table in self._tc.snapshot(): + for crawled_table in self._tables_crawler.snapshot(): if not self._is_migrated(crawled_table.database, crawled_table.name): table_rows.append(crawled_table) logger.warning(f"remained-hive-metastore-table: {crawled_table.key}") @@ -93,7 +93,7 @@ def _migrate_tables( managed_table_external_storage: str, hiveserde_in_place_migrate: bool = False, ): - tables_to_migrate = self._tm.get_tables_to_migrate(self._tc) + tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler) tables_in_scope = filter(lambda t: t.src.what == what, tables_to_migrate) tasks = [] for table in tables_in_scope: @@ -112,7 +112,7 @@ def _migrate_tables( return tasks def _migrate_views(self): - tables_to_migrate = self._tm.get_tables_to_migrate(self._tc) + tables_to_migrate = self._table_mapping.get_tables_to_migrate(self._tables_crawler) all_tasks = [] # Every batch of views to migrate needs an up-to-date table migration index # to determine if the dependencies have been migrated @@ -225,9 +225,9 @@ def _migrate_view_table(self, src_view: ViewToMigrate): view_migrate_sql = self._sql_migrate_view(src_view) logger.debug(f"Migrating view {src_view.src.key} to using SQL query: {view_migrate_sql}") try: - self._backend.execute(view_migrate_sql) - self._backend.execute(self._sql_alter_to(src_view.src, src_view.rule.as_uc_table_key)) - self._backend.execute( + self._sql_backend.execute(view_migrate_sql) + self._sql_backend.execute(self._sql_alter_to(src_view.src, src_view.rule.as_uc_table_key)) + self._sql_backend.execute( self._sql_alter_from(src_view.src, src_view.rule.as_uc_table_key, self._ws.get_workspace_id()) ) except DatabricksError as e: @@ -238,7 +238,7 @@ def _migrate_view_table(self, src_view: ViewToMigrate): def _sql_migrate_view(self, src_view: ViewToMigrate) -> str: # We have to fetch create statement this way because of columns in: # CREATE VIEW x.y (col1, col2) AS SELECT * FROM w.t - create_statement = self._backend.fetch(f"SHOW CREATE TABLE {src_view.src.safe_sql_key}") + create_statement = self._sql_backend.fetch(f"SHOW CREATE TABLE {src_view.src.safe_sql_key}") src_view.src.view_text = next(iter(create_statement))["createtab_stmt"] # this does not require the index to be refreshed because the dependencies have already been validated return 
src_view.sql_migrate_view(self.index()) @@ -301,14 +301,14 @@ def _migrate_managed_as_external_table(self, src_table: Table, rule: Rule): table_migrate_sql = src_table.sql_migrate_as_external(target_table_key) logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}") # have to wrap the fetch result with iter() for now, because StatementExecutionBackend returns iterator but RuntimeBackend returns list. - sync_result = next(iter(self._backend.fetch(table_migrate_sql))) + sync_result = next(iter(self._sql_backend.fetch(table_migrate_sql))) if sync_result.status_code != "SUCCESS": logger.warning( f"failed-to-migrate: SYNC command failed to migrate table {src_table.key} to {target_table_key}. " f"Status code: {sync_result.status_code}. Description: {sync_result.description}" ) return False - self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) + self._sql_backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) return self._migrate_grants.apply(src_table, rule.as_uc_table) def _migrate_external_table(self, src_table: Table, rule: Rule): @@ -316,19 +316,19 @@ def _migrate_external_table(self, src_table: Table, rule: Rule): table_migrate_sql = src_table.sql_migrate_external(target_table_key) logger.debug(f"Migrating external table {src_table.key} to using SQL query: {table_migrate_sql}") # have to wrap the fetch result with iter() for now, because StatementExecutionBackend returns iterator but RuntimeBackend returns list. - sync_result = next(iter(self._backend.fetch(table_migrate_sql))) + sync_result = next(iter(self._sql_backend.fetch(table_migrate_sql))) if sync_result.status_code != "SUCCESS": logger.warning( f"failed-to-migrate: SYNC command failed to migrate table {src_table.key} to {target_table_key}. " f"Status code: {sync_result.status_code}. 
Description: {sync_result.description}" ) return False - self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) + self._sql_backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) return self._migrate_grants.apply(src_table, rule.as_uc_table) def _migrate_external_table_hiveserde_in_place(self, src_table: Table, rule: Rule): # verify hive serde type - hiveserde_type = src_table.hiveserde_type(self._backend) + hiveserde_type = src_table.hiveserde_type(self._sql_backend) if hiveserde_type in [ HiveSerdeType.NOT_HIVESERDE, HiveSerdeType.OTHER_HIVESERDE, @@ -343,7 +343,7 @@ def _migrate_external_table_hiveserde_in_place(self, src_table: Table, rule: Rul dst_table_location = self._external_locations.resolve_mount(src_table.location) table_migrate_sql = src_table.sql_migrate_external_hiveserde_in_place( - rule.catalog_name, rule.dst_schema, rule.dst_table, self._backend, hiveserde_type, dst_table_location + rule.catalog_name, rule.dst_schema, rule.dst_table, self._sql_backend, hiveserde_type, dst_table_location ) if not table_migrate_sql: logger.error( @@ -355,10 +355,12 @@ def _migrate_external_table_hiveserde_in_place(self, src_table: Table, rule: Rul f"Migrating external table {src_table.key} to {rule.as_uc_table_key} using SQL query: {table_migrate_sql}" ) try: - self._backend.execute(table_migrate_sql) - self._backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key)) - self._backend.execute(self._sql_add_migrated_comment(src_table, rule.as_uc_table_key)) - self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) + self._sql_backend.execute(table_migrate_sql) + self._sql_backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key)) + self._sql_backend.execute(self._sql_add_migrated_comment(src_table, rule.as_uc_table_key)) + self._sql_backend.execute( + self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()) + ) except DatabricksError as e: logger.warning(f"failed-to-migrate: Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}") return False @@ -371,10 +373,12 @@ def _migrate_dbfs_root_table(self, src_table: Table, rule: Rule): f"Migrating managed table {src_table.key} to {rule.as_uc_table_key} using SQL query: {table_migrate_sql}" ) try: - self._backend.execute(table_migrate_sql) - self._backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key)) - self._backend.execute(self._sql_add_migrated_comment(src_table, rule.as_uc_table_key)) - self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) + self._sql_backend.execute(table_migrate_sql) + self._sql_backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key)) + self._sql_backend.execute(self._sql_add_migrated_comment(src_table, rule.as_uc_table_key)) + self._sql_backend.execute( + self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()) + ) except DatabricksError as e: logger.warning(f"failed-to-migrate: Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}") return False @@ -395,10 +399,12 @@ def _migrate_table_create_ctas(self, src_table: Table, rule: Rule): table_migrate_sql = src_table.sql_migrate_ctas_external(rule.as_uc_table_key, dst_table_location) logger.debug(f"Migrating table {src_table.key} to {rule.as_uc_table_key} using SQL query: {table_migrate_sql}") try: - self._backend.execute(table_migrate_sql) - 
self._backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key)) - self._backend.execute(self._sql_add_migrated_comment(src_table, rule.as_uc_table_key)) - self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) + self._sql_backend.execute(table_migrate_sql) + self._sql_backend.execute(self._sql_alter_to(src_table, rule.as_uc_table_key)) + self._sql_backend.execute(self._sql_add_migrated_comment(src_table, rule.as_uc_table_key)) + self._sql_backend.execute( + self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()) + ) except DatabricksError as e: logger.warning(f"failed-to-migrate: Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}") return False @@ -407,13 +413,15 @@ def _migrate_table_create_ctas(self, src_table: Table, rule: Rule): def _migrate_table_in_mount(self, src_table: Table, rule: Rule): target_table_key = rule.as_uc_table_key try: - table_schema = self._backend.fetch(f"DESCRIBE TABLE delta.`{src_table.location}`;") + table_schema = self._sql_backend.fetch(f"DESCRIBE TABLE delta.`{src_table.location}`;") table_migrate_sql = src_table.sql_migrate_table_in_mount(target_table_key, table_schema) logger.info( f"Migrating table in mount {src_table.location} to UC table {rule.as_uc_table_key} using SQL query: {table_migrate_sql}" ) - self._backend.execute(table_migrate_sql) - self._backend.execute(self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id())) + self._sql_backend.execute(table_migrate_sql) + self._sql_backend.execute( + self._sql_alter_from(src_table, rule.as_uc_table_key, self._ws.get_workspace_id()) + ) except DatabricksError as e: logger.warning(f"failed-to-migrate: Failed to migrate table {src_table.key} to {rule.as_uc_table_key}: {e}") return False @@ -429,7 +437,7 @@ def _get_tables_to_revert(self, schema: str | None = None, table: str | None = N migrated_tables = [] if table and not schema: logger.error("Cannot accept 'Table' parameter without 'Schema' parameter") - for cur_table in self._tc.snapshot(): + for cur_table in self._tables_crawler.snapshot(): if schema and cur_table.database != schema: continue if table and cur_table.name != table: @@ -462,8 +470,8 @@ def _revert_migrated_table(self, table: Table, target_table_key: str): f"Reverting {table.object_type} table {table.database}.{table.name} upgraded_to {table.upgraded_to}" ) try: - self._backend.execute(table.sql_unset_upgraded_to()) - self._backend.execute(f"DROP {table.kind} IF EXISTS {escape_sql_identifier(target_table_key)}") + self._sql_backend.execute(table.sql_unset_upgraded_to()) + self._sql_backend.execute(f"DROP {table.kind} IF EXISTS {escape_sql_identifier(target_table_key)}") except DatabricksError as e: logger.warning(f"Failed to revert table {table.key}: {e}") diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 1a15be28f0..9f697f9f32 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -80,10 +80,10 @@ class TableMigrationStatusRefresher(CrawlerBase[TableMigrationStatus]): properties for the presence of the marker. 
""" - def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema, table_crawler: TablesCrawler): - super().__init__(sbe, "hive_metastore", schema, "migration_status", TableMigrationStatus) + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_crawler: TablesCrawler): + super().__init__(sql_backend, "hive_metastore", schema, "migration_status", TableMigrationStatus) self._ws = ws - self._table_crawler = table_crawler + self._tables_crawler = tables_crawler def index(self, *, force_refresh: bool = False) -> TableMigrationIndex: return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh))) @@ -112,7 +112,7 @@ def get_seen_tables(self) -> dict[str, str]: def is_migrated(self, schema: str, table: str) -> bool: try: - results = self._backend.fetch( + results = self._sql_backend.fetch( f"SHOW TBLPROPERTIES {escape_sql_identifier(schema + '.' + table)} ('upgraded_to')" ) for result in results: @@ -129,7 +129,7 @@ def is_migrated(self, schema: str, table: str) -> bool: return False def _crawl(self) -> Iterable[TableMigrationStatus]: - all_tables = self._table_crawler.snapshot() + all_tables = self._tables_crawler.snapshot() reverse_seen = {v: k for k, v in self.get_seen_tables().items()} timestamp = datetime.datetime.now(datetime.timezone.utc).timestamp() for table in all_tables: diff --git a/src/databricks/labs/ucx/hive_metastore/table_move.py b/src/databricks/labs/ucx/hive_metastore/table_move.py index b36ae03cb2..affb5ed7d7 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_move.py +++ b/src/databricks/labs/ucx/hive_metastore/table_move.py @@ -16,10 +16,10 @@ class TableMove: - def __init__(self, ws: WorkspaceClient, backend: SqlBackend): - self._backend = backend - self._fetch = backend.fetch - self._execute = backend.execute + def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend): + self._sql_backend = sql_backend + self._fetch = sql_backend.fetch + self._execute = sql_backend.execute self._ws = ws def move( diff --git a/src/databricks/labs/ucx/hive_metastore/table_size.py b/src/databricks/labs/ucx/hive_metastore/table_size.py index 243c4e3418..62bfb06c58 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_size.py +++ b/src/databricks/labs/ucx/hive_metastore/table_size.py @@ -34,7 +34,7 @@ def __init__(self, tables_crawler: TablesCrawler | FasterTableScanCrawler) -> No from pyspark.sql.session import SparkSession # type: ignore[import-not-found] super().__init__( - tables_crawler._backend, + tables_crawler._sql_backend, "hive_metastore", tables_crawler._schema, "table_size", @@ -65,7 +65,7 @@ def _safe_get_table_size(self, table: Table) -> TableSize | None: logger.debug(f"Evaluating {table.key} table size.") try: # refresh table statistics to avoid stale stats in HMS - self._backend.execute(f"ANALYZE table {table.safe_sql_key} compute STATISTICS NOSCAN") + self._sql_backend.execute(f"ANALYZE table {table.safe_sql_key} compute STATISTICS NOSCAN") jvm_df = self._spark._jsparkSession.table(table.safe_sql_key) # pylint: disable=protected-access size_in_bytes = jvm_df.queryExecution().analyzed().stats().sizeInBytes() return TableSize( diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py index 8f4adacde5..7a1f0e90a9 100644 --- a/src/databricks/labs/ucx/hive_metastore/tables.py +++ b/src/databricks/labs/ucx/hive_metastore/tables.py @@ -364,15 +364,15 @@ class MigrationCount: class TablesCrawler(CrawlerBase[Table]): - def __init__(self, backend: SqlBackend, schema, 
include_databases: list[str] | None = None): + def __init__(self, sql_backend: SqlBackend, schema, include_databases: list[str] | None = None): """ Initializes a TablesCrawler instance. Args: - backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) + sql_backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) schema: The schema name for the inventory persistence. """ - super().__init__(backend, "hive_metastore", schema, "tables", Table) + super().__init__(sql_backend, "hive_metastore", schema, "tables", Table) self._include_database = include_databases def _all_databases(self) -> list[str]: @@ -516,14 +516,14 @@ class FasterTableScanCrawler(TablesCrawler): Databricks workspace. """ - def __init__(self, backend: SqlBackend, schema, include_databases: list[str] | None = None): - self._backend = backend + def __init__(self, sql_backend: SqlBackend, schema, include_databases: list[str] | None = None): + self._sql_backend = sql_backend self._include_database = include_databases # pylint: disable-next=import-error,import-outside-toplevel from pyspark.sql.session import SparkSession # type: ignore[import-not-found] - super().__init__(backend, schema, include_databases) + super().__init__(sql_backend, schema, include_databases) self._spark = SparkSession.builder.getOrCreate() @cached_property diff --git a/src/databricks/labs/ucx/hive_metastore/udfs.py b/src/databricks/labs/ucx/hive_metastore/udfs.py index 81ed350838..12a1256039 100644 --- a/src/databricks/labs/ucx/hive_metastore/udfs.py +++ b/src/databricks/labs/ucx/hive_metastore/udfs.py @@ -42,15 +42,15 @@ def key(self) -> str: class UdfsCrawler(CrawlerBase[Udf]): - def __init__(self, backend: SqlBackend, schema: str, include_databases: list[str] | None = None): + def __init__(self, sql_backend: SqlBackend, schema: str, include_databases: list[str] | None = None): """ Initializes a UdfsCrawler instance. Args: - backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) + sql_backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) schema: The schema name for the inventory persistence. 
""" - super().__init__(backend, "hive_metastore", schema, "udfs", Udf) + super().__init__(sql_backend, "hive_metastore", schema, "udfs", Udf) self._include_database = include_databases def _all_databases(self) -> list[str]: diff --git a/src/databricks/labs/ucx/recon/migration_recon.py b/src/databricks/labs/ucx/recon/migration_recon.py index 404fd8f1ba..35314d0fe8 100644 --- a/src/databricks/labs/ucx/recon/migration_recon.py +++ b/src/databricks/labs/ucx/recon/migration_recon.py @@ -38,7 +38,7 @@ class ReconResult: class MigrationRecon(CrawlerBase[ReconResult]): def __init__( self, - sbe: SqlBackend, + sql_backend: SqlBackend, schema: str, migration_status_refresher: TableMigrationStatusRefresher, table_mapping: TableMapping, @@ -46,7 +46,7 @@ def __init__( data_comparator: DataComparator, default_threshold: float, ): - super().__init__(sbe, "hive_metastore", schema, "recon_results", ReconResult) + super().__init__(sql_backend, "hive_metastore", schema, "recon_results", ReconResult) self._migration_status_refresher = migration_status_refresher self._table_mapping = table_mapping self._schema_comparator = schema_comparator diff --git a/src/databricks/labs/ucx/source_code/directfs_access.py b/src/databricks/labs/ucx/source_code/directfs_access.py index 42406d3b13..33972f9e22 100644 --- a/src/databricks/labs/ucx/source_code/directfs_access.py +++ b/src/databricks/labs/ucx/source_code/directfs_access.py @@ -27,7 +27,7 @@ def for_paths(cls, backend: SqlBackend, schema) -> DirectFsAccessCrawler: def for_queries(cls, backend: SqlBackend, schema) -> DirectFsAccessCrawler: return DirectFsAccessCrawler(backend, schema, "directfs_in_queries") - def __init__(self, backend: SqlBackend, schema: str, table: str): + def __init__(self, sql_backend: SqlBackend, schema: str, table: str): """ Initializes a DFSACrawler instance. @@ -35,7 +35,9 @@ def __init__(self, backend: SqlBackend, schema: str, table: str): sql_backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) schema: The schema name for the inventory persistence. """ - super().__init__(backend=backend, catalog="hive_metastore", schema=schema, table=table, klass=DirectFsAccess) + super().__init__( + sql_backend=sql_backend, catalog="hive_metastore", schema=schema, table=table, klass=DirectFsAccess + ) def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None: """This crawler doesn't follow the pull model because the fetcher fetches data for 2 crawlers, not just one @@ -50,7 +52,7 @@ def dump_all(self, dfsas: Sequence[DirectFsAccess]) -> None: def _try_fetch(self) -> Iterable[DirectFsAccess]: sql = f"SELECT * FROM {escape_sql_identifier(self.full_name)}" - for row in self._backend.fetch(sql): + for row in self._sql_backend.fetch(sql): yield self._klass.from_dict(row.asDict()) def _crawl(self) -> Iterable[DirectFsAccess]: diff --git a/src/databricks/labs/ucx/source_code/used_table.py b/src/databricks/labs/ucx/source_code/used_table.py index 953f6f7aca..b5cdb77c0b 100644 --- a/src/databricks/labs/ucx/source_code/used_table.py +++ b/src/databricks/labs/ucx/source_code/used_table.py @@ -15,7 +15,7 @@ class UsedTablesCrawler(CrawlerBase[UsedTable]): - def __init__(self, backend: SqlBackend, schema: str, table: str) -> None: + def __init__(self, sql_backend: SqlBackend, schema: str, table: str) -> None: """ Initializes a DFSACrawler instance. 
@@ -23,7 +23,7 @@ def __init__(self, backend: SqlBackend, schema: str, table: str) -> None: sql_backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark) schema: The schema name for the inventory persistence. """ - super().__init__(backend=backend, catalog="hive_metastore", schema=schema, table=table, klass=UsedTable) + super().__init__(sql_backend=sql_backend, catalog="hive_metastore", schema=schema, table=table, klass=UsedTable) @classmethod def for_paths(cls, backend: SqlBackend, schema: str) -> UsedTablesCrawler: @@ -46,7 +46,7 @@ def dump_all(self, tables: Sequence[UsedTable]) -> None: def _try_fetch(self) -> Iterable[UsedTable]: sql = f"SELECT * FROM {escape_sql_identifier(self.full_name)}" - for row in self._backend.fetch(sql): + for row in self._sql_backend.fetch(sql): yield self._klass.from_dict(row.asDict()) def _crawl(self) -> Iterable[UsedTable]: diff --git a/src/databricks/labs/ucx/workspace_access/generic.py b/src/databricks/labs/ucx/workspace_access/generic.py index a77bf24f43..a646317ece 100644 --- a/src/databricks/labs/ucx/workspace_access/generic.py +++ b/src/databricks/labs/ucx/workspace_access/generic.py @@ -332,7 +332,7 @@ def __init__( Listing.__init__(self, lambda: [], "_", "_") CrawlerBase.__init__( self, - backend=sql_backend, + sql_backend=sql_backend, catalog="hive_metastore", schema=inventory_database, table="workspace_objects", diff --git a/src/databricks/labs/ucx/workspace_access/groups.py b/src/databricks/labs/ucx/workspace_access/groups.py index 73b2283645..b361ed773d 100644 --- a/src/databricks/labs/ucx/workspace_access/groups.py +++ b/src/databricks/labs/ucx/workspace_access/groups.py @@ -626,7 +626,7 @@ def delete_original_workspace_groups(self): def _try_fetch(self) -> Iterable[MigratedGroup]: state = [] - for row in self._backend.fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): + for row in self._sql_backend.fetch(f"SELECT * FROM {escape_sql_identifier(self.full_name)}"): state.append(MigratedGroup(*row)) if not self._include_group_names: diff --git a/src/databricks/labs/ucx/workspace_access/manager.py b/src/databricks/labs/ucx/workspace_access/manager.py index 8376c05bf1..09d11c6431 100644 --- a/src/databricks/labs/ucx/workspace_access/manager.py +++ b/src/databricks/labs/ucx/workspace_access/manager.py @@ -23,8 +23,8 @@ class PermissionManager(CrawlerBase[Permissions]): ERRORS_TO_IGNORE = ["FEATURE_DISABLED"] - def __init__(self, backend: SqlBackend, inventory_database: str, crawlers: list[AclSupport]): - super().__init__(backend, "hive_metastore", inventory_database, "permissions", Permissions) + def __init__(self, sql_backend: SqlBackend, inventory_database: str, crawlers: list[AclSupport]): + super().__init__(sql_backend, "hive_metastore", inventory_database, "permissions", Permissions) self._acl_support = crawlers def _crawl(self) -> Iterable[Permissions]: diff --git a/tests/integration/assessment/test_clusters.py b/tests/integration/assessment/test_clusters.py index 8cf0622220..ee7147ceb2 100644 --- a/tests/integration/assessment/test_clusters.py +++ b/tests/integration/assessment/test_clusters.py @@ -19,7 +19,7 @@ @retried(on=[NotFound], timeout=timedelta(minutes=5)) def test_cluster_crawler(ws, make_cluster, inventory_schema, sql_backend): created_cluster = make_cluster(single_node=True, spark_conf=_SPARK_CONF) - cluster_crawler = ClustersCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + cluster_crawler = ClustersCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema) clusters = 
cluster_crawler.snapshot() results = [] for cluster in clusters: @@ -34,7 +34,7 @@ def test_cluster_crawler(ws, make_cluster, inventory_schema, sql_backend): def test_cluster_crawler_no_isolation(ws, make_cluster, inventory_schema, sql_backend): created_cluster = make_cluster(data_security_mode=DataSecurityMode.NONE, num_workers=1) - cluster_crawler = ClustersCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + cluster_crawler = ClustersCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema) clusters = cluster_crawler.snapshot() results = [] for cluster in clusters: @@ -85,7 +85,7 @@ def test_cluster_crawler_mlr_no_isolation(ws, make_cluster, inventory_schema, sq created_cluster = make_cluster( data_security_mode=DataSecurityMode.NONE, spark_version='15.4.x-cpu-ml-scala2.12', num_workers=1 ) - cluster_crawler = ClustersCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + cluster_crawler = ClustersCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema) clusters = cluster_crawler.snapshot() results = [] for cluster in clusters: @@ -113,7 +113,7 @@ def test_policy_crawler(ws, make_cluster_policy, inventory_schema, sql_backend, "spark_conf.fs.azure.account.auth.type": {"type": "fixed", "value": "OAuth", "hidden": True}, } created_policy_2 = make_cluster_policy(name=f"{policy_2}", definition=json.dumps(policy_definition)) - policy_crawler = PoliciesCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + policy_crawler = PoliciesCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema) policies = policy_crawler.snapshot() results = [] for policy in policies: diff --git a/tests/integration/assessment/test_jobs.py b/tests/integration/assessment/test_jobs.py index 47fa6f1b81..f2e48fd460 100644 --- a/tests/integration/assessment/test_jobs.py +++ b/tests/integration/assessment/test_jobs.py @@ -15,7 +15,7 @@ @retried(on=[NotFound], timeout=timedelta(minutes=5)) def test_job_crawler(ws, make_job, inventory_schema, sql_backend): new_job = make_job(spark_conf=_SPARK_CONF) - job_crawler = JobsCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + job_crawler = JobsCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema) jobs = job_crawler.snapshot() results = [] for job in jobs: @@ -53,7 +53,7 @@ def test_job_run_crawler(ws, env_or_skip, inventory_schema, sql_backend): assert run run_id = run.run_id - job_run_crawler = SubmitRunsCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema, num_days_history=1) + job_run_crawler = SubmitRunsCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema, num_days_history=1) job_runs = job_run_crawler.snapshot() assert len(job_runs) >= 1 diff --git a/tests/integration/assessment/test_pipelines.py b/tests/integration/assessment/test_pipelines.py index 93f60c850f..b85f7bdbc8 100644 --- a/tests/integration/assessment/test_pipelines.py +++ b/tests/integration/assessment/test_pipelines.py @@ -13,7 +13,7 @@ def test_pipeline_crawler(ws, make_pipeline, inventory_schema, sql_backend): logger.info("setting up fixtures") created_pipeline = make_pipeline(configuration=_PIPELINE_CONF) - pipeline_crawler = PipelinesCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + pipeline_crawler = PipelinesCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema) pipelines = pipeline_crawler.snapshot() results = [] for pipeline in pipelines: @@ -31,7 +31,7 @@ def test_pipeline_with_secret_conf_crawler(ws, make_pipeline, inventory_schema, logger.info("setting up fixtures") created_pipeline = 
make_pipeline(configuration=_PIPELINE_CONF_WITH_SECRET) - pipeline_crawler = PipelinesCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + pipeline_crawler = PipelinesCrawler(ws=ws, sql_backend=sql_backend, schema=inventory_schema) pipelines = pipeline_crawler.snapshot() results = [] for pipeline in pipelines: diff --git a/tests/unit/framework/test_crawlers.py b/tests/unit/framework/test_crawlers.py index 2fa5c9bfc9..e7dc67f4ab 100644 --- a/tests/unit/framework/test_crawlers.py +++ b/tests/unit/framework/test_crawlers.py @@ -32,7 +32,7 @@ class Bar: class _CrawlerFixture(CrawlerBase[Result]): def __init__( self, - backend: MockBackend, + sql_backend: MockBackend, catalog: str, schema: str, table: str, @@ -41,7 +41,7 @@ def __init__( fetcher: ResultFn = lambda: [], loader: ResultFn = lambda: [], ): - super().__init__(backend, catalog, schema, table, klass) + super().__init__(sql_backend, catalog, schema, table, klass) self._fetcher = fetcher self._loader = loader From 51f768a6fa6c34ecb5935b663ddfbb97a8b58766 Mon Sep 17 00:00:00 2001 From: Serge Smertin <259697+nfx@users.noreply.github.com> Date: Thu, 24 Oct 2024 19:36:22 +0200 Subject: [PATCH 6/8] Rename `HistoryLog` into `ProgressEncoder` (#3073) --- .../labs/ucx/contexts/workflow_task.py | 59 ++++++++++--------- src/databricks/labs/ucx/progress/history.py | 10 +++- src/databricks/labs/ucx/progress/jobs.py | 0 src/databricks/labs/ucx/progress/workflows.py | 14 ++--- tests/unit/assessment/test_clusters.py | 6 +- tests/unit/assessment/test_jobs.py | 4 +- tests/unit/assessment/test_pipelines.py | 4 +- tests/unit/hive_metastore/test_grants.py | 6 +- .../unit/hive_metastore/test_table_migrate.py | 4 +- tests/unit/hive_metastore/test_tables.py | 6 +- tests/unit/hive_metastore/test_udfs.py | 4 +- tests/unit/progress/test_history.py | 11 +++- tests/unit/progress/test_workflows.py | 20 +++---- 13 files changed, 83 insertions(+), 65 deletions(-) create mode 100644 src/databricks/labs/ucx/progress/jobs.py diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 81a6ec7497..1fc1a7d5aa 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -26,7 +26,7 @@ from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table from databricks.labs.ucx.hive_metastore.udfs import Udf from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder # As with GlobalContext, service factories unavoidably have a lot of public methods. 
@@ -137,7 +137,7 @@ def task_run_warning_recorder(self) -> TaskRunWarningRecorder: self._config_path.parent, self.named_parameters["workflow"], int(self.named_parameters["job_id"]), - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.sql_backend, self.inventory_database, int(self.named_parameters.get("attempt", "0")), @@ -151,7 +151,7 @@ def workflow_run_recorder(self) -> WorkflowRunRecorder: workspace_id=self.workspace_id, workflow_name=self.named_parameters["workflow"], workflow_id=int(self.named_parameters["job_id"]), - workflow_run_id=int(self.named_parameters["parent_run_id"]), + workflow_run_id=self.parent_run_id, workflow_run_attempt=int(self.named_parameters.get("attempt", 0)), workflow_start_time=self.named_parameters["start_time"], ) @@ -161,89 +161,94 @@ def workspace_id(self) -> int: return self.workspace_client.get_workspace_id() @cached_property - def historical_clusters_log(self) -> HistoryLog[ClusterInfo]: - return HistoryLog( + def parent_run_id(self) -> int: + return int(self.named_parameters["parent_run_id"]) + + @cached_property + def clusters_progress(self) -> ProgressEncoder[ClusterInfo]: + return ProgressEncoder( self.sql_backend, self.cluster_ownership, ClusterInfo, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) @cached_property - def historical_cluster_policies_log(self) -> HistoryLog[PolicyInfo]: - return HistoryLog( + def policies_progress(self) -> ProgressEncoder[PolicyInfo]: + return ProgressEncoder( self.sql_backend, self.cluster_policy_ownership, PolicyInfo, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) @cached_property - def historical_grants_log(self) -> HistoryLog[Grant]: - return HistoryLog( + def grants_progress(self) -> ProgressEncoder[Grant]: + return ProgressEncoder( self.sql_backend, self.grant_ownership, Grant, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) @cached_property - def historical_jobs_log(self) -> HistoryLog[JobInfo]: - return HistoryLog( + def jobs_progress(self) -> ProgressEncoder[JobInfo]: + return ProgressEncoder( self.sql_backend, self.job_ownership, JobInfo, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) @cached_property - def historical_pipelines_log(self) -> HistoryLog[PipelineInfo]: - return HistoryLog( + def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]: + return ProgressEncoder( self.sql_backend, self.pipeline_ownership, PipelineInfo, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) @cached_property - def historical_tables_log(self) -> HistoryLog[Table]: - return HistoryLog( + def tables_progress(self) -> ProgressEncoder[Table]: + return ProgressEncoder( self.sql_backend, self.table_ownership, Table, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) @cached_property - def historical_table_migration_log(self) -> HistoryLog[TableMigrationStatus]: - return HistoryLog( + def historical_table_migration_log(self) -> ProgressEncoder[TableMigrationStatus]: + # TODO: merge into tables_progress + return ProgressEncoder( self.sql_backend, self.table_migration_ownership, TableMigrationStatus, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) 
@cached_property - def historical_udfs_log(self) -> HistoryLog[Udf]: - return HistoryLog( + def udfs_progress(self) -> ProgressEncoder[Udf]: + return ProgressEncoder( self.sql_backend, self.udf_ownership, Udf, - int(self.named_parameters["parent_run_id"]), + self.parent_run_id, self.workspace_id, self.config.ucx_catalog, ) diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index 597bd657b5..b1f2847807 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -252,7 +252,7 @@ def to_historical(self, record: Record) -> Historical: ) -class HistoryLog(Generic[Record]): +class ProgressEncoder(Generic[Record]): def __init__( self, sql_backend: SqlBackend, @@ -269,8 +269,12 @@ def __init__( self._catalog = catalog self._schema = schema self._table = table - encoder = HistoricalEncoder(job_run_id=run_id, workspace_id=workspace_id, ownership=ownership, klass=klass) - self._encoder = encoder + self._encoder = HistoricalEncoder( + job_run_id=run_id, + workspace_id=workspace_id, + ownership=ownership, + klass=klass, + ) @property def full_name(self) -> str: diff --git a/src/databricks/labs/ucx/progress/jobs.py b/src/databricks/labs/ucx/progress/jobs.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index e256af2fa3..ec172d6682 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -52,7 +52,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. # Step 2 of 2: Assuming (due to depends-on) the inventory was refreshed, capture into the history log. # WARNING: this will fail if the inventory is empty, because it will then try to perform a crawl. - history_log = ctx.historical_tables_log + history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() history_log.append_inventory_snapshot(tables_snapshot) @@ -61,7 +61,7 @@ def crawl_udfs(self, ctx: RuntimeContext) -> None: """Iterates over all UDFs in the Hive Metastore of the current workspace and persists their metadata in the table named `$inventory_database.udfs`. 
This inventory is currently used when scanning securable objects for issues with grants that cannot be migrated to Unit Catalog.""" - history_log = ctx.historical_udfs_log + history_log = ctx.udfs_progress udfs_snapshot = ctx.udfs_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(udfs_snapshot) @@ -74,7 +74,7 @@ def crawl_grants(self, ctx: RuntimeContext) -> None: Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table ACLs enabled and available for retrieval.""" - history_log = ctx.historical_grants_log + history_log = ctx.grants_progress grants_snapshot = ctx.grants_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(grants_snapshot) @@ -89,7 +89,7 @@ def assess_jobs(self, ctx: RuntimeContext) -> None: - Clusters with incompatible Spark config tags - Clusters referencing DBFS locations in one or more config options """ - history_log = ctx.historical_jobs_log + history_log = ctx.jobs_progress jobs_snapshot = ctx.jobs_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(jobs_snapshot) @@ -104,7 +104,7 @@ def assess_clusters(self, ctx: RuntimeContext) -> None: - Clusters with incompatible spark config tags - Clusters referencing DBFS locations in one or more config options """ - history_log = ctx.historical_clusters_log + history_log = ctx.clusters_progress clusters_snapshot = ctx.clusters_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(clusters_snapshot) @@ -119,7 +119,7 @@ def assess_pipelines(self, ctx: RuntimeContext) -> None: Subsequently, a list of all the pipelines with matching configurations are stored in the `$inventory.pipelines` table.""" - history_log = ctx.historical_pipelines_log + history_log = ctx.pipelines_progress pipelines_snapshot = ctx.pipelines_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(pipelines_snapshot) @@ -132,7 +132,7 @@ def crawl_cluster_policies(self, ctx: RuntimeContext) -> None: Subsequently, a list of all the policies with matching configurations are stored in the `$inventory.policies` table.""" - history_log = ctx.historical_cluster_policies_log + history_log = ctx.policies_progress cluster_policies_snapshot = ctx.policies_crawler.snapshot(force_refresh=True) history_log.append_inventory_snapshot(cluster_policies_snapshot) diff --git a/tests/unit/assessment/test_clusters.py b/tests/unit/assessment/test_clusters.py index 6a35d649dc..9399fff57a 100644 --- a/tests/unit/assessment/test_clusters.py +++ b/tests/unit/assessment/test_clusters.py @@ -19,7 +19,7 @@ ) from databricks.labs.ucx.framework.crawlers import SqlBackend from databricks.labs.ucx.framework.owners import AdministratorLocator -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from .. 
import mock_workspace_client @@ -263,7 +263,7 @@ def test_cluster_info_supports_history(mock_backend, cluster_info_record: Cluste admin_locator = create_autospec(AdministratorLocator) admin_locator.get_workspace_administrator.return_value = "the_admin" cluster_ownership = ClusterOwnership(admin_locator) - history_log = HistoryLog[ClusterInfo]( + history_log = ProgressEncoder[ClusterInfo]( mock_backend, cluster_ownership, ClusterInfo, @@ -427,7 +427,7 @@ def test_cluster_policy_supports_history(mock_backend, policy_info_record: Polic admin_locator = create_autospec(AdministratorLocator) admin_locator.get_workspace_administrator.return_value = "the_admin" cluster_policy_ownership = ClusterPolicyOwnership(admin_locator) - history_log = HistoryLog[PolicyInfo]( + history_log = ProgressEncoder[PolicyInfo]( mock_backend, cluster_policy_ownership, PolicyInfo, diff --git a/tests/unit/assessment/test_jobs.py b/tests/unit/assessment/test_jobs.py index 9844e8c51d..32a3665319 100644 --- a/tests/unit/assessment/test_jobs.py +++ b/tests/unit/assessment/test_jobs.py @@ -3,7 +3,7 @@ import pytest from databricks.labs.lsql.backends import MockBackend from databricks.labs.lsql.core import Row -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.sdk.service.jobs import BaseJob, JobSettings from databricks.labs.ucx.__about__ import __version__ as ucx_version @@ -213,7 +213,7 @@ def test_job_supports_history(mock_backend, job_info_record: JobInfo, history_re admin_locator = create_autospec(AdministratorLocator) admin_locator.get_workspace_administrator.return_value = "the_admin" job_ownership = JobOwnership(admin_locator) - history_log = HistoryLog[JobInfo]( + history_log = ProgressEncoder[JobInfo]( mock_backend, job_ownership, JobInfo, diff --git a/tests/unit/assessment/test_pipelines.py b/tests/unit/assessment/test_pipelines.py index 13c2424229..7b2b61dad6 100644 --- a/tests/unit/assessment/test_pipelines.py +++ b/tests/unit/assessment/test_pipelines.py @@ -11,7 +11,7 @@ from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler from databricks.labs.ucx.assessment.pipelines import PipelineOwnership, PipelineInfo, PipelinesCrawler from databricks.labs.ucx.framework.owners import AdministratorLocator -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from .. 
import mock_workspace_client @@ -163,7 +163,7 @@ def test_pipeline_info_supports_history(mock_backend, pipeline_info_record: Pipe admin_locator = create_autospec(AdministratorLocator) admin_locator.get_workspace_administrator.return_value = "the_admin" pipeline_ownership = PipelineOwnership(admin_locator) - history_log = HistoryLog[PipelineInfo]( + history_log = ProgressEncoder[PipelineInfo]( mock_backend, pipeline_ownership, PipelineInfo, diff --git a/tests/unit/hive_metastore/test_grants.py b/tests/unit/hive_metastore/test_grants.py index 5321606504..4a128a0a22 100644 --- a/tests/unit/hive_metastore/test_grants.py +++ b/tests/unit/hive_metastore/test_grants.py @@ -12,7 +12,7 @@ from databricks.labs.ucx.hive_metastore.grants import Grant, GrantsCrawler, MigrateGrants, GrantOwnership from databricks.labs.ucx.hive_metastore.tables import Table, TablesCrawler from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.workspace_access.groups import GroupManager @@ -846,7 +846,9 @@ def test_grant_supports_history(mock_backend, grant_record: Grant, history_recor """Verify that Grant records are written to the history log as expected.""" mock_ownership = create_autospec(GrantOwnership) mock_ownership.owner_of.return_value = "the_admin" - history_log = HistoryLog[Grant](mock_backend, mock_ownership, Grant, run_id=1, workspace_id=2, catalog="a_catalog") + history_log = ProgressEncoder[Grant]( + mock_backend, mock_ownership, Grant, run_id=1, workspace_id=2, catalog="a_catalog" + ) history_log.append_inventory_snapshot([grant_record]) diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index f0d7360530..84f726433d 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -39,7 +39,7 @@ What, ) from databricks.labs.ucx.hive_metastore.view_migrate import ViewToMigrate -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from .. 
import mock_table_mapping, mock_workspace_client @@ -1601,7 +1601,7 @@ def test_table_migration_status_supports_history( """Verify that TableMigrationStatus records are written as expected to the history log.""" table_migration_ownership = create_autospec(TableMigrationOwnership) table_migration_ownership.owner_of.return_value = "the_admin" - history_log = HistoryLog[TableMigrationStatus]( + history_log = ProgressEncoder[TableMigrationStatus]( mock_backend, table_migration_ownership, TableMigrationStatus, diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py index 03d6686d48..716844806f 100644 --- a/tests/unit/hive_metastore/test_tables.py +++ b/tests/unit/hive_metastore/test_tables.py @@ -5,7 +5,7 @@ import pytest from databricks.labs.lsql.backends import MockBackend from databricks.labs.lsql.core import Row -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.sdk import WorkspaceClient from databricks.labs.ucx.__about__ import __version__ as ucx_version @@ -754,7 +754,9 @@ def test_table_supports_history(mock_backend, table_record: Table, history_recor """Verify that Table records are written as expected to the history log.""" mock_ownership = create_autospec(TableOwnership) mock_ownership.owner_of.return_value = "the_admin" - history_log = HistoryLog[Table](mock_backend, mock_ownership, Table, run_id=1, workspace_id=2, catalog="a_catalog") + history_log = ProgressEncoder[Table]( + mock_backend, mock_ownership, Table, run_id=1, workspace_id=2, catalog="a_catalog" + ) history_log.append_inventory_snapshot([table_record]) diff --git a/tests/unit/hive_metastore/test_udfs.py b/tests/unit/hive_metastore/test_udfs.py index db6cd97e1a..5a06336819 100644 --- a/tests/unit/hive_metastore/test_udfs.py +++ b/tests/unit/hive_metastore/test_udfs.py @@ -7,7 +7,7 @@ from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.framework.owners import AdministratorLocator from databricks.labs.ucx.hive_metastore.udfs import Udf, UdfsCrawler, UdfOwnership -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder def test_key(): @@ -160,7 +160,7 @@ def test_udf_supports_history(mock_backend, udf_record: Udf, history_record: Row """Verify that Udf records are written as expected to the history log.""" mock_ownership = create_autospec(UdfOwnership) mock_ownership.owner_of.return_value = "the_admin" - history_log = HistoryLog[Udf](mock_backend, mock_ownership, Udf, run_id=1, workspace_id=2, catalog="a_catalog") + history_log = ProgressEncoder[Udf](mock_backend, mock_ownership, Udf, run_id=1, workspace_id=2, catalog="a_catalog") history_log.append_inventory_snapshot([udf_record]) diff --git a/tests/unit/progress/test_history.py b/tests/unit/progress/test_history.py index 01d6284023..61c64fdd00 100644 --- a/tests/unit/progress/test_history.py +++ b/tests/unit/progress/test_history.py @@ -11,7 +11,12 @@ from databricks.labs.ucx.__about__ import __version__ as ucx_version from databricks.labs.ucx.framework.owners import Ownership -from databricks.labs.ucx.progress.history import HistoricalEncoder, HistoryLog, Record, DataclassWithIdAttributes +from databricks.labs.ucx.progress.history import ( + HistoricalEncoder, + ProgressEncoder, + Record, + DataclassWithIdAttributes, +) from databricks.labs.ucx.progress.install import Historical @@ -536,7 +541,7 @@ def 
test_history_log_appends_historical_records(mock_backend, ownership) -> None ), ) - history_log = HistoryLog( + history_log = ProgressEncoder( mock_backend, ownership, _TestRecord, @@ -556,7 +561,7 @@ def test_history_log_default_location(mock_backend, ownership) -> None: """Verify that the history log defaults to the ucx.history in the configured catalog.""" record = _TestRecord(a_field="foo", b_field=1, failures=[]) - history_log = HistoryLog(mock_backend, ownership, _TestRecord, run_id=1, workspace_id=2, catalog="the_catalog") + history_log = ProgressEncoder(mock_backend, ownership, _TestRecord, run_id=1, workspace_id=2, catalog="the_catalog") history_log.append_inventory_snapshot([record]) assert history_log.full_name == "the_catalog.multiworkspace.historical" diff --git a/tests/unit/progress/test_workflows.py b/tests/unit/progress/test_workflows.py index 555eb49a04..f7f7a81b21 100644 --- a/tests/unit/progress/test_workflows.py +++ b/tests/unit/progress/test_workflows.py @@ -4,7 +4,7 @@ import pytest from databricks.labs.ucx.hive_metastore import TablesCrawler -from databricks.labs.ucx.progress.history import HistoryLog +from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.sdk import WorkspaceClient from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState @@ -16,15 +16,15 @@ @pytest.mark.parametrize( "task, crawler, history_log", ( - (MigrationProgress.crawl_udfs, RuntimeContext.udfs_crawler, RuntimeContext.historical_udfs_log), - (MigrationProgress.crawl_grants, RuntimeContext.grants_crawler, RuntimeContext.historical_grants_log), - (MigrationProgress.assess_jobs, RuntimeContext.jobs_crawler, RuntimeContext.historical_jobs_log), - (MigrationProgress.assess_clusters, RuntimeContext.clusters_crawler, RuntimeContext.historical_clusters_log), - (MigrationProgress.assess_pipelines, RuntimeContext.pipelines_crawler, RuntimeContext.historical_pipelines_log), + (MigrationProgress.crawl_udfs, RuntimeContext.udfs_crawler, RuntimeContext.udfs_progress), + (MigrationProgress.crawl_grants, RuntimeContext.grants_crawler, RuntimeContext.grants_progress), + (MigrationProgress.assess_jobs, RuntimeContext.jobs_crawler, RuntimeContext.jobs_progress), + (MigrationProgress.assess_clusters, RuntimeContext.clusters_crawler, RuntimeContext.clusters_progress), + (MigrationProgress.assess_pipelines, RuntimeContext.pipelines_crawler, RuntimeContext.pipelines_progress), ( MigrationProgress.crawl_cluster_policies, RuntimeContext.policies_crawler, - RuntimeContext.historical_cluster_policies_log, + RuntimeContext.policies_progress, ), ( MigrationProgress.refresh_table_migration_status, @@ -36,7 +36,7 @@ def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history_log) -> None: crawler_class = get_type_hints(crawler.func)["return"] mock_crawler = create_autospec(crawler_class) - mock_history_log = create_autospec(HistoryLog) + mock_history_log = create_autospec(ProgressEncoder) crawler_name = crawler.attrname history_log_name = history_log.attrname context_replacements = { @@ -52,10 +52,10 @@ def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history def test_migration_progress_runtime_tables_refresh(run_workflow) -> None: """Ensure that the split crawl and update-history-log tasks perform their part of the refresh process.""" mock_tables_crawler = create_autospec(TablesCrawler) - mock_history_log = create_autospec(HistoryLog) + mock_history_log = 
create_autospec(ProgressEncoder) context_replacements = { "tables_crawler": mock_tables_crawler, - "historical_tables_log": mock_history_log, + "tables_progress": mock_history_log, "named_parameters": {"parent_run_id": 53}, } From 123c89aea1ebf556cf3362ec57064f70997dd9c9 Mon Sep 17 00:00:00 2001 From: Serge Smertin <259697+nfx@users.noreply.github.com> Date: Thu, 24 Oct 2024 20:37:17 +0200 Subject: [PATCH 7/8] Combine static code analysis results with historical job snapshots (#3074) Fix #3059 --- .../labs/ucx/contexts/workflow_task.py | 5 +- src/databricks/labs/ucx/progress/jobs.py | 54 +++++++++++++++ tests/unit/progress/test_jobs.py | 66 +++++++++++++++++++ 3 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 tests/unit/progress/test_jobs.py diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index 1fc1a7d5aa..7243567fdf 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -27,6 +27,7 @@ from databricks.labs.ucx.hive_metastore.udfs import Udf from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder from databricks.labs.ucx.progress.history import ProgressEncoder +from databricks.labs.ucx.progress.jobs import JobsProgressEncoder from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder # As with GlobalContext, service factories unavoidably have a lot of public methods. @@ -199,10 +200,10 @@ def grants_progress(self) -> ProgressEncoder[Grant]: @cached_property def jobs_progress(self) -> ProgressEncoder[JobInfo]: - return ProgressEncoder( + return JobsProgressEncoder( self.sql_backend, self.job_ownership, - JobInfo, + self.inventory_database, self.parent_run_id, self.workspace_id, self.config.ucx_catalog, diff --git a/src/databricks/labs/ucx/progress/jobs.py b/src/databricks/labs/ucx/progress/jobs.py index e69de29bb2..198139543c 100644 --- a/src/databricks/labs/ucx/progress/jobs.py +++ b/src/databricks/labs/ucx/progress/jobs.py @@ -0,0 +1,54 @@ +import collections +from dataclasses import replace +from functools import cached_property + +from databricks.labs.lsql.backends import SqlBackend + +from databricks.labs.ucx.assessment.jobs import JobInfo, JobOwnership +from databricks.labs.ucx.progress.history import ProgressEncoder +from databricks.labs.ucx.progress.install import Historical +from databricks.labs.ucx.source_code.jobs import JobProblem + + +class JobsProgressEncoder(ProgressEncoder[JobInfo]): + + def __init__( + self, + sql_backend: SqlBackend, + ownership: JobOwnership, + inventory_database: str, + run_id: int, + workspace_id: int, + catalog: str, + schema: str = "multiworkspace", + table: str = "historical", + ) -> None: + super().__init__( + sql_backend, + ownership, + JobInfo, + run_id, + workspace_id, + catalog, + schema, + table, + ) + self._inventory_database = inventory_database + + @cached_property + def _job_problems(self) -> dict[int, list[str]]: + index = collections.defaultdict(list) + for row in self._sql_backend.fetch( + 'SELECT * FROM workflow_problems', + catalog='hive_metastore', + schema=self._inventory_database, + ): + job_problem = JobProblem(**row.asDict()) + failure = f'{job_problem.code}: {job_problem.task_key} task: {job_problem.path}: {job_problem.message}' + index[job_problem.job_id].append(failure) + return index + + def _encode_record_as_historical(self, record: JobInfo) -> Historical: + historical = super()._encode_record_as_historical(record) + failures = 
self._job_problems.get(int(record.job_id), []) + return replace(historical, failures=historical.failures + failures) diff --git a/tests/unit/progress/test_jobs.py b/tests/unit/progress/test_jobs.py new file mode 100644 index 0000000000..23c2aa1435 --- /dev/null +++ b/tests/unit/progress/test_jobs.py @@ -0,0 +1,66 @@ +from unittest.mock import create_autospec + +from databricks.labs.lsql import Row +from databricks.labs.lsql.backends import MockBackend + +from databricks.labs.ucx.assessment.jobs import JobOwnership, JobInfo +from databricks.labs.ucx.progress.jobs import JobsProgressEncoder +from databricks.labs.ucx import __version__ + + +def test_jobs_progress_encoder() -> None: + common = { + 'message': 'some failure', + 'job_name': 'job_name', + 'start_line': 1, + 'start_col': 2, + 'end_line': 3, + 'end_col': 4, + } + sql_backend = MockBackend( + rows={ + "workflow_problems": [ + Row(job_id=1, code="cannot-autofix-table-reference", task_key="a", path="/some/path", **common), + Row(job_id=1, code="catalog-api-in-shared-clusters", task_key="b", path="/some/other", **common), + Row(job_id=2, code="catalog-api-in-shared-clusters", task_key="c", path="/x", **common), + ], + } + ) + job_ownership = create_autospec(JobOwnership) + job_ownership.owner_of.return_value = "some_owner" + jobs_progress_encoder = JobsProgressEncoder( + sql_backend, + job_ownership, + "inventory", + 2, + 3, + "ucx", + ) + + jobs_progress_encoder.append_inventory_snapshot( + [ + JobInfo( + job_id='1', + success=0, + failures='["some failure from config"]', + ) + ] + ) + + rows = sql_backend.rows_written_for('`ucx`.`multiworkspace`.`historical`', 'append') + assert rows == [ + Row( + workspace_id=3, + job_run_id=2, + object_type='JobInfo', + object_id=['1'], + data={'job_id': '1', 'success': '0'}, + failures=[ + 'some failure from config', + 'cannot-autofix-table-reference: a task: /some/path: some failure', + 'catalog-api-in-shared-clusters: b task: /some/other: some failure', + ], + owner='some_owner', + ucx_version=__version__, + ) + ] From 21cafaad31671ffbd81459f02fb3f3fc0352eaf4 Mon Sep 17 00:00:00 2001 From: Serge Smertin <259697+nfx@users.noreply.github.com> Date: Thu, 24 Oct 2024 20:46:19 +0200 Subject: [PATCH 8/8] Determine ownership of tables based on grants and source code (#3066) Determine ownership of tables in the inventory based on the following rules: - If a table is owned by a principal in the grants table, then that principal is the owner. - If a table is written to by a query, then the owner of that query is the owner of the table. - If a table is written to by a notebook or file, then the owner of the path is the owner of the table. 
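
For illustration only (these are not the actual UCX classes; the real implementation is `TableOwnership._maybe_direct_owner` in the new `ownership.py` below): a minimal, self-contained sketch of the resolution order, using stand-in record types and hypothetical `query_owner`/`path_owner`/`admin` lookups.

```python
# Simplified sketch of the ownership resolution precedence (illustrative only).
from dataclasses import dataclass


@dataclass
class Grant:
    principal: str
    action_type: str   # e.g. "OWN"
    object_type: str   # "TABLE" or "DATABASE"
    object_key: str    # e.g. "hive_metastore.db.tbl" or "hive_metastore.db"


@dataclass
class UsedTable:
    key: tuple[str, str, str]
    is_write: bool
    source_type: str   # "QUERY", "NOTEBOOK" or "FILE"
    source_id: str     # legacy query id or workspace path


def resolve_owner(table_key, schema_key, grants, used_tables,
                  query_owner, path_owner, admin) -> str:
    # 1) an explicit OWN grant on the table or its schema wins
    for grant in grants:
        if grant.action_type != "OWN":
            continue
        if grant.object_type == "TABLE" and grant.object_key == table_key:
            return grant.principal
        if grant.object_type == "DATABASE" and grant.object_key == schema_key:
            return grant.principal
    # 2) otherwise, whoever writes to the table owns it
    for used in used_tables:
        if not used.is_write:
            continue
        if used.source_type == "QUERY":
            return query_owner(used.source_id)
        if used.source_type in ("NOTEBOOK", "FILE"):
            return path_owner(used.source_id)
    # 3) fall back to the workspace administrator
    return admin
```

Grants are checked before source-code inference so that an explicit `OWN` grant is never overridden by a heuristic derived from queries or notebooks.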
--- .../labs/ucx/contexts/application.py | 20 +++- src/databricks/labs/ucx/framework/owners.py | 18 +++ .../labs/ucx/hive_metastore/ownership.py | 111 ++++++++++++++++++ .../hive_metastore/table_migration_status.py | 28 ----- .../labs/ucx/hive_metastore/tables.py | 11 -- src/databricks/labs/ucx/source_code/base.py | 10 ++ .../labs/ucx/source_code/directfs_access.py | 18 +-- .../hive_metastore/test_table_migrate.py | 5 +- .../integration/hive_metastore/test_tables.py | 8 +- .../unit/hive_metastore/test_table_migrate.py | 3 +- tests/unit/hive_metastore/test_tables.py | 27 ++++- .../unit/source_code/test_directfs_access.py | 6 +- 12 files changed, 200 insertions(+), 65 deletions(-) create mode 100644 src/databricks/labs/ucx/hive_metastore/ownership.py diff --git a/src/databricks/labs/ucx/contexts/application.py b/src/databricks/labs/ucx/contexts/application.py index 300051c7e8..0d04270ac5 100644 --- a/src/databricks/labs/ucx/contexts/application.py +++ b/src/databricks/labs/ucx/contexts/application.py @@ -28,7 +28,7 @@ from databricks.labs.ucx.assessment.export import AssessmentExporter from databricks.labs.ucx.aws.credentials import CredentialManager from databricks.labs.ucx.config import WorkspaceConfig -from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership +from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership from databricks.labs.ucx.hive_metastore import ExternalLocations, MountsCrawler, TablesCrawler from databricks.labs.ucx.hive_metastore.catalog_schema import CatalogSchema from databricks.labs.ucx.hive_metastore.grants import ( @@ -43,13 +43,13 @@ PrincipalACL, ) from databricks.labs.ucx.hive_metastore.mapping import TableMapping -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationOwnership +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership from databricks.labs.ucx.hive_metastore.table_migrate import ( TableMigrationStatusRefresher, TablesMigrator, ) from databricks.labs.ucx.hive_metastore.table_move import TableMove -from databricks.labs.ucx.hive_metastore.tables import TableOwnership from databricks.labs.ucx.hive_metastore.udfs import UdfsCrawler, UdfOwnership from databricks.labs.ucx.hive_metastore.verification import VerifyHasCatalog, VerifyHasMetastore from databricks.labs.ucx.installer.workflows import DeployedWorkflows @@ -263,17 +263,29 @@ def tables_crawler(self) -> TablesCrawler: @cached_property def table_ownership(self) -> TableOwnership: - return TableOwnership(self.administrator_locator) + return TableOwnership( + self.administrator_locator, + self.grants_crawler, + self.used_tables_crawler_for_paths, + self.used_tables_crawler_for_queries, + self.legacy_query_ownership, + self.workspace_path_ownership, + ) @cached_property def workspace_path_ownership(self) -> WorkspacePathOwnership: return WorkspacePathOwnership(self.administrator_locator, self.workspace_client) + @cached_property + def legacy_query_ownership(self) -> LegacyQueryOwnership: + return LegacyQueryOwnership(self.administrator_locator, self.workspace_client) + @cached_property def directfs_access_ownership(self) -> DirectFsAccessOwnership: return DirectFsAccessOwnership( self.administrator_locator, self.workspace_path_ownership, + self.legacy_query_ownership, self.workspace_client, ) diff --git 
a/src/databricks/labs/ucx/framework/owners.py b/src/databricks/labs/ucx/framework/owners.py index 0350ecfd60..55a1ddac98 100644 --- a/src/databricks/labs/ucx/framework/owners.py +++ b/src/databricks/labs/ucx/framework/owners.py @@ -201,6 +201,9 @@ def __init__(self, administrator_locator: AdministratorLocator, ws: WorkspaceCli super().__init__(administrator_locator) self._ws = ws + def owner_of_path(self, path: str) -> str: + return self.owner_of(WorkspacePath(self._ws, path)) + @retried(on=[InternalError], timeout=timedelta(minutes=1)) def _maybe_direct_owner(self, record: WorkspacePath) -> str | None: maybe_type_and_id = self._maybe_type_and_id(record) @@ -237,3 +240,18 @@ def _infer_from_first_can_manage(object_permissions): return acl.group_name return acl.service_principal_name return None + + +class LegacyQueryOwnership(Ownership[str]): + def __init__(self, administrator_locator: AdministratorLocator, workspace_client: WorkspaceClient) -> None: + super().__init__(administrator_locator) + self._workspace_client = workspace_client + + def _maybe_direct_owner(self, record: str) -> str | None: + try: + legacy_query = self._workspace_client.queries.get(record) + return legacy_query.owner_user_name + except NotFound: + return None + except InternalError: # redash is very naughty and throws 500s instead of proper 404s + return None diff --git a/src/databricks/labs/ucx/hive_metastore/ownership.py b/src/databricks/labs/ucx/hive_metastore/ownership.py new file mode 100644 index 0000000000..b11f5f6e81 --- /dev/null +++ b/src/databricks/labs/ucx/hive_metastore/ownership.py @@ -0,0 +1,111 @@ +import logging +from functools import cached_property + +from databricks.labs.ucx.framework.owners import ( + Ownership, + AdministratorLocator, + LegacyQueryOwnership, + WorkspacePathOwnership, +) +from databricks.labs.ucx.hive_metastore import TablesCrawler +from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus +from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.source_code.base import UsedTable +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler + +logger = logging.getLogger(__name__) + + +class TableOwnership(Ownership[Table]): + """Determine ownership of tables in the inventory based on the following rules: + - If a table is owned by a principal in the grants table, then that principal is the owner. + - If a table is written to by a query, then the owner of that query is the owner of the table. + - If a table is written to by a notebook or file, then the owner of the path is the owner of the table. 
+ """ + + def __init__( + self, + administrator_locator: AdministratorLocator, + grants_crawler: GrantsCrawler, + used_tables_in_paths: UsedTablesCrawler, + used_tables_in_queries: UsedTablesCrawler, + legacy_query_ownership: LegacyQueryOwnership, + workspace_path_ownership: WorkspacePathOwnership, + ) -> None: + super().__init__(administrator_locator) + self._grants_crawler = grants_crawler + self._used_tables_in_paths = used_tables_in_paths + self._used_tables_in_queries = used_tables_in_queries + self._legacy_query_ownership = legacy_query_ownership + self._workspace_path_ownership = workspace_path_ownership + + def _maybe_direct_owner(self, record: Table) -> str | None: + owner = self._maybe_from_grants(record) + if owner: + return owner + return self._maybe_from_sources(record) + + def _maybe_from_sources(self, record: Table) -> str | None: + used_table = self._used_tables_snapshot.get((record.catalog, record.database, record.name)) + if not used_table: + return None + # If something writes to a table, then it's an owner of it + if not used_table.is_write: + return None + if used_table.source_type == 'QUERY' and used_table.query_id: + return self._legacy_query_ownership.owner_of(used_table.query_id) + if used_table.source_type in {'NOTEBOOK', 'FILE'}: + return self._workspace_path_ownership.owner_of_path(used_table.source_id) + logger.warning(f"Unknown source type {used_table.source_type} for {used_table.source_id}") + return None + + @cached_property + def _used_tables_snapshot(self) -> dict[tuple[str, str, str], UsedTable]: + index = {} + for collection in (self._used_tables_in_paths.snapshot(), self._used_tables_in_queries.snapshot()): + for used_table in collection: + key = used_table.catalog_name, used_table.schema_name, used_table.table_name + index[key] = used_table + return index + + def _maybe_from_grants(self, record: Table) -> str | None: + for grant in self._grants_snapshot: + if not grant.action_type == 'OWN': + continue + object_type, full_name = grant.this_type_and_key() + if object_type == 'TABLE' and full_name == record.key: + return grant.principal + if object_type in {'DATABASE', 'SCHEMA'} and full_name == f"{record.catalog}.{record.database}": + return grant.principal + return None + + @cached_property + def _grants_snapshot(self): + return self._grants_crawler.snapshot() + + +class TableMigrationOwnership(Ownership[TableMigrationStatus]): + """Determine ownership of table migration records in the inventory. + + This is the owner of the source table, if (and only if) the source table is present in the inventory. 
+ """ + + def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None: + super().__init__(table_ownership._administrator_locator) # TODO: Fix this + self._tables_crawler = tables_crawler + self._table_ownership = table_ownership + self._indexed_tables: dict[tuple[str, str], Table] | None = None + + def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]: + index = self._indexed_tables + if index is None or reindex: + snapshot = self._tables_crawler.snapshot() + index = {(table.database, table.name): table for table in snapshot} + self._indexed_tables = index + return index + + def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None: + index = self._tables_snapshot_index() + source_table = index.get((record.src_schema, record.src_table), None) + return self._table_ownership.owner_of(source_table) if source_table is not None else None diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index 9f697f9f32..dde5f17790 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -9,10 +9,8 @@ from databricks.sdk.errors import NotFound from databricks.labs.ucx.framework.crawlers import CrawlerBase -from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore import TablesCrawler -from databricks.labs.ucx.hive_metastore.tables import Table, TableOwnership logger = logging.getLogger(__name__) @@ -162,29 +160,3 @@ def _iter_schemas(self): except NotFound: logger.warning(f"Catalog {catalog.name} no longer exists. Skipping checking its migration status.") continue - - -class TableMigrationOwnership(Ownership[TableMigrationStatus]): - """Determine ownership of table migration records in the inventory. - - This is the owner of the source table, if (and only if) the source table is present in the inventory. 
- """ - - def __init__(self, tables_crawler: TablesCrawler, table_ownership: TableOwnership) -> None: - super().__init__(table_ownership._administrator_locator) - self._tables_crawler = tables_crawler - self._table_ownership = table_ownership - self._indexed_tables: dict[tuple[str, str], Table] | None = None - - def _tables_snapshot_index(self, reindex: bool = False) -> dict[tuple[str, str], Table]: - index = self._indexed_tables - if index is None or reindex: - snapshot = self._tables_crawler.snapshot() - index = {(table.database, table.name): table for table in snapshot} - self._indexed_tables = index - return index - - def _maybe_direct_owner(self, record: TableMigrationStatus) -> str | None: - index = self._tables_snapshot_index() - source_table = index.get((record.src_schema, record.src_table), None) - return self._table_ownership.owner_of(source_table) if source_table is not None else None diff --git a/src/databricks/labs/ucx/hive_metastore/tables.py b/src/databricks/labs/ucx/hive_metastore/tables.py index 7a1f0e90a9..08f1864586 100644 --- a/src/databricks/labs/ucx/hive_metastore/tables.py +++ b/src/databricks/labs/ucx/hive_metastore/tables.py @@ -16,7 +16,6 @@ from databricks.sdk.errors import NotFound from databricks.labs.ucx.framework.crawlers import CrawlerBase -from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier logger = logging.getLogger(__name__) @@ -660,13 +659,3 @@ def _create_describe_tasks(self, catalog: str, database: str, table_names: list[ for table in table_names: tasks.append(partial(self._describe, catalog, database, table)) return tasks - - -class TableOwnership(Ownership[Table]): - """Determine ownership of tables in the inventory. - - At the present we don't determine a specific owner for tables. 
- """ - - def _maybe_direct_owner(self, record: Table) -> None: - return None diff --git a/src/databricks/labs/ucx/source_code/base.py b/src/databricks/labs/ucx/source_code/base.py index 6d46e9d600..659b38b2b7 100644 --- a/src/databricks/labs/ucx/source_code/base.py +++ b/src/databricks/labs/ucx/source_code/base.py @@ -231,6 +231,16 @@ def source_type(self) -> str | None: last = self.source_lineage[-1] return last.object_type + @property + def query_id(self) -> str | None: + if self.source_type != 'QUERY': + return None + last = self.source_lineage[-1] + parts = last.object_id.split('/') + if len(parts) < 2: + return None + return parts[1] + @dataclass class UsedTable(SourceInfo): diff --git a/src/databricks/labs/ucx/source_code/directfs_access.py b/src/databricks/labs/ucx/source_code/directfs_access.py index 33972f9e22..b0b449dd1a 100644 --- a/src/databricks/labs/ucx/source_code/directfs_access.py +++ b/src/databricks/labs/ucx/source_code/directfs_access.py @@ -10,7 +10,12 @@ from databricks.labs.lsql.backends import SqlBackend from databricks.sdk.errors import DatabricksError, NotFound -from databricks.labs.ucx.framework.owners import Ownership, AdministratorLocator, WorkspacePathOwnership +from databricks.labs.ucx.framework.owners import ( + Ownership, + AdministratorLocator, + WorkspacePathOwnership, + LegacyQueryOwnership, +) from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.source_code.base import DirectFsAccess @@ -73,15 +78,17 @@ def __init__( self, administrator_locator: AdministratorLocator, workspace_path_ownership: WorkspacePathOwnership, + legacy_query_ownership: LegacyQueryOwnership, workspace_client: WorkspaceClient, ) -> None: super().__init__(administrator_locator) self._workspace_path_ownership = workspace_path_ownership + self._legacy_query_ownership = legacy_query_ownership self._workspace_client = workspace_client def _maybe_direct_owner(self, record: DirectFsAccess) -> str | None: - if record.source_type == 'QUERY': - return self._query_owner(record) + if record.source_type == 'QUERY' and record.query_id: + return self._legacy_query_ownership.owner_of(record.query_id) if record.source_type in {'NOTEBOOK', 'FILE'}: return self._notebook_owner(record) logger.warning(f"Unknown source type {record.source_type} for {record.source_id}") @@ -94,8 +101,3 @@ def _notebook_owner(self, record): return owner except NotFound: return None - - def _query_owner(self, record): - query_id = record.source_lineage[-1].object_id.split('/')[1] - legacy_query = self._workspace_client.queries.get(query_id) - return legacy_query.owner_user_name diff --git a/tests/integration/hive_metastore/test_table_migrate.py b/tests/integration/hive_metastore/test_table_migrate.py index e9ba362a86..61f87ac8b2 100644 --- a/tests/integration/hive_metastore/test_table_migrate.py +++ b/tests/integration/hive_metastore/test_table_migrate.py @@ -2,11 +2,10 @@ from databricks.labs.ucx.hive_metastore import TablesCrawler from databricks.labs.ucx.hive_metastore.table_migration_status import ( - TableMigrationOwnership, TableMigrationStatus, TableMigrationStatusRefresher, ) -from databricks.labs.ucx.hive_metastore.tables import TableOwnership +from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership def test_table_migration_ownership(ws, runtime_ctx, inventory_schema, sql_backend) -> None: @@ -32,7 +31,7 @@ def is_migration_record_for_table(record: TableMigrationStatus) -> bool: synthetic_record = dataclasses.replace(table_migration_record, 
src_table="does_not_exist") # Verify for the table that the table owner and the migration status are a match. - table_ownership = TableOwnership(runtime_ctx.administrator_locator) + table_ownership = runtime_ctx.table_ownership table_migration_ownership = TableMigrationOwnership(tables_crawler, table_ownership) assert table_migration_ownership.owner_of(table_migration_record) == table_ownership.owner_of(table_record) diff --git a/tests/integration/hive_metastore/test_tables.py b/tests/integration/hive_metastore/test_tables.py index efd554591a..6041e53904 100644 --- a/tests/integration/hive_metastore/test_tables.py +++ b/tests/integration/hive_metastore/test_tables.py @@ -5,7 +5,7 @@ from databricks.sdk.retries import retried from databricks.labs.ucx.hive_metastore import TablesCrawler -from databricks.labs.ucx.hive_metastore.tables import What, TableOwnership +from databricks.labs.ucx.hive_metastore.tables import What logger = logging.getLogger(__name__) @@ -90,7 +90,6 @@ def test_partitioned_tables(ws, sql_backend, make_schema, make_table): def test_table_ownership(runtime_ctx, inventory_schema, sql_backend) -> None: """Verify the ownership can be determined for crawled tables.""" - # This currently isn't very useful: we don't currently locate specific owners for tables. # A table for which we'll determine the owner. table = runtime_ctx.make_table() @@ -103,5 +102,6 @@ def test_table_ownership(runtime_ctx, inventory_schema, sql_backend) -> None: table_record = next(record for record in records if record.full_name == table.full_name) # Verify ownership can be made. - ownership = TableOwnership(runtime_ctx.administrator_locator) - assert ownership.owner_of(table_record) == runtime_ctx.administrator_locator.get_workspace_administrator() + my_user = runtime_ctx.workspace_client.current_user.me() + owner = runtime_ctx.table_ownership.owner_of(table_record) + assert owner == my_user.user_name diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index 84f726433d..946a57b4f7 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -28,13 +28,12 @@ from databricks.labs.ucx.hive_metastore.table_migration_status import ( TableMigrationStatusRefresher, TableMigrationIndex, - TableMigrationOwnership, TableMigrationStatus, TableView, ) +from databricks.labs.ucx.hive_metastore.ownership import TableMigrationOwnership, TableOwnership from databricks.labs.ucx.hive_metastore.tables import ( Table, - TableOwnership, TablesCrawler, What, ) diff --git a/tests/unit/hive_metastore/test_tables.py b/tests/unit/hive_metastore/test_tables.py index 716844806f..057389d49b 100644 --- a/tests/unit/hive_metastore/test_tables.py +++ b/tests/unit/hive_metastore/test_tables.py @@ -6,19 +6,22 @@ from databricks.labs.lsql.backends import MockBackend from databricks.labs.lsql.core import Row from databricks.labs.ucx.progress.history import ProgressEncoder + +from databricks.labs.ucx.hive_metastore.grants import GrantsCrawler from databricks.sdk import WorkspaceClient from databricks.labs.ucx.__about__ import __version__ as ucx_version -from databricks.labs.ucx.framework.owners import AdministratorLocator +from databricks.labs.ucx.framework.owners import AdministratorLocator, LegacyQueryOwnership, WorkspacePathOwnership from databricks.labs.ucx.hive_metastore.locations import Mount, ExternalLocations, MountsCrawler from databricks.labs.ucx.hive_metastore.tables import ( FasterTableScanCrawler, HiveSerdeType, 
Table, - TableOwnership, TablesCrawler, What, ) +from databricks.labs.ucx.hive_metastore.ownership import TableOwnership +from databricks.labs.ucx.source_code.used_table import UsedTablesCrawler def test_is_delta_true(): @@ -676,12 +679,30 @@ def test_table_owner() -> None: admin_locator = create_autospec(AdministratorLocator) admin_locator.get_workspace_administrator.return_value = "an_admin" - ownership = TableOwnership(admin_locator) + grants_crawler = create_autospec(GrantsCrawler) + grants_crawler.snapshot.return_value = [] + used_tables_in_paths = create_autospec(UsedTablesCrawler) + used_tables_in_paths.snapshot.return_value = [] + used_tables_in_queries = create_autospec(UsedTablesCrawler) + used_tables_in_queries.snapshot.return_value = [] + legacy_query_ownership = create_autospec(LegacyQueryOwnership) + workspace_path_ownership = create_autospec(WorkspacePathOwnership) + + ownership = TableOwnership( + admin_locator, + grants_crawler, + used_tables_in_paths, + used_tables_in_queries, + legacy_query_ownership, + workspace_path_ownership, + ) table = Table(catalog="main", database="foo", name="bar", object_type="TABLE", table_format="DELTA") owner = ownership.owner_of(table) assert owner == "an_admin" admin_locator.get_workspace_administrator.assert_called_once() + legacy_query_ownership.owner_of.assert_not_called() + workspace_path_ownership.owner_of.assert_not_called() @pytest.mark.parametrize( diff --git a/tests/unit/source_code/test_directfs_access.py b/tests/unit/source_code/test_directfs_access.py index 5a9aadee80..2b381df95f 100644 --- a/tests/unit/source_code/test_directfs_access.py +++ b/tests/unit/source_code/test_directfs_access.py @@ -4,7 +4,7 @@ from databricks.labs.lsql.backends import MockBackend from databricks.sdk import WorkspaceClient -from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership +from databricks.labs.ucx.framework.owners import AdministratorLocator, WorkspacePathOwnership, LegacyQueryOwnership from databricks.labs.ucx.source_code.base import LineageAtom from databricks.labs.ucx.source_code.directfs_access import ( DirectFsAccessCrawler, @@ -43,11 +43,13 @@ def test_directfs_access_ownership() -> None: admin_locator = create_autospec(AdministratorLocator) workspace_path_ownership = create_autospec(WorkspacePathOwnership) workspace_path_ownership.owner_of.return_value = "other_admin" + legacy_query_ownership = create_autospec(LegacyQueryOwnership) - ownership = DirectFsAccessOwnership(admin_locator, workspace_path_ownership, ws) + ownership = DirectFsAccessOwnership(admin_locator, workspace_path_ownership, legacy_query_ownership, ws) dfsa = DirectFsAccess(source_lineage=[LineageAtom(object_type="NOTEBOOK", object_id="/x/y/z")]) owner = ownership.owner_of(dfsa) assert owner == "other_admin" ws.queries.get.assert_not_called() + legacy_query_ownership.owner_of.assert_not_called() admin_locator.get_workspace_administrator.assert_not_called()