From 1104aea518a4f3662ad3bb35c2b38b229186d3c4 Mon Sep 17 00:00:00 2001 From: Ronan Lamy Date: Wed, 30 Nov 2022 13:07:53 +0000 Subject: [PATCH 1/5] Extract _do_lsdir() from _lsdir() --- s3fs/core.py | 104 ++++++++++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 46 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index ff96252b..20446140 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -674,53 +674,13 @@ async def _lsdir( if path not in self.dircache or refresh or not delimiter or versions: try: logger.debug("Get directory listing page for %s" % path) - await self.set_session() - s3 = await self.get_s3(bucket) - if self.version_aware: - method = "list_object_versions" - contents_key = "Versions" - else: - method = "list_objects_v2" - contents_key = "Contents" - pag = s3.get_paginator(method) - config = {} - if max_items is not None: - config.update(MaxItems=max_items, PageSize=2 * max_items) - it = pag.paginate( - Bucket=bucket, - Prefix=prefix, - Delimiter=delimiter, - PaginationConfig=config, - **self.req_kw, + files = await self._do_lsdir( + bucket, + max_items=max_items, + delimiter=delimiter, + prefix=prefix, + versions=versions, ) - files = [] - dircache = [] - async for i in it: - dircache.extend(i.get("CommonPrefixes", [])) - for c in i.get(contents_key, []): - if not self.version_aware or c.get("IsLatest") or versions: - c["type"] = "file" - c["size"] = c["Size"] - files.append(c) - if dircache: - files.extend( - [ - { - "Key": l["Prefix"][:-1], - "Size": 0, - "StorageClass": "DIRECTORY", - "type": "directory", - "size": 0, - } - for l in dircache - ] - ) - for f in files: - f["Key"] = "/".join([bucket, f["Key"]]) - f["name"] = f["Key"] - version_id = f.get("VersionId") - if versions and version_id and version_id != "null": - f["name"] += f"?versionId={version_id}" except ClientError as e: raise translate_boto_error(e) @@ -729,6 +689,58 @@ async def _lsdir( return files return self.dircache[path] + async def _do_lsdir( + self, bucket, max_items=None, delimiter="/", prefix="", versions=False + ): + await self.set_session() + s3 = await self.get_s3(bucket) + if self.version_aware: + method = "list_object_versions" + contents_key = "Versions" + else: + method = "list_objects_v2" + contents_key = "Contents" + pag = s3.get_paginator(method) + config = {} + if max_items is not None: + config.update(MaxItems=max_items, PageSize=2 * max_items) + it = pag.paginate( + Bucket=bucket, + Prefix=prefix, + Delimiter=delimiter, + PaginationConfig=config, + **self.req_kw, + ) + files = [] + dircache = [] + async for i in it: + dircache.extend(i.get("CommonPrefixes", [])) + for c in i.get(contents_key, []): + if not self.version_aware or c.get("IsLatest") or versions: + c["type"] = "file" + c["size"] = c["Size"] + files.append(c) + if dircache: + files.extend( + [ + { + "Key": l["Prefix"][:-1], + "Size": 0, + "StorageClass": "DIRECTORY", + "type": "directory", + "size": 0, + } + for l in dircache + ] + ) + for f in files: + f["Key"] = "/".join([bucket, f["Key"]]) + f["name"] = f["Key"] + version_id = f.get("VersionId") + if versions and version_id and version_id != "null": + f["name"] += f"?versionId={version_id}" + return files + async def _glob(self, path, **kwargs): if path.startswith("*"): raise ValueError("Cannot traverse all of S3") From 36f9a4f15b364468fd53dbe7ee9ca53716bb40fa Mon Sep 17 00:00:00 2001 From: Ronan Lamy Date: Wed, 30 Nov 2022 13:43:31 +0000 Subject: [PATCH 2/5] Refactor _do_lsdir() to do most of the work inside the loop --- s3fs/core.py | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index 20446140..7f7cf6e6 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -712,35 +712,35 @@ async def _do_lsdir( **self.req_kw, ) files = [] - dircache = [] + dirs = [] async for i in it: - dircache.extend(i.get("CommonPrefixes", [])) + for l in i.get("CommonPrefixes", []): + c = { + "Key": l["Prefix"][:-1], + "Size": 0, + "StorageClass": "DIRECTORY", + "type": "directory", + } + self._fill_info(c, bucket, versions=False) + dirs.append(c) for c in i.get(contents_key, []): if not self.version_aware or c.get("IsLatest") or versions: c["type"] = "file" c["size"] = c["Size"] + self._fill_info(c, bucket, versions=versions) files.append(c) - if dircache: - files.extend( - [ - { - "Key": l["Prefix"][:-1], - "Size": 0, - "StorageClass": "DIRECTORY", - "type": "directory", - "size": 0, - } - for l in dircache - ] - ) - for f in files: - f["Key"] = "/".join([bucket, f["Key"]]) - f["name"] = f["Key"] - version_id = f.get("VersionId") - if versions and version_id and version_id != "null": - f["name"] += f"?versionId={version_id}" + files += dirs return files + @staticmethod + def _fill_info(f, bucket, versions=False): + f["size"] = f["Size"] + f["Key"] = "/".join([bucket, f["Key"]]) + f["name"] = f["Key"] + version_id = f.get("VersionId") + if versions and version_id and version_id != "null": + f["name"] += f"?versionId={version_id}" + async def _glob(self, path, **kwargs): if path.startswith("*"): raise ValueError("Cannot traverse all of S3") From 6b9c503aa74a982ee9821266a545e2d4240b3d28 Mon Sep 17 00:00:00 2001 From: Ronan Lamy Date: Mon, 5 Dec 2022 14:43:31 +0000 Subject: [PATCH 3/5] Replace _do_lsdir() with async generator _iterdir() --- s3fs/core.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index 7f7cf6e6..2de97fc7 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -674,13 +674,20 @@ async def _lsdir( if path not in self.dircache or refresh or not delimiter or versions: try: logger.debug("Get directory listing page for %s" % path) - files = await self._do_lsdir( + dirs = [] + files = [] + async for c in self._iterdir( bucket, max_items=max_items, delimiter=delimiter, prefix=prefix, versions=versions, - ) + ): + if c["type"] == "directory": + dirs.append(c) + else: + files.append(c) + files += dirs except ClientError as e: raise translate_boto_error(e) @@ -689,7 +696,7 @@ async def _lsdir( return files return self.dircache[path] - async def _do_lsdir( + async def _iterdir( self, bucket, max_items=None, delimiter="/", prefix="", versions=False ): await self.set_session() @@ -711,8 +718,6 @@ async def _do_lsdir( PaginationConfig=config, **self.req_kw, ) - files = [] - dirs = [] async for i in it: for l in i.get("CommonPrefixes", []): c = { @@ -722,15 +727,13 @@ async def _do_lsdir( "type": "directory", } self._fill_info(c, bucket, versions=False) - dirs.append(c) + yield c for c in i.get(contents_key, []): if not self.version_aware or c.get("IsLatest") or versions: c["type"] = "file" c["size"] = c["Size"] self._fill_info(c, bucket, versions=versions) - files.append(c) - files += dirs - return files + yield c @staticmethod def _fill_info(f, bucket, versions=False): From 6f3af90ac95e32e6b949b1ce5afc141866aa27b0 Mon Sep 17 00:00:00 2001 From: Ronan Lamy Date: Fri, 9 Dec 2022 12:53:47 +0000 Subject: [PATCH 4/5] Add docstring for _iterdir() --- s3fs/core.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/s3fs/core.py b/s3fs/core.py index 2de97fc7..e2a47161 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -699,6 +699,10 @@ async def _lsdir( async def _iterdir( self, bucket, max_items=None, delimiter="/", prefix="", versions=False ): + """Iterate asynchronously over files and directories under `prefix`. + + The contents are yielded in arbitrary order as info dicts. + """ await self.set_session() s3 = await self.get_s3(bucket) if self.version_aware: From e604a164bb48ffe74173110a649ed04750d20de4 Mon Sep 17 00:00:00 2001 From: Ronan Lamy Date: Fri, 9 Dec 2022 12:55:05 +0000 Subject: [PATCH 5/5] Move 'versions' consistency check from _lsdir() to _iterdir() --- s3fs/core.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/s3fs/core.py b/s3fs/core.py index e2a47161..d7686ec7 100644 --- a/s3fs/core.py +++ b/s3fs/core.py @@ -661,11 +661,6 @@ async def _lsdir( prefix="", versions=False, ): - if versions and not self.version_aware: - raise ValueError( - "versions cannot be specified if the filesystem is not version aware" - ) - bucket, key, _ = self.split_path(path) if not prefix: prefix = "" @@ -703,6 +698,10 @@ async def _iterdir( The contents are yielded in arbitrary order as info dicts. """ + if versions and not self.version_aware: + raise ValueError( + "versions cannot be specified if the filesystem is not version aware" + ) await self.set_session() s3 = await self.get_s3(bucket) if self.version_aware: