
[core] Support generators to allow tasks to return a dynamic number of objects #28291

Merged
37 commits merged on Sep 21, 2022
Changes from 36 commits
Commits (37)
5636eb9
Worker side
stephanie-wang Sep 4, 2022
3457a46
Owner side, works except for when spilling?
stephanie-wang Sep 5, 2022
031640b
now it works for spilling/in-plasma objects
stephanie-wang Sep 5, 2022
e59dd65
recovery test. TODO:
stephanie-wang Sep 5, 2022
775ff3e
Sort of fix nondeterminism
stephanie-wang Sep 6, 2022
b316b9f
Update src/ray/protobuf/node_manager.proto
stephanie-wang Sep 6, 2022
2d0f5d4
Update src/ray/protobuf/pubsub.proto
stephanie-wang Sep 6, 2022
faf64cb
C++
stephanie-wang Sep 6, 2022
5d92940
doc
stephanie-wang Sep 6, 2022
7c69b32
fixes
stephanie-wang Sep 7, 2022
c1ecf26
Update python/ray/_raylet.pyx
stephanie-wang Sep 8, 2022
422e4b6
minor
stephanie-wang Sep 9, 2022
62b427c
refactor
stephanie-wang Sep 9, 2022
636954c
ref counting during recovery
stephanie-wang Sep 10, 2022
0b73319
ref counting fix
stephanie-wang Sep 10, 2022
450d428
num_returns=dynamic
stephanie-wang Sep 10, 2022
a0bca43
Return generator instead of ObjRef
stephanie-wang Sep 12, 2022
6511a04
doc
stephanie-wang Sep 12, 2022
7ea1404
lint
stephanie-wang Sep 12, 2022
6e4663a
docs
stephanie-wang Sep 12, 2022
7bba5b7
doc
stephanie-wang Sep 12, 2022
c31b1c4
fixes
stephanie-wang Sep 13, 2022
167c6be
update
stephanie-wang Sep 13, 2022
44d9d85
fix
stephanie-wang Sep 13, 2022
39ba48f
Merge remote-tracking branch 'upstream/master' into generators-forreal
stephanie-wang Sep 13, 2022
805a088
x
stephanie-wang Sep 14, 2022
3360d63
cpp
stephanie-wang Sep 14, 2022
2ed252a
fix
stephanie-wang Sep 14, 2022
3a811dc
x
stephanie-wang Sep 14, 2022
285d98d
x
stephanie-wang Sep 15, 2022
7a86f19
Merge remote-tracking branch 'upstream/master' into generators-forreal
stephanie-wang Sep 17, 2022
b045d34
options
stephanie-wang Sep 17, 2022
7b3cf27
x
stephanie-wang Sep 19, 2022
92a8491
experimental
stephanie-wang Sep 21, 2022
492b95e
experimental
stephanie-wang Sep 21, 2022
07ca73d
x
stephanie-wang Sep 21, 2022
c196c67
fix
stephanie-wang Sep 21, 2022
1 change: 1 addition & 0 deletions cpp/src/ray/runtime/task/local_mode_task_submitter.cc
@@ -59,6 +59,7 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
local_mode_ray_tuntime_.GetCurrentTaskId(),
address,
1,
/*returns_dynamic=*/false,
required_resources,
required_placement_resources,
"",
15 changes: 9 additions & 6 deletions cpp/src/ray/runtime/task/task_executor.cc
@@ -119,16 +119,17 @@ std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
}

Status TaskExecutor::ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args_buffer,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *returns,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
@@ -209,11 +210,10 @@ Status TaskExecutor::ExecuteTask(
data = std::make_shared<msgpack::sbuffer>(std::move(buf));
}

results->resize(return_ids.size(), nullptr);
if (task_type != ray::TaskType::ACTOR_CREATION_TASK) {
size_t data_size = data->size();
auto &result_id = return_ids[0];
auto result_ptr = &(*results)[0];
auto &result_id = (*returns)[0].first;
auto result_ptr = &(*returns)[0].second;
int64_t task_output_inlined_bytes = 0;

if (cross_lang && meta_buffer == nullptr) {
@@ -250,7 +250,10 @@
}
}

RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(
result_id,
result,
/*generator_id=*/ObjectID::Nil()));
} else {
if (!status.ok()) {
return ray::Status::CreationTaskError("");
5 changes: 3 additions & 2 deletions cpp/src/ray/runtime/task/task_executor.h
@@ -75,16 +75,17 @@ class TaskExecutor {
absl::Mutex &actor_contexts_mutex);

static Status ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
const std::unordered_map<std::string, double> &required_resources,
const std::vector<std::shared_ptr<ray::RayObject>> &args,
const std::vector<rpc::ObjectReference> &arg_refs,
const std::vector<ObjectID> &return_ids,
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *returns,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
71 changes: 71 additions & 0 deletions doc/source/ray-core/doc_code/generator.py
@@ -0,0 +1,71 @@
# __program_start__
import ray
from ray import ObjectRefGenerator

# fmt: off
# __dynamic_generator_start__
import numpy as np


@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
while len(array) > 0:
yield array[:chunk_size]
array = array[chunk_size:]


array_ref = ray.put(np.zeros(np.random.randint(1000_000)))
block_size = 1000

# Returns an ObjectRef[ObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
print(dynamic_ref)
# ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

i = -1
ref_generator = ray.get(dynamic_ref)
print(ref_generator)
# <ray._raylet.ObjectRefGenerator object at 0x7f7e2116b290>
for i, ref in enumerate(ref_generator):
# Each ObjectRefGenerator iteration returns an ObjectRef.
assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
f"size {block_size} each.")
# Split array of size 63153 into 64 blocks of size 1000 each.

# NOTE: The dynamic_ref points to the generated ObjectRefs. Make sure that this
# ObjectRef goes out of scope so that Ray can garbage-collect the internal
# ObjectRefs.
del dynamic_ref
# __dynamic_generator_end__
# fmt: on


# fmt: off
# __dynamic_generator_pass_start__
@ray.remote
def get_size(ref_generator : ObjectRefGenerator):
print(ref_generator)
num_elements = 0
for ref in ref_generator:
array = ray.get(ref)
assert len(array) <= block_size
num_elements += len(array)
return num_elements


# Returns an ObjectRef[ObjectRefGenerator].
dynamic_ref = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(dynamic_ref))
# (get_size pid=1504184) <ray._raylet.ObjectRefGenerator object at 0x7f81c4250ad0>

# This also works, but should be avoided because you have to call an additional
# `ray.get`, which blocks the driver.
ref_generator = ray.get(dynamic_ref)
assert array_size == ray.get(get_size.remote(ref_generator))
# (get_size pid=1504184) <ray._raylet.ObjectRefGenerator object at 0x7f81c4251b50>
# __dynamic_generator_pass_end__
# fmt: on
2 changes: 1 addition & 1 deletion doc/source/ray-core/patterns/generators.rst
@@ -1,4 +1,4 @@
.. _generators:
.. _generator-pattern:

Pattern: Using generators to reduce heap memory usage
=====================================================
4 changes: 3 additions & 1 deletion doc/source/ray-core/tasks.rst
@@ -164,7 +164,8 @@ By default, a Ray task only returns a single Object Ref. However, you can config

.. literalinclude:: doc_code/tasks_multiple_returns.py

For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. See the :ref:`user guide <generators>` for more details on use cases.

For tasks that return multiple objects, Ray also supports remote generators that allow a task to return one object at a time to reduce memory usage at the worker. Ray also supports an option to set the number of return values dynamically, which can be useful when the task caller does not know how many return values to expect. See the :ref:`user guide <generators>` for more details on use cases.

.. tabbed:: Python

@@ -213,6 +214,7 @@ More about Ray Tasks
tasks/resources.rst
tasks/using-ray-with-gpus.rst
tasks/nested-tasks.rst
tasks/generators.rst
tasks/fault-tolerance.rst
tasks/scheduling.rst
tasks/patterns/index.rst
86 changes: 86 additions & 0 deletions doc/source/ray-core/tasks/generators.rst
@@ -0,0 +1,86 @@
.. _generators:

Generators
==========

Python generators are functions that behave like an iterator, yielding one
value per iteration. Ray supports remote generators for two use cases:

1. To reduce max heap memory usage when returning multiple values from a remote
function. See the :ref:`design pattern guide <generator-pattern>` for an
example.
2. When the number of return values is set dynamically by the remote function
instead of by the caller.

Remote generators can be used in both actor and non-actor tasks.

`num_returns` set by the task caller
------------------------------------

Where possible, the caller should set the remote function's number of return values using ``@ray.remote(num_returns=x)`` or ``foo.options(num_returns=x).remote()``.
Ray will return this many ``ObjectRefs`` to the caller.
The remote task should then return the same number of values, usually as a tuple or list.
Compared to setting the number of return values dynamically, this adds less complexity to user code and less performance overhead, as Ray will know exactly how many ``ObjectRefs`` to return to the caller ahead of time.

Without changing the caller's syntax, we can also use a remote generator function to return the values iteratively.
The generator should return the same number of return values specified by the caller, and these will be stored one at a time in Ray's object store.
An error will be returned for generators that return a different number of values from the one specified by the caller.

For example, we can swap the following code that returns a list of return values:

.. literalinclude:: ../doc_code/pattern_generators.py
:language: python
:start-after: __large_values_start__
:end-before: __large_values_end__

for this code, which uses a generator function:

.. literalinclude:: ../doc_code/pattern_generators.py
:language: python
:start-after: __large_values_generator_start__
:end-before: __large_values_generator_end__

The advantage of doing so is that the generator function does not need to hold all of its return values in memory at once.
It can return the arrays one at a time to reduce memory pressure.
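
As a rough sketch of the generator form with a fixed ``num_returns`` (the full example lives in ``doc_code/pattern_generators.py``, which is not part of this diff; the function name below is illustrative only):

.. code-block:: python

    import numpy as np
    import ray


    @ray.remote(num_returns=3)
    def return_three_chunks():
        # Yield exactly the number of values declared via num_returns.
        # Each array is stored in the object store as soon as it is yielded.
        for _ in range(3):
            yield np.zeros(1000)


    # The caller still receives three ObjectRefs, same as a non-generator task.
    r1, r2, r3 = return_three_chunks.remote()
    assert len(ray.get(r1)) == 1000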

`num_returns` set by the task executor
--------------------------------------

In some cases, the caller may not know the number of return values to expect from a remote function.
For example, suppose we want to write a task that breaks up its argument into equal-size chunks and returns these.
We may not know the size of the argument until we execute the task, so we don't know the number of return values to expect.

In these cases, we can use a remote generator function that returns a *dynamic* number of values.
To use this feature, set ``num_returns="dynamic"`` in the ``@ray.remote`` decorator or the remote function's ``.options()``.
Then, when invoking the remote function, Ray will return a *single* ``ObjectRef`` that will get populated with an ``ObjectRefGenerator`` when the task completes.
The ``ObjectRefGenerator`` can be used to iterate over a list of ``ObjectRefs`` containing the actual values returned by the task.

.. note:: ``num_returns="dynamic"`` is currently an experimental API in v2.1+.

.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __dynamic_generator_start__
:end-before: __dynamic_generator_end__

We can also pass the ``ObjectRef`` returned by a task with ``num_returns="dynamic"`` to another task. The task will receive the ``ObjectRefGenerator``, which it can use to iterate over the task's return values. Similarly, you can also pass an ``ObjectRefGenerator`` as a task argument.

.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __dynamic_generator_pass_start__
:end-before: __dynamic_generator_pass_end__

Limitations
-----------

Although a generator function creates ``ObjectRefs`` one at a time, currently Ray will not schedule dependent tasks until the entire task is complete and all values have been created. This is similar to the semantics used by tasks that return multiple values as a list.

``num_returns="dynamic"`` is not yet supported for actor tasks.

If a generator function raises an exception before yielding all its values, all values returned by the generator will be replaced by the exception traceback, including values that were already successfully yielded.
If the task was called with ``num_returns="dynamic"``, the exception will be stored in the ``ObjectRef`` returned by the task instead of the usual ``ObjectRefGenerator``.
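
A minimal sketch of the ``num_returns="dynamic"`` failure path described above (the task name is illustrative, and the usual ``ray.exceptions.RayTaskError`` wrapper is assumed):

.. code-block:: python

    import ray
    from ray.exceptions import RayTaskError


    @ray.remote(num_returns="dynamic")
    def faulty_generator():
        # Fail before yielding anything; with num_returns="dynamic" the
        # exception is stored in the ObjectRef returned by the task.
        raise ValueError("failed before yielding any values")
        yield  # unreachable, but makes this function a generator


    dynamic_ref = faulty_generator.remote()
    try:
        # Instead of an ObjectRefGenerator, the ObjectRef holds the exception.
        ray.get(dynamic_ref)
    except RayTaskError as e:
        print("Task failed:", e)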

Note that there is currently a known bug where exceptions will not be propagated for generators that yield objects in Ray's shared-memory object store before erroring. In this case, these objects will still be accessible through the returned ``ObjectRefs`` and you may see an error like the following:

.. code-block:: text

$ ERROR worker.py:754 -- Generator threw exception after returning partial values in the object store, error may be unhandled.
2 changes: 2 additions & 0 deletions python/ray/__init__.py
@@ -125,6 +125,7 @@ def _configure_system():
FunctionID,
ObjectID,
ObjectRef,
ObjectRefGenerator,
TaskID,
UniqueID,
Language,
@@ -248,6 +249,7 @@ def __getattr__(self, attr):
"FunctionID",
"ObjectID",
"ObjectRef",
"ObjectRefGenerator",
"TaskID",
"UniqueID",
"PlacementGroupID",
8 changes: 7 additions & 1 deletion python/ray/_private/ray_option_utils.py
@@ -120,7 +120,13 @@ def issubclass_safe(obj: Any, cls_: type) -> bool:
),
# override "_common_options"
"num_cpus": _resource_option("num_cpus", default_value=1),
"num_returns": _counting_option("num_returns", False, default_value=1),
"num_returns": Option(
(int, str, type(None)),
lambda x: x is None or x == "dynamic" or x >= 0,
Review comment (Collaborator):
I'm wondering whether we should use -1 instead of "dynamic". This way it will be more friendly to other static type languages like Java.

Reply (Contributor, Author):
I don't think the syntax here affects what syntax we use in Java. Prefer "dynamic" since it's more clear than -1.

"The keyword 'num_returns' only accepts None, a non-negative integer, or "
'"dynamic" (for generators)',
default_value=1,
),
"object_store_memory": Option( # override "_common_options"
(int, type(None)),
lambda x: x is None,
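
For context, a quick sketch of what the validator above accepts (the task ``f`` below is hypothetical; per the predicate, ``None``, any non-negative integer, and the string ``"dynamic"`` pass, while anything else is rejected with the error message shown in the diff):

import ray


@ray.remote
def f():
    return 1


f.options(num_returns=2)          # accepted: non-negative integer
f.options(num_returns="dynamic")  # accepted: dynamic number of return values
f.options(num_returns=None)       # accepted: None is allowed by the predicate
# f.options(num_returns=-1)       # rejected by the validator with the error above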
9 changes: 9 additions & 0 deletions python/ray/_private/serialization.py
@@ -9,6 +9,7 @@
from ray._raylet import (
MessagePackSerializedObject,
MessagePackSerializer,
ObjectRefGenerator,
Pickle5SerializedObject,
Pickle5Writer,
RawSerializedObject,
@@ -121,6 +122,14 @@ def object_ref_reducer(obj):
)

self._register_cloudpickle_reducer(ray.ObjectRef, object_ref_reducer)

def object_ref_generator_reducer(obj):
return ObjectRefGenerator, (obj._refs,)

self._register_cloudpickle_reducer(
ObjectRefGenerator, object_ref_generator_reducer
)

serialization_addons.apply(self)

def _register_cloudpickle_reducer(self, cls, reducer):
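
For context, a rough sketch of where this reducer comes into play: passing an ``ObjectRefGenerator`` as a task argument, so it must be pickled (reduced to its underlying ObjectRefs) and rebuilt on the receiving worker. The tasks below are illustrative and not part of this diff.

import ray
from ray import ObjectRefGenerator


@ray.remote(num_returns="dynamic")
def produce(n):
    for i in range(n):
        yield i


@ray.remote
def consume(ref_generator: ObjectRefGenerator):
    # The generator was serialized via the reducer registered above and
    # reconstructed here from its list of ObjectRefs.
    return sum(ray.get(ref) for ref in ref_generator)


ref_generator = ray.get(produce.remote(5))
assert ray.get(consume.remote(ref_generator)) == 10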
10 changes: 6 additions & 4 deletions python/ray/_private/worker.py
@@ -2266,8 +2266,7 @@ def get(

if not isinstance(object_refs, list):
raise ValueError(
"'object_refs' must either be an object ref "
"or a list of object refs."
"'object_refs' must either be an ObjectRef or a list of ObjectRefs."
)

# TODO(ujvl): Consider how to allow user to retrieve the ready objects.
@@ -2888,8 +2887,11 @@ def remote(

Args:
num_returns: This is only for *remote functions*. It specifies
the number of object refs returned by
the remote function invocation.
the number of object refs returned by the remote function
invocation. Pass "dynamic" to allow the task to decide how many
return values to return during execution, and the caller will
receive an ObjectRef[ObjectRefGenerator] (note, this setting is
experimental).
num_cpus: The quantity of CPU cores to reserve
for this task or for the lifetime of the actor.
num_gpus: The quantity of GPUs to reserve
21 changes: 17 additions & 4 deletions python/ray/_raylet.pxd
@@ -11,10 +11,19 @@ from libc.stdint cimport (
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from libcpp.unordered_map cimport unordered_map
from libcpp.memory cimport (
shared_ptr,
unique_ptr
)
from libcpp.pair cimport pair as c_pair
from libcpp.utility cimport pair
from ray.includes.optional cimport (
optional,
nullopt,
make_optional,
)

from ray.includes.common cimport (
CBuffer,
CRayObject,
@@ -123,13 +132,17 @@ cdef class CoreWorker:
c_bool inline_small_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_output(
self, serialized_object, const CObjectID &return_id, size_t
data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
self, serialized_object,
const CObjectID &return_id,
const CObjectID &generator_id,
size_t data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
&contained_id, int64_t *task_output_inlined_bytes,
shared_ptr[CRayObject] *return_ptr)
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
self,
worker, outputs,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
CObjectID ref_generator_id=*)
cdef yield_current_fiber(self, CFiberEvent &fiber_event)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)
cdef c_function_descriptors_to_python(