Skip to content

Commit

Permalink
adlfs: add support for timeout/connection_timeout/read_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed Oct 2, 2023
1 parent 01e91b1 commit f106578
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions adlfs/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ def __init__(
version_aware: bool = False,
assume_container_exists: Optional[bool] = None,
max_concurrency: Optional[int] = None,
timeout: Optional[int] = None,
connection_timeout: Optional[int] = None,
read_timeout: Optional[int] = None,
**kwargs,
):
super_kwargs = {
Expand Down Expand Up @@ -270,6 +273,15 @@ def __init__(
self.default_fill_cache = default_fill_cache
self.default_cache_type = default_cache_type
self.version_aware = version_aware

self._timeout_kwargs = {}
if timeout is not None:
self._timeout_kwargs["timeout"] = timeout
if connection_timeout is not None:
self._timeout_kwargs["connection_timeout"] = connection_timeout
if read_timeout is not None:
self._timeout_kwargs["read_timeout"] = read_timeout

if (
self.credential is None
and self.account_key is None
Expand Down Expand Up @@ -1347,6 +1359,7 @@ async def _pipe_file(
overwrite=overwrite,
metadata={"is_directory": "false"},
max_concurrency=max_concurrency or self.max_concurrency,
**self._timeout_kwargs,
**kwargs,
)
self.invalidate_cache(self._parent(path))
Expand Down Expand Up @@ -1379,6 +1392,7 @@ async def _cat_file(
length=length,
version_id=version_id,
max_concurrency=max_concurrency or self.max_concurrency,
**self._timeout_kwargs,
)
except ResourceNotFoundError as e:
raise FileNotFoundError from e
Expand Down Expand Up @@ -1557,6 +1571,7 @@ async def _put_file(
"upload_stream_current", callback
),
max_concurrency=max_concurrency or self.max_concurrency,
**self._timeout_kwargs,
)
self.invalidate_cache()
except ResourceExistsError:
Expand Down Expand Up @@ -1633,6 +1648,7 @@ async def _get_file(
),
version_id=version_id,
max_concurrency=max_concurrency or self.max_concurrency,
**self._timeout_kwargs,
)
with open(lpath, "wb") as my_blob:
await stream.readinto(my_blob)
Expand Down Expand Up @@ -2048,6 +2064,7 @@ async def _async_upload_chunk(self, final: bool = False, **kwargs):
length=length,
blob_type=BlobType.AppendBlob,
metadata=self.metadata,
**self._timeout_kwargs,
)
else:
raise ValueError(
Expand Down

0 comments on commit f106578

Please sign in to comment.