Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][autoscaler] Fix pg id serialization with hex rather than binary for cluster state reporting #37132 #37176

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 49 additions & 13 deletions python/ray/autoscaler/v2/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# coding: utf-8
from dataclasses import dataclass
from typing import Callable, List, Optional
from typing import Callable, List, Optional, Tuple

import pytest

Expand All @@ -29,6 +29,19 @@ def _autoscaler_state_service_stub():
return autoscaler_pb2_grpc.AutoscalerStateServiceStub(gcs_channel)


def get_node_ids() -> Tuple[str, List[str]]:
    """Split the nodes returned by `list_nodes()` into head and worker ids.

    Returns:
        A `(head_node_id, worker_node_ids)` pair. `head_node_id` is the id of
        the node flagged `is_head_node` (the last one seen if several are
        flagged, `None` if none is); `worker_node_ids` holds the ids of all
        remaining nodes in the order `list_nodes()` returned them.
    """
    head_node_id = None
    worker_node_ids = []
    for entry in list_nodes():
        if not entry.is_head_node:
            worker_node_ids.append(entry.node_id)
        else:
            head_node_id = entry.node_id
    return head_node_id, worker_node_ids


def assert_cluster_resource_constraints(
state: ClusterResourceState, expected: List[dict]
):
Expand Down Expand Up @@ -57,6 +70,7 @@ class NodeState:
node_id: str
node_status: NodeStatus
idle_time_check_cb: Optional[Callable] = None
labels: Optional[dict] = None


def assert_node_states(state: ClusterResourceState, expected_nodes: List[NodeState]):
Expand All @@ -75,6 +89,9 @@ def assert_node_states(state: ClusterResourceState, expected_nodes: List[NodeSta
if expected_node.idle_time_check_cb:
assert expected_node.idle_time_check_cb(actual_node.idle_duration_ms)

if expected_node.labels:
assert sorted(actual_node.dynamic_labels) == sorted(expected_node.labels)


def test_request_cluster_resources_basic(shutdown_only):
ray.init(num_cpus=1)
Expand All @@ -101,6 +118,35 @@ def verify():
wait_for_condition(verify)


def test_pg_usage_labels(shutdown_only):
    """Check the head node reports a `_PG_<hex pg id>` dynamic label for a ready PG."""
    ray.init(num_cpus=1)

    # Create a single-bundle placement group and block until it is ready.
    pg = ray.util.placement_group([{"CPU": 1}])
    ray.get(pg.ready())

    stub = _autoscaler_state_service_stub()
    head_node_id, _ = get_node_ids()

    # The label name embeds the hex form of the placement group id.
    expected_label = f"_PG_{pg.id.hex()}"

    def verify():
        expected_head = NodeState(
            head_node_id, NodeStatus.RUNNING, labels={expected_label: ""}
        )
        assert_node_states(get_cluster_resource_state(stub), [expected_head])
        return True

    # Retry through wait_for_condition rather than asserting once.
    wait_for_condition(verify)


def test_node_state_lifecycle_basic(ray_start_cluster):

cluster = ray_start_cluster
Expand All @@ -119,18 +165,8 @@ def nodes_up():

wait_for_condition(nodes_up)

def get_node_ids():
head_node_id = None
node_id = None
nodes = list_nodes()
for node in nodes:
if node.is_head_node:
head_node_id = node.node_id
else:
node_id = node.node_id
return head_node_id, node_id

head_node_id, node_id = get_node_ids()
head_node_id, worker_node_ids = get_node_ids()
node_id = worker_node_ids[0]

def verify_cluster_idle():
state = get_cluster_resource_state(stub)
Expand Down
5 changes: 3 additions & 2 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests(
<< "Placement group load should only include pending/rescheduling PGs. ";

const auto pg_constraint = GenPlacementConstraintForPlacementGroup(
pg_data.placement_group_id(), pg_data.strategy());
PlacementGroupID::FromBinary(pg_data.placement_group_id()).Hex(),
pg_data.strategy());

// Copy the PG's bundles to the request.
for (const auto &bundle : pg_data.bundles()) {
Expand Down Expand Up @@ -222,7 +223,7 @@ void GcsAutoscalerStateManager::GetNodeStates(
NodeID::FromBinary(gcs_node_info.node_id()));
for (const auto &[pg_id, _bundle_indices] : pgs_on_node) {
node_state_proto->mutable_dynamic_labels()->insert(
{FormatPlacementGroupLabelName(pg_id.Binary()), ""});
{FormatPlacementGroupLabelName(pg_id.Hex()), ""});
}
};

Expand Down
26 changes: 13 additions & 13 deletions src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,31 +281,31 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGenPlacementConstraintForPlacementGrou
auto pg = PlacementGroupID::Of(JobID::FromInt(0));
{
auto strict_spread_constraint = GenPlacementConstraintForPlacementGroup(
pg.Binary(), rpc::PlacementStrategy::STRICT_SPREAD);
pg.Hex(), rpc::PlacementStrategy::STRICT_SPREAD);
ASSERT_TRUE(strict_spread_constraint.has_value());
ASSERT_TRUE(strict_spread_constraint->has_anti_affinity());
ASSERT_EQ(strict_spread_constraint->anti_affinity().label_name(),
FormatPlacementGroupLabelName(pg.Binary()));
FormatPlacementGroupLabelName(pg.Hex()));
}

