Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify UCObjectStore.list_objects to lists all files recursively #2959

Merged
merged 9 commits into from
Feb 3, 2024
39 changes: 28 additions & 11 deletions composer/utils/object_store/uc_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,6 @@ def get_object_size(self, object_name: str) -> int:
def list_objects(self, prefix: Optional[str]) -> List[str]:
"""List all objects in the object store with the given prefix.
.. note::
This function removes the directories from the returned list.
Args:
prefix (str): The prefix to search for.
Expand All @@ -234,14 +230,35 @@ def list_objects(self, prefix: Optional[str]) -> List[str]:

from databricks.sdk.core import DatabricksError
try:
data = json.dumps({'path': self._get_object_path(prefix)})
# NOTE: This API is in preview and should not be directly used outside of this instance
resp = self.client.api_client.do(method='GET',
path=self._UC_VOLUME_LIST_API_ENDPOINT,
data=data,
headers={'Source': 'mosaicml/composer'})
assert isinstance(resp, dict)
return [f['path'] for f in resp.get('files', []) if not f['is_dir']]
logging.warn('UCObjectStore.list_objects is experimental.')

# Iteratively get all UC Volume files with `prefix`.
stack = [prefix]
all_files = []

while len(stack) > 0:
current_path = stack.pop()

# Note: Databricks SDK handles HTTP errors and retries.
# See https://github.com/databricks/databricks-sdk-py/blob/v0.18.0/databricks/sdk/core.py#L125 and
# https://github.com/databricks/databricks-sdk-py/blob/v0.18.0/databricks/sdk/retries.py#L33 .
resp = self.client.api_client.do(method='GET',
path=self._UC_VOLUME_LIST_API_ENDPOINT,
data=json.dumps({'path': self._get_object_path(current_path)}),
headers={'Source': 'mosaicml/composer'})

assert isinstance(resp, dict), 'Response is not a dictionary'

for f in resp.get('files', []):
fpath = f['path']
if f['is_dir']:
stack.append(fpath)
else:
all_files.append(fpath)

return all_files

except DatabricksError as e:
_wrap_errors(self.get_uri(prefix), e)
return []
46 changes: 43 additions & 3 deletions tests/utils/object_store/test_uc_object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,49 @@ def generate_dummy_file(_):
raise NotImplementedError(f'Test for result={result} is not implemented.')


def test_list_objects_nested_folders(ws_client, uc_object_store):
expected_files = [
'/Volumes/catalog/volume/schema/path/to/folder/file1.txt',
'/Volumes/catalog/volume/schema/path/to/folder/file2.txt',
'/Volumes/catalog/volume/schema/path/to/folder/subdir/file1.txt',
'/Volumes/catalog/volume/schema/path/to/folder/subdir/file2.txt',
]
uc_list_api_responses = [{
'files': [{
'path': '/Volumes/catalog/volume/schema/path/to/folder/file1.txt',
'is_dir': False
}, {
'path': '/Volumes/catalog/volume/schema/path/to/folder/file2.txt',
'is_dir': False
}, {
'path': '/Volumes/catalog/volume/schema/path/to/folder/subdir',
'is_dir': True
}]
}, {
'files': [{
'path': '/Volumes/catalog/volume/schema/path/to/folder/subdir/file1.txt',
'is_dir': False
}, {
'path': '/Volumes/catalog/volume/schema/path/to/folder/subdir/file2.txt',
'is_dir': False
}]
}]

prefix = 'Volumes/catalog/schema/volume/path/to/folder'

ws_client.api_client.do = MagicMock(side_effect=[uc_list_api_responses[0], uc_list_api_responses[1]])
actual_files = uc_object_store.list_objects(prefix=prefix)

assert actual_files == expected_files

ws_client.api_client.do.assert_called_with(method='GET',
path=uc_object_store._UC_VOLUME_LIST_API_ENDPOINT,
data='{"path": "/Volumes/catalog/volume/schema/path/to/folder/subdir"}',
headers={'Source': 'mosaicml/composer'})

assert ws_client.api_client.do.call_count == 2


@pytest.mark.parametrize('result', ['success', 'prefix_none', 'not_found', 'error'])
def test_list_objects(ws_client, uc_object_store, result):
expected_files = [
Expand All @@ -173,9 +216,6 @@ def test_list_objects(ws_client, uc_object_store, result):
}, {
'path': '/Volumes/catalog/volume/schema/path/to/folder/file2.txt',
'is_dir': False
}, {
'path': '/Volumes/catalog/volume/schema/path/to/folder/samples/',
'is_dir': True
}]
}

Expand Down
Loading