-
Notifications
You must be signed in to change notification settings - Fork 926
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support versioning of the underlying dataset with PartitionedDataset #521
Changes from 24 commits
00e1783
61dba32
6f347ec
a80a799
fb6992d
eceeb50
7ce6bbc
70ed18d
1b8c7fc
1232568
91ecc39
cc50803
e34f58b
f757d26
49c1214
0758334
f3adbec
a8c7bf7
10a6afc
b486a15
5fbf972
8e94bbf
e39032c
b56d201
b4390ce
5dc8c0f
0aeea98
423c7be
7d7e4c5
bcfe409
207dfb4
71e847e
519ef7d
8ea477f
8b57e46
11965e4
e8f5020
9b5cd46
224343c
2665e46
7001adb
514a07e
b82c9f6
8619e24
865ad81
1eac08a
841361d
e08a8e5
6d3f5ea
691d9d6
0cbd9c2
aa90f92
364572f
3eb5b6c
1c64f64
aace8ae
de8eb4b
3b801ed
d651cee
8beebf8
92a9c51
89dbfc4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
""" | ||
import operator | ||
from copy import deepcopy | ||
from pathlib import PurePosixPath | ||
from typing import Any, Callable, Dict, List, Tuple, Type, Union | ||
from urllib.parse import urlparse | ||
from warnings import warn | ||
|
@@ -58,6 +59,18 @@ | |
S3_PROTOCOLS = ("s3", "s3a", "s3n") | ||
|
||
|
||
def _grandparent(path: str) -> str: | ||
path_obj = PurePosixPath(path) | ||
grandparent = path_obj.parents[1] | ||
if grandparent.name != path_obj.name: | ||
last_three_parts = path_obj.relative_to(*path_obj.parts[:-3]) | ||
raise DataSetError( | ||
f"`{path}` is not a well-formed versioned path ending with " | ||
f"`filename/timestamp/filename` (got `{last_three_parts}`)." | ||
deepyaman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
return str(grandparent) | ||
DmitriiDeriabinQB marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
class PartitionedDataSet(AbstractDataSet): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hey @deepyaman! I believe if we want this to be merged before 0.17, we might need these changes to be implemented for the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also should this be merged in |
||
# pylint: disable=too-many-instance-attributes,protected-access | ||
"""``PartitionedDataSet`` loads and saves partitioned file-like data using the | ||
|
@@ -144,9 +157,6 @@ def __init__( # pylint: disable=too-many-arguments | |
the filesystem implementation. | ||
fs_args: Extra arguments to pass into underlying filesystem class constructor | ||
(e.g. `{"project": "my-project"}` for ``GCSFileSystem``) | ||
|
||
Raises: | ||
DataSetError: If versioning is enabled for the underlying dataset. | ||
""" | ||
# pylint: disable=import-outside-toplevel | ||
from fsspec.utils import infer_storage_options # for performance reasons | ||
|
@@ -160,13 +170,6 @@ def __init__( # pylint: disable=too-many-arguments | |
|
||
dataset = dataset if isinstance(dataset, dict) else {"type": dataset} | ||
self._dataset_type, self._dataset_config = parse_dataset_definition(dataset) | ||
if VERSION_KEY in self._dataset_config: | ||
raise DataSetError( | ||
"`{}` does not support versioning of the underlying dataset. " | ||
"Please remove `{}` flag from the dataset definition.".format( | ||
self.__class__.__name__, VERSIONED_FLAG_KEY | ||
) | ||
) | ||
|
||
self._credentials, dataset_credentials = _split_credentials(credentials) | ||
if dataset_credentials: | ||
|
@@ -217,7 +220,7 @@ def _normalized_path(self) -> str: | |
@cachedmethod(cache=operator.attrgetter("_partition_cache")) | ||
def _list_partitions(self) -> List[str]: | ||
return [ | ||
deepyaman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
path | ||
_grandparent(path) if self._dataset_config.get(VERSION_KEY) else path | ||
deepyaman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for path in self._filesystem.find(self._normalized_path, **self._load_args) | ||
if path.endswith(self._filename_suffix) | ||
] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,7 +52,7 @@ def partitioned_data_pandas(): | |
|
||
@pytest.fixture | ||
def local_csvs(tmp_path, partitioned_data_pandas): | ||
local_dir = Path(str(tmp_path / "csvs")) | ||
local_dir = tmp_path / "csvs" | ||
local_dir.mkdir() | ||
|
||
for k, data in partitioned_data_pandas.items(): | ||
|
@@ -62,6 +62,11 @@ def local_csvs(tmp_path, partitioned_data_pandas): | |
return local_dir | ||
|
||
|
||
@pytest.fixture | ||
def filepath_csvs(tmp_path): | ||
return str(tmp_path / "csvs") | ||
|
||
|
||
LOCAL_DATASET_DEFINITION = [ | ||
"pandas.CSVDataSet", | ||
"kedro.extras.datasets.pandas.CSVDataSet", | ||
|
@@ -286,17 +291,56 @@ def test_invalid_dataset_config(self, dataset_config, error_pattern): | |
@pytest.mark.parametrize( | ||
"dataset_config", | ||
[ | ||
{"type": CSVDataSet, "versioned": True}, | ||
{"type": "pandas.CSVDataSet", "versioned": True}, | ||
{**ds_config, "versioned": True} | ||
for ds_config in LOCAL_DATASET_DEFINITION | ||
if isinstance(ds_config, dict) | ||
], | ||
) | ||
def test_versioned_dataset_not_allowed(self, dataset_config): | ||
pattern = ( | ||
"`PartitionedDataSet` does not support versioning of the underlying " | ||
"dataset. Please remove `versioned` flag from the dataset definition." | ||
@pytest.mark.parametrize( | ||
"suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)] | ||
) | ||
def test_versioned_dataset_save_and_load( | ||
self, | ||
filepath_csvs, | ||
dataset_config, | ||
suffix, | ||
expected_num_parts, | ||
partitioned_data_pandas, | ||
): | ||
"""Test that saved and reloaded data matches the original one for | ||
the versioned data set.""" | ||
PartitionedDataSet(filepath_csvs, dataset_config).save(partitioned_data_pandas) | ||
|
||
pds = PartitionedDataSet(filepath_csvs, dataset_config, filename_suffix=suffix) | ||
loaded_partitions = pds.load() | ||
|
||
assert len(loaded_partitions.keys()) == expected_num_parts | ||
for partition_id, load_func in loaded_partitions.items(): | ||
df = load_func() | ||
assert_frame_equal(df, partitioned_data_pandas[partition_id + suffix]) | ||
if suffix: | ||
assert not partition_id.endswith(suffix) | ||
|
||
def test_malformed_versioned_path(self, tmp_path): | ||
local_dir = tmp_path / "files" | ||
local_dir.mkdir() | ||
|
||
path = local_dir / "path/to/folder/new/partition/version/partition/file" | ||
path.parent.mkdir(parents=True, exist_ok=True) | ||
path.write_text("content") | ||
|
||
pds = PartitionedDataSet( | ||
str(local_dir / "path/to/folder"), | ||
{"type": "pandas.CSVDataSet", "versioned": True}, | ||
) | ||
with pytest.raises(DataSetError, match=re.escape(pattern)): | ||
PartitionedDataSet(str(Path.cwd()), dataset_config) | ||
|
||
pattern = re.escape( | ||
f"`{path.as_posix()}` is not a well-formed versioned path ending with " | ||
"`filename/timestamp/filename` (got `version/partition/" | ||
"file`)." | ||
deepyaman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
with pytest.raises(DataSetError, match=pattern): | ||
pds.load() | ||
|
||
def test_no_partitions(self, tmpdir): | ||
pds = PartitionedDataSet(str(tmpdir), "pandas.CSVDataSet") | ||
|
@@ -328,7 +372,7 @@ def test_no_partitions(self, tmpdir): | |
def test_filepath_arg_warning(self, pds_config, filepath_arg): | ||
pattern = ( | ||
f"`{filepath_arg}` key must not be specified in the dataset definition as it " | ||
f"will be overwritten by partition path" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We usually keep an |
||
"will be overwritten by partition path" | ||
deepyaman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
with pytest.warns(UserWarning, match=re.escape(pattern)): | ||
PartitionedDataSet(**pds_config) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Optional) Maybe also add a hint about what was expected? Something like: