diff --git a/s3fs/core.py b/s3fs/core.py
index ff96252b..d7686ec7 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -661,11 +661,6 @@ async def _lsdir(
         prefix="",
         versions=False,
     ):
-        if versions and not self.version_aware:
-            raise ValueError(
-                "versions cannot be specified if the filesystem is not version aware"
-            )
-
         bucket, key, _ = self.split_path(path)
         if not prefix:
             prefix = ""
@@ -674,53 +669,20 @@ async def _lsdir(
         if path not in self.dircache or refresh or not delimiter or versions:
             try:
                 logger.debug("Get directory listing page for %s" % path)
-                await self.set_session()
-                s3 = await self.get_s3(bucket)
-                if self.version_aware:
-                    method = "list_object_versions"
-                    contents_key = "Versions"
-                else:
-                    method = "list_objects_v2"
-                    contents_key = "Contents"
-                pag = s3.get_paginator(method)
-                config = {}
-                if max_items is not None:
-                    config.update(MaxItems=max_items, PageSize=2 * max_items)
-                it = pag.paginate(
-                    Bucket=bucket,
-                    Prefix=prefix,
-                    Delimiter=delimiter,
-                    PaginationConfig=config,
-                    **self.req_kw,
-                )
+                dirs = []
                 files = []
-                dircache = []
-                async for i in it:
-                    dircache.extend(i.get("CommonPrefixes", []))
-                    for c in i.get(contents_key, []):
-                        if not self.version_aware or c.get("IsLatest") or versions:
-                            c["type"] = "file"
-                            c["size"] = c["Size"]
-                            files.append(c)
-                if dircache:
-                    files.extend(
-                        [
-                            {
-                                "Key": l["Prefix"][:-1],
-                                "Size": 0,
-                                "StorageClass": "DIRECTORY",
-                                "type": "directory",
-                                "size": 0,
-                            }
-                            for l in dircache
-                        ]
-                    )
-                for f in files:
-                    f["Key"] = "/".join([bucket, f["Key"]])
-                    f["name"] = f["Key"]
-                    version_id = f.get("VersionId")
-                    if versions and version_id and version_id != "null":
-                        f["name"] += f"?versionId={version_id}"
+                async for c in self._iterdir(
+                    bucket,
+                    max_items=max_items,
+                    delimiter=delimiter,
+                    prefix=prefix,
+                    versions=versions,
+                ):
+                    if c["type"] == "directory":
+                        dirs.append(c)
+                    else:
+                        files.append(c)
+                files += dirs
             except ClientError as e:
                 raise translate_boto_error(e)
 
@@ -729,6 +691,62 @@
             return files
         return self.dircache[path]
 
+    async def _iterdir(
+        self, bucket, max_items=None, delimiter="/", prefix="", versions=False
+    ):
+        """Iterate asynchronously over files and directories under `prefix`.
+
+        The contents are yielded in arbitrary order as info dicts.
+        """
+        if versions and not self.version_aware:
+            raise ValueError(
+                "versions cannot be specified if the filesystem is not version aware"
+            )
+        await self.set_session()
+        s3 = await self.get_s3(bucket)
+        if self.version_aware:
+            method = "list_object_versions"
+            contents_key = "Versions"
+        else:
+            method = "list_objects_v2"
+            contents_key = "Contents"
+        pag = s3.get_paginator(method)
+        config = {}
+        if max_items is not None:
+            config.update(MaxItems=max_items, PageSize=2 * max_items)
+        it = pag.paginate(
+            Bucket=bucket,
+            Prefix=prefix,
+            Delimiter=delimiter,
+            PaginationConfig=config,
+            **self.req_kw,
+        )
+        async for i in it:
+            for l in i.get("CommonPrefixes", []):
+                c = {
+                    "Key": l["Prefix"][:-1],
+                    "Size": 0,
+                    "StorageClass": "DIRECTORY",
+                    "type": "directory",
+                }
+                self._fill_info(c, bucket, versions=False)
+                yield c
+            for c in i.get(contents_key, []):
+                if not self.version_aware or c.get("IsLatest") or versions:
+                    c["type"] = "file"
+                    c["size"] = c["Size"]
+                    self._fill_info(c, bucket, versions=versions)
+                    yield c
+
+    @staticmethod
+    def _fill_info(f, bucket, versions=False):
+        f["size"] = f["Size"]
+        f["Key"] = "/".join([bucket, f["Key"]])
+        f["name"] = f["Key"]
+        version_id = f.get("VersionId")
+        if versions and version_id and version_id != "null":
+            f["name"] += f"?versionId={version_id}"
+
     async def _glob(self, path, **kwargs):
         if path.startswith("*"):
             raise ValueError("Cannot traverse all of S3")