diff --git a/RELEASE.md b/RELEASE.md
index 2090d3c764..431a313e50 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -1,6 +1,7 @@
 # Upcoming Release 0.19.0
 
 ## Major features and improvements
+* `PartitionedDataSet` and `IncrementalDataSet` now both support versioning of the underlying dataset.
 
 ## Bug fixes and other changes
 
@@ -900,8 +901,7 @@ from kedro.framework.session import KedroSession
 * We've added a `DeprecationWarning` to the Transformers API when adding a transformer to the catalog. These will be removed in release 0.18.0. Use Hooks to customise the `load` and `save` methods.
 
 ## Thanks for supporting contributions
-[Deepyaman Datta](https://github.com/deepyaman),
-[Zach Schuster](https://github.com/zschuster)
+[Deepyaman Datta](https://github.com/deepyaman), [Zach Schuster](https://github.com/zschuster)
 
 ## Migration guide from Kedro 0.16.* to 0.17.*
diff --git a/docs/source/deployment/prefect.md b/docs/source/deployment/prefect.md
index 64d1018984..b602b499ec 100644
--- a/docs/source/deployment/prefect.md
+++ b/docs/source/deployment/prefect.md
@@ -1,6 +1,6 @@
 # Prefect
 
-This page explains how to run your Kedro pipeline using [Prefect 2.0](https://www.prefect.io/products/core/), an open-source workflow management system.
+This page explains how to run your Kedro pipeline using [Prefect 2.0](https://www.prefect.io/opensource), an open-source workflow management system.
 
 The scope of this documentation is the deployment to a self hosted [Prefect Server](https://docs.prefect.io/2.10.17/host/), which is an open-source backend that makes it easy to monitor and execute your Prefect flows and automatically extends Prefect 2.0. We will use an [Agent that dequeues submitted flow runs from a Work Queue](https://docs.prefect.io/2.10.17/tutorial/deployments/#why-workpools-and-workers).
diff --git a/kedro/io/partitioned_dataset.py b/kedro/io/partitioned_dataset.py
index 1501fc4a04..7c077e853e 100644
--- a/kedro/io/partitioned_dataset.py
+++ b/kedro/io/partitioned_dataset.py
@@ -6,6 +6,7 @@
 import operator
 import warnings
 from copy import deepcopy
+from pathlib import PurePosixPath
 from typing import Any, Callable
 from urllib.parse import urlparse
 
@@ -36,6 +37,18 @@
 IncrementalDataSet: type[IncrementalDataset]
 
 
+def _grandparent(path: str) -> str:
+    path_obj = PurePosixPath(path)
+    grandparent = path_obj.parents[1]
+    if grandparent.name != path_obj.name:
+        last_three_parts = path_obj.relative_to(*path_obj.parts[:-3])
+        raise DatasetError(
+            f"`{path}` is not a well-formed versioned path ending with "
+            f"`filename/timestamp/filename` (got `{last_three_parts}`)."
+        )
+    return str(grandparent)
+
+
 class PartitionedDataset(AbstractDataSet):
     # noqa: too-many-instance-attributes,protected-access
     """``PartitionedDataset`` loads and saves partitioned file-like data using the
@@ -182,7 +195,7 @@ def __init__(  # noqa: too-many-arguments
             load_args: Keyword arguments to be passed into ``find()`` method of
                 the filesystem implementation.
             fs_args: Extra arguments to pass into underlying filesystem class constructor
-                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``)
+                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
             overwrite: If True, any existing partitions will be removed.
             metadata: Any arbitrary metadata. This is ignored by Kedro, but may be
                 consumed by users or external plugins.
@@ -204,12 +217,6 @@ def __init__(  # noqa: too-many-arguments
 
         dataset = dataset if isinstance(dataset, dict) else {"type": dataset}
         self._dataset_type, self._dataset_config = parse_dataset_definition(dataset)
-        if VERSION_KEY in self._dataset_config:
-            raise DatasetError(
-                f"'{self.__class__.__name__}' does not support versioning of the "
-                f"underlying dataset. Please remove '{VERSIONED_FLAG_KEY}' flag from "
-                f"the dataset definition."
-            )
 
         if credentials:
             if CREDENTIALS_KEY in self._dataset_config:
@@ -260,8 +267,9 @@ def _normalized_path(self) -> str:
 
     @cachedmethod(cache=operator.attrgetter("_partition_cache"))
     def _list_partitions(self) -> list[str]:
+        dataset_is_versioned = VERSION_KEY in self._dataset_config
         return [
-            path
+            _grandparent(path) if dataset_is_versioned else path
            for path in self._filesystem.find(self._normalized_path, **self._load_args)
            if path.endswith(self._filename_suffix)
         ]
@@ -442,7 +450,7 @@ def __init__(  # noqa: too-many-arguments
                 This is ignored by Kedro, but may be consumed by users or external plugins.
 
         Raises:
-            DatasetError: If versioning is enabled for the underlying dataset.
+            DatasetError: If versioning is enabled for the checkpoint dataset.
 
         """
         super().__init__(
@@ -502,6 +510,7 @@ def _list_partitions(self) -> list[str]:
         checkpoint_path = self._filesystem._strip_protocol(  # noqa: protected-access
             self._checkpoint_config[self._filepath_arg]
         )
+        dataset_is_versioned = VERSION_KEY in self._dataset_config
 
         def _is_valid_partition(partition) -> bool:
             if not partition.endswith(self._filename_suffix):
@@ -515,9 +524,9 @@ def _is_valid_partition(partition) -> bool:
             return self._comparison_func(partition_id, checkpoint)
 
         return sorted(
-            part
-            for part in self._filesystem.find(self._normalized_path, **self._load_args)
-            if _is_valid_partition(part)
+            _grandparent(path) if dataset_is_versioned else path
+            for path in self._filesystem.find(self._normalized_path, **self._load_args)
+            if _is_valid_partition(path)
         )
 
     @property
diff --git a/tests/io/test_incremental_dataset.py b/tests/io/test_incremental_dataset.py
index b1dd974f28..7f993d05cc 100644
--- a/tests/io/test_incremental_dataset.py
+++ b/tests/io/test_incremental_dataset.py
@@ -255,11 +255,75 @@ def test_checkpoint_type(
             ),
         ],
     )
-    def test_version_not_allowed(self, tmp_path, checkpoint_config, error_pattern):
+    def test_checkpoint_versioning_not_allowed(
+        self, tmp_path, checkpoint_config, error_pattern
+    ):
         """Test that invalid checkpoint configurations raise expected errors"""
         with pytest.raises(DatasetError, match=re.escape(error_pattern)):
             IncrementalDataset(str(tmp_path), DATASET, checkpoint=checkpoint_config)
 
+    @pytest.mark.parametrize("dataset_config", [{"type": DATASET, "versioned": True}])
+    @pytest.mark.parametrize(
+        "suffix,expected_num_parts", [("", 5), (".csv", 5), ("bad", 0)]
+    )
+    def test_versioned_dataset_save_and_load(
+        self,
+        mocker,
+        tmp_path,
+        partitioned_data_pandas,
+        dataset_config,
+        suffix,
+        expected_num_parts,
+    ):
+        """Test that saved and reloaded data matches the original one for
+        the versioned data set."""
+        save_version = "2020-01-01T00.00.00.000Z"
+        mock_ts = mocker.patch(
+            "kedro.io.core.generate_timestamp", return_value=save_version
+        )
+        IncrementalDataset(str(tmp_path), dataset_config).save(partitioned_data_pandas)
+        mock_ts.assert_called_once()
+
+        dataset = IncrementalDataset(
+            str(tmp_path), dataset_config, filename_suffix=suffix
+        )
+        loaded_partitions = dataset.load()
+
+        assert len(loaded_partitions) == expected_num_parts
+
+        actual_save_versions = set()
+        for part in loaded_partitions:
+            partition_dir = tmp_path / (part + suffix)
+            actual_save_versions |= {each.name for each in partition_dir.iterdir()}
+            assert partition_dir.is_dir()
+            assert_frame_equal(
+                loaded_partitions[part], partitioned_data_pandas[part + suffix]
+            )
+
+        if expected_num_parts:
+            # all partitions were saved using the same version string
+            assert actual_save_versions == {save_version}
+
+    def test_malformed_versioned_path(self, tmp_path):
+        local_dir = tmp_path / "files"
+        local_dir.mkdir()
+
+        path = local_dir / "path/to/folder/new/partition/version/partition/file"
+        path.parent.mkdir(parents=True, exist_ok=True)
+        path.write_text("content")
+
+        dataset = IncrementalDataset(
+            str(local_dir / "path/to/folder"),
+            {"type": "pandas.CSVDataSet", "versioned": True},
+        )
+
+        pattern = re.escape(
+            f"`{path.as_posix()}` is not a well-formed versioned path ending with "
+            f"`filename/timestamp/filename` (got `version/partition/file`)."
+        )
+        with pytest.raises(DatasetError, match=pattern):
+            dataset.load()
+
     @pytest.mark.parametrize(
         "pds_config,fs_creds,dataset_creds,checkpoint_creds",
         [
diff --git a/tests/io/test_partitioned_dataset.py b/tests/io/test_partitioned_dataset.py
index 97735a7380..da88a024dc 100644
--- a/tests/io/test_partitioned_dataset.py
+++ b/tests/io/test_partitioned_dataset.py
@@ -27,7 +27,7 @@ def partitioned_data_pandas():
 
 @pytest.fixture
 def local_csvs(tmp_path, partitioned_data_pandas):
-    local_dir = Path(str(tmp_path / "csvs"))
+    local_dir = tmp_path / "csvs"
     local_dir.mkdir()
 
     for k, data in partitioned_data_pandas.items():
@@ -37,6 +37,11 @@ def local_csvs(tmp_path, partitioned_data_pandas):
     return local_dir
 
 
+@pytest.fixture
+def filepath_csvs(tmp_path):
+    return str(tmp_path / "csvs")
+
+
 LOCAL_DATASET_DEFINITION = [
     "pandas.CSVDataSet",
     "kedro.extras.datasets.pandas.CSVDataSet",
@@ -278,17 +283,68 @@ def test_invalid_dataset_config(self, dataset_config, error_pattern):
     @pytest.mark.parametrize(
         "dataset_config",
         [
-            {"type": CSVDataSet, "versioned": True},
-            {"type": "pandas.CSVDataSet", "versioned": True},
+            {**ds_config, "versioned": True}
+            for ds_config in LOCAL_DATASET_DEFINITION
+            if isinstance(ds_config, dict)
         ],
     )
-    def test_versioned_dataset_not_allowed(self, dataset_config):
-        pattern = (
-            "'PartitionedDataset' does not support versioning of the underlying "
-            "dataset. Please remove 'versioned' flag from the dataset definition."
+    @pytest.mark.parametrize(
+        "suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)]
+    )
+    def test_versioned_dataset_save_and_load(
+        self,
+        mocker,
+        filepath_csvs,
+        dataset_config,
+        suffix,
+        expected_num_parts,
+        partitioned_data_pandas,
+    ):  # pylint: disable=too-many-locals
+        """Test that saved and reloaded data matches the original one for
+        the versioned data set."""
+        save_version = "2020-01-01T00.00.00.000Z"
+        mock_ts = mocker.patch(
+            "kedro.io.core.generate_timestamp", return_value=save_version
         )
-        with pytest.raises(DatasetError, match=re.escape(pattern)):
-            PartitionedDataset(str(Path.cwd()), dataset_config)
+        PartitionedDataset(filepath_csvs, dataset_config).save(partitioned_data_pandas)
+        mock_ts.assert_called_once()
+
+        pds = PartitionedDataset(filepath_csvs, dataset_config, filename_suffix=suffix)
+        loaded_partitions = pds.load()
+
+        assert len(loaded_partitions) == expected_num_parts
+        actual_save_versions = set()
+        for partition_id, load_func in loaded_partitions.items():
+            partition_dir = Path(filepath_csvs, partition_id + suffix)
+            actual_save_versions |= {each.name for each in partition_dir.iterdir()}
+            df = load_func()
+            assert_frame_equal(df, partitioned_data_pandas[partition_id + suffix])
+            if suffix:
+                assert not partition_id.endswith(suffix)
+
+        if expected_num_parts:
+            # all partitions were saved using the same version string
+            assert actual_save_versions == {save_version}
+
+    def test_malformed_versioned_path(self, tmp_path):
+        local_dir = tmp_path / "files"
+        local_dir.mkdir()
+
+        path = local_dir / "path/to/folder/new/partition/version/partition/file"
+        path.parent.mkdir(parents=True, exist_ok=True)
+        path.write_text("content")
+
+        pds = PartitionedDataset(
+            str(local_dir / "path/to/folder"),
+            {"type": "pandas.CSVDataSet", "versioned": True},
+        )
+
+        pattern = re.escape(
+            f"`{path.as_posix()}` is not a well-formed versioned path ending with "
+            f"`filename/timestamp/filename` (got `version/partition/file`)."
+        )
+        with pytest.raises(DatasetError, match=pattern):
+            pds.load()
 
     def test_no_partitions(self, tmpdir):
         pds = PartitionedDataset(str(tmpdir), "pandas.CSVDataSet")
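
Usage sketch of the behaviour this patch enables, for illustration only: it assumes a writable local `data/` directory, pandas installed, and a resolvable `pandas.CSVDataSet`; the `p1`/`p2` partition names and the timestamp shown are made up, not taken from this diff. With `versioned: True` on the underlying dataset, each partition is saved to `<base>/<partition>/<save_version>/<partition>`, and on load `_grandparent()` strips the two trailing path components so partition ids stay stable across versions.

import pandas as pd

from kedro.io.partitioned_dataset import PartitionedDataset

# Underlying dataset is versioned; the partitioned wrapper itself needs no flag.
pds = PartitionedDataset("data", {"type": "pandas.CSVDataSet", "versioned": True})
pds.save({"p1": pd.DataFrame({"x": [1]}), "p2": pd.DataFrame({"x": [2]})})
# Resulting layout on disk (one shared save version per save() call), e.g.:
#   data/p1/2023-08-01T00.00.00.000Z/p1
#   data/p2/2023-08-01T00.00.00.000Z/p2
print(sorted(pds.load()))  # ['p1', 'p2'] -- partition ids, not versioned paths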