From 819c6f1a8905dacc5e38e52c47a25ee990ffe777 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 24 May 2023 13:25:58 +0900 Subject: [PATCH] [2/N] Streaming Generator. Support core worker APIs + cython generator interface. (#35324) (#35682) This is the second PR to support streaming generator. The detailed design and API proposal can be found from https://docs.google.com/document/d/1hAASLe2sCoay23raqxqwJdSDiJWNMcNhlTwWJXsJOU4/edit#heading=h.w91y1fgnpu0m. The Execution plan can be found from https://docs.google.com/document/d/1hAASLe2sCoay23raqxqwJdSDiJWNMcNhlTwWJXsJOU4/edit#heading=h.kxktymq5ihf7. There will be 4 PRs to enable streaming generator for Ray Serve (phase 1). This PR -> introduce cpp interfaces to handle intermediate task return [1/N] Streaming Generator. Cpp interfaces and implementation #35291 Support core worker APIs + cython generator interface. [2/N] Streaming Generator. Support core worker APIs + cython generator interface. #35324 < --- this PR E2e integration [3/N] Streaming Generator. E2e integration #35325 (review) Support async actors This PR implements the Cython generator interface that users can use to obtain a next available object reference. --------- Signed-off-by: SangBin Cho --- python/ray/_raylet.pyx | 172 ++++++++++++++++++ python/ray/includes/common.pxd | 4 + python/ray/includes/libcoreworker.pxd | 5 + python/ray/tests/BUILD | 1 + python/ray/tests/test_streaming_generator.py | 143 +++++++++++++++ src/ray/core_worker/core_worker.cc | 22 +++ src/ray/core_worker/core_worker.h | 32 ++++ src/ray/core_worker/test/task_manager_test.cc | 1 - 8 files changed, 379 insertions(+), 1 deletion(-) create mode 100644 python/ray/tests/test_streaming_generator.py diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 4cdf472e2759..90310c081bfb 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -198,6 +198,149 @@ class ObjectRefGenerator: return len(self._refs) +class ObjectRefStreamEoFError(RayError): + pass + + +class StreamingObjectRefGenerator: + def __init__(self, generator_ref: ObjectRef, worker: "Worker"): + # The reference to a generator task. + self._generator_ref = generator_ref + # The last time generator task has completed. + self._generator_task_completed_time = None + # The exception raised from a generator task. + self._generator_task_exception = None + # Ray's worker class. ray._private.worker.global_worker + self.worker = worker + assert hasattr(worker, "core_worker") + self.worker.core_worker.create_object_ref_stream(self._generator_ref) + + def __iter__(self) -> "StreamingObjectRefGenerator": + return self + + def __next__(self) -> ObjectRef: + """Waits until a next ref is available and returns the object ref. + + Raises StopIteration if there's no more objects + to generate. + + The object ref will contain an exception if the task fails. + When the generator task returns N objects, it can return + up to N + 1 objects (if there's a system failure, the + last object will contain a system level exception). + """ + return self._next() + + def _next( + self, + timeout_s: float = -1, + sleep_interval_s: float = 0.0001, + unexpected_network_failure_timeout_s: float = 30) -> ObjectRef: + """Waits for timeout_s and returns the object ref if available. + + If an object is not available within the given timeout, it + returns a nil object reference. + + If -1 timeout is provided, it means it waits infinitely. + + Waiting is implemented as busy waiting. You can control + the busy waiting interval via sleep_interval_s. + + Raises StopIteration if there's no more objects + to generate. + + The object ref will contain an exception if the task fails. + When the generator task returns N objects, it can return + up to N + 1 objects (if there's a system failure, the + last object will contain a system level exception). + + Args: + timeout_s: If the next object is not ready within + this timeout, it returns the nil object ref. + sleep_interval_s: busy waiting interval. + unexpected_network_failure_timeout_s: If the + task is finished, but the next ref is not + available within this time, it will hard fail + the generator. + """ + obj = self._handle_next() + last_time = time.time() + + # The generator ref will be None if the task succeeds. + # It will contain an exception if the task fails by + # a system error. + while obj.is_nil(): + if self._generator_task_exception: + # The generator task has failed already. + # We raise StopIteration + # to conform the next interface in Python. + raise StopIteration from None + else: + # Otherwise, we should ray.get on the generator + # ref to find if the task has a system failure. + # Return the generator ref that contains the system + # error as soon as possible. + r, _ = ray.wait([self._generator_ref], timeout=0) + if len(r) > 0: + try: + ray.get(r) + except Exception as e: + # If it has failed, return the generator task ref + # so that the ref will raise an exception. + self._generator_task_exception = e + return self._generator_ref + finally: + if self._generator_task_completed_time is None: + self._generator_task_completed_time = time.time() + + # Currently, since the ordering of intermediate result report + # is not guaranteed, it is possible that althoug the task + # has succeeded, all of the object references are not reported + # (e.g., when there are network failures). + # If all the object refs are not reported to the generator + # within 30 seconds, we consider is as an unreconverable error. + if self._generator_task_completed_time: + if (time.time() - self._generator_task_completed_time + > unexpected_network_failure_timeout_s): + # It means the next wasn't reported although the task + # has been terminated 30 seconds ago. + self._generator_task_exception = AssertionError + assert False, "Unexpected network failure occured." + + if timeout_s != -1 and time.time() - last_time > timeout_s: + return ObjectRef.nil() + + # 100us busy waiting + time.sleep(sleep_interval_s) + obj = self._handle_next() + return obj + + def _handle_next(self) -> ObjectRef: + try: + if hasattr(self.worker, "core_worker"): + obj = self.worker.core_worker.try_read_next_object_ref_stream( + self._generator_ref) + return obj + else: + raise ValueError( + "Cannot access the core worker. " + "Did you already shutdown Ray via ray.shutdown()?") + except ObjectRefStreamEoFError: + raise StopIteration from None + + def __del__(self): + if hasattr(self.worker, "core_worker"): + # NOTE: This can be called multiple times + # because python doesn't guarantee __del__ is called + # only once. + self.worker.core_worker.delete_object_ref_stream(self._generator_ref) + + def __getstate__(self): + raise TypeError( + "You cannot return or pass a generator to other task. " + "Serializing a StreamingObjectRefGenerator is not allowed.") + + cdef int check_status(const CRayStatus& status) nogil except -1: if status.ok(): return 0 @@ -209,6 +352,8 @@ cdef int check_status(const CRayStatus& status) nogil except -1: raise ObjectStoreFullError(message) elif status.IsOutOfDisk(): raise OutOfDiskError(message) + elif status.IsObjectRefStreamEoF(): + raise ObjectRefStreamEoFError(message) elif status.IsInterrupted(): raise KeyboardInterrupt() elif status.IsTimedOut(): @@ -3139,6 +3284,33 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker() \ .RecordTaskLogEnd(out_end_offset, err_end_offset) + def create_object_ref_stream(self, ObjectRef generator_id): + cdef: + CObjectID c_generator_id = generator_id.native() + + CCoreWorkerProcess.GetCoreWorker().CreateObjectRefStream(c_generator_id) + + def delete_object_ref_stream(self, ObjectRef generator_id): + cdef: + CObjectID c_generator_id = generator_id.native() + + CCoreWorkerProcess.GetCoreWorker().DelObjectRefStream(c_generator_id) + + def try_read_next_object_ref_stream(self, ObjectRef generator_id): + cdef: + CObjectID c_generator_id = generator_id.native() + CObjectReference c_object_ref + + check_status( + CCoreWorkerProcess.GetCoreWorker().TryReadObjectRefStream( + c_generator_id, &c_object_ref)) + return ObjectRef( + c_object_ref.object_id(), + c_object_ref.owner_address().SerializeAsString(), + "", + # Already added when the ref is updated. + skip_adding_local_ref=True) + cdef void async_callback(shared_ptr[CRayObject] obj, CObjectID object_ref, void *user_callback) with gil: diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index a06630fd2132..09d1de01c251 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -99,6 +99,9 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: @staticmethod CRayStatus NotFound() + @staticmethod + CRayStatus ObjectRefStreamEoF() + c_bool ok() c_bool IsOutOfMemory() c_bool IsKeyError() @@ -118,6 +121,7 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil: c_bool IsObjectUnknownOwner() c_bool IsRpcError() c_bool IsOutOfResource() + c_bool IsObjectRefStreamEoF() c_string ToString() c_string CodeAsString() diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 2c71e1a5d809..306d5f940fd2 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -147,6 +147,11 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const CObjectID& return_id, shared_ptr[CRayObject] *return_object, const CObjectID& generator_id) + void DelObjectRefStream(const CObjectID &generator_id) + void CreateObjectRefStream(const CObjectID &generator_id) + CRayStatus TryReadObjectRefStream( + const CObjectID &generator_id, + CObjectReference *object_ref_out) CObjectID AllocateDynamicReturnId() CJobID GetCurrentJobId() diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 8cb7a0beed60..112c2587c96b 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -46,6 +46,7 @@ py_test_module_list( "test_gcs_fault_tolerance.py", "test_gcs_utils.py", "test_generators.py", + "test_streaming_generator.py", "test_metrics_agent.py", "test_metrics_head.py", "test_component_failures_2.py", diff --git a/python/ray/tests/test_streaming_generator.py b/python/ray/tests/test_streaming_generator.py new file mode 100644 index 000000000000..0f3bb742b465 --- /dev/null +++ b/python/ray/tests/test_streaming_generator.py @@ -0,0 +1,143 @@ +import pytest +import sys +import time + +from unittest.mock import patch, Mock + +import ray +from ray._raylet import StreamingObjectRefGenerator, ObjectRefStreamEoFError +from ray.cloudpickle import dumps +from ray.exceptions import WorkerCrashedError + + +class MockedWorker: + def __init__(self, mocked_core_worker): + self.core_worker = mocked_core_worker + + def reset_core_worker(self): + """Emulate the case ray.shutdown is called + and the core_worker instance is GC'ed. + """ + self.core_worker = None + + +@pytest.fixture +def mocked_worker(): + mocked_core_worker = Mock() + mocked_core_worker.try_read_next_object_ref_stream.return_value = None + mocked_core_worker.delete_object_ref_stream.return_value = None + mocked_core_worker.create_object_ref_stream.return_value = None + worker = MockedWorker(mocked_core_worker) + yield worker + + +def test_streaming_object_ref_generator_basic_unit(mocked_worker): + """ + Verify the basic case: + create a generator -> read values -> nothing more to read -> delete. + """ + with patch("ray.wait") as mocked_ray_wait: + c = mocked_worker.core_worker + generator_ref = ray.ObjectRef.from_random() + generator = StreamingObjectRefGenerator(generator_ref, mocked_worker) + c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil() + c.create_object_ref_stream.assert_called() + + # Test when there's no new ref, it returns a nil. + mocked_ray_wait.return_value = [], [generator_ref] + ref = generator._next(timeout_s=0) + assert ref.is_nil() + + # When the new ref is available, next should return it. + for _ in range(3): + new_ref = ray.ObjectRef.from_random() + c.try_read_next_object_ref_stream.return_value = new_ref + ref = generator._next(timeout_s=0) + assert new_ref == ref + + # When try_read_next_object_ref_stream raises a + # ObjectRefStreamEoFError, it should raise a stop iteration. + c.try_read_next_object_ref_stream.side_effect = ObjectRefStreamEoFError( + "" + ) # noqa + with pytest.raises(StopIteration): + ref = generator._next(timeout_s=0) + + # Make sure we cannot serialize the generator. + with pytest.raises(TypeError): + dumps(generator) + + del generator + c.delete_object_ref_stream.assert_called() + + +def test_streaming_object_ref_generator_task_failed_unit(mocked_worker): + """ + Verify when a task is failed by a system error, + the generator ref is returned. + """ + with patch("ray.get") as mocked_ray_get: + with patch("ray.wait") as mocked_ray_wait: + c = mocked_worker.core_worker + generator_ref = ray.ObjectRef.from_random() + generator = StreamingObjectRefGenerator(generator_ref, mocked_worker) + + # Simulate the worker failure happens. + mocked_ray_wait.return_value = [generator_ref], [] + mocked_ray_get.side_effect = WorkerCrashedError() + + c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil() + ref = generator._next(timeout_s=0) + # If the generator task fails by a systsem error, + # meaning the ref will raise an exception + # it should be returned. + print(ref) + print(generator_ref) + assert ref == generator_ref + + # Once exception is raised, it should always + # raise stopIteration regardless of what + # the ref contains now. + with pytest.raises(StopIteration): + ref = generator._next(timeout_s=0) + + +def test_streaming_object_ref_generator_network_failed_unit(mocked_worker): + """ + Verify when a task is finished, but if the next ref is not available + on time, it raises an assertion error. + + TODO(sang): Once we move the task subimssion path to use pubsub + to guarantee the ordering, we don't need this test anymore. + """ + with patch("ray.get") as mocked_ray_get: + with patch("ray.wait") as mocked_ray_wait: + c = mocked_worker.core_worker + generator_ref = ray.ObjectRef.from_random() + generator = StreamingObjectRefGenerator(generator_ref, mocked_worker) + + # Simulate the task has finished. + mocked_ray_wait.return_value = [generator_ref], [] + mocked_ray_get.return_value = None + + # If StopIteration is not raised within + # unexpected_network_failure_timeout_s second, + # it should fail. + c.try_read_next_object_ref_stream.return_value = ray.ObjectRef.nil() + ref = generator._next(timeout_s=0, unexpected_network_failure_timeout_s=1) + assert ref == ray.ObjectRef.nil() + time.sleep(1) + with pytest.raises(AssertionError): + generator._next(timeout_s=0, unexpected_network_failure_timeout_s=1) + # After that StopIteration should be raised. + with pytest.raises(StopIteration): + generator._next(timeout_s=0, unexpected_network_failure_timeout_s=1) + + +if __name__ == "__main__": + import os + + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f73bc01a470d..db6253d7b461 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2769,6 +2769,28 @@ Status CoreWorker::SealReturnObject(const ObjectID &return_id, return status; } +void CoreWorker::CreateObjectRefStream(const ObjectID &generator_id) { + task_manager_->CreateObjectRefStream(generator_id); +} + +void CoreWorker::DelObjectRefStream(const ObjectID &generator_id) { + task_manager_->DelObjectRefStream(generator_id); +} + +Status CoreWorker::TryReadObjectRefStream(const ObjectID &generator_id, + rpc::ObjectReference *object_ref_out) { + ObjectID object_id; + const auto &status = task_manager_->TryReadObjectRefStream(generator_id, &object_id); + if (!status.ok()) { + return status; + } + + RAY_CHECK(object_ref_out != nullptr); + object_ref_out->set_object_id(object_id.Binary()); + object_ref_out->mutable_owner_address()->CopyFrom(rpc_address_); + return status; +} + bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, std::shared_ptr *return_object, const ObjectID &generator_id) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 5c68373d04e0..2f08d1872794 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -360,6 +360,38 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { NodeID GetCurrentNodeId() const { return NodeID::FromBinary(rpc_address_.raylet_id()); } + /// Create the ObjectRefStream of generator_id. + /// + /// It is a pass-through method. See TaskManager::CreateObjectRefStream + /// for details. + /// + /// \param[in] generator_id The object ref id of the streaming + /// generator task. + void CreateObjectRefStream(const ObjectID &generator_id); + + /// Read the next index of a ObjectRefStream of generator_id. + /// + /// \param[in] generator_id The object ref id of the streaming + /// generator task. + /// \param[out] object_ref_out The ObjectReference + /// that the caller can convert to its own ObjectRef. + /// The current process is always the owner of the + /// generated ObjectReference. + /// \return Status RayKeyError if the stream reaches to EoF. + /// OK otherwise. + Status TryReadObjectRefStream(const ObjectID &generator_id, + rpc::ObjectReference *object_ref_out); + + /// Delete the ObjectRefStream of generator_id + /// created by CreateObjectRefStream. + /// + /// It is a pass-through method. See TaskManager::DelObjectRefStream + /// for details. + /// + /// \param[in] generator_id The object ref id of the streaming + /// generator task. + void DelObjectRefStream(const ObjectID &generator_id); + const PlacementGroupID &GetCurrentPlacementGroupId() const { return worker_context_.GetCurrentPlacementGroupId(); } diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index d6daeebbdad8..ce7c32547446 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -1598,7 +1598,6 @@ TEST_F(TaskManagerTest, TestObjectRefStreamDelCleanReferences) { // NOTE: We panic if READ is called after DELETE. The // API caller should guarantee this doesn't happen. // So we don't test it. - // WRITE 3. Should be ignored. auto dynamic_return_id3 = ObjectID::FromIndex(spec.TaskId(), 4); data = GenerateRandomBuffer();