Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] [Docs] Update map_batches documentation #28435

Merged
Merged
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d88840f
Add `Dataset.native_batch_format`
bveeramani Sep 11, 2022
1db147f
[Datasets] [Docs] Update `map_batches` reference
bveeramani Sep 11, 2022
dcf01c7
Update dataset.py
bveeramani Sep 11, 2022
8c21deb
Update dataset.py
bveeramani Sep 11, 2022
5d1f880
Update example
bveeramani Sep 13, 2022
fe95ffa
Change name and implementation
bveeramani Sep 13, 2022
07b3a85
Rename "native" as "default"
bveeramani Sep 13, 2022
35c12a3
Update docs
bveeramani Sep 13, 2022
56df5f5
Add warnings
bveeramani Sep 13, 2022
a596e4d
Merge branch 'bveeramani/native-to-default' into bveeramani/native-ba…
bveeramani Sep 13, 2022
a7724aa
Update dataset.py
bveeramani Sep 15, 2022
7045ebd
Merge remote-tracking branch 'upstream/master' into bveeramani/map-ba…
bveeramani Sep 15, 2022
067292a
Merge branch 'bveeramani/native-batch-format' into bveeramani/map-bat…
bveeramani Sep 15, 2022
472c00f
Update dataset.rst
bveeramani Sep 15, 2022
d9eb36a
Merge remote-tracking branch 'upstream/master' into bveeramani/native…
bveeramani Sep 15, 2022
99e1c16
Merge branch 'bveeramani/native-batch-format' into bveeramani/map-bat…
bveeramani Sep 15, 2022
6623897
Update dataset.py
bveeramani Sep 15, 2022
9b3db87
Update dataset.py
bveeramani Sep 15, 2022
28d9ad2
Update dataset.py
bveeramani Sep 15, 2022
acf27e7
Address review comments
bveeramani Sep 15, 2022
86bdfba
Address suggestion
bveeramani Sep 22, 2022
7958a72
Merge branch 'master' into bveeramani/map-batches-example
bveeramani Sep 22, 2022
ab49e64
Update dataset.py
bveeramani Sep 22, 2022
bcaed20
Update dataset.py
bveeramani Sep 23, 2022
e137def
Delete model
bveeramani Sep 23, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 101 additions & 60 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,88 +326,129 @@ def map_batches(
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
bveeramani marked this conversation as resolved.
Show resolved Hide resolved
**ray_remote_args,
) -> "Dataset[Any]":
"""Apply the given function to batches of records of this dataset.
"""Apply the given function to batches of data.

The format of the data batch provided to ``fn`` can be controlled via the
``batch_format`` argument, and the output of the UDF can be any batch type.
Batches are represented as dataframes, ndarrays, or lists. The default batch
type is determined by your dataset's schema. To determine the default batch
type, call :meth:`~Dataset.default_batch_format`. Alternatively, set the batch
type with ``batch_format``.

This is a blocking operation.
To learn more about writing functions for :meth:`~Dataset.map_batches`, read
:ref:`writing user-defined functions <transform_datasets_writing_udfs>`.

.. tip::
If you're using :ref:`Ray AIR <air>` for training or batch inference,
consider using :class:`~ray.data.preprocessors.BatchMapper`. It's more
performant and easier to use.

Examples:

>>> import pandas as pd
>>> import ray
>>> # Transform python objects.
>>> ds = ray.data.range(1000)
>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
Dataset(num_blocks=..., num_rows=1000, schema=<class 'int'>)
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> init_model = ... # doctest: +SKIP
>>> df = pd.DataFrame({
... "name": ["Luna", "Rory", "Scout"],
... "age": [4, 14, 9]
... })
>>> ds = ray.data.from_pandas(df)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64})

Call :meth:`.default_batch_format` to determine the default batch
type.

>>> ds.default_batch_format()
<class 'pandas.core.frame.DataFrame'>

.. tip::

Datasets created from tabular data like Arrow tables and Parquet files
yield ``pd.DataFrame`` batches.

Once you know the batch type, define a function that transforms batches
of data. ``ds.map_batches`` applies the function in parallel.

>>> def map_fn(batch: pd.DataFrame) -> pd.DataFrame:
... batch["age_in_dog_years"] = 7 * batch["age"]
... return batch
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64, age_in_dog_years: int64})

Your ``fn`` can return a different type than the input type. To learn more
about supported output types, read
:ref:`user-defined function output types <transform_datasets_batch_output_types>`.

