[Datasets] [Docs] Update map_batches documentation (#28435)
- Users aren't aware of BatchMapper
- Users can't figure out how to write UDFs, and there's no link to the relevant user guide
- Users get confused by the examples
bveeramani authored Sep 27, 2022
1 parent 7bc265c commit b233222
Showing 1 changed file with 101 additions and 60 deletions.
161 changes: 101 additions & 60 deletions python/ray/data/dataset.py
@@ -326,88 +326,129 @@ def map_batches(
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
**ray_remote_args,
) -> "Dataset[Any]":
"""Apply the given function to batches of records of this dataset.
"""Apply the given function to batches of data.
The format of the data batch provided to ``fn`` can be controlled via the
``batch_format`` argument, and the output of the UDF can be any batch type.
Batches are represented as dataframes, ndarrays, or lists. The default batch
type is determined by your dataset's schema. To determine the default batch
type, call :meth:`~Dataset.default_batch_format`. Alternatively, set the batch
type with ``batch_format``.
This is a blocking operation.
To learn more about writing functions for :meth:`~Dataset.map_batches`, read
:ref:`writing user-defined functions <transform_datasets_writing_udfs>`.
.. tip::
If you're using :ref:`Ray AIR <air>` for training or batch inference,
consider using :class:`~ray.data.preprocessors.BatchMapper`. It's more
performant and easier to use.
Examples:
>>> import pandas as pd
>>> import ray
>>> # Transform python objects.
>>> ds = ray.data.range(1000)
>>> # Transform batches in parallel.
>>> ds.map_batches(lambda batch: [v * 2 for v in batch])
Dataset(num_blocks=..., num_rows=1000, schema=<class 'int'>)
>>> # Define a callable class that persists state across
>>> # function invocations for efficiency.
>>> init_model = ... # doctest: +SKIP
>>> df = pd.DataFrame({
... "name": ["Luna", "Rory", "Scout"],
... "age": [4, 14, 9]
... })
>>> ds = ray.data.from_pandas(df)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64})
Call :meth:`.default_batch_format` to determine the default batch
type.
>>> ds.default_batch_format()
<class 'pandas.core.frame.DataFrame'>
.. tip::
Datasets created from tabular data like Arrow tables and Parquet files
yield ``pd.DataFrame`` batches.
Once you know the batch type, define a function that transforms batches
of data. ``ds.map_batches`` applies the function in parallel.
>>> def map_fn(batch: pd.DataFrame) -> pd.DataFrame:
... batch["age_in_dog_years"] = 7 * batch["age"]
... return batch
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema={name: object, age: int64, age_in_dog_years: int64})
Your ``fn`` can return a different type than the input type. To learn more
about supported output types, read
:ref:`user-defined function output types <transform_datasets_batch_output_types>`.
>>> from typing import List
>>> def map_fn(batch: pd.DataFrame) -> List[int]:
... return list(batch["age_in_dog_years"])
>>> ds = ds.map_batches(map_fn)
>>> ds
Dataset(num_blocks=1, num_rows=3, schema=<class 'int'>)
:ref:`Actors <actor-guide>` can improve the performance of some workloads.
For example, you can use :ref:`actors <actor-guide>` to load a model once
per worker instead of once per inference.
To transform batches with :ref:`actors <actor-guide>`, pass a callable type
to ``fn`` and specify an :class:`~ray.data.ActorPoolStrategy`.
In the example below, ``CachedModel`` is called on an autoscaling pool of
two to eight :ref:`actors <actor-guide>`, each allocated one GPU by Ray.
>>> from ray.data import ActorPoolStrategy
>>> init_large_model = ... # doctest: +SKIP
>>> class CachedModel:
... def __init__(self):
... self.model = init_model()
... self.model = init_large_model()
... def __call__(self, item):
... return self.model(item)
>>> # Apply the transform in parallel on GPUs. Since
>>> # compute=ActorPoolStrategy(2, 8) the transform will be applied on an
>>> # autoscaling pool of 2-8 Ray actors, each allocated 1 GPU by Ray.
>>> from ray.data._internal.compute import ActorPoolStrategy
>>> ds.map_batches( # doctest: +SKIP
... CachedModel, # doctest: +SKIP
... batch_size=256, # doctest: +SKIP
... compute=ActorPoolStrategy(2, 8), # doctest: +SKIP
... num_gpus=1) # doctest: +SKIP
You can use ``map_batches`` to efficiently filter records.
>>> import ray
>>> ds = ray.data.range(10000)
>>> ds.count()
10000
>>> ds = ds.map_batches(lambda batch: [x for x in batch if x % 2 == 0])
>>> ds.count()
5000
Time complexity: O(dataset size / parallelism)
... num_gpus=1,
... ) # doctest: +SKIP
Args:
fn: The function to apply to each record batch, or a class type
that can be instantiated to create such a callable. Callable classes are
only supported for the actor compute strategy.
batch_size: The number of rows in each batch, or None to use entire blocks
as batches (blocks may contain different number of rows).
The final batch may include fewer than ``batch_size`` rows.
Defaults to 4096.
compute: The compute strategy, either "tasks" (default) to use Ray
tasks, or "actors" to use an autoscaling actor pool. If wanting to
configure the min or max size of the autoscaling actor pool, you can
provide an
:class:`ActorPoolStrategy(min, max) <ray.data.ActorPoolStrategy>`
instance. If using callable classes for fn, the actor compute strategy
must be used.
batch_format: Specify "default" to use the default block format (promotes
tables to Pandas and tensors to NumPy), "pandas" to select
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or "numpy"
to select ``numpy.ndarray`` for tensor datasets and
batch_size: The number of rows in each batch, or ``None`` to use entire
blocks as batches. Blocks can contain different numbers of rows, and
the last batch can include fewer than ``batch_size`` rows. Defaults to
``4096``.
compute: The compute strategy, either ``"tasks"`` (default) to use Ray
tasks, or ``"actors"`` to use an autoscaling actor pool. If you want to
configure the size of the autoscaling actor pool, provide an
:class:`ActorPoolStrategy <ray.data.ActorPoolStrategy>` instance.
If you're passing a callable type to ``fn``, you must pass an
:class:`ActorPoolStrategy <ray.data.ActorPoolStrategy>`.
batch_format: Specify ``"default"`` to use the default block format
(promotes tables to Pandas and tensors to NumPy), ``"pandas"`` to select
``pandas.DataFrame``, ``"pyarrow"`` to select ``pyarrow.Table``, or
``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is ``"default"``.
fn_args: Positional arguments to pass to ``fn``, after the data batch. These
arguments will be top-level arguments in the underlying Ray task that's
submitted.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments will be
top-level arguments in the underlying Ray task that's submitted.
fn_args: Positional arguments to pass to ``fn`` after the first argument.
These arguments are top-level arguments to the underlying Ray task.
fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
top-level arguments to the underlying Ray task.
fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class and the actor
compute strategy is being used. These arguments will be top-level
arguments in the underlying Ray actor construction task that's
submitted.
You can only provide this if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class and the actor
compute strategy is being used. These arguments will be top-level
arguments in the underlying Ray actor construction task that's
submitted.
This can only be provided if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
"""
ray (e.g., ``num_gpus=1`` to request GPUs for the map tasks).
.. seealso::
:meth:`~Dataset.iter_batches`
Call this function to iterate over batches of data.
:meth:`~Dataset.default_batch_format`
Call this function to determine the default batch type.
""" # noqa: E501
import pandas as pd
import pyarrow as pa
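
As a rough illustration of the semantics the updated docstring describes, the sketch below imitates batching in plain Python, without Ray. The helpers `split_into_batches` and `apply_to_batches` and the class `DoubleModel` are hypothetical stand-ins written for this note, not Ray APIs. Real batches are cut from dataset blocks, so actual batch boundaries may differ. The sketch only aims to show three points from the docstring: the last batch can hold fewer than ``batch_size`` rows, a batch function may return fewer rows than it receives, and a callable class is constructed once and then reused across batches.

```python
from typing import Callable, Iterator, List

Batch = List[int]


def split_into_batches(rows: List[int], batch_size: int) -> Iterator[Batch]:
    """Hypothetical helper: yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def apply_to_batches(
    rows: List[int], fn: Callable[[Batch], Batch], batch_size: int = 4096
) -> List[int]:
    """Hypothetical stand-in for map_batches: call fn per batch, concatenate outputs."""
    out: List[int] = []
    for batch in split_into_batches(rows, batch_size):
        out.extend(fn(batch))
    return out


class DoubleModel:
    """Callable class in the style of CachedModel: setup runs once in __init__."""

    init_calls = 0  # counts constructions, to show that setup is not repeated

    def __init__(self) -> None:
        DoubleModel.init_calls += 1
        self.factor = 2  # placeholder for an expensive model load

    def __call__(self, batch: Batch) -> Batch:
        return [self.factor * v for v in batch]


rows = list(range(10000))

# With the default batch size of 4096, the final batch is smaller.
batch_lengths = [len(b) for b in split_into_batches(rows, 4096)]

# A batch function can filter: it returns fewer rows than it was given.
evens = apply_to_batches(rows, lambda batch: [x for x in batch if x % 2 == 0])

# One instance serves every batch, so the constructor runs a single time.
model = DoubleModel()
doubled = apply_to_batches(rows, model, batch_size=256)
```

In Ray itself the instance would live on an actor rather than in the calling process, which is why the docstring pairs callable classes with an ``ActorPoolStrategy``.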
