Skip to content

Commit

Permalink
Revert "Revert "[Core] Fix the race condition where grpc requests are…
Browse files Browse the repository at this point in the history
… handled while c… (#37301)" (#37342)"

This reverts commit 913fea3.
  • Loading branch information
can-anyscale authored Jul 15, 2023
1 parent d340728 commit 465b577
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 2 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ cc_library(
"//src/ray/protobuf:worker_cc_proto",
"@boost//:circular_buffer",
"@boost//:fiber",
"@com_google_absl//absl/cleanup:cleanup",
"@com_google_absl//absl/container:btree",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
Expand Down
7 changes: 7 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <google/protobuf/util/json_util.h>

#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_format.h"
#include "boost/fiber/all.hpp"
#include "ray/common/bundle_spec.h"
Expand Down Expand Up @@ -119,6 +120,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
grpc_service_(io_service_, *this),
task_execution_service_work_(task_execution_service_),
exiting_detail_(std::nullopt) {
// Notify that core worker is initialized.
auto initialzed_scope_guard = absl::MakeCleanup([this] {
absl::MutexLock lock(&initialize_mutex_);
initialized_ = true;
intialize_cv_.SignalAll();
});
RAY_LOG(DEBUG) << "Constructing CoreWorker, worker_id: " << worker_id;

// Initialize task receivers.
Expand Down
13 changes: 13 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
}
}

/// Wait until the worker is initialized.
void WaitUntilInitialized() override {
absl::MutexLock lock(&initialize_mutex_);
while (!initialized_) {
intialize_cv_.WaitWithTimeout(&initialize_mutex_, absl::Seconds(1));
}
}

const CoreWorkerOptions options_;

/// Callback to get the current language (e.g., Python) call site.
Expand Down Expand Up @@ -1548,6 +1556,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {

std::string main_thread_task_name_ GUARDED_BY(mutex_);

/// States that used for initialization.
absl::Mutex initialize_mutex_;
absl::CondVar intialize_cv_;
bool initialized_ GUARDED_BY(initialize_mutex_) = false;

/// Event loop where the IO events are handled. e.g. async GCS operations.
instrumented_io_context io_service_;

Expand Down
14 changes: 14 additions & 0 deletions src/ray/rpc/server_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ enum class ServerCallState {

class ServerCallFactory;

/// Represents a service handler that might not
/// be ready to serve RPCs immediately after construction.
class DelayedServiceHandler {
public:
virtual ~DelayedServiceHandler() = default;

/// Blocks until the service is ready to serve RPCs.
virtual void WaitUntilInitialized() = 0;
};

/// Represents an incoming request of a gRPC server.
///
/// The lifecycle and state transition of a `ServerCall` is as follows:
Expand Down Expand Up @@ -148,6 +158,7 @@ class ServerCallImpl : public ServerCall {
/// \param[in] io_service The event loop.
/// \param[in] call_name The name of the RPC call.
/// \param[in] record_metrics If true, it records and exports the gRPC server metrics.
/// request.
ServerCallImpl(
const ServerCallFactory &factory,
ServiceHandler &service_handler,
Expand Down Expand Up @@ -196,6 +207,9 @@ class ServerCallImpl : public ServerCall {
}

void HandleRequestImpl() {
if constexpr (std::is_base_of_v<DelayedServiceHandler, ServiceHandler>) {
service_handler_.WaitUntilInitialized();
}
state_ = ServerCallState::PROCESSING;
// NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
// a different thread, and will cause `this` to be deleted.
Expand Down
6 changes: 4 additions & 2 deletions src/ray/rpc/worker/core_worker_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ namespace rpc {
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(NumPendingTasks)

/// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.
class CoreWorkerServiceHandler {
class CoreWorkerServiceHandler : public DelayedServiceHandler {
public:
virtual ~CoreWorkerServiceHandler() {}
/// Blocks until the service is ready to serve RPCs.
virtual void WaitUntilInitialized() = 0;

/// Handlers. For all of the following handlers, the implementations can
/// handle the request asynchronously. When handling is done, the
/// `send_reply_callback` should be called. See
Expand Down

0 comments on commit 465b577

Please sign in to comment.