From 6050f2346f69bfea6971e6fe72973fbfb3514f56 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 3 Jul 2023 17:28:05 -0500 Subject: [PATCH 1/4] Suppress error Signed-off-by: Stephanie Wang --- python/ray/_private/serialization.py | 3 +++ python/ray/_raylet.pyx | 10 ++++++---- python/ray/exceptions.py | 9 +++++++++ src/ray/core_worker/core_worker.cc | 7 +++++++ 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/serialization.py b/python/ray/_private/serialization.py index 64b2b1129fe3a..5e3281c1c1971 100644 --- a/python/ray/_private/serialization.py +++ b/python/ray/_private/serialization.py @@ -43,6 +43,7 @@ TaskUnschedulableError, WorkerCrashedError, OutOfMemoryError, + ObjectRefStreamEndOfStreamError, ) from ray.util import serialization_addons from ray.util import inspect_serializability @@ -359,6 +360,8 @@ def _deserialize_object(self, data, metadata, object_ref): elif error_type == ErrorType.Value("ACTOR_UNSCHEDULABLE_ERROR"): error_info = self._deserialize_error_info(data, metadata_fields) return ActorUnschedulableError(error_info.error_message) + elif error_type == ErrorType.Value("END_OF_STREAMING_GENERATOR"): + return ObjectRefStreamEndOfStreamError() else: return RaySystemError("Unrecognized error type " + str(error_type)) elif data: diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 5f5ec5e328b02..42699f965f694 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -151,6 +151,7 @@ from ray.exceptions import ( AsyncioActorExit, PendingCallsLimitExceeded, RpcError, + ObjectRefStreamEndOfStreamError, ) from ray._private import external_storage from ray.util.scheduling_strategies import ( @@ -221,10 +222,6 @@ class ObjectRefGenerator: return len(self._refs) -class ObjectRefStreamEndOfStreamError(RayError): - pass - - class StreamingObjectRefGenerator: def __init__(self, generator_ref: ObjectRef, worker: "Worker"): # The reference to a generator task. 
@@ -338,6 +335,11 @@ class StreamingObjectRefGenerator: ready, unready = await asyncio.wait([ref], timeout=timeout_s) if len(unready) > 0: return ObjectRef.nil() + # NOTE(swang): Hack to avoid asyncio warnings about not retrieving the + # exception. The exception will get returned to the user through the + # below code. + for result in ready: + exc = result.exception() try: ref = core_worker.try_read_next_object_ref_stream( diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 276acfd372c66..27a9eaa35e283 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -714,6 +714,15 @@ def __str__(self): return f"The actor is not schedulable: {self.error_message}" +@DeveloperAPI +class ObjectRefStreamEndOfStreamError(RayError): + """Raised by streaming generator tasks when there are no more ObjectRefs to + read. + """ + + pass + + RAY_EXCEPTION_TYPES = [ PlasmaObjectNotAvailable, RayError, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b1c14ef87f996..57a9f4052dd04 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -321,6 +321,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ local_raylet_client_, options_.check_signals, [this](const RayObject &obj) { + rpc::ErrorType error_type; + if (obj.IsException(&error_type) && + error_type == rpc::ErrorType::END_OF_STREAMING_GENERATOR) { + // End-of-stream ObjectRefs are sentinels and should never get + // returned to the caller. + return; + } // Run this on the event loop to avoid calling back into the language runtime // from the middle of user operations. 
io_service_.post( From 19fc2a98fe37f8851d2b33e532985eefff0fff29 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 5 Jul 2023 12:55:10 -0500 Subject: [PATCH 2/4] less hacky Signed-off-by: Stephanie Wang --- python/ray/_raylet.pyx | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 42699f965f694..608c7d4342a06 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -321,6 +321,16 @@ class StreamingObjectRefGenerator: raise StopIteration return ref + async def suppress_exceptions(self, ref: ObjectRef): + # Wrap a streamed ref to avoid asyncio warnings about not retrieving + # the exception when we are just waiting for the ref to become ready. + # The exception will get returned (or warned) to the user once they + # actually await the ref. + try: + await ref + except Exception: + pass + async def _next_async( self, timeout_s: Optional[float] = None, @@ -332,14 +342,9 @@ class StreamingObjectRefGenerator: ref = core_worker.peek_object_ref_stream( self._generator_ref) # TODO(swang): Avoid fetching the value. - ready, unready = await asyncio.wait([ref], timeout=timeout_s) + ready, unready = await asyncio.wait([self.suppress_exceptions(ref)], timeout=timeout_s) if len(unready) > 0: return ObjectRef.nil() - # NOTE(swang): Hack to avoid asyncio warnings about not retrieving the - # exception. The exception will get returned to the user through the - # below code. 
- for result in ready: - exc = result.exception() try: ref = core_worker.try_read_next_object_ref_stream( From afb344c3ba90bb9d658bf71798e33075b75126d9 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 5 Jul 2023 12:55:31 -0500 Subject: [PATCH 3/4] lint Signed-off-by: Stephanie Wang --- python/ray/_raylet.pyx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 608c7d4342a06..cb6756f852bfd 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -342,7 +342,8 @@ class StreamingObjectRefGenerator: ref = core_worker.peek_object_ref_stream( self._generator_ref) # TODO(swang): Avoid fetching the value. - ready, unready = await asyncio.wait([self.suppress_exceptions(ref)], timeout=timeout_s) + ready, unready = await asyncio.wait( + [asyncio.ensure_future(self.suppress_exceptions(ref))], timeout=timeout_s) if len(unready) > 0: return ObjectRef.nil() From f2cc8d77f9af3ebb7e359b21ca445971e26f915f Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Wed, 5 Jul 2023 18:21:55 -0500 Subject: [PATCH 4/4] Remove unnecessary test Signed-off-by: Stephanie Wang --- python/ray/tests/test_streaming_generator.py | 90 -------------------- 1 file changed, 90 deletions(-) diff --git a/python/ray/tests/test_streaming_generator.py b/python/ray/tests/test_streaming_generator.py index 06e85a87ac2aa..cc03f0c3086d7 100644 --- a/python/ray/tests/test_streaming_generator.py +++ b/python/ray/tests/test_streaming_generator.py @@ -144,96 +144,6 @@ def test_streaming_object_ref_generator_task_failed_unit(mocked_worker): ref = generator._next_sync(timeout_s=0) -@pytest.mark.asyncio -async def test_streaming_object_ref_generator_unit_async(mocked_worker): - """ - Verify the basic case: - create a generator -> read values -> nothing more to read -> delete.
- """ - c = mocked_worker.core_worker - generator_ref = ray.ObjectRef.from_random() - generator = StreamingObjectRefGenerator(generator_ref, mocked_worker) - c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil() - - # Test when there's no new ref, it returns a nil. - next_ref = ray.ObjectRef.from_random() - - async def coro_ref(): - await asyncio.sleep(1) - return next_ref - - c.peek_object_ref_stream.return_value = coro_ref() - ref = await generator._next_async(timeout_s=0) - assert ref.is_nil() - - # When the new ref is available, next should return it. - for _ in range(3): - next_ref = ray.ObjectRef.from_random() - - async def coro_ref(): - return next_ref - - c.peek_object_ref_stream.return_value = coro_ref() - c.try_read_next_object_ref_stream.return_value = next_ref - ref = await generator._next_async(timeout_s=0) - assert next_ref == ref - - # When try_read_next_object_ref_stream raises a - # ObjectRefStreamEndOfStreamError, it should raise a stop iteration. - - async def coro_ref(): - return next_ref - - c.peek_object_ref_stream.return_value = coro_ref() - generator._generator_ref = coro_ref() - c.try_read_next_object_ref_stream.side_effect = ObjectRefStreamEndOfStreamError( - "" - ) # noqa - with pytest.raises(StopAsyncIteration): - ref = await generator._next_async(timeout_s=0) - - -@pytest.mark.asyncio -async def test_async_ref_generator_task_failed_unit(mocked_worker): - """ - Verify when a task is failed by a system error, - the generator ref is returned. - """ - c = mocked_worker.core_worker - generator_ref = ray.ObjectRef.from_random() - generator = StreamingObjectRefGenerator(generator_ref, mocked_worker) - - # Simulate the worker failure happens. - next_ref = ray.ObjectRef.from_random() - - async def coro_ref(): - return next_ref - - c.peek_object_ref_stream.return_value = coro_ref() - - # generator ref should raise an exception when a task fails. 
- - async def generator_ref_coro(): - raise WorkerCrashedError() - - generator_coro = generator_ref_coro() - generator._generator_ref = generator_coro - c.try_read_next_object_ref_stream.side_effect = ObjectRefStreamEndOfStreamError( - "" - ) # noqa - ref = await generator._next_async(timeout_s=0) - # If the generator task fails by a systsem error, - # meaning the ref will raise an exception - # it should be returned. - assert ref == generator_coro - - # Once exception is raised, it should always - # raise stopIteration regardless of what - # the ref contains now. - with pytest.raises(StopAsyncIteration): - ref = await generator._next_async(timeout_s=0) - - def test_generator_basic(shutdown_only): ray.init(num_cpus=1)