Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Fix the race condition where grpc requests are handled while core worker not yet initialized #37117

Merged
merged 11 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,7 @@ cc_library(
"//src/ray/protobuf:worker_cc_proto",
"@boost//:circular_buffer",
"@boost//:fiber",
"@com_google_absl//absl/cleanup:cleanup",
"@com_google_absl//absl/container:btree",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
Expand Down
7 changes: 7 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <google/protobuf/util/json_util.h>

#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_format.h"
#include "boost/fiber/all.hpp"
#include "ray/common/bundle_spec.h"
Expand Down Expand Up @@ -120,6 +121,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
task_execution_service_work_(task_execution_service_),
exiting_detail_(std::nullopt),
pid_(getpid()) {
// Notify that core worker is initialized.
auto initialzed_scope_guard = absl::MakeCleanup([this] {
absl::MutexLock lock(&initialize_mutex_);
initialized_ = true;
intialize_cv_.SignalAll();
});
RAY_LOG(DEBUG) << "Constructing CoreWorker, worker_id: " << worker_id;

// Initialize task receivers.
Expand Down
13 changes: 13 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,14 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
}
}

/// Wait until the worker is initialized.
void WaitUntilInitialized() override {
absl::MutexLock lock(&initialize_mutex_);
while (!initialized_) {
intialize_cv_.WaitWithTimeout(&initialize_mutex_, absl::Seconds(1));
}
}

const CoreWorkerOptions options_;

/// Callback to get the current language (e.g., Python) call site.
Expand Down Expand Up @@ -1548,6 +1556,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {

std::string main_thread_task_name_ GUARDED_BY(mutex_);

/// States that used for initialization.
absl::Mutex initialize_mutex_;
absl::CondVar intialize_cv_;
bool initialized_ GUARDED_BY(initialize_mutex_) = false;

/// Event loop where the IO events are handled. e.g. async GCS operations.
instrumented_io_context io_service_;

Expand Down
18 changes: 17 additions & 1 deletion src/ray/rpc/server_call.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ enum class ServerCallState {

class ServerCallFactory;

/// Represents a service handler that might not
/// be ready to serve RPCs immediately after construction.
class DelayedServiceHandler {
public:
virtual ~DelayedServiceHandler() = default;

/// Blocks until the service is ready to serve RPCs.
virtual void WaitUntilInitialized() = 0;
};

/// Represents an incoming request of a gRPC server.
///
/// The lifecycle and state transition of a `ServerCall` is as follows:
Expand Down Expand Up @@ -148,14 +158,17 @@ class ServerCallImpl : public ServerCall {
/// \param[in] io_service The event loop.
/// \param[in] call_name The name of the RPC call.
/// \param[in] record_metrics If true, it records and exports the gRPC server metrics.
/// \param[in] preprocess_function If not nullptr, it will be called before handling
/// request.
ServerCallImpl(
const ServerCallFactory &factory,
ServiceHandler &service_handler,
HandleRequestFunction<ServiceHandler, Request, Reply> handle_request_function,
instrumented_io_context &io_service,
std::string call_name,
const ClusterID &cluster_id,
bool record_metrics)
bool record_metrics,
std::function<void()> preprocess_function = nullptr)
: state_(ServerCallState::PENDING),
factory_(factory),
service_handler_(service_handler),
Expand Down Expand Up @@ -196,6 +209,9 @@ class ServerCallImpl : public ServerCall {
}

void HandleRequestImpl() {
if constexpr (std::is_base_of_v<DelayedServiceHandler, ServiceHandler>) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++ is amazing

service_handler_.WaitUntilInitialized();
}
state_ = ServerCallState::PROCESSING;
// NOTE(hchen): This `factory` local variable is needed. Because `SendReply` runs in
// a different thread, and will cause `this` to be deleted.
Expand Down
6 changes: 4 additions & 2 deletions src/ray/rpc/worker/core_worker_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ namespace rpc {
DECLARE_VOID_RPC_SERVICE_HANDLER_METHOD(NumPendingTasks)

/// Interface of the `CoreWorkerServiceHandler`, see `src/ray/protobuf/core_worker.proto`.
class CoreWorkerServiceHandler {
class CoreWorkerServiceHandler : public DelayedServiceHandler {
public:
virtual ~CoreWorkerServiceHandler() {}
/// Blocks until the service is ready to serve RPCs.
virtual void WaitUntilInitialized() = 0;

/// Handlers. For all of the following handlers, the implementations can
/// handle the request asynchronously. When handling is done, the
/// `send_reply_callback` should be called. See
Expand Down