Skip to content

Commit

Permalink
Add optional flag for enabling synchronous filesystem patching; add s…
Browse files Browse the repository at this point in the history
…upport for http case
  • Loading branch information
moradology committed May 22, 2024
1 parent 1809077 commit eca4faa
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
5 changes: 3 additions & 2 deletions pangeo_forge_recipes/openers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def open_url(
cache: Optional[CacheFSSpecTarget] = None,
secrets: Optional[Dict] = None,
open_kwargs: Optional[Dict] = None,
fsspec_sync_patch: bool = False
) -> OpenFileType:
"""Open a string-based URL with fsspec.
Expand All @@ -29,10 +30,10 @@ def open_url(
kw = open_kwargs or {}
if cache is not None:
# this has side effects
cache.cache_file(url, secrets, **kw)
cache.cache_file(url, secrets, fsspec_sync_patch **kw)
open_file = cache.open_file(url, mode="rb")
else:
open_file = _get_opener(url, secrets, **kw)
open_file = _get_opener(url, secrets, fsspec_sync_patch, **kw)
return open_file


Expand Down
24 changes: 14 additions & 10 deletions pangeo_forge_recipes/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@
OpenFileType = Union[fsspec.core.OpenFile, fsspec.spec.AbstractBufferedFile, io.IOBase]


def _get_url_size(fname, secrets, **open_kwargs):
with _get_opener(fname, secrets, **open_kwargs) as of:
size = of.size
return size


def _copy_btw_filesystems(input_opener, output_opener, BLOCK_SIZE=10_000_000):
with input_opener as source:
with output_opener as target:
Expand Down Expand Up @@ -192,18 +186,19 @@ def _full_path(self, path: str) -> str:
class CacheFSSpecTarget(FlatFSSpecTarget):
"""Alias for FlatFSSpecTarget"""

def cache_file(self, fname: str, secrets: Optional[dict], **open_kwargs) -> None:
def cache_file(self, fname: str, secrets: Optional[dict], fsspec_sync_patch=False, **open_kwargs) -> None:
# check and see if the file already exists in the cache
logger.info(f"Caching file '{fname}'")
input_opener = _get_opener(fname, secrets, fsspec_sync_patch, **open_kwargs)

if self.exists(fname):
cached_size = self.size(fname)
remote_size = _get_url_size(fname, secrets, **open_kwargs)
remote_size = input_opener.size
if cached_size == remote_size:
# TODO: add checksumming here
logger.info(f"File '{fname}' is already cached")
return

input_opener = _get_opener(fname, secrets, **open_kwargs)
target_opener = self.open(fname, mode="wb")
logger.info(f"Copying remote file '{fname}' to cache")
_copy_btw_filesystems(input_opener, target_opener)
Expand All @@ -228,7 +223,16 @@ def _add_query_string_secrets(fname: str, secrets: dict) -> str:
return urlunparse(parsed)


def _get_opener(fname, secrets, **open_kwargs):
def _get_opener(fname, secrets, fsspec_sync_patch, **open_kwargs):
if fsspec_sync_patch:
logger.debug("Attempting to enable synchronous filesystem implementations in FSSpec")
try:
from httpfs_sync.core import SyncHTTPFileSystem
SyncHTTPFileSystem.overwrite_async_registration()
logger.debug("Synchronous HTTP implementation enabled.")
except ImportError:
logger.warning("httpfs_sync could not be imported. Falling back to default (async) http implementation.")

fname = fname if not secrets else _add_query_string_secrets(fname, secrets)
return fsspec.open(fname, mode="rb", **open_kwargs)

Expand Down
8 changes: 7 additions & 1 deletion pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,17 @@ class OpenURLWithFSSpec(beam.PTransform):
:param secrets: If provided these secrets will be injected into the URL as a query string.
:param open_kwargs: Extra arguments passed to fsspec.open.
:param max_concurrency: Max concurrency for this transform.
:param fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to
replace asynchronous code with synchronous implementations to potentially address
deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019
"""

cache: Optional[str | CacheFSSpecTarget] = None
secrets: Optional[dict] = None
open_kwargs: Optional[dict] = None
max_concurrency: Optional[int] = None
fsspec_sync_patch: bool = False


def expand(self, pcoll):
if isinstance(self.cache, str):
Expand All @@ -161,12 +166,13 @@ def expand(self, pcoll):
kws = dict(
cache=cache,
secrets=self.secrets,
fsspec_sync_patch=self.sync_patch,
open_kwargs=self.open_kwargs,
)
return pcoll | MapWithConcurrencyLimit(
open_url,
kwargs=kws,
max_concurrency=self.max_concurrency,
max_concurrency=self.max_concurrency
)


Expand Down

0 comments on commit eca4faa

Please sign in to comment.