
[core] Support generators to allow tasks to return a dynamic number of objects #28291

Merged 37 commits on Sep 21, 2022
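For context, a minimal usage sketch of the feature this PR adds. The option name (num_returns="dynamic") and the ObjectRefGenerator obtained via ray.get follow the commit messages below; the exact surface changed during review (see the "Return generator instead of ObjRef" and "experimental" commits), so treat this as an assumption-laden sketch rather than the final API.

    import ray

    ray.init()

    # A task that yields a dynamic number of return objects instead of
    # declaring num_returns up front.
    @ray.remote(num_returns="dynamic")
    def split(items, chunk_size):
        for start in range(0, len(items), chunk_size):
            yield items[start:start + chunk_size]

    # The call still produces a single ObjectRef; resolving it gives an
    # ObjectRefGenerator whose inner ObjectRefs were allocated dynamically
    # by the executing worker.
    ref = split.remote(list(range(10)), 4)
    for chunk_ref in ray.get(ref):
        print(ray.get(chunk_ref))  # [0, 1, 2, 3], [4, 5, 6, 7], [8, 9]

Each yielded value is stored as its own object in the object store, which is what the worker-side changes to store_task_outputs in python/ray/_raylet.pyx below implement.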
Changes from 10 commits
Commits (37)
5636eb9
Worker side
stephanie-wang Sep 4, 2022
3457a46
Owner side, works except for when spilling?
stephanie-wang Sep 5, 2022
031640b
now it works for spilling/in-plasma objects
stephanie-wang Sep 5, 2022
e59dd65
recovery test. TODO:
stephanie-wang Sep 5, 2022
775ff3e
Sort of fix nondeterminism
stephanie-wang Sep 6, 2022
b316b9f
Update src/ray/protobuf/node_manager.proto
stephanie-wang Sep 6, 2022
2d0f5d4
Update src/ray/protobuf/pubsub.proto
stephanie-wang Sep 6, 2022
faf64cb
C++
stephanie-wang Sep 6, 2022
5d92940
doc
stephanie-wang Sep 6, 2022
7c69b32
fixes
stephanie-wang Sep 7, 2022
c1ecf26
Update python/ray/_raylet.pyx
stephanie-wang Sep 8, 2022
422e4b6
minor
stephanie-wang Sep 9, 2022
62b427c
refactor
stephanie-wang Sep 9, 2022
636954c
ref counting during recovery
stephanie-wang Sep 10, 2022
0b73319
ref counting fix
stephanie-wang Sep 10, 2022
450d428
num_returns=dynamic
stephanie-wang Sep 10, 2022
a0bca43
Return generator instead of ObjRef
stephanie-wang Sep 12, 2022
6511a04
doc
stephanie-wang Sep 12, 2022
7ea1404
lint
stephanie-wang Sep 12, 2022
6e4663a
docs
stephanie-wang Sep 12, 2022
7bba5b7
doc
stephanie-wang Sep 12, 2022
c31b1c4
fixes
stephanie-wang Sep 13, 2022
167c6be
update
stephanie-wang Sep 13, 2022
44d9d85
fix
stephanie-wang Sep 13, 2022
39ba48f
Merge remote-tracking branch 'upstream/master' into generators-forreal
stephanie-wang Sep 13, 2022
805a088
x
stephanie-wang Sep 14, 2022
3360d63
cpp
stephanie-wang Sep 14, 2022
2ed252a
fix
stephanie-wang Sep 14, 2022
3a811dc
x
stephanie-wang Sep 14, 2022
285d98d
x
stephanie-wang Sep 15, 2022
7a86f19
Merge remote-tracking branch 'upstream/master' into generators-forreal
stephanie-wang Sep 17, 2022
b045d34
options
stephanie-wang Sep 17, 2022
7b3cf27
x
stephanie-wang Sep 19, 2022
92a8491
experimental
stephanie-wang Sep 21, 2022
492b95e
experimental
stephanie-wang Sep 21, 2022
07ca73d
x
stephanie-wang Sep 21, 2022
c196c67
fix
stephanie-wang Sep 21, 2022
7 changes: 6 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.cc
@@ -119,6 +119,7 @@
}

Status TaskExecutor::ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
@@ -129,6 +130,7 @@
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
@@ -250,7 +252,10 @@
}
}

RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(
result_id,
result,
/*generator_id=*/ObjectID::Nil()));
} else {
if (!status.ok()) {
return ray::Status::CreationTaskError("");
2 changes: 2 additions & 0 deletions cpp/src/ray/runtime/task/task_executor.h
@@ -75,6 +75,7 @@
absl::Mutex &actor_contexts_mutex);

static Status ExecuteTask(
const rpc::Address &caller_address,
ray::TaskType task_type,
const std::string task_name,
const RayFunction &ray_function,
@@ -85,6 +86,7 @@
const std::string &debugger_breakpoint,
const std::string &serialized_retry_exception_allowlist,
std::vector<std::shared_ptr<ray::RayObject>> *results,
std::vector<std::pair<ObjectID, std::shared_ptr<RayObject>>> *dynamic_returns,
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
9 changes: 9 additions & 0 deletions python/ray/_private/serialization.py
@@ -9,6 +9,7 @@
from ray._raylet import (
MessagePackSerializedObject,
MessagePackSerializer,
ObjectRefGenerator,
Pickle5SerializedObject,
Pickle5Writer,
RawSerializedObject,
@@ -121,6 +122,14 @@
)

self._register_cloudpickle_reducer(ray.ObjectRef, object_ref_reducer)

def object_ref_generator_reducer(obj):
return ObjectRefGenerator, (obj.refs,)

self._register_cloudpickle_reducer(
ObjectRefGenerator, object_ref_generator_reducer
)

serialization_addons.apply(self)

def _register_cloudpickle_reducer(self, cls, reducer):
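The reducer registered above follows the standard (callable, args) reduce protocol, so deserializing an ObjectRefGenerator simply calls ObjectRefGenerator(refs) again. Below is a self-contained sketch of the same pattern using the standard library's copyreg; RefGenerator and ref_generator_reducer are hypothetical stand-ins, not Ray code:

    import copyreg
    import pickle

    class RefGenerator:
        """Hypothetical stand-in for ObjectRefGenerator."""
        def __init__(self, refs):
            self.refs = refs

        def __iter__(self):
            while self.refs:
                yield self.refs.pop(0)

    def ref_generator_reducer(obj):
        # Same (callable, args) shape as object_ref_generator_reducer above:
        # unpickling reconstructs the object by calling RefGenerator(refs).
        return RefGenerator, (list(obj.refs),)

    copyreg.pickle(RefGenerator, ref_generator_reducer)

    gen = RefGenerator(["ref-1", "ref-2", "ref-3"])
    restored = pickle.loads(pickle.dumps(gen))
    print(list(restored))  # ['ref-1', 'ref-2', 'ref-3']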
24 changes: 20 additions & 4 deletions python/ray/_raylet.pxd
@@ -11,10 +11,19 @@
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector as c_vector
from libcpp.unordered_map cimport unordered_map
from libcpp.memory cimport (
shared_ptr,
unique_ptr
)
from libcpp.pair cimport pair as c_pair
from libcpp.utility cimport pair
from ray.includes.optional cimport (
optional,
nullopt,
make_optional,
)

from ray.includes.common cimport (
CBuffer,
CRayObject,
@@ -123,13 +132,20 @@
c_bool inline_small_object=*)
cdef unique_ptr[CAddress] _convert_python_address(self, address=*)
cdef store_task_output(
self, serialized_object, const CObjectID &return_id, size_t
data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
self, serialized_object,
const CObjectID &return_id,
const CObjectID &generator_id,
size_t data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
&contained_id, int64_t *task_output_inlined_bytes,
shared_ptr[CRayObject] *return_ptr)
cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
self,
const CAddress &caller_address,
worker, outputs,
const c_vector[CObjectID] *static_return_ids,
c_vector[shared_ptr[CRayObject]] *returns,
const CObjectID &generator_id,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns)
cdef yield_current_fiber(self, CFiberEvent &fiber_event)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle)
cdef c_function_descriptors_to_python(
145 changes: 124 additions & 21 deletions python/ray/_raylet.pyx
@@ -159,6 +159,19 @@

logger = logging.getLogger(__name__)


class ObjectRefGenerator:
def __init__(self, refs):
# TODO(swang): As an optimization, can also store the generator
# ObjectID so that we don't need to keep individual ref counts for the
# inner ObjectRefs.
self.refs = refs

def __iter__(self):
while self.refs:
yield self.refs.pop(0)

Collaborator: I'm not sure I understand this: don't we still need to ref count the inner ObjectRefs since Generator can be out of scope while we still have inner ObjectRefs?

Contributor (author): Yes, that's possible. The TODO is an optimization for cases where we never use the inner ObjectRefs separately from the outer one. Like if you pass the generator to another task, and that task calls ray.get on each inner ref but deletes the refs before it returns. You can sort of "batch" the ref counts in that case by passing the outer ref to the task instead of the deserialized ObjectRefGenerator.
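To illustrate the reviewer's point above, here is a driver-side sketch assuming the num_returns="dynamic" option and the ray.get usage sketched near the top of this page (both assumptions, not taken from this diff): the ObjectRefGenerator can go out of scope while some inner ObjectRefs survive, so each inner ref still needs its own reference count, and the TODO is only an optimization for when the refs are never used separately.

    import ray

    ray.init()

    @ray.remote(num_returns="dynamic")
    def produce(n):
        for i in range(n):
            yield i

    def keep_first_two():
        # Assumed pattern: ray.get on the task's return ref yields the
        # ObjectRefGenerator.
        ref_generator = ray.get(produce.remote(5))
        inner_refs = list(ref_generator)
        # The generator is dropped when this function returns, but the refs
        # we keep must still pin their objects, so each inner ref carries
        # its own reference count.
        return inner_refs[:2]

    kept = keep_first_two()
    print(ray.get(kept))  # [0, 1]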


cdef int check_status(const CRayStatus& status) nogil except -1:
if status.ok():
return 0
@@ -531,6 +544,7 @@


cdef execute_task(
const CAddress &caller_address,
CTaskType task_type,
const c_string name,
const CRayFunction &ray_function,
Expand All @@ -541,6 +555,7 @@ cdef execute_task(
const c_string debugger_breakpoint,
const c_string serialized_retry_exception_allowlist,
c_vector[shared_ptr[CRayObject]] *returns,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns,
c_bool *is_retryable_error,
# This parameter is only used for actor creation task to define
# the concurrency groups of this actor.
@@ -558,6 +573,9 @@
JobID job_id = core_worker.get_current_job_id()
TaskID task_id = core_worker.get_current_task_id()
CFiberEvent task_done_event
CObjectID dynamic_return_id
c_vector[CObjectID] dynamic_return_ids
c_vector[shared_ptr[CRayObject]] dynamic_return_ptrs

# Automatically restrict the GPUs available to this task.
ray._private.utils.set_cuda_visible_devices(ray.get_gpu_ids())
@@ -752,7 +770,7 @@
core_worker.get_current_task_id()),
exc_info=True)
raise e
if c_return_ids.size() == 1:
if c_return_ids.size() == 1 and not inspect.isgenerator(outputs):
# If there is only one return specified, we should return
# all return values as a single object.
outputs = (outputs,)
@@ -786,8 +804,33 @@

# Store the outputs in the object store.
with core_worker.profile_event(b"task:store_outputs"):
num_returns = c_return_ids.size()
if num_returns == 1 and inspect.isgenerator(outputs):
# If the task has a single return value and it is a
# generator, first store all outputs yielded by the
# generator. We will assign their ObjectIDs dynamically.
# TODO(swang): Fix reconstruction case where generator
# returned a single value.

Collaborator: Will this be resolved in this PR?

Contributor (author): Oh it might be fixed already... let me check :D
core_worker.store_task_outputs(
caller_address, worker, outputs, NULL,
NULL,
c_return_ids[0],
dynamic_returns)
dynamic_refs = []
for idx in range(dynamic_returns.size()):
dynamic_refs.append(ObjectRef(
dynamic_returns[0][idx].first.Binary(),
caller_address.SerializeAsString(),
skip_adding_local_ref=True
))
# Swap out the generator for an ObjectRef generator.
outputs = (ObjectRefGenerator(dynamic_refs), )

core_worker.store_task_outputs(
worker, outputs, c_return_ids, returns)
caller_address, worker, outputs, &c_return_ids,
returns,
CObjectID.Nil(),
NULL)
except Exception as error:
# If the debugger is enabled, drop into the remote pdb here.
if "RAY_PDB" in os.environ:
@@ -812,7 +855,21 @@
for _ in range(c_return_ids.size()):
errors.append(failure_object)
core_worker.store_task_outputs(
worker, errors, c_return_ids, returns)
caller_address, worker, errors, &c_return_ids,
returns,
CObjectID.Nil(),
NULL)
if not dynamic_returns[0].empty():
# We generated dynamic objects during the first execution.
# Store the error for these objects too.
dynamic_errors = []
for _ in range(dynamic_returns[0].size()):
dynamic_errors.append(failure_object)
core_worker.store_task_outputs(
caller_address, worker, dynamic_errors, NULL, NULL,
c_return_ids[0],
dynamic_returns)

ray._private.utils.push_error_to_driver(
worker,
ray_constants.TASK_PUSH_ERROR,
@@ -841,6 +898,7 @@
<uint8_t*>py_bytes, len(py_bytes), True)

cdef CRayStatus task_execution_handler(
const CAddress &caller_address,
CTaskType task_type,
const c_string task_name,
const CRayFunction &ray_function,
@@ -851,6 +909,7 @@
const c_string debugger_breakpoint,
const c_string serialized_retry_exception_allowlist,
c_vector[shared_ptr[CRayObject]] *returns,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *dynamic_returns,
shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes,
c_bool *is_retryable_error,
const c_vector[CConcurrencyGroup] &defined_concurrency_groups,
@@ -860,11 +919,13 @@
try:
# The call to execute_task should never raise an exception. If
# it does, that indicates that there was an internal error.
execute_task(task_type, task_name, ray_function, c_resources,
execute_task(caller_address, task_type, task_name,
ray_function, c_resources,
c_args, c_arg_refs, c_return_ids,
debugger_breakpoint,
serialized_retry_exception_allowlist,
returns,
dynamic_returns,
is_retryable_error,
defined_concurrency_groups,
name_of_concurrency_group_to_execute)
@@ -1401,6 +1462,7 @@
check_status(
CCoreWorkerProcess.GetCoreWorker().SealExisting(
c_object_id, pin_object=False,
generator_id=CObjectID.Nil(),
owner_address=c_owner_address))

def put_serialized_object_and_increment_local_ref(self, serialized_object,
@@ -1459,6 +1521,7 @@
check_status(
CCoreWorkerProcess.GetCoreWorker().SealExisting(
c_object_id, pin_object=False,
generator_id=CObjectID.Nil(),
owner_address=move(c_owner_address)))

return c_object_id.Binary()
@@ -2060,6 +2123,7 @@
serialized_object_status))

cdef store_task_output(self, serialized_object, const CObjectID &return_id,
const CObjectID &generator_id,
size_t data_size, shared_ptr[CBuffer] &metadata,
const c_vector[CObjectID] &contained_id,
int64_t *task_output_inlined_bytes,
@@ -2086,38 +2150,74 @@
with nogil:
check_status(
CCoreWorkerProcess.GetCoreWorker().SealReturnObject(
return_id, return_ptr[0]))
return_id, return_ptr[0], generator_id))
return True
else:
with nogil:
success = (CCoreWorkerProcess.GetCoreWorker()
.PinExistingReturnObject(return_id, return_ptr))
.PinExistingReturnObject(
return_id, return_ptr, generator_id))
return success

cdef store_task_outputs(
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns):
cdef store_task_outputs(self,
const CAddress &caller_address,
worker, outputs,
const c_vector[CObjectID] *static_return_ids,
c_vector[shared_ptr[CRayObject]] *returns,
const CObjectID &generator_id,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]]
*dynamic_returns):

Collaborator (on caller_address): I didn't find where this is used.

Contributor (author): It gets passed to store_task_output so that we know who the owner is for dynamic objects.

Collaborator:

cdef store_task_output(
        self, serialized_object,
        const CObjectID &return_id,
        const CObjectID &generator_id,
        size_t data_size, shared_ptr[CBuffer] &metadata, const c_vector[CObjectID]
        &contained_id, int64_t *task_output_inlined_bytes,
        shared_ptr[CRayObject] *return_ptr)

Seems store_task_output doesn't accept address. Could you point me to the line of code where it's used?

Contributor (author): Ah yes, you're right, I'm pulling it directly from CoreWorker state now. Will remove!
cdef:
CObjectID return_id
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID] contained_id
int64_t task_output_inlined_bytes

if return_ids.size() == 0:
int64_t num_returns = -1
shared_ptr[CRayObject] *return_ptr
if static_return_ids is not NULL:
num_returns = static_return_ids[0].size()
elif dynamic_returns is not NULL and dynamic_returns.size() > 0:
num_returns = dynamic_returns[0].size()

if num_returns == 0:
return

n_returns = return_ids.size()
returns.resize(n_returns)
task_output_inlined_bytes = 0
i = -1
outputs = list(outputs)
for i, output in enumerate(outputs):
if i >= n_returns:
if num_returns >= 0 and i >= num_returns:
raise ValueError(
"Task returned more than num_returns={} objects.".format(
n_returns))
num_returns))
if static_return_ids != NULL:
return_id = static_return_ids[0][i]
while returns[0].size() <= i:
returns[0].push_back(shared_ptr[CRayObject]())
return_ptr = &returns[0][i]
else:
if num_returns == -1:
return_id = (CCoreWorkerProcess.GetCoreWorker()
.AllocateDynamicReturnId())
dynamic_returns[0].push_back(
c_pair[CObjectID, shared_ptr[CRayObject]](
return_id, shared_ptr[CRayObject]()))
else:
return_id = dynamic_returns[0][i].first
assert i < dynamic_returns[0].size()
return_ptr = &dynamic_returns[0][i].second

# TODO(swang): We should only try to create an existing return
# value if there were multiple return values, and we errored while
# trying to create one of them. We should try to delete the first
# value and store the error instead here.
# Skip return values that we already created.
if return_ptr.get() != NULL:
continue

return_id = return_ids[i]
context = worker.get_serialization_context()

serialized_object = context.serialize(output)
data_size = serialized_object.total_bytes
metadata_str = serialized_object.metadata
@@ -2135,21 +2235,24 @@

if not self.store_task_output(
serialized_object, return_id,
generator_id,
data_size, metadata, contained_id,
&task_output_inlined_bytes, &returns[0][i]):
&task_output_inlined_bytes, return_ptr):
# If the object already exists, but we fail to pin the copy, it
# means the existing copy might've gotten evicted. Try to
# create another copy.
self.store_task_output(
serialized_object, return_id, data_size, metadata,
serialized_object, return_id,
generator_id,
data_size, metadata,
contained_id, &task_output_inlined_bytes,
&returns[0][i])
return_ptr)

i += 1
if i < n_returns:
if i < num_returns:
raise ValueError(
"Task returned {} objects, but num_returns={}.".format(
i, n_returns))
i, num_returns))

cdef c_function_descriptors_to_python(
self,