Revert "[core] Add PREPARED state for placement groups. (#46858)"
This reverts commit 2897f46.
can-anyscale authored Aug 23, 2024
1 parent 9908246 commit 3a30641
Showing 13 changed files with 125 additions and 421 deletions.
9 changes: 4 additions & 5 deletions cpp/include/ray/api/task_options.h
@@ -51,10 +51,9 @@ enum class PlacementStrategy {

 enum PlacementGroupState {
   PENDING = 0,
-  PREPARED = 1,
-  CREATED = 2,
-  REMOVED = 3,
-  RESCHEDULING = 4,
+  CREATED = 1,
+  REMOVED = 2,
+  RESCHEDULING = 3,
   UNRECOGNIZED = -1,
 };

@@ -113,4 +112,4 @@ struct ActorCreationOptions {
 };
 } // namespace internal

-} // namespace ray
+} // namespace ray
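Note on the hunk above: dropping PREPARED renumbers every enum value after PENDING, so anything persisted or compared as a raw integer changes meaning across the revert. A minimal Python sketch of the two numberings (illustrative only; the authoritative definition is the C++ enum in this header):

    from enum import IntEnum

    class StateWithPrepared(IntEnum):
        # Numbering introduced by #46858 (the change being reverted).
        PENDING = 0
        PREPARED = 1
        CREATED = 2
        REMOVED = 3
        RESCHEDULING = 4
        UNRECOGNIZED = -1

    class StateAfterRevert(IntEnum):
        # Numbering restored by this revert; values after PENDING shift down by one.
        PENDING = 0
        CREATED = 1
        REMOVED = 2
        RESCHEDULING = 3
        UNRECOGNIZED = -1

    # A consumer that stored the raw value 2 would read CREATED before the
    # revert and REMOVED after it.
    assert StateWithPrepared(2) is StateWithPrepared.CREATED
    assert StateAfterRevert(2) is StateAfterRevert.REMOVED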
2 changes: 0 additions & 2 deletions python/ray/_private/state.py
@@ -317,8 +317,6 @@ def _gen_placement_group_info(self, placement_group_info):
         def get_state(state):
             if state == gcs_pb2.PlacementGroupTableData.PENDING:
                 return "PENDING"
-            elif state == gcs_pb2.PlacementGroupTableData.PREPARED:
-                return "PREPARED"
             elif state == gcs_pb2.PlacementGroupTableData.CREATED:
                 return "CREATED"
             elif state == gcs_pb2.PlacementGroupTableData.RESCHEDULING:
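For context, get_state above translates the protobuf state constant into the string surfaced by ray.util.placement_group_table; a rough dict-based equivalent of the post-revert mapping (a sketch only — the real code dispatches on gcs_pb2.PlacementGroupTableData constants and may handle additional states below the shown hunk):

    # Stand-ins for the gcs_pb2.PlacementGroupTableData constants (hypothetical values).
    PENDING, CREATED, REMOVED, RESCHEDULING = 0, 1, 2, 3

    _STATE_NAMES = {
        PENDING: "PENDING",
        CREATED: "CREATED",
        REMOVED: "REMOVED",
        RESCHEDULING: "RESCHEDULING",
    }

    def get_state(state):
        # Unknown values fall back to a generic label rather than raising.
        return _STATE_NAMES.get(state, "UNKNOWN")

    assert get_state(RESCHEDULING) == "RESCHEDULING"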
115 changes: 1 addition & 114 deletions python/ray/tests/test_placement_group_failover.py
@@ -5,7 +5,6 @@
 import ray.cluster_utils
 from ray._private.test_utils import get_other_nodes, wait_for_condition
 from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
-from ray.util import placement_group_table

 MB = 1024 * 1024

@@ -162,117 +161,5 @@ def _check_actor_with_pg_is_ready():
 )


-@pytest.mark.parametrize("kill_bad_node", ["before_gcs_restart", "after_gcs_restart"])
-def test_gcs_restart_when_pg_committing(
-    monkeypatch, ray_start_cluster_head_with_external_redis, kill_bad_node
-):
-    """
-    Tests that a GCS restart preserves already-committed bundles for a PREPARED pg.
-    Timeline:
-    1. Create a placement group with 2 bundles, no nodes yet.
-        - [Test] PENDING
-    2. Create 2 actors in the pg, one for each bundle.
-    3. Create 1 good node, and 1 slow committing node.
-        - [Test] PREPARED
-        - [Test] There should be 1 alive actor.
-    4. Kill GCS.
-        - [Test] There should be 1 alive actor.
-    5. Switch on `kill_bad_node`:
-        1. `kill_bad_node` == "before_gcs_restart":
-            i. kill the slow committing node.
-            ii. restart GCS.
-        2. `kill_bad_node` == "after_gcs_restart":
-            i. restart GCS.
-                - [Test] PREPARED
-                - [Test] There should be 1 alive actor.
-            ii. kill the slow committing node.
-                - [Test] PREPARED -> RESCHEDULING
-                - [Test] There should be 1 alive actor.
-    6. Add a new, normal node.
-        - [Test] RESCHEDULING -> CREATED
-        - [Test] There should be 2 alive actors.
-    """
-    MY_RESOURCE_ONE = {"MyResource": 1}
-
-    @ray.remote(resources=MY_RESOURCE_ONE, num_cpus=0)
-    class Actor:
-        def ready(self):
-            return True
-
-    def alive_actors(actors):
-        """Returns a list of actors that are alive."""
-        ping_map = {actor.ready.remote(): actor for actor in actors}
-        pings = list(ping_map.keys())
-        ready, _ = ray.wait(pings, timeout=1)
-        assert all(ray.get(ready)), f"{ready=}"
-        return [ping_map[ping] for ping in ready]
-
-    cluster = ray_start_cluster_head_with_external_redis
-
-    # 1. Create a placement group with 2 bundles, no nodes yet.
-    bundles = [MY_RESOURCE_ONE, MY_RESOURCE_ONE]
-    pg = ray.util.placement_group(
-        name="pg_2_nodes", strategy="STRICT_SPREAD", bundles=bundles
-    )
-    assert placement_group_table(pg)["state"] == "PENDING"
-
-    # 2. Create 2 actors in the pg, one for each bundle.
-    actor0 = Actor.options(
-        scheduling_strategy=PlacementGroupSchedulingStrategy(
-            placement_group=pg, placement_group_bundle_index=0
-        )
-    ).remote()
-    actor1 = Actor.options(
-        scheduling_strategy=PlacementGroupSchedulingStrategy(
-            placement_group=pg, placement_group_bundle_index=1
-        )
-    ).remote()
-
-    actors = [actor0, actor1]
-    print(f"Created 2 actors: {actors}")
-
-    # 3. Create 1 good node, and 1 slow committing node.
-    cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
-    with monkeypatch.context() as monkeypatch:
-        monkeypatch.setenv(
-            "RAY_testing_asio_delay_us",
-            "NodeManagerService.grpc_server.CommitBundleResources=500000000:500000000",
-        )
-        bad_node = cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
-
-    assert not pg.wait(timeout_seconds=1)
-    assert placement_group_table(pg)["state"] == "PREPARED"
-    # Wait for the actors to be ready. Only one of them will be ready.
-    assert len(alive_actors(actors)) == 1
-
-    # 4. Kill GCS.
-    cluster.head_node.kill_gcs_server()
-    assert len(alive_actors(actors)) == 1
-
-    if kill_bad_node == "before_gcs_restart":
-        # 5.1. Kill the slow committing node.
-        cluster.remove_node(bad_node)
-        # 5.2. Restart GCS.
-        cluster.head_node.start_gcs_server()
-    else:
-        assert kill_bad_node == "after_gcs_restart"
-        # 5.1. Restart GCS.
-        cluster.head_node.start_gcs_server()
-        assert placement_group_table(pg)["state"] == "PREPARED"
-        assert len(alive_actors(actors)) == 1
-        # 5.2. Kill the slow committing node.
-        cluster.remove_node(bad_node)
-
-    time.sleep(1)
-    assert placement_group_table(pg)["state"] == "RESCHEDULING"
-    assert len(alive_actors(actors)) == 1
-
-    # 6. Add a new, normal node.
-    cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
-    assert pg.wait()
-    assert placement_group_table(pg)["state"] == "CREATED"
-    ray.get([actor.ready.remote() for actor in actors])
-
-
 if __name__ == "__main__":
-    sys.exit(pytest.main(["-sv", __file__]))
+    sys.exit(pytest.main(["-v", __file__]))
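The removed test drives the placement group lifecycle through ray.util.placement_group_table and pg.wait(); a minimal, self-contained sketch of that query pattern on a single-node cluster (resource amounts and timeouts here are illustrative, not taken from the test):

    import ray
    from ray.util import placement_group_table

    ray.init(num_cpus=2)

    # Reserve two 1-CPU bundles; PACK places them on as few nodes as possible.
    pg = ray.util.placement_group([{"CPU": 1}, {"CPU": 1}], strategy="PACK")

    # Block until all bundles are committed, then inspect the recorded state.
    assert pg.wait(timeout_seconds=30)
    assert placement_group_table(pg)["state"] == "CREATED"

    ray.util.remove_placement_group(pg)
    ray.shutdown()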
1 change: 0 additions & 1 deletion python/ray/util/state/custom_types.py
@@ -26,7 +26,6 @@
 TypeActorStatus = Literal[tuple(ACTOR_STATUS)]
 PLACEMENT_GROUP_STATUS = [
     "PENDING",
-    "PREPARED",
     "CREATED",
     "REMOVED",
     "RESCHEDULING",
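By analogy with TypeActorStatus in the hunk above, the status list presumably feeds a Literal alias used by the state API; a small sketch of that pattern (the alias name and the post-revert list contents shown here are assumptions):

    from typing import Literal

    PLACEMENT_GROUP_STATUS = ["PENDING", "CREATED", "REMOVED", "RESCHEDULING"]

    # Hypothetical alias, mirroring TypeActorStatus = Literal[tuple(ACTOR_STATUS)].
    TypePlacementGroupStatus = Literal[tuple(PLACEMENT_GROUP_STATUS)]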
13 changes: 8 additions & 5 deletions src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -30,7 +30,9 @@ class MockGcsPlacementGroupSchedulerInterface
  public:
   MOCK_METHOD(void,
               ScheduleUnplacedBundles,
-              (const SchedulePgRequest &request),
+              (std::shared_ptr<GcsPlacementGroup> placement_group,
+               PGSchedulingFailureCallback failure_callback,
+               PGSchedulingSuccessfulCallback success_callback),
               (override));
   MOCK_METHOD((absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>),
               GetAndRemoveBundlesOnNode,
@@ -56,9 +58,8 @@
   MOCK_METHOD(void,
               Initialize,
               ((const absl::flat_hash_map<
-                   PlacementGroupID,
-                   std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles),
-                const std::vector<SchedulePgRequest> &prepared_pgs),
+                  PlacementGroupID,
+                  std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles)),
               (override));
};

@@ -92,7 +93,9 @@ class MockGcsPlacementGroupScheduler : public GcsPlacementGroupScheduler {
  public:
   MOCK_METHOD(void,
               ScheduleUnplacedBundles,
-              (const SchedulePgRequest &request),
+              (std::shared_ptr<GcsPlacementGroup> placement_group,
+               PGSchedulingFailureCallback failure_handler,
+               PGSchedulingSuccessfulCallback success_handler),
               (override));
   MOCK_METHOD(void,
               DestroyPlacementGroupBundleResourcesIfExists,
(The remaining 8 changed files are not shown in this view.)
