From 0144e9812aeb10422e1bee089725a5c145a7ef1e Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Jul 2023 13:40:33 -0700 Subject: [PATCH 01/10] update --- src/ray/core_worker/core_worker.cc | 30 ++++++++++++++++++++++++++++++ src/ray/core_worker/core_worker.h | 15 +++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 0164df3235fff..d87ee89dd52d2 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -611,6 +611,13 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // Verify driver and worker are never mixed in the same process. RAY_CHECK_EQ(options_.worker_type != WorkerType::DRIVER, niced); #endif + + // Notify that core worker is initialized. + { + absl::MutexLock lock(&initialize_mutex_); + initialized_ = true; + intialize_cv_.notify_all(); + } } CoreWorker::~CoreWorker() { RAY_LOG(INFO) << "Core worker is destructed"; } @@ -2867,6 +2874,7 @@ void CoreWorker::HandleReportGeneratorItemReturns( rpc::ReportGeneratorItemReturnsRequest request, rpc::ReportGeneratorItemReturnsReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); task_manager_->HandleReportGeneratorItemReturns(request); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -3011,6 +3019,7 @@ void CoreWorker::HandlePushTask(rpc::PushTaskRequest request, rpc::SendReplyCallback send_reply_callback) { RAY_LOG(DEBUG) << "Received Handle Push Task " << TaskID::FromBinary(request.task_spec().task_id()); + WaitUntilInitialized(); if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { return; @@ -3070,6 +3079,7 @@ void CoreWorker::HandleDirectActorCallArgWaitComplete( rpc::DirectActorCallArgWaitCompleteRequest request, rpc::DirectActorCallArgWaitCompleteReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { return; @@ -3091,6 +3101,7 @@ void CoreWorker::HandleRayletNotifyGCSRestart( rpc::RayletNotifyGCSRestartRequest request, rpc::RayletNotifyGCSRestartReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); gcs_client_->AsyncResubscribe(); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -3098,6 +3109,7 @@ void CoreWorker::HandleRayletNotifyGCSRestart( void CoreWorker::HandleGetObjectStatus(rpc::GetObjectStatusRequest request, rpc::GetObjectStatusReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); if (HandleWrongRecipient(WorkerID::FromBinary(request.owner_worker_id()), send_reply_callback)) { RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by a previous worker " @@ -3177,6 +3189,7 @@ void CoreWorker::HandleWaitForActorOutOfScope( rpc::WaitForActorOutOfScopeRequest request, rpc::WaitForActorOutOfScopeReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); // Currently WaitForActorOutOfScope is only used when GCS actor service is enabled. if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { @@ -3304,6 +3317,7 @@ void CoreWorker::ProcessPubsubCommands(const Commands &commands, void CoreWorker::HandlePubsubLongPolling(rpc::PubsubLongPollingRequest request, rpc::PubsubLongPollingReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); const auto subscriber_id = NodeID::FromBinary(request.subscriber_id()); RAY_LOG(DEBUG) << "Got a long polling request from a node " << subscriber_id; object_info_publisher_->ConnectToSubscriber( @@ -3313,6 +3327,7 @@ void CoreWorker::HandlePubsubLongPolling(rpc::PubsubLongPollingRequest request, void CoreWorker::HandlePubsubCommandBatch(rpc::PubsubCommandBatchRequest request, rpc::PubsubCommandBatchReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); const auto subscriber_id = NodeID::FromBinary(request.subscriber_id()); ProcessPubsubCommands(request.commands(), subscriber_id); send_reply_callback(Status::OK(), nullptr, nullptr); @@ -3322,6 +3337,7 @@ void CoreWorker::HandleUpdateObjectLocationBatch( rpc::UpdateObjectLocationBatchRequest request, rpc::UpdateObjectLocationBatchReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); const auto &worker_id = request.intended_worker_id(); if (HandleWrongRecipient(WorkerID::FromBinary(worker_id), send_reply_callback)) { return; @@ -3457,6 +3473,7 @@ void CoreWorker::HandleGetObjectLocationsOwner( rpc::GetObjectLocationsOwnerRequest request, rpc::GetObjectLocationsOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); auto &object_location_request = request.object_location_request(); if (HandleWrongRecipient( WorkerID::FromBinary(object_location_request.intended_worker_id()), @@ -3496,6 +3513,7 @@ void CoreWorker::ProcessSubscribeForRefRemoved( void CoreWorker::HandleRemoteCancelTask(rpc::RemoteCancelTaskRequest request, rpc::RemoteCancelTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); auto status = CancelTask(ObjectID::FromBinary(request.remote_object_id()), request.force_kill(), request.recursive()); @@ -3505,6 +3523,7 @@ void CoreWorker::HandleRemoteCancelTask(rpc::RemoteCancelTaskRequest request, void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, rpc::CancelTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); TaskID task_id = TaskID::FromBinary(request.intended_task_id()); bool requested_task_running; { @@ -3559,6 +3578,7 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, void CoreWorker::HandleKillActor(rpc::KillActorRequest request, rpc::KillActorReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); ActorID intended_actor_id = ActorID::FromBinary(request.intended_actor_id()); if (intended_actor_id != worker_context_.GetCurrentActorID()) { std::ostringstream stream; @@ -3590,6 +3610,7 @@ void CoreWorker::HandleKillActor(rpc::KillActorRequest request, void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request, rpc::GetCoreWorkerStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); absl::MutexLock lock(&mutex_); auto limit = request.has_limit() ? request.limit() : -1; auto stats = reply->mutable_core_worker_stats(); @@ -3649,6 +3670,7 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request void CoreWorker::HandleLocalGC(rpc::LocalGCRequest request, rpc::LocalGCReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); if (options_.gc_collect != nullptr) { options_.gc_collect(request.triggered_by_global_gc()); send_reply_callback(Status::OK(), nullptr, nullptr); @@ -3661,6 +3683,7 @@ void CoreWorker::HandleLocalGC(rpc::LocalGCRequest request, void CoreWorker::HandleDeleteObjects(rpc::DeleteObjectsRequest request, rpc::DeleteObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); std::vector object_ids; for (const auto &obj_id : request.object_ids()) { object_ids.push_back(ObjectID::FromBinary(obj_id)); @@ -3692,6 +3715,7 @@ Status CoreWorker::DeleteImpl(const std::vector &object_ids, bool loca void CoreWorker::HandleSpillObjects(rpc::SpillObjectsRequest request, rpc::SpillObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); if (options_.spill_objects != nullptr) { auto object_refs = VectorFromProtobuf(request.object_refs_to_spill()); @@ -3709,6 +3733,7 @@ void CoreWorker::HandleSpillObjects(rpc::SpillObjectsRequest request, void CoreWorker::HandleRestoreSpilledObjects(rpc::RestoreSpilledObjectsRequest request, rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); if (options_.restore_spilled_objects != nullptr) { // Get a list of object ids. std::vector object_refs_to_restore; @@ -3739,6 +3764,7 @@ void CoreWorker::HandleRestoreSpilledObjects(rpc::RestoreSpilledObjectsRequest r void CoreWorker::HandleDeleteSpilledObjects(rpc::DeleteSpilledObjectsRequest request, rpc::DeleteSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); if (options_.delete_spilled_objects != nullptr) { std::vector spilled_objects_url; spilled_objects_url.reserve(request.spilled_objects_url_size()); @@ -3758,6 +3784,7 @@ void CoreWorker::HandleDeleteSpilledObjects(rpc::DeleteSpilledObjectsRequest req void CoreWorker::HandleExit(rpc::ExitRequest request, rpc::ExitReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); bool own_objects = reference_counter_->OwnObjects(); int64_t pins_in_flight = local_raylet_client_->GetPinsInFlight(); // We consider the worker to be idle if it doesn't own any objects and it doesn't have @@ -3794,6 +3821,7 @@ void CoreWorker::HandleExit(rpc::ExitRequest request, void CoreWorker::HandleAssignObjectOwner(rpc::AssignObjectOwnerRequest request, rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); ObjectID object_id = ObjectID::FromBinary(request.object_id()); const auto &borrower_address = request.borrower_address(); std::string call_site = request.call_site(); @@ -3821,6 +3849,7 @@ void CoreWorker::HandleAssignObjectOwner(rpc::AssignObjectOwnerRequest request, void CoreWorker::HandleNumPendingTasks(rpc::NumPendingTasksRequest request, rpc::NumPendingTasksReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); RAY_LOG(DEBUG) << "Received NumPendingTasks request."; reply->set_num_pending_tasks(task_manager_->NumPendingTasks()); send_reply_callback(Status::OK(), nullptr, nullptr); @@ -3895,6 +3924,7 @@ void CoreWorker::PlasmaCallback(SetResultCallback success, void CoreWorker::HandlePlasmaObjectReady(rpc::PlasmaObjectReadyRequest request, rpc::PlasmaObjectReadyReply *reply, rpc::SendReplyCallback send_reply_callback) { + WaitUntilInitialized(); std::vector> callbacks; { absl::MutexLock lock(&plasma_mutex_); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b03ff010a9084..8bc3ac5814abc 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1520,6 +1520,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } } + /// If the worker is not initialized yet, send an error reply and return true. + /// This can happen since we first start the grpc server and then + /// intialize internal components. + void WaitUntilInitialized() { + absl::MutexLock lock(&initialize_mutex_); + while (!initialized_) { + intialize_cv_.WaitWithTimeout(&initialize_mutex_, absl::Seconds(1)); + } + } + const CoreWorkerOptions options_; /// Callback to get the current language (e.g., Python) call site. @@ -1548,6 +1558,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::string main_thread_task_name_ GUARDED_BY(mutex_); + /// Stats that used for waiting for initialization. + absl::Mutex initialize_mutex_; + absl::CondVar intialize_cv_; + bool initialized_ GUARDED_BY(initialize_mutex_) = false; + /// Event loop where the IO events are handled. e.g. async GCS operations. instrumented_io_context io_service_; From 4aef9853fd99ff52a0c7813fff28e5d54aa970b7 Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Jul 2023 13:41:31 -0700 Subject: [PATCH 02/10] update --- src/ray/core_worker/core_worker.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 8bc3ac5814abc..98d97c95cce55 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1520,9 +1520,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } } - /// If the worker is not initialized yet, send an error reply and return true. - /// This can happen since we first start the grpc server and then - /// intialize internal components. + /// Wait until the worker is initialized. void WaitUntilInitialized() { absl::MutexLock lock(&initialize_mutex_); while (!initialized_) { From a981dc2b2488f9e909e7921eabf5506f31be9e8d Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Jul 2023 15:54:27 -0700 Subject: [PATCH 03/10] api --- src/ray/core_worker/core_worker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d87ee89dd52d2..6b219df33028c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -616,7 +616,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ { absl::MutexLock lock(&initialize_mutex_); initialized_ = true; - intialize_cv_.notify_all(); + intialize_cv_.SignalAll(); } } From a428987516e418149086d428687fd029272c88e3 Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Jul 2023 15:55:20 -0700 Subject: [PATCH 04/10] update --- src/ray/core_worker/core_worker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 98d97c95cce55..b8ee1120e5213 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1556,7 +1556,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::string main_thread_task_name_ GUARDED_BY(mutex_); - /// Stats that used for waiting for initialization. + /// States that used for initialization. absl::Mutex initialize_mutex_; absl::CondVar intialize_cv_; bool initialized_ GUARDED_BY(initialize_mutex_) = false; From 9419e3a100b0bed57e832b0dc97b5e8684d4a51f Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Jul 2023 22:11:49 -0700 Subject: [PATCH 05/10] update --- BUILD.bazel | 1 + src/ray/core_worker/core_worker.cc | 14 +++++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 2f07a0f8e77e5..27c30daeb07b4 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -846,6 +846,7 @@ cc_library( "//src/ray/protobuf:worker_cc_proto", "@boost//:circular_buffer", "@boost//:fiber", + "@com_google_absl//absl/cleanup:cleanup", "@com_google_absl//absl/container:btree", "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/container:flat_hash_set", diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 6b219df33028c..f25e9968ad6ad 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -20,6 +20,7 @@ #include +#include "absl/cleanup/cleanup.h" #include "absl/strings/str_format.h" #include "boost/fiber/all.hpp" #include "ray/common/bundle_spec.h" @@ -119,6 +120,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ grpc_service_(io_service_, *this), task_execution_service_work_(task_execution_service_), exiting_detail_(std::nullopt) { + // Notify that core worker is initialized. + auto initialzed_scope_guard = absl::MakeCleanup([this] { + absl::MutexLock lock(&initialize_mutex_); + initialized_ = true; + intialize_cv_.SignalAll(); + }); RAY_LOG(DEBUG) << "Constructing CoreWorker, worker_id: " << worker_id; // Initialize task receivers. @@ -611,13 +618,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ // Verify driver and worker are never mixed in the same process. RAY_CHECK_EQ(options_.worker_type != WorkerType::DRIVER, niced); #endif - - // Notify that core worker is initialized. - { - absl::MutexLock lock(&initialize_mutex_); - initialized_ = true; - intialize_cv_.SignalAll(); - } } CoreWorker::~CoreWorker() { RAY_LOG(INFO) << "Core worker is destructed"; } From 9a683987ec5f0982dc75561eb275557c6660b70f Mon Sep 17 00:00:00 2001 From: scv119 Date: Wed, 5 Jul 2023 22:53:32 -0700 Subject: [PATCH 06/10] update --- src/ray/core_worker/core_worker.cc | 23 -------------- src/ray/core_worker/core_worker.h | 2 +- src/ray/rpc/worker/core_worker_server.h | 40 ++++++++++++++++++++++++- 3 files changed, 40 insertions(+), 25 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f25e9968ad6ad..cbf0b060875fe 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -2874,7 +2874,6 @@ void CoreWorker::HandleReportGeneratorItemReturns( rpc::ReportGeneratorItemReturnsRequest request, rpc::ReportGeneratorItemReturnsReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); task_manager_->HandleReportGeneratorItemReturns(request); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -3019,7 +3018,6 @@ void CoreWorker::HandlePushTask(rpc::PushTaskRequest request, rpc::SendReplyCallback send_reply_callback) { RAY_LOG(DEBUG) << "Received Handle Push Task " << TaskID::FromBinary(request.task_spec().task_id()); - WaitUntilInitialized(); if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { return; @@ -3079,7 +3077,6 @@ void CoreWorker::HandleDirectActorCallArgWaitComplete( rpc::DirectActorCallArgWaitCompleteRequest request, rpc::DirectActorCallArgWaitCompleteReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { return; @@ -3101,7 +3098,6 @@ void CoreWorker::HandleRayletNotifyGCSRestart( rpc::RayletNotifyGCSRestartRequest request, rpc::RayletNotifyGCSRestartReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); gcs_client_->AsyncResubscribe(); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -3109,7 +3105,6 @@ void CoreWorker::HandleRayletNotifyGCSRestart( void CoreWorker::HandleGetObjectStatus(rpc::GetObjectStatusRequest request, rpc::GetObjectStatusReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); if (HandleWrongRecipient(WorkerID::FromBinary(request.owner_worker_id()), send_reply_callback)) { RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by a previous worker " @@ -3189,7 +3184,6 @@ void CoreWorker::HandleWaitForActorOutOfScope( rpc::WaitForActorOutOfScopeRequest request, rpc::WaitForActorOutOfScopeReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); // Currently WaitForActorOutOfScope is only used when GCS actor service is enabled. if (HandleWrongRecipient(WorkerID::FromBinary(request.intended_worker_id()), send_reply_callback)) { @@ -3317,7 +3311,6 @@ void CoreWorker::ProcessPubsubCommands(const Commands &commands, void CoreWorker::HandlePubsubLongPolling(rpc::PubsubLongPollingRequest request, rpc::PubsubLongPollingReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); const auto subscriber_id = NodeID::FromBinary(request.subscriber_id()); RAY_LOG(DEBUG) << "Got a long polling request from a node " << subscriber_id; object_info_publisher_->ConnectToSubscriber( @@ -3327,7 +3320,6 @@ void CoreWorker::HandlePubsubLongPolling(rpc::PubsubLongPollingRequest request, void CoreWorker::HandlePubsubCommandBatch(rpc::PubsubCommandBatchRequest request, rpc::PubsubCommandBatchReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); const auto subscriber_id = NodeID::FromBinary(request.subscriber_id()); ProcessPubsubCommands(request.commands(), subscriber_id); send_reply_callback(Status::OK(), nullptr, nullptr); @@ -3337,7 +3329,6 @@ void CoreWorker::HandleUpdateObjectLocationBatch( rpc::UpdateObjectLocationBatchRequest request, rpc::UpdateObjectLocationBatchReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); const auto &worker_id = request.intended_worker_id(); if (HandleWrongRecipient(WorkerID::FromBinary(worker_id), send_reply_callback)) { return; @@ -3473,7 +3464,6 @@ void CoreWorker::HandleGetObjectLocationsOwner( rpc::GetObjectLocationsOwnerRequest request, rpc::GetObjectLocationsOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); auto &object_location_request = request.object_location_request(); if (HandleWrongRecipient( WorkerID::FromBinary(object_location_request.intended_worker_id()), @@ -3513,7 +3503,6 @@ void CoreWorker::ProcessSubscribeForRefRemoved( void CoreWorker::HandleRemoteCancelTask(rpc::RemoteCancelTaskRequest request, rpc::RemoteCancelTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); auto status = CancelTask(ObjectID::FromBinary(request.remote_object_id()), request.force_kill(), request.recursive()); @@ -3523,7 +3512,6 @@ void CoreWorker::HandleRemoteCancelTask(rpc::RemoteCancelTaskRequest request, void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, rpc::CancelTaskReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); TaskID task_id = TaskID::FromBinary(request.intended_task_id()); bool requested_task_running; { @@ -3578,7 +3566,6 @@ void CoreWorker::HandleCancelTask(rpc::CancelTaskRequest request, void CoreWorker::HandleKillActor(rpc::KillActorRequest request, rpc::KillActorReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); ActorID intended_actor_id = ActorID::FromBinary(request.intended_actor_id()); if (intended_actor_id != worker_context_.GetCurrentActorID()) { std::ostringstream stream; @@ -3610,7 +3597,6 @@ void CoreWorker::HandleKillActor(rpc::KillActorRequest request, void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request, rpc::GetCoreWorkerStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); absl::MutexLock lock(&mutex_); auto limit = request.has_limit() ? request.limit() : -1; auto stats = reply->mutable_core_worker_stats(); @@ -3670,7 +3656,6 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request void CoreWorker::HandleLocalGC(rpc::LocalGCRequest request, rpc::LocalGCReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); if (options_.gc_collect != nullptr) { options_.gc_collect(request.triggered_by_global_gc()); send_reply_callback(Status::OK(), nullptr, nullptr); @@ -3683,7 +3668,6 @@ void CoreWorker::HandleLocalGC(rpc::LocalGCRequest request, void CoreWorker::HandleDeleteObjects(rpc::DeleteObjectsRequest request, rpc::DeleteObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); std::vector object_ids; for (const auto &obj_id : request.object_ids()) { object_ids.push_back(ObjectID::FromBinary(obj_id)); @@ -3715,7 +3699,6 @@ Status CoreWorker::DeleteImpl(const std::vector &object_ids, bool loca void CoreWorker::HandleSpillObjects(rpc::SpillObjectsRequest request, rpc::SpillObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); if (options_.spill_objects != nullptr) { auto object_refs = VectorFromProtobuf(request.object_refs_to_spill()); @@ -3733,7 +3716,6 @@ void CoreWorker::HandleSpillObjects(rpc::SpillObjectsRequest request, void CoreWorker::HandleRestoreSpilledObjects(rpc::RestoreSpilledObjectsRequest request, rpc::RestoreSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); if (options_.restore_spilled_objects != nullptr) { // Get a list of object ids. std::vector object_refs_to_restore; @@ -3764,7 +3746,6 @@ void CoreWorker::HandleRestoreSpilledObjects(rpc::RestoreSpilledObjectsRequest r void CoreWorker::HandleDeleteSpilledObjects(rpc::DeleteSpilledObjectsRequest request, rpc::DeleteSpilledObjectsReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); if (options_.delete_spilled_objects != nullptr) { std::vector spilled_objects_url; spilled_objects_url.reserve(request.spilled_objects_url_size()); @@ -3784,7 +3765,6 @@ void CoreWorker::HandleDeleteSpilledObjects(rpc::DeleteSpilledObjectsRequest req void CoreWorker::HandleExit(rpc::ExitRequest request, rpc::ExitReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); bool own_objects = reference_counter_->OwnObjects(); int64_t pins_in_flight = local_raylet_client_->GetPinsInFlight(); // We consider the worker to be idle if it doesn't own any objects and it doesn't have @@ -3821,7 +3801,6 @@ void CoreWorker::HandleExit(rpc::ExitRequest request, void CoreWorker::HandleAssignObjectOwner(rpc::AssignObjectOwnerRequest request, rpc::AssignObjectOwnerReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); ObjectID object_id = ObjectID::FromBinary(request.object_id()); const auto &borrower_address = request.borrower_address(); std::string call_site = request.call_site(); @@ -3849,7 +3828,6 @@ void CoreWorker::HandleAssignObjectOwner(rpc::AssignObjectOwnerRequest request, void CoreWorker::HandleNumPendingTasks(rpc::NumPendingTasksRequest request, rpc::NumPendingTasksReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); RAY_LOG(DEBUG) << "Received NumPendingTasks request."; reply->set_num_pending_tasks(task_manager_->NumPendingTasks()); send_reply_callback(Status::OK(), nullptr, nullptr); @@ -3924,7 +3902,6 @@ void CoreWorker::PlasmaCallback(SetResultCallback success, void CoreWorker::HandlePlasmaObjectReady(rpc::PlasmaObjectReadyRequest request, rpc::PlasmaObjectReadyReply *reply, rpc::SendReplyCallback send_reply_callback) { - WaitUntilInitialized(); std::vector> callbacks; { absl::MutexLock lock(&plasma_mutex_); diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b8ee1120e5213..3eaaae20dfea2 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1521,7 +1521,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } /// Wait until the worker is initialized. - void WaitUntilInitialized() { + void WaitUntilInitialized() override { absl::MutexLock lock(&initialize_mutex_); while (!initialized_) { intialize_cv_.WaitWithTimeout(&initialize_mutex_, absl::Seconds(1)); diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index d6fc43dd2f9fe..babe1b72b10de 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -89,6 +89,10 @@ namespace rpc { class CoreWorkerServiceHandler { public: virtual ~CoreWorkerServiceHandler() {} + + /// Blocks until the service is ready to serve RPCs. + virtual void WaitUntilInitialized() = 0; + /// Handlers. For all of the following handlers, the implementations can /// handle the request asynchronously. When handling is done, the /// `send_reply_callback` should be called. See @@ -102,6 +106,30 @@ class CoreWorkerServiceHandler { RAY_CORE_WORKER_DECLARE_RPC_HANDLERS }; +/// The `ServerCallFactory` for `CoreWorkerService`. It waits until the +/// `CoreWorkerServiceHandler` is initialized before creating a new call. +class CoreWorkerServerCallFactory : public ServerCallFactory { + public: + /// Constructor. + CoreWorkerServerCallFactory(std::unique_ptr delegate, + CoreWorkerServiceHandler &service_handler) + : delegate_(std::move(delegate)), service_handler_(service_handler) {} + + void CreateCall() const override { + service_handler_.WaitUntilInitialized(); + delegate_->CreateCall(); + } + + /// Get the maximum request number to handle at the same time. -1 means no limit. + virtual int64_t GetMaxActiveRPCs() const override { + return delegate_->GetMaxActiveRPCs(); + } + + private: + std::unique_ptr delegate_; + CoreWorkerServiceHandler &service_handler_; +}; + /// The `GrpcServer` for `CoreWorkerService`. class CoreWorkerGrpcService : public GrpcService { public: @@ -120,10 +148,20 @@ class CoreWorkerGrpcService : public GrpcService { const std::unique_ptr &cq, std::vector> *server_call_factories, const ClusterID &cluster_id) override { - RAY_CORE_WORKER_RPC_HANDLERS + std::vector> tmp_server_call_factories; + InitServerCallFactoriesImpl(cq, &tmp_server_call_factories, cluster_id); + for (auto &factory : tmp_server_call_factories) { + server_call_factories->emplace_back(std::make_unique( + std::move(factory), service_handler_)); + } } private: + void InitServerCallFactoriesImpl( + const std::unique_ptr &cq, + std::vector> *server_call_factories, + const ClusterID &cluster_id){RAY_CORE_WORKER_RPC_HANDLERS} + /// The grpc async service object. CoreWorkerService::AsyncService service_; From 3f9c24773c8f8653b149944385f92e408a77450a Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 11 Jul 2023 00:13:19 -0700 Subject: [PATCH 07/10] update --- src/ray/rpc/server_call.h | 16 +++++++++++++++- src/ray/rpc/worker/core_worker_server.h | 16 ++-------------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index b6f42b391acd2..2ade29be58f61 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -64,6 +64,14 @@ enum class ServerCallState { class ServerCallFactory; +class DelayedServiceHandler { + public: + virtual ~DelayedServiceHandler() = default; + + /// Blocks until the service is ready to serve RPCs. + virtual void WaitUntilInitialized() = 0; +}; + /// Represents an incoming request of a gRPC server. /// /// The lifecycle and state transition of a `ServerCall` is as follows: @@ -148,6 +156,8 @@ class ServerCallImpl : public ServerCall { /// \param[in] io_service The event loop. /// \param[in] call_name The name of the RPC call. /// \param[in] record_metrics If true, it records and exports the gRPC server metrics. + /// \param[in] preprocess_function If not nullptr, it will be called before handling + /// request. ServerCallImpl( const ServerCallFactory &factory, ServiceHandler &service_handler, @@ -155,7 +165,8 @@ class ServerCallImpl : public ServerCall { instrumented_io_context &io_service, std::string call_name, const ClusterID &cluster_id, - bool record_metrics) + bool record_metrics, + std::function preprocess_function = nullptr) : state_(ServerCallState::PENDING), factory_(factory), service_handler_(service_handler), @@ -196,6 +207,9 @@ class ServerCallImpl : public ServerCall { } void HandleRequestImpl() { + if (std::is_base_of_v) { + dynamic_cast(service_handler_).WaitUntilInitialized(); + } state_ = ServerCallState::PROCESSING; // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in // a different thread, and will cause `this` to be deleted. diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index babe1b72b10de..fa7e22468e501 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -86,10 +86,8 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(NumPendingTasks) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`. -class CoreWorkerServiceHandler { +class CoreWorkerServiceHandler : public DelayedServiceHandler { public: - virtual ~CoreWorkerServiceHandler() {} - /// Blocks until the service is ready to serve RPCs. virtual void WaitUntilInitialized() = 0; @@ -148,20 +146,10 @@ class CoreWorkerGrpcService : public GrpcService { const std::unique_ptr &cq, std::vector> *server_call_factories, const ClusterID &cluster_id) override { - std::vector> tmp_server_call_factories; - InitServerCallFactoriesImpl(cq, &tmp_server_call_factories, cluster_id); - for (auto &factory : tmp_server_call_factories) { - server_call_factories->emplace_back(std::make_unique( - std::move(factory), service_handler_)); - } + RAY_CORE_WORKER_RPC_HANDLERS } private: - void InitServerCallFactoriesImpl( - const std::unique_ptr &cq, - std::vector> *server_call_factories, - const ClusterID &cluster_id){RAY_CORE_WORKER_RPC_HANDLERS} - /// The grpc async service object. CoreWorkerService::AsyncService service_; From 65b5cfec5e74126199ee2ce0ad13e37e502761f0 Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 11 Jul 2023 00:15:48 -0700 Subject: [PATCH 08/10] update --- src/ray/rpc/server_call.h | 2 ++ src/ray/rpc/worker/core_worker_server.h | 24 ------------------------ 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 2ade29be58f61..b3e185acccd4f 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -64,6 +64,8 @@ enum class ServerCallState { class ServerCallFactory; +/// Represents a service handler that might not +/// be ready to serve RPCs immediately after construction. class DelayedServiceHandler { public: virtual ~DelayedServiceHandler() = default; diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index fa7e22468e501..74bb9436928d7 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -104,30 +104,6 @@ class CoreWorkerServiceHandler : public DelayedServiceHandler { RAY_CORE_WORKER_DECLARE_RPC_HANDLERS }; -/// The `ServerCallFactory` for `CoreWorkerService`. It waits until the -/// `CoreWorkerServiceHandler` is initialized before creating a new call. -class CoreWorkerServerCallFactory : public ServerCallFactory { - public: - /// Constructor. - CoreWorkerServerCallFactory(std::unique_ptr delegate, - CoreWorkerServiceHandler &service_handler) - : delegate_(std::move(delegate)), service_handler_(service_handler) {} - - void CreateCall() const override { - service_handler_.WaitUntilInitialized(); - delegate_->CreateCall(); - } - - /// Get the maximum request number to handle at the same time. -1 means no limit. - virtual int64_t GetMaxActiveRPCs() const override { - return delegate_->GetMaxActiveRPCs(); - } - - private: - std::unique_ptr delegate_; - CoreWorkerServiceHandler &service_handler_; -}; - /// The `GrpcServer` for `CoreWorkerService`. class CoreWorkerGrpcService : public GrpcService { public: From 4ac577dd04f9703baaa102df52b806ceebfc38e0 Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 11 Jul 2023 01:14:54 -0700 Subject: [PATCH 09/10] update --- src/ray/rpc/server_call.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index b3e185acccd4f..fb9518b7e35eb 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -209,8 +209,8 @@ class ServerCallImpl : public ServerCall { } void HandleRequestImpl() { - if (std::is_base_of_v) { - dynamic_cast(service_handler_).WaitUntilInitialized(); + if constexpr (std::is_base_of_v) { + &service_handler_.WaitUntilInitialized(); } state_ = ServerCallState::PROCESSING; // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in From f2942c2891a5b1fb1b68cc1721ce52da8309b961 Mon Sep 17 00:00:00 2001 From: scv119 Date: Tue, 11 Jul 2023 01:17:20 -0700 Subject: [PATCH 10/10] update --- src/ray/rpc/server_call.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index fb9518b7e35eb..82d078ed3ef91 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -210,7 +210,7 @@ class ServerCallImpl : public ServerCall { void HandleRequestImpl() { if constexpr (std::is_base_of_v) { - &service_handler_.WaitUntilInitialized(); + service_handler_.WaitUntilInitialized(); } state_ = ServerCallState::PROCESSING; // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in