diff --git a/python/ray/tests/test_streaming_generator.py b/python/ray/tests/test_streaming_generator.py index 68b0c6ba5ed3d..4005d40205967 100644 --- a/python/ray/tests/test_streaming_generator.py +++ b/python/ray/tests/test_streaming_generator.py @@ -466,9 +466,32 @@ def get_data(self): for ref in chain_actor_4.get_data.options(num_returns="streaming").remote(): assert np.array_equal(np.ones(5 * 1024 * 1024), ray.get(ref)) + print("getting the next data") del ref +def test_generator_slow_pinning_requests(monkeypatch, shutdown_only): + """ + Verify when the Object pinning request from the raylet + is reported slowly, there's no refernece leak. + """ + with monkeypatch.context() as m: + # defer for 10s for the second node. + m.setenv( + "RAY_testing_asio_delay_us", + "CoreWorkerService.grpc_server.PubsubLongPolling=1000000:1000000", + ) + + @ray.remote + def f(): + yield np.ones(5 * 1024 * 1024) + + for ref in f.options(num_returns="streaming").remote(): + del ref + + print(list_objects()) + + @pytest.mark.parametrize("store_in_plasma", [False, True]) def test_actor_streaming_generator(shutdown_only, store_in_plasma): """Test actor/async actor with sync/async generator interfaces.""" diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 327a04c671a2f..60911031222b1 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3300,10 +3300,9 @@ void CoreWorker::ProcessSubscribeForObjectEviction( const auto generator_id = ObjectID::FromBinary(message.generator_id()); RAY_CHECK(!generator_id.IsNil()); if (task_manager_->ObjectRefStreamExists(generator_id)) { - // It is possible this reference will leak if the ObjectRefStream is - // deleted or the corresponding object ID is not reported via - // HandleReportGeneratorItemReturns. TODO(sang): Handle the edge case. - reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id); + // ObjectRefStreamExists is used to distinguigsh num_returns="dynamic" vs + // "streaming". + task_manager_->TemporarilyOwnGeneratorReturnRefIfNeeded(object_id, generator_id); } else { reference_counter_->AddDynamicReturn(object_id, generator_id); } @@ -3439,12 +3438,10 @@ void CoreWorker::AddSpilledObjectLocationOwner( // object is spilled before the reply from the task that created the // object. Add the dynamically created object to our ref counter so that we // know that it exists. - RAY_CHECK(!generator_id->IsNil()); if (task_manager_->ObjectRefStreamExists(*generator_id)) { - // It is possible this reference will leak if the ObjectRefStream is - // deleted or the corresponding object ID is not reported via - // HandleReportGeneratorItemReturns. TODO(sang): Handle the edge case. - reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, *generator_id); + // ObjectRefStreamExists is used to distinguigsh num_returns="dynamic" vs + // "streaming". + task_manager_->TemporarilyOwnGeneratorReturnRefIfNeeded(object_id, *generator_id); } else { reference_counter_->AddDynamicReturn(object_id, *generator_id); } @@ -3476,10 +3473,10 @@ void CoreWorker::AddObjectLocationOwner(const ObjectID &object_id, const auto &maybe_generator_id = task_manager_->TaskGeneratorId(object_id.TaskId()); if (!maybe_generator_id.IsNil()) { if (task_manager_->ObjectRefStreamExists(maybe_generator_id)) { - // It is possible this reference will leak if the ObjectRefStream is - // deleted or the corresponding object ID is not reported via - // HandleReportGeneratorItemReturns. TODO(sang): Handle the edge case. - reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, maybe_generator_id); + // ObjectRefStreamExists is used to distinguigsh num_returns="dynamic" vs + // "streaming". + task_manager_->TemporarilyOwnGeneratorReturnRefIfNeeded(object_id, + maybe_generator_id); } else { // The task is a generator and may not have finished yet. Add the internal // ObjectID so that we can update its location. diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index 4577a294f09a6..f3a60f3a80117 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -43,6 +43,10 @@ std::vector ObjectRefStream::GetItemsUnconsumed() const { result.push_back(object_id); } } + // Temporarily owned refs are not consumed. + for (const auto &object_id : temporarily_owned_refs_) { + result.push_back(object_id); + } return result; } @@ -80,6 +84,28 @@ Status ObjectRefStream::TryReadNextItem(ObjectID *object_id_out) { return Status::OK(); } +bool ObjectRefStream::TemporarilyInsertToStreamIfNeeded(const ObjectID &object_id) { + // Write to a stream if the object ID is not consumed yet. + auto last_consumed_index = next_index_ - 1; + if (item_index_to_refs_.find(last_consumed_index) != item_index_to_refs_.end()) { + // Object ID from the generator task always increment. I.e., the first + // return has a lower ObjectID bytes than the second return. + // If the last conusumed object ID's index is lower than a given ObjectID, + // it means the given ref is not consumed yet, meaning we should + // write to a stream. + auto not_consumed_yet = + item_index_to_refs_[last_consumed_index].ObjectIndex() < object_id.ObjectIndex(); + return not_consumed_yet; + } + + if (refs_written_to_stream_.find(object_id) == refs_written_to_stream_.end()) { + temporarily_owned_refs_.insert(object_id); + return true; + } + + return false; +} + bool ObjectRefStream::InsertToStream(const ObjectID &object_id, int64_t item_index) { if (end_of_stream_index_ != -1) { RAY_CHECK(next_index_ <= end_of_stream_index_); @@ -90,6 +116,11 @@ bool ObjectRefStream::InsertToStream(const ObjectID &object_id, int64_t item_ind return false; } + if (temporarily_owned_refs_.find(object_id) != temporarily_owned_refs_.end()) { + temporarily_owned_refs_.erase(object_id); + } + refs_written_to_stream_.insert(object_id); + auto it = item_index_to_refs_.find(item_index); if (it != item_index_to_refs_.end()) { // It means the when a task is retried it returns a different object id @@ -413,7 +444,7 @@ void TaskManager::DelObjectRefStream(const ObjectID &generator_id) { for (const auto &object_id : object_ids_unconsumed) { std::vector deleted; reference_counter_->RemoveLocalReference(object_id, &deleted); - RAY_CHECK_EQ(deleted.size(), 1UL); + RAY_CHECK_GE(deleted.size(), 1UL); } } @@ -511,6 +542,29 @@ bool TaskManager::HandleReportGeneratorItemReturns( return num_objects_written != 0; } +bool TaskManager::TemporarilyOwnGeneratorReturnRefIfNeeded(const ObjectID &object_id, + const ObjectID &generator_id) { + bool inserted_to_stream = false; + { + absl::MutexLock lock(&mu_); + auto stream_it = object_ref_streams_.find(generator_id); + if (stream_it == object_ref_streams_.end()) { + return false; + } + + auto &stream = stream_it->second; + inserted_to_stream = stream.TemporarilyInsertToStreamIfNeeded(object_id); + } + + // We shouldn't hold a lock when calling refernece counter API. + if (inserted_to_stream) { + reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id); + return true; + } + + return false; +} + void TaskManager::CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, const rpc::Address &worker_addr, diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 5db92ca75ff1e..5b2ec2d7055b7 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -112,10 +112,27 @@ class ObjectRefStream { /// /// \param[in] object_id The object id that will be read at index item_index. /// \param[in] item_index The index where the object id will be written. - /// \return True if the idx hasn't been used. False otherwise. + /// If -1 is given, it means an index is not known yet. In this case, + /// the ref will be temporarily written until it is written with an index. + /// \return True if the ref is written to a stream. False otherwise. bool InsertToStream(const ObjectID &object_id, int64_t item_index); - /// Mark the stream canont be used anymore. + /// Sometimes, index of the object ID is not known. + /// + /// In this case, we should temporarily write the object ref to the + /// stream until it is written with an index. + /// + /// In the following scenario, the API will be no-op because + /// it means the object ID was already written with an index. + /// - If the object ID is already consumed. + /// - If the object ID is already written with an index. + /// + /// \param[in] object_id The temporarily written object id. + /// \return True if object ID is temporarily written. False otherwise. + bool TemporarilyInsertToStreamIfNeeded(const ObjectID &object_id); + + /// Mark that after a given item_index, the stream cannot be written + /// anymore. /// /// \param[in] The last item index that means the end of stream. void MarkEndOfStream(int64_t item_index); @@ -130,6 +147,11 @@ class ObjectRefStream { /// The item_index -> object reference ids. absl::flat_hash_map item_index_to_refs_; + /// Refs that are temporarily owned. It means a ref is + /// written to a stream, but index is not known yet. + absl::flat_hash_set temporarily_owned_refs_; + // A set of refs that's already written to a stream. + absl::flat_hash_set refs_written_to_stream_; /// The last index of the stream. /// item_index < last will contain object references. /// If -1, that means the stream hasn't reached to EoF. @@ -219,12 +241,43 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa const rpc::Address &worker_addr, bool is_application_error) override; - /// Handle the task return reported before the task terminates. + /// Handle the generator task return so that it will be accessible + /// via TryReadObjectRefStream. + /// + /// Generator tasks can report task returns before task is finished. + /// It is the opposite of regular tasks which can only batch + /// report the task returns after the task finishes. /// /// \return True if a task return is registered. False otherwise. bool HandleReportGeneratorItemReturns( const rpc::ReportGeneratorItemReturnsRequest &request); + /// Temporarily register a given generator return reference. + /// + /// For a generator return, the references are not known until + /// it is reported from an executor (via HandleReportGeneratorItemReturns). + /// However, there are times when generator return references need to be + /// owned before the return values are reported. + /// + /// For example, when an object is created or spilled from the object store, + /// pinning or OBOD update requests could be sent from raylets, + /// and it is possible those requests come before generator returns + /// are reported. In this case, we should own a reference temporarily, + /// otherwise, these requests will be ignored. + /// + /// In the following scenario, references don't need to be owned. In this case, + /// the API will be no-op. + /// - The stream has been already deleted. + /// - The reference is already read/consumed from a stream. + /// In this case, we already owned or GC'ed the refernece. + /// - The reference is already owned via HandleReportGeneratorItemReturns. + /// + /// \param object_id The object ID to temporarily owns. + /// \param generator_id The return ref ID of a generator task. + /// \return True if we temporarily owned the reference. False otherwise. + bool TemporarilyOwnGeneratorReturnRefIfNeeded(const ObjectID &object_id, + const ObjectID &generator_id); + /// Delete the object ref stream. /// /// Once the stream is deleted, it will clean up all unconsumed diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index e01d8f8f8d31b..36f8a79eed366 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -1732,6 +1732,100 @@ TEST_F(TaskManagerTest, TestObjectRefStreamDelOutOfOrder) { ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); } +TEST_F(TaskManagerTest, TestObjectRefStreamTemporarilyOwnGeneratorReturnRefIfNeeded) { + /** + * Test TemporarilyOwnGeneratorReturnRefIfNeeded + */ + // Submit a generator task. + rpc::Address caller_address; + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + manager_.AddPendingTask(caller_address, spec, "", /*num_retries=*/0); + manager_.MarkDependenciesResolved(spec.TaskId()); + manager_.MarkTaskWaitingForExecution( + spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom()); + + /** + * Test TemporarilyOwnGeneratorReturnRefIfNeeded is no-op when the stream is + * not created yet. + */ + auto dynamic_return_id_index_0 = ObjectID::FromIndex(spec.TaskId(), 2); + manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_0, + generator_id); + // It is no-op if the object ref stream is not created. + ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_0)); + + /** + * Test TemporarilyOwnGeneratorReturnRefIfNeeded called before any + * HandleReportGeneratorItemReturns adds a refernece. + */ + // CREATE + manager_.CreateObjectRefStream(generator_id); + manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_0, + generator_id); + // We has a reference to this object before the ref is + // reported via HandleReportGeneratorItemReturns. + ASSERT_TRUE(reference_counter_->HasReference(dynamic_return_id_index_0)); + + /** + * Test TemporarilyOwnGeneratorReturnRefIfNeeded called after the + * ref consumed / removed will be no-op. + */ + // WRITE 0 -> WRITE 1 + auto data = GenerateRandomBuffer(); + auto req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id_index_0, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + auto dynamic_return_id_index_1 = ObjectID::FromIndex(spec.TaskId(), 3); + data = GenerateRandomBuffer(); + req = GetIntermediateTaskReturn( + /*idx*/ 1, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id_index_1, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + + // READ 0 -> READ 1 + for (auto i = 0; i < 2; i++) { + ObjectID object_id; + auto status = manager_.TryReadObjectRefStream(generator_id, &object_id); + ASSERT_TRUE(status.ok()); + } + + std::vector removed; + reference_counter_->RemoveLocalReference(dynamic_return_id_index_1, &removed); + ASSERT_EQ(removed.size(), 1UL); + ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_1)); + // If the ref has been already consumed and deleted, + // this shouldn't add a reference. + manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_1, + generator_id); + ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_1)); + + /** + * Test TemporarilyOwnGeneratorReturnRefIfNeeded called but + * HandleReportGeneratorItemReturns is never called. In this case, when + * the stream is deleted these refs should be cleaned up. + */ + manager_.DelObjectRefStream(generator_id); + manager_.CreateObjectRefStream(generator_id); + manager_.TemporarilyOwnGeneratorReturnRefIfNeeded(dynamic_return_id_index_0, + generator_id); + ASSERT_TRUE(reference_counter_->HasReference(dynamic_return_id_index_0)); + manager_.DelObjectRefStream(generator_id); + ASSERT_FALSE(reference_counter_->HasReference(dynamic_return_id_index_0)); + + rpc::PushTaskReply reply; + manager_.CompletePendingTask(spec.TaskId(), reply, caller_address, false); +} + } // namespace core } // namespace ray