[core] Add PREPARED state for placement groups. (#46858)
Ray placement groups use a two-phase commit (2PC) protocol to reserve
resources. However, we did not track a PREPARED state; we only had
PENDING (before any prepare) and CREATED (after all bundles are
committed). As a result, when the GCS restarts it does not know that
any commits ever happened, so it instructs all raylets to return all
prepared or committed bundles.

If the GCS restarts after a PG has finished all prepares and is in the
middle of committing, the already-committed bundles may already host
scheduled actors/tasks, and those actors/tasks may be killed as a
result of the restart, making the GCS restart user-visible.
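
To make the failure mode concrete, here is a minimal, hypothetical sketch of
the pre-PR recovery behavior (illustrative names only, not the actual GCS
code): because only PENDING and CREATED are persisted, a restart that happens
mid-commit treats the group as never started and returns every bundle,
including bundles that already host actors.

// Hypothetical sketch of the pre-PR behavior; not the real GCS code.
#include <iostream>
#include <string>
#include <vector>

enum class PgState { PENDING, CREATED };  // pre-PR: no PREPARED state

struct Bundle {
  std::string node_id;
  bool committed;  // a committed bundle may already host actors/tasks
};

// Pre-PR recovery: any group that is not fully CREATED is treated as if no
// commit ever happened, so every bundle is returned to its raylet.
void RecoverAfterGcsRestart(PgState persisted_state, std::vector<Bundle> &bundles) {
  if (persisted_state != PgState::CREATED) {
    for (auto &bundle : bundles) {
      std::cout << "ReturnBundle(" << bundle.node_id << ")"
                << (bundle.committed ? "  <- kills actors already placed here" : "")
                << "\n";
      bundle.committed = false;
    }
  }
}

int main() {
  // The GCS crashed after committing on node-A but before committing on node-B.
  std::vector<Bundle> bundles = {{"node-A", true}, {"node-B", false}};
  RecoverAfterGcsRestart(PgState::PENDING, bundles);
  return 0;
}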

This PR introduces a PREPARED state between PENDING and CREATED. The
rules are as follows (a sketch of the resulting recovery logic follows
the list):

- (New!) When a PG has received all Prepare replies successfully:
transition PENDING -> PREPARED and persist the change to Redis. This
also persists all bundle locations (node IDs).
- After the persist completes: send all Commit calls.
- When a PG has received all Commit replies successfully: transition
PREPARED -> CREATED and persist the change to Redis.
- On GCS restart, if a PG is PENDING: return any bundles and reschedule
from scratch.
- (New!) On GCS restart, if a PG is PREPARED: resend all Commit calls.
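
Taken together, these rules reduce to a small dispatch on the persisted state
at GCS startup. The following extends the sketch above with the new PREPARED
branch (again, hypothetical names; the real logic lives in the GCS placement
group manager/scheduler):

// Hypothetical sketch of the post-PR recovery dispatch; not the real GCS code.
#include <iostream>
#include <string>
#include <vector>

enum class PgState { PENDING, PREPARED, CREATED, REMOVED, RESCHEDULING };

void RecoverAfterGcsRestart(PgState persisted_state,
                            const std::vector<std::string> &persisted_bundle_nodes) {
  switch (persisted_state) {
    case PgState::PENDING:
      // No commit can have happened yet: release everything, reschedule from zero.
      std::cout << "ReturnAllBundles(); RescheduleFromScratch();\n";
      break;
    case PgState::PREPARED:
      // All prepares succeeded and the bundle locations were persisted along
      // with the state change, so the commits can simply be resent.
      for (const auto &node_id : persisted_bundle_nodes) {
        std::cout << "CommitBundleResources -> " << node_id << "\n";
      }
      break;
    case PgState::CREATED:
      // Fully committed before the restart: nothing to redo.
      break;
    default:
      break;
  }
}

int main() {
  RecoverAfterGcsRestart(PgState::PREPARED, {"node-A", "node-B"});
  return 0;
}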

To do this cleanly, this PR also includes a fair amount of refactoring.
The idea is to confine all interface changes to
`GcsPlacementGroupScheduler` and leave the `GcsPlacementGroupManager`
interface unchanged. Specifically (a rough sketch of the new scheduler
interface follows the list):

- `GcsPlacementGroupScheduler::ScheduleUnplacedBundles` now accepts an
aggregated argument struct, `SchedulePgRequest`.
- `GcsPlacementGroupScheduler::Initialize` accepts one more vector of
the argument struct `SchedulePreparedPgRequest`, which is the regular
`SchedulePgRequest` plus the prepared bundle information.
- `GcsPlacementGroupManager::Initialize` now distinguishes PREPARED PGs
by creating requests for them and sending those to the scheduler. These
PGs are also recorded in `used_bundles` so that their bundles are not
removed (<- the actual fix).
- Added a unit test for the original issue; it fails on master and
passes with this PR.
- Quality-of-life change: `RAY_LOG().WithField(PlacementGroupID)`.
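
Roughly, the aggregated-argument shape looks like the following. This is a
hedged sketch: the field names and callback signatures are assumptions for
illustration, and the class is a stand-in, not the real
`GcsPlacementGroupScheduler`.

// Hedged sketch of the aggregated request struct and the touched scheduler
// entry points; field names and callback signatures are assumed, not exact.
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct GcsPlacementGroup {  // stand-in for the real class
  std::string name;
};

using PGSchedulingFailureCallback =
    std::function<void(std::shared_ptr<GcsPlacementGroup>, bool /*is_feasible*/)>;
using PGSchedulingSuccessfulCallback =
    std::function<void(std::shared_ptr<GcsPlacementGroup>)>;

// Aggregates what used to be three positional arguments of
// ScheduleUnplacedBundles into one struct.
struct SchedulePgRequest {
  std::shared_ptr<GcsPlacementGroup> placement_group;
  PGSchedulingFailureCallback failure_callback;
  PGSchedulingSuccessfulCallback success_callback;
};

class PlacementGroupSchedulerSketch {
 public:
  void ScheduleUnplacedBundles(const SchedulePgRequest &request) {
    std::cout << "scheduling " << request.placement_group->name << "\n";
  }
  // New: PREPARED groups recovered from Redis are handed to the scheduler so
  // it can resend their Commit calls instead of rescheduling them.
  void Initialize(const std::vector<SchedulePgRequest> &prepared_pgs) {
    for (const auto &request : prepared_pgs) {
      std::cout << "resuming commit for " << request.placement_group->name << "\n";
    }
  }
};

int main() {
  PlacementGroupSchedulerSketch scheduler;
  SchedulePgRequest request{std::make_shared<GcsPlacementGroup>(),
                            /*failure_callback=*/nullptr,
                            /*success_callback=*/nullptr};
  request.placement_group->name = "pg_2_nodes";
  scheduler.ScheduleUnplacedBundles(request);
  scheduler.Initialize({request});
  return 0;
}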

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: Ruiyang Wang <56065503+rynewang@users.noreply.github.com>
Co-authored-by: SangBin Cho <rkooo567@gmail.com>
rynewang and rkooo567 authored Aug 23, 2024
1 parent 43250f4 commit 2897f46
Showing 13 changed files with 421 additions and 125 deletions.
9 changes: 5 additions & 4 deletions cpp/include/ray/api/task_options.h
@@ -51,9 +51,10 @@ enum class PlacementStrategy {

enum PlacementGroupState {
  PENDING = 0,
-  CREATED = 1,
-  REMOVED = 2,
-  RESCHEDULING = 3,
+  PREPARED = 1,
+  CREATED = 2,
+  REMOVED = 3,
+  RESCHEDULING = 4,
  UNRECOGNIZED = -1,
};

@@ -112,4 +113,4 @@ struct ActorCreationOptions {
};
} // namespace internal

-} // namespace ray
+} // namespace ray
2 changes: 2 additions & 0 deletions python/ray/_private/state.py
@@ -317,6 +317,8 @@ def _gen_placement_group_info(self, placement_group_info):
        def get_state(state):
            if state == gcs_pb2.PlacementGroupTableData.PENDING:
                return "PENDING"
+            elif state == gcs_pb2.PlacementGroupTableData.PREPARED:
+                return "PREPARED"
            elif state == gcs_pb2.PlacementGroupTableData.CREATED:
                return "CREATED"
            elif state == gcs_pb2.PlacementGroupTableData.RESCHEDULING:
115 changes: 114 additions & 1 deletion python/ray/tests/test_placement_group_failover.py
@@ -5,6 +5,7 @@
import ray.cluster_utils
from ray._private.test_utils import get_other_nodes, wait_for_condition
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
+from ray.util import placement_group_table

MB = 1024 * 1024

@@ -161,5 +162,117 @@ def _check_actor_with_pg_is_ready():
    )


+@pytest.mark.parametrize("kill_bad_node", ["before_gcs_restart", "after_gcs_restart"])
+def test_gcs_restart_when_pg_committing(
+    monkeypatch, ray_start_cluster_head_with_external_redis, kill_bad_node
+):
+    """
+    Tests that a GCS restart preserves already-committed bundles for a PREPARED pg.
+    Timeline:
+    1. Create a placement group with 2 bundles, no nodes yet.
+        - [Test] PENDING
+    2. Create 2 actors in the pg, one for each bundle.
+    3. Create 1 good node, and 1 slow committing node.
+        - [Test] PREPARED
+        - [Test] There should be 1 alive actor.
+    4. Kill GCS.
+        - [Test] There should be 1 alive actor.
+    5. Switch on `kill_bad_node`:
+        1. `kill_bad_node` == "before_gcs_restart":
+            i. Kill the slow committing node.
+            ii. Restart GCS.
+        2. `kill_bad_node` == "after_gcs_restart":
+            i. Restart GCS.
+                - [Test] PREPARED
+                - [Test] There should be 1 alive actor.
+            ii. Kill the slow committing node.
+        - [Test] PREPARED -> RESCHEDULING
+        - [Test] There should be 1 alive actor.
+    6. Add a new, normal node.
+        - [Test] RESCHEDULING -> CREATED
+        - [Test] There should be 2 alive actors.
+    """
+    MY_RESOURCE_ONE = {"MyResource": 1}
+
+    @ray.remote(resources=MY_RESOURCE_ONE, num_cpus=0)
+    class Actor:
+        def ready(self):
+            return True
+
+    def alive_actors(actors):
+        """Returns a list of actors that are alive."""
+        ping_map = {actor.ready.remote(): actor for actor in actors}
+        pings = list(ping_map.keys())
+        ready, _ = ray.wait(pings, timeout=1)
+        assert all(ray.get(ready)), f"{ready=}"
+        return [ping_map[ping] for ping in ready]
+
+    cluster = ray_start_cluster_head_with_external_redis
+
+    # 1. Create a placement group with 2 bundles, no nodes yet.
+    bundles = [MY_RESOURCE_ONE, MY_RESOURCE_ONE]
+    pg = ray.util.placement_group(
+        name="pg_2_nodes", strategy="STRICT_SPREAD", bundles=bundles
+    )
+    assert placement_group_table(pg)["state"] == "PENDING"
+
+    # 2. Create 2 actors in the pg, one for each bundle.
+    actor0 = Actor.options(
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg, placement_group_bundle_index=0
+        )
+    ).remote()
+    actor1 = Actor.options(
+        scheduling_strategy=PlacementGroupSchedulingStrategy(
+            placement_group=pg, placement_group_bundle_index=1
+        )
+    ).remote()
+
+    actors = [actor0, actor1]
+    print(f"Created 2 actors: {actors}")
+
+    # 3. Create 1 good node, and 1 slow committing node.
+    cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
+    with monkeypatch.context() as monkeypatch:
+        monkeypatch.setenv(
+            "RAY_testing_asio_delay_us",
+            "NodeManagerService.grpc_server.CommitBundleResources=500000000:500000000",
+        )
+        bad_node = cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
+
+    assert not pg.wait(timeout_seconds=1)
+    assert placement_group_table(pg)["state"] == "PREPARED"
+    # Wait for the actors to be ready. Exactly one of them is ready.
+    assert len(alive_actors(actors)) == 1
+
+    # 4. Kill GCS.
+    cluster.head_node.kill_gcs_server()
+    assert len(alive_actors(actors)) == 1
+
+    if kill_bad_node == "before_gcs_restart":
+        # 5.1. Kill the slow committing node.
+        cluster.remove_node(bad_node)
+        # 5.2. Restart GCS.
+        cluster.head_node.start_gcs_server()
+    else:
+        assert kill_bad_node == "after_gcs_restart"
+        # 5.1. Restart GCS.
+        cluster.head_node.start_gcs_server()
+        assert placement_group_table(pg)["state"] == "PREPARED"
+        assert len(alive_actors(actors)) == 1
+        # 5.2. Kill the slow committing node.
+        cluster.remove_node(bad_node)
+
+    time.sleep(1)
+    assert placement_group_table(pg)["state"] == "RESCHEDULING"
+    assert len(alive_actors(actors)) == 1
+
+    # 6. Add a new, normal node.
+    cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
+    assert pg.wait()
+    assert placement_group_table(pg)["state"] == "CREATED"
+    ray.get([actor.ready.remote() for actor in actors])
+
+
if __name__ == "__main__":
-    sys.exit(pytest.main(["-v", __file__]))
+    sys.exit(pytest.main(["-sv", __file__]))
1 change: 1 addition & 0 deletions python/ray/util/state/custom_types.py
@@ -26,6 +26,7 @@
TypeActorStatus = Literal[tuple(ACTOR_STATUS)]
PLACEMENT_GROUP_STATUS = [
    "PENDING",
+    "PREPARED",
    "CREATED",
    "REMOVED",
    "RESCHEDULING",
13 changes: 5 additions & 8 deletions src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -30,9 +30,7 @@ class MockGcsPlacementGroupSchedulerInterface
 public:
  MOCK_METHOD(void,
              ScheduleUnplacedBundles,
-              (std::shared_ptr<GcsPlacementGroup> placement_group,
-               PGSchedulingFailureCallback failure_callback,
-               PGSchedulingSuccessfulCallback success_callback),
+              (const SchedulePgRequest &request),
              (override));
  MOCK_METHOD((absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>),
              GetAndRemoveBundlesOnNode,
@@ -58,8 +56,9 @@ class MockGcsPlacementGroupSchedulerInterface
  MOCK_METHOD(void,
              Initialize,
              ((const absl::flat_hash_map<
-                  PlacementGroupID,
-                  std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles)),
+                   PlacementGroupID,
+                   std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles),
+               const std::vector<SchedulePgRequest> &prepared_pgs),
              (override));
};

@@ -93,9 +92,7 @@ class MockGcsPlacementGroupScheduler : public GcsPlacementGroupScheduler {
 public:
  MOCK_METHOD(void,
              ScheduleUnplacedBundles,
-              (std::shared_ptr<GcsPlacementGroup> placement_group,
-               PGSchedulingFailureCallback failure_handler,
-               PGSchedulingSuccessfulCallback success_handler),
+              (const SchedulePgRequest &request),
              (override));
  MOCK_METHOD(void,
              DestroyPlacementGroupBundleResourcesIfExists,