diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 9be8234a00843..6f6b9612a2674 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -599,8 +599,7 @@ cdef store_task_errors( CTaskType task_type, proctitle, c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns, - c_string* application_error, - ): + c_string* application_error): cdef: CoreWorker core_worker = worker.core_worker @@ -655,6 +654,7 @@ cdef store_task_errors( raise RayActorError.from_task_error(failure_object) return num_errors_stored + cdef execute_dynamic_generator_and_store_task_outputs( generator, const CObjectID &generator_id, @@ -995,7 +995,6 @@ cdef void execute_task( # Store the outputs in the object store. with core_worker.profile_event(b"task:store_outputs"): - num_returns = returns[0].size() if dynamic_returns != NULL: if not inspect.isgenerator(outputs): raise ValueError( diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 42c17b8572ca4..2c71e1a5d809c 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -146,8 +146,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_bool PinExistingReturnObject( const CObjectID& return_id, shared_ptr[CRayObject] *return_object, - const CObjectID& generator_id - ) + const CObjectID& generator_id) CObjectID AllocateDynamicReturnId() CJobID GetCurrentJobId() @@ -237,6 +236,12 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: int64_t timeout_ms, c_vector[shared_ptr[CObjectLocation]] *results) CRayStatus TriggerGlobalGC() + CRayStatus ReportGeneratorItemReturns( + const pair[CObjectID, shared_ptr[CRayObject]] &dynamic_return_object, + const CObjectID &generator_id, + const CAddress &caller_address, + int64_t item_index, + c_bool finished) c_string MemoryUsageString() CWorkerContext &GetWorkerContext() diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 2d1162b6ce336..79853deff0988 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -306,6 +306,7 @@ def _remote(self, args=None, kwargs=None, **task_options): num_returns = task_options["num_returns"] if num_returns == "dynamic": num_returns = -1 + max_retries = task_options["max_retries"] retry_exceptions = task_options["retry_exceptions"] if isinstance(retry_exceptions, (list, tuple)): diff --git a/python/ray/tests/test_generators.py b/python/ray/tests/test_generators.py index 64cd59d6002a6..9284c6a3f8c3d 100644 --- a/python/ray/tests/test_generators.py +++ b/python/ray/tests/test_generators.py @@ -264,7 +264,7 @@ def read(gen): gen = ray.get( remote_generator_fn.options(num_returns="dynamic").remote(0, store_in_plasma) ) - assert len(gen) == 0 + assert len(list(gen)) == 0 # Check that passing as task arg. gen = remote_generator_fn.options(num_returns="dynamic").remote(10, store_in_plasma) @@ -284,7 +284,9 @@ def static(num_returns): return list(range(num_returns)) with pytest.raises(ray.exceptions.RayTaskError): - ray.get(static.remote(3)) + gen = ray.get(static.remote(3)) + for ref in gen: + ray.get(ref) def test_dynamic_generator_distributed(ray_start_cluster): @@ -535,7 +537,7 @@ def maybe_empty_generator(exec_counter): @ray.remote def check(empty_generator): - return len(empty_generator) == 0 + return len(list(empty_generator)) == 0 exec_counter = ExecutionCounter.remote() gen = maybe_empty_generator.remote(exec_counter) diff --git a/src/ray/common/status.h b/src/ray/common/status.h index bda9860ddc4a5..25d9befdfd089 100644 --- a/src/ray/common/status.h +++ b/src/ray/common/status.h @@ -114,7 +114,8 @@ enum class StatusCode : char { OutOfDisk = 28, ObjectUnknownOwner = 29, RpcError = 30, - OutOfResource = 31 + OutOfResource = 31, + ObjectRefStreamEoF = 32 }; #if defined(__clang__) @@ -146,6 +147,10 @@ class RAY_EXPORT Status { return Status(StatusCode::KeyError, msg); } + static Status ObjectRefStreamEoF(const std::string &msg) { + return Status(StatusCode::ObjectRefStreamEoF, msg); + } + static Status TypeError(const std::string &msg) { return Status(StatusCode::TypeError, msg); } @@ -254,6 +259,7 @@ class RAY_EXPORT Status { bool IsOutOfMemory() const { return code() == StatusCode::OutOfMemory; } bool IsOutOfDisk() const { return code() == StatusCode::OutOfDisk; } bool IsKeyError() const { return code() == StatusCode::KeyError; } + bool IsObjectRefStreamEoF() const { return code() == StatusCode::ObjectRefStreamEoF; } bool IsInvalid() const { return code() == StatusCode::Invalid; } bool IsIOError() const { return code() == StatusCode::IOError; } bool IsTypeError() const { return code() == StatusCode::TypeError; } diff --git a/src/ray/core_worker/common.cc b/src/ray/core_worker/common.cc index e0849c29ec1f2..0f640e154bc37 100644 --- a/src/ray/core_worker/common.cc +++ b/src/ray/core_worker/common.cc @@ -49,5 +49,35 @@ std::string GenerateCachedActorName(const std::string &ns, return ns + "-" + actor_name; } +void SerializeReturnObject(const ObjectID &object_id, + const std::shared_ptr &return_object, + rpc::ReturnObject *return_object_proto) { + return_object_proto->set_object_id(object_id.Binary()); + + if (!return_object) { + // This should only happen if the local raylet died. Caller should + // retry the task. + RAY_LOG(WARNING) << "Failed to create task return object " << object_id + << " in the object store, exiting."; + QuickExit(); + } + return_object_proto->set_size(return_object->GetSize()); + if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) { + return_object_proto->set_in_plasma(true); + } else { + if (return_object->GetData() != nullptr) { + return_object_proto->set_data(return_object->GetData()->Data(), + return_object->GetData()->Size()); + } + if (return_object->GetMetadata() != nullptr) { + return_object_proto->set_metadata(return_object->GetMetadata()->Data(), + return_object->GetMetadata()->Size()); + } + } + for (const auto &nested_ref : return_object->GetNestedRefs()) { + return_object_proto->add_nested_inlined_refs()->CopyFrom(nested_ref); + } +} + } // namespace core } // namespace ray diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 9ca7daa2a9504..86d7499b0f4bb 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -21,6 +21,7 @@ #include "ray/common/task/task_spec.h" #include "ray/raylet_client/raylet_client.h" #include "ray/util/util.h" +#include "src/ray/protobuf/common.pb.h" namespace ray { namespace core { @@ -37,6 +38,10 @@ std::string LanguageString(Language language); // `namespace-[job_id-]actor_name` std::string GenerateCachedActorName(const std::string &ns, const std::string &actor_name); +void SerializeReturnObject(const ObjectID &object_id, + const std::shared_ptr &return_object, + rpc::ReturnObject *return_object_proto); + /// Information about a remote function. class RayFunction { public: diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 1d0f313c527e0..f73bc01a470d4 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2613,6 +2613,9 @@ Status CoreWorker::ExecuteTask( dynamic_return_objects = NULL; } else if (task_spec.AttemptNumber() > 0) { for (const auto &dynamic_return_id : task_spec.DynamicReturnIds()) { + // Increase the put index so that when the generator creates a new obj + // the object id won't conflict. + worker_context_.GetNextPutIndex(); dynamic_return_objects->push_back( std::make_pair<>(dynamic_return_id, std::shared_ptr())); RAY_LOG(DEBUG) << "Re-executed task " << task_spec.TaskId() @@ -2829,6 +2832,59 @@ ObjectID CoreWorker::AllocateDynamicReturnId() { return return_id; } +Status CoreWorker::ReportGeneratorItemReturns( + const std::pair> &dynamic_return_object, + const ObjectID &generator_id, + const rpc::Address &caller_address, + int64_t item_index, + bool finished) { + RAY_LOG(DEBUG) << "Write the object ref stream, index: " << item_index + << " finished: " << finished << ", id: " << dynamic_return_object.first; + rpc::ReportGeneratorItemReturnsRequest request; + request.mutable_worker_addr()->CopyFrom(rpc_address_); + request.set_item_index(item_index); + request.set_finished(finished); + request.set_generator_id(generator_id.Binary()); + auto client = core_worker_client_pool_->GetOrConnect(caller_address); + + if (!dynamic_return_object.first.IsNil()) { + RAY_CHECK_EQ(finished, false); + auto return_object_proto = request.add_dynamic_return_objects(); + SerializeReturnObject( + dynamic_return_object.first, dynamic_return_object.second, return_object_proto); + std::vector deleted; + // When we allocate a dynamic return ID (AllocateDynamicReturnId), + // we borrow the object. When the object value is allocatd, the + // memory store is updated. We should clear borrowers and memory store + // here. + ReferenceCounter::ReferenceTableProto borrowed_refs; + reference_counter_->PopAndClearLocalBorrowers( + {dynamic_return_object.first}, &borrowed_refs, &deleted); + memory_store_->Delete(deleted); + } else { + // fininshed must be set when dynamic_return_object is nil. + RAY_CHECK_EQ(finished, true); + } + + client->ReportGeneratorItemReturns( + request, + [](const Status &status, const rpc::ReportGeneratorItemReturnsReply &reply) { + if (!status.ok()) { + // TODO(sang): Handle network error more gracefully. + RAY_LOG(ERROR) << "Failed to send the object ref."; + } + }); + return Status::OK(); +} + +void CoreWorker::HandleReportGeneratorItemReturns( + rpc::ReportGeneratorItemReturnsRequest request, + rpc::ReportGeneratorItemReturnsReply *reply, + rpc::SendReplyCallback send_reply_callback) { + task_manager_->HandleReportGeneratorItemReturns(request); + send_reply_callback(Status::OK(), nullptr, nullptr); +} + std::vector CoreWorker::ExecuteTaskLocalMode( const TaskSpecification &task_spec, const ActorID &actor_id) { auto resource_ids = std::make_shared(); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 3ca65a09594ed..5c68373d04e03 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -409,6 +409,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::vector deleted; reference_counter_->RemoveLocalReference(object_id, &deleted); // TOOD(ilr): better way of keeping an object from being deleted + // TODO(sang): This seems bad... We should delete the memory store + // properly from reference counter. if (!options_.is_local_mode) { memory_store_->Delete(deleted); } @@ -704,6 +706,48 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Trigger garbage collection on each worker in the cluster. void TriggerGlobalGC(); + /// Report the task caller at caller_address that the intermediate + /// task return. It means if this API is used, the caller will be notified + /// the task return before the current task is terminated. The caller must + /// implement HandleReportGeneratorItemReturns API endpoint + /// to handle the intermediate result report. + /// This API makes sense only for a generator task + /// (task that can return multiple intermediate + /// result before the task terminates). + /// + /// NOTE: The API doesn't guarantee the ordering of the report. The + /// caller is supposed to reorder the report based on the item_index. + /// + /// \param[in] dynamic_return_object A intermediate ray object to report + /// to the caller before the task terminates. This object must have been + /// created dynamically from this worker via AllocateReturnObject. + /// If the Object ID is nil, it means it is the end of the task return. + /// In this case, the caller is responsible for setting finished = true, + /// otherwise it will panic. + /// \param[in] generator_id The return object ref ID from a current generator + /// task. + /// \param[in] caller_address The address of the caller of the current task + /// that created a generator_id. + /// \param[in] item_index The index of the task return. It is used to reorder the + /// report from the caller side. + /// \param[in] finished True indicates there's going to be no more intermediate + /// task return. When finished is provided dynamic_return_object's key must be + /// pair + Status ReportGeneratorItemReturns( + const std::pair> &dynamic_return_object, + const ObjectID &generator_id, + const rpc::Address &caller_address, + int64_t item_index, + bool finished); + + /// Implements gRPC server handler. + /// If an executor can generator task return before the task is finished, + /// it invokes this endpoint via ReportGeneratorItemReturns RPC. + void HandleReportGeneratorItemReturns( + rpc::ReportGeneratorItemReturnsRequest request, + rpc::ReportGeneratorItemReturnsReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + /// Get a string describing object store memory usage for debugging purposes. /// /// \return std::string The string describing memory usage. diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc index ba5321828207a..cadb8b40e5889 100644 --- a/src/ray/core_worker/reference_count.cc +++ b/src/ray/core_worker/reference_count.cc @@ -239,6 +239,40 @@ void ReferenceCounter::AddDynamicReturn(const ObjectID &object_id, AddNestedObjectIdsInternal(generator_id, {object_id}, owner_address); } +void ReferenceCounter::OwnDynamicStreamingTaskReturnRef(const ObjectID &object_id, + const ObjectID &generator_id) { + absl::MutexLock lock(&mutex_); + // NOTE: The upper layer (the layer that manges the object ref stream) + // should make sure the generator ref is not GC'ed until the + // stream is deleted. + auto outer_it = object_id_refs_.find(generator_id); + if (outer_it == object_id_refs_.end()) { + // Generator object already went out of scope. + // It means the generator is already GC'ed. No need to + // update the reference. + RAY_LOG(DEBUG) + << "Ignore OwnDynamicStreamingTaskReturnRef. The dynamic return reference " + << object_id << " is registered after the generator id " << generator_id + << " went out of scope."; + return; + } + RAY_LOG(DEBUG) << "Adding dynamic return " << object_id + << " contained in generator object " << generator_id; + RAY_CHECK(outer_it->second.owned_by_us); + RAY_CHECK(outer_it->second.owner_address.has_value()); + rpc::Address owner_address(outer_it->second.owner_address.value()); + // We add a local reference here. The ref removal will be handled + // by the ObjectRefStream. + RAY_UNUSED(AddOwnedObjectInternal(object_id, + {}, + owner_address, + outer_it->second.call_site, + /*object_size=*/-1, + outer_it->second.is_reconstructable, + /*add_local_ref=*/true, + absl::optional())); +} + bool ReferenceCounter::AddOwnedObjectInternal( const ObjectID &object_id, const std::vector &inner_ids, @@ -382,7 +416,7 @@ void ReferenceCounter::UpdateSubmittedTaskReferences( std::vector *deleted) { absl::MutexLock lock(&mutex_); for (const auto &return_id : return_ids) { - UpdateObjectPendingCreation(return_id, true); + UpdateObjectPendingCreationInternal(return_id, true); } for (const ObjectID &argument_id : argument_ids_to_add) { RAY_LOG(DEBUG) << "Increment ref count for submitted task argument " << argument_id; @@ -411,7 +445,7 @@ void ReferenceCounter::UpdateResubmittedTaskReferences( const std::vector return_ids, const std::vector &argument_ids) { absl::MutexLock lock(&mutex_); for (const auto &return_id : return_ids) { - UpdateObjectPendingCreation(return_id, true); + UpdateObjectPendingCreationInternal(return_id, true); } for (const ObjectID &argument_id : argument_ids) { auto it = object_id_refs_.find(argument_id); @@ -433,7 +467,7 @@ void ReferenceCounter::UpdateFinishedTaskReferences( std::vector *deleted) { absl::MutexLock lock(&mutex_); for (const auto &return_id : return_ids) { - UpdateObjectPendingCreation(return_id, false); + UpdateObjectPendingCreationInternal(return_id, false); } // Must merge the borrower refs before decrementing any ref counts. This is // to make sure that for serialized IDs, we increment the borrower count for @@ -1278,8 +1312,8 @@ void ReferenceCounter::RemoveObjectLocationInternal(ReferenceTable::iterator it, PushToLocationSubscribers(it); } -void ReferenceCounter::UpdateObjectPendingCreation(const ObjectID &object_id, - bool pending_creation) { +void ReferenceCounter::UpdateObjectPendingCreationInternal(const ObjectID &object_id, + bool pending_creation) { auto it = object_id_refs_.find(object_id); bool push = false; if (it != object_id_refs_.end()) { @@ -1439,6 +1473,11 @@ bool ReferenceCounter::IsObjectReconstructable(const ObjectID &object_id, return it->second.is_reconstructable; } +void ReferenceCounter::UpdateObjectReady(const ObjectID &object_id) { + absl::MutexLock lock(&mutex_); + UpdateObjectPendingCreationInternal(object_id, /*pending_creation*/ false); +} + bool ReferenceCounter::IsObjectPendingCreation(const ObjectID &object_id) const { absl::MutexLock lock(&mutex_); auto it = object_id_refs_.find(object_id); diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h index c16ee03921190..894b426a9d97b 100644 --- a/src/ray/core_worker/reference_count.h +++ b/src/ray/core_worker/reference_count.h @@ -201,6 +201,28 @@ class ReferenceCounter : public ReferenceCounterInterface, void AddDynamicReturn(const ObjectID &object_id, const ObjectID &generator_id) LOCKS_EXCLUDED(mutex_); + /// Own an object that the current owner (current process) dynamically created. + /// + /// The API is idempotent. + /// + /// TODO(sang): This API should be merged with AddDynamicReturn when + /// we turn on streaming generator by default. + /// + /// For normal task return, the owner creates and owns the references before + /// the object values are created. However, when you dynamically create objects, + /// the owner doesn't know (i.e., own) the references until it is reported from + /// the executor side. + /// + /// This API is used to own this type of dynamically generated references. + /// The executor should ensure the objects are not GC'ed until the owner + /// registers the dynamically created references by this API. + /// + /// \param[in] object_id The ID of the object that we now own. + /// \param[in] generator_id The Object ID of the streaming generator task. + void OwnDynamicStreamingTaskReturnRef(const ObjectID &object_id, + const ObjectID &generator_id) + LOCKS_EXCLUDED(mutex_); + /// Update the size of the object. /// /// \param[in] object_id The ID of the object. @@ -510,6 +532,9 @@ class ReferenceCounter : public ReferenceCounterInterface, /// \param[in] min_bytes_to_evict The minimum number of bytes to evict. int64_t EvictLineage(int64_t min_bytes_to_evict); + /// Update that the object is ready to be fetched. + void UpdateObjectReady(const ObjectID &object_id); + /// Whether the object is pending creation (the task that creates it is /// scheduled/executing). bool IsObjectPendingCreation(const ObjectID &object_id) const; @@ -915,7 +940,8 @@ class ReferenceCounter : public ReferenceCounterInterface, void RemoveObjectLocationInternal(ReferenceTable::iterator it, const NodeID &node_id) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - void UpdateObjectPendingCreation(const ObjectID &object_id, bool pending_creation) + void UpdateObjectPendingCreationInternal(const ObjectID &object_id, + bool pending_creation) EXCLUSIVE_LOCKS_REQUIRED(mutex_); /// Publish object locations to all subscribers. diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc index f5de3de65cc3c..33f14580d5e09 100644 --- a/src/ray/core_worker/task_manager.cc +++ b/src/ray/core_worker/task_manager.cc @@ -30,6 +30,73 @@ const int64_t kTaskFailureThrottlingThreshold = 50; // Throttle task failure logs to once this interval. const int64_t kTaskFailureLoggingFrequencyMillis = 5000; +Status ObjectRefStream::TryReadNextItem(ObjectID *object_id_out) { + bool is_eof_set = end_of_stream_index_ != -1; + if (is_eof_set && next_index_ >= end_of_stream_index_) { + // next_index_ cannot be bigger than end_of_stream_index_. + RAY_CHECK(next_index_ == end_of_stream_index_); + RAY_LOG(DEBUG) << "ObjectRefStream of an id " << generator_id_ + << " has no more objects."; + *object_id_out = ObjectID::Nil(); + return Status::ObjectRefStreamEoF(""); + } + + auto it = item_index_to_refs_.find(next_index_); + if (it != item_index_to_refs_.end()) { + // If the current index has been written, + // return the object ref. + // The returned object ref will always have a ref count of 1. + // The caller of this API is supposed to remove the reference + // when the obtained object id goes out of scope. + *object_id_out = it->second; + next_index_ += 1; + RAY_LOG_EVERY_MS(DEBUG, 10000) << "Get the next object id " << *object_id_out + << " generator id: " << generator_id_; + } else { + // If the current index hasn't been written, return nothing. + // The caller is supposed to retry. + RAY_LOG_EVERY_MS(DEBUG, 10000) + << "Object not available. Current index: " << next_index_ + << " end_of_stream_index_: " << end_of_stream_index_ + << " generator id: " << generator_id_; + *object_id_out = ObjectID::Nil(); + } + return Status::OK(); +} + +bool ObjectRefStream::InsertToStream(const ObjectID &object_id, int64_t item_index) { + if (end_of_stream_index_ != -1) { + RAY_CHECK(next_index_ <= end_of_stream_index_); + } + + if (item_index < next_index_) { + // Index is already used. Don't write it to the stream. + return false; + } + + auto it = item_index_to_refs_.find(item_index); + if (it != item_index_to_refs_.end()) { + // It means the when a task is retried it returns a different object id + // for the same index, which means the task was not deterministic. + // Fail the owner if it happens. + // It can happen if the second try is none determinstic and + // execute more ray.put, which modifies the put index that + // can return a different object ref. + RAY_CHECK_EQ(object_id, it->second) + << "The task has been retried with none deterministic task return ids. Previous " + "return id: " + << it->second << ". New task return id: " << object_id + << ". It means a undeterministic task has been retried. Disable the retry " + "feature using `max_retries=0` (task) or `max_task_retries=0` (actor)."; + } + item_index_to_refs_.emplace(item_index, object_id); + return true; +} + +void ObjectRefStream::MarkEndOfStream(int64_t item_index) { + end_of_stream_index_ = item_index; +} + std::vector TaskManager::AddPendingTask( const rpc::Address &caller_address, const TaskSpecification &spec, @@ -300,6 +367,152 @@ bool TaskManager::HandleTaskReturn(const ObjectID &object_id, return direct_return; } +void TaskManager::CreateObjectRefStream(const ObjectID &generator_id) { + RAY_LOG(DEBUG) << "Create an object ref stream of an id " << generator_id; + absl::MutexLock lock(&mu_); + auto it = object_ref_streams_.find(generator_id); + RAY_CHECK(it == object_ref_streams_.end()) + << "CreateObjectRefStream can be called only once. The caller of the API should " + "guarantee the API is not called twice."; + object_ref_streams_.emplace(generator_id, ObjectRefStream(generator_id)); +} + +void TaskManager::DelObjectRefStream(const ObjectID &generator_id) { + RAY_LOG(DEBUG) << "Deleting an object ref stream of an id " << generator_id; + std::vector object_ids_unconsumed; + + { + absl::MutexLock lock(&mu_); + auto it = object_ref_streams_.find(generator_id); + if (it == object_ref_streams_.end()) { + return; + } + + while (true) { + ObjectID object_id; + const auto &status = TryReadObjectRefStreamInternal(generator_id, &object_id); + + // keyError means the stream reaches to EoF. + if (status.IsObjectRefStreamEoF()) { + break; + } + + if (object_id == ObjectID::Nil()) { + // No more objects to obtain. Stop iteration. + break; + } else { + // It means the object hasn't been consumed. + // We should remove references since we have 1 reference to this object. + object_ids_unconsumed.push_back(object_id); + } + } + + object_ref_streams_.erase(generator_id); + } + + // When calling RemoveLocalReference, we shouldn't hold a lock. + for (const auto &object_id : object_ids_unconsumed) { + std::vector deleted; + reference_counter_->RemoveLocalReference(object_id, &deleted); + RAY_CHECK_EQ(deleted.size(), 1UL); + } +} + +Status TaskManager::TryReadObjectRefStreamInternal(const ObjectID &generator_id, + ObjectID *object_id_out) { + RAY_CHECK(object_id_out != nullptr); + auto stream_it = object_ref_streams_.find(generator_id); + RAY_CHECK(stream_it != object_ref_streams_.end()) + << "TryReadObjectRefStreamInternal API can be used only when the stream has been " + "created " + "and not removed."; + const auto &status = stream_it->second.TryReadNextItem(object_id_out); + return status; +} + +Status TaskManager::TryReadObjectRefStream(const ObjectID &generator_id, + ObjectID *object_id_out) { + absl::MutexLock lock(&mu_); + return TryReadObjectRefStreamInternal(generator_id, object_id_out); +} + +bool TaskManager::ObjectRefStreamExists(const ObjectID &generator_id) { + absl::MutexLock lock(&mu_); + auto it = object_ref_streams_.find(generator_id); + return it != object_ref_streams_.end(); +} + +bool TaskManager::HandleReportGeneratorItemReturns( + const rpc::ReportGeneratorItemReturnsRequest &request) { + const auto &generator_id = ObjectID::FromBinary(request.generator_id()); + const auto &task_id = generator_id.TaskId(); + int64_t item_index = request.item_index(); + // Every generated object has the same task id. + RAY_LOG(DEBUG) << "Received an intermediate result of index " << item_index + << " generator_id: " << generator_id; + { + absl::MutexLock lock(&mu_); + auto stream_it = object_ref_streams_.find(generator_id); + if (stream_it == object_ref_streams_.end()) { + // SANG-TODO add an unit test. + // Stream has been already deleted. Do not handle it. + return false; + } + + if (request.finished()) { + RAY_LOG(DEBUG) << "Write EoF to the object ref stream. Index: " << item_index; + stream_it->second.MarkEndOfStream(item_index); + RAY_CHECK(request.dynamic_return_objects_size() == 0); + return true; + } + } + + // Handle the intermediate values. + // NOTE: Until we support the retry, this is always empty return value. + const auto store_in_plasma_ids = GetTaskReturnObjectsToStoreInPlasma(task_id); + + // TODO(sang): Support the regular return values as well. + size_t num_objects_written = 0; + for (const auto &return_object : request.dynamic_return_objects()) { + const auto object_id = ObjectID::FromBinary(return_object.object_id()); + RAY_LOG(DEBUG) << "Write an object " << object_id + << " to the object ref stream of id " << generator_id; + bool index_not_used_yet = false; + { + absl::MutexLock lock(&mu_); + auto stream_it = object_ref_streams_.find(generator_id); + if (stream_it != object_ref_streams_.end()) { + index_not_used_yet = stream_it->second.InsertToStream(object_id, item_index); + } + // TODO(sang): Update the reconstruct ids and task spec + // when we support retry. + } + // If the ref was written to a stream, we should also + // own the dynamically generated task return. + // NOTE: If we call this method while holding a lock, it can deadlock. + if (index_not_used_yet) { + reference_counter_->OwnDynamicStreamingTaskReturnRef(object_id, generator_id); + num_objects_written += 1; + } + // When an object is reported, the object is ready to be fetched. + // TODO(sang): It is possible this invairant is not true + // if tasks can be retried. For example, imagine the intermediate + // task return is reported after a task is resubmitted. + // It is okay now because we don't support retry yet. But when + // we support retry, we should guarantee it is not called + // after the task resubmission. We can do it by guaranteeing + // HandleReportGeneratorItemReturns is not called after the task + // CompletePendingTask. + reference_counter_->UpdateObjectReady(object_id); + HandleTaskReturn(object_id, + return_object, + NodeID::FromBinary(request.worker_addr().raylet_id()), + /*store_in_plasma*/ store_in_plasma_ids.count(object_id)); + } + + return num_objects_written != 0; +} + void TaskManager::CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply, const rpc::Address &worker_addr, @@ -710,8 +923,12 @@ absl::flat_hash_set TaskManager::GetTaskReturnObjectsToStoreInPlasma( absl::flat_hash_set store_in_plasma_ids = {}; absl::MutexLock lock(&mu_); auto it = submissible_tasks_.find(task_id); - RAY_CHECK(it != submissible_tasks_.end()) - << "Tried to store return values for task that was not pending " << task_id; + if (it == submissible_tasks_.end()) { + // When a generator task is used, it is possible + // this API is used after the task has been removed + // from submissible_tasks_. Do nothing in this case. + return {}; + } first_execution = it->second.num_successful_executions == 0; if (!first_execution) { store_in_plasma_ids = it->second.reconstructable_return_ids; diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h index 0ab8621368d6d..31b039711f721 100644 --- a/src/ray/core_worker/task_manager.h +++ b/src/ray/core_worker/task_manager.h @@ -87,6 +87,53 @@ using PushErrorCallback = std::function; +/// When the streaming generator tasks are submitted, +/// the intermediate return objects are streamed +/// back to the task manager. +/// This class manages the references of intermediately +/// streamed object references. +/// The API is not thread-safe. +class ObjectRefStream { + public: + ObjectRefStream(const ObjectID &generator_id) : generator_id_(generator_id) {} + + /// Asynchronously read object reference of the next index. + /// + /// \param[out] object_id_out The next object ID from the stream. + /// Nil ID is returned if the next index hasn't been written. + /// \return KeyError if it reaches to EoF. Ok otherwise. + Status TryReadNextItem(ObjectID *object_id_out); + + /// Insert the object id to the stream of an index item_index. + /// + /// If the item_index has been already read (by TryReadNextItem), + /// the write request will be ignored. If the item_index has been + /// already written, it will be no-op. It doesn't override. + /// + /// \param[in] object_id The object id that will be read at index item_index. + /// \param[in] item_index The index where the object id will be written. + /// \return True if the idx hasn't been used. False otherwise. + bool InsertToStream(const ObjectID &object_id, int64_t item_index); + + /// Mark the stream canont be used anymore. + /// + /// \param[in] The last item index that means the end of stream. + void MarkEndOfStream(int64_t item_index); + + private: + const ObjectID generator_id_; + + /// The item_index -> object reference ids. + absl::flat_hash_map item_index_to_refs_; + /// The last index of the stream. + /// item_index < last will contain object references. + /// If -1, that means the stream hasn't reached to EoF. + int64_t end_of_stream_index_ = -1; + /// The next index of the stream. + /// If next_index_ == end_of_stream_index_, that means it is the end of the stream. + int64_t next_index_ = 0; +}; + class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterface { public: TaskManager(std::shared_ptr in_memory_store, @@ -167,6 +214,57 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa const rpc::Address &worker_addr, bool is_application_error) override; + /// Handle the task return reported before the task terminates. + /// + /// \return True if a task return is registered. False otherwise. + bool HandleReportGeneratorItemReturns( + const rpc::ReportGeneratorItemReturnsRequest &request); + + /// Delete the object ref stream. + /// + /// Once the stream is deleted, it will clean up all unconsumed + /// object references, and all the future intermediate report + /// will be ignored. + /// + /// This method is idempotent. It is because the language + /// frontend often calls this method upon destructor, but + /// not every langauge guarantees the destructor is called + /// only once. + /// + /// \param[in] generator_id The object ref id of the streaming + /// generator task. + void DelObjectRefStream(const ObjectID &generator_id); + + /// Create the object ref stream. + /// If the object ref stream is not created by this API, + /// all object ref stream operation will be no-op. + /// Once the stream is created, it has to be deleted + /// by DelObjectRefStream when it is not used anymore. + /// The API is not idempotent. + /// + /// \param[in] generator_id The object ref id of the streaming + /// generator task. + void CreateObjectRefStream(const ObjectID &generator_id); + + /// Return true if the object ref stream exists. + /// + /// \param[in] generator_id The object ref id of the streaming + /// generator task. + bool ObjectRefStreamExists(const ObjectID &generator_id); + + /// Asynchronously read object reference of the next index from the + /// object stream of a generator_id. + /// + /// The caller should ensure the ObjectRefStream is already created + /// via CreateObjectRefStream. + /// If it is called after the stream hasn't been created or deleted + /// it will panic. + /// + /// \param[out] object_id_out The next object ID from the stream. + /// Nil ID is returned if the next index hasn't been written. + /// \return KeyError if it reaches to EoF. Ok otherwise. + Status TryReadObjectRefStream(const ObjectID &generator_id, ObjectID *object_id_out); + /// Returns true if task can be retried. /// /// \param[in] task_id ID of the task to be retried. @@ -459,17 +557,19 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa const rpc::Address &worker_addr, const ReferenceCounter::ReferenceTableProto &borrowed_refs); - // Get the objects that were stored in plasma upon the first successful - // execution of this task. If the task is re-executed, these objects should - // get stored in plasma again, even if they are small and were returned - // directly in the worker's reply. This ensures that any reference holders - // that are already scheduled at the raylet can retrieve these objects - // through plasma. - // \param[in] task_id The task ID. - // \param[out] first_execution Whether the task has been successfully - // executed before. If this is false, then the objects to store in plasma - // will be empty. - // \param [out] Return objects that should be stored in plasma. + /// Get the objects that were stored in plasma upon the first successful + /// execution of this task. If the task is re-executed, these objects should + /// get stored in plasma again, even if they are small and were returned + /// directly in the worker's reply. This ensures that any reference holders + /// that are already scheduled at the raylet can retrieve these objects + /// through plasma. + /// + /// \param[in] task_id The task ID. + /// \param[out] first_execution Whether the task has been successfully + /// executed before. If this is false, then the objects to store in plasma + /// will be empty. + /// \param [out] Return objects that should be stored in plasma. If the + /// task has been already terminated, it returns an empty set. absl::flat_hash_set GetTaskReturnObjectsToStoreInPlasma( const TaskID &task_id, bool *first_execution = nullptr) const LOCKS_EXCLUDED(mu_); @@ -505,6 +605,10 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// \param task_entry Task entry for the corresponding task attempt void MarkTaskRetryOnFailed(TaskEntry &task_entry, const rpc::RayErrorInfo &error_info); + Status TryReadObjectRefStreamInternal(const ObjectID &generator_id, + ObjectID *object_id_out) + EXCLUSIVE_LOCKS_REQUIRED(mu_); + /// Used to store task results. std::shared_ptr in_memory_store_; @@ -513,6 +617,9 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa /// submitted tasks (dependencies and return objects). std::shared_ptr reference_counter_; + /// Mapping from a streaming generator task id -> object ref stream. + absl::flat_hash_map object_ref_streams_ GUARDED_BY(mu_); + /// Callback to store objects in plasma. This is used for objects that were /// originally stored in plasma. During reconstruction, we ensure that these /// objects get stored in plasma again so that any reference holders can diff --git a/src/ray/core_worker/test/reference_count_test.cc b/src/ray/core_worker/test/reference_count_test.cc index 51b5d51523ac2..ee26467516bc3 100644 --- a/src/ray/core_worker/test/reference_count_test.cc +++ b/src/ray/core_worker/test/reference_count_test.cc @@ -2946,6 +2946,57 @@ TEST_F(ReferenceCountTest, TestForwardNestedRefs) { borrower2->rc_.RemoveLocalReference(inner_id, nullptr); } +TEST_F(ReferenceCountTest, TestOwnDynamicStreamingTaskReturnRef) { + auto object_id = ObjectID::FromRandom(); + auto generator_id = ObjectID::FromRandom(); + auto generator_id_2 = ObjectID::FromRandom(); + rpc::Address added_address; + + // Verify OwnDynamicStreamingTaskReturnRef is ignored + // when there's no generator id. + rc->OwnDynamicStreamingTaskReturnRef(object_id, generator_id); + ASSERT_FALSE(rc->GetOwner(generator_id, &added_address)); + ASSERT_FALSE(rc->GetOwner(object_id, &added_address)); + ASSERT_FALSE(rc->HasReference(object_id)); + ASSERT_FALSE(rc->HasReference(generator_id)); + + // Add a generator id. + rpc::Address address; + address.set_ip_address("1234"); + rc->AddOwnedObject(generator_id, {}, address, "", 0, false, /*add_local_ref=*/true); + ASSERT_TRUE(rc->HasReference(generator_id)); + + // Verify object id is not registered if the incorrect generator id is given. + rc->OwnDynamicStreamingTaskReturnRef(object_id, generator_id_2); + ASSERT_FALSE(rc->HasReference(object_id)); + + // Verify object is owned. + rc->OwnDynamicStreamingTaskReturnRef(object_id, generator_id); + ASSERT_TRUE(rc->HasReference(object_id)); + // Verify the number of objects: Generator + object. + ASSERT_EQ(rc->NumObjectIDsInScope(), 2); + // Verify it is owned by us. + ASSERT_TRUE(rc->GetOwner(object_id, &added_address)); + ASSERT_EQ(address.ip_address(), added_address.ip_address()); + // Verify it had 1 local reference. + std::vector deleted; + rc->RemoveLocalReference(object_id, &deleted); + ASSERT_EQ(rc->NumObjectIDsInScope(), 1); + ASSERT_EQ(deleted.size(), 1); + ASSERT_FALSE(rc->GetOwner(object_id, &added_address)); + + // Remove the generator. + rc->RemoveLocalReference(generator_id, nullptr); + ASSERT_EQ(rc->NumObjectIDsInScope(), 0); + ASSERT_FALSE(rc->GetOwner(generator_id, &added_address)); + + // Verify we cannot register a new object after the generator id is removed. + auto object_id_2 = ObjectID::FromRandom(); + rc->OwnDynamicStreamingTaskReturnRef(object_id_2, generator_id); + ASSERT_FALSE(rc->GetOwner(object_id_2, &added_address)); + ASSERT_FALSE(rc->HasReference(object_id_2)); +} + } // namespace core } // namespace ray diff --git a/src/ray/core_worker/test/task_manager_test.cc b/src/ray/core_worker/test/task_manager_test.cc index 94d7749f521a5..d6daeebbdad8f 100644 --- a/src/ray/core_worker/test/task_manager_test.cc +++ b/src/ray/core_worker/test/task_manager_test.cc @@ -51,6 +51,37 @@ rpc::Address GetRandomWorkerAddr() { return addr; } +rpc::ReportGeneratorItemReturnsRequest GetIntermediateTaskReturn( + int64_t idx, + bool finished, + const ObjectID &generator_id, + const ObjectID &dynamic_return_id, + std::shared_ptr data, + bool set_in_plasma) { + rpc::ReportGeneratorItemReturnsRequest request; + rpc::Address addr; + request.mutable_worker_addr()->CopyFrom(addr); + request.set_item_index(idx); + request.set_finished(finished); + request.set_generator_id(generator_id.Binary()); + auto dynamic_return_object = request.add_dynamic_return_objects(); + dynamic_return_object->set_object_id(dynamic_return_id.Binary()); + dynamic_return_object->set_data(data->Data(), data->Size()); + dynamic_return_object->set_in_plasma(set_in_plasma); + return request; +} + +rpc::ReportGeneratorItemReturnsRequest GetEoFTaskReturn(int64_t idx, + const ObjectID &generator_id) { + rpc::ReportGeneratorItemReturnsRequest request; + rpc::Address addr; + request.mutable_worker_addr()->CopyFrom(addr); + request.set_item_index(idx); + request.set_finished(true); + request.set_generator_id(generator_id.Binary()); + return request; +} + class MockTaskEventBuffer : public worker::TaskEventBuffer { public: MOCK_METHOD(void, @@ -73,7 +104,8 @@ class TaskManagerTest : public ::testing::Test { public: TaskManagerTest(bool lineage_pinning_enabled = false, int64_t max_lineage_bytes = 1024 * 1024 * 1024) - : addr_(GetRandomWorkerAddr()), + : lineage_pinning_enabled_(lineage_pinning_enabled), + addr_(GetRandomWorkerAddr()), publisher_(std::make_shared()), subscriber_(std::make_shared()), task_event_buffer_mock_(std::make_unique()), @@ -113,6 +145,7 @@ class TaskManagerTest : public ::testing::Test { ASSERT_EQ(manager_.total_lineage_footprint_bytes_, 0); } + bool lineage_pinning_enabled_; rpc::Address addr_; std::shared_ptr publisher_; std::shared_ptr subscriber_; @@ -1145,6 +1178,501 @@ TEST_F(TaskManagerLineageTest, TestResubmittedDynamicReturnsTaskFails) { ASSERT_EQ(stored_in_plasma.size(), 3); } +TEST_F(TaskManagerTest, TestObjectRefStreamCreateDelete) { + /** + * Test create and deletion of stream works. + * CREATE EXISTS (true) DELETE EXISTS (false) + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + manager_.CreateObjectRefStream(generator_id); + ASSERT_TRUE(manager_.ObjectRefStreamExists(generator_id)); + manager_.DelObjectRefStream(generator_id); + ASSERT_FALSE(manager_.ObjectRefStreamExists(generator_id)); + // Test DelObjectRefStream is idempotent + manager_.DelObjectRefStream(generator_id); + manager_.DelObjectRefStream(generator_id); + manager_.DelObjectRefStream(generator_id); + manager_.DelObjectRefStream(generator_id); + ASSERT_FALSE(manager_.ObjectRefStreamExists(generator_id)); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamDeletedStreamIgnored) { + /** + * Test that when DELETE is called, all subsequent Writes are ignored. + * CREATE DELETE WRITE READ + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + manager_.CreateObjectRefStream(generator_id); + manager_.DelObjectRefStream(generator_id); + ASSERT_FALSE(manager_.ObjectRefStreamExists(generator_id)); + + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2); + auto data = GenerateRandomBuffer(); + + // WRITE + auto req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(req)); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamBasic) { + /** + * Test the basic cases (write -> read). + * CREATE WRITE, WRITE, WRITEEoF, READ, READ, KeyERROR DELETE + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + // CREATE + manager_.CreateObjectRefStream(generator_id); + + auto last_idx = 2; + std::vector dynamic_return_ids; + std::vector> datas; + for (auto i = 0; i < last_idx; i++) { + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), i + 2); + dynamic_return_ids.push_back(dynamic_return_id); + auto data = GenerateRandomBuffer(); + datas.push_back(data); + + auto req = GetIntermediateTaskReturn( + /*idx*/ i, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + // WRITE * 2 + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + } + // WRITEEoF + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns( + GetEoFTaskReturn(last_idx, generator_id))); + + ObjectID obj_id; + for (auto i = 0; i < last_idx; i++) { + // READ * 2 + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_ids[i]); + } + // READ (EoF) + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.IsObjectRefStreamEoF()); + ASSERT_EQ(obj_id, ObjectID::Nil()); + // DELETE + manager_.DelObjectRefStream(generator_id); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamMixture) { + /** + * Test the basic cases, but write and read are mixed up. + * CREATE WRITE READ WRITE READ WRITEEoF KeyError DELETE + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + // CREATE + manager_.CreateObjectRefStream(generator_id); + + auto last_idx = 2; + std::vector dynamic_return_ids; + std::vector> datas; + for (auto i = 0; i < last_idx; i++) { + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), i + 2); + dynamic_return_ids.push_back(dynamic_return_id); + auto data = GenerateRandomBuffer(); + datas.push_back(data); + + auto req = GetIntermediateTaskReturn( + /*idx*/ i, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + // WRITE + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + // READ + ObjectID obj_id; + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_ids[i]); + } + // WRITEEoF + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns( + GetEoFTaskReturn(last_idx, generator_id))); + + ObjectID obj_id; + // READ (EoF) + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.IsObjectRefStreamEoF()); + ASSERT_EQ(obj_id, ObjectID::Nil()); + // DELETE + manager_.DelObjectRefStream(generator_id); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamEoF) { + /** + * Test that after writing EoF, write/read doesn't work. + * CREATE WRITE WRITEEoF, WRITE(verify no op) DELETE + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + // CREATE + manager_.CreateObjectRefStream(generator_id); + + // WRITE + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2); + auto data = GenerateRandomBuffer(); + auto req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + // WRITEEoF + ASSERT_TRUE( + manager_.HandleReportGeneratorItemReturns(GetEoFTaskReturn(1, generator_id))); + // READ (works) + ObjectID obj_id; + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_id); + + // WRITE + dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 3); + data = GenerateRandomBuffer(); + req = GetIntermediateTaskReturn( + /*idx*/ 2, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + // READ (doesn't works because EoF is already written) + status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.IsObjectRefStreamEoF()); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamIndexDiscarded) { + /** + * Test that when the ObjectRefStream is already written + * the WRITE will be ignored. + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + // CREATE + manager_.CreateObjectRefStream(generator_id); + + // WRITE + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2); + auto data = GenerateRandomBuffer(); + auto req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + // READ + ObjectID obj_id; + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_id); + + // WRITE to the first index again. + dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 3); + data = GenerateRandomBuffer(); + req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(req)); + // READ (New write will be ignored). + status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, ObjectID::Nil()); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamReadIgnoredWhenNothingWritten) { + /** + * Test read will return Nil if nothing was written. + * CREATE READ (no op) WRITE READ (working) READ (no op) + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + // CREATE + manager_.CreateObjectRefStream(generator_id); + + // READ (no-op) + ObjectID obj_id; + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, ObjectID::Nil()); + + // WRITE + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2); + auto data = GenerateRandomBuffer(); + auto req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + // READ (works this time) + status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_id); + + // READ (nothing should return) + status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, ObjectID::Nil()); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamEndtoEnd) { + /** + * Test e2e + * (task submitted -> report intermediate task return -> task finished) + * This also tests if we can read / write stream before / after task finishes. + */ + // Submit a task. + rpc::Address caller_address; + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + manager_.AddPendingTask(caller_address, spec, "", /*num_retries=*/0); + // CREATE + manager_.CreateObjectRefStream(generator_id); + manager_.MarkDependenciesResolved(spec.TaskId()); + manager_.MarkTaskWaitingForExecution( + spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom()); + + // The results are reported before the task is finished. + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2); + auto data = GenerateRandomBuffer(); + auto req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + + // NumObjectIDsInScope == Generator + intermediate result. + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 2); + std::vector> results; + WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0)); + RAY_CHECK_OK(store_->Get({dynamic_return_id}, 1, 1, ctx, false, &results)); + ASSERT_EQ(results.size(), 1); + + // Make sure you can read. + ObjectID obj_id; + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_id); + + // Finish the task. + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(generator_id.Binary()); + data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + manager_.CompletePendingTask(spec.TaskId(), reply, caller_address, false); + + // Test you can write to the stream after task finishes. + // TODO(sang): Make sure this doesn't happen by ensuring the ordering + // from the executor side. + auto dynamic_return_id2 = ObjectID::FromIndex(spec.TaskId(), 3); + data = GenerateRandomBuffer(); + req = GetIntermediateTaskReturn( + /*idx*/ 1, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id2, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + // EoF + ASSERT_TRUE( + manager_.HandleReportGeneratorItemReturns(GetEoFTaskReturn(2, generator_id))); + + // NumObjectIDsInScope == Generator + 2 intermediate result. + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); + results.clear(); + RAY_CHECK_OK(store_->Get({dynamic_return_id2}, 1, 1, ctx, false, &results)); + ASSERT_EQ(results.size(), 1); + + // Make sure you can read. + status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_id2); + + // Nothing more to read. + status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.IsObjectRefStreamEoF()); + + manager_.DelObjectRefStream(generator_id); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamDelCleanReferences) { + /** + * Verify DEL cleans all references and ignore all future WRITE. + * + * CREATE WRITE WRITE DEL (make sure no refs are leaked) + */ + // Submit a task so that generator ID will be available + // to the reference counter. + rpc::Address caller_address; + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + manager_.AddPendingTask(caller_address, spec, "", /*num_retries=*/0); + manager_.MarkDependenciesResolved(spec.TaskId()); + manager_.MarkTaskWaitingForExecution( + spec.TaskId(), NodeID::FromRandom(), WorkerID::FromRandom()); + + // CREATE + manager_.CreateObjectRefStream(generator_id); + // WRITE + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), 2); + auto data = GenerateRandomBuffer(); + auto req = GetIntermediateTaskReturn( + /*idx*/ 0, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + // WRITE 2 + auto dynamic_return_id2 = ObjectID::FromIndex(spec.TaskId(), 3); + data = GenerateRandomBuffer(); + req = GetIntermediateTaskReturn( + /*idx*/ 1, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id2, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + + // NumObjectIDsInScope == Generator + 2 WRITE + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 3); + std::vector> results; + WorkerContext ctx(WorkerType::WORKER, WorkerID::FromRandom(), JobID::FromInt(0)); + RAY_CHECK_OK(store_->Get({dynamic_return_id}, 1, 1, ctx, false, &results)); + ASSERT_EQ(results.size(), 1); + results.clear(); + RAY_CHECK_OK(store_->Get({dynamic_return_id2}, 1, 1, ctx, false, &results)); + ASSERT_EQ(results.size(), 1); + results.clear(); + + // DELETE. This should clean all references except generator id. + manager_.DelObjectRefStream(generator_id); + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); + // Unfortunately, when the obj ref goes out of scope, + // this is called from the language frontend. We mimic this behavior + // by manually calling these APIs. + store_->Delete({dynamic_return_id}); + store_->Delete({dynamic_return_id2}); + ASSERT_TRUE(store_->Get({dynamic_return_id}, 1, 1, ctx, false, &results).IsTimedOut()); + results.clear(); + ASSERT_TRUE(store_->Get({dynamic_return_id2}, 1, 1, ctx, false, &results).IsTimedOut()); + results.clear(); + + // NOTE: We panic if READ is called after DELETE. The + // API caller should guarantee this doesn't happen. + // So we don't test it. + + // WRITE 3. Should be ignored. + auto dynamic_return_id3 = ObjectID::FromIndex(spec.TaskId(), 4); + data = GenerateRandomBuffer(); + req = GetIntermediateTaskReturn( + /*idx*/ 2, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id3, + /*data*/ data, + /*set_in_plasma*/ false); + ASSERT_FALSE(manager_.HandleReportGeneratorItemReturns(req)); + // The write should have been no op. No refs and no obj values except the generator id. + ASSERT_EQ(reference_counter_->NumObjectIDsInScope(), 1); + ASSERT_TRUE(store_->Get({dynamic_return_id3}, 1, 1, ctx, false, &results).IsTimedOut()); + results.clear(); + + // Finish the task. + // This is needed to pass AssertNoLeaks. + rpc::PushTaskReply reply; + auto return_object = reply.add_return_objects(); + return_object->set_object_id(generator_id.Binary()); + data = GenerateRandomBuffer(); + return_object->set_data(data->Data(), data->Size()); + manager_.CompletePendingTask(spec.TaskId(), reply, caller_address, false); +} + +TEST_F(TaskManagerTest, TestObjectRefStreamOutofOrder) { + /** + * Test the case where the task return RPC is received out of order + */ + auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true); + auto generator_id = spec.ReturnId(0); + // CREATE + manager_.CreateObjectRefStream(generator_id); + + auto last_idx = 2; + std::vector dynamic_return_ids; + // EoF reported first. + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns( + GetEoFTaskReturn(last_idx, generator_id))); + + // Write index 1 -> 0 + for (auto i = last_idx - 1; i > -1; i--) { + auto dynamic_return_id = ObjectID::FromIndex(spec.TaskId(), i + 2); + dynamic_return_ids.insert(dynamic_return_ids.begin(), dynamic_return_id); + auto data = GenerateRandomBuffer(); + + auto req = GetIntermediateTaskReturn( + /*idx*/ i, + /*finished*/ false, + generator_id, + /*dynamic_return_id*/ dynamic_return_id, + /*data*/ data, + /*set_in_plasma*/ false); + // WRITE * 2 + ASSERT_TRUE(manager_.HandleReportGeneratorItemReturns(req)); + } + + // Verify read works. + ObjectID obj_id; + for (auto i = 0; i < last_idx; i++) { + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj_id, dynamic_return_ids[i]); + } + + // READ (EoF) + auto status = manager_.TryReadObjectRefStream(generator_id, &obj_id); + ASSERT_TRUE(status.IsObjectRefStreamEoF()); + ASSERT_EQ(obj_id, ObjectID::Nil()); + // DELETE + manager_.DelObjectRefStream(generator_id); +} + } // namespace core } // namespace ray diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index c355d5f421086..57e7dbd1ca768 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -25,36 +25,6 @@ using namespace ray::gcs; namespace ray { namespace core { -void SerializeReturnObject(const ObjectID &object_id, - const std::shared_ptr &return_object, - rpc::ReturnObject *return_object_proto) { - return_object_proto->set_object_id(object_id.Binary()); - - if (!return_object) { - // This should only happen if the local raylet died. Caller should - // retry the task. - RAY_LOG(WARNING) << "Failed to create task return object " << object_id - << " in the object store, exiting."; - QuickExit(); - } - return_object_proto->set_size(return_object->GetSize()); - if (return_object->GetData() != nullptr && return_object->GetData()->IsPlasmaBuffer()) { - return_object_proto->set_in_plasma(true); - } else { - if (return_object->GetData() != nullptr) { - return_object_proto->set_data(return_object->GetData()->Data(), - return_object->GetData()->Size()); - } - if (return_object->GetMetadata() != nullptr) { - return_object_proto->set_metadata(return_object->GetMetadata()->Data(), - return_object->GetMetadata()->Size()); - } - } - for (const auto &nested_ref : return_object->GetNestedRefs()) { - return_object_proto->add_nested_inlined_refs()->CopyFrom(nested_ref); - } -} - void CoreWorkerDirectTaskReceiver::Init( std::shared_ptr client_pool, rpc::Address rpc_address, diff --git a/src/ray/core_worker/transport/direct_actor_transport.h b/src/ray/core_worker/transport/direct_actor_transport.h index a81899f4127e2..d77ec7fcb34ea 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.h +++ b/src/ray/core_worker/transport/direct_actor_transport.h @@ -30,6 +30,7 @@ #include "ray/common/ray_object.h" #include "ray/core_worker/actor_creator.h" #include "ray/core_worker/actor_handle.h" +#include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/fiber.h" #include "ray/core_worker/store_provider/memory_store/memory_store.h" diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 6ac9b14111358..b78e354768a4f 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -539,6 +539,22 @@ message TaskArg { repeated ObjectReference nested_inlined_refs = 4; } +message ReturnObject { + // Object ID. + bytes object_id = 1; + // If set, indicates the data is in plasma instead of inline. This + // means that data and metadata will be empty. + bool in_plasma = 2; + // Data of the object. + bytes data = 3; + // Metadata of the object. + bytes metadata = 4; + // ObjectIDs that were nested in data. This is only set for inlined objects. + repeated ObjectReference nested_inlined_refs = 5; + // Size of this object. + int64 size = 6; +} + // Task spec of an actor creation task. message ActorCreationTaskSpec { // ID of the actor that will be created by this task. diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index ab709d8cd9a32..23350fcb0f0f4 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -69,22 +69,6 @@ message ActorHandle { int32 max_pending_calls = 13; } -message ReturnObject { - // Object ID. - bytes object_id = 1; - // If set, indicates the data is in plasma instead of inline. This - // means that data and metadata will be empty. - bool in_plasma = 2; - // Data of the object. - bytes data = 3; - // Metadata of the object. - bytes metadata = 4; - // ObjectIDs that were nested in data. This is only set for inlined objects. - repeated ObjectReference nested_inlined_refs = 5; - // Size of this object. - int64 size = 6; -} - message PushTaskRequest { // The ID of the worker this message is intended for. bytes intended_worker_id = 1; @@ -398,6 +382,27 @@ message RayletNotifyGCSRestartRequest {} message RayletNotifyGCSRestartReply {} +message ReportGeneratorItemReturnsRequest { + // The intermediate return object that's dynamically + // generated from the executor side. + repeated ReturnObject dynamic_return_objects = 1; + // The address of the executor. + Address worker_addr = 2; + // The index of the task return. It is used to + // reorder the intermediate return object + // because the ordering of this request + // is not guaranteed. + int64 item_index = 3; + // If true, it means there's going to be no more + // task return after this request. + bool finished = 4; + // The object ref id of the executor task that + // generates intermediate results. + bytes generator_id = 5; +} + +message ReportGeneratorItemReturnsReply {} + service CoreWorkerService { // Notify core worker GCS has restarted. rpc RayletNotifyGCSRestart(RayletNotifyGCSRestartRequest) @@ -418,6 +423,9 @@ service CoreWorkerService { /// It is replied once there are batch of objects that need to be published to /// the caller (subscriber). rpc PubsubLongPolling(PubsubLongPollingRequest) returns (PubsubLongPollingReply); + // The RPC to report the intermediate task return to the caller. + rpc ReportGeneratorItemReturns(ReportGeneratorItemReturnsRequest) + returns (ReportGeneratorItemReturnsReply); /// The pubsub command batch request used by the subscriber. rpc PubsubCommandBatch(PubsubCommandBatchRequest) returns (PubsubCommandBatchReply); // Update the batched object location information to the ownership-based object diff --git a/src/ray/rpc/worker/core_worker_client.h b/src/ray/rpc/worker/core_worker_client.h index de9b68ba0fd50..b8341f7eb6b81 100644 --- a/src/ray/rpc/worker/core_worker_client.h +++ b/src/ray/rpc/worker/core_worker_client.h @@ -154,6 +154,10 @@ class CoreWorkerClientInterface : public pubsub::SubscriberClientInterface { const GetObjectLocationsOwnerRequest &request, const ClientCallback &callback) {} + virtual void ReportGeneratorItemReturns( + const ReportGeneratorItemReturnsRequest &request, + const ClientCallback &callback) {} + /// Tell this actor to exit immediately. virtual void KillActor(const KillActorRequest &request, const ClientCallback &callback) {} @@ -283,6 +287,12 @@ class CoreWorkerClient : public std::enable_shared_from_this, /*method_timeout_ms*/ -1, override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, + ReportGeneratorItemReturns, + grpc_client_, + /*method_timeout_ms*/ -1, + override) + VOID_RPC_CLIENT_METHOD(CoreWorkerService, GetCoreWorkerStats, grpc_client_, diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index b881778f03dea..9c548463a7861 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -43,6 +43,8 @@ namespace rpc { CoreWorkerService, UpdateObjectLocationBatch, -1) \ RPC_SERVICE_HANDLER_SERVER_METRICS_DISABLED( \ CoreWorkerService, GetObjectLocationsOwner, -1) \ + RPC_SERVICE_HANDLER_SERVER_METRICS_DISABLED( \ + CoreWorkerService, ReportGeneratorItemReturns, -1) \ RPC_SERVICE_HANDLER_SERVER_METRICS_DISABLED(CoreWorkerService, KillActor, -1) \ RPC_SERVICE_HANDLER_SERVER_METRICS_DISABLED(CoreWorkerService, CancelTask, -1) \ RPC_SERVICE_HANDLER_SERVER_METRICS_DISABLED(CoreWorkerService, RemoteCancelTask, -1) \ @@ -68,6 +70,7 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(PubsubCommandBatch) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(UpdateObjectLocationBatch) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(GetObjectLocationsOwner) \ + DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(ReportGeneratorItemReturns) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(KillActor) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(CancelTask) \ DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(RemoteCancelTask) \