Skip to content

Commit

Permalink
[Data] Add num_cpus and num_gpus as top-level args to map functions (ray-project#35486)
Browse files Browse the repository at this point in the history

Add num_cpus and num_gpus as explicit top-level arguments to the map functions (map, map_batches, flat_map). This is not an API change, as these arguments were already accepted via **ray_remote_args. However, they now show up in API references and IDE hints.
  • Loading branch information
amogkam authored and scv119 committed Jun 11, 2023
1 parent ba266ff commit 8fc5e4c
Showing 1 changed file with 38 additions and 3 deletions.
41 changes: 38 additions & 3 deletions python/ray/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ def map(
fn: UserDefinedFunction[Dict[str, Any], Dict[str, Any]],
*,
compute: Optional[ComputeStrategy] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
**ray_remote_args,
) -> "Dataset":
"""Apply the given function to each record of this dataset.
Expand Down Expand Up @@ -328,8 +330,12 @@ def map(
tasks, ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor
pool, or ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` for an
autoscaling actor pool.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
ray for each map worker.
.. seealso::
Expand All @@ -350,6 +356,12 @@ def map(

transform_fn = generate_map_rows_fn()

if num_cpus is not None:
ray_remote_args["num_cpus"] = num_cpus

if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

plan = self._plan.with_stage(
OneToOneStage(
"Map",
Expand Down Expand Up @@ -383,6 +395,8 @@ def map_batches(
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
**ray_remote_args,
) -> "Dataset":
"""Apply the given function to batches of data.
Expand Down Expand Up @@ -515,8 +529,11 @@ def map_batches(
fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
This can only be provided if ``fn`` is a callable class. These arguments
are top-level arguments in the underlying Ray actor construction task.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map worker.
ray_remote_args: Additional resource requirements to request from
ray (e.g., ``num_gpus=1`` to request GPUs for the map tasks).
ray for each map worker.
.. seealso::
Expand Down Expand Up @@ -583,6 +600,12 @@ def map_batches(
zero_copy_batch=zero_copy_batch,
)

if num_cpus is not None:
ray_remote_args["num_cpus"] = num_cpus

if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

# TODO(chengsu): pass function name to MapBatches logical operator.
if hasattr(fn, "__self__") and isinstance(
fn.__self__, ray.data.preprocessor.Preprocessor
Expand Down Expand Up @@ -769,6 +792,8 @@ def flat_map(
fn: UserDefinedFunction[Dict[str, Any], List[Dict[str, Any]]],
*,
compute: Optional[ComputeStrategy] = None,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
**ray_remote_args,
) -> "Dataset":
"""Apply the given function to each record and then flatten results.
Expand All @@ -793,8 +818,12 @@ def flat_map(
tasks, ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor
pool, or ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` for an
autoscaling actor pool.
num_cpus: The number of CPUs to reserve for each parallel map worker.
num_gpus: The number of GPUs to reserve for each parallel map worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel map
worker.
ray_remote_args: Additional resource requirements to request from
ray (e.g., num_gpus=1 to request GPUs for the map tasks).
ray for each map worker.
.. seealso::
Expand All @@ -813,6 +842,12 @@ def flat_map(

transform_fn = generate_flat_map_fn()

if num_cpus is not None:
ray_remote_args["num_cpus"] = num_cpus

if num_gpus is not None:
ray_remote_args["num_gpus"] = num_gpus

plan = self._plan.with_stage(
OneToOneStage("FlatMap", transform_fn, compute, ray_remote_args, fn=fn)
)
Expand Down

0 comments on commit 8fc5e4c

Please sign in to comment.