Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add PREPARED state for placement groups. #46858

Merged
merged 11 commits into from
Aug 23, 2024
9 changes: 5 additions & 4 deletions cpp/include/ray/api/task_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ enum class PlacementStrategy {

enum PlacementGroupState {
PENDING = 0,
CREATED = 1,
REMOVED = 2,
RESCHEDULING = 3,
PREPARED = 1,
CREATED = 2,
REMOVED = 3,
RESCHEDULING = 4,
UNRECOGNIZED = -1,
};

Expand Down Expand Up @@ -112,4 +113,4 @@ struct ActorCreationOptions {
};
} // namespace internal

} // namespace ray
} // namespace ray
2 changes: 2 additions & 0 deletions python/ray/_private/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ def _gen_placement_group_info(self, placement_group_info):
def get_state(state):
if state == gcs_pb2.PlacementGroupTableData.PENDING:
return "PENDING"
elif state == gcs_pb2.PlacementGroupTableData.PREPARED:
return "PREPARED"
elif state == gcs_pb2.PlacementGroupTableData.CREATED:
return "CREATED"
elif state == gcs_pb2.PlacementGroupTableData.RESCHEDULING:
Expand Down
115 changes: 114 additions & 1 deletion python/ray/tests/test_placement_group_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import ray.cluster_utils
from ray._private.test_utils import get_other_nodes, wait_for_condition
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
from ray.util import placement_group_table

MB = 1024 * 1024

Expand Down Expand Up @@ -161,5 +162,117 @@ def _check_actor_with_pg_is_ready():
)


@pytest.mark.parametrize("kill_bad_node", ["before_gcs_restart", "after_gcs_restart"])
def test_gcs_restart_when_pg_committing(
monkeypatch, ray_start_cluster_head_with_external_redis, kill_bad_node
):
"""
Tests GCS restart preserves already-committed bundles for a PREPARED pg.
Timeline:
1. Create a placement group with 2 bundles, no nodes yet.
- [Test] PENDING
2. Create 2 actors in the pg, one for each bundle.
3. Create 1 good node, and 1 slow committing node
- [Test] PREPARED
- [Test] There should be 1 alive actor.
4. Kill GCS.
- [Test] There should be 1 alive actor.
5. switch `kill_bad_node`
1. `kill_bad_node` == "before_gcs_restart":
i. kill the slow committing node.
ii. restart GCS.
2. `kill_bad_node` == "after_gcs_restart":
i. restart GCS.
- [Test] PREPARED
- [Test] There should be 1 alive actor.
ii. kill the slow committing node.
- [Test] PREPARED -> RESCHEDULING
- [Test] There should be 1 alive actor.
6. Add a new, normal node.
- [Test] RESCHEDULING -> CREATED
- [Test] There should be 2 alive actors.
"""
MY_RESOURCE_ONE = {"MyResource": 1}

@ray.remote(resources=MY_RESOURCE_ONE, num_cpus=0)
class Actor:
def ready(self):
return True

def alive_actors(actors):
"""Returns a list of actors that are alive."""
ping_map = {actor.ready.remote(): actor for actor in actors}
pings = list(ping_map.keys())
ready, _ = ray.wait(pings, timeout=1)
assert all(ray.get(ready)), f"{ready=}"
return [ping_map[ping] for ping in ready]

cluster = ray_start_cluster_head_with_external_redis

# 1. Create a placement group with 2 bundles, no nodes yet.
bundles = [MY_RESOURCE_ONE, MY_RESOURCE_ONE]
pg = ray.util.placement_group(
name="pg_2_nodes", strategy="STRICT_SPREAD", bundles=bundles
)
assert placement_group_table(pg)["state"] == "PENDING"

# 2. Create 2 actors in the pg, one for each bundle.
actor0 = Actor.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_bundle_index=0
)
).remote()
actor1 = Actor.options(
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg, placement_group_bundle_index=1
)
).remote()

actors = [actor0, actor1]
print(f"Created 2 actors: {actors}")

# 3. Create 1 good node, and 1 slow committing node
cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
with monkeypatch.context() as monkeypatch:
monkeypatch.setenv(
"RAY_testing_asio_delay_us",
"NodeManagerService.grpc_server.CommitBundleResources=500000000:500000000",
)
bad_node = cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)

assert not pg.wait(timeout_seconds=1)
assert placement_group_table(pg)["state"] == "PREPARED"
# Wait for the actor to be ready. One of them are ready.
assert len(alive_actors(actors)) == 1

# 4. Kill GCS.
cluster.head_node.kill_gcs_server()
assert len(alive_actors(actors)) == 1

if kill_bad_node == "before_gcs_restart":
# 5.1. Kill the slow committing node.
cluster.remove_node(bad_node)
# 5.2. Restart GCS.
cluster.head_node.start_gcs_server()
else:
assert kill_bad_node == "after_gcs_restart"
# 5.1. Restart GCS.
cluster.head_node.start_gcs_server()
assert placement_group_table(pg)["state"] == "PREPARED"
assert len(alive_actors(actors)) == 1
# 5.2. Kill the slow committing node.
cluster.remove_node(bad_node)

time.sleep(1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get rid of time.sleep using wait_for_condition etc?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

assert placement_group_table(pg)["state"] == "RESCHEDULING"
assert len(alive_actors(actors)) == 1

# 6. Add a new, normal node.
cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
assert pg.wait()
assert placement_group_table(pg)["state"] == "CREATED"
ray.get([actor.ready.remote() for actor in actors])


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
sys.exit(pytest.main(["-sv", __file__]))
1 change: 1 addition & 0 deletions python/ray/util/state/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
TypeActorStatus = Literal[tuple(ACTOR_STATUS)]
PLACEMENT_GROUP_STATUS = [
"PENDING",
"PREPARED",
"CREATED",
"REMOVED",
"RESCHEDULING",
Expand Down
13 changes: 5 additions & 8 deletions src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ class MockGcsPlacementGroupSchedulerInterface
public:
MOCK_METHOD(void,
ScheduleUnplacedBundles,
(std::shared_ptr<GcsPlacementGroup> placement_group,
PGSchedulingFailureCallback failure_callback,
PGSchedulingSuccessfulCallback success_callback),
(const SchedulePgRequest &request),
(override));
MOCK_METHOD((absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>),
GetAndRemoveBundlesOnNode,
Expand All @@ -58,8 +56,9 @@ class MockGcsPlacementGroupSchedulerInterface
MOCK_METHOD(void,
Initialize,
((const absl::flat_hash_map<
PlacementGroupID,
std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles)),
PlacementGroupID,
std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles),
const std::vector<SchedulePgRequest> &prepared_pgs),
(override));
};

Expand Down Expand Up @@ -93,9 +92,7 @@ class MockGcsPlacementGroupScheduler : public GcsPlacementGroupScheduler {
public:
MOCK_METHOD(void,
ScheduleUnplacedBundles,
(std::shared_ptr<GcsPlacementGroup> placement_group,
PGSchedulingFailureCallback failure_handler,
PGSchedulingSuccessfulCallback success_handler),
(const SchedulePgRequest &request),
(override));
MOCK_METHOD(void,
DestroyPlacementGroupBundleResourcesIfExists,
Expand Down
Loading
Loading