Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Only get single node info rather then all when needed #49727

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ ray_cc_library(
"//src/ray/common:ray_config",
"//src/ray/common:ray_syncer",
"//src/ray/common:status",
"//src/ray/common:status_or",
"//src/ray/common:task_common",
"//src/ray/common:test_util",
"//src/ray/protobuf:gcs_cc_proto",
Expand Down Expand Up @@ -2043,9 +2044,9 @@ ray_cc_test(
"//:redis-server",
],
tags = [
"exclusive",
"no_tsan",
"team:core",
"exclusive",
],
deps = [
":gcs_client_lib",
Expand Down
2 changes: 1 addition & 1 deletion python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
const c_string &node_ip_address,
c_string *node_to_connect)
CRayStatus GetNode(
const c_string &node_id,
const c_string &node_id_str,
dayshah marked this conversation as resolved.
Show resolved Hide resolved
c_string *node_info)

cdef extern from * namespace "ray::gcs" nogil:
Expand Down
10 changes: 10 additions & 0 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,16 @@ Status NodeInfoAccessor::GetAllNoCache(int64_t timeout_ms,
return Status::OK();
}

StatusOr<std::vector<rpc::GcsNodeInfo>> NodeInfoAccessor::GetAllNoCacheWithFilter(
int64_t timeout_ms, rpc::GetAllNodeInfoRequest_Filters filter) {
dayshah marked this conversation as resolved.
Show resolved Hide resolved
rpc::GetAllNodeInfoRequest request;
*request.mutable_filters() = std::move(filter);
rpc::GetAllNodeInfoReply reply;
RAY_RETURN_NOT_OK(
client_impl_->GetGcsRpcClient().SyncGetAllNodeInfo(request, &reply, timeout_ms));
return VectorFromProtobuf(std::move(*reply.mutable_node_info_list()));
}

Status NodeInfoAccessor::CheckAlive(const std::vector<std::string> &raylet_addresses,
int64_t timeout_ms,
std::vector<bool> &nodes_alive) {
Expand Down
9 changes: 8 additions & 1 deletion src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "absl/types/optional.h"
#include "ray/common/id.h"
#include "ray/common/placement_group.h"
#include "ray/common/status_or.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/callback.h"
#include "ray/rpc/client_call.h"
Expand Down Expand Up @@ -414,9 +415,15 @@ class NodeInfoAccessor {

/// Get information of all nodes from an RPC to GCS synchronously.
///
/// \return All nodes in cache.
/// \return All nodes from gcs without cache.
virtual Status GetAllNoCache(int64_t timeout_ms, std::vector<rpc::GcsNodeInfo> &nodes);

/// Get information of all nodes from an RPC to GCS synchronously with filter.
dayshah marked this conversation as resolved.
Show resolved Hide resolved
///
/// \return All nodes that match the given filter from the gcs without the cache.
dayshah marked this conversation as resolved.
Show resolved Hide resolved
virtual StatusOr<std::vector<rpc::GcsNodeInfo>> GetAllNoCacheWithFilter(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

dayshah marked this conversation as resolved.
Show resolved Hide resolved
int64_t timeout_ms, rpc::GetAllNodeInfoRequest_Filters filter);
dayshah marked this conversation as resolved.
Show resolved Hide resolved

/// Send a check alive request to GCS for the liveness of some nodes.
///
/// \param raylet_addresses The addresses of the nodes to check, each like "ip:port".
Expand Down
200 changes: 83 additions & 117 deletions src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,69 +396,38 @@ std::string GlobalStateAccessor::GetSystemConfig() {
return future.get();
}

ray::Status GlobalStateAccessor::GetAliveNodes(std::vector<rpc::GcsNodeInfo> &nodes) {
std::promise<std::pair<Status, std::vector<rpc::GcsNodeInfo>>> promise;
{
absl::ReaderMutexLock lock(&mutex_);
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(
[&promise](Status status, std::vector<rpc::GcsNodeInfo> &&nodes) {
promise.set_value(
std::pair<Status, std::vector<rpc::GcsNodeInfo>>(status, std::move(nodes)));
},
/*timeout_ms=*/-1));
}
auto result = promise.get_future().get();
auto status = result.first;
if (!status.ok()) {
return status;
}

std::copy_if(result.second.begin(),
result.second.end(),
std::back_inserter(nodes),
[](const rpc::GcsNodeInfo &node) {
return node.state() == rpc::GcsNodeInfo::ALIVE;
});
return status;
}

ray::Status GlobalStateAccessor::GetNode(const std::string &node_id,
ray::Status GlobalStateAccessor::GetNode(const std::string &node_id_str,
dayshah marked this conversation as resolved.
Show resolved Hide resolved
std::string *node_info) {
auto start_ms = current_time_ms();
auto node_id_binary = NodeID::FromHex(node_id).Binary();
auto end_time_point =
dayshah marked this conversation as resolved.
Show resolved Hide resolved
current_time_ms() + RayConfig::instance().raylet_start_wait_time_s() * 1000;
auto node_id = NodeID::FromHex(node_id_str);
dayshah marked this conversation as resolved.
Show resolved Hide resolved

StatusOr<std::vector<rpc::GcsNodeInfo>> node_infos_status{Status::NotFound("")};
while (true) {
std::vector<rpc::GcsNodeInfo> nodes;
auto status = GetAliveNodes(nodes);
if (!status.ok()) {
return status;
rpc::GetAllNodeInfoRequest_Filters filter;
dayshah marked this conversation as resolved.
Show resolved Hide resolved
filter.set_state(rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE);
filter.set_node_id(node_id.Binary());
{
absl::ReaderMutexLock lock(&mutex_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gcs_client_ is protected by mutex, this is how it's used throughout

auto timeout_ms = std::max(end_time_point - current_time_ms(), 0LL);
node_infos_status =
gcs_client_->Nodes().GetAllNoCacheWithFilter(timeout_ms, std::move(filter));
}
if (!node_infos_status.ok()) {
dayshah marked this conversation as resolved.
Show resolved Hide resolved
return node_infos_status.status();
}
if (node_infos_status->empty()) {
node_infos_status = Status::NotFound(
"GCS cannot find the node with node ID " + node_id_str +
". The node registration may not be complete yet before the timeout." +
" Try increase the RAY_raylet_start_wait_time_s config.");

if (nodes.empty()) {
status = Status::NotFound("GCS has started but no raylets have registered yet.");
} else {
int relevant_client_index = -1;
for (int i = 0; i < static_cast<int>(nodes.size()); i++) {
const auto &node = nodes[i];
if (node_id_binary == node.node_id()) {
relevant_client_index = i;
break;
}
}

if (relevant_client_index < 0) {
status = Status::NotFound(
"GCS cannot find the node with node ID " + node_id +
". The node registration may not be complete yet before the timeout." +
" Try increase the RAY_raylet_start_wait_time_s config.");
} else {
*node_info = nodes[relevant_client_index].SerializeAsString();
return Status::OK();
}
*node_info = (*node_infos_status)[0].SerializeAsString();
return Status::OK();
}

if (current_time_ms() - start_ms >=
RayConfig::instance().raylet_start_wait_time_s() * 1000) {
return status;
if (current_time_ms() >= end_time_point) {
return node_infos_status.status();
}
RAY_LOG(WARNING) << "Retrying to get node with node ID " << node_id;
// Some of the information may not be in GCS yet, so wait a little bit.
Expand All @@ -468,73 +437,70 @@ ray::Status GlobalStateAccessor::GetNode(const std::string &node_id,

ray::Status GlobalStateAccessor::GetNodeToConnectForDriver(
const std::string &node_ip_address, std::string *node_to_connect) {
auto start_ms = current_time_ms();
auto end_time_point =
dayshah marked this conversation as resolved.
Show resolved Hide resolved
current_time_ms() + RayConfig::instance().raylet_start_wait_time_s() * 1000;

StatusOr<std::vector<rpc::GcsNodeInfo>> node_infos_status{Status::NotFound("")};
rpc::GetAllNodeInfoRequest_Filters filter;
dayshah marked this conversation as resolved.
Show resolved Hide resolved
filter.set_state(rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE);
filter.set_node_ip_address(node_ip_address);
while (true) {
std::vector<rpc::GcsNodeInfo> nodes;
auto status = GetAliveNodes(nodes);
if (!status.ok()) {
return status;
{
absl::ReaderMutexLock lock(&mutex_);
auto timeout_ms = std::max(end_time_point - current_time_ms(), 0LL);
node_infos_status =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would RAY_ASSIGN_OR_RETURN work here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya this was super useful ty!

gcs_client_->Nodes().GetAllNoCacheWithFilter(timeout_ms, filter);
RAY_RETURN_NOT_OK(node_infos_status.status());
}
if (!node_infos_status->empty()) {
*node_to_connect = (*node_infos_status)[0].SerializeAsString();
return Status::OK();
}

if (nodes.empty()) {
status = Status::NotFound("GCS has started but no raylets have registered yet.");
} else {
int relevant_client_index = -1;
int head_node_client_index = -1;
std::pair<std::string, int> gcs_address;
std::string gcs_address;
{
absl::WriterMutexLock lock(&mutex_);
auto [address, _] = gcs_client_->GetGcsServerAddress();
gcs_address = std::move(address);
}
filter.set_node_ip_address(gcs_address);
{
absl::ReaderMutexLock lock(&mutex_);
auto timeout_ms = end_time_point - current_time_ms();
node_infos_status =
gcs_client_->Nodes().GetAllNoCacheWithFilter(timeout_ms, filter);
RAY_RETURN_NOT_OK(node_infos_status.status());
}
if (node_infos_status->empty() && node_ip_address == gcs_address) {
filter.set_node_ip_address("127.0.0.1");
{
absl::WriterMutexLock lock(&mutex_);
gcs_address = gcs_client_->GetGcsServerAddress();
}

for (int i = 0; i < static_cast<int>(nodes.size()); i++) {
const auto &node = nodes[i];
std::string ip_address = node.node_manager_address();
if (ip_address == node_ip_address) {
relevant_client_index = i;
break;
}
// TODO(kfstorm): Do we need to replace `node_ip_address` with
// `get_node_ip_address()`?
if ((ip_address == "127.0.0.1" && gcs_address.first == node_ip_address) ||
ip_address == gcs_address.first) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i feel like the error messages don't totally align with the error messages, here we'll just return a node_info as long it matches the gcs_address we fetched

head_node_client_index = i;
}
}

if (relevant_client_index < 0 && head_node_client_index >= 0) {
RAY_LOG(INFO) << "This node has an IP address of " << node_ip_address
<< ", but we cannot find a local Raylet with the same address. "
<< "This can happen when you connect to the Ray cluster "
<< "with a different IP address or when connecting to a container.";
relevant_client_index = head_node_client_index;
}
if (relevant_client_index < 0) {
std::ostringstream oss;
oss << "This node has an IP address of " << node_ip_address << ", and Ray "
<< "expects this IP address to be either the GCS address or one of"
<< " the Raylet addresses. Connected to GCS at " << gcs_address.first
<< " and found raylets at ";
for (size_t i = 0; i < nodes.size(); i++) {
if (i > 0) {
oss << ", ";
}
oss << nodes[i].node_manager_address();
}
oss << " but none of these match this node's IP " << node_ip_address
<< ". Are any of these actually a different IP address for the same node?"
<< "You might need to provide --node-ip-address to specify the IP "
<< "address that the head should use when sending to this node.";
status = Status::NotFound(oss.str());
} else {
*node_to_connect = nodes[relevant_client_index].SerializeAsString();
return Status::OK();
absl::ReaderMutexLock lock(&mutex_);
auto timeout_ms = std::max(end_time_point - current_time_ms(), 0LL);
node_infos_status =
gcs_client_->Nodes().GetAllNoCacheWithFilter(timeout_ms, filter);
RAY_RETURN_NOT_OK(node_infos_status.status());
}
}
if (!node_infos_status->empty()) {
RAY_LOG(INFO) << "This node has an IP address of " << node_ip_address
<< ", but we cannot find a local Raylet with the same address. "
<< "This can happen when you connect to the Ray cluster "
<< "with a different IP address or when connecting to a container.";
Comment on lines +482 to +485
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously we were still logging this if we don't get any node info that matches with the node_ip_address id and have to resort to using the gcs_address we fetched instead of the node_ip_address passed into the function

*node_to_connect = (*node_infos_status)[0].SerializeAsString();
return Status::OK();
}

if (current_time_ms() - start_ms >=
RayConfig::instance().raylet_start_wait_time_s() * 1000) {
return status;
std::ostringstream oss;
oss << "This node has an IP address of " << node_ip_address << ", and Ray "
<< "expects this IP address to be either the GCS address or one of"
<< " the Raylet addresses. Connected to GCS at " << gcs_address
<< ", and found no Raylet with this IP address. "
<< "You might need to provide --node-ip-address to specify the IP "
<< "address that the head should use when sending to this node.";
node_infos_status = Status::NotFound(oss.str());

if (current_time_ms() >= end_time_point) {
return node_infos_status.status();
}
RAY_LOG(WARNING) << "Some processes that the driver needs to connect to have "
"not registered with GCS, so retrying. Have you run "
Expand Down
8 changes: 1 addition & 7 deletions src/ray/gcs/gcs_client/global_state_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class GlobalStateAccessor {
/// \param[out] node_info The output parameter to store the node info. To support
/// multi-language, we serialize each GcsNodeInfo and return the serialized string.
/// Where used, it needs to be deserialized with protobuf function.
ray::Status GetNode(const std::string &node_id, std::string *node_info)
ray::Status GetNode(const std::string &node_id_str, std::string *node_info)
ABSL_LOCKS_EXCLUDED(mutex_);

/// Get the node to connect for a Ray driver.
Expand All @@ -223,12 +223,6 @@ class GlobalStateAccessor {
ABSL_LOCKS_EXCLUDED(mutex_);

private:
/// Synchronously get the current alive nodes from GCS Service.
///
/// \param[out] nodes The output parameter to store the alive nodes.
ray::Status GetAliveNodes(std::vector<rpc::GcsNodeInfo> &nodes)
ABSL_LOCKS_EXCLUDED(mutex_);

/// MultiItem transformation helper in template style.
///
/// \return MultiItemCallback within in rpc type DATA.
Expand Down
45 changes: 29 additions & 16 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,30 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
rpc::SendReplyCallback send_reply_callback) {
int64_t limit =
(request.limit() > 0) ? request.limit() : std::numeric_limits<int64_t>::max();
NodeID filter_node_id = request.filters().has_node_id()
? NodeID::FromBinary(request.filters().node_id())
: NodeID::Nil();
std::optional<rpc::GcsNodeInfo::GcsNodeState> filter_state = std::nullopt;
if (request.filters().has_state()) {
filter_state = request.filters().state();
}
std::string filter_node_name = request.filters().node_name();
auto filter_fn = [&filter_node_id, &filter_node_name](const rpc::GcsNodeInfo &node) {
if (!filter_node_id.IsNil() && filter_node_id != NodeID::FromBinary(node.node_id())) {
std::optional<NodeID> filter_node_id =
request.filters().has_node_id()
? std::make_optional(NodeID::FromBinary(request.filters().node_id()))
: std::nullopt;
std::optional<std::string> filter_node_name =
request.filters().has_node_name()
? std::make_optional(request.filters().node_name())
: std::nullopt;
std::optional<std::string> filter_node_ip_address =
request.filters().has_node_ip_address()
? std::make_optional(request.filters().node_ip_address())
: std::nullopt;
std::optional<std::string> filter_node_;
dayshah marked this conversation as resolved.
Show resolved Hide resolved
auto filter_fn = [&filter_node_id, &filter_node_name, &filter_node_ip_address](
const rpc::GcsNodeInfo &node) {
if (filter_node_id.has_value() &&
*filter_node_id != NodeID::FromBinary(node.node_id())) {
return false;
}
if (filter_node_name.has_value() && *filter_node_name != node.node_name()) {
return false;
}
if (!filter_node_name.empty() && filter_node_name != node.node_name()) {
if (filter_node_ip_address.has_value() &&
*filter_node_ip_address != node.node_manager_address()) {
return false;
}
return true;
Expand All @@ -243,18 +254,21 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
auto add_to_response =
[limit, reply, filter_fn, &num_added, &num_filtered](
const absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> &nodes) {
for (const auto &entry : nodes) {
for (const auto &[node_id, node_info_ptr] : nodes) {
if (num_added >= limit) {
break;
}
if (filter_fn(*entry.second)) {
*reply->add_node_info_list() = *entry.second;
if (filter_fn(*node_info_ptr)) {
*reply->add_node_info_list() = *node_info_ptr;
num_added += 1;
} else {
num_filtered += 1;
}
}
};
std::optional<rpc::GcsNodeInfo::GcsNodeState> filter_state =
request.filters().has_state() ? std::make_optional(request.filters().state())
: std::nullopt;
if (filter_state == std::nullopt) {
add_to_response(alive_nodes_);
add_to_response(dead_nodes_);
Expand All @@ -271,8 +285,7 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
return;
}
size_t total = alive_nodes_.size() + dead_nodes_.size();
reply->set_total(total);
reply->set_total(alive_nodes_.size() + dead_nodes_.size());
reply->set_num_filtered(num_filtered);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
Expand Down
Loading
Loading