>>> from typing import List
bveeramani marked this conversation as resolved.
Show resolved Hide resolved
>>> def map_fn(batch: pd.DataFrame) -> List[int]:
... return list(batch["age_in_dog_years"])
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema=<class 'int'>)

:ref:`Actors <actor-guide>` can improve the performance of some workloads.
For example, you can use :ref:`actors <actor-guide>` to load a model once
per worker instead of once per inference.

To transform batches with :ref:`actors <actor-guide>`, pass a callable type
to ``fn`` and specify an :class:`~ray.data.ActorPoolStrategy>`.

In the example below, ``CachedModel`` is called on an autoscaling pool of
two to eight :ref:`actors <actor-guide>`, each allocated one GPU by Ray.

>>> from ray.data import ActorPoolStrategy
>>> init_large_model = ... # doctest: +SKIP
>>> class CachedModel:
... def __init__(self):
... self.model = init_model()
... self.model = init_large_model()
... def __call__(self, item):
... return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute=ActorPoolStrategy(2, 8) the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> from ray.data._internal.compute import ActorPoolStrategy
>>> ds.map_batches( # doctest: +SKIP
... CachedModel, # doctest: +SKIP
... batch_size=256, # doctest: +SKIP
... compute=ActorPoolStrategy(2, 8), # doctest: +SKIP
... num_gpus=1) # doctest: +SKIP

You can use ``map_batches`` to efficiently filter records.

>>> import ray
>>> ds = ray.data.range(10000)
>>> ds.count()
10000
>>> ds = ds.map_batches(lambda batch: [x for x in batch if x % 2 == 0])
>>> ds.count()
5000

Time complexity: O(dataset size / parallelism)
... num_gpus=1,
... ) # doctest: +SKIP

Args:
fn: The function to apply to each record batch, or a class type
that can be instantiated to create such a callable. Callable classes are
only supported for the actor compute strategy.
batch_size: The number of rows in each batch, or None to use entire blocks
as batches (blocks may contain different number of rows).
The final batch may include fewer than ``batch_size`` rows.
Defaults to 4096.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling actor pool. If wanting to
configure the min or max size of the autoscaling actor pool, you can
provide an
:class:`ActorPoolStrategy(min, max) <ray.data.ActorPoolStrategy>`
instance. If using callable classes for fn, the actor compute strategy
must be used.
batch_format: Specify "default" to use the default block format (promotes
tables to Pandas and tensors to NumPy), "pandas" to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy"
to select ``numpy.ndarray`` for tensor datasets and
batch_size: The number of rows in each batch, or ``None`` to use entire
blocks as batches. Blocks can contain different number of rows, and
the last batch can include fewer than ``batch_size`` rows. Defaults to
``4096``.
compute: The compute strategy, either ``"tasks"`` (default) to use Ray
tasks, or ``"actors"`` to use an autoscaling actor pool. If you want to
configure the size of the autoscaling actor pool, provide an
:class:`ActorPoolStrategy <ray.data.ActorPoolStrategy>` instance.
If you're passing callable type to ``fn``, you must pass an
:class:`ActorPoolStrategy <ray.data.ActorPoolStrategy>`.
batch_format: Specify ``"default"`` to use the default block format
(promotes tables to Pandas and tensors to NumPy), ``"pandas"`` to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or
``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
fn_args: Positional arguments to pass to ``fn``, after the data batch. These
arguments will be top-level arguments in the underlying Ray task that's
submitted.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments will be
top-level arguments in the underlying Ray task that's submitted.
fn_args: Positional arguments to pass to ``fn`` after the first argument.
These arguments are top-level arguments to the underlying Ray task.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
top-level arguments to the underlying Ray task.
fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class and the actor
compute strategy is being used. These arguments will be top-level
arguments in the underlying Ray actor construction task that's
submitted.
You can only provide this if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class and the actor
compute strategy is being used. These arguments will be top-level
arguments in the underlying Ray actor construction task that's
submitted.
This can only be provided if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""
ray (e.g., ``num_gpus=1`` to request GPUs for the map tasks).

.. seealso::

:meth:`~Dataset.iter_batches`
Call this function to iterate over batches of data.

:meth:`~Dataset.default_batch_format`
Call this function to determine the default batch type.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: to learn? User cannot choose default type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "determine" is okay in this context.

This is one of the Google definitions:

ascertain or establish exactly, typically as a result of research or calculation.
"the point of our study was to determine what is true, not what is practicable"

""" # noqa: E501
import pandas as pd
import pyarrow as pa

Expand Down