Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update RemoteStore.__str__ and add UPath tests #1964

Merged
merged 9 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ build.hooks.vcs.version-file = "src/zarr/_version.py"
[tool.hatch.envs.test]
dependencies = [
"numpy~={matrix:numpy}",
"universal_pathlib"
]
extra-dependencies = [
"coverage",
Expand Down
17 changes: 11 additions & 6 deletions src/zarr/store/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class RemoteStore(Store):
supports_listing: bool = True

_fs: AsyncFileSystem
_url: str
path: str
allowed_exceptions: tuple[type[Exception], ...]

Expand All @@ -51,18 +52,22 @@ def __init__(
"""

super().__init__(mode=mode)

if isinstance(url, str):
self._fs, self.path = fsspec.url_to_fs(url, **storage_options)
self._url = url.rstrip("/")
self._fs, _path = fsspec.url_to_fs(url, **storage_options)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a case where url has no trailing "/", but the return fom url_to_fs does?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not aware of any, but I wanted to ensure that the UPath and non-UPath code paths do the exact same thing to the url string.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misunderstanding the question...

self.path = _path.rstrip("/")
elif hasattr(url, "protocol") and hasattr(url, "fs"):
# is UPath-like - but without importing
if storage_options:
raise ValueError(
"If constructed with a UPath object, no additional "
"storage_options are allowed"
)
self.path = url.path
self._fs = url._fs
# n.b. UPath returns the url and path attributes with a trailing /, at least for s3
# that trailing / must be removed to compose with the store interface
self._url = str(url).rstrip("/")
self.path = url.path.rstrip("/")
self._fs = url.fs
else:
raise ValueError("URL not understood, %s", url)
self.allowed_exceptions = allowed_exceptions
Expand All @@ -71,10 +76,10 @@ def __init__(
raise TypeError("FileSystem needs to support async operations")

def __str__(self) -> str:
return f"Remote fsspec store: {type(self._fs).__name__} , {self.path}"
return f"{self._url}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are implying that this store is the only way to access the URL displayed here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but this is consistent with our current practice for memory store and local store. Yes, this practice does mean that RemoteStore for local files will have the same str output as LocalStore, but I figure we should make the stores consistent before making them correct.


def __repr__(self) -> str:
return f"<RemoteStore({type(self._fs).__name__} , {self.path})>"
return f"<RemoteStore({type(self._fs).__name__}, {self.path})>"

async def get(
self,
Expand Down
77 changes: 57 additions & 20 deletions tests/v3/test_store/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import fsspec
import pytest
from upath import UPath
d-v-b marked this conversation as resolved.
Show resolved Hide resolved

from zarr.buffer import Buffer, default_buffer_prototype
from zarr.store import RemoteStore
from zarr.sync import sync
from zarr.testing.store import StoreTests

s3fs = pytest.importorskip("s3fs")
Expand All @@ -16,7 +18,7 @@
test_bucket_name = "test"
secure_bucket_name = "test-secure"
port = 5555
endpoint_uri = f"http://127.0.0.1:{port}/"
endpoint_url = f"http://127.0.0.1:{port}/"


@pytest.fixture(scope="module")
Expand All @@ -40,18 +42,33 @@ def get_boto3_client():

# NB: we use the sync botocore client for setup
session = Session()
return session.create_client("s3", endpoint_url=endpoint_uri)
return session.create_client("s3", endpoint_url=endpoint_url)


@pytest.fixture(autouse=True, scope="function")
def s3(s3_base):
"""
Quoting Martin Durant:
pytest-asyncio creates a new event loop for each async test.
When an async-mode s3fs instance is made from async, it will be assigned to the loop from
which it is made. That means that if you use s3fs again from a subsequent test,
you will have the same identical instance, but be running on a different loop - which fails.

For the rest: it's very convenient to clean up the state of the store between tests,
make sure we start off blank each time.

https://github.com/zarr-developers/zarr-python/pull/1785#discussion_r1634856207
"""
client = get_boto3_client()
client.create_bucket(Bucket=test_bucket_name, ACL="public-read")
s3fs.S3FileSystem.clear_instance_cache()
s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_uri})
s3 = s3fs.S3FileSystem(anon=False, client_kwargs={"endpoint_url": endpoint_url})
session = sync(s3.set_session())
s3.invalidate_cache()
yield s3
requests.post(f"{endpoint_uri}/moto-api/reset")
requests.post(f"{endpoint_url}/moto-api/reset")
client.close()
d-v-b marked this conversation as resolved.
Show resolved Hide resolved
sync(session.close())


# ### end from s3fs ### #
Expand All @@ -65,7 +82,7 @@ async def alist(it):


async def test_basic():
store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_uri, anon=False)
store = RemoteStore(f"s3://{test_bucket_name}", mode="w", endpoint_url=endpoint_url, anon=False)
assert not await alist(store.list())
assert not await store.exists("foo")
data = b"hello"
Expand All @@ -81,31 +98,51 @@ async def test_basic():
class TestRemoteStoreS3(StoreTests[RemoteStore]):
store_cls = RemoteStore

@pytest.fixture(scope="function")
def store_kwargs(self) -> dict[str, str | bool]:
return {
"mode": "w",
"endpoint_url": endpoint_uri,
"anon": False,
"url": f"s3://{test_bucket_name}",
}
@pytest.fixture(scope="function", params=("use_upath", "use_str"))
def store_kwargs(self, request) -> dict[str, str | bool]:
url = f"s3://{test_bucket_name}"
anon = False
mode = "w"
if request.param == "use_upath":
return {"mode": mode, "url": UPath(url, endpoint_url=endpoint_url, anon=anon)}
elif request.param == "use_str":
return {"url": url, "mode": mode, "anon": anon, "endpoint_url": endpoint_url}

raise AssertionError

@pytest.fixture(scope="function")
def store(self, store_kwargs: dict[str, str | bool]) -> RemoteStore:
self._fs, _ = fsspec.url_to_fs(asynchronous=False, **store_kwargs)
out = self.store_cls(asynchronous=True, **store_kwargs)
url = store_kwargs["url"]
mode = store_kwargs["mode"]
if isinstance(url, UPath):
out = self.store_cls(url=url, mode=mode)
else:
endpoint_url = store_kwargs["endpoint_url"]
out = self.store_cls(url=url, asynchronous=True, mode=mode, endpoint_url=endpoint_url)
return out

def get(self, store: RemoteStore, key: str) -> Buffer:
return Buffer.from_bytes(self._fs.cat(f"{store.path}/{key}"))
# make a new, synchronous instance of the filesystem because this test is run in sync code
fs, _ = fsspec.url_to_fs(
d-v-b marked this conversation as resolved.
Show resolved Hide resolved
url=store._url,
asynchronous=False,
anon=store._fs.anon,
endpoint_url=store._fs.endpoint_url,
)
return Buffer.from_bytes(fs.cat(f"{store.path}/{key}"))

def set(self, store: RemoteStore, key: str, value: Buffer) -> None:
self._fs.write_bytes(f"{store.path}/{key}", value.to_bytes())
# make a new, synchronous instance of the filesystem because this test is run in sync code
fs, _ = fsspec.url_to_fs(
url=store._url,
asynchronous=False,
anon=store._fs.anon,
endpoint_url=store._fs.endpoint_url,
)
fs.write_bytes(f"{store.path}/{key}", value.to_bytes())

def test_store_repr(self, store: RemoteStore) -> None:
rep = str(store)
assert "fsspec" in rep
assert store.path in rep
assert str(store) == f"s3://{test_bucket_name}"

def test_store_supports_writes(self, store: RemoteStore) -> None:
assert True
Expand Down