
[Datasets] [Docs] Update map_batches documentation #28435

Merged
25 commits
d88840f  Add `Dataset.native_batch_format` (bveeramani, Sep 11, 2022)
1db147f  [Datasets] [Docs] Update `map_batches` reference (bveeramani, Sep 11, 2022)
dcf01c7  Update dataset.py (bveeramani, Sep 11, 2022)
8c21deb  Update dataset.py (bveeramani, Sep 11, 2022)
5d1f880  Update example (bveeramani, Sep 13, 2022)
fe95ffa  Change name and implementation (bveeramani, Sep 13, 2022)
07b3a85  Rename "native" as "default" (bveeramani, Sep 13, 2022)
35c12a3  Update docs (bveeramani, Sep 13, 2022)
56df5f5  Add warnings (bveeramani, Sep 13, 2022)
a596e4d  Merge branch 'bveeramani/native-to-default' into bveeramani/native-ba… (bveeramani, Sep 13, 2022)
a7724aa  Update dataset.py (bveeramani, Sep 15, 2022)
7045ebd  Merge remote-tracking branch 'upstream/master' into bveeramani/map-ba… (bveeramani, Sep 15, 2022)
067292a  Merge branch 'bveeramani/native-batch-format' into bveeramani/map-bat… (bveeramani, Sep 15, 2022)
472c00f  Update dataset.rst (bveeramani, Sep 15, 2022)
d9eb36a  Merge remote-tracking branch 'upstream/master' into bveeramani/native… (bveeramani, Sep 15, 2022)
99e1c16  Merge branch 'bveeramani/native-batch-format' into bveeramani/map-bat… (bveeramani, Sep 15, 2022)
6623897  Update dataset.py (bveeramani, Sep 15, 2022)
9b3db87  Update dataset.py (bveeramani, Sep 15, 2022)
28d9ad2  Update dataset.py (bveeramani, Sep 15, 2022)
acf27e7  Address review comments (bveeramani, Sep 15, 2022)
86bdfba  Address suggestion (bveeramani, Sep 22, 2022)
7958a72  Merge branch 'master' into bveeramani/map-batches-example (bveeramani, Sep 22, 2022)
ab49e64  Update dataset.py (bveeramani, Sep 22, 2022)
bcaed20  Update dataset.py (bveeramani, Sep 23, 2022)
e137def  Delete model (bveeramani, Sep 23, 2022)
19 changes: 11 additions & 8 deletions doc/source/data/api/dataset.rst
@@ -104,6 +104,7 @@ Dataset API

ray.data.Dataset.count
ray.data.Dataset.schema
ray.data.Dataset.native_batch_format
ray.data.Dataset.num_blocks
ray.data.Dataset.size_bytes
ray.data.Dataset.input_files
@@ -256,30 +257,32 @@ Inspecting Metadata

.. automethod:: ray.data.Dataset.schema

.. automethod:: ray.data.Dataset.native_batch_format

.. automethod:: ray.data.Dataset.num_blocks

.. automethod:: ray.data.Dataset.size_bytes

.. automethod:: ray.data.Dataset.input_files

.. automethod:: ray.data.Dataset.stats

.. automethod:: ray.data.Dataset.get_internal_block_refs

Execution
---------

.. automethod:: ray.data.Dataset.fully_executed

.. automethod:: ray.data.Dataset.is_fully_executed

.. automethod:: ray.data.Dataset.lazy

Serialization
-------------

.. automethod:: ray.data.Dataset.has_serializable_lineage

.. automethod:: ray.data.Dataset.serialize_lineage
.. automethod:: ray.data.Dataset.deserialize_lineage

.. automethod:: ray.data.Dataset.deserialize_lineage
219 changes: 159 additions & 60 deletions python/ray/data/dataset.py
@@ -13,6 +13,7 @@
Iterable,
Iterator,
List,
Type,
Optional,
Tuple,
Union,
@@ -317,88 +318,124 @@ def map_batches(
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
**ray_remote_args,
) -> "Dataset[Any]":
"""Apply the given function to batches of records of this dataset.
"""Apply the given function to batches of data.

The format of the data batch provided to ``fn`` can be controlled via the
``batch_format`` argument, and the output of the UDF can be any batch type.
Batches are represented as dataframes, arrays, or lists. The default batch
type is determined by your dataset's schema. To determine the default batch
type, call :meth:`~Dataset.native_batch_format`. Alternatively, set the batch
type with ``batch_format``.

This is a blocking operation.
To learn more about writing functions for :meth:`~Dataset.map_batches`, read
:ref:`writing user-defined functions <transform_datasets_writing_udfs>`.

.. note::
This is a blocking operation. The time complexity is :math:`O(n/p)`, where
:math:`n` is the dataset size and :math:`p` is the parallelism.

.. tip::
Consider using :class:`~ray.data.preprocessors.BatchMapper`. It's more
performant and easier to use.

Examples:

>>> import pandas as pd
>>> import ray
>>> # Transform python objects.
>>> ds = ray.data.range(1000)
>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
Dataset(num_blocks=..., num_rows=1000, schema=<class 'int'>)
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> init_model = ... # doctest: +SKIP
>>> df = pd.DataFrame({
... "name": ["?", "Grace Hopper", "Rafael Nadal"],
... "age": ["22", "85", "36"]
... })
>>> ds = ray.data.from_pandas(df)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: object})

Call :meth:`~Dataset.native_batch_format` to determine the default batch
type.

>>> ds.native_batch_format()
<class 'pandas.core.frame.DataFrame'>

Once you know the batch type, define a function that transforms batches
of data. :meth:`~Dataset.map_batches` applies the function in parallel.

>>> def map_fn(batch: pd.DataFrame) -> pd.DataFrame:
... batch["age"] = batch["age"].astype(int)
... return batch
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64})

Your ``fn`` can return a different type than the input type. To learn more
about supported output types, read
:ref:`user-defined function output types <transform_datasets_batch_output_types>`.

>>> from typing import List
>>> def map_fn(batch: pd.DataFrame) -> List[int]:
... return list(batch["age"])
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema=<class 'int'>)

Shared state can improve the performance of some workloads. To persist state
across function invocations, pass a callable type to ``fn`` and specify an
:class:`ActorPoolStrategy <ray.data.ActorPoolStrategy>`.

In the example below, ``CachedModel`` is called on an autoscaling pool of
two to eight :ref:`Ray actors <actor-guide>`, each allocated one GPU by Ray.

>>> from ray.data import ActorPoolStrategy
>>> init_large_model = ... # doctest: +SKIP
>>> class CachedModel:
... def __init__(self):
... self.model = init_model()
... self.model = init_large_model()
... def __call__(self, item):
... return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute=ActorPoolStrategy(2, 8) the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> from ray.data._internal.compute import ActorPoolStrategy
>>> ds.map_batches( # doctest: +SKIP
... CachedModel, # doctest: +SKIP
... batch_size=256, # doctest: +SKIP
... compute=ActorPoolStrategy(2, 8), # doctest: +SKIP
... num_gpus=1) # doctest: +SKIP

You can use ``map_batches`` to efficiently filter records.

>>> import ray
>>> ds = ray.data.range(10000)
>>> ds.count()
10000
>>> ds = ds.map_batches(lambda batch: [x for x in batch if x % 2 == 0])
>>> ds.count()
5000

Time complexity: O(dataset size / parallelism)
... num_gpus=1
... ) # doctest: +SKIP
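The batching semantics in the examples above can be sketched in plain Python (outside Ray): rows are grouped into fixed-size batches, ``fn`` is applied to each batch, and the final batch may be smaller than ``batch_size``. The helper name ``map_batches_sketch`` is illustrative, not part of Ray's API.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def map_batches_sketch(
    rows: List[T], fn: Callable[[List[T]], List[U]], batch_size: int = 4096
) -> List[U]:
    """Toy stand-in for Dataset.map_batches: apply fn to each batch of rows."""
    out: List[U] = []
    for start in range(0, len(rows), batch_size):
        batch = rows[start : start + batch_size]  # last batch may be smaller
        out.extend(fn(batch))
    return out


# Double every value, as in the lambda examples above.
doubled = map_batches_sketch(list(range(10)), lambda b: [v * 2 for v in b], batch_size=4)
# doubled == [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

Because ``fn`` may return a different number of rows than it receives, the same sketch also covers batch-wise filtering.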

Args:
fn: The function to apply to each record batch, or a class type
that can be instantiated to create such a callable. Callable classes are
only supported for the actor compute strategy.
batch_size: The number of rows in each batch, or None to use entire blocks
as batches (blocks may contain different number of rows).
The final batch may include fewer than ``batch_size`` rows.
Defaults to 4096.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling actor pool. If wanting to
configure the min or max size of the autoscaling actor pool, you can
provide an
:class:`ActorPoolStrategy(min, max) <ray.data.ActorPoolStrategy>`
instance. If using callable classes for fn, the actor compute strategy
must be used.
batch_format: Specify "native" to use the native block format (promotes
tables to Pandas and tensors to NumPy), "pandas" to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy"
to select ``numpy.ndarray`` for tensor datasets and
batch_size: The number of rows in each batch, or ``None`` to use entire
blocks as batches. Blocks can contain different numbers of rows, and
the last batch can include fewer than ``batch_size`` rows. Defaults to
``4096``.
compute: The compute strategy, either ``"tasks"`` (default) to use Ray
tasks, or ``"actors"`` to use an autoscaling actor pool. If you want to
configure the size of the autoscaling actor pool, provide an
:class:`ActorPoolStrategy <ray.data.ActorPoolStrategy>` instance.
If you're passing a callable type to ``fn``, you must pass an
:class:`ActorPoolStrategy <ray.data.ActorPoolStrategy>`.
batch_format: Specify ``"native"`` to use the native block format (promotes
tables to Pandas and tensors to NumPy), ``"pandas"`` to select
``pandas.DataFrame``, ``"pyarrow"`` to select ``pyarrow.Table``, or
``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is ``"native"``.
fn_args: Positional arguments to pass to ``fn``, after the data batch. These
arguments will be top-level arguments in the underlying Ray task that's
submitted.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments will be
top-level arguments in the underlying Ray task that's submitted.
fn_args: Positional arguments to pass to ``fn`` after the first argument.
These arguments are top-level arguments to the underlying Ray task.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
top-level arguments to the underlying Ray task.
fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class and the actor
compute strategy is being used. These arguments will be top-level
arguments in the underlying Ray actor construction task that's
submitted.
You can only provide this if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class and the actor
compute strategy is being used. These arguments will be top-level
arguments in the underlying Ray actor construction task that's
submitted.
This can only be provided if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""
ray (e.g., ``num_gpus=1`` to request GPUs for the map tasks).

.. seealso::

:meth:`~Dataset.iter_batches`
Call this function to iterate over batches of data.

:meth:`~Dataset.native_batch_format`
Call this function to determine the default batch type.
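As a hedged sketch of how the ``batch_format`` argument described above selects a batch type (the mapping below is illustrative; Ray's real dispatch lives in its internal block-accessor code):

```python
# Illustrative mapping only, not Ray's internal implementation.
BATCH_FORMAT_TO_TYPE = {
    "pandas": "pandas.DataFrame",
    "pyarrow": "pyarrow.Table",
    "numpy": "numpy.ndarray (or Dict[str, numpy.ndarray] for tabular data)",
}


def describe_batch_format(batch_format: str) -> str:
    """Describe the batch type a given batch_format choice would produce."""
    if batch_format == "native":
        # "native" defers to the dataset itself; see Dataset.native_batch_format.
        return "inferred from the block type (see Dataset.native_batch_format)"
    try:
        return BATCH_FORMAT_TO_TYPE[batch_format]
    except KeyError:
        raise ValueError(f"Unsupported batch format: {batch_format!r}")
```

For example, ``describe_batch_format("pandas")`` reports ``pandas.DataFrame``, matching the Args description above.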
Review comment (Contributor):

    nit: to learn? User cannot choose default type.

Reply (Member Author):

    I think "determine" is okay in this context.

    This is one of the Google definitions:

        ascertain or establish exactly, typically as a result of research or calculation.
        "the point of our study was to determine what is true, not what is practicable"

""" # noqa: E501
import pandas as pd
import pyarrow as pa

@@ -3553,6 +3590,68 @@ def _divide(self, block_idx: int) -> ("Dataset[T]", "Dataset[T]"):
)
return l_ds, r_ds

def native_batch_format(self) -> Type:
"""Return this dataset's native batch format.

The native batch format describes what batches of data look like. To learn more
about batch formats, read
:ref:`writing user-defined functions <transform_datasets_writing_udfs>`.

Example:

If your dataset represents a list of Python objects, then the native batch
format is ``list``.

>>> ds = ray.data.range(100)
>>> ds
Dataset(num_blocks=20, num_rows=100, schema=<class 'int'>)
>>> ds.native_batch_format()
<class 'list'>
>>> next(ds.iter_batches(batch_size=4))
[0, 1, 2, 3]

If your dataset contains a single column of ``TensorDtype`` or
``ArrowTensorType``, then the native batch format is ``ndarray``.

>>> ds = ray.data.range_tensor(100)
>>> ds
Dataset(num_blocks=20, num_rows=100, schema={__value__: ArrowTensorType(shape=(1,), dtype=int64)})
>>> ds.native_batch_format()
<class 'numpy.ndarray'>
>>> next(ds.iter_batches(batch_size=4))
array([[0],
[1],
[2],
[3]])

If your dataset represents tabular data that doesn't fall into the ``ndarray``
case above, then the native batch format is ``DataFrame``.

>>> import pandas as pd
>>> df = pd.DataFrame({"foo": ["a", "b"], "bar": [0, 1]})
>>> ds = ray.data.from_pandas(df)
>>> ds
Dataset(num_blocks=1, num_rows=2, schema={foo: object, bar: int64})
>>> ds.native_batch_format()
<class 'pandas.core.frame.DataFrame'>
>>> next(ds.iter_batches(batch_size=4))
foo bar
0 a 0
1 b 1

.. seealso::

:meth:`~Dataset.map_batches`
Call this function to transform batches of data.

:meth:`~Dataset.iter_batches`
Call this function to iterate over batches of data.

""" # noqa: E501
block = ray.get(next(self._plan.execute().iter_blocks()))
native_batch = BlockAccessor.for_block(block).to_native()
return type(native_batch)
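The implementation above fetches the first block, converts it to its native form, and returns that form's type. A simplified pure-Python sketch of that dispatch (block kinds and the ``to_native`` mapping here are stand-ins for Ray's ``BlockAccessor``, not its actual code):

```python
def native_batch_type_sketch(block_kind: str) -> str:
    """Map a (simplified) block kind to the native batch type it yields."""
    to_native = {
        "simple": "list",                 # plain Python objects
        "arrow_tensor": "numpy.ndarray",  # single tensor column -> ndarray
        "arrow": "pandas.DataFrame",      # tabular Arrow block promoted to pandas
        "pandas": "pandas.DataFrame",
    }
    return to_native[block_kind]
```

This mirrors the three docstring examples: simple datasets yield ``list``, tensor datasets yield ``ndarray``, and tabular datasets yield ``DataFrame``.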

def _dataset_format(self) -> str:
"""Determine the format of the dataset. Possible values are: "arrow",
"pandas", "simple".
12 changes: 12 additions & 0 deletions python/ray/data/tests/test_dataset.py
@@ -4905,6 +4905,18 @@ def f(x):
), "Number of actors is out of the expected bound"


def test_native_batch_format(shutdown_only):
ds = ray.data.range(100)
assert ds.native_batch_format() == list

ds = ray.data.range_tensor(100)
assert ds.native_batch_format() == np.ndarray

df = pd.DataFrame({"foo": ["a", "b"], "bar": [0, 1]})
ds = ray.data.from_pandas(df)
assert ds.native_batch_format() == pd.DataFrame


if __name__ == "__main__":
import sys
