From fb067e803cc4bacb06395cd46d4d2d9accafce3c Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Wed, 21 Jun 2023 10:40:50 -0700 Subject: [PATCH 1/8] Implement Dataset.distinct Signed-off-by: Hao Chen --- python/ray/data/dataset.py | 27 ++++++++++++++++++++++++ python/ray/data/tests/test_all_to_all.py | 18 ++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 7e1d64226459a..95755121fedb0 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1700,6 +1700,33 @@ def groupby(self, key: Optional[str]) -> "GroupedData": return GroupedData(self, key) + def distinct(self) -> "Dataset": + """Remove duplicate rows from the dataset. + + Examples: + >>> import ray + >>> ds = ray.data.from_items([1, 2, 3, 2, 3]) + >>> ds.distinct().take_all() + [{'item': 1}, {'item': 2}, {'item': 3}] + + Time complexity: O(dataset size * log(dataset size / parallelism)) + + .. note:: Currently distinct only supports Datasets with one single column. + + Returns: + A new dataset with distinct rows. + """ + columns = self.columns(fetch_if_missing=True) + assert columns is not None + if len(columns) > 1: + # TODO(hchen): Remove this limitation once groupby supports + # multiple columns. + raise NotImplementedError( + "`distinct` currently only suports Datasets with one single column, " + "please apply `select_columns` before `distinct`." + ) + return self.groupby(columns[0]).count().drop_columns(["count()"]) + @ConsumptionAPI def aggregate(self, *aggs: AggregateFn) -> Union[Any, Dict[str, Any]]: """Aggregate the entire dataset as one group. diff --git a/python/ray/data/tests/test_all_to_all.py b/python/ray/data/tests/test_all_to_all.py index 22700ff1e9efd..e5a8a8640063b 100644 --- a/python/ray/data/tests/test_all_to_all.py +++ b/python/ray/data/tests/test_all_to_all.py @@ -217,6 +217,24 @@ def test_repartition_shuffle_arrow(ray_start_regular_shared): assert large._block_num_rows() == [500] * 20 +def test_distinct(ray_start_regular_shared): + ds = ray.data.from_items([3, 2, 3, 1, 2, 3]) + assert ds.distinct().sort("item").take_all() == [ + {"item": 1}, + {"item": 2}, + {"item": 3}, + ] + ds = ray.data.from_items([ + {"a": 1, "b": 1}, + {"a": 1, "b": 2}, + ]) + # Currently, we don't support distinct on multiple columns. + with pytest.raises(NotImplementedError): + ds.distinct().take_all() + # After selecting a single column, distinct should work. + assert ds.select_columns(["a"]).distinct().take_all() == [{"a": 1}] + + def test_grouped_dataset_repr(ray_start_regular_shared): ds = ray.data.from_items([{"key": "spam"}, {"key": "ham"}, {"key": "spam"}]) assert repr(ds.groupby("key")) == f"GroupedData(dataset={ds!r}, key='key')" From 2569947025be410cd15d7e706d5682c7fc182693 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Wed, 21 Jun 2023 14:12:58 -0700 Subject: [PATCH 2/8] fix Signed-off-by: Hao Chen --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 95755121fedb0..0df8bb79f9d9b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1707,7 +1707,7 @@ def distinct(self) -> "Dataset": >>> import ray >>> ds = ray.data.from_items([1, 2, 3, 2, 3]) >>> ds.distinct().take_all() - [{'item': 1}, {'item': 2}, {'item': 3}] + [{'item': 1}, {'item': 2}, {'item': 3}] Time complexity: O(dataset size * log(dataset size / parallelism)) From 776d512316c27b2e0269bf55282753750ea1aaec Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Wed, 21 Jun 2023 14:50:19 -0700 Subject: [PATCH 3/8] refine doc Signed-off-by: Hao Chen --- python/ray/data/dataset.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index ca82622780819..59f46036f49c0 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1720,7 +1720,7 @@ def groupby(self, key: Optional[str]) -> "GroupedData": return GroupedData(self, key) def distinct(self) -> "Dataset": - """Remove duplicate rows from the dataset. + """Remove duplicate rows from the :class:`~ray.data.Dataset`. Examples: >>> import ray @@ -1730,10 +1730,10 @@ def distinct(self) -> "Dataset": Time complexity: O(dataset size * log(dataset size / parallelism)) - .. note:: Currently distinct only supports Datasets with one single column. + .. note:: Currently distinct only supports :class:`~ray.data.Dataset`s with one single column. Returns: - A new dataset with distinct rows. + A new :class:`~ray.data.Dataset` with distinct rows. """ columns = self.columns(fetch_if_missing=True) assert columns is not None From 3d59fbdc49fb044059fd9fdf4401fb2719019d79 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 22 Jun 2023 13:15:28 -0700 Subject: [PATCH 4/8] select_columns Signed-off-by: Hao Chen --- python/ray/data/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 59f46036f49c0..51c4975a88824 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1744,7 +1744,8 @@ def distinct(self) -> "Dataset": "`distinct` currently only suports Datasets with one single column, " "please apply `select_columns` before `distinct`." ) - return self.groupby(columns[0]).count().drop_columns(["count()"]) + column = columns[0] + return self.groupby(column).count().select_columns([column]) @ConsumptionAPI def aggregate(self, *aggs: AggregateFn) -> Union[Any, Dict[str, Any]]: From 65d88ea8b1217597139c186735af885efd6ddc5d Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 22 Jun 2023 13:19:13 -0700 Subject: [PATCH 5/8] api doc Signed-off-by: Hao Chen --- doc/source/data/api/dataset.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/data/api/dataset.rst b/doc/source/data/api/dataset.rst index cb6ac22fbc4ff..6e92e2bc9bd5a 100644 --- a/doc/source/data/api/dataset.rst +++ b/doc/source/data/api/dataset.rst @@ -61,6 +61,7 @@ Grouped and Global Aggregations :toctree: doc/ Dataset.groupby + Dataset.distinct Dataset.aggregate Dataset.sum Dataset.min From 5a80393e973dfe7be5aa9dd3094301fe784d5963 Mon Sep 17 00:00:00 2001 From: Hao Chen Date: Thu, 22 Jun 2023 13:21:14 -0700 Subject: [PATCH 6/8] lint Signed-off-by: Hao Chen --- python/ray/data/tests/test_all_to_all.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/python/ray/data/tests/test_all_to_all.py b/python/ray/data/tests/test_all_to_all.py index e5a8a8640063b..fd72c940a0865 100644 --- a/python/ray/data/tests/test_all_to_all.py +++ b/python/ray/data/tests/test_all_to_all.py @@ -220,14 +220,16 @@ def test_repartition_shuffle_arrow(ray_start_regular_shared): def test_distinct(ray_start_regular_shared): ds = ray.data.from_items([3, 2, 3, 1, 2, 3]) assert ds.distinct().sort("item").take_all() == [ - {"item": 1}, - {"item": 2}, - {"item": 3}, + {"item": 1}, + {"item": 2}, + {"item": 3}, ] - ds = ray.data.from_items([ - {"a": 1, "b": 1}, - {"a": 1, "b": 2}, - ]) + ds = ray.data.from_items( + [ + {"a": 1, "b": 1}, + {"a": 1, "b": 2}, + ] + ) # Currently, we don't support distinct on multiple columns. with pytest.raises(NotImplementedError): ds.distinct().take_all() From 1c382e0b0901024500e985747fe861cebda2a918 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 23 Jun 2023 10:30:07 +0200 Subject: [PATCH 7/8] fix small typo Signed-off-by: Philipp Moritz --- python/ray/data/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 94168a090b619..0cba3bd27c583 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1741,7 +1741,7 @@ def distinct(self) -> "Dataset": # TODO(hchen): Remove this limitation once groupby supports # multiple columns. raise NotImplementedError( - "`distinct` currently only suports Datasets with one single column, " + "`distinct` currently only supports Datasets with one single column, " "please apply `select_columns` before `distinct`." ) column = columns[0] From 0b72fa9d2db044c24fed6f5f8b8340798d16c2a6 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Fri, 23 Jun 2023 11:18:57 +0200 Subject: [PATCH 8/8] fix lint and formatting --- python/ray/data/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 0cba3bd27c583..520c99014e00a 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -1730,7 +1730,8 @@ def distinct(self) -> "Dataset": Time complexity: O(dataset size * log(dataset size / parallelism)) - .. note:: Currently distinct only supports :class:`~ray.data.Dataset`s with one single column. + .. note:: Currently distinct only supports + :class:`~ray.data.Dataset` with one single column. Returns: A new :class:`~ray.data.Dataset` with distinct rows.