diff --git a/src/ray/common/constants.h b/src/ray/common/constants.h
index 3b4c8e9282bd..4667d8841d82 100644
--- a/src/ray/common/constants.h
+++ b/src/ray/common/constants.h
@@ -39,6 +39,8 @@ constexpr int kRayletStoreErrorExitCode = 100;
 
 /// Prefix for the object table keys in redis.
 constexpr char kObjectTablePrefix[] = "ObjectTable";
 
+constexpr char kClusterIdKey[] = "ray_cluster_id";
+
 constexpr char kWorkerDynamicOptionPlaceholder[] =
     "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER";
diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc
index b4848252369b..67f5eeb17f1b 100644
--- a/src/ray/gcs/gcs_server/gcs_server.cc
+++ b/src/ray/gcs/gcs_server/gcs_server.cc
@@ -61,6 +61,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
                   config.grpc_server_thread_num,
                   /*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()),
       client_call_manager_(main_service,
+                           ClusterID::Nil(),
                            RayConfig::instance().gcs_server_rpc_client_thread_num()),
       raylet_client_pool_(
           std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc
index 18d3e76eb297..cf054aecfe42 100644
--- a/src/ray/object_manager/object_manager.cc
+++ b/src/ray/object_manager/object_manager.cc
@@ -102,7 +102,8 @@ ObjectManager::ObjectManager(
                              config_.object_manager_address == "127.0.0.1",
                              config_.rpc_service_threads_number),
       object_manager_service_(rpc_service_, *this),
-      client_call_manager_(main_service, config_.rpc_service_threads_number),
+      client_call_manager_(
+          main_service, ClusterID::Nil(), config_.rpc_service_threads_number),
       restore_spilled_object_(restore_spilled_object),
       get_spilled_object_url_(get_spilled_object_url),
       pull_retry_timer_(*main_service_,
diff --git a/src/ray/rpc/client_call.h b/src/ray/rpc/client_call.h
index d39d4373b83a..60d6f90321a1 100644
--- a/src/ray/rpc/client_call.h
+++ b/src/ray/rpc/client_call.h
@@ -16,12 +16,14 @@
 #include <grpc/support/log.h>
 
+#include <atomic>
 #include <boost/asio.hpp>
 #include <memory>
 
 #include "absl/synchronization/mutex.h"
 #include "ray/common/asio/instrumented_io_context.h"
 #include "ray/common/grpc_util.h"
+#include "ray/common/id.h"
 #include "ray/common/status.h"
 #include "ray/util/util.h"
 
@@ -67,6 +69,7 @@ class ClientCallImpl : public ClientCall {
   ///
   /// \param[in] callback The callback function to handle the reply.
   explicit ClientCallImpl(const ClientCallback<Reply> &callback,
+                          const ClusterID &cluster_id,
                           std::shared_ptr<StatsHandle> stats_handle,
                           int64_t timeout_ms = -1)
       : callback_(std::move(const_cast<ClientCallback<Reply> &>(callback))),
@@ -76,6 +79,9 @@ class ClientCallImpl : public ClientCall {
           std::chrono::system_clock::now() + std::chrono::milliseconds(timeout_ms);
       context_.set_deadline(deadline);
     }
+    if (!cluster_id.IsNil()) {
+      context_.AddMetadata(kClusterIdKey, cluster_id.Hex());
+    }
   }
 
   Status GetStatus() override {
@@ -185,9 +191,11 @@ class ClientCallManager {
   /// \param[in] main_service The main event loop, to which the callback functions will be
   /// posted.
   explicit ClientCallManager(instrumented_io_context &main_service,
+                             const ClusterID &cluster_id = ClusterID::Nil(),
                              int num_threads = 1,
                              int64_t call_timeout_ms = -1)
-      : main_service_(main_service),
+      : cluster_id_(cluster_id),
+        main_service_(main_service),
         num_threads_(num_threads),
         shutdown_(false),
         call_timeout_ms_(call_timeout_ms) {
@@ -239,8 +247,9 @@ class ClientCallManager {
     if (method_timeout_ms == -1) {
       method_timeout_ms = call_timeout_ms_;
     }
+
     auto call = std::make_shared<ClientCallImpl<Reply>>(
-        callback, std::move(stats_handle), method_timeout_ms);
+        callback, cluster_id_.load(), std::move(stats_handle), method_timeout_ms);
     // Send request.
     // Find the next completion queue to wait for response.
     call->response_reader_ = (stub.*prepare_async_function)(
@@ -258,6 +267,14 @@ class ClientCallManager {
     return call;
   }
 
+  void SetClusterId(const ClusterID &cluster_id) {
+    auto old_id = cluster_id_.exchange(cluster_id);
+    if (!old_id.IsNil() && (old_id != cluster_id)) {
+      RAY_LOG(FATAL) << "Expected cluster ID to be Nil or " << cluster_id << ", but got "
+                     << old_id;
+    }
+  }
+
   /// Get the main service of this rpc.
   instrumented_io_context &GetMainService() { return main_service_; }
 
@@ -309,6 +326,9 @@
     }
   }
 
+  /// UUID of the cluster.
+  std::atomic<ClusterID> cluster_id_;
+
   /// The main event loop, to which the callback functions will be posted.
   instrumented_io_context &main_service_;
diff --git a/src/ray/rpc/test/grpc_server_client_test.cc b/src/ray/rpc/test/grpc_server_client_test.cc
index e1e8e9d6c0f5..5670725437da 100644
--- a/src/ray/rpc/test/grpc_server_client_test.cc
+++ b/src/ray/rpc/test/grpc_server_client_test.cc
@@ -211,6 +211,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientCallManagerTimeout) {
   grpc_client_.reset();
   client_call_manager_.reset();
   client_call_manager_.reset(new ClientCallManager(client_io_service_,
+                                                   ClusterID::Nil(),
                                                    /*num_thread=*/1,
                                                    /*call_timeout_ms=*/100));
   grpc_client_.reset(new GrpcClient<TestService>(
       "127.0.0.1", grpc_server_->GetPort(), *client_call_manager_));
@@ -244,6 +245,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) {
   grpc_client_.reset();
   client_call_manager_.reset();
   client_call_manager_.reset(new ClientCallManager(client_io_service_,
+                                                   ClusterID::Nil(),
                                                    /*num_thread=*/1,
                                                    /*call_timeout_ms=*/100));
   grpc_client_.reset(new GrpcClient<TestService>(
       "127.0.0.1", grpc_server_->GetPort(), *client_call_manager_));
@@ -273,7 +275,8 @@
     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
   }
   // Reinit client with infinite timeout.
-  client_call_manager_.reset(new ClientCallManager(client_io_service_));
+  client_call_manager_.reset(
+      new ClientCallManager(client_io_service_, ClusterID::FromRandom()));
   grpc_client_.reset(new GrpcClient<TestService>(
       "127.0.0.1", grpc_server_->GetPort(), *client_call_manager_));
   // Send again, this request should be replied. If any leaking happened, this call won't
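
For readers who want the shape of the change without digging through the Ray headers, the sketch below is a minimal, self-contained model of the pattern this diff introduces, not Ray code: FakeClusterID, FakeCallManager, and MakeCallMetadata are invented stand-ins for ClusterID, ClientCallManager, and the grpc::ClientContext::AddMetadata call in ClientCallImpl. It shows the two pieces working together: a set-once cluster ID held in a std::atomic and guarded by an exchange-and-compare, and per-call metadata that is attached only once the ID is non-Nil.

// A minimal sketch of the set-once atomic cluster ID + per-call metadata
// stamping pattern. All names here are hypothetical stand-ins, not Ray APIs.
#include <array>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <map>
#include <string>

// Stand-in for ray::ClusterID. It must stay trivially copyable so that
// std::atomic<FakeClusterID> is well-formed, mirroring std::atomic<ClusterID>.
struct FakeClusterID {
  std::array<uint8_t, 4> bytes{};  // Real IDs are longer; 4 bytes keeps the demo short.

  bool IsNil() const {
    for (uint8_t b : bytes) {
      if (b != 0) return false;
    }
    return true;
  }

  std::string Hex() const {
    static const char *kDigits = "0123456789abcdef";
    std::string out;
    for (uint8_t b : bytes) {
      out.push_back(kDigits[b >> 4]);
      out.push_back(kDigits[b & 0xf]);
    }
    return out;
  }

  bool operator==(const FakeClusterID &other) const { return bytes == other.bytes; }
  bool operator!=(const FakeClusterID &other) const { return !(*this == other); }
};

constexpr char kClusterIdKey[] = "ray_cluster_id";

class FakeCallManager {
 public:
  // Mirrors ClientCallManager::SetClusterId: the ID may be installed once; a
  // second call with a *different* ID is treated as a fatal programming error.
  void SetClusterId(const FakeClusterID &cluster_id) {
    FakeClusterID old_id = cluster_id_.exchange(cluster_id);
    if (!old_id.IsNil() && old_id != cluster_id) {
      std::fprintf(stderr, "fatal: cluster ID already set to %s\n", old_id.Hex().c_str());
      std::abort();
    }
  }

  // Mirrors what ClientCallImpl's constructor does to grpc::ClientContext:
  // attach the ID as call metadata only once it is known (non-Nil).
  std::map<std::string, std::string> MakeCallMetadata() const {
    std::map<std::string, std::string> metadata;
    FakeClusterID id = cluster_id_.load();
    if (!id.IsNil()) {
      metadata.emplace(kClusterIdKey, id.Hex());
    }
    return metadata;
  }

 private:
  std::atomic<FakeClusterID> cluster_id_{FakeClusterID{}};
};

int main() {
  FakeCallManager manager;
  // Before the ID is known, outgoing calls carry no cluster-ID metadata.
  std::printf("entries before: %zu\n", manager.MakeCallMetadata().size());
  // Once set, every subsequent call is stamped with the hex-encoded ID.
  manager.SetClusterId(FakeClusterID{{0xde, 0xad, 0xbe, 0xef}});
  std::printf("%s = %s\n", kClusterIdKey, manager.MakeCallMetadata()[kClusterIdKey].c_str());
  return 0;
}

The atomic member is what allows SetClusterId to be called from a different thread than the ones issuing RPCs: the diff relies on ClusterID being trivially copyable so std::atomic<ClusterID> is valid, and the exchange-and-compare both installs the ID and catches a conflicting second assignment without taking a mutex on the call path.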