[core][autoscaler] Add idle time information to autoscaler endpoint. (#36918)

This PR populates the `idle_duration_ms` field for each node as part of the cluster resource state.

This allows the autoscaler to decide whether a node is idle.

This completes #36670, which surfaced idle information from the raylet.
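As a rough illustration (not part of this commit), a consumer of the cluster resource state could use the new field as in the sketch below. It relies only on names that appear in this diff (`NodeStatus`, `node_states`, `idle_duration_ms`); the `state` argument is a `ClusterResourceState` such as the one returned by the test helper `get_cluster_resource_state(stub)` used in the tests, and the threshold is an arbitrary example value.

from ray.core.generated.experimental.autoscaler_pb2 import (
    ClusterResourceState,
    NodeStatus,
)


def idle_node_ids(state: ClusterResourceState, threshold_ms: int = 60_000):
    """Return the ids of nodes that have been idle for at least threshold_ms."""
    return [
        node.node_id
        for node in state.node_states
        if node.status == NodeStatus.IDLE and node.idle_duration_ms >= threshold_ms
    ]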
rickyyx authored Jun 30, 2023
1 parent 43210e0 commit 7275fad
Showing 7 changed files with 246 additions and 49 deletions.
139 changes: 128 additions & 11 deletions python/ray/autoscaler/v2/tests/test_sdk.py
@@ -1,17 +1,23 @@
# coding: utf-8
import os
import sys
from typing import List

import pytest # noqa
# coding: utf-8
from dataclasses import dataclass
from typing import Callable, List, Optional

import pytest

import ray
import ray._private.ray_constants as ray_constants
from ray._private.test_utils import wait_for_condition
from ray.autoscaler.v2.sdk import request_cluster_resources
from ray.autoscaler.v2.tests.util import get_cluster_resource_state
from ray.core.generated.experimental import autoscaler_pb2_grpc
from ray.core.generated.experimental.autoscaler_pb2 import GetClusterResourceStateReply
from ray.core.generated.experimental.autoscaler_pb2 import (
    ClusterResourceState,
    NodeStatus,
)
from ray.util.state.api import list_nodes


def _autoscaler_state_service_stub():
@@ -24,16 +30,16 @@ def _autoscaler_state_service_stub():


def assert_cluster_resource_constraints(
    reply: GetClusterResourceStateReply, expected: List[dict]
    state: ClusterResourceState, expected: List[dict]
):
    """
    Assert that a ClusterResourceState has cluster_resource_constraints that
    match the expected resources.
    """
    # We only have 1 constraint for now.
    assert len(reply.cluster_resource_constraints) == 1
    assert len(state.cluster_resource_constraints) == 1

    min_bundles = reply.cluster_resource_constraints[0].min_bundles
    min_bundles = state.cluster_resource_constraints[0].min_bundles
    assert len(min_bundles) == len(expected)

    # Sort all the bundles by bundle's resource names
@@ -46,6 +52,30 @@ def assert_cluster_resource_constraints(
        assert dict(actual_bundle.resources_bundle) == expected_bundle


@dataclass
class NodeState:
    node_id: str
    node_status: NodeStatus
    idle_time_check_cb: Optional[Callable] = None


def assert_node_states(state: ClusterResourceState, expected_nodes: List[NodeState]):
    """
    Assert that a ClusterResourceState has node states that
    match the expected nodes.
    """
    assert len(state.node_states) == len(expected_nodes)

    # Sort all the nodes by node's node_id
    node_states = sorted(state.node_states, key=lambda node: node.node_id)
    expected_nodes = sorted(expected_nodes, key=lambda node: node.node_id)

    for actual_node, expected_node in zip(node_states, expected_nodes):
        assert actual_node.status == expected_node.node_status
        if expected_node.idle_time_check_cb:
            assert expected_node.idle_time_check_cb(actual_node.idle_duration_ms)

def test_request_cluster_resources_basic(shutdown_only):
    ray.init(num_cpus=1)
    stub = _autoscaler_state_service_stub()
@@ -54,8 +84,8 @@ def test_request_cluster_resources_basic(shutdown_only):
    request_cluster_resources([{"CPU": 1}])

    def verify():
        reply = get_cluster_resource_state(stub)
        assert_cluster_resource_constraints(reply, [{"CPU": 1}])
        state = get_cluster_resource_state(stub)
        assert_cluster_resource_constraints(state, [{"CPU": 1}])
        return True

    wait_for_condition(verify)
@@ -64,13 +94,100 @@ def verify():
    request_cluster_resources([{"CPU": 2, "GPU": 1}, {"CPU": 1}])

    def verify():
        reply = get_cluster_resource_state(stub)
        assert_cluster_resource_constraints(reply, [{"CPU": 2, "GPU": 1}, {"CPU": 1}])
        state = get_cluster_resource_state(stub)
        assert_cluster_resource_constraints(state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}])
        return True

    wait_for_condition(verify)


def test_node_state_lifecycle_basic(ray_start_cluster):
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=0)
    ray.init(address=cluster.address)

    node = cluster.add_node(num_cpus=1)

    stub = _autoscaler_state_service_stub()

    # Unfortunately, `add_node` doesn't give us the node id.
    def nodes_up():
        nodes = list_nodes()
        assert len(nodes) == 2
        return True

    wait_for_condition(nodes_up)

    def get_node_ids():
        head_node_id = None
        node_id = None
        nodes = list_nodes()
        for node in nodes:
            if node.is_head_node:
                head_node_id = node.node_id
            else:
                node_id = node.node_id
        return head_node_id, node_id

    head_node_id, node_id = get_node_ids()

    def verify_cluster_idle():
        state = get_cluster_resource_state(stub)
        assert_node_states(
            state,
            [
                NodeState(node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0),
                NodeState(head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0),
            ],
        )
        return True

    wait_for_condition(verify_cluster_idle)

    # Schedule a task that runs forever.
    @ray.remote(num_cpus=0.1)
    def f():
        while True:
            pass

    t = f.remote()

    def verify_cluster_busy():
        state = get_cluster_resource_state(stub)
        assert_node_states(
            state,
            [
                NodeState(node_id, NodeStatus.RUNNING, lambda idle_ms: idle_ms == 0),
                NodeState(head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0),
            ],
        )
        return True

    wait_for_condition(verify_cluster_busy)

    # Kill the task.
    ray.cancel(t, force=True)

    wait_for_condition(verify_cluster_idle)

    # Kill the node.
    cluster.remove_node(node)

    def verify_cluster_no_node():
        state = get_cluster_resource_state(stub)
        assert_node_states(
            state,
            [
                NodeState(node_id, NodeStatus.DEAD),
                NodeState(head_node_id, NodeStatus.IDLE, lambda idle_ms: idle_ms > 0),
            ],
        )
        return True

    wait_for_condition(verify_cluster_no_node)


if __name__ == "__main__":
    if os.environ.get("PARALLEL_CI"):
        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
63 changes: 37 additions & 26 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
@@ -183,41 +183,52 @@ void GcsAutoscalerStateManager::GetPendingResourceRequests(

void GcsAutoscalerStateManager::GetNodeStates(
    rpc::autoscaler::ClusterResourceState *state) {
  auto populate_node_state = [&](const rpc::GcsNodeInfo &gcs_node_info,
                                 rpc::autoscaler::NodeStatus status) {
  auto populate_node_state = [&](const rpc::GcsNodeInfo &gcs_node_info) {
    auto node_state_proto = state->add_node_states();
    node_state_proto->set_node_id(gcs_node_info.node_id());
    node_state_proto->set_instance_id(gcs_node_info.instance_id());
    node_state_proto->set_ray_node_type_name(gcs_node_info.node_type_name());
    node_state_proto->set_node_state_version(last_cluster_resource_state_version_);
    node_state_proto->set_status(status);

    if (status == rpc::autoscaler::RUNNING) {
      auto const &node_resource_data = cluster_resource_manager_.GetNodeResources(
          scheduling::NodeID(node_state_proto->node_id()));

      // Copy resource available
      const auto &available = node_resource_data.available.ToResourceMap();
      node_state_proto->mutable_available_resources()->insert(available.begin(),
                                                              available.end());

      // Copy total resources
      const auto &total = node_resource_data.total.ToResourceMap();
      node_state_proto->mutable_total_resources()->insert(total.begin(), total.end());

      // Add dynamic PG labels.
      const auto &pgs_on_node = gcs_placement_group_manager_.GetBundlesOnNode(
          NodeID::FromBinary(gcs_node_info.node_id()));
      for (const auto &[pg_id, _bundle_indices] : pgs_on_node) {
        node_state_proto->mutable_dynamic_labels()->insert(
            {FormatPlacementGroupLabelName(pg_id.Binary()), ""});
      }

    if (gcs_node_info.state() == rpc::GcsNodeInfo::DEAD) {
      node_state_proto->set_status(rpc::autoscaler::NodeStatus::DEAD);
      // We don't need to populate other info for a dead node.
      return;
    }

    // The node is alive. We need to check if the node is idle.
    auto const &node_resource_data = cluster_resource_manager_.GetNodeResources(
        scheduling::NodeID(node_state_proto->node_id()));
    if (node_resource_data.idle_resource_duration_ms > 0) {
      // The node is idle.
      node_state_proto->set_status(rpc::autoscaler::NodeStatus::IDLE);
      node_state_proto->set_idle_duration_ms(
          node_resource_data.idle_resource_duration_ms);
    } else {
      node_state_proto->set_status(rpc::autoscaler::NodeStatus::RUNNING);
    }

    // Copy resource available
    const auto &available = node_resource_data.available.ToResourceMap();
    node_state_proto->mutable_available_resources()->insert(available.begin(),
                                                            available.end());

    // Copy total resources
    const auto &total = node_resource_data.total.ToResourceMap();
    node_state_proto->mutable_total_resources()->insert(total.begin(), total.end());

    // Add dynamic PG labels.
    const auto &pgs_on_node = gcs_placement_group_manager_.GetBundlesOnNode(
        NodeID::FromBinary(gcs_node_info.node_id()));
    for (const auto &[pg_id, _bundle_indices] : pgs_on_node) {
      node_state_proto->mutable_dynamic_labels()->insert(
          {FormatPlacementGroupLabelName(pg_id.Binary()), ""});
    }
  };

  const auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
  std::for_each(alive_nodes.begin(), alive_nodes.end(), [&](const auto &gcs_node_info) {
    populate_node_state(*gcs_node_info.second, rpc::autoscaler::RUNNING);
    populate_node_state(*gcs_node_info.second);
  });

  // This might be large if there are many nodes for a long-running cluster.
@@ -227,7 +238,7 @@ void GcsAutoscalerStateManager::GetNodeStates(
  // https://github.com/ray-project/ray/issues/35874
  const auto &dead_nodes = gcs_node_manager_.GetAllDeadNodes();
  std::for_each(dead_nodes.begin(), dead_nodes.end(), [&](const auto &gcs_node_info) {
    populate_node_state(*gcs_node_info.second, rpc::autoscaler::DEAD);
    populate_node_state(*gcs_node_info.second);
  });
}
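In short, the node status is now derived inside GCS from the node's GCS state plus the raylet-reported idle duration, instead of being passed in by the caller. A sketch of the rule in Python pseudocode (the authoritative logic is the C++ above):

def derive_node_status(is_dead: bool, idle_resource_duration_ms: int) -> str:
    # Dead nodes short-circuit; no resource info is populated for them.
    if is_dead:
        return "DEAD"
    # A positive idle duration reported by the raylet marks the node IDLE,
    # and the value is copied into the proto's idle_duration_ms field.
    if idle_resource_duration_ms > 0:
        return "IDLE"
    # Otherwise the node is considered actively RUNNING.
    return "RUNNING"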


