diff --git a/python/ray/autoscaler/v2/tests/test_sdk.py b/python/ray/autoscaler/v2/tests/test_sdk.py
index 0da3ecd7d87e4..81e9422583e81 100644
--- a/python/ray/autoscaler/v2/tests/test_sdk.py
+++ b/python/ray/autoscaler/v2/tests/test_sdk.py
@@ -1,9 +1,11 @@
-# coding: utf-8
 import os
 import sys
 
-from typing import List
-import pytest  # noqa
+# coding: utf-8
+from dataclasses import dataclass
+from typing import Callable, List, Optional
+
+import pytest
 
 import ray
 import ray._private.ray_constants as ray_constants
@@ -11,7 +13,11 @@
 from ray.autoscaler.v2.sdk import request_cluster_resources
 from ray.autoscaler.v2.tests.util import get_cluster_resource_state
 from ray.core.generated.experimental import autoscaler_pb2_grpc
-from ray.core.generated.experimental.autoscaler_pb2 import GetClusterResourceStateReply
+from ray.core.generated.experimental.autoscaler_pb2 import (
+    ClusterResourceState,
+    NodeStatus,
+)
+from ray.util.state.api import list_nodes
 
 
 def _autoscaler_state_service_stub():
@@ -24,16 +30,16 @@ def _autoscaler_state_service_stub():
 
 
 def assert_cluster_resource_constraints(
-    reply: GetClusterResourceStateReply, expected: List[dict]
+    state: ClusterResourceState, expected: List[dict]
 ):
     """
     Assert a GetClusterResourceStateReply has cluster_resource_constraints that
     matches with the expected resources.
     """
     # We only have 1 constraint for now.
-    assert len(reply.cluster_resource_constraints) == 1
+    assert len(state.cluster_resource_constraints) == 1
 
-    min_bundles = reply.cluster_resource_constraints[0].min_bundles
+    min_bundles = state.cluster_resource_constraints[0].min_bundles
     assert len(min_bundles) == len(expected)
 
     # Sort all the bundles by bundle's resource names
@@ -46,6 +52,30 @@ def assert_cluster_resource_constraints(
         assert dict(actual_bundle.resources_bundle) == expected_bundle
 
 
+@dataclass
+class NodeState:
+    node_id: str
+    node_status: NodeStatus
+    idle_time_check_cb: Optional[Callable] = None
+
+
+def assert_node_states(state: ClusterResourceState, expected_nodes: List[NodeState]):
+    """
+    Assert a ClusterResourceState has node states that
+    match the expected nodes.
+ """ + assert len(state.node_states) == len(expected_nodes) + + # Sort all the nodes by node's node_id + node_states = sorted(state.node_states, key=lambda node: node.node_id) + expected_nodes = sorted(expected_nodes, key=lambda node: node.node_id) + + for actual_node, expected_node in zip(node_states, expected_nodes): + assert actual_node.status == expected_node.node_status + if expected_node.idle_time_check_cb: + assert expected_node.idle_time_check_cb(actual_node.idle_duration_ms) + + def test_request_cluster_resources_basic(shutdown_only): ray.init(num_cpus=1) stub = _autoscaler_state_service_stub() @@ -54,8 +84,8 @@ def test_request_cluster_resources_basic(shutdown_only): request_cluster_resources([{"CPU": 1}]) def verify(): - reply = get_cluster_resource_state(stub) - assert_cluster_resource_constraints(reply, [{"CPU": 1}]) + state = get_cluster_resource_state(stub) + assert_cluster_resource_constraints(state, [{"CPU": 1}]) return True wait_for_condition(verify) @@ -64,13 +94,100 @@ def verify(): request_cluster_resources([{"CPU": 2, "GPU": 1}, {"CPU": 1}]) def verify(): - reply = get_cluster_resource_state(stub) - assert_cluster_resource_constraints(reply, [{"CPU": 2, "GPU": 1}, {"CPU": 1}]) + state = get_cluster_resource_state(stub) + assert_cluster_resource_constraints(state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}]) return True wait_for_condition(verify) +def test_node_state_lifecycle_basic(ray_start_cluster): + + cluster = ray_start_cluster + cluster.add_node(num_cpus=0) + ray.init(address=cluster.address) + + node = cluster.add_node(num_cpus=1) + + stub = _autoscaler_state_service_stub() + + # We don't have node id from `add_node` unfortunately. + def nodes_up(): + nodes = list_nodes() + assert len(nodes) == 2 + return True + + wait_for_condition(nodes_up) + + def get_node_ids(): + head_node_id = None + node_id = None + nodes = list_nodes() + for node in nodes: + if node.is_head_node: + head_node_id = node.node_id + else: + node_id = node.node_id + return head_node_id, node_id + + head_node_id, node_id = get_node_ids() + + def verify_cluster_idle(): + state = get_cluster_resource_state(stub) + assert_node_states( + state, + [ + NodeState(node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0), + NodeState(head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0), + ], + ) + return True + + wait_for_condition(verify_cluster_idle) + + # Schedule a task running + @ray.remote(num_cpus=0.1) + def f(): + while True: + pass + + t = f.remote() + + def verify_cluster_busy(): + state = get_cluster_resource_state(stub) + assert_node_states( + state, + [ + NodeState(node_id, NodeStatus.RUNNING, lambda idle_ms: idle_ms == 0), + NodeState(head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0), + ], + ) + return True + + wait_for_condition(verify_cluster_busy) + + # Kill the task + ray.cancel(t, force=True) + + wait_for_condition(verify_cluster_idle) + + # Kill the node. 
+    cluster.remove_node(node)
+
+    def verify_cluster_no_node():
+        state = get_cluster_resource_state(stub)
+        assert_node_states(
+            state,
+            [
+                NodeState(node_id, NodeStatus.DEAD),
+                NodeState(head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0),
+            ],
+        )
+        return True
+
+    wait_for_condition(verify_cluster_no_node)
+
+
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
         sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
index 541466291c77b..21b7c73fadf70 100644
--- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
@@ -183,41 +183,52 @@ void GcsAutoscalerStateManager::GetPendingResourceRequests(
 
 void GcsAutoscalerStateManager::GetNodeStates(
     rpc::autoscaler::ClusterResourceState *state) {
-  auto populate_node_state = [&](const rpc::GcsNodeInfo &gcs_node_info,
-                                 rpc::autoscaler::NodeStatus status) {
+  auto populate_node_state = [&](const rpc::GcsNodeInfo &gcs_node_info) {
     auto node_state_proto = state->add_node_states();
     node_state_proto->set_node_id(gcs_node_info.node_id());
     node_state_proto->set_instance_id(gcs_node_info.instance_id());
     node_state_proto->set_ray_node_type_name(gcs_node_info.node_type_name());
     node_state_proto->set_node_state_version(last_cluster_resource_state_version_);
-    node_state_proto->set_status(status);
-
-    if (status == rpc::autoscaler::RUNNING) {
-      auto const &node_resource_data = cluster_resource_manager_.GetNodeResources(
-          scheduling::NodeID(node_state_proto->node_id()));
-
-      // Copy resource available
-      const auto &available = node_resource_data.available.ToResourceMap();
-      node_state_proto->mutable_available_resources()->insert(available.begin(),
-                                                              available.end());
-
-      // Copy total resources
-      const auto &total = node_resource_data.total.ToResourceMap();
-      node_state_proto->mutable_total_resources()->insert(total.begin(), total.end());
-
-      // Add dynamic PG labels.
-      const auto &pgs_on_node = gcs_placement_group_manager_.GetBundlesOnNode(
-          NodeID::FromBinary(gcs_node_info.node_id()));
-      for (const auto &[pg_id, _bundle_indices] : pgs_on_node) {
-        node_state_proto->mutable_dynamic_labels()->insert(
-            {FormatPlacementGroupLabelName(pg_id.Binary()), ""});
-      }
+
+    if (gcs_node_info.state() == rpc::GcsNodeInfo::DEAD) {
+      node_state_proto->set_status(rpc::autoscaler::NodeStatus::DEAD);
+      // We don't need to populate other info for a dead node.
+      return;
+    }
+
+    // The node is alive. We need to check if the node is idle.
+    auto const &node_resource_data = cluster_resource_manager_.GetNodeResources(
+        scheduling::NodeID(node_state_proto->node_id()));
+    if (node_resource_data.idle_resource_duration_ms > 0) {
+      // The node is idle.
+      node_state_proto->set_status(rpc::autoscaler::NodeStatus::IDLE);
+      node_state_proto->set_idle_duration_ms(
+          node_resource_data.idle_resource_duration_ms);
+    } else {
+      node_state_proto->set_status(rpc::autoscaler::NodeStatus::RUNNING);
+    }
+
+    // Copy available resources.
+    const auto &available = node_resource_data.available.ToResourceMap();
+    node_state_proto->mutable_available_resources()->insert(available.begin(),
+                                                            available.end());
+
+    // Copy total resources.
+    const auto &total = node_resource_data.total.ToResourceMap();
+    node_state_proto->mutable_total_resources()->insert(total.begin(), total.end());
+
+    // Add dynamic PG labels.
+    const auto &pgs_on_node = gcs_placement_group_manager_.GetBundlesOnNode(
+        NodeID::FromBinary(gcs_node_info.node_id()));
+    for (const auto &[pg_id, _bundle_indices] : pgs_on_node) {
+      node_state_proto->mutable_dynamic_labels()->insert(
+          {FormatPlacementGroupLabelName(pg_id.Binary()), ""});
     }
   };
 
   const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
   std::for_each(alive_nodes.begin(), alive_nodes.end(), [&](const auto &gcs_node_info) {
-    populate_node_state(*gcs_node_info.second, rpc::autoscaler::RUNNING);
+    populate_node_state(*gcs_node_info.second);
   });
 
   // This might be large if there are many nodes for a long-running cluster.
@@ -227,7 +238,7 @@ void GcsAutoscalerStateManager::GetNodeStates(
   // https://github.com/ray-project/ray/issues/35874
   const auto &dead_nodes = gcs_node_manager_.GetAllDeadNodes();
   std::for_each(dead_nodes.begin(), dead_nodes.end(), [&](const auto &gcs_node_info) {
-    populate_node_state(*gcs_node_info.second, rpc::autoscaler::DEAD);
+    populate_node_state(*gcs_node_info.second);
   });
 }
 
diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
index 54a060829fc06..6d035b0082c58 100644
--- a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
@@ -67,15 +67,20 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
     gcs_resource_manager_->OnNodeAdd(*node);
   }
 
-  void RemoveNode(const NodeID &node_id) {
+  void RemoveNode(const std::shared_ptr<rpc::GcsNodeInfo> &node) {
+    const auto node_id = NodeID::FromBinary(node->node_id());
+    node->set_state(rpc::GcsNodeInfo::DEAD);
     gcs_node_manager_->alive_nodes_.erase(node_id);
+    gcs_node_manager_->dead_nodes_[node_id] = node;
     gcs_resource_manager_->OnNodeDead(node_id);
   }
 
   void CheckNodeResources(
      const rpc::autoscaler::NodeState &node_state,
      const absl::flat_hash_map<std::string, double> &total_resources,
-      const absl::flat_hash_map<std::string, double> &available_resources) {
+      const absl::flat_hash_map<std::string, double> &available_resources,
+      const rpc::autoscaler::NodeStatus &status = rpc::autoscaler::NodeStatus::RUNNING,
+      int64_t idle_ms = 0) {
     ASSERT_EQ(node_state.total_resources_size(), total_resources.size());
     ASSERT_EQ(node_state.available_resources_size(), available_resources.size());
     for (const auto &resource : total_resources) {
@@ -84,6 +89,8 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
     for (const auto &resource : available_resources) {
       ASSERT_EQ(node_state.available_resources().at(resource.first), resource.second);
     }
+    ASSERT_EQ(node_state.status(), status);
+    ASSERT_EQ(node_state.idle_duration_ms(), idle_ms);
   }
 
   void CheckNodeLabels(const rpc::autoscaler::NodeState &node_state,
@@ -119,13 +126,15 @@ class GcsAutoscalerStateManagerTest : public ::testing::Test {
      const NodeID &node_id,
      const absl::flat_hash_map<std::string, double> &available_resources,
      const absl::flat_hash_map<std::string, double> &total_resources,
-      bool available_resources_changed) {
+      bool available_resources_changed,
+      int64_t idle_ms = 0) {
     rpc::ResourcesData resources_data;
     Mocker::FillResourcesData(resources_data,
                               node_id,
                               available_resources,
                               total_resources,
-                              available_resources_changed);
+                              available_resources_changed,
+                              idle_ms);
     gcs_resource_manager_->UpdateFromResourceReport(resources_data);
   }
 
@@ -334,10 +343,13 @@ TEST_F(GcsAutoscalerStateManagerTest, TestNodeAddUpdateRemove) {
 
   // Remove a node - test node states correct.
   {
-    RemoveNode(NodeID::FromBinary(node->node_id()));
-    gcs_resource_manager_->OnNodeDead(NodeID::FromBinary(node->node_id()));
+    RemoveNode(node);
     const auto &state = GetClusterResourceStateSync();
-    ASSERT_EQ(state.node_states_size(), 0);
+    ASSERT_EQ(state.node_states_size(), 1);
+    CheckNodeResources(state.node_states(0),
+                       /*total*/ {},
+                       /*available*/ {},
+                       rpc::autoscaler::NodeStatus::DEAD);
   }
 }
 
@@ -404,7 +416,7 @@ TEST_F(GcsAutoscalerStateManagerTest, TestBasicResourceRequests) {
 
   // Remove node should clear it.
   {
-    RemoveNode(NodeID::FromBinary(node->node_id()));
+    RemoveNode(node);
     auto reply = GetClusterResourceStateSync();
     ASSERT_EQ(reply.pending_resource_requests_size(), 0);
   }
@@ -599,6 +611,55 @@ TEST_F(GcsAutoscalerStateManagerTest, TestReportAutoscalingState) {
   }
 }
 
+TEST_F(GcsAutoscalerStateManagerTest, TestIdleTime) {
+  auto node = Mocker::GenNodeInfo();
+
+  // Adding a node.
+  node->mutable_resources_total()->insert({"CPU", 2});
+  node->mutable_resources_total()->insert({"GPU", 1});
+  node->set_instance_id("instance_1");
+  AddNode(node);
+
+  // No report yet - so idle time should be 0.
+  {
+    const auto &state = GetClusterResourceStateSync();
+    ASSERT_EQ(state.node_states_size(), 1);
+    CheckNodeResources(state.node_states(0),
+                       /*total*/ {{"CPU", 2}, {"GPU", 1}},
+                       /*available*/ {{"CPU", 2}, {"GPU", 1}});
+  }
+
+  // Report idle node info.
+  UpdateFromResourceReportSync(NodeID::FromBinary(node->node_id()),
+                               {/* available */ {"CPU", 2}, {"GPU", 1}},
+                               /* total*/ {{"CPU", 2}, {"GPU", 1}},
+                               /* available_changed*/ true,
+                               /* idle_duration_ms */ 10);
+
+  // Check the reported idle time is set.
+  {
+    const auto &state = GetClusterResourceStateSync();
+    ASSERT_EQ(state.node_states_size(), 1);
+    CheckNodeResources(state.node_states(0),
+                       /*total*/ {{"CPU", 2}, {"GPU", 1}},
+                       /*available*/ {{"CPU", 2}, {"GPU", 1}},
+                       /*status*/ rpc::autoscaler::NodeStatus::IDLE,
+                       /*idle_ms*/ 10);
+  }
+
+  // A dead node should no longer be idle.
+  {
+    RemoveNode(node);
+    gcs_resource_manager_->OnNodeDead(NodeID::FromBinary(node->node_id()));
+    const auto &state = GetClusterResourceStateSync();
+    ASSERT_EQ(state.node_states_size(), 1);
+    CheckNodeResources(state.node_states(0),
+                       /*total*/ {},
+                       /*available*/ {},
+                       rpc::autoscaler::NodeStatus::DEAD);
+  }
+}
+
 }  // namespace gcs
 }  // namespace ray
diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h
index cade44b8b42ca..aca3c7f3dbf6b 100644
--- a/src/ray/gcs/test/gcs_test_util.h
+++ b/src/ray/gcs/test/gcs_test_util.h
@@ -204,6 +204,7 @@ struct Mocker {
     node->set_node_manager_address(address);
     node->set_node_name(node_name);
     node->set_instance_id("instance_x");
+    node->set_state(rpc::GcsNodeInfo::ALIVE);
     return node;
   }
 
@@ -311,7 +312,8 @@ struct Mocker {
      const NodeID &node_id,
      const absl::flat_hash_map<std::string, double> &available_resources,
      const absl::flat_hash_map<std::string, double> &total_resources,
-      bool available_resources_changed) {
+      bool available_resources_changed,
+      int64_t idle_ms = 0) {
     resources_data.set_node_id(node_id.Binary());
     for (const auto &resource : available_resources) {
       (*resources_data.mutable_resources_available())[resource.first] = resource.second;
     }
@@ -320,6 +322,7 @@ struct Mocker {
       (*resources_data.mutable_resources_total())[resource.first] = resource.second;
     }
     resources_data.set_resources_available_changed(available_resources_changed);
+    resources_data.set_idle_duration_ms(idle_ms);
   }
 
   static void FillResourcesData(rpc::ResourcesData &data,
diff --git a/src/ray/protobuf/experimental/autoscaler.proto b/src/ray/protobuf/experimental/autoscaler.proto
index 8b747686a32ce..23c299ed95607 100644
--- a/src/ray/protobuf/experimental/autoscaler.proto
+++ b/src/ray/protobuf/experimental/autoscaler.proto
@@ -128,9 +128,8 @@ message NodeState {
   // The status of the node.
   NodeStatus status = 8;
 
-  // The time since the last status change of the node, i.e. how long
-  // the node has been in the current status.
-  int64 time_since_last_status_change_ms = 9;
+  // The time in ms for which the node has been idle.
+  int64 idle_duration_ms = 9;
 
   // TODO(rickyx): to be populated.
   // Node ip address.
diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h
index 1f7a3b6fee648..fb8c9e5fdc318 100644
--- a/src/ray/raylet/scheduling/cluster_resource_data.h
+++ b/src/ray/raylet/scheduling/cluster_resource_data.h
@@ -436,6 +436,9 @@ class NodeResources {
   // The key-value labels of this node.
   absl::flat_hash_map<std::string, std::string> labels;
 
+  // The idle duration of the node (in ms), based on its resource usage.
+  int64_t idle_resource_duration_ms = 0;
+
   /// Normal task resources could be uploaded by 1) Raylets' periodical reporters; 2)
   /// Rejected RequestWorkerLeaseReply. So we need the timestamps to decide whether an
   /// upload is latest.
diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.cc b/src/ray/raylet/scheduling/cluster_resource_manager.cc
index 0332e9c990e34..78bdfa18c2d68 100644
--- a/src/ray/raylet/scheduling/cluster_resource_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_manager.cc
@@ -253,6 +253,9 @@ bool ClusterResourceManager::UpdateNodeAvailableResourcesIfExist(
   for (auto &resource_id : node_resources->total.ResourceIds()) {
     node_resources->available.Set(resource_id, resources.Get(resource_id));
   }
+
+  // Update the node's idle duration based on its resource usage.
+  node_resources->idle_resource_duration_ms = resource_data.idle_duration_ms();
   return true;
 }
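Illustration (not part of the patch): a minimal sketch of how the new idle information could be read back once this change lands, reusing only helpers that appear in the diff above. The function name print_idle_nodes and the stub argument are hypothetical; the stub is assumed to be created the same way the test's _autoscaler_state_service_stub() does it.

    from ray.autoscaler.v2.tests.util import get_cluster_resource_state
    from ray.core.generated.experimental.autoscaler_pb2 import NodeStatus

    def print_idle_nodes(stub):
        # Fetch the latest ClusterResourceState from the GCS autoscaler state service.
        state = get_cluster_resource_state(stub)
        for node in state.node_states:
            # Per this patch, idle_duration_ms is only populated for nodes
            # reported as IDLE; RUNNING and DEAD nodes leave it at 0.
            if node.status == NodeStatus.IDLE:
                print(node.node_id, node.idle_duration_ms)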