Skip to content

Commit

Permalink
Refactor to reduce McCabe rating to <=10
Browse files Browse the repository at this point in the history
  • Loading branch information
danzafar committed Oct 21, 2024
1 parent cb91023 commit 53c0b9f
Showing 1 changed file with 48 additions and 45 deletions.
93 changes: 48 additions & 45 deletions src/databricks/labs/ucx/hive_metastore/locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

logger = logging.getLogger(__name__)


_EXTERNAL_FILE_LOCATION_SCHEMES = ("s3", "s3a", "s3n", "gcs", "abfss")


Expand Down Expand Up @@ -567,7 +566,6 @@ def _find_delta_log_folders(
delta_log_folders = {}
logger.info(f"Listing {root_dir}")
file_infos = self._dbutils.fs.ls(root_dir) or []
partitioned_table_counter = 0
for file_info in file_infos:
if self._is_irrelevant(file_info.name) or file_info.path == root_dir:
logger.debug(f"Path {file_info.path} is irrelevant")
Expand All @@ -578,34 +576,30 @@ def _find_delta_log_folders(
table_in_mount = self._assess_path(file_info)

if parent_entry:
# Happens when first folder was _delta_log and next folders are partitioned folder
if parent_entry.format == "DELTA" and self._is_partitioned(file_info.name):
delta_log_folders[root_path] = TableInMount(format=parent_entry.format, is_partitioned=True)
logger.debug(f"Added {parent_entry.format} table for {root_path} (partitioned delta)")
# this can spin for hours if there is a large enough directory
partitioned_table_counter += 1
if partitioned_table_counter > 10:
logger.debug("Exiting after 10 reps:")
for file in file_infos[:10]:
logger.debug("\t" + file.name)
break
# Happens when previous entries were partitioned folders and the current one is delta_log
if parent_entry.is_partitioned and table_in_mount and table_in_mount.format == "DELTA":
delta_log_folders[root_path] = TableInMount(format=table_in_mount.format, is_partitioned=True)
logger.debug(f"Added {parent_entry.format} table for {root_path} (delta in partitioned)")

if self._is_recursible_dir(file_info):
self._find_delta_log_folders(file_info.path, delta_log_folders)
# if the root_path was already found to be partitioned, we can break to reduce redundant ops
if delta_log_folders[root_path] == TableInMount(format="DELTA", is_partitioned=True):
break

if (
# Happens when first folder was _delta_log and next folders are partitioned folder
parent_entry.format == "DELTA"
and self._is_partitioned(file_info.name)
) or (
# Happens when previous entries were partitioned folders and the current one is delta_log
parent_entry.is_partitioned
and table_in_mount
and table_in_mount.format == "DELTA"
):
delta_log_folders[root_path] = TableInMount(format="DELTA", is_partitioned=True)
logger.debug(f"Added DELTA table for {root_path} (partitioned delta)")

self._recurse_dir_if_needed(file_info, delta_log_folders)

elif self._is_partitioned(file_info.name):
partition_format = self._find_partition_file_format(file_info.path)
if partition_format:
delta_log_folders[root_path] = partition_format
logger.debug(f"Added {partition_format.format} table for {root_path} (partitioned)")
self._handle_partitioned_file(file_info.path, root_path, delta_log_folders)

elif not table_in_mount:
if self._is_recursible_dir(file_info):
self._find_delta_log_folders(file_info.path, delta_log_folders)
self._recurse_dir_if_needed(file_info, delta_log_folders)

elif table_in_mount.format == "DELTA" and file_info.name == "_delta_log/":
delta_log_folders[root_path] = table_in_mount
Expand All @@ -614,11 +608,20 @@ def _find_delta_log_folders(
else:
delta_log_folders[root_path] = table_in_mount
logger.debug(f"Added {table_in_mount.format} table for {root_path} (general)")
if self._is_recursible_dir(file_info):
self._find_delta_log_folders(file_info.path, delta_log_folders)
self._recurse_dir_if_needed(file_info, delta_log_folders)

return delta_log_folders

def _recurse_dir_if_needed(self, file_info: FileInfo, delta_log_folders: dict[str, TableInMount]) -> None:
    """Descend into *file_info* only when it qualifies as a traversable directory.

    Results are accumulated in-place into *delta_log_folders*.
    """
    if not self._is_recursible_dir(file_info):
        return
    self._find_delta_log_folders(file_info.path, delta_log_folders)

def _handle_partitioned_file(
    self,
    file_path: str,
    root_path: str,
    delta_log_folders: "dict[str, TableInMount]",
) -> None:
    """Record *root_path* as a partitioned table when *file_path* yields a known format.

    Probes the partition folder at *file_path* via ``_find_partition_file_format``
    (which returns ``TableInMount | None``); on a hit, the result is stored under
    *root_path*. A ``None`` probe leaves *delta_log_folders* untouched.
    """
    partition_format = self._find_partition_file_format(file_path)
    if partition_format:
        delta_log_folders[root_path] = partition_format
        logger.debug(f"Added {partition_format.format} table for {root_path} (partitioned)")

def _find_partition_file_format(self, root_dir: str) -> TableInMount | None:
logger.info(f"Listing partitioned file {root_dir}")
file_infos = self._dbutils.fs.ls(root_dir)
Expand All @@ -645,6 +648,23 @@ def _assess_path(self, file_info: FileInfo) -> TableInMount | None:
return TableInMount(format="PARQUET", is_partitioned=False)
return None

def _is_recursible_dir(self, file_info: FileInfo) -> bool:
    """Decide whether *file_info* is a folder worth recursing into.

    A candidate must look like a directory (size 0), must not be the
    ``_delta_log/`` folder, a partition folder (``key=value`` name), or a
    data file (``part-`` prefix), must not contain brackets (not allowed
    in ``dbutils.fs.ls``), and must not be a streaming checkpoint directory.
    """
    name = file_info.name
    if file_info.size != 0 or name == "_delta_log/":
        return False
    if self._is_partitioned(name) or name.startswith("part-"):
        return False
    if "[" in name or "]" in name:
        return False
    return not self._is_streaming_checkpoint_dir(file_info)

@staticmethod
def _is_partitioned(file_name: str) -> bool:
return '=' in file_name
Expand All @@ -670,20 +690,3 @@ def _is_irrelevant(self, file_name: str) -> bool:
def _is_streaming_checkpoint_dir(self, file_info: FileInfo) -> bool:
    """True when the folder contains both ``commits/`` and ``offsets/`` sub-directories.

    That pair of sub-directories (size 0 entries) is how this check identifies a
    streaming checkpoint location.
    """
    subdirs = {entry.name for entry in self._dbutils.fs.ls(file_info.path) if entry.size == 0}
    return {'commits/', 'offsets/'} <= subdirs

def _is_recursible_dir(self, file_info: FileInfo) -> bool:
    """Return True when *file_info* is a folder we should recurse into.

    Rules for recursing into a folder:
    - should be size 0 (usually a directory)
    - should not be the ``_delta_log/`` directory
    - should not be partitioned (``key=value`` name)
    - name should not start with ``part-``
    - no brackets in the name (brackets are not allowed in ``dbutils.fs.ls``)
    - is not a streaming checkpoint dir
    """
    return (
        file_info.size == 0
        and file_info.name != "_delta_log/"
        and not self._is_partitioned(file_info.name)
        and not file_info.name.startswith("part-")
        # generator, not list: lets any() short-circuit without building a throwaway list
        and not any(char in file_info.name for char in "[]")
        and not self._is_streaming_checkpoint_dir(file_info)
    )

0 comments on commit 53c0b9f

Please sign in to comment.