Skip to content

Commit

Permalink
[core][autoscaler] Add pg details in details for gang request #37161
Browse files Browse the repository at this point in the history
This adds PG-related info to the details of a GangResourceRequest for observability. This info shouldn't be used for any scaling decisions by the autoscaler; it is only passed back through the autoscaler so that it can be included in the cluster status output.
Signed-off-by: rickyyx <rickyx@anyscale.com>
  • Loading branch information
rickyyx authored Jul 8, 2023
1 parent e239453 commit a7852a4
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
58 changes: 58 additions & 0 deletions python/ray/autoscaler/v2/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,35 @@ def assert_node_states(state: ClusterResourceState, expected_nodes: List[NodeSta
assert sorted(actual_node.dynamic_labels) == sorted(expected_node.labels)


@dataclass
class GangResourceRequest:
    """Expected shape of one pending gang resource request, as used by
    `assert_gang_requests`."""

    # The resource bundles of the request, one dict per bundle.
    bundles: List[dict]

    # Detail strings describing the request.
    details: List[str]


def assert_gang_requests(
    state: ClusterResourceState, expected: List[GangResourceRequest]
):
    """
    Check the pending gang resource requests of a ClusterResourceState
    against a list of expected requests.

    The number of requests must be equal. Both sides are then sorted (actual
    requests by their `details` value, expected requests by the concatenation
    of their detail strings) and compared pairwise: every detail string of an
    expected request must be contained in the `details` of the actual request
    it is paired with.

    Only `details` is compared; the `bundles` of the expected requests are
    not checked here.
    """
    pending = state.pending_gang_resource_requests
    assert len(pending) == len(expected)

    # Put both sides into a comparable order before pairing them up.
    actual_sorted = sorted(pending, key=lambda req: req.details)
    expected_sorted = sorted(expected, key=lambda req: "".join(req.details))

    for got, wanted in zip(actual_sorted, expected_sorted):
        # Each expected detail string must show up in the actual details.
        for detail_str in wanted.details:
            assert detail_str in got.details


def test_request_cluster_resources_basic(shutdown_only):
ray.init(num_cpus=1)
stub = _autoscaler_state_service_stub()
Expand All @@ -119,6 +148,35 @@ def verify():
wait_for_condition(verify)


def test_pg_pending_gang_requests_basic(ray_start_cluster):
    """A pending placement group is reported as a pending gang resource
    request whose details carry the PG id, its strategy and its state."""
    ray.init(num_cpus=1)

    # Ask for three 1-CPU bundles on a cluster started with a single CPU, so
    # that the placement group is still pending afterwards.
    pending_pg = ray.util.placement_group(
        [{"CPU": 1}] * 3, strategy="STRICT_SPREAD"
    )
    try:
        ray.get(pending_pg.ready(), timeout=2)
    except TimeoutError:
        # Timing out is the expected outcome here: the PG should not be ready.
        pass

    pg_id = pending_pg.id.hex()
    stub = _autoscaler_state_service_stub()

    def verify():
        state = get_cluster_resource_state(stub)
        expected_requests = [
            GangResourceRequest(
                [{"CPU": 1}] * 3, details=[pg_id, "STRICT_SPREAD", "PENDING"]
            )
        ]
        assert_gang_requests(state, expected_requests)
        return True

    wait_for_condition(verify)


def test_pg_usage_labels(shutdown_only):

ray.init(num_cpus=1)
Expand Down
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ void GcsAutoscalerStateManager::GetPendingGangResourceRequests(
PlacementGroupID::FromBinary(pg_data.placement_group_id()).Hex(),
pg_data.strategy());

// Add the PG's id, strategy and state as detail info for the gang resource request.
gang_resource_req->set_details(FormatPlacementGroupDetails(pg_data));

// Copy the PG's bundles to the request.
for (const auto &bundle : pg_data.bundles()) {
if (!NodeID::FromBinary(bundle.node_id()).IsNil()) {
Expand Down
13 changes: 13 additions & 0 deletions src/ray/gcs/pb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,19 @@ inline std::string FormatPlacementGroupLabelName(const std::string &pg_id) {
return kPlacementGroupConstraintKeyPrefix + pg_id;
}

/// \brief Format placement group details into a single string.
/// Format:
/// <pg_id>:<strategy>|<state>
/// where <pg_id> is the hex form of the placement group ID, <strategy> is the
/// name returned by rpc::PlacementStrategy_Name (e.g. STRICT_SPREAD) and
/// <state> is the name returned by
/// rpc::PlacementGroupTableData::PlacementGroupState_Name (e.g. PENDING).
/// Note the separators differ: ':' after the ID, '|' before the state.
///
/// \param pg_data The placement group table data to read the ID, strategy and
/// state from.
/// \return The formatted details string.
inline std::string FormatPlacementGroupDetails(
const rpc::PlacementGroupTableData &pg_data) {
return PlacementGroupID::FromBinary(pg_data.placement_group_id()).Hex() + ":" +
rpc::PlacementStrategy_Name(pg_data.strategy()) + "|" +
rpc::PlacementGroupTableData::PlacementGroupState_Name(pg_data.state());
}

/// Generate a placement constraint for placement group.
///
/// \param pg_id The ID of placement group.
Expand Down

0 comments on commit a7852a4

Please sign in to comment.