Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support generators to allow tasks to return a dynamic number of objects #28291

Merged
merged 37 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
5636eb9
Worker side
stephanie-wang Sep 4, 2022
3457a46
Owner side, works except for when spilling?
stephanie-wang Sep 5, 2022
031640b
now it works for spilling/in-plasma objects
stephanie-wang Sep 5, 2022
e59dd65
recovery test. TODO:
stephanie-wang Sep 5, 2022
775ff3e
Sort of fix nondeterminism
stephanie-wang Sep 6, 2022
b316b9f
Update src/ray/protobuf/node_manager.proto
stephanie-wang Sep 6, 2022
2d0f5d4
Update src/ray/protobuf/pubsub.proto
stephanie-wang Sep 6, 2022
faf64cb
C++
stephanie-wang Sep 6, 2022
5d92940
doc
stephanie-wang Sep 6, 2022
7c69b32
fixes
stephanie-wang Sep 7, 2022
c1ecf26
Update python/ray/_raylet.pyx
stephanie-wang Sep 8, 2022
422e4b6
minor
stephanie-wang Sep 9, 2022
62b427c
refactor
stephanie-wang Sep 9, 2022
636954c
ref counting during recovery
stephanie-wang Sep 10, 2022
0b73319
ref counting fix
stephanie-wang Sep 10, 2022
450d428
num_returns=dynamic
stephanie-wang Sep 10, 2022
a0bca43
Return generator instead of ObjRef
stephanie-wang Sep 12, 2022
6511a04
doc
stephanie-wang Sep 12, 2022
7ea1404
lint
stephanie-wang Sep 12, 2022
6e4663a
docs
stephanie-wang Sep 12, 2022
7bba5b7
doc
stephanie-wang Sep 12, 2022
c31b1c4
fixes
stephanie-wang Sep 13, 2022
167c6be
update
stephanie-wang Sep 13, 2022
44d9d85
fix
stephanie-wang Sep 13, 2022
39ba48f
Merge remote-tracking branch 'upstream/master' into generators-forreal
stephanie-wang Sep 13, 2022
805a088
x
stephanie-wang Sep 14, 2022
3360d63
cpp
stephanie-wang Sep 14, 2022
2ed252a
fix
stephanie-wang Sep 14, 2022
3a811dc
x
stephanie-wang Sep 14, 2022
285d98d
x
stephanie-wang Sep 15, 2022
7a86f19
Merge remote-tracking branch 'upstream/master' into generators-forreal
stephanie-wang Sep 17, 2022
b045d34
options
stephanie-wang Sep 17, 2022
7b3cf27
x
stephanie-wang Sep 19, 2022
92a8491
experimental
stephanie-wang Sep 21, 2022
492b95e
experimental
stephanie-wang Sep 21, 2022
07ca73d
x
stephanie-wang Sep 21, 2022
c196c67
fix
stephanie-wang Sep 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
}

Status TaskExecutor::ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
Expand All @@ -129,6 +130,7 @@ Status TaskExecutor::ExecuteTask(
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
Expand Down Expand Up @@ -250,7 +252,10 @@ Status TaskExecutor::ExecuteTask(
}
}

RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(
result_id,
result,
/*generator_id=*/ObjectID::Nil()));
} else {
if (!status.ok()) {
return ray::Status::CreationTaskError("");
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/ray/runtime/task/task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class TaskExecutor {
absl::Mutex &actor_contexts_mutex);

static Status ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
Expand All @@ -85,6 +86,7 @@ class TaskExecutor {
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
Expand Down
65 changes: 65 additions & 0 deletions doc/source/ray-core/doc_code/generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# __program_start__
import ray

# fmt: off
# __dynamic_generator_start__
import numpy as np


@ray.remote(num_returns="dynamic")
def split(array, chunk_size):
while len(array) > 0:
yield array[:chunk_size]
array = array[chunk_size:]


array_ref = ray.put(np.zeros(np.random.randint(1000_000)))
block_size = 1000

ref_generator = split.remote(array_ref, block_size) # Returns an ObjectRefGenerator.
print(ref_generator)

i = -1
# NOTE: When the generator is iterated for the first time, this will block
# until the task is complete and the number of ObjectRefs returned by the task
# is known. This is unlike remote functions with a static num_returns, where
# ObjectRefs can be passed to another function before the task is complete.
for i, ref in enumerate(ref_generator):
# Each ObjectRefGenerator iteration returns an ObjectRef.
assert len(ray.get(ref)) <= block_size
num_blocks_generated = i + 1
array_size = len(ray.get(array_ref))
assert array_size <= num_blocks_generated * block_size
print(f"Split array of size {array_size} into {num_blocks_generated} blocks of "
f"size {block_size} each.")
# __dynamic_generator_end__
# fmt: on


# fmt: off
# __dynamic_generator_ray_get_start__
ref_generator = split.remote(array_ref, block_size)
value_generator = ray.get(ref_generator)
print(value_generator)
for array in value_generator:
assert len(array) <= block_size
# __dynamic_generator_ray_get_end__
# fmt: on


# fmt: off
# __dynamic_generator_pass_start__
@ray.remote
def get_size(ref_generator):
value_generator = ray.get(ref_generator)
print(value_generator)
num_elements = 0
for array in value_generator:
assert len(array) <= block_size
num_elements += len(array)
return num_elements

ref_generator = split.remote(array_ref, block_size)
assert array_size == ray.get(get_size.remote(ref_generator))
# __dynamic_generator_pass_end__
# fmt: on
2 changes: 1 addition & 1 deletion doc/source/ray-core/patterns/generators.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.. _generators:
.. _generator-pattern:

Pattern: Using generators to reduce heap memory usage
=====================================================
Expand Down
1 change: 1 addition & 0 deletions doc/source/ray-core/tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ More about Ray Tasks
tasks/resources.rst
tasks/using-ray-with-gpus.rst
tasks/nested-tasks.rst
tasks/generators.rst
tasks/fault-tolerance.rst
tasks/scheduling.rst
tasks/patterns/index.rst
78 changes: 78 additions & 0 deletions doc/source/ray-core/tasks/generators.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
.. _generators:

stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
Generators
==========

Python generators are functions that behave like an iterator, yielding one
value per iteration. Ray supports remote generators for two use cases:

1. To reduce max heap memory usage when returning multiple values from a remote
function. See the :ref:`design pattern guide <generator-pattern>` for an
example.
2. When the number of return values is set dynamically by the remote function
instead of by the caller.


`num_returns` set by the task caller
------------------------------------

Where possible, the caller should set the remote function's number of return values using ``@ray.remote(num_returns=x)`` or ``foo.options(num_returns=x).remote()``.
stephanie-wang marked this conversation as resolved.
Show resolved Hide resolved
Ray will return this many ``ObjectRefs`` to the caller.
The remote task should then return the same number of values, usually as a tuple or list.

Without changing the caller's syntax, we can also use a remote generator function to return the values iteratively.
The generator should return the same number of return values specified by the caller, and these will be stored one at a time in Ray's object store.
An error will be returned for generators that return a different number of values from the one specified by the caller.

For example, we can swap the following code that returns a list of return values:

.. literalinclude:: ../doc_code/pattern_generators.py
:language: python
:start-after: __large_values_start__
:end-before: __large_values_end__

for this code, which uses a generator function:

.. literalinclude:: ../doc_code/pattern_generators.py
:language: python
:start-after: __large_values_generator_start__
:end-before: __large_values_generator_end__

The advantage of doing so is that the generator function does not need to hold all of its return values in memory at once.
It can return the arrays one at a time to reduce memory pressure.

`num_returns` set by the task executor
--------------------------------------

In some cases, the caller may not know the number of return values to expect from a remote function.
For example, suppose we want to write a task that breaks up its argument into equal-size chunks and returns these.
We may not know the size of the argument until we execute the task, so we don't know the number of return values to expect.

In these cases, we can use a remote generator function that returns a *dynamic* number of values.
To use this feature, set ``num_returns="dynamic"`` in the ``@ray.remote`` decorator or the remote function's ``.options()``.
Then, when invoking the remote function, Ray will return an ``ObjectRefGenerator`` instead of one or multiple ``ObjectRefs``.

Note that when an ``ObjectRefGenerator`` is first iterated upon, *the caller will block until the corresponding task is complete* and the number of return values is known.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I thought the return value of dynamic generator task is also an ObjectRef that we can get the generator by calling ray.get()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a change that I made to try to make the semantics closer to the usual list of ObjectRefs.

The main thing that might be a bit weird is that there's an implicit ray.get when you try to access the iterator. But it's fine to pass it around to other tasks before then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preferred this syntax over returning an ObjectRef directly, though, since it makes it explicit that a num_returns="dynamic" task does not return a normal ObjectRef.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this limiting though? For example, if you wanted to wait for two generators to be completed, now you couldn't use ray.wait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could still use ray.wait like this as long as we support passing ObjectRefGenerators too:

g1 = gen.remote()
g2 = gen.remote()
gs, _ = ray.wait([g1, g2], num_returns=2)  # Waits until the ref lists for both are ready.

I think the only thing that wouldn't be supported is if you wanted to block on the results of both at the same time with the list syntax:

zipped = zip(g1, g2)
for pair in zipped:
  # First iteration would wait for g1 ref list, then for g2 ref list.
  pass

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has implications for other APIs as well right? (e.g., ray.kill, passing as args). It seems at the least this would change the type signature to Union[ObjectRef, ObjectRefGenerator] for many APIs.

Ray will then populate the generator with a list of ``ObjectRefs`` pointing to the values returned by the remote function.
Then, the caller can iterate through the generator like any other list of ``ObjectRefs``.

.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __dynamic_generator_start__
:end-before: __dynamic_generator_end__

We can also get the return values as a generator by passing the ``ObjectRefGenerator`` to ``ray.get``.
This will return another generator, this time of the **values** of the contained ``ObjectRefs``.

.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __dynamic_generator_ray_get_start__
:end-before: __dynamic_generator_ray_get_end__

Finally, the semantics for passing an ``ObjectRefGenerator`` to another remote function are similar to that of passing a list of ``ObjectRefs``.
The remote task worker will receive the same ``ObjectRefGenerator``, which it can iterate over directly or pass to ``ray.get`` or another task.

.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __dynamic_generator_pass_start__
:end-before: __dynamic_generator_pass_end__
8 changes: 7 additions & 1 deletion python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ def issubclass_safe(obj: Any, cls_: type) -> bool:
),
# override "_common_options"
"num_cpus": _resource_option("num_cpus", default_value=1),
"num_returns": _counting_option("num_returns", False, default_value=1),
"num_returns": Option(
(int, str, type(None)),
lambda x: x is None or x == "dynamic" or x >= 0,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether we should use -1 instead of "dynamic". This way it will be more friendly to other static type languages like Java.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the syntax here affects what syntax we use in Java. Prefer "dynamic" since it's more clear than -1.

"The keyword num_returns only accepts None, a non-negative integer, or "
'"dynamic" (for generators)',
default_value=1,
),
"object_store_memory": Option( # override "_common_options"
(int, type(None)),
lambda x: x is None,
Expand Down
12 changes: 12 additions & 0 deletions python/ray/_private/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ray._raylet import (
MessagePackSerializedObject,
MessagePackSerializer,
ObjectRefGenerator,
Pickle5SerializedObject,
Pickle5Writer,
RawSerializedObject,
Expand Down Expand Up @@ -121,6 +122,17 @@ def object_ref_reducer(obj):
)

self._register_cloudpickle_reducer(ray.ObjectRef, object_ref_reducer)

def object_ref_generator_reducer(obj):
return ObjectRefGenerator, (
obj._generator_ref,
obj._refs,
)

self._register_cloudpickle_reducer(
ObjectRefGenerator, object_ref_generator_reducer
)

serialization_addons.apply(self)

def _register_cloudpickle_reducer(self, cls, reducer):
Expand Down
26 changes: 22 additions & 4 deletions python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import ray._private.services as services
import ray._private.state
import ray._private.storage as storage
from ray._raylet import ObjectRefGenerator

# Ray modules
import ray.cloudpickle as pickle
Expand Down Expand Up @@ -2206,10 +2207,17 @@ def get(object_refs: "ObjectRef[R]", *, timeout: Optional[float] = None) -> R:
...


@overload
def get(
object_ref_generator: "ObjectRefGenerator", *, timeout: Optional[float] = None
) -> Iterator[R]:
...


@PublicAPI
@client_mode_hook(auto_init=True)
def get(
object_refs: Union[ray.ObjectRef, Sequence[ray.ObjectRef]],
object_refs: Union[ray.ObjectRef, Sequence[ray.ObjectRef], ObjectRefGenerator],
*,
timeout: Optional[float] = None,
) -> Union[Any, List[Any]]:
Expand Down Expand Up @@ -2263,12 +2271,22 @@ def get(
if is_individual_id:
object_refs = [object_refs]

if not isinstance(object_refs, list):
if not (
isinstance(object_refs, list) or isinstance(object_refs, ObjectRefGenerator)
):
raise ValueError(
"'object_refs' must either be an object ref "
"or a list of object refs."
"'object_refs' must either be an ObjectRef, "
"a list of ObjectRefs, or an ObjectRefGenerator."
)

if isinstance(object_refs, ObjectRefGenerator):

def value_generator(object_refs, timeout):
for ref in object_refs:
yield ray.get(ref, timeout=timeout)

return value_generator(object_refs, timeout)

# TODO(ujvl): Consider how to allow user to retrieve the ready objects.
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
for i, value in enumerate(values):
Expand Down
23 changes: 19 additions & 4 deletions python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,19 @@ from libc.stdint cimport (
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from libcpp.unordered_map cimport unordered_map
from libcpp.memory cimport (
shared_ptr,
unique_ptr
)
from libcpp.pair cimport pair as c_pair
from libcpp.utility cimport pair
from ray.includes.optional cimport (
optional,
nullopt,
make_optional,
)

from ray.includes.common cimport (
CBuffer,
CRayObject,
Expand Down Expand Up @@ -123,13 +132,19 @@ cdef class CoreWorker:
c_bool inline_small_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_output(
self, serialized_object, const CObjectID &return_id, size_t
data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
self, serialized_object,
const CObjectID &return_id,
const CObjectID &generator_id,
size_t data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
&contained_id, int64_t *task_output_inlined_bytes,
shared_ptr[CRayObject] *return_ptr)
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
self,
const CAddress &caller_address,
worker, outputs,
is_dynamic,
const CObjectID &outer_id,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns)
cdef yield_current_fiber(self, CFiberEvent &fiber_event)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)
cdef c_function_descriptors_to_python(
Expand Down
Loading