From 465b577e2738f9a9130c60d7392f3a925b3e1b34 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com> Date: Fri, 14 Jul 2023 17:17:40 -0700 Subject: [PATCH] =?UTF-8?q?Revert=20"Revert=20"[Core]=20Fix=20the=20race?= =?UTF-8?q?=20condition=20where=20grpc=20requests=20are=20handled=20while?= =?UTF-8?q?=20c=E2=80=A6=20(#37301)"=20(#37342)"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 913fea35314c556c1a2be2a11d1bd8a2082e35cc. --- BUILD.bazel | 1 + src/ray/core_worker/core_worker.cc | 7 +++++++ src/ray/core_worker/core_worker.h | 13 +++++++++++++ src/ray/rpc/server_call.h | 14 ++++++++++++++ src/ray/rpc/worker/core_worker_server.h | 6 ++++-- 5 files changed, 39 insertions(+), 2 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 2f07a0f8e77e5..27c30daeb07b4 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -846,6 +846,7 @@ cc_library( "//src/ray/protobuf:worker_cc_proto", "@boost//:circular_buffer", "@boost//:fiber", + "@com_google_absl//absl/cleanup:cleanup", "@com_google_absl//absl/container:btree", "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/container:flat_hash_set", diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index fd12629417342..64bea0353877e 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -20,6 +20,7 @@ #include +#include "absl/cleanup/cleanup.h" #include "absl/strings/str_format.h" #include "boost/fiber/all.hpp" #include "ray/common/bundle_spec.h" @@ -119,6 +120,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ grpc_service_(io_service_, *this), task_execution_service_work_(task_execution_service_), exiting_detail_(std::nullopt) { + // Notify that core worker is initialized. + auto initialzed_scope_guard = absl::MakeCleanup([this] { + absl::MutexLock lock(&initialize_mutex_); + initialized_ = true; + intialize_cv_.SignalAll(); + }); RAY_LOG(DEBUG) << "Constructing CoreWorker, worker_id: " << worker_id; // Initialize task receivers. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b03ff010a9084..3eaaae20dfea2 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1520,6 +1520,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } } + /// Wait until the worker is initialized. + void WaitUntilInitialized() override { + absl::MutexLock lock(&initialize_mutex_); + while (!initialized_) { + intialize_cv_.WaitWithTimeout(&initialize_mutex_, absl::Seconds(1)); + } + } + const CoreWorkerOptions options_; /// Callback to get the current language (e.g., Python) call site. @@ -1548,6 +1556,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::string main_thread_task_name_ GUARDED_BY(mutex_); + /// States that used for initialization. + absl::Mutex initialize_mutex_; + absl::CondVar intialize_cv_; + bool initialized_ GUARDED_BY(initialize_mutex_) = false; + /// Event loop where the IO events are handled. e.g. async GCS operations. instrumented_io_context io_service_; diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index b6f42b391acd2..714c2e297b209 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -64,6 +64,16 @@ enum class ServerCallState { class ServerCallFactory; +/// Represents a service handler that might not +/// be ready to serve RPCs immediately after construction. +class DelayedServiceHandler { + public: + virtual ~DelayedServiceHandler() = default; + + /// Blocks until the service is ready to serve RPCs. + virtual void WaitUntilInitialized() = 0; +}; + /// Represents an incoming request of a gRPC server. /// /// The lifecycle and state transition of a `ServerCall` is as follows: @@ -148,6 +158,7 @@ class ServerCallImpl : public ServerCall { /// \param[in] io_service The event loop. /// \param[in] call_name The name of the RPC call. /// \param[in] record_metrics If true, it records and exports the gRPC server metrics. + /// request. ServerCallImpl( const ServerCallFactory &factory, ServiceHandler &service_handler, @@ -196,6 +207,9 @@ class ServerCallImpl : public ServerCall { } void HandleRequestImpl() { + if constexpr (std::is_base_of_v) { + service_handler_.WaitUntilInitialized(); + } state_ = ServerCallState::PROCESSING; // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in // a different thread, and will cause `this` to be deleted. diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index d6fc43dd2f9fe..74bb9436928d7 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -86,9 +86,11 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(NumPendingTasks) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`. -class CoreWorkerServiceHandler { +class CoreWorkerServiceHandler : public DelayedServiceHandler { public: - virtual ~CoreWorkerServiceHandler() {} + /// Blocks until the service is ready to serve RPCs. + virtual void WaitUntilInitialized() = 0; + /// Handlers. For all of the following handlers, the implementations can /// handle the request asynchronously. When handling is done, the /// `send_reply_callback` should be called. See