{
auto strict_pack_constraint = GenPlacementConstraintForPlacementGroup(
pg.Binary(), rpc::PlacementStrategy::STRICT_PACK);
pg.Hex(), rpc::PlacementStrategy::STRICT_PACK);
ASSERT_TRUE(strict_pack_constraint.has_value());
ASSERT_TRUE(strict_pack_constraint->has_affinity());
ASSERT_EQ(strict_pack_constraint->affinity().label_name(),
FormatPlacementGroupLabelName(pg.Binary()));
FormatPlacementGroupLabelName(pg.Hex()));
}

{
auto no_pg_constraint_for_pack = GenPlacementConstraintForPlacementGroup(
pg.Binary(), rpc::PlacementStrategy::PACK);
auto no_pg_constraint_for_pack =
GenPlacementConstraintForPlacementGroup(pg.Hex(), rpc::PlacementStrategy::PACK);
ASSERT_FALSE(no_pg_constraint_for_pack.has_value());
}

{
auto no_pg_constraint_for_spread = GenPlacementConstraintForPlacementGroup(
pg.Binary(), rpc::PlacementStrategy::SPREAD);
auto no_pg_constraint_for_spread =
GenPlacementConstraintForPlacementGroup(pg.Hex(), rpc::PlacementStrategy::SPREAD);
ASSERT_FALSE(no_pg_constraint_for_spread.has_value());
}
}
Expand Down Expand Up @@ -378,8 +378,8 @@ TEST_F(GcsAutoscalerStateManagerTest, TestNodeDynamicLabelsWithPG) {
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.node_states_size(), 1);
CheckNodeLabels(state.node_states(0),
{{FormatPlacementGroupLabelName(pg1.Binary()), ""},
{FormatPlacementGroupLabelName(pg2.Binary()), ""}});
{{FormatPlacementGroupLabelName(pg1.Hex()), ""},
{FormatPlacementGroupLabelName(pg2.Hex()), ""}});
}
}

Expand Down Expand Up @@ -450,7 +450,7 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGangResourceRequestsBasic) {
auto state = GetClusterResourceStateSync();
CheckGangResourceRequests(state,
{{GenPlacementConstraintForPlacementGroup(
pg.Binary(), rpc::PlacementStrategy::STRICT_SPREAD)
pg.Hex(), rpc::PlacementStrategy::STRICT_SPREAD)
->DebugString(),
{{{"CPU", 1}}, {{"GPU", 1}}}}});
}
Expand All @@ -469,7 +469,7 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGangResourceRequestsBasic) {
auto state = GetClusterResourceStateSync();
CheckGangResourceRequests(state,
{{GenPlacementConstraintForPlacementGroup(
pg.Binary(), rpc::PlacementStrategy::STRICT_PACK)
pg.Hex(), rpc::PlacementStrategy::STRICT_PACK)
->DebugString(),
{{{"CPU", 1}}, {{"GPU", 1}}}}});
}
Expand Down Expand Up @@ -534,7 +534,7 @@ TEST_F(GcsAutoscalerStateManagerTest, TestGangResourceRequestsPartialReschedulin
// CPU_success_2 should not be reported as needed.
CheckGangResourceRequests(state,
{{GenPlacementConstraintForPlacementGroup(
pg1.Binary(), rpc::PlacementStrategy::STRICT_SPREAD)
pg1.Hex(), rpc::PlacementStrategy::STRICT_SPREAD)
->DebugString(),
{{{"CPU_failed_1", 1}}}}});
}
Expand Down
4 changes: 2 additions & 2 deletions src/ray/protobuf/experimental/autoscaler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ package ray.rpc.autoscaler;
// value.
//
// This is now used to implement placement group anti-affinity, i.e.
// strict-spread. The label_name is "_PG_<pg_id>",
// strict-spread. The label_name is "_PG_<pg_id_in_hex>",
// and the label_value is empty string.
message AntiAffinityConstraint {
string label_name = 1;
Expand All @@ -38,7 +38,7 @@ message AntiAffinityConstraint {
// should be allocated to node with the same label name and value.
//
// This is now used to implement placement group affinity, i.e.
// strict-pack. The label_name is "_PG_<pg_id>",
// strict-pack. The label_name is "_PG_<pg_id_in_hex>",
// and the label_value is empty string.
message AffinityConstraint {
string label_name = 1;
Expand Down