From 53c0b9f9cf6870511e76ae39f40e51231a9c9af5 Mon Sep 17 00:00:00 2001 From: "dan.zafar" Date: Mon, 21 Oct 2024 12:02:48 -0600 Subject: [PATCH] Refactor to reduce McCabe rating to <=10 --- .../labs/ucx/hive_metastore/locations.py | 93 ++++++++++--------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/locations.py b/src/databricks/labs/ucx/hive_metastore/locations.py index cf0b396eda..e948442463 100644 --- a/src/databricks/labs/ucx/hive_metastore/locations.py +++ b/src/databricks/labs/ucx/hive_metastore/locations.py @@ -20,7 +20,6 @@ logger = logging.getLogger(__name__) - _EXTERNAL_FILE_LOCATION_SCHEMES = ("s3", "s3a", "s3n", "gcs", "abfss") @@ -567,7 +566,6 @@ def _find_delta_log_folders( delta_log_folders = {} logger.info(f"Listing {root_dir}") file_infos = self._dbutils.fs.ls(root_dir) or [] - partitioned_table_counter = 0 for file_info in file_infos: if self._is_irrelevant(file_info.name) or file_info.path == root_dir: logger.debug(f"Path {file_info.path} is irrelevant") @@ -578,34 +576,30 @@ def _find_delta_log_folders( table_in_mount = self._assess_path(file_info) if parent_entry: - # Happens when first folder was _delta_log and next folders are partitioned folder - if parent_entry.format == "DELTA" and self._is_partitioned(file_info.name): - delta_log_folders[root_path] = TableInMount(format=parent_entry.format, is_partitioned=True) - logger.debug(f"Added {parent_entry.format} table for {root_path} (partitioned delta)") - # this can spin for hours if there is a large enough directory - partitioned_table_counter += 1 - if partitioned_table_counter > 10: - logger.debug("Exiting after 10 reps:") - for file in file_infos[:10]: - logger.debug("\t" + file.name) - break - # Happens when previous entries where partitioned folders and the current one is delta_log - if parent_entry.is_partitioned and table_in_mount and table_in_mount.format == "DELTA": - delta_log_folders[root_path] = TableInMount(format=table_in_mount.format, is_partitioned=True) - logger.debug(f"Added {parent_entry.format} table for {root_path} (delta in partitioned)") - - if self._is_recursible_dir(file_info): - self._find_delta_log_folders(file_info.path, delta_log_folders) + # if the root_path was already found to be partitioned, we can break to reduce redundant ops + if delta_log_folders[root_path] == TableInMount(format="DELTA", is_partitioned=True): + break + + if ( + # Happens when first folder was _delta_log and next folders are partitioned folder + parent_entry.format == "DELTA" + and self._is_partitioned(file_info.name) + ) or ( + # Happens when previous entries where partitioned folders and the current one is delta_log + parent_entry.is_partitioned + and table_in_mount + and table_in_mount.format == "DELTA" + ): + delta_log_folders[root_path] = TableInMount(format="DELTA", is_partitioned=True) + logger.debug(f"Added DELTA table for {root_path} (partitioned delta)") + + self._recurse_dir_if_needed(file_info, delta_log_folders) elif self._is_partitioned(file_info.name): - partition_format = self._find_partition_file_format(file_info.path) - if partition_format: - delta_log_folders[root_path] = partition_format - logger.debug(f"Added {partition_format.format} table for {root_path} (partitioned)") + self._handle_partitioned_file(file_info.path, root_path, delta_log_folders) elif not table_in_mount: - if self._is_recursible_dir(file_info): - self._find_delta_log_folders(file_info.path, delta_log_folders) + self._recurse_dir_if_needed(file_info, delta_log_folders) elif table_in_mount.format == "DELTA" and file_info.name == "_delta_log/": delta_log_folders[root_path] = table_in_mount @@ -614,11 +608,20 @@ def _find_delta_log_folders( else: delta_log_folders[root_path] = table_in_mount logger.debug(f"Added {table_in_mount.format} table for {root_path} (general)") - if self._is_recursible_dir(file_info): - self._find_delta_log_folders(file_info.path, delta_log_folders) + self._recurse_dir_if_needed(file_info, delta_log_folders) return delta_log_folders + def _recurse_dir_if_needed(self, file_info: FileInfo, delta_log_folders: dict[str, TableInMount]) -> None: + if self._is_recursible_dir(file_info): + self._find_delta_log_folders(file_info.path, delta_log_folders) + + def _handle_partitioned_file(self, file_path, root_path, delta_log_folders): + partition_format = self._find_partition_file_format(file_path) + if partition_format: + delta_log_folders[root_path] = partition_format + logger.debug(f"Added {partition_format.format} table for {root_path} (partitioned)") + def _find_partition_file_format(self, root_dir: str) -> TableInMount | None: logger.info(f"Listing partitioned file {root_dir}") file_infos = self._dbutils.fs.ls(root_dir) @@ -645,6 +648,23 @@ def _assess_path(self, file_info: FileInfo) -> TableInMount | None: return TableInMount(format="PARQUET", is_partitioned=False) return None + def _is_recursible_dir(self, file_info: FileInfo) -> bool: + # Rules for recursing into a folder + # - should be size 0 (usually a directory) + # - should not be the _delta_log directory + # - file should not be partitioned + # - file name should not start with 'part-' + # - no brackets, brackets are not allowed in dbutils.fs.ls + # - is not a streaming checkpoint dir + return ( + file_info.size == 0 + and file_info.name != "_delta_log/" + and not self._is_partitioned(file_info.name) + and not file_info.name.startswith("part-") + and not any(char in file_info.name for char in "[]") + and not self._is_streaming_checkpoint_dir(file_info) + ) + @staticmethod def _is_partitioned(file_name: str) -> bool: return '=' in file_name @@ -670,20 +690,3 @@ def _is_irrelevant(self, file_name: str) -> bool: def _is_streaming_checkpoint_dir(self, file_info: FileInfo) -> bool: path_dirs = [file.name for file in self._dbutils.fs.ls(file_info.path) if file.size == 0] return 'commits/' in path_dirs and 'offsets/' in path_dirs - - def _is_recursible_dir(self, file_info: FileInfo) -> bool: - # Rules for recursing into a folder - # - should be size 0 (usually a directory) - # - should not be the _delta_log directory - # - file should not be partitioned - # - file name should not start with part- - # - brackets are not allowed in dbutils.fs.ls - # - is not a streaming checkpoint dir - return ( - file_info.size == 0 - and file_info.name != "_delta_log/" - and not self._is_partitioned(file_info.name) - and not file_info.name.startswith("part-") - and not any([char in file_info.name for char in "[]"]) - and not self._is_streaming_checkpoint_dir(file_info) - )