Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Only get single node info rather then all when needed #49727

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ ray_cc_library(
"//src/ray/common:ray_config",
"//src/ray/common:ray_syncer",
"//src/ray/common:status",
"//src/ray/common:status_or",
"//src/ray/common:task_common",
"//src/ray/common:test_util",
"//src/ray/protobuf:gcs_cc_proto",
Expand Down Expand Up @@ -2043,9 +2044,9 @@ ray_cc_test(
"//:redis-server",
],
tags = [
"exclusive",
"no_tsan",
"team:core",
"exclusive",
],
deps = [
":gcs_client_lib",
Expand Down
2 changes: 1 addition & 1 deletion python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
const c_string &node_ip_address,
c_string *node_to_connect)
CRayStatus GetNode(
const c_string &node_id,
const c_string &node_id_hex_str,
c_string *node_info)

cdef extern from * namespace "ray::gcs" nogil:
Expand Down
10 changes: 10 additions & 0 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,16 @@ Status NodeInfoAccessor::GetAllNoCache(int64_t timeout_ms,
return Status::OK();
}

StatusOr<std::vector<rpc::GcsNodeInfo>> NodeInfoAccessor::GetAllNoCacheWithFilters(
int64_t timeout_ms, rpc::GetAllNodeInfoRequest_Filters filters) {
rpc::GetAllNodeInfoRequest request;
*request.mutable_filters() = std::move(filters);
rpc::GetAllNodeInfoReply reply;
RAY_RETURN_NOT_OK(
client_impl_->GetGcsRpcClient().SyncGetAllNodeInfo(request, &reply, timeout_ms));
return VectorFromProtobuf(std::move(*reply.mutable_node_info_list()));
}

Status NodeInfoAccessor::CheckAlive(const std::vector<std::string> &raylet_addresses,
int64_t timeout_ms,
std::vector<bool> &nodes_alive) {
Expand Down
9 changes: 8 additions & 1 deletion src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "absl/types/optional.h"
#include "ray/common/id.h"
#include "ray/common/placement_group.h"
#include "ray/common/status_or.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/callback.h"
#include "ray/rpc/client_call.h"
Expand Down Expand Up @@ -414,9 +415,15 @@ class NodeInfoAccessor {

/// Get information of all nodes from an RPC to GCS synchronously.
///
/// \return All nodes in cache.
/// \return All nodes from gcs without cache.
virtual Status GetAllNoCache(int64_t timeout_ms, std::vector<rpc::GcsNodeInfo> &nodes);

/// Get information of all nodes from an RPC to GCS synchronously with filters.
///
/// \return All nodes that match the given filters from the gcs without the cache.
virtual StatusOr<std::vector<rpc::GcsNodeInfo>> GetAllNoCacheWithFilters(
int64_t timeout_ms, rpc::GetAllNodeInfoRequest_Filters filters);

/// Send a check alive request to GCS for the liveness of some nodes.
///
/// \param raylet_addresses The addresses of the nodes to check, each like "ip:port".
Expand Down
199 changes: 80 additions & 119 deletions src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,145 +396,106 @@ std::string GlobalStateAccessor::GetSystemConfig() {
return future.get();
}

ray::Status GlobalStateAccessor::GetAliveNodes(std::vector<rpc::GcsNodeInfo> &nodes) {
std::promise<std::pair<Status, std::vector<rpc::GcsNodeInfo>>> promise;
{
absl::ReaderMutexLock lock(&mutex_);
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetAll(
[&promise](Status status, std::vector<rpc::GcsNodeInfo> &&nodes) {
promise.set_value(
std::pair<Status, std::vector<rpc::GcsNodeInfo>>(status, std::move(nodes)));
},
/*timeout_ms=*/-1));
}
auto result = promise.get_future().get();
auto status = result.first;
if (!status.ok()) {
return status;
}

std::copy_if(result.second.begin(),
result.second.end(),
std::back_inserter(nodes),
[](const rpc::GcsNodeInfo &node) {
return node.state() == rpc::GcsNodeInfo::ALIVE;
});
return status;
}

ray::Status GlobalStateAccessor::GetNode(const std::string &node_id,
ray::Status GlobalStateAccessor::GetNode(const std::string &node_id_hex_str,
std::string *node_info) {
auto start_ms = current_time_ms();
auto node_id_binary = NodeID::FromHex(node_id).Binary();
const auto end_time_point =
current_time_ms() + RayConfig::instance().raylet_start_wait_time_s() * 1000;
const auto node_id_binary = NodeID::FromHex(node_id_hex_str).Binary();

std::vector<rpc::GcsNodeInfo> node_infos;
while (true) {
std::vector<rpc::GcsNodeInfo> nodes;
auto status = GetAliveNodes(nodes);
if (!status.ok()) {
return status;
rpc::GetAllNodeInfoRequest_Filters filters;
filters.set_state(rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE);
filters.set_node_id(node_id_binary);
{
absl::ReaderMutexLock lock(&mutex_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why lock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gcs_client_ is protected by mutex, this is how it's used throughout

auto timeout_ms =
std::max(end_time_point - current_time_ms(), static_cast<int64_t>(0));
RAY_ASSIGN_OR_RETURN(
node_infos,
gcs_client_->Nodes().GetAllNoCacheWithFilters(timeout_ms, std::move(filters)));
}

if (nodes.empty()) {
status = Status::NotFound("GCS has started but no raylets have registered yet.");
} else {
int relevant_client_index = -1;
for (int i = 0; i < static_cast<int>(nodes.size()); i++) {
const auto &node = nodes[i];
if (node_id_binary == node.node_id()) {
relevant_client_index = i;
break;
}
}

if (relevant_client_index < 0) {
status = Status::NotFound(
"GCS cannot find the node with node ID " + node_id +
". The node registration may not be complete yet before the timeout." +
" Try increase the RAY_raylet_start_wait_time_s config.");
} else {
*node_info = nodes[relevant_client_index].SerializeAsString();
return Status::OK();
}
if (!node_infos.empty()) {
*node_info = node_infos[0].SerializeAsString();
return Status::OK();
}

if (current_time_ms() - start_ms >=
RayConfig::instance().raylet_start_wait_time_s() * 1000) {
return status;
if (current_time_ms() >= end_time_point) {
return Status::NotFound(
"GCS cannot find the node with node ID " + node_id_hex_str +
". The node registration may not be complete yet before the timeout." +
" Try increase the RAY_raylet_start_wait_time_s config.");
}
RAY_LOG(WARNING) << "Retrying to get node with node ID " << node_id;
RAY_LOG(WARNING) << "Retrying to get node with node ID " << node_id_hex_str;
// Some of the information may not be in GCS yet, so wait a little bit.
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

ray::Status GlobalStateAccessor::GetNodeToConnectForDriver(
const std::string &node_ip_address, std::string *node_to_connect) {
auto start_ms = current_time_ms();
const auto end_time_point =
current_time_ms() + RayConfig::instance().raylet_start_wait_time_s() * 1000;

std::vector<rpc::GcsNodeInfo> node_infos;
rpc::GetAllNodeInfoRequest_Filters filters;
filters.set_state(rpc::GcsNodeInfo_GcsNodeState::GcsNodeInfo_GcsNodeState_ALIVE);
filters.set_node_ip_address(node_ip_address);
while (true) {
std::vector<rpc::GcsNodeInfo> nodes;
auto status = GetAliveNodes(nodes);
if (!status.ok()) {
return status;
{
absl::ReaderMutexLock lock(&mutex_);
auto timeout_ms =
std::max(end_time_point - current_time_ms(), static_cast<int64_t>(0));
RAY_ASSIGN_OR_RETURN(
node_infos, gcs_client_->Nodes().GetAllNoCacheWithFilters(timeout_ms, filters));
}
if (!node_infos.empty()) {
*node_to_connect = node_infos[0].SerializeAsString();
return Status::OK();
}

if (nodes.empty()) {
status = Status::NotFound("GCS has started but no raylets have registered yet.");
} else {
int relevant_client_index = -1;
int head_node_client_index = -1;
std::pair<std::string, int> gcs_address;
std::string gcs_address;
{
absl::WriterMutexLock lock(&mutex_);
auto [address, _] = gcs_client_->GetGcsServerAddress();
gcs_address = std::move(address);
}
filters.set_node_ip_address(gcs_address);
{
absl::ReaderMutexLock lock(&mutex_);
auto timeout_ms = end_time_point - current_time_ms();
RAY_ASSIGN_OR_RETURN(
node_infos, gcs_client_->Nodes().GetAllNoCacheWithFilters(timeout_ms, filters));
}
if (node_infos.empty() && node_ip_address == gcs_address) {
filters.set_node_ip_address("127.0.0.1");
Comment on lines +470 to +471
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you know, can you comment on what this case is?

Copy link
Contributor Author

@dayshah dayshah Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related code (listen_to_localhost_only sets GrpcServer to listen to 127.0.0.1) with and pr that introduced this #16810. Don't see a concrete reason to have this last case anywhere, ideally the above case of looking for gcs_address should cover it, but maybe some case of container or something?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, lets keep it for now.

{
absl::WriterMutexLock lock(&mutex_);
gcs_address = gcs_client_->GetGcsServerAddress();
}

for (int i = 0; i < static_cast<int>(nodes.size()); i++) {
const auto &node = nodes[i];
std::string ip_address = node.node_manager_address();
if (ip_address == node_ip_address) {
relevant_client_index = i;
break;
}
// TODO(kfstorm): Do we need to replace `node_ip_address` with
// `get_node_ip_address()`?
if ((ip_address == "127.0.0.1" && gcs_address.first == node_ip_address) ||
ip_address == gcs_address.first) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i feel like the error messages don't totally align with the error messages, here we'll just return a node_info as long it matches the gcs_address we fetched

head_node_client_index = i;
}
}

if (relevant_client_index < 0 && head_node_client_index >= 0) {
RAY_LOG(INFO) << "This node has an IP address of " << node_ip_address
<< ", but we cannot find a local Raylet with the same address. "
<< "This can happen when you connect to the Ray cluster "
<< "with a different IP address or when connecting to a container.";
relevant_client_index = head_node_client_index;
}
if (relevant_client_index < 0) {
std::ostringstream oss;
oss << "This node has an IP address of " << node_ip_address << ", and Ray "
<< "expects this IP address to be either the GCS address or one of"
<< " the Raylet addresses. Connected to GCS at " << gcs_address.first
<< " and found raylets at ";
for (size_t i = 0; i < nodes.size(); i++) {
if (i > 0) {
oss << ", ";
}
oss << nodes[i].node_manager_address();
}
oss << " but none of these match this node's IP " << node_ip_address
<< ". Are any of these actually a different IP address for the same node?"
<< "You might need to provide --node-ip-address to specify the IP "
<< "address that the head should use when sending to this node.";
status = Status::NotFound(oss.str());
} else {
*node_to_connect = nodes[relevant_client_index].SerializeAsString();
return Status::OK();
absl::ReaderMutexLock lock(&mutex_);
auto timeout_ms =
std::max(end_time_point - current_time_ms(), static_cast<int64_t>(0));
RAY_ASSIGN_OR_RETURN(
node_infos,
gcs_client_->Nodes().GetAllNoCacheWithFilters(timeout_ms, filters));
}
}
if (!node_infos.empty()) {
RAY_LOG(INFO) << "This node has an IP address of " << node_ip_address
<< ", but we cannot find a local Raylet with the same address. "
<< "This can happen when you connect to the Ray cluster "
<< "with a different IP address or when connecting to a container.";
Comment on lines +482 to +485
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously we were still logging this if we don't get any node info that matches with the node_ip_address id and have to resort to using the gcs_address we fetched instead of the node_ip_address passed into the function

*node_to_connect = node_infos[0].SerializeAsString();
return Status::OK();
}

if (current_time_ms() - start_ms >=
RayConfig::instance().raylet_start_wait_time_s() * 1000) {
return status;
if (current_time_ms() >= end_time_point) {
std::ostringstream oss;
oss << "This node has an IP address of " << node_ip_address << ", and Ray "
<< "expects this IP address to be either the GCS address or one of"
<< " the Raylet addresses. Connected to GCS at " << gcs_address
<< ", and found no Raylet with this IP address. "
<< "You might need to provide --node-ip-address to specify the IP "
<< "address that the head should use when sending to this node.";
return Status::NotFound(oss.str());
}
RAY_LOG(WARNING) << "Some processes that the driver needs to connect to have "
"not registered with GCS, so retrying. Have you run "
Expand Down
8 changes: 1 addition & 7 deletions src/ray/gcs/gcs_client/global_state_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class GlobalStateAccessor {
/// \param[out] node_info The output parameter to store the node info. To support
/// multi-language, we serialize each GcsNodeInfo and return the serialized string.
/// Where used, it needs to be deserialized with protobuf function.
ray::Status GetNode(const std::string &node_id, std::string *node_info)
ray::Status GetNode(const std::string &node_id_hex_str, std::string *node_info)
ABSL_LOCKS_EXCLUDED(mutex_);

/// Get the node to connect for a Ray driver.
Expand All @@ -223,12 +223,6 @@ class GlobalStateAccessor {
ABSL_LOCKS_EXCLUDED(mutex_);

private:
/// Synchronously get the current alive nodes from GCS Service.
///
/// \param[out] nodes The output parameter to store the alive nodes.
ray::Status GetAliveNodes(std::vector<rpc::GcsNodeInfo> &nodes)
ABSL_LOCKS_EXCLUDED(mutex_);

/// MultiItem transformation helper in template style.
///
/// \return MultiItemCallback within in rpc type DATA.
Expand Down
44 changes: 28 additions & 16 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,29 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
rpc::SendReplyCallback send_reply_callback) {
int64_t limit =
(request.limit() > 0) ? request.limit() : std::numeric_limits<int64_t>::max();
NodeID filter_node_id = request.filters().has_node_id()
? NodeID::FromBinary(request.filters().node_id())
: NodeID::Nil();
std::optional<rpc::GcsNodeInfo::GcsNodeState> filter_state = std::nullopt;
if (request.filters().has_state()) {
filter_state = request.filters().state();
}
std::string filter_node_name = request.filters().node_name();
auto filter_fn = [&filter_node_id, &filter_node_name](const rpc::GcsNodeInfo &node) {
if (!filter_node_id.IsNil() && filter_node_id != NodeID::FromBinary(node.node_id())) {
std::optional<NodeID> filter_node_id =
request.filters().has_node_id()
? std::make_optional(NodeID::FromBinary(request.filters().node_id()))
: std::nullopt;
std::optional<std::string> filter_node_name =
request.filters().has_node_name()
? std::make_optional(request.filters().node_name())
: std::nullopt;
std::optional<std::string> filter_node_ip_address =
request.filters().has_node_ip_address()
? std::make_optional(request.filters().node_ip_address())
: std::nullopt;
auto filter_fn = [&filter_node_id, &filter_node_name, &filter_node_ip_address](
const rpc::GcsNodeInfo &node) {
if (filter_node_id.has_value() &&
*filter_node_id != NodeID::FromBinary(node.node_id())) {
return false;
}
if (filter_node_name.has_value() && *filter_node_name != node.node_name()) {
return false;
}
if (!filter_node_name.empty() && filter_node_name != node.node_name()) {
if (filter_node_ip_address.has_value() &&
*filter_node_ip_address != node.node_manager_address()) {
return false;
}
return true;
Expand All @@ -243,18 +253,21 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
auto add_to_response =
[limit, reply, filter_fn, &num_added, &num_filtered](
const absl::flat_hash_map<NodeID, std::shared_ptr<rpc::GcsNodeInfo>> &nodes) {
for (const auto &entry : nodes) {
for (const auto &[node_id, node_info_ptr] : nodes) {
if (num_added >= limit) {
break;
}
if (filter_fn(*entry.second)) {
*reply->add_node_info_list() = *entry.second;
if (filter_fn(*node_info_ptr)) {
*reply->add_node_info_list() = *node_info_ptr;
num_added += 1;
} else {
num_filtered += 1;
}
}
};
std::optional<rpc::GcsNodeInfo::GcsNodeState> filter_state =
request.filters().has_state() ? std::make_optional(request.filters().state())
: std::nullopt;
if (filter_state == std::nullopt) {
add_to_response(alive_nodes_);
add_to_response(dead_nodes_);
Expand All @@ -271,8 +284,7 @@ void GcsNodeManager::HandleGetAllNodeInfo(rpc::GetAllNodeInfoRequest request,
++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
return;
}
size_t total = alive_nodes_.size() + dead_nodes_.size();
reply->set_total(total);
reply->set_total(alive_nodes_.size() + dead_nodes_.size());
reply->set_num_filtered(num_filtered);
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::GET_ALL_NODE_INFO_REQUEST];
Expand Down
Loading
Loading