Skip to content

Commit

Permalink
Get rid of shared_ptr for GcsNodeManager (ray-project#36738)
Browse files Browse the repository at this point in the history
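GcsNodeManager was held as a shared_ptr by both the monitor server and the GCS server, but the monitor server itself lives inside the GCS server. Make it a unique_ptr owned by the GCS server, and pass it to the monitor server by reference, so that ownership is clear.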

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
  • Loading branch information
vitsai authored and arvind-chandra committed Aug 31, 2023
1 parent b86149d commit 6dedfa8
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_monitor_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void GcsPlacementGroupToResourceRequest(const GcsPlacementGroup &gcs_placement_g
}

GcsMonitorServer::GcsMonitorServer(
std::shared_ptr<GcsNodeManager> gcs_node_manager,
GcsNodeManager &gcs_node_manager,
ClusterResourceManager &cluster_resource_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsPlacementGroupManager> gcs_placement_group_manager)
Expand All @@ -76,15 +76,15 @@ void GcsMonitorServer::HandleDrainAndKillNode(
rpc::SendReplyCallback send_reply_callback) {
for (const auto &node_id_bytes : request.node_ids()) {
const auto node_id = NodeID::FromBinary(node_id_bytes);
gcs_node_manager_->DrainNode(node_id);
gcs_node_manager_.DrainNode(node_id);
*reply->add_drained_nodes() = node_id_bytes;
}
send_reply_callback(Status::OK(), nullptr, nullptr);
}

void GcsMonitorServer::PopulateNodeStatuses(rpc::GetSchedulingStatusReply *reply) const {
const auto &scheduling_nodes = cluster_resource_manager_.GetResourceView();
const auto &gcs_node_manager_nodes = gcs_node_manager_->GetAllAliveNodes();
const auto &gcs_node_manager_nodes = gcs_node_manager_.GetAllAliveNodes();

for (const auto &pair : gcs_node_manager_nodes) {
const auto &node_id = pair.first;
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_monitor_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void GcsPlacementGroupToResourceRequest(const GcsPlacementGroup &gcs_placement_g
class GcsMonitorServer : public rpc::MonitorServiceHandler {
public:
explicit GcsMonitorServer(
std::shared_ptr<GcsNodeManager> gcs_node_manager,
GcsNodeManager &gcs_node_manager,
ClusterResourceManager &cluster_resource_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsPlacementGroupManager> gcs_placement_group_manager);
Expand All @@ -53,7 +53,7 @@ class GcsMonitorServer : public rpc::MonitorServiceHandler {
void PopulateResourceDemands(rpc::GetSchedulingStatusReply *reply) const;
void PopulatePlacementGroupDemands(rpc::GetSchedulingStatusReply *reply) const;

std::shared_ptr<GcsNodeManager> gcs_node_manager_;
GcsNodeManager &gcs_node_manager_;
ClusterResourceManager &cluster_resource_manager_;
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
std::shared_ptr<GcsPlacementGroupManager> gcs_placement_group_manager_;
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void GcsServer::Stop() {

void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
gcs_node_manager_ = std::make_unique<GcsNodeManager>(
gcs_publisher_, gcs_table_storage_, raylet_client_pool_);
// Initialize by gcs tables data.
gcs_node_manager_->Initialize(gcs_init_data);
Expand Down Expand Up @@ -630,7 +630,7 @@ void GcsServer::InitGcsTaskManager() {

void GcsServer::InitMonitorServer() {
monitor_server_ = std::make_unique<GcsMonitorServer>(
gcs_node_manager_,
*gcs_node_manager_,
cluster_resource_scheduler_->GetClusterResourceManager(),
gcs_resource_manager_,
gcs_placement_group_manager_);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class GcsServer {
/// The autoscaler state manager.
std::unique_ptr<GcsAutoscalerStateManager> gcs_autoscaler_state_manager_;
/// The gcs node manager.
std::shared_ptr<GcsNodeManager> gcs_node_manager_;
std::unique_ptr<GcsNodeManager> gcs_node_manager_;
/// The health check manager.
std::shared_ptr<GcsHealthCheckManager> gcs_healthcheck_manager_;
/// The gcs redis failure detector.
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/test/gcs_monitor_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ std::shared_ptr<gcs::GcsPlacementGroup> ConstructPlacementGroupDemand(
class GcsMonitorServerTest : public ::testing::Test {
public:
GcsMonitorServerTest()
: mock_node_manager_(std::make_shared<gcs::MockGcsNodeManager>()),
: mock_node_manager_(gcs::MockGcsNodeManager()),
cluster_resource_manager_(io_context_),
mock_resource_manager_(
std::make_shared<gcs::MockGcsResourceManager>(cluster_resource_manager_)),
Expand All @@ -88,13 +88,13 @@ class GcsMonitorServerTest : public ::testing::Test {
}

absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> &AliveNodes() {
return mock_node_manager_->alive_nodes_;
return mock_node_manager_.alive_nodes_;
}

absl::btree_multimap<
int64_t,
std::pair<ExponentialBackOff, std::shared_ptr<gcs::GcsPlacementGroup>>>
&PendingPlacementGroups() {
std::pair<ExponentialBackOff, std::shared_ptr<gcs::GcsPlacementGroup>>> &
PendingPlacementGroups() {
return mock_placement_group_manager_->pending_placement_groups_;
}

Expand All @@ -104,7 +104,7 @@ class GcsMonitorServerTest : public ::testing::Test {

protected:
instrumented_io_context io_context_;
std::shared_ptr<gcs::MockGcsNodeManager> mock_node_manager_;
gcs::MockGcsNodeManager mock_node_manager_;
ClusterResourceManager cluster_resource_manager_;
std::shared_ptr<gcs::MockGcsResourceManager> mock_resource_manager_;
std::shared_ptr<gcs::MockGcsPlacementGroupManager> mock_placement_group_manager_;
Expand Down Expand Up @@ -136,7 +136,7 @@ TEST_F(GcsMonitorServerTest, TestDrainAndKillNode) {
*request.add_node_ids() = NodeID::FromRandom().Binary();
*request.add_node_ids() = NodeID::FromRandom().Binary();

EXPECT_CALL(*mock_node_manager_, DrainNode(_)).Times(Exactly(2));
EXPECT_CALL(mock_node_manager_, DrainNode(_)).Times(Exactly(2));
monitor_server_.HandleDrainAndKillNode(request, &reply, send_reply_callback);

ASSERT_EQ(reply.drained_nodes().size(), 2);
Expand Down

0 comments on commit 6dedfa8

Please sign in to comment.