From 31b9877968b9c21cf860da265a51d3f1f2d30d24 Mon Sep 17 00:00:00 2001 From: Andreas Poehlmann Date: Thu, 8 Feb 2024 20:21:47 +0100 Subject: [PATCH] upath: provide storage_options customization classmethod --- upath/core.py | 130 +++++++++++++++++++++++++------------------------- 1 file changed, 65 insertions(+), 65 deletions(-) diff --git a/upath/core.py b/upath/core.py index e697f1a..4b41eda 100644 --- a/upath/core.py +++ b/upath/core.py @@ -92,6 +92,8 @@ class UPath(PathlibPathShim, Path): _protocol_dispatch: bool | None = None _flavour = FSSpecFlavour() + # === upath.UPath constructor ===================================== + def __new__( cls, *args, protocol: str | None = None, **storage_options: Any ) -> UPath: @@ -113,14 +115,16 @@ def __new__( ) protocol = storage_options.pop("scheme") - # determine which UPath subclass to dispatch to + # determine the protocol pth_protocol = get_upath_protocol( part0, protocol=protocol, storage_options=storage_options ) - upath_cls = get_upath_class(protocol=pth_protocol) - if upath_cls is None: - raise ValueError(f"Unsupported filesystem: {pth_protocol!r}") - if cls._protocol_dispatch is not None and not cls._protocol_dispatch: + # determine which UPath subclass to dispatch to + if cls._protocol_dispatch or cls._protocol_dispatch is None: + upath_cls = get_upath_class(protocol=pth_protocol) + if upath_cls is None: + raise ValueError(f"Unsupported filesystem: {pth_protocol!r}") + else: # user subclasses can request to disable protocol dispatch # by setting MyUPathSubclass._protocol_dispatch to `False`. # This will effectively ignore the registered UPath @@ -194,11 +198,9 @@ def __init__( if isinstance(args0, UPath): self._storage_options = {**args0.storage_options, **storage_options} else: - fs_cls: type[AbstractFileSystem] = get_filesystem_class( - protocol or self._protocol + self._storage_options = type(self)._parse_storage_options( + str(args0), protocol, storage_options ) - pth_storage_options = fs_cls._get_kwargs_from_urls(str(args0)) - self._storage_options = {**pth_storage_options, **storage_options} else: self._storage_options = storage_options.copy() @@ -226,7 +228,31 @@ def __init__( return super().__init__(*args) - # === upath.UPath only ============================================ + # === upath.UPath PUBLIC ADDITIONAL API =========================== + + @property + def protocol(self) -> str: + return self._protocol + + @property + def storage_options(self) -> Mapping[str, Any]: + return MappingProxyType(self._storage_options) + + @property + def fs(self) -> AbstractFileSystem: + try: + return self._fs_cached + except AttributeError: + fs = self._fs_cached = self._fs_factory( + str(self), self.protocol, self.storage_options + ) + return fs + + @property + def path(self) -> str: + return super().__str__() + + # === upath.UPath CUSTOMIZABLE API ================================ @classmethod def _transform_init_args( @@ -238,13 +264,26 @@ def _transform_init_args( """allow customization of init args in subclasses""" return args, protocol, storage_options - @property - def protocol(self) -> str: - return self._protocol + @classmethod + def _parse_storage_options( + cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] + ) -> dict[str, Any]: + """Parse storage_options from the urlpath""" + fs_cls: type[AbstractFileSystem] = get_filesystem_class(protocol) + pth_storage_options = fs_cls._get_kwargs_from_urls(urlpath) + return {**pth_storage_options, **storage_options} - @property - def storage_options(self) -> Mapping[str, Any]: - return MappingProxyType(self._storage_options) + @classmethod + def _fs_factory( + cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] + ) -> AbstractFileSystem: + """Instantiate the filesystem_spec filesystem class""" + fs_cls = get_filesystem_class(protocol) + so_dct = fs_cls._get_kwargs_from_urls(urlpath) + so_dct.update(storage_options) + return fs_cls(**storage_options) + + # === upath.UPath COMPATIBILITY API =============================== def __init_subclass__(cls, **kwargs): """provide a clean migration path for custom user subclasses""" @@ -300,31 +339,17 @@ def _fs_factory( inst = cls_._default_accessor(url, **storage_options) return inst._fs - cls._fs_factory = classmethod(_fs_factory) - - @classmethod - def _fs_factory( - cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any] - ) -> AbstractFileSystem: - """Instantiate the filesystem_spec filesystem class""" - fs_cls = get_filesystem_class(protocol) - so_dct = fs_cls._get_kwargs_from_urls(urlpath) - so_dct.update(storage_options) - return fs_cls(**storage_options) - - @property - def fs(self) -> AbstractFileSystem: - try: - return self._fs_cached - except AttributeError: - fs = self._fs_cached = self._fs_factory( - str(self), self.protocol, self.storage_options - ) - return fs + def _parse_storage_options( + cls_, urlpath: str, protocol: str, storage_options: Mapping[str, Any] + ) -> dict[str, Any]: + url = urlsplit(urlpath) + if protocol: + url = url._replace(scheme=protocol) + inst = cls_._default_accessor(url, **storage_options) + return inst._fs.storage_options - @property - def path(self) -> str: - return super().__str__() + cls._fs_factory = classmethod(_fs_factory) + cls._parse_storage_options = classmethod(_parse_storage_options) @property def _path(self): @@ -442,14 +467,6 @@ def _drv(self): def _drv(self, value): self.__drv = value - @property - def drive(self): - try: - return self.__drv - except AttributeError: - self._load_parts() - return self.__drv - @property def _root(self): # direct access to ._root should emit a warning, @@ -464,14 +481,6 @@ def _root(self): def _root(self, value): self.__root = value - @property - def root(self): - try: - return self.__root - except AttributeError: - self._load_parts() - return self.__root - @property def _parts(self): # UPath._parts is not used anymore, and not available @@ -489,15 +498,6 @@ def _parts(self): def _parts(self, value): self.__parts = value - @property - def parts(self): - try: - return self.__parts - except AttributeError: - self._load_parts() - self.__parts = super().parts - return self.__parts - # === pathlib.PurePath ============================================ def __reduce__(self):