[core] Dynamic generators that error return partial ObjectRefs followed by exception ObjectRef (#28864)

Previously, if a dynamic generator task errored, we stored the error as the top-level ObjectRef. This could leak any objects that were already stored, since their ObjectRefs were never returned to Python.

This PR improves usability by storing the exception as an additional final ObjectRef returned by the generator (see the included examples in the docs code). All successfully stored objects are still returned by the generator as usual. Some tests are updated to match the new behavior.
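
A minimal sketch of the new behavior, modeled on the doc example added in this PR (the task name failing_generator and the exception message are illustrative, not part of the change):

    import ray

    ray.init()

    @ray.remote
    def failing_generator():
        yield 1
        raise RuntimeError("partial failure")

    # With num_returns="dynamic", the value stored before the failure is still
    # returned; the exception is surfaced through one extra final ObjectRef.
    dynamic_ref = failing_generator.options(num_returns="dynamic").remote()
    ref_generator = ray.get(dynamic_ref)
    ref1, ref2 = ref_generator
    assert ray.get(ref1) == 1
    try:
        ray.get(ref2)  # Raises the generator's exception as a RayTaskError.
    except Exception as e:
        print(e)
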
stephanie-wang authored Sep 30, 2022
1 parent 809f194 commit 9142e1b
Showing 10 changed files with 222 additions and 123 deletions.
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.cc
@@ -133,7 +133,8 @@ Status TaskExecutor::ExecuteTask(
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute) {
const std::string name_of_concurrency_group_to_execute,
bool is_reattempt) {
RAY_LOG(DEBUG) << "Execute task type: " << TaskType_Name(task_type)
<< " name:" << task_name;
RAY_CHECK(ray_function.GetLanguage() == ray::Language::CPP);
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/task/task_executor.h
@@ -89,7 +89,8 @@ class TaskExecutor {
std::shared_ptr<ray::LocalMemoryBuffer> &creation_task_exception_pb_bytes,
bool *is_retryable_error,
const std::vector<ConcurrencyGroup> &defined_concurrency_groups,
const std::string name_of_concurrency_group_to_execute);
const std::string name_of_concurrency_group_to_execute,
bool is_reattempt);

virtual ~TaskExecutor(){};

47 changes: 47 additions & 0 deletions doc/source/ray-core/doc_code/generator.py
@@ -69,3 +69,50 @@ def get_size(ref_generator : ObjectRefGenerator):
# (get_size pid=1504184) <ray._raylet.ObjectRefGenerator object at 0x7f81c4251b50>
# __dynamic_generator_pass_end__
# fmt: on


# fmt: off
# __generator_errors_start__
@ray.remote
def generator():
    for i in range(2):
        yield i
    raise Exception("error")


ref1, ref2, ref3, ref4 = generator.options(num_returns=4).remote()
assert ray.get([ref1, ref2]) == [0, 1]
# All remaining ObjectRefs will contain the error.
try:
    ray.get([ref3, ref4])
except Exception as error:
    print(error)

dynamic_ref = generator.options(num_returns="dynamic").remote()
ref_generator = ray.get(dynamic_ref)
ref1, ref2, ref3 = ref_generator
assert ray.get([ref1, ref2]) == [0, 1]
# Generators with num_returns="dynamic" will store the exception in the final
# ObjectRef.
try:
    ray.get(ref3)
except Exception as error:
    print(error)
# __generator_errors_end__
# fmt: on

# fmt: off
# __generator_errors_unsupported_start__
# Generators that yield more values than expected currently do not throw an
# exception (the error is only logged).
# See https://github.com/ray-project/ray/issues/28689.
ref1, ref2 = generator.options(num_returns=2).remote()
assert ray.get([ref1, ref2]) == [0, 1]
"""
(generator pid=2375938) 2022-09-28 11:08:51,386 ERROR worker.py:755 --
Unhandled error: Task threw exception, but all return values already
created. This should only occur when using generator tasks.
...
"""
# __generator_errors_unsupported_end__
# fmt: on
32 changes: 23 additions & 9 deletions doc/source/ray-core/tasks/generators.rst
@@ -69,18 +69,32 @@ We can also pass the ``ObjectRef`` returned by a task with ``num_returns="dynami
:start-after: __dynamic_generator_pass_start__
:end-before: __dynamic_generator_pass_end__

Limitations
-----------
Exception handling
------------------

Although a generator function creates ``ObjectRefs`` one at a time, currently Ray will not schedule dependent tasks until the entire task is complete and all values have been created. This is similar to the semantics used by tasks that return multiple values as a list.
If a generator function raises an exception before yielding all its values, the values that it already stored will still be accessible through their ``ObjectRefs``.
The remaining ``ObjectRefs`` will contain the thrown exception.
This is true for both static and dynamic ``num_returns``.
If the task was called with ``num_returns="dynamic"``, the exception will be stored as an additional final ``ObjectRef`` in the ``ObjectRefGenerator``.

``num_returns="dynamic"`` is not yet supported for actor tasks.
.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __generator_errors_start__
:end-before: __generator_errors_end__

Note that there is currently a known bug where exceptions will not be propagated for generators that yield more values than expected. This can occur in two cases:

If a generator function raises an exception before yielding all its values, all values returned by the generator will be replaced by the exception traceback, including values that were already successfully yielded.
If the task was called with ``num_returns="dynamic"``, the exception will be stored in the ``ObjectRef`` returned by the task instead of the usual ``ObjectRefGenerator``.
1. When ``num_returns`` is set by the caller, but the generator task yields more values than this.
2. When a generator task with ``num_returns="dynamic"`` is :ref:`re-executed <task-retries>`, and the re-executed task yields more values than the original execution. Note that in general, Ray does not guarantee correctness for task re-execution if the task is nondeterministic, and it is recommended to set ``@ray.remote(max_retries=0)`` for such tasks.

Note that there is currently a known bug where exceptions will not be propagated for generators that yield objects in Ray's shared-memory object store before erroring. In this case, these objects will still be accessible through the returned ``ObjectRefs`` and you may see an error like the following:
.. literalinclude:: ../doc_code/generator.py
:language: python
:start-after: __generator_errors_unsupported_start__
:end-before: __generator_errors_unsupported_end__

.. code-block:: text
Limitations
-----------

$ ERROR worker.py:754 -- Generator threw exception after returning partial values in the object store, error may be unhandled.
Although a generator function creates ``ObjectRefs`` one at a time, currently Ray will not schedule dependent tasks until the entire task is complete and all values have been created. This is similar to the semantics used by tasks that return multiple values as a list.

``num_returns="dynamic"`` is not yet supported for actor tasks.
170 changes: 110 additions & 60 deletions python/ray/_raylet.pyx
@@ -545,6 +545,53 @@ cdef c_bool determine_if_retryable(
# Check that e is in allowlist.
return isinstance(e, exception_allowlist)

cdef store_task_errors(
worker,
exc,
task_exception,
actor,
function_name,
CTaskType task_type,
proctitle,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
):
cdef:
CoreWorker core_worker = worker.core_worker

# If the debugger is enabled, drop into the remote pdb here.
if "RAY_PDB" in os.environ:
ray.util.pdb.post_mortem()

backtrace = ray._private.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)

# Generate the actor repr from the actor class.
actor_repr = repr(actor) if actor else None

if isinstance(exc, RayTaskError):
# Avoid recursive nesting of RayTaskError.
failure_object = RayTaskError(function_name, backtrace,
exc.cause, proctitle=proctitle,
actor_repr=actor_repr)
else:
failure_object = RayTaskError(function_name, backtrace,
exc, proctitle=proctitle,
actor_repr=actor_repr)
errors = []
for _ in range(returns[0].size()):
errors.append(failure_object)
num_errors_stored = core_worker.store_task_outputs(
worker, errors,
returns)

ray._private.utils.push_error_to_driver(
worker,
ray_constants.TASK_PUSH_ERROR,
str(failure_object),
job_id=worker.current_job_id)
if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
raise RayActorError.from_task_error(failure_object)
return num_errors_stored

cdef execute_task(
const CAddress &caller_address,
@@ -562,7 +609,8 @@ cdef execute_task(
# This parameter is only used for actor creation task to define
# the concurrency groups of this actor.
const c_vector[CConcurrencyGroup] &c_defined_concurrency_groups,
const c_string c_name_of_concurrency_group_to_execute):
const c_string c_name_of_concurrency_group_to_execute,
c_bool is_reattempt):

is_retryable_error[0] = False

@@ -817,10 +865,45 @@ cdef execute_task(
raise ValueError(
"Functions with @ray.remote(num_returns=\"dynamic\" "
"must return a generator")
core_worker.store_task_outputs(
worker, outputs,
dynamic_returns,
returns[0][0].first)
task_exception = True
try:
core_worker.store_task_outputs(
worker, outputs,
dynamic_returns,
returns[0][0].first)
except Exception as error:
if not is_reattempt:
# If this is the first execution, we should
# generate one additional ObjectRef. This last
# ObjectRef will contain the error.
error_id = (CCoreWorkerProcess.GetCoreWorker()
.AllocateDynamicReturnId())
dynamic_returns[0].push_back(
c_pair[CObjectID, shared_ptr[CRayObject]](
error_id, shared_ptr[CRayObject]()))

# If a generator task fails mid-execution, we fail the
# dynamically generated nested ObjectRefs instead of
# the top-level ObjectRefGenerator.
num_errors_stored = store_task_errors(
worker, error, task_exception, actor,
function_name, task_type, title,
dynamic_returns)
if num_errors_stored == 0:
assert is_reattempt
# TODO(swang): The generator task failed and we
# also failed to store the error in any of its
# return values. This should only occur if the
# generator task was re-executed and returned more
# values than the initial execution.
logger.error(
"Unhandled error: Re-executed generator task "
"returned more than the "
f"{dynamic_returns[0].size()} values returned "
"by the first execution.\n"
"See https://github.com/ray-project/ray/issues/28688.")

task_exception = False
dynamic_refs = []
for idx in range(dynamic_returns.size()):
dynamic_refs.append(ObjectRef(
@@ -834,47 +917,15 @@ cdef execute_task(
worker, outputs,
returns)
except Exception as error:
# If the debugger is enabled, drop into the remote pdb here.
if "RAY_PDB" in os.environ:
ray.util.pdb.post_mortem()

backtrace = ray._private.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)

# Generate the actor repr from the actor class.
actor_repr = repr(actor) if actor else None

if isinstance(error, RayTaskError):
# Avoid recursive nesting of RayTaskError.
failure_object = RayTaskError(function_name, backtrace,
error.cause, proctitle=title,
actor_repr=actor_repr)
else:
failure_object = RayTaskError(function_name, backtrace,
error, proctitle=title,
actor_repr=actor_repr)
errors = []
for _ in range(returns[0].size()):
errors.append(failure_object)
core_worker.store_task_outputs(
worker, errors,
returns)
if dynamic_returns != NULL:
# Store errors for any dynamically generated objects too.
dynamic_errors = []
for _ in range(dynamic_returns[0].size()):
dynamic_errors.append(failure_object)
core_worker.store_task_outputs(
worker, dynamic_errors,
dynamic_returns)

ray._private.utils.push_error_to_driver(
worker,
ray_constants.TASK_PUSH_ERROR,
str(failure_object),
job_id=worker.current_job_id)
if (<int>task_type == <int>TASK_TYPE_ACTOR_CREATION_TASK):
raise RayActorError.from_task_error(failure_object)
num_errors_stored = store_task_errors(
worker, error, task_exception, actor, function_name,
task_type, title, returns)
if returns[0].size() > 0 and num_errors_stored == 0:
logger.exception(
"Unhandled error: Task threw exception, but all "
f"{returns[0].size()} return values already created. "
"This should only occur when using generator tasks.\n"
"See https://github.com/ray-project/ray/issues/28689.")

if execution_info.max_calls != 0:
# Reset the state of the worker for the next task to execute.
@@ -910,7 +961,8 @@ cdef CRayStatus task_execution_handler(
shared_ptr[LocalMemoryBuffer] &creation_task_exception_pb_bytes,
c_bool *is_retryable_error,
const c_vector[CConcurrencyGroup] &defined_concurrency_groups,
const c_string name_of_concurrency_group_to_execute) nogil:
const c_string name_of_concurrency_group_to_execute,
c_bool is_reattempt) nogil:
with gil, disable_client_hook():
try:
try:
Expand All @@ -925,7 +977,8 @@ cdef CRayStatus task_execution_handler(
dynamic_returns,
is_retryable_error,
defined_concurrency_groups,
name_of_concurrency_group_to_execute)
name_of_concurrency_group_to_execute,
is_reattempt)
except Exception as e:
sys_exit = SystemExit()
if isinstance(e, RayActorError) and \
@@ -2169,6 +2222,7 @@ cdef class CoreWorker:
int64_t task_output_inlined_bytes
int64_t num_returns = -1
shared_ptr[CRayObject] *return_ptr
num_outputs_stored = 0
if not ref_generator_id.IsNil():
# The task specified a dynamic number of return values. Determine
# the expected number of return values.
@@ -2189,7 +2243,7 @@
num_returns = returns[0].size()

if num_returns == 0:
return
return num_outputs_stored

task_output_inlined_bytes = 0
i = -1
@@ -2210,18 +2264,11 @@
returns[0][i].second = shared_ptr[CRayObject]()
return_ptr = &returns[0][i].second

# Skip return values that we already created and that were stored
# in plasma. This can occur if there were multiple return values,
# and we errored while trying to create one of them.
# Skip return values that we already created. This can occur if
# there were multiple return values, and we initially errored while
# trying to create one of them.
if (return_ptr.get() != NULL and return_ptr.get().GetData().get()
!= NULL and
return_ptr.get().GetData().get().IsPlasmaBuffer()):
# TODO(swang): This return object already has a value stored in Plasma
# because we created it before the error triggered. We should
# try to delete the current value and store the same error
# instead here.
logger.error("Generator threw exception after returning partial "
"values in the object store, error may be unhandled.")
!= NULL):
continue

context = worker.get_serialization_context()
@@ -2255,13 +2302,16 @@
data_size, metadata,
contained_id, &task_output_inlined_bytes,
return_ptr)
num_outputs_stored += 1

i += 1
if i < num_returns:
raise ValueError(
"Task returned {} objects, but num_returns={}.".format(
i, num_returns))

return num_outputs_stored

cdef c_function_descriptors_to_python(
self,
const c_vector[CFunctionDescriptor] &c_function_descriptors):
3 changes: 2 additions & 1 deletion python/ray/includes/libcoreworker.pxd
@@ -299,7 +299,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
&creation_task_exception_pb_bytes,
c_bool *is_retryable_error,
const c_vector[CConcurrencyGroup] &defined_concurrency_groups,
const c_string name_of_concurrency_group_to_execute) nogil
const c_string name_of_concurrency_group_to_execute,
c_bool is_reattempt) nogil
) task_execution_callback
(void(const CWorkerID &) nogil) on_worker_shutdown
(CRayStatus() nogil) check_signals