From ef93577376f9099c801c81e17c764817cc2c503e Mon Sep 17 00:00:00 2001
From: Greg Hayes
Date: Sat, 28 Aug 2021 22:38:06 -0500
Subject: [PATCH] Flexible url handling (#271)

* Updated _strip_protocol to support flexible urls

* Commits for flexible url handling

* Added unit test to evaluate new url format

* Linting
---
 CHANGELOG.md                   |  5 ++++
 README.md                      |  4 +++
 adlfs/__init__.py              | 16 ++++++++++
 adlfs/spec.py                  | 16 +++++++---
 adlfs/tests/test_uri_format.py | 53 ++++++++++++++++++++++++++++++++++
 5 files changed, 90 insertions(+), 4 deletions(-)
 create mode 100644 adlfs/tests/test_uri_format.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4c069a2e..438087eb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,9 @@
 **Change Log**
+v2021.09.1
+----------
+- Fixed isdir() bug causing some directories to be labeled incorrectly as files
+- Added flexible url handling to improve compatibility with other applications using Spark and fsspec
+
 v2021.08.1
 ----------
 - Fixed call to isdir(), to run direct call to Azure container, instead of calling .ls on the directory

diff --git a/README.md b/README.md
index 8fcecafd..e894e25e 100644
--- a/README.md
+++ b/README.md
@@ -40,6 +40,10 @@ storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
 ddf = dd.read_csv('abfs://{CONTAINER}/{FOLDER}/*.csv', storage_options=storage_options)
 ddf = dd.read_parquet('az://{CONTAINER}/folder.parquet', storage_options=storage_options)
 
+Accepted protocol / uri formats include:
+'PROTOCOL://container/path-part/file'
+'PROTOCOL://container@account.dfs.core.windows.net/path-part/file'
+
 or optionally, if AZURE_STORAGE_ACCOUNT_NAME and an AZURE_STORAGE_ is set as an environmental variable, then storage_options will be read from the environmental variables

diff --git a/adlfs/__init__.py b/adlfs/__init__.py
index a1c5810c..38e6a8c7 100644
--- a/adlfs/__init__.py
+++ b/adlfs/__init__.py
@@ -2,6 +2,8 @@
 from .spec import AzureBlobFileSystem, AzureBlobFile
 from ._version import get_versions
 
+import fsspec
+
 __all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem"]
 
 __version__ = get_versions()["version"]
@@ -11,3 +13,17 @@ __version__ = get_versions()["version"]
 del get_versions
+
+if hasattr(fsspec, "register_implementation"):
+    fsspec.register_implementation("abfss", AzureBlobFileSystem, clobber=True)
+else:
+    from fsspec.registry import known_implementations
+
+    known_implementations["abfss"] = {
+        "class": "adlfs.AzureBlobFileSystem",
+        "err": "Please install adlfs to use the abfss protocol",
+    }
+
+    del known_implementations
+
+del fsspec  # clear the module namespace

diff --git a/adlfs/spec.py b/adlfs/spec.py
index a39f3505..779bc7e6 100644
--- a/adlfs/spec.py
+++ b/adlfs/spec.py
@@ -451,15 +451,23 @@ def _strip_protocol(cls, path: str):
         str
             Returns a path without the protocol
         """
+        STORE_SUFFIX = ".dfs.core.windows.net"
         logger.debug(f"_strip_protocol for {path}")
+        if not path.startswith(("abfs://", "az://")):
+            path = path.lstrip("/")
+            path = "abfs://" + path
         ops = infer_storage_options(path)
-
+        if "username" in ops:
+            if ops.get("username", None):
+                ops["path"] = ops["username"] + ops["path"]
         # we need to make sure that the path retains
         # the format {host}/{path}
         # here host is the container_name
-        if ops.get("host", None):
-            ops["path"] = ops["host"] + ops["path"]
-        ops["path"] = ops["path"].lstrip("/")
+        elif ops.get("host", None):
+            if (
+                ops["host"].count(STORE_SUFFIX) == 0
+            ):  # no store-suffix, so this is container-name
+                ops["path"] = ops["host"] + ops["path"]
logger.debug(f"_strip_protocol({path}) = {ops}") return ops["path"] diff --git a/adlfs/tests/test_uri_format.py b/adlfs/tests/test_uri_format.py new file mode 100644 index 00000000..01729f69 --- /dev/null +++ b/adlfs/tests/test_uri_format.py @@ -0,0 +1,53 @@ +import dask.dataframe as dd +import pandas as pd +from pandas.testing import assert_frame_equal + +from adlfs import AzureBlobFileSystem + + +URL = "http://127.0.0.1:10000" +ACCOUNT_NAME = "devstoreaccount1" +KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" # NOQA +CONN_STR = f"DefaultEndpointsProtocol=http;AccountName={ACCOUNT_NAME};AccountKey={KEY};BlobEndpoint={URL}/{ACCOUNT_NAME};" # NOQA + + +def test_dask_parquet(storage): + fs = AzureBlobFileSystem( + account_name=storage.account_name, connection_string=CONN_STR + ) + fs.mkdir("test") + STORAGE_OPTIONS = { + "account_name": "devstoreaccount1", + "connection_string": CONN_STR, + } + df = pd.DataFrame( + { + "col1": [1, 2, 3, 4], + "col2": [2, 4, 6, 8], + "index_key": [1, 1, 2, 2], + "partition_key": [1, 1, 2, 2], + } + ) + + dask_dataframe = dd.from_pandas(df, npartitions=1) + for protocol in ["abfs", "az"]: + dask_dataframe.to_parquet( + "{}://test@dfs.core.windows.net/test_group.parquet".format(protocol), + storage_options=STORAGE_OPTIONS, + engine="pyarrow", + ) + + fs = AzureBlobFileSystem(**STORAGE_OPTIONS) + assert fs.ls("test/test_group.parquet") == [ + "test/test_group.parquet/_common_metadata", + "test/test_group.parquet/_metadata", + "test/test_group.parquet/part.0.parquet", + ] + fs.rm("test/test_group.parquet") + + df_test = dd.read_parquet( + "abfs://test/test_group.parquet", + storage_options=STORAGE_OPTIONS, + engine="pyarrow", + ).compute() + assert_frame_equal(df, df_test)