Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core][Node Labels 3/n]Add node labels to node resources and publish to all node #36009

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,20 +286,26 @@ void GcsResourceManager::Initialize(const GcsInitData &gcs_init_data) {
}

void GcsResourceManager::OnNodeAdd(const rpc::GcsNodeInfo &node) {
NodeID node_id = NodeID::FromBinary(node.node_id());
scheduling::NodeID scheduling_node_id(node_id.Binary());
if (!node.resources_total().empty()) {
scheduling::NodeID node_id(node.node_id());
for (const auto &entry : node.resources_total()) {
cluster_resource_manager_.UpdateResourceCapacity(
node_id, scheduling::ResourceID(entry.first), entry.second);
scheduling_node_id, scheduling::ResourceID(entry.first), entry.second);
}
} else {
RAY_LOG(WARNING) << "The registered node " << NodeID::FromBinary(node.node_id())
RAY_LOG(WARNING) << "The registered node " << node_id
<< " doesn't set the total resources.";
}

absl::flat_hash_map<std::string, std::string> labels(node.labels().begin(),
node.labels().end());
cluster_resource_manager_.SetNodeLabels(scheduling_node_id, labels);

rpc::ResourcesData data;
data.set_node_id(node.node_id());
data.set_node_id(node_id.Binary());
data.set_node_manager_address(node.node_manager_address());
node_resource_usages_.emplace(NodeID::FromBinary(node.node_id()), std::move(data));
node_resource_usages_.emplace(node_id, std::move(data));
num_alive_nodes_++;
}

Expand Down
23 changes: 22 additions & 1 deletion src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ TEST_F(GcsResourceManagerTest, TestBasic) {
// Get and check cluster resources.
const auto &resource_view = cluster_resource_manager_.GetResourceView();
ASSERT_EQ(1, resource_view.size());

scheduling::NodeID scheduling_node_id(node->node_id());
ASSERT_TRUE(resource_view.at(scheduling_node_id).GetLocalView().labels.empty());

auto resource_request =
ResourceMapToResourceRequest(resource_map, /*requires_object_store_memory=*/false);

Expand Down Expand Up @@ -129,6 +130,26 @@ TEST_F(GcsResourceManagerTest, TestSetAvailableResourcesWhenNodeDead) {
ASSERT_EQ(cluster_resource_manager_.GetResourceView().size(), 0);
}

TEST_F(GcsResourceManagerTest, TestNodeLabels) {
const std::string cpu_resource = "CPU";
absl::flat_hash_map<std::string, double> resource_map;
resource_map[cpu_resource] = 10;
absl::flat_hash_map<std::string, std::string> labels = {{"key", "value"},
{"gpu_type", "a100"}};

auto node = Mocker::GenNodeInfo();
node->mutable_resources_total()->insert(resource_map.begin(), resource_map.end());
node->mutable_labels()->insert(labels.begin(), labels.end());
// Add node resources.
gcs_resource_manager_->OnNodeAdd(*node);

// Get and check cluster resources.
const auto &resource_view = cluster_resource_manager_.GetResourceView();
ASSERT_EQ(1, resource_view.size());
scheduling::NodeID scheduling_node_id(node->node_id());
ASSERT_EQ(resource_view.at(scheduling_node_id).GetLocalView().labels, labels);
}

} // namespace ray

int main(int argc, char **argv) {
Expand Down
9 changes: 8 additions & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
}
},
/*get_pull_manager_at_capacity*/
[this]() { return object_manager_.PullManagerHasPullsQueued(); });
[this]() { return object_manager_.PullManagerHasPullsQueued(); },
/*labels*/ config.labels);

