diff --git a/BUILD.bazel b/BUILD.bazel index 27c30daeb07b4..2f07a0f8e77e5 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -846,7 +846,6 @@ cc_library( "//src/ray/protobuf:worker_cc_proto", "@boost//:circular_buffer", "@boost//:fiber", - "@com_google_absl//absl/cleanup:cleanup", "@com_google_absl//absl/container:btree", "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/container:flat_hash_set", diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 440ce5579731e..14c7547d088fd 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -20,7 +20,6 @@ #include -#include "absl/cleanup/cleanup.h" #include "absl/strings/str_format.h" #include "boost/fiber/all.hpp" #include "ray/common/bundle_spec.h" @@ -121,12 +120,6 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ task_execution_service_work_(task_execution_service_), exiting_detail_(std::nullopt), pid_(getpid()) { - // Notify that core worker is initialized. - auto initialzed_scope_guard = absl::MakeCleanup([this] { - absl::MutexLock lock(&initialize_mutex_); - initialized_ = true; - intialize_cv_.SignalAll(); - }); RAY_LOG(DEBUG) << "Constructing CoreWorker, worker_id: " << worker_id; // Initialize task receivers. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e20255f8cdb72..be1584dd8824b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1520,14 +1520,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { } } - /// Wait until the worker is initialized. - void WaitUntilInitialized() override { - absl::MutexLock lock(&initialize_mutex_); - while (!initialized_) { - intialize_cv_.WaitWithTimeout(&initialize_mutex_, absl::Seconds(1)); - } - } - const CoreWorkerOptions options_; /// Callback to get the current language (e.g., Python) call site. @@ -1556,11 +1548,6 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::string main_thread_task_name_ GUARDED_BY(mutex_); - /// States that used for initialization. - absl::Mutex initialize_mutex_; - absl::CondVar intialize_cv_; - bool initialized_ GUARDED_BY(initialize_mutex_) = false; - /// Event loop where the IO events are handled. e.g. async GCS operations. instrumented_io_context io_service_; diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 82d078ed3ef91..b6f42b391acd2 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -64,16 +64,6 @@ enum class ServerCallState { class ServerCallFactory; -/// Represents a service handler that might not -/// be ready to serve RPCs immediately after construction. -class DelayedServiceHandler { - public: - virtual ~DelayedServiceHandler() = default; - - /// Blocks until the service is ready to serve RPCs. - virtual void WaitUntilInitialized() = 0; -}; - /// Represents an incoming request of a gRPC server. /// /// The lifecycle and state transition of a `ServerCall` is as follows: @@ -158,8 +148,6 @@ class ServerCallImpl : public ServerCall { /// \param[in] io_service The event loop. /// \param[in] call_name The name of the RPC call. /// \param[in] record_metrics If true, it records and exports the gRPC server metrics. - /// \param[in] preprocess_function If not nullptr, it will be called before handling - /// request. ServerCallImpl( const ServerCallFactory &factory, ServiceHandler &service_handler, @@ -167,8 +155,7 @@ class ServerCallImpl : public ServerCall { instrumented_io_context &io_service, std::string call_name, const ClusterID &cluster_id, - bool record_metrics, - std::function preprocess_function = nullptr) + bool record_metrics) : state_(ServerCallState::PENDING), factory_(factory), service_handler_(service_handler), @@ -209,9 +196,6 @@ class ServerCallImpl : public ServerCall { } void HandleRequestImpl() { - if constexpr (std::is_base_of_v) { - service_handler_.WaitUntilInitialized(); - } state_ = ServerCallState::PROCESSING; // NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in // a different thread, and will cause `this` to be deleted. diff --git a/src/ray/rpc/worker/core_worker_server.h b/src/ray/rpc/worker/core_worker_server.h index 74bb9436928d7..d6fc43dd2f9fe 100644 --- a/src/ray/rpc/worker/core_worker_server.h +++ b/src/ray/rpc/worker/core_worker_server.h @@ -86,11 +86,9 @@ namespace rpc { DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(NumPendingTasks) /// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`. -class CoreWorkerServiceHandler : public DelayedServiceHandler { +class CoreWorkerServiceHandler { public: - /// Blocks until the service is ready to serve RPCs. - virtual void WaitUntilInitialized() = 0; - + virtual ~CoreWorkerServiceHandler() {} /// Handlers. For all of the following handlers, the implementations can /// handle the request asynchronously. When handling is done, the /// `send_reply_callback` should be called. See