Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optional support for synchronous HTTP filesystem #748

Merged
merged 6 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pangeo_forge_recipes/openers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def open_url(
cache: Optional[CacheFSSpecTarget] = None,
secrets: Optional[Dict] = None,
open_kwargs: Optional[Dict] = None,
fsspec_sync_patch: bool = False,
) -> OpenFileType:
"""Open a string-based URL with fsspec.

Expand All @@ -29,10 +30,10 @@ def open_url(
kw = open_kwargs or {}
if cache is not None:
# this has side effects
cache.cache_file(url, secrets, **kw)
cache.cache_file(url, secrets, fsspec_sync_patch, **kw)
open_file = cache.open_file(url, mode="rb")
else:
open_file = _get_opener(url, secrets, **kw)
open_file = _get_opener(url, secrets, fsspec_sync_patch, **kw)
return open_file


Expand Down
30 changes: 20 additions & 10 deletions pangeo_forge_recipes/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
OpenFileType = Union[fsspec.core.OpenFile, fsspec.spec.AbstractBufferedFile, io.IOBase]


def _get_url_size(fname, secrets, **open_kwargs):
with _get_opener(fname, secrets, **open_kwargs) as of:
size = of.size
return size


def _copy_btw_filesystems(input_opener, output_opener, BLOCK_SIZE=10_000_000):
with input_opener as source:
with output_opener as target:
Expand Down Expand Up @@ -192,18 +186,22 @@ def _full_path(self, path: str) -> str:
class CacheFSSpecTarget(FlatFSSpecTarget):
"""Alias for FlatFSSpecTarget"""

def cache_file(self, fname: str, secrets: Optional[dict], **open_kwargs) -> None:
def cache_file(
self, fname: str, secrets: Optional[dict], fsspec_sync_patch=False, **open_kwargs
) -> None:
# check and see if the file already exists in the cache
logger.info(f"Caching file '{fname}'")
input_opener = _get_opener(fname, secrets, fsspec_sync_patch, **open_kwargs)

if self.exists(fname):
cached_size = self.size(fname)
remote_size = _get_url_size(fname, secrets, **open_kwargs)
with input_opener as of:
remote_size = of.size
if cached_size == remote_size:
# TODO: add checksumming here
logger.info(f"File '{fname}' is already cached")
return

input_opener = _get_opener(fname, secrets, **open_kwargs)
target_opener = self.open(fname, mode="wb")
logger.info(f"Copying remote file '{fname}' to cache")
_copy_btw_filesystems(input_opener, target_opener)
Expand All @@ -228,7 +226,19 @@ def _add_query_string_secrets(fname: str, secrets: dict) -> str:
return urlunparse(parsed)


def _get_opener(fname, secrets, **open_kwargs):
def _get_opener(fname, secrets, fsspec_sync_patch, **open_kwargs):
if fsspec_sync_patch:
logger.debug("Attempting to enable synchronous filesystem implementations in FSSpec")
try:
from httpfs_sync.core import SyncHTTPFileSystem

SyncHTTPFileSystem.overwrite_async_registration()
logger.debug("Synchronous HTTP implementation enabled.")
except ImportError:
logger.warning(
"httpfs_sync could not be imported. Falling back to async http implementation."
)

fname = fname if not secrets else _add_query_string_secrets(fname, secrets)
return fsspec.open(fname, mode="rb", **open_kwargs)

Expand Down
5 changes: 5 additions & 0 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,16 @@ class OpenURLWithFSSpec(beam.PTransform):
:param secrets: If provided these secrets will be injected into the URL as a query string.
:param open_kwargs: Extra arguments passed to fsspec.open.
:param max_concurrency: Max concurrency for this transform.
:param fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to
replace asynchronous code with synchronous implementations to potentially address
deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019
"""

cache: Optional[str | CacheFSSpecTarget] = None
secrets: Optional[dict] = None
open_kwargs: Optional[dict] = None
max_concurrency: Optional[int] = None
fsspec_sync_patch: bool = False

def expand(self, pcoll):
if isinstance(self.cache, str):
Expand All @@ -161,6 +165,7 @@ def expand(self, pcoll):
kws = dict(
cache=cache,
secrets=self.secrets,
fsspec_sync_patch=self.fsspec_sync_patch,
open_kwargs=self.open_kwargs,
)
return pcoll | MapWithConcurrencyLimit(
Expand Down
Loading