From a114500c401b2feb0b9ef04f95cf6ccb4181342b Mon Sep 17 00:00:00 2001 From: LarryLian <554538252@qq.com> Date: Mon, 29 May 2023 15:55:26 +0800 Subject: [PATCH] [Core][Node Labels 3/n]Add node labels to node resources and publish to all node Signed-off-by: LarryLian <554538252@qq.com> --- .../gcs/gcs_server/gcs_resource_manager.cc | 16 +++++++++---- .../test/gcs_resource_manager_test.cc | 23 ++++++++++++++++++- src/ray/raylet/node_manager.cc | 9 +++++++- .../scheduling/cluster_resource_data.cc | 14 ++++++++--- .../raylet/scheduling/cluster_resource_data.h | 8 ++++++- .../scheduling/cluster_resource_manager.cc | 11 +++++++++ .../scheduling/cluster_resource_manager.h | 3 +++ .../scheduling/cluster_resource_scheduler.cc | 7 +++--- .../scheduling/cluster_resource_scheduler.h | 3 ++- 9 files changed, 79 insertions(+), 15 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 96b458d164ee..d1d9d61e03ac 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -286,20 +286,26 @@ void GcsResourceManager::Initialize(const GcsInitData &gcs_init_data) { } void GcsResourceManager::OnNodeAdd(const rpc::GcsNodeInfo &node) { + NodeID node_id = NodeID::FromBinary(node.node_id()); + scheduling::NodeID scheduling_node_id(node_id.Binary()); if (!node.resources_total().empty()) { - scheduling::NodeID node_id(node.node_id()); for (const auto &entry : node.resources_total()) { cluster_resource_manager_.UpdateResourceCapacity( - node_id, scheduling::ResourceID(entry.first), entry.second); + scheduling_node_id, scheduling::ResourceID(entry.first), entry.second); } } else { - RAY_LOG(WARNING) << "The registered node " << NodeID::FromBinary(node.node_id()) + RAY_LOG(WARNING) << "The registered node " << node_id << " doesn't set the total resources."; } + + absl::flat_hash_map labels(node.labels().begin(), + node.labels().end()); + 
cluster_resource_manager_.SetNodeLabels(scheduling_node_id, labels); + rpc::ResourcesData data; - data.set_node_id(node.node_id()); + data.set_node_id(node_id.Binary()); data.set_node_manager_address(node.node_manager_address()); - node_resource_usages_.emplace(NodeID::FromBinary(node.node_id()), std::move(data)); + node_resource_usages_.emplace(node_id, std::move(data)); num_alive_nodes_++; } diff --git a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc index 553a7270f351..332d0bafbb03 100644 --- a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc @@ -50,8 +50,9 @@ TEST_F(GcsResourceManagerTest, TestBasic) { // Get and check cluster resources. const auto &resource_view = cluster_resource_manager_.GetResourceView(); ASSERT_EQ(1, resource_view.size()); - scheduling::NodeID scheduling_node_id(node->node_id()); + ASSERT_TRUE(resource_view.at(scheduling_node_id).GetLocalView().labels.empty()); + auto resource_request = ResourceMapToResourceRequest(resource_map, /*requires_object_store_memory=*/false); @@ -129,6 +130,26 @@ TEST_F(GcsResourceManagerTest, TestSetAvailableResourcesWhenNodeDead) { ASSERT_EQ(cluster_resource_manager_.GetResourceView().size(), 0); } +TEST_F(GcsResourceManagerTest, TestNodeLabels) { + const std::string cpu_resource = "CPU"; + absl::flat_hash_map resource_map; + resource_map[cpu_resource] = 10; + absl::flat_hash_map labels = {{"key", "value"}, + {"gpu_type", "a100"}}; + + auto node = Mocker::GenNodeInfo(); + node->mutable_resources_total()->insert(resource_map.begin(), resource_map.end()); + node->mutable_labels()->insert(labels.begin(), labels.end()); + // Add node resources. + gcs_resource_manager_->OnNodeAdd(*node); + + // Get and check cluster resources. 
+ const auto &resource_view = cluster_resource_manager_.GetResourceView(); + ASSERT_EQ(1, resource_view.size()); + scheduling::NodeID scheduling_node_id(node->node_id()); + ASSERT_EQ(resource_view.at(scheduling_node_id).GetLocalView().labels, labels); +} + } // namespace ray int main(int argc, char **argv) { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index cbdb828f3676..dcd1ae2afc19 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -318,7 +318,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service, } }, /*get_pull_manager_at_capacity*/ - [this]() { return object_manager_.PullManagerHasPullsQueued(); }); + [this]() { return object_manager_.PullManagerHasPullsQueued(); }, + /*labels*/ config.labels); auto get_node_info_func = [this](const NodeID &node_id) { return gcs_client_->Nodes().Get(node_id); @@ -988,6 +989,12 @@ void NodeManager::NodeAdded(const GcsNodeInfo &node_info) { remote_node_manager_addresses_[node_id] = std::make_pair(node_info.node_manager_address(), node_info.node_manager_port()); + // Reset node labels when node added. + absl::flat_hash_map labels(node_info.labels().begin(), + node_info.labels().end()); + cluster_resource_scheduler_->GetClusterResourceManager().SetNodeLabels( + scheduling::NodeID(node_id.Binary()), labels); + // Fetch resource info for the remote node and update cluster resource map. RAY_CHECK_OK(gcs_client_->NodeResources().AsyncGetResources( node_id, diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index ebd554b29118..3a6c6247905c 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -44,10 +44,12 @@ ResourceRequest ResourceMapToResourceRequest( /// \request Conversion result to a ResourceRequest data structure. 
NodeResources ResourceMapToNodeResources( const absl::flat_hash_map &resource_map_total, - const absl::flat_hash_map &resource_map_available) { + const absl::flat_hash_map &resource_map_available, + const absl::flat_hash_map &node_labels) { NodeResources node_resources; node_resources.total = ResourceMapToResourceRequest(resource_map_total, false); node_resources.available = ResourceMapToResourceRequest(resource_map_available, false); + node_resources.labels = node_labels; return node_resources; } @@ -97,7 +99,8 @@ bool NodeResources::IsFeasible(const ResourceRequest &resource_request) const { } bool NodeResources::operator==(const NodeResources &other) const { - return this->available == other.available && this->total == other.total; + return this->available == other.available && this->total == other.total && + this->labels == other.labels; } bool NodeResources::operator!=(const NodeResources &other) const { @@ -106,7 +109,7 @@ bool NodeResources::operator!=(const NodeResources &other) const { std::string NodeResources::DebugString() const { std::stringstream buffer; - buffer << "{"; + buffer << "{\"resources\":{"; bool first = true; for (auto &resource_id : total.ResourceIds()) { if (!first) { @@ -116,6 +119,15 @@ std::string NodeResources::DebugString() const { buffer << resource_id.Binary() << ": " << available.Get(resource_id) << "/" << total.Get(resource_id); } + + // Join label pairs with commas so no trailing comma is emitted. + buffer << "}, \"labels\":{"; + const char *separator = ""; + for (const auto &[key, value] : labels) { + buffer << separator << "\"" << key << "\":\"" << value << "\""; + separator = ","; + } - buffer << "}"; + // Close the labels object and the enclosing object so the braces balance. + buffer << "}}"; return buffer.str(); } diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h index 649271b8ecc6..bd2f07690a50 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.h +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -422,6 +422,7 @@ class NodeResources { available(other.available), load(other.load), normal_task_resources(other.normal_task_resources), + labels(other.labels), 
latest_resources_normal_task_timestamp( other.latest_resources_normal_task_timestamp), object_pulls_queued(other.object_pulls_queued) {} @@ -431,6 +432,10 @@ class NodeResources { ResourceRequest load; /// Resources owned by normal tasks. ResourceRequest normal_task_resources; + + // The key-value labels of this node. + absl::flat_hash_map labels; + /// Normal task resources could be uploaded by 1) Raylets' periodical reporters; 2) /// Rejected RequestWorkerLeaseReply. So we need the timestamps to decide whether an /// upload is latest. @@ -500,7 +505,8 @@ struct Node { /// \request Conversion result to a ResourceRequest data structure. NodeResources ResourceMapToNodeResources( const absl::flat_hash_map &resource_map_total, - const absl::flat_hash_map &resource_map_available); + const absl::flat_hash_map &resource_map_available, + const absl::flat_hash_map &node_labels = {}); /// Convert a map of resources to a ResourceRequest data structure. ResourceRequest ResourceMapToResourceRequest( diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc index 30b7b63e546e..0332e9c990e3 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.cc +++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc @@ -292,4 +292,15 @@ BundleLocationIndex &ClusterResourceManager::GetBundleLocationIndex() { return bundle_location_index_; } +void ClusterResourceManager::SetNodeLabels( + const scheduling::NodeID &node_id, + const absl::flat_hash_map &labels) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + NodeResources node_resources; + it = nodes_.emplace(node_id, node_resources).first; + } + it->second.GetMutableLocalView()->labels = labels; +} + } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.h b/src/ray/raylet/scheduling/cluster_resource_manager.h index c3ce01465146..ddbe2503b77c 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.h +++ 
b/src/ray/raylet/scheduling/cluster_resource_manager.h @@ -133,6 +133,9 @@ class ClusterResourceManager { BundleLocationIndex &GetBundleLocationIndex(); + void SetNodeLabels(const scheduling::NodeID &node_id, + const absl::flat_hash_map &labels); + private: friend class ClusterResourceScheduler; friend class gcs::GcsActorSchedulerTest; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 85505a82da2d..13c78ab425ca 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -44,10 +44,11 @@ ClusterResourceScheduler::ClusterResourceScheduler( const absl::flat_hash_map &local_node_resources, std::function is_node_available_fn, std::function get_used_object_store_memory, - std::function get_pull_manager_at_capacity) + std::function get_pull_manager_at_capacity, + const absl::flat_hash_map &local_node_labels) : local_node_id_(local_node_id), is_node_available_fn_(is_node_available_fn) { - NodeResources node_resources = - ResourceMapToNodeResources(local_node_resources, local_node_resources); + NodeResources node_resources = ResourceMapToNodeResources( + local_node_resources, local_node_resources, local_node_labels); Init(io_service, node_resources, get_used_object_store_memory, diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 2722989cb083..e18946651758 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -62,7 +62,8 @@ class ClusterResourceScheduler { const absl::flat_hash_map &local_node_resources, std::function is_node_available_fn, std::function get_used_object_store_memory = nullptr, - std::function get_pull_manager_at_capacity = nullptr); + std::function get_pull_manager_at_capacity = nullptr, + const absl::flat_hash_map &local_node_labels = {}); /// 
Schedule the specified resources to the cluster nodes. ///