From ece0bb4789083f6df5b9ac0f069339c67f3f1c22 Mon Sep 17 00:00:00 2001 From: Ruslan Kuprieiev Date: Tue, 3 Oct 2023 01:34:58 +0300 Subject: [PATCH] adlfs: add support for timeout/connection_timeout/read_timeout Related #303 #338 Related https://github.com/iterative/dvc-azure/issues/42 --- adlfs/spec.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/adlfs/spec.py b/adlfs/spec.py index 8c3d4cc1..5345e6f7 100644 --- a/adlfs/spec.py +++ b/adlfs/spec.py @@ -173,6 +173,14 @@ class AzureBlobFileSystem(AsyncFileSystem): max_concurrency: The number of concurrent connections to use when uploading or downloading a blob. If None it will be inferred from fsspec.asyn._get_batch_size(). + timeout: int + Sets the server-side timeout when uploading or downloading a blob. + connection_timeout: int + The number of seconds the client will wait to establish a connection to the server + when uploading or downloading a blob. + read_timeout: int + The number of seconds the client will wait, between consecutive read operations, + for a response from the server while uploading or downloading a blob. Pass on to fsspec: @@ -237,6 +245,9 @@ def __init__( version_aware: bool = False, assume_container_exists: Optional[bool] = None, max_concurrency: Optional[int] = None, + timeout: Optional[int] = None, + connection_timeout: Optional[int] = None, + read_timeout: Optional[int] = None, **kwargs, ): super_kwargs = { @@ -270,6 +281,15 @@ def __init__( self.default_fill_cache = default_fill_cache self.default_cache_type = default_cache_type self.version_aware = version_aware + + self._timeout_kwargs = {} + if timeout is not None: + self._timeout_kwargs["timeout"] = timeout + if connection_timeout is not None: + self._timeout_kwargs["connection_timeout"] = connection_timeout + if read_timeout is not None: + self._timeout_kwargs["read_timeout"] = read_timeout + if ( self.credential is None and self.account_key is None @@ -1347,6 +1367,7 @@ async def _pipe_file( overwrite=overwrite, metadata={"is_directory": "false"}, max_concurrency=max_concurrency or self.max_concurrency, + **self._timeout_kwargs, **kwargs, ) self.invalidate_cache(self._parent(path)) @@ -1379,6 +1400,7 @@ async def _cat_file( length=length, version_id=version_id, max_concurrency=max_concurrency or self.max_concurrency, + **self._timeout_kwargs, ) except ResourceNotFoundError as e: raise FileNotFoundError from e @@ -1557,6 +1579,7 @@ async def _put_file( "upload_stream_current", callback ), max_concurrency=max_concurrency or self.max_concurrency, + **self._timeout_kwargs, ) self.invalidate_cache() except ResourceExistsError: @@ -1633,6 +1656,7 @@ async def _get_file( ), version_id=version_id, max_concurrency=max_concurrency or self.max_concurrency, + **self._timeout_kwargs, ) with open(lpath, "wb") as my_blob: await stream.readinto(my_blob) @@ -2048,6 +2072,7 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs): length=length, blob_type=BlobType.AppendBlob, metadata=self.metadata, + **self._timeout_kwargs, ) else: raise ValueError(