Commit 41d8dbd

Peel back everything that's not grpc-layer changes
Signed-off-by: vitsai <victoria@anyscale.com>
vitsai committed Jun 17, 2023
1 parent 3ef76cf commit 41d8dbd
Showing 5 changed files with 31 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/ray/common/constants.h
@@ -39,6 +39,8 @@ constexpr int kRayletStoreErrorExitCode = 100;
/// Prefix for the object table keys in redis.
constexpr char kObjectTablePrefix[] = "ObjectTable";

constexpr char kClusterIdKey[] = "ray_cluster_id";

constexpr char kWorkerDynamicOptionPlaceholder[] =
"RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER";

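The new kClusterIdKey is the gRPC metadata key that clients attach in the client_call.h change below. As a minimal sketch of how a server-side handler could consume it (the helper name, the expected_id_hex parameter, and the handler shape are illustrative assumptions, not part of this commit):

#include <grpcpp/grpcpp.h>

#include <string>

#include "ray/common/constants.h"  // kClusterIdKey, added above.

// Illustrative helper: returns true if the request either carries no
// cluster-ID metadata (e.g. the client was constructed with ClusterID::Nil())
// or carries an ID matching the one this server expects.
bool RequestMatchesCluster(const grpc::ServerContext &context,
                           const std::string &expected_id_hex) {
  const auto &metadata = context.client_metadata();
  auto it = metadata.find(kClusterIdKey);
  if (it == metadata.end()) {
    return true;  // No metadata attached; nothing to verify.
  }
  return std::string(it->second.data(), it->second.size()) == expected_id_hex;
}
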
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -61,6 +61,7 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
config.grpc_server_thread_num,
/*keepalive_time_ms=*/RayConfig::instance().grpc_keepalive_time_ms()),
client_call_manager_(main_service,
ClusterID::Nil(),
RayConfig::instance().gcs_server_rpc_client_thread_num()),
raylet_client_pool_(
std::make_shared<rpc::NodeManagerClientPool>(client_call_manager_)),
3 changes: 2 additions & 1 deletion src/ray/object_manager/object_manager.cc
@@ -102,7 +102,8 @@ ObjectManager::ObjectManager(
config_.object_manager_address == "127.0.0.1",
config_.rpc_service_threads_number),
object_manager_service_(rpc_service_, *this),
client_call_manager_(main_service, config_.rpc_service_threads_number),
client_call_manager_(
main_service, ClusterID::Nil(), config_.rpc_service_threads_number),
restore_spilled_object_(restore_spilled_object),
get_spilled_object_url_(get_spilled_object_url),
pull_retry_timer_(*main_service_,
24 changes: 22 additions & 2 deletions src/ray/rpc/client_call.h
@@ -16,12 +16,14 @@

#include <grpcpp/grpcpp.h>

#include <atomic>
#include <boost/asio.hpp>
#include <chrono>

#include "absl/synchronization/mutex.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/grpc_util.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/util/util.h"

@@ -67,6 +69,7 @@ class ClientCallImpl : public ClientCall {
///
/// \param[in] callback The callback function to handle the reply.
explicit ClientCallImpl(const ClientCallback<Reply> &callback,
const ClusterID &cluster_id,
std::shared_ptr<StatsHandle> stats_handle,
int64_t timeout_ms = -1)
: callback_(std::move(const_cast<ClientCallback<Reply> &>(callback))),
@@ -76,6 +79,9 @@ class ClientCallImpl : public ClientCall {
std::chrono::system_clock::now() + std::chrono::milliseconds(timeout_ms);
context_.set_deadline(deadline);
}
if (!cluster_id.IsNil()) {
context_.AddMetadata(kClusterIdKey, cluster_id.Hex());
}
}

Status GetStatus() override {
@@ -185,9 +191,11 @@ class ClientCallManager {
/// \param[in] main_service The main event loop, to which the callback functions will be
/// posted.
explicit ClientCallManager(instrumented_io_context &main_service,
const ClusterID &cluster_id = ClusterID::Nil(),
int num_threads = 1,
int64_t call_timeout_ms = -1)
: main_service_(main_service),
: cluster_id_(cluster_id),
main_service_(main_service),
num_threads_(num_threads),
shutdown_(false),
call_timeout_ms_(call_timeout_ms) {
@@ -239,8 +247,9 @@ class ClientCallManager {
if (method_timeout_ms == -1) {
method_timeout_ms = call_timeout_ms_;
}

auto call = std::make_shared<ClientCallImpl<Reply>>(
callback, std::move(stats_handle), method_timeout_ms);
callback, cluster_id_.load(), std::move(stats_handle), method_timeout_ms);
// Send request.
// Find the next completion queue to wait for response.
call->response_reader_ = (stub.*prepare_async_function)(
@@ -258,6 +267,14 @@ class ClientCallManager {
return call;
}

void SetClusterId(const ClusterID &cluster_id) {
auto old_id = cluster_id_.exchange(cluster_id);
if (!old_id.IsNil() && (old_id != cluster_id)) {
RAY_LOG(FATAL) << "Expected cluster ID to be Nil or " << cluster_id << ", but got "
<< old_id;
}
}

/// Get the main service of this rpc.
instrumented_io_context &GetMainService() { return main_service_; }

@@ -309,6 +326,9 @@ class ClientCallManager {
}
}

/// UUID of the cluster.
std::atomic<ClusterID> cluster_id_;

/// The main event loop, to which the callback functions will be posted.
instrumented_io_context &main_service_;

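Taken together, the client_call.h changes let a ClientCallManager be constructed without a cluster ID (the parameter defaults to Nil, so existing call sites keep compiling) and stamped later via SetClusterId; calls created afterwards attach the ID as ray_cluster_id metadata. A minimal usage sketch, assuming the ray and ray::rpc namespaces and an io_context named io_service (both illustrative, not from this commit):

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/id.h"
#include "ray/rpc/client_call.h"

int main() {
  instrumented_io_context io_service;

  // Cluster ID defaults to ClusterID::Nil(), so no metadata is attached to
  // outgoing calls yet.
  ray::rpc::ClientCallManager call_manager(io_service);

  // Once the real ID is known (a random one here, purely for illustration),
  // stamp it; calls created after this point carry it as ray_cluster_id metadata.
  call_manager.SetClusterId(ray::ClusterID::FromRandom());
  return 0;
}
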
5 changes: 4 additions & 1 deletion src/ray/rpc/test/grpc_server_client_test.cc
@@ -211,6 +211,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientCallManagerTimeout) {
grpc_client_.reset();
client_call_manager_.reset();
client_call_manager_.reset(new ClientCallManager(client_io_service_,
ClusterID::Nil(),
/*num_thread=*/1,
/*call_timeout_ms=*/100));
grpc_client_.reset(new GrpcClient<TestService>(
@@ -244,6 +245,7 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) {
grpc_client_.reset();
client_call_manager_.reset();
client_call_manager_.reset(new ClientCallManager(client_io_service_,
ClusterID::Nil(),
/*num_thread=*/1,
/*call_timeout_ms=*/100));
grpc_client_.reset(new GrpcClient<TestService>(
@@ -273,7 +275,8 @@ TEST_F(TestGrpcServerClientFixture, TestClientDiedBeforeReply) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
// Reinit client with infinite timeout.
client_call_manager_.reset(new ClientCallManager(client_io_service_));
client_call_manager_.reset(
new ClientCallManager(client_io_service_, ClusterID::FromRandom()));
grpc_client_.reset(new GrpcClient<TestService>(
"127.0.0.1", grpc_server_->GetPort(), *client_call_manager_));
// Send again, this request should be replied. If any leaking happened, this call won't
