Flexible url handling (fsspec#271)
* Updated _strip_protocol to support flexible urls

* Commits for flexible url handling

* Added unit test to evaluate new url format

* Linting
hayesgb authored Aug 29, 2021
1 parent 12d7add commit ef93577
Showing 5 changed files with 90 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,9 @@
**Change Log**
v2021.09.1
----------
- Fixed isdir() bug causing some directories to be labeled incorrectly as files
- Added flexible url handling to improve compatibility with other applications using Spark and fsspec

v2021.08.1
----------
- Fixed call to isdir(), to run direct call to Azure container, instead of calling .ls on the directory
4 changes: 4 additions & 0 deletions README.md
@@ -40,6 +40,10 @@ storage_options={'account_name': ACCOUNT_NAME, 'account_key': ACCOUNT_KEY}
ddf = dd.read_csv('abfs://{CONTAINER}/{FOLDER}/*.csv', storage_options=storage_options)
ddf = dd.read_parquet('az://{CONTAINER}/folder.parquet', storage_options=storage_options)

Accepted protocol / uri formats include:
'PROTOCOL://container/path-part/file'
'PROTOCOL://container@account.dfs.core.windows.net/path-part/file'

or optionally, if AZURE_STORAGE_ACCOUNT_NAME and an AZURE_STORAGE_<CREDENTIAL> are
set as environment variables, then storage_options will be read from the environment
variables
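
For illustration, a hedged sketch of reading data through both accepted URI forms with Dask (CONTAINER, FOLDER, ACCOUNT_NAME, and ACCOUNT_KEY are placeholders, not values from this commit):

```python
import dask.dataframe as dd

storage_options = {"account_name": "ACCOUNT_NAME", "account_key": "ACCOUNT_KEY"}

# Short form: the host portion of the URL is treated as the container name
ddf = dd.read_parquet(
    "abfs://CONTAINER/FOLDER/file.parquet", storage_options=storage_options
)

# Long form: container@account.dfs.core.windows.net, as produced by Spark and
# other tools; adlfs reduces it to the same container/path layout internally
ddf = dd.read_parquet(
    "abfs://CONTAINER@ACCOUNT_NAME.dfs.core.windows.net/FOLDER/file.parquet",
    storage_options=storage_options,
)
```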
16 changes: 16 additions & 0 deletions adlfs/__init__.py
@@ -2,6 +2,8 @@
from .spec import AzureBlobFileSystem, AzureBlobFile
from ._version import get_versions

import fsspec

__all__ = ["AzureBlobFileSystem", "AzureBlobFile", "AzureDatalakeFileSystem"]

__version__ = get_versions()["version"]
@@ -11,3 +13,17 @@

__version__ = get_versions()["version"]
del get_versions

if hasattr(fsspec, "register_implementation"):
fsspec.register_implementation("abfss", AzureBlobFileSystem, clobber=True)
else:
from fsspec.registry import known_implementations

known_implementations["abfss"] = {
"class": "adlfs.AzureBlobFileSystem",
"err": "Please install adlfs to use the abfss protocol",
}

del known_implementations

del fsspec # clear the module namespace
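
As a hedged sketch of what this registration enables (the account, key, and path names below are placeholders; `fsspec.filesystem` and `fsspec.open` are standard fsspec entry points), the `abfss` protocol resolves to `AzureBlobFileSystem` once adlfs is imported:

```python
import fsspec

import adlfs  # importing adlfs registers the "abfss" protocol with fsspec

# fsspec now maps "abfss" to AzureBlobFileSystem
fs = fsspec.filesystem(
    "abfss", account_name="ACCOUNT_NAME", account_key="ACCOUNT_KEY"
)
print(type(fs).__name__)  # AzureBlobFileSystem

# The same protocol works in URL form
with fsspec.open(
    "abfss://container/folder/data.csv",
    account_name="ACCOUNT_NAME",
    account_key="ACCOUNT_KEY",
) as f:
    first_bytes = f.read(100)
```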
16 changes: 12 additions & 4 deletions adlfs/spec.py
@@ -451,15 +451,23 @@ def _strip_protocol(cls, path: str):
str
Returns a path without the protocol
"""
STORE_SUFFIX = ".dfs.core.windows.net"
logger.debug(f"_strip_protocol for {path}")
if not path.startswith(("abfs://", "az://")):
path = path.lstrip("/")
path = "abfs://" + path
ops = infer_storage_options(path)

if "username" in ops:
if ops.get("username", None):
ops["path"] = ops["username"] + ops["path"]
# we need to make sure that the path retains
# the format {host}/{path}
# here host is the container_name
if ops.get("host", None):
ops["path"] = ops["host"] + ops["path"]
ops["path"] = ops["path"].lstrip("/")
elif ops.get("host", None):
if (
ops["host"].count(STORE_SUFFIX) == 0
): # no store-suffix, so this is container-name
ops["path"] = ops["host"] + ops["path"]

logger.debug(f"_strip_protocol({path}) = {ops}")
return ops["path"]
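
Based on the logic in the hunk above, a hedged sketch of the behaviour `_strip_protocol` is expected to have for the two accepted URL shapes (the container and account names here are illustrative only):

```python
from adlfs import AzureBlobFileSystem

# Short form: the host is taken to be the container name
assert (
    AzureBlobFileSystem._strip_protocol("abfs://container/folder/file.parquet")
    == "container/folder/file.parquet"
)

# Long form: the host carries the ".dfs.core.windows.net" store suffix, so only
# the container (parsed as the username) and the path are kept
assert (
    AzureBlobFileSystem._strip_protocol(
        "abfs://container@account.dfs.core.windows.net/folder/file.parquet"
    )
    == "container/folder/file.parquet"
)
```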
53 changes: 53 additions & 0 deletions adlfs/tests/test_uri_format.py
@@ -0,0 +1,53 @@
import dask.dataframe as dd
import pandas as pd
from pandas.testing import assert_frame_equal

from adlfs import AzureBlobFileSystem


URL = "http://127.0.0.1:10000"
ACCOUNT_NAME = "devstoreaccount1"
KEY = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" # NOQA
CONN_STR = f"DefaultEndpointsProtocol=http;AccountName={ACCOUNT_NAME};AccountKey={KEY};BlobEndpoint={URL}/{ACCOUNT_NAME};" # NOQA


def test_dask_parquet(storage):
fs = AzureBlobFileSystem(
account_name=storage.account_name, connection_string=CONN_STR
)
fs.mkdir("test")
STORAGE_OPTIONS = {
"account_name": "devstoreaccount1",
"connection_string": CONN_STR,
}
df = pd.DataFrame(
{
"col1": [1, 2, 3, 4],
"col2": [2, 4, 6, 8],
"index_key": [1, 1, 2, 2],
"partition_key": [1, 1, 2, 2],
}
)

dask_dataframe = dd.from_pandas(df, npartitions=1)
for protocol in ["abfs", "az"]:
dask_dataframe.to_parquet(
"{}://test@dfs.core.windows.net/test_group.parquet".format(protocol),
storage_options=STORAGE_OPTIONS,
engine="pyarrow",
)

fs = AzureBlobFileSystem(**STORAGE_OPTIONS)
assert fs.ls("test/test_group.parquet") == [
"test/test_group.parquet/_common_metadata",
"test/test_group.parquet/_metadata",
"test/test_group.parquet/part.0.parquet",
]
fs.rm("test/test_group.parquet")

df_test = dd.read_parquet(
"abfs://test/test_group.parquet",
storage_options=STORAGE_OPTIONS,
engine="pyarrow",
).compute()
assert_frame_equal(df, df_test)
