From 8b6a2c99d39471eb11a15ea9545b67509b816bac Mon Sep 17 00:00:00 2001
From: Ricky Xu
Date: Thu, 6 Jul 2023 11:10:21 -0700
Subject: [PATCH] [core][autoscaler] Fix pg id serialization with hex rather
 than binary for cluster state reporting #37132

Why are these changes needed?

The labels are declared as strings, and placement groups generate
(anti)affinity labels. The current implementation generates
"_PG_<binary pg id>" as the label key, but binary chars are not
encodable in a string. This PR changes the PG-generated dynamic labels
to "_PG_<hex pg id>", which is also more readable.
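
For illustration, a minimal Python sketch (not part of the patch) of the
encoding problem; os.urandom() here is only a stand-in for a raw binary
placement group id:

    import os

    # A raw binary id usually contains bytes that are not valid UTF-8,
    # so it cannot be carried in a protobuf `string` label key.
    binary_pg_id = os.urandom(16)  # stand-in for PlacementGroupID::Binary()
    try:
        binary_pg_id.decode("utf-8")
    except UnicodeDecodeError as err:
        print("binary id is not encodable as a string:", err)

    # The hex form is plain ASCII: always encodable and human-readable.
    label_key = "_PG_" + binary_pg_id.hex()
    print(label_key)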
---
 python/ray/autoscaler/v2/tests/test_sdk.py    | 62 +++++++++++++++----
 .../gcs_autoscaler_state_manager.cc           |  5 +-
 .../test/gcs_autoscaler_state_manager_test.cc | 26 ++++----
 .../protobuf/experimental/autoscaler.proto    |  4 +-
 4 files changed, 67 insertions(+), 30 deletions(-)

diff --git a/python/ray/autoscaler/v2/tests/test_sdk.py b/python/ray/autoscaler/v2/tests/test_sdk.py
index 81e9422583e81..0799e9dc4a788 100644
--- a/python/ray/autoscaler/v2/tests/test_sdk.py
+++ b/python/ray/autoscaler/v2/tests/test_sdk.py
@@ -3,7 +3,7 @@
 # coding: utf-8
 from dataclasses import dataclass
-from typing import Callable, List, Optional
+from typing import Callable, List, Optional, Tuple

 import pytest

@@ -29,6 +29,19 @@ def _autoscaler_state_service_stub():
     return autoscaler_pb2_grpc.AutoscalerStateServiceStub(gcs_channel)


+def get_node_ids() -> Tuple[str, List[str]]:
+    """Get the node ids of the head node and a worker node"""
+    head_node_id = None
+    nodes = list_nodes()
+    worker_node_ids = []
+    for node in nodes:
+        if node.is_head_node:
+            head_node_id = node.node_id
+        else:
+            worker_node_ids += [node.node_id]
+    return head_node_id, worker_node_ids
+
+
 def assert_cluster_resource_constraints(
     state: ClusterResourceState, expected: List[dict]
 ):
@@ -57,6 +70,7 @@ class NodeState:
     node_id: str
     node_status: NodeStatus
     idle_time_check_cb: Optional[Callable] = None
+    labels: Optional[dict] = None


 def assert_node_states(state: ClusterResourceState, expected_nodes: List[NodeState]):
@@ -75,6 +89,9 @@ def assert_node_states(state: ClusterResourceState, expected_nodes: List[NodeSta
         if expected_node.idle_time_check_cb:
             assert expected_node.idle_time_check_cb(actual_node.idle_duration_ms)

+        if expected_node.labels:
+            assert sorted(actual_node.dynamic_labels) == sorted(expected_node.labels)
+

 def test_request_cluster_resources_basic(shutdown_only):
     ray.init(num_cpus=1)
@@ -101,6 +118,35 @@ def verify():
     wait_for_condition(verify)


+def test_pg_usage_labels(shutdown_only):
+
+    ray.init(num_cpus=1)
+
+    # Create a pg
+    pg = ray.util.placement_group([{"CPU": 1}])
+    ray.get(pg.ready())
+
+    # Check the labels
+    stub = _autoscaler_state_service_stub()
+    head_node_id, _ = get_node_ids()
+
+    pg_id = pg.id.hex()
+
+    def verify():
+        state = get_cluster_resource_state(stub)
+        assert_node_states(
+            state,
+            [
+                NodeState(
+                    head_node_id, NodeStatus.RUNNING, labels={f"_PG_{pg_id}": ""}
+                ),
+            ],
+        )
+        return True
+
+    wait_for_condition(verify)
+
+
 def test_node_state_lifecycle_basic(ray_start_cluster):
     cluster = ray_start_cluster
@@ -119,18 +165,8 @@ def nodes_up():

     wait_for_condition(nodes_up)

-    def get_node_ids():
-        head_node_id = None
-        node_id = None
-        nodes = list_nodes()
-        for node in nodes:
-            if node.is_head_node:
-                head_node_id = node.node_id
-            else:
-                node_id = node.node_id
-        return head_node_id, node_id
-
-    head_node_id, node_id = get_node_ids()
+    head_node_id, worker_node_ids = get_node_ids()
+    node_id = worker_node_ids[0]

     def verify_cluster_idle():
         state = get_cluster_resource_state(stub)
diff --git a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
index 21b7c73fadf70..cea8ec74e24cf 100644
--- a/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
@@ -126,7 +126,8 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests(
         << "Placement group load should only include pending/rescheduling PGs. ";

     const auto pg_constraint = GenPlacementConstraintForPlacementGroup(
-        pg_data.placement_group_id(), pg_data.strategy());
+        PlacementGroupID::FromBinary(pg_data.placement_group_id()).Hex(),
+        pg_data.strategy());

     // Copy the PG's bundles to the request.
     for (const auto &bundle : pg_data.bundles()) {
@@ -222,7 +223,7 @@ void GcsAutoscalerStateManager::GetNodeStates(
           NodeID::FromBinary(gcs_node_info.node_id()));
       for (const auto &[pg_id, _bundle_indices] : pgs_on_node) {
         node_state_proto->mutable_dynamic_labels()->insert(
-            {FormatPlacementGroupLabelName(pg_id.Binary()), ""});
+            {FormatPlacementGroupLabelName(pg_id.Hex()), ""});
       }
     };
diff --git a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
index 6d035b0082c58..f25bcf4fd4a46 100644
--- a/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
@@ -281,31 +281,31 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGenPlacementConstraintForPlacementGrou
   auto pg = PlacementGroupID::Of(JobID::FromInt(0));
   {
     auto strict_spread_constraint = GenPlacementConstraintForPlacementGroup(
-        pg.Binary(), rpc::PlacementStrategy::STRICT_SPREAD);
+        pg.Hex(), rpc::PlacementStrategy::STRICT_SPREAD);
     ASSERT_TRUE(strict_spread_constraint.has_value());
     ASSERT_TRUE(strict_spread_constraint->has_anti_affinity());
     ASSERT_EQ(strict_spread_constraint->anti_affinity().label_name(),
-              FormatPlacementGroupLabelName(pg.Binary()));
+              FormatPlacementGroupLabelName(pg.Hex()));
   }

   {
     auto strict_pack_constraint = GenPlacementConstraintForPlacementGroup(
-        pg.Binary(), rpc::PlacementStrategy::STRICT_PACK);
+        pg.Hex(), rpc::PlacementStrategy::STRICT_PACK);
     ASSERT_TRUE(strict_pack_constraint.has_value());
     ASSERT_TRUE(strict_pack_constraint->has_affinity());
     ASSERT_EQ(strict_pack_constraint->affinity().label_name(),
-              FormatPlacementGroupLabelName(pg.Binary()));
+              FormatPlacementGroupLabelName(pg.Hex()));
   }

   {
-    auto no_pg_constraint_for_pack = GenPlacementConstraintForPlacementGroup(
-        pg.Binary(), rpc::PlacementStrategy::PACK);
+    auto no_pg_constraint_for_pack =
+        GenPlacementConstraintForPlacementGroup(pg.Hex(), rpc::PlacementStrategy::PACK);
     ASSERT_FALSE(no_pg_constraint_for_pack.has_value());
   }

   {
-    auto no_pg_constraint_for_spread = GenPlacementConstraintForPlacementGroup(
-        pg.Binary(), rpc::PlacementStrategy::SPREAD);
+    auto no_pg_constraint_for_spread =
+        GenPlacementConstraintForPlacementGroup(pg.Hex(), rpc::PlacementStrategy::SPREAD);
     ASSERT_FALSE(no_pg_constraint_for_spread.has_value());
   }
 }
@@ -378,8 +378,8 @@ TEST_F(GcsAutoscalerStateManagerTest, TestNodeDynamicLabelsWithPG) {
     const auto &state = GetClusterResourceStateSync();
     ASSERT_EQ(state.node_states_size(), 1);
     CheckNodeLabels(state.node_states(0),
-                    {{FormatPlacementGroupLabelName(pg1.Binary()), ""},
-                     {FormatPlacementGroupLabelName(pg2.Binary()), ""}});
+                    {{FormatPlacementGroupLabelName(pg1.Hex()), ""},
+                     {FormatPlacementGroupLabelName(pg2.Hex()), ""}});
   }
 }

@@ -450,7 +450,7 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGangResourceRequestsBasic) {
     auto state = GetClusterResourceStateSync();
     CheckGangResourceRequests(state,
                               {{GenPlacementConstraintForPlacementGroup(
-                                     pg.Binary(), rpc::PlacementStrategy::STRICT_SPREAD)
+                                     pg.Hex(), rpc::PlacementStrategy::STRICT_SPREAD)
                                     ->DebugString(),
                                 {{{"CPU", 1}}, {{"GPU", 1}}}}});
   }
@@ -469,7 +469,7 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGangResourceRequestsBasic) {
     auto state = GetClusterResourceStateSync();
     CheckGangResourceRequests(state,
                               {{GenPlacementConstraintForPlacementGroup(
-                                     pg.Binary(), rpc::PlacementStrategy::STRICT_PACK)
+                                     pg.Hex(), rpc::PlacementStrategy::STRICT_PACK)
                                     ->DebugString(),
                                 {{{"CPU", 1}}, {{"GPU", 1}}}}});
   }
@@ -534,7 +534,7 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGangResourceRequestsPartialReschedulin
     // CPU_success_2 should not be reported as needed.
     CheckGangResourceRequests(state,
                               {{GenPlacementConstraintForPlacementGroup(
-                                     pg1.Binary(), rpc::PlacementStrategy::STRICT_SPREAD)
+                                     pg1.Hex(), rpc::PlacementStrategy::STRICT_SPREAD)
                                     ->DebugString(),
                                 {{{"CPU_failed_1", 1}}}}});
   }
diff --git a/src/ray/protobuf/experimental/autoscaler.proto b/src/ray/protobuf/experimental/autoscaler.proto
index 23c299ed95607..c26cae2b3b6ba 100644
--- a/src/ray/protobuf/experimental/autoscaler.proto
+++ b/src/ray/protobuf/experimental/autoscaler.proto
@@ -27,7 +27,7 @@ package ray.rpc.autoscaler;
 // value.
 //
 // This is now used to implement placement group anti-affinity, i.e.
-// strict-spread. The label_name is "_PG_<pg_id in binary>",
+// strict-spread. The label_name is "_PG_<pg_id in hex>",
 // and the label_value is empty string.
 message AntiAffinityConstraint {
   string label_name = 1;
@@ -38,7 +38,7 @@ message AntiAffinityConstraint {
 // should be allocated to node with the same label name and value.
 //
 // This is now used to implement placement group affinity, i.e.
-// strict-pack. The label_name is "_PG_<pg_id in binary>",
+// strict-pack. The label_name is "_PG_<pg_id in hex>",
 // and the label_value is empty string.
 message AffinityConstraint {
   string label_name = 1;
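
To summarize the behavior the tests above assert, here is a hypothetical
Python mirror (not part of the patch) of the C++ helpers
FormatPlacementGroupLabelName and GenPlacementConstraintForPlacementGroup;
the function names and dict shapes are illustrative only:

    from typing import Optional

    def format_placement_group_label_name(pg_id_hex: str) -> str:
        # The "_PG_<pg_id in hex>" convention from the proto comments.
        return "_PG_" + pg_id_hex

    def gen_placement_constraint(pg_id_hex: str, strategy: str) -> Optional[dict]:
        # Sketch of the constraint shapes asserted in the tests above.
        label = {"label_name": format_placement_group_label_name(pg_id_hex),
                 "label_value": ""}
        if strategy == "STRICT_SPREAD":
            return {"anti_affinity": label}  # bundles must land on distinct nodes
        if strategy == "STRICT_PACK":
            return {"affinity": label}       # bundles must land on one node
        return None                          # PACK / SPREAD need no constraint

    # A made-up hex id as a stand-in for PlacementGroupID::Hex().
    print(gen_placement_constraint("ab" * 18, "STRICT_SPREAD"))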