Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IGNORE THIS PR] Run integration tests for PR from fork #3044

Closed
wants to merge 12 commits into from
89 changes: 67 additions & 22 deletions src/databricks/labs/ucx/hive_metastore/locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

logger = logging.getLogger(__name__)


_EXTERNAL_FILE_LOCATION_SCHEMES = ("s3", "s3a", "s3n", "gcs", "abfss")


Expand Down Expand Up @@ -566,46 +565,71 @@ def _find_delta_log_folders(
if delta_log_folders is None:
delta_log_folders = {}
logger.info(f"Listing {root_dir}")
file_infos = self._dbutils.fs.ls(root_dir)
file_infos = self._dbutils.fs.ls(root_dir) or []
for file_info in file_infos:
if self._is_irrelevant(file_info.name) or file_info.path == root_dir:
logger.debug(f"Path {file_info.path} is irrelevant")
continue

root_path = os.path.dirname(root_dir)
previous_entry = delta_log_folders.get(root_path)
parent_entry = delta_log_folders.get(root_path)
table_in_mount = self._assess_path(file_info)

if previous_entry:
# Happens when the first folder was _delta_log and the next folders are partitioned folders
if previous_entry.format == "DELTA" and self._is_partitioned(file_info.name):
delta_log_folders[root_path] = TableInMount(format=previous_entry.format, is_partitioned=True)
# Happens when previous entries were partitioned folders and the current one is delta_log
if previous_entry.is_partitioned and table_in_mount and table_in_mount.format == "DELTA":
delta_log_folders[root_path] = TableInMount(format=table_in_mount.format, is_partitioned=True)
continue
if parent_entry:
# if the root_path was already found to be partitioned, we can break to reduce redundant ops
if delta_log_folders[root_path] == TableInMount(format="DELTA", is_partitioned=True):
break

if self._is_partitioned(file_info.name):
partition_format = self._find_partition_file_format(file_info.path)
if partition_format:
delta_log_folders[root_path] = partition_format
continue
if (
# Happens when the first folder was _delta_log and the next folders are partitioned folders
parent_entry.format == "DELTA"
and self._is_partitioned(file_info.name)
) or (
# Happens when previous entries were partitioned folders and the current one is delta_log
parent_entry.is_partitioned
and table_in_mount
and table_in_mount.format == "DELTA"
):
delta_log_folders[root_path] = TableInMount(format="DELTA", is_partitioned=True)
logger.debug(f"Added DELTA table for {root_path} (partitioned delta)")

if not table_in_mount:
self._find_delta_log_folders(file_info.path, delta_log_folders)
continue
self._recurse_dir_if_needed(file_info, delta_log_folders)

elif self._is_partitioned(file_info.name):
self._handle_partitioned_file(file_info.path, root_path, delta_log_folders)

elif not table_in_mount:
self._recurse_dir_if_needed(file_info, delta_log_folders)

elif table_in_mount.format == "DELTA" and file_info.name == "_delta_log/":
delta_log_folders[root_path] = table_in_mount
logger.debug(f"Added {table_in_mount.format} table for {root_path} (normal delta)")

else:
delta_log_folders[root_path] = table_in_mount
logger.debug(f"Added {table_in_mount.format} table for {root_path} (general)")
self._recurse_dir_if_needed(file_info, delta_log_folders)

delta_log_folders[root_path] = table_in_mount
return delta_log_folders

def _recurse_dir_if_needed(self, file_info: FileInfo, delta_log_folders: dict[str, TableInMount]) -> None:
if self._is_recursible_dir(file_info):
self._find_delta_log_folders(file_info.path, delta_log_folders)

def _handle_partitioned_file(self, file_path, root_path, delta_log_folders):
partition_format = self._find_partition_file_format(file_path)
if partition_format:
delta_log_folders[root_path] = partition_format
logger.debug(f"Added {partition_format.format} table for {root_path} (partitioned)")

def _find_partition_file_format(self, root_dir: str) -> TableInMount | None:
    """Look below a partition folder for the first entry with a recognisable format.

    Entries of ``root_dir`` are visited in listing order:

    * if ``_assess_path`` recognises an entry, a partitioned ``TableInMount``
      with that format is returned immediately;
    * if the entry is itself a partition folder (its name contains ``=``),
      the search continues inside it and that result is returned as-is,
      even when it is ``None`` — sibling partitions are not inspected.

    Returns ``None`` when the listing is empty or nothing matches.
    """
    logger.info(f"Listing partitioned file {root_dir}")
    # Same guard as the top-level folder scan: treat a falsy listing as empty
    # instead of failing on iteration.
    file_infos = self._dbutils.fs.ls(root_dir) or []
    for file_info in file_infos:
        path_extension = self._assess_path(file_info)
        if path_extension:
            return TableInMount(format=path_extension.format, is_partitioned=True)
        # Skip an entry that points back at the folder being listed, otherwise
        # the recursion would never terminate.
        if self._is_partitioned(file_info.name) and file_info.path != root_dir:
            return self._find_partition_file_format(file_info.path)
    return None

Expand All @@ -624,6 +648,23 @@ def _assess_path(self, file_info: FileInfo) -> TableInMount | None:
return TableInMount(format="PARQUET", is_partitioned=False)
return None

def _is_recursible_dir(self, file_info: FileInfo) -> bool:
# Rules for recursing into a folder
# - should be size 0 (usually a directory)
# - should not be the _delta_log directory
# - file should not be partitioned
# - file name should not start with 'part-'
# - no brackets, brackets are not allowed in dbutils.fs.ls
# - is not a streaming checkpoint dir
return (
file_info.size == 0
and file_info.name != "_delta_log/"
and not self._is_partitioned(file_info.name)
and not file_info.name.startswith("part-")
and not any(char in file_info.name for char in "[]")
and not self._is_streaming_checkpoint_dir(file_info)
)

@staticmethod
def _is_partitioned(file_name: str) -> bool:
return '=' in file_name
Expand All @@ -645,3 +686,7 @@ def _is_json(file_name: str) -> bool:

def _is_irrelevant(self, file_name: str) -> bool:
return any(pattern in file_name for pattern in self._fiter_paths)

def _is_streaming_checkpoint_dir(self, file_info: FileInfo) -> bool:
path_dirs = [file.name for file in self._dbutils.fs.ls(file_info.path) if file.size == 0]
return 'commits/' in path_dirs and 'offsets/' in path_dirs
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/source_code/known.json
Original file line number Diff line number Diff line change
Expand Up @@ -33641,4 +33641,4 @@
"zipp.compat.py310": [],
"zipp.glob": []
}
}
}
Loading
Loading