auto get_node_info_func = [this](const NodeID &node_id) {
return gcs_client_->Nodes().Get(node_id);
Expand Down Expand Up @@ -988,6 +989,12 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) {
remote_node_manager_addresses_[node_id] =
std::make_pair(node_info.node_manager_address(), node_info.node_manager_port());

// Reset node labels when node added.
absl::flat_hash_map<std::string, std::string> labels(node_info.labels().begin(),
node_info.labels().end());
cluster_resource_scheduler_->GetClusterResourceManager().SetNodeLabels(
scheduling::NodeID(node_id.Binary()), labels);

// Fetch resource info for the remote node and update cluster resource map.
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncGetResources(
node_id,
Expand Down
14 changes: 11 additions & 3 deletions src/ray/raylet/scheduling/cluster_resource_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ ResourceRequest ResourceMapToResourceRequest(
/// \request Conversion result to a ResourceRequest data structure.
NodeResources ResourceMapToNodeResources(
const absl::flat_hash_map<std::string, double> &resource_map_total,
const absl::flat_hash_map<std::string, double> &resource_map_available) {
const absl::flat_hash_map<std::string, double> &resource_map_available,
const absl::flat_hash_map<std::string, std::string> &node_labels) {
NodeResources node_resources;
node_resources.total = ResourceMapToResourceRequest(resource_map_total, false);
node_resources.available = ResourceMapToResourceRequest(resource_map_available, false);
node_resources.labels = node_labels;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After we have labels, the current naming (NodeResources) is no longer that accurate. We should probably have something like

class Node {
   NodeResources resources;
  map<string, string> labels;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Labels can also be regarded as a resource of node. So it is very suitable to put it in NodeResources, and it reduces a lot of code changes.
  2. NodeResources do not need to be computable. The ResourceRequest in NodeResources is Computable. Adding labels to NodeResources does not affect the original available/tatal, etc.
    image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! Got the design.

return node_resources;
}

Expand Down Expand Up @@ -97,7 +99,8 @@ bool NodeResources::IsFeasible(const ResourceRequest &resource_request) const {
}

bool NodeResources::operator==(const NodeResources &other) const {
return this->available == other.available && this->total == other.total;
return this->available == other.available && this->total == other.total &&
this->labels == other.labels;
}

bool NodeResources::operator!=(const NodeResources &other) const {
Expand All @@ -106,7 +109,7 @@ bool NodeResources::operator!=(const NodeResources &other) const {

std::string NodeResources::DebugString() const {
std::stringstream buffer;
buffer << "{";
buffer << "{\"resources\":{";
bool first = true;
for (auto &resource_id : total.ResourceIds()) {
if (!first) {
Expand All @@ -116,6 +119,11 @@ std::string NodeResources::DebugString() const {
buffer << resource_id.Binary() << ": " << available.Get(resource_id) << "/"
<< total.Get(resource_id);
}

buffer << "}, \"labels\":{";
for (const auto &[key, value] : labels) {
buffer << "\"" << key << "\":\"" << value << "\",";
}
buffer << "}";
return buffer.str();
}
Expand Down
8 changes: 7 additions & 1 deletion src/ray/raylet/scheduling/cluster_resource_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ class NodeResources {
available(other.available),
load(other.load),
normal_task_resources(other.normal_task_resources),
labels(other.labels),
latest_resources_normal_task_timestamp(
other.latest_resources_normal_task_timestamp),
object_pulls_queued(other.object_pulls_queued) {}
Expand All @@ -431,6 +432,10 @@ class NodeResources {
ResourceRequest load;
/// Resources owned by normal tasks.
ResourceRequest normal_task_resources;

// The key-value labels of this node.
absl::flat_hash_map<std::string, std::string> labels;

/// Normal task resources could be uploaded by 1) Raylets' periodical reporters; 2)
/// Rejected RequestWorkerLeaseReply. So we need the timestamps to decide whether an
/// upload is latest.
Expand Down Expand Up @@ -500,7 +505,8 @@ struct Node {
/// \request Conversion result to a ResourceRequest data structure.
NodeResources ResourceMapToNodeResources(
const absl::flat_hash_map<std::string, double> &resource_map_total,
const absl::flat_hash_map<std::string, double> &resource_map_available);
const absl::flat_hash_map<std::string, double> &resource_map_available,
const absl::flat_hash_map<std::string, std::string> &node_labels = {});

/// Convert a map of resources to a ResourceRequest data structure.
ResourceRequest ResourceMapToResourceRequest(
Expand Down
11 changes: 11 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,15 @@ BundleLocationIndex &ClusterResourceManager::GetBundleLocationIndex() {
return bundle_location_index_;
}

void ClusterResourceManager::SetNodeLabels(
const scheduling::NodeID &node_id,
const absl::flat_hash_map<std::string, std::string> &labels) {
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
NodeResources node_resources;
it = nodes_.emplace(node_id, node_resources).first;
}
Comment on lines +299 to +302
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a bit weird that we can have node with only labels but empty resources. We should be able to set labels and resources together when we add a node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reconsidered and still think it's better to keep this interface.

  1. it's not weird because the original code logic already had cases where the total resources of a node were not set.
    image

  2. To update resources, use UpdateResourceCapacity, and to update labels, use UpdateNodeLabels. This way, the interface is more focused, and the code is simpler.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually wondering when we may not have total resources. Should it be a check instead of warning? Let me check with the team.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we actually change if (it == nodes_.end()) { to CHECK which makes code easier to reason about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My current implementation requires me to retain this logic. Since the labels information is already available in 'GcsNodeInfo', I will first call 'ResetNodeLabels()', and then update the resources.
image

it->second.GetMutableLocalView()->labels = labels;
}

} // namespace ray
3 changes: 3 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class ClusterResourceManager {

BundleLocationIndex &GetBundleLocationIndex();

void SetNodeLabels(const scheduling::NodeID &node_id,
const absl::flat_hash_map<std::string, std::string> &labels);

private:
friend class ClusterResourceScheduler;
friend class gcs::GcsActorSchedulerTest;
Expand Down
7 changes: 4 additions & 3 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ ClusterResourceScheduler::ClusterResourceScheduler(
const absl::flat_hash_map<std::string, double> &local_node_resources,
std::function<bool(scheduling::NodeID)> is_node_available_fn,
std::function<int64_t(void)> get_used_object_store_memory,
std::function<bool(void)> get_pull_manager_at_capacity)
std::function<bool(void)> get_pull_manager_at_capacity,
const absl::flat_hash_map<std::string, std::string> &local_node_labels)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the change that creates the ClusterResourceScheduler and pass in the labels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

: local_node_id_(local_node_id), is_node_available_fn_(is_node_available_fn) {
NodeResources node_resources =
ResourceMapToNodeResources(local_node_resources, local_node_resources);
NodeResources node_resources = ResourceMapToNodeResources(
local_node_resources, local_node_resources, local_node_labels);
Init(io_service,
node_resources,
get_used_object_store_memory,
Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/scheduling/cluster_resource_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ class ClusterResourceScheduler {
const absl::flat_hash_map<std::string, double> &local_node_resources,
std::function<bool(scheduling::NodeID)> is_node_available_fn,
std::function<int64_t(void)> get_used_object_store_memory = nullptr,
std::function<bool(void)> get_pull_manager_at_capacity = nullptr);
std::function<bool(void)> get_pull_manager_at_capacity = nullptr,
const absl::flat_hash_map<std::string, std::string> &local_node_labels = {});

/// Schedule the specified resources to the cluster nodes.
///
Expand Down