[core] Suppress harmless ObjectRefStreamEndOfStreamError when using asyncio #37062

Merged
3 changes: 3 additions & 0 deletions python/ray/_private/serialization.py
@@ -43,6 +43,7 @@
TaskUnschedulableError,
WorkerCrashedError,
OutOfMemoryError,
ObjectRefStreamEndOfStreamError,
)
from ray.util import serialization_addons
from ray.util import inspect_serializability
@@ -359,6 +360,8 @@ def _deserialize_object(self, data, metadata, object_ref):
elif error_type == ErrorType.Value("ACTOR_UNSCHEDULABLE_ERROR"):
error_info = self._deserialize_error_info(data, metadata_fields)
return ActorUnschedulableError(error_info.error_message)
elif error_type == ErrorType.Value("END_OF_STREAMING_GENERATOR"):
return ObjectRefStreamEndOfStreamError()
else:
return RaySystemError("Unrecognized error type " + str(error_type))
elif data:
10 changes: 6 additions & 4 deletions python/ray/_raylet.pyx
@@ -151,6 +151,7 @@ from ray.exceptions import (
AsyncioActorExit,
PendingCallsLimitExceeded,
RpcError,
ObjectRefStreamEndOfStreamError,
)
from ray._private import external_storage
from ray.util.scheduling_strategies import (
@@ -221,10 +222,6 @@ class ObjectRefGenerator:
return len(self._refs)


class ObjectRefStreamEndOfStreamError(RayError):
pass


class StreamingObjectRefGenerator:
def __init__(self, generator_ref: ObjectRef, worker: "Worker"):
# The reference to a generator task.
@@ -338,6 +335,11 @@ class StreamingObjectRefGenerator:
ready, unready = await asyncio.wait([ref], timeout=timeout_s)
if len(unready) > 0:
return ObjectRef.nil()
# NOTE(swang): Hack to avoid asyncio warnings about not retrieving the
# exception. The exception will get returned to the user through the
# below code.
for result in ready:
exc = result.exception()
Contributor

This will suppress any unexpected exception. I would suggest explicitly wrapping the ref in a task instead and ignoring only the specific exception type:

async def peek_stream_wrapper(ref: "ImNotSureWhatTypeThisIs"):
    try:
        await ref
    except ObjectRefStreamEndOfStreamError:
        pass # Expected.

peek_stream_task = asyncio.create_task(
    peek_stream_wrapper(core_worker.peek_object_ref_stream(self._generator_ref))
)
ready, unready = await asyncio.wait([peek_stream_task], timeout=timeout_s)

asyncio.wait does this under the hood anyway, so there shouldn't be additional overhead.
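
For illustration, a self-contained sketch of the wrapper suggested above. It runs without Ray: `EndOfStreamError` and `peek_stream` are hypothetical stand-ins for `ObjectRefStreamEndOfStreamError` and the peeked ref, not real Ray APIs. It only shows that the end-of-stream error is swallowed while any other exception stays on the task.

```python
import asyncio


class EndOfStreamError(Exception):
    """Hypothetical stand-in for ObjectRefStreamEndOfStreamError."""


async def peek_stream(fail_with=None):
    """Hypothetical stand-in for awaiting the peeked ref."""
    await asyncio.sleep(0)
    if fail_with is not None:
        raise fail_with
    return "ready"


async def peek_stream_wrapper(awaitable):
    """Await `awaitable`, swallowing only the expected end-of-stream error."""
    try:
        await awaitable
    except EndOfStreamError:
        pass  # Expected: the stream is exhausted.


async def wait_for_peek(fail_with=None, timeout_s=1.0):
    # Wrap the awaitable in a task explicitly, then wait with a timeout.
    task = asyncio.create_task(peek_stream_wrapper(peek_stream(fail_with)))
    ready, unready = await asyncio.wait([task], timeout=timeout_s)
    return task, ready, unready


async def demo():
    # End-of-stream is swallowed: the task finishes with no stored exception.
    task, ready, _ = await wait_for_peek(EndOfStreamError())
    eos_swallowed = task in ready and task.exception() is None
    # Any other error is kept on the task; here it is retrieved explicitly.
    task, ready, _ = await wait_for_peek(ValueError("unexpected"))
    other_kept = isinstance(task.exception(), ValueError)
    return eos_swallowed, other_kept
```

Calling `asyncio.run(demo())` exercises both paths. Whether this trade-off suits the actual generator code depends on the constraints discussed in the replies that follow.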

Contributor Author

Got it, thanks!

Contributor Author

Hmm actually I don't think we should do this. This part of the code is only meant to wait for the ref to become ready; we just don't have a way to do this for asyncio right now without also fetching the data.

The expected use case is that the user will then do something with the returned ref once it's ready. I believe Ray also warns if the ref has an exception and is never read, so we'll be warning twice if we don't suppress the asyncio error.

Contributor

Ah I see so this could raise other exceptions aside from ObjectRefStreamEndOfStreamError?

Ideally we would "whitelist" the expected exceptions here -- maybe a conservative way to do it would be except RayError? I'm just a little uneasy about suppressing all exceptions: there may be some corner case that raises an unexpected exception, and identifying and debugging it will be much easier if the exception is logged.

Ultimately up to you though, if you don't think this is an issue then suppressing them all is fine. Though it's probably still slightly less hacky to wrap it in a task and catch bare Exception.

Contributor Author

@stephanie-wang stephanie-wang Jul 5, 2023

Yeah, to clarify, the reasoning for suppressing all errors is that at this point, the user hasn't actually tried to receive the error yet. This function is supposed to return the ObjectRef that the user can await, and then at that point, asyncio will warn if the user doesn't retrieve the exception.

So either we don't suppress the error here, and the user will always see an extra warning, or we do suppress the error here, and then asyncio or Ray will warn if the user doesn't do anything with the ObjectRef.
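
The asyncio behavior being discussed can be reproduced without Ray. The sketch below is an illustration, not the PR's code: `failing_peek` is a hypothetical stand-in for a ref that resolves to an error. It records what the event loop's exception handler receives when a finished task is garbage-collected, with and without a prior call to `exception()`.

```python
import asyncio
import gc


async def failing_peek():
    """Hypothetical stand-in for a ref that resolves to an error."""
    raise RuntimeError("stream failed")


async def wait_only(retrieve: bool) -> None:
    task = asyncio.ensure_future(failing_peek())
    ready, _ = await asyncio.wait([task])
    if retrieve:
        for result in ready:
            # Reading the exception marks it as retrieved on the task.
            result.exception()


async def collect_warnings(retrieve: bool) -> list:
    messages = []
    loop = asyncio.get_running_loop()
    # Capture what asyncio would otherwise log when a task is finalized.
    loop.set_exception_handler(lambda _loop, ctx: messages.append(ctx["message"]))
    await wait_only(retrieve)
    gc.collect()  # make sure the finished task has been finalized
    return messages


def demo():
    without = asyncio.run(collect_warnings(retrieve=False))
    with_retrieval = asyncio.run(collect_warnings(retrieve=True))
    return without, with_retrieval
```

In this sketch, the handler is expected to receive a "never retrieved" message only when `exception()` was not called, which is the extra warning that the loop over `ready` in the diff avoids.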

Contributor

Got it. Thanks for clarifying.


try:
ref = core_worker.try_read_next_object_ref_stream(
9 changes: 9 additions & 0 deletions python/ray/exceptions.py
@@ -714,6 +714,15 @@ def __str__(self):
return f"The actor is not schedulable: {self.error_message}"


@DeveloperAPI
class ObjectRefStreamEndOfStreamError(RayError):
"""Raised by streaming generator tasks when there are no more ObjectRefs to
read.
"""

pass


RAY_EXCEPTION_TYPES = [
PlasmaObjectNotAvailable,
RayError,
7 changes: 7 additions & 0 deletions src/ray/core_worker/core_worker.cc
@@ -321,6 +321,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
local_raylet_client_,
options_.check_signals,
[this](const RayObject &obj) {
rpc::ErrorType error_type;
if (obj.IsException(&error_type) &&
error_type == rpc::ErrorType::END_OF_STREAMING_GENERATOR) {
// End-of-stream ObjectRefs are sentinels and should never get
// returned to the caller.
return;
}
// Run this on the event loop to avoid calling back into the language runtime
// from the middle of user operations.
io_service_.post(