Skip to content

Commit

Permalink
[core][autoscaler] Add pg details in details for gang request ray-pro…
Browse files Browse the repository at this point in the history
…ject#37161

This adds the PG-related info in details of a GangResourceRequest for observability. This info shouldn't be used for any scaling decisions of autoscaler, but just circles back from autoscaler to be included in cluster status output.
Signed-off-by: rickyyx <rickyx@anyscale.com>
  • Loading branch information
rickyyx authored and Bhav00 committed Jul 28, 2023
1 parent 5a5b4db commit b494542
Showing 1 changed file with 29 additions and 98 deletions.
127 changes: 29 additions & 98 deletions python/ray/autoscaler/v2/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,104 +114,6 @@ def assert_node_states(
assert sorted(actual_node.dynamic_labels) == sorted(expected_node.labels)


@dataclass
class ExpectedNodeInfo:
node_id: Optional[str] = None
node_status: Optional[str] = None
idle_time_check_cb: Optional[Callable] = None
instance_id: Optional[str] = None
ray_node_type_name: Optional[str] = None
instance_type_name: Optional[str] = None
ip_address: Optional[str] = None
details: Optional[str] = None

# Check those resources are included in the actual node info.
total_resources: Optional[dict] = None
available_resources: Optional[dict] = None


def assert_nodes(actual_nodes: List[NodeInfo], expected_nodes: List[ExpectedNodeInfo]):

assert len(actual_nodes) == len(expected_nodes)
# Sort the nodes by id.
actual_nodes = sorted(actual_nodes, key=lambda node: node.node_id)
expected_nodes = sorted(expected_nodes, key=lambda node: node.node_id)

for actual_node, expected_node in zip(actual_nodes, expected_nodes):
if expected_node.node_id is not None:
assert actual_node.node_id == expected_node.node_id
if expected_node.node_status is not None:
assert actual_node.node_status == expected_node.node_status
if expected_node.instance_id is not None:
assert actual_node.instance_id == expected_node.instance_id
if expected_node.ray_node_type_name is not None:
assert actual_node.ray_node_type_name == expected_node.ray_node_type_name
if expected_node.instance_type_name is not None:
assert actual_node.instance_type_name == expected_node.instance_type_name
if expected_node.ip_address is not None:
assert actual_node.ip_address == expected_node.ip_address
if expected_node.details is not None:
assert expected_node.details in actual_node.details

if expected_node.idle_time_check_cb:
assert expected_node.idle_time_check_cb(
actual_node.resource_usage.idle_time_ms
)

if expected_node.total_resources:
for resource_name, total in expected_node.total_resources.items():
assert (
total
== get_total_resources(actual_node.resource_usage.usage)[
resource_name
]
)

if expected_node.available_resources:
for resource_name, available in expected_node.available_resources.items():
assert (
available
== get_available_resources(actual_node.resource_usage.usage)[
resource_name
]
)


def assert_launches(
cluster_status: ClusterStatus,
expected_pending_launches: List[LaunchRequest],
expected_failed_launches: List[LaunchRequest],
):
def assert_launches(actuals, expects):
for actual, expect in zip(actuals, expects):
assert actual.instance_type_name == expect.instance_type_name
assert actual.ray_node_type_name == expect.ray_node_type_name
assert actual.count == expect.count
assert actual.state == expect.state
assert actual.request_ts_s == expect.request_ts_s

assert len(cluster_status.pending_launches) == len(expected_pending_launches)
assert len(cluster_status.failed_launches) == len(expected_failed_launches)

actual_pending = sorted(
cluster_status.pending_launches, key=lambda launch: launch.ray_node_type_name
)
expected_pending = sorted(
expected_pending_launches, key=lambda launch: launch.ray_node_type_name
)

assert_launches(actual_pending, expected_pending)

actual_failed = sorted(
cluster_status.failed_launches, key=lambda launch: launch.ray_node_type_name
)
expected_failed = sorted(
expected_failed_launches, key=lambda launch: launch.ray_node_type_name
)

assert_launches(actual_failed, expected_failed)


@dataclass
class GangResourceRequest:
# Resource bundles.
Expand Down Expand Up @@ -336,6 +238,35 @@ def verify():
wait_for_condition(verify)


def test_pg_pending_gang_requests_basic(ray_start_cluster):
ray.init(num_cpus=1)

# Create a pg that's pending.
pg = ray.util.placement_group([{"CPU": 1}] * 3, strategy="STRICT_SPREAD")
try:
ray.get(pg.ready(), timeout=2)
except TimeoutError:
pass

pg_id = pg.id.hex()

stub = _autoscaler_state_service_stub()

def verify():
state = get_cluster_resource_state(stub)
assert_gang_requests(
state,
[
GangResourceRequest(
[{"CPU": 1}] * 3, details=[pg_id, "STRICT_SPREAD", "PENDING"]
)
],
)
return True

wait_for_condition(verify)


def test_pg_usage_labels(shutdown_only):

ray.init(num_cpus=1)
Expand Down

0 comments on commit b494542

Please sign in to comment.