Skip to content

Commit

Permalink
Exception handling for file permissions and file not found events
Browse files Browse the repository at this point in the history
  • Loading branch information
john-corcoran authored Jul 27, 2021
1 parent 6ddb3a3 commit a46ad0b
Showing 1 changed file with 52 additions and 10 deletions.
62 changes: 52 additions & 10 deletions ia_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,18 @@ def bytes_filesize_to_readable_str(bytes_filesize: int) -> str:

def file_paths_in_folder(folder_path: str) -> typing.List[str]:
"""Return sorted list of paths of files at a directory (and its subdirectories)"""
log = logging.getLogger(__name__)

def walk_error(os_error: OSError) -> None:
"""Log any errors occurring during os.walk"""
log.warning(
"'%s' could not be accessed during folder scanning - any contents will not be"
" processed. Try running script as admin",
os_error.filename,
)

file_paths = []
for root, _, file_names in os.walk(folder_path):
for root, _, file_names in os.walk(folder_path, onerror=walk_error):
for name in file_names:
file_paths.append(os.path.join(root, name))
return sorted(file_paths)
Expand Down Expand Up @@ -172,6 +182,7 @@ def get_metadata_from_files_in_folder(
relative_paths_from_ia_metadata: typing.Optional[typing.List[str]] = None,
) -> typing.Dict[str, str]:
"""Return dict of file paths and metadata of files at a directory (and its subdirectories)"""
log = logging.getLogger(__name__)
results = {} # type: typing.Dict[str, str]
if relative_paths_from_ia_metadata is not None:
file_paths = [
Expand All @@ -182,17 +193,33 @@ def get_metadata_from_files_in_folder(
file_paths = file_paths_in_folder(folder_path)
if hash_flag:
for file_path in tqdm.tqdm(file_paths):
if os.path.isfile(file_path):
md5 = md5_hash_file(file_path)
results[
os.path.normpath(os.path.relpath(file_path, folder_path))
] = md5.lower().strip()
if os.path.isfile(file_path): # We will alert on this elsewhere if the file isn't found
try:
md5 = md5_hash_file(file_path)
results[
os.path.normpath(os.path.relpath(file_path, folder_path))
] = md5.lower().strip()
except (PermissionError, OSError):
log.warning(
"PermissionError/OSError occurred when accessing file '%s' - try running "
"script as admin",
file_path,
)
else:
# Return file sizes if we're not checking hash values
for file_path in file_paths:
if os.path.isfile(file_path):
file_size = os.path.getsize(file_path)
results[os.path.normpath(os.path.relpath(file_path, folder_path))] = str(file_size)
if os.path.isfile(file_path): # We will alert on this elsewhere if the file isn't found
try:
file_size = os.path.getsize(file_path)
results[os.path.normpath(os.path.relpath(file_path, folder_path))] = str(
file_size
)
except (PermissionError, OSError):
log.warning(
"PermissionError/OSError occurred when accessing file '%s' - try running "
"script as admin",
file_path,
)
return results


Expand Down Expand Up @@ -222,7 +249,22 @@ def check_hash(file_path: str, md5_value_from_ia: str) -> typing.Tuple[str, str]
hash check of a file
"""
md5_value_local = md5_hash_file(file_path)
try:
md5_value_local = md5_hash_file(file_path)
except FileNotFoundError:
return (
"warning",
"'{}' file seems to have been deleted before hashing could complete".format(
os.path.basename(file_path)
),
)
except (PermissionError, OSError):
return (
"warning",
"PermissionError/OSError when attempting to hash '{}'".format(
os.path.basename(file_path)
),
)
if md5_value_local.lower().strip() == md5_value_from_ia.lower().strip():
return (
"debug",
Expand Down

0 comments on commit a46ad0b

Please sign in to comment.