Skip to content

Commit

Permalink
[data] Implement Dataset.distinct (ray-project#36655)
Browse files Browse the repository at this point in the history
Implement `Dataset.distinct`. Currently this API only supports Datasets with a single column, because `groupby` doesn't support multiple columns yet.

Co-authored-by: Philipp Moritz <pcmoritz@gmail.com>
Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
  • Loading branch information
2 people authored and arvind-chandra committed Aug 31, 2023
1 parent 9289fd6 commit cb16282
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 0 deletions.
1 change: 1 addition & 0 deletions doc/source/data/api/dataset.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Grouped and Global Aggregations
:toctree: doc/

Dataset.groupby
Dataset.distinct
Dataset.aggregate
Dataset.sum
Dataset.min
Expand Down
29 changes: 29 additions & 0 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -1717,6 +1717,35 @@ def groupby(self, key: Optional[str]) -> "GroupedData":

return GroupedData(self, key)

def distinct(self) -> "Dataset":
    """Remove duplicate rows from the :class:`~ray.data.Dataset`.

    Deduplication is done by grouping on the dataset's only column, counting
    each group, and then keeping just that column from the result.

    Examples:
        >>> import ray
        >>> ds = ray.data.from_items([1, 2, 3, 2, 3])
        >>> ds.distinct().take_all()
        [{'item': 1}, {'item': 2}, {'item': 3}]

    Time complexity: O(dataset size * log(dataset size / parallelism))

    .. note:: Currently distinct only supports
        :class:`~ray.data.Dataset` with one single column.

    Returns:
        A new :class:`~ray.data.Dataset` with distinct rows.

    Raises:
        ValueError: If the column names cannot be determined, or the
            dataset has no columns.
        NotImplementedError: If the dataset has more than one column.
    """
    columns = self.columns(fetch_if_missing=True)
    # Validate explicitly rather than with `assert`: assertions are stripped
    # under `python -O`, and an empty column list would otherwise surface as
    # an unhelpful IndexError at `columns[0]` below.
    if not columns:
        raise ValueError(
            "`distinct` requires a Dataset with at least one column, but the "
            "column names of this Dataset could not be determined."
        )
    if len(columns) > 1:
        # TODO(hchen): Remove this limitation once groupby supports
        # multiple columns.
        raise NotImplementedError(
            "`distinct` currently only supports Datasets with one single column, "
            "please apply `select_columns` before `distinct`."
        )
    column = columns[0]
    # Each group key is one distinct value; the count itself is discarded.
    return self.groupby(column).count().select_columns([column])

@ConsumptionAPI
def aggregate(self, *aggs: AggregateFn) -> Union[Any, Dict[str, Any]]:
"""Aggregate the entire dataset as one group.
Expand Down
20 changes: 20 additions & 0 deletions python/ray/data/tests/test_all_to_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,26 @@ def test_repartition_shuffle_arrow(ray_start_regular_shared):
assert large._block_num_rows() == [500] * 20


def test_distinct(ray_start_regular_shared):
    """Check `Dataset.distinct` on single- and multi-column datasets."""
    # Single-column dataset: duplicates collapse to one row per value.
    single_column = ray.data.from_items([3, 2, 3, 1, 2, 3])
    deduplicated = single_column.distinct().sort("item").take_all()
    assert deduplicated == [
        {"item": 1},
        {"item": 2},
        {"item": 3},
    ]

    first_row = {"a": 1, "b": 1}
    second_row = {"a": 1, "b": 2}
    two_columns = ray.data.from_items([first_row, second_row])

    # Currently, we don't support distinct on multiple columns.
    with pytest.raises(NotImplementedError):
        two_columns.distinct().take_all()

    # After selecting a single column, distinct should work.
    only_a = two_columns.select_columns(["a"])
    assert only_a.distinct().take_all() == [{"a": 1}]


def test_grouped_dataset_repr(ray_start_regular_shared):
ds = ray.data.from_items([{"key": "spam"}, {"key": "ham"}, {"key": "spam"}])
assert repr(ds.groupby("key")) == f"GroupedData(dataset={ds!r}, key='key')"
Expand Down

0 comments on commit cb16282

Please sign in to comment.