ENH: More verbose error message #187

Merged · 3 commits · Sep 30, 2024
2 changes: 2 additions & 0 deletions .gitignore
@@ -4,3 +4,5 @@ dist/
 build/
 .vscode/
 .DS_Store
+/*.egg-info
+/.hypothesis/
62 changes: 54 additions & 8 deletions src/openneuro/_download.py
@@ -1,7 +1,30 @@
+"""Openneuro download module.
+
+The flow is roughly:
+
+download
+    _get_download_metadata
+        _get_download_metadata
+    _check_snapshot_exists
+        _safe_query
+        _check_snapshot_exists ...
+    _get_local_tag
+    _match_include_exclude
+    _iterate_filenames
+        _match_include_exclude
+        _get_download_metadata ...
+    _download_files
+        _download_file
+            _retry_download
+                _download_file ...
+            _retrieve_and_write_to_disk
+"""
+
 import asyncio
 import fnmatch
 import hashlib
 import json
+import shlex
+import string
 import sys
 from collections.abc import Generator, Iterable
@@ -249,6 +272,7 @@ async def _download_file(
     max_retries: int,
     retry_backoff: float,
     semaphore: asyncio.Semaphore,
+    query_str: str,
 ) -> None:
     """Download an individual file."""
     if outfile.exists():
@@ -282,12 +306,11 @@ async def _download_file(
                 max_retries=max_retries,
                 retry_backoff=retry_backoff,
                 semaphore=semaphore,
+                query_str=query_str,
             )
             return
         else:
-            raise RuntimeError(
-                f"Timeout when trying to download " f"{outfile}."
-            )
+            raise RuntimeError(f"Timeout when trying to download {outfile}.")
 
     # Try to get the S3 MD5 hash for the file.
     try:
@@ -380,12 +403,19 @@ async def _download_file(
                 max_retries=max_retries,
                 retry_backoff=retry_backoff,
                 semaphore=semaphore,
+                query_str=query_str,
             )
             return
         else:
             raise RuntimeError(
-                f"Error {response.status_code} when trying "
-                f"to download {outfile} from {url}"
+                f"Error {response.status_code} when trying to download "
+                f"{outfile}. If this is unexpected:\n\n"
+                "1. Navigate to https://openneuro.org/crn/graphql\n"
+                f"2. Enter and run the operation: `{query_str}`\n"
+                '3. In the Response, try to manually download the "urls" '
+                f'for "{outfile.name}", which should contain {url}\n\n'
+                "If the download fails, open a GitHub issue like "
+                "https://github.com/OpenNeuroOrg/openneuro/issues/3145"
             )
 
     await _retrieve_and_write_to_disk(
@@ -410,6 +440,7 @@ async def _download_file(
                 max_retries=max_retries,
                 retry_backoff=retry_backoff,
                 semaphore=semaphore,
+                query_str=query_str,
             )
             return
         else:
@@ -428,6 +459,7 @@ async def _retry_download(
     max_retries: int,
     retry_backoff: float,
     semaphore: asyncio.Semaphore,
+    query_str: str,
 ) -> None:
     tqdm.write(
         _unicode(
@@ -449,6 +481,7 @@ async def _retry_download(
         max_retries=max_retries,
         retry_backoff=retry_backoff,
         semaphore=semaphore,
+        query_str=query_str,
     )
 
 
@@ -542,6 +575,7 @@ async def _download_files(
     max_retries: int,
     retry_backoff: float,
     max_concurrent_downloads: int,
+    query_str: str,
 ) -> None:
     """Download files, one by one."""
     # Semaphore (counter) to limit maximum number of concurrent download
@@ -556,6 +590,10 @@
 
         outfile = target_dir / filename
         outfile.parent.mkdir(parents=True, exist_ok=True)
+        this_query_str = string.Template(query_str).substitute(
+            tree=f'"{file["parent_tree"]}"',
+        )
+        this_query_str = " ".join(shlex.split(this_query_str, posix=False))
         download_task = _download_file(
             url=url,
             api_file_size=api_file_size,
@@ -565,6 +603,7 @@
             max_retries=max_retries,
             retry_backoff=retry_backoff,
             semaphore=semaphore,
+            query_str=this_query_str,
         )
         download_tasks.append(download_task)
 
@@ -627,10 +666,12 @@ def _iterate_filenames(
     max_retries: int,
     root: str = "",
     include: Iterable[str] = tuple(),
+    parent_tree: str | None,
 ) -> Generator[dict[str, Any], None, None]:
     """Iterate over all files in a dataset, yielding filenames."""
     directories = list()
     for entity in files:
+        entity["parent_tree"] = parent_tree
         if root:
             entity["filename"] = f'{root}/{entity["filename"]}'
         if entity["directory"]:
@@ -683,16 +724,15 @@ def _iterate_filenames(
             max_retries=max_retries,
             check_snapshot=False,
         )
-        dir_iterator = _iterate_filenames(
+        yield from _iterate_filenames(
             metadata["files"],
             dataset_id=dataset_id,
             tag=tag,
             max_retries=max_retries,
             root=this_dir,
             include=include,
+            parent_tree=directory["id"],
         )
-        for path in dir_iterator:
-            yield path
 
 
 def _match_include_exclude(
@@ -830,6 +870,7 @@ def download(
             tag=tag,
             max_retries=max_retries,
             include=include,
+            parent_tree=None,
         ),
         desc=_unicode(f"Traversing directories for {dataset}", end="", emoji="📁"),
         unit=" entities",
@@ -884,6 +925,10 @@ def download(
     )
     tqdm.write(_unicode(msg, emoji="📥", end=""))
 
+    query_str = snapshot_query_template.safe_substitute(
+        tag=tag or "null",
+        dataset_id=dataset,
+    )
     coroutine = _download_files(
         target_dir=target_dir,
         files=files,
@@ -892,6 +937,7 @@
         max_retries=max_retries,
         retry_backoff=retry_backoff,
         max_concurrent_downloads=max_concurrent_downloads,
+        query_str=query_str,
     )
 
     # Try to re-use event loop if it already exists. This is required e.g.
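
For reference, here is a minimal, self-contained sketch of the `query_str` plumbing this PR adds. The template body is hypothetical (the real `snapshot_query_template` is defined elsewhere in openneuro-py and its field list may differ); only the `$dataset_id`/`$tag`/`$tree` placeholders, the `safe_substitute()` call, and the `shlex`-based whitespace collapsing are taken from the diff above:

```python
import shlex
import string

# Hypothetical stand-in for snapshot_query_template; only the placeholder
# names come from the diff, the actual GraphQL fields may differ.
snapshot_query_template = string.Template(
    """query {
    snapshot(datasetId: "$dataset_id", tag: $tag) {
        files(tree: $tree) { id filename size directory urls }
    }
}"""
)

# download() fills in the dataset and tag once. safe_substitute() leaves the
# still-unknown $tree placeholder in place instead of raising KeyError.
query_str = snapshot_query_template.safe_substitute(
    tag="null",  # mirrors `tag or "null"` in download()
    dataset_id="ds000001",  # example dataset ID
)

# _download_files() later plugs in each file's parent tree and collapses the
# whitespace so the query fits on one line inside the error message.
# posix=False makes shlex keep the double quotes around quoted tokens.
this_query_str = string.Template(query_str).substitute(tree='"abc123"')
this_query_str = " ".join(shlex.split(this_query_str, posix=False))
print(this_query_str)
# query { snapshot(datasetId: "ds000001", tag: null) { files(tree: "abc123") { id filename size directory urls } } }
```

The one-line form is what ends up in the new `RuntimeError` message, so a user can paste it directly into https://openneuro.org/crn/graphql to reproduce the failing request.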