From 3a306414fa7af94657034cb300dcabe7b769bd07 Mon Sep 17 00:00:00 2001
From: Cuong Nguyen <128072568+can-anyscale@users.noreply.github.com>
Date: Fri, 23 Aug 2024 16:11:49 -0700
Subject: [PATCH] Revert "[core] Add PREPARED state for placement groups. (#46858)"

This reverts commit 2897f46f359fb93e021349aa7e88faf5c8c0a3c0.
---
 cpp/include/ray/api/task_options.h            |   9 +-
 python/ray/_private/state.py                  |   2 -
 .../tests/test_placement_group_failover.py    | 115 +------------
 python/ray/util/state/custom_types.py         |   1 -
 .../gcs_placement_group_scheduler.h           |  13 +-
 .../gcs_server/gcs_placement_group_manager.cc | 162 ++++++------------
 .../gcs_placement_group_scheduler.cc          |  79 +--------
 .../gcs_placement_group_scheduler.h           |  37 ++--
 .../gcs_placement_group_manager_mock_test.cc  |  31 ++--
 .../test/gcs_placement_group_manager_test.cc  |  23 ++-
 .../gcs_placement_group_scheduler_test.cc     |  54 +-----
 .../gcs_server/test/gcs_server_test_util.h    |  12 --
 src/ray/protobuf/gcs.proto                    |   8 +-
 13 files changed, 125 insertions(+), 421 deletions(-)

diff --git a/cpp/include/ray/api/task_options.h b/cpp/include/ray/api/task_options.h
index 1873c09f3db85..c2284249da639 100644
--- a/cpp/include/ray/api/task_options.h
+++ b/cpp/include/ray/api/task_options.h
@@ -51,10 +51,9 @@ enum class PlacementStrategy {
 
 enum PlacementGroupState {
   PENDING = 0,
-  PREPARED = 1,
-  CREATED = 2,
-  REMOVED = 3,
-  RESCHEDULING = 4,
+  CREATED = 1,
+  REMOVED = 2,
+  RESCHEDULING = 3,
   UNRECOGNIZED = -1,
 };
 
@@ -113,4 +112,4 @@ struct ActorCreationOptions {
 };
 
 }  // namespace internal
-}  // namespace ray
+}  // namespace ray
\ No newline at end of file
diff --git a/python/ray/_private/state.py b/python/ray/_private/state.py
index 738bf51ffaa9a..babe4a3d67966 100644
--- a/python/ray/_private/state.py
+++ b/python/ray/_private/state.py
@@ -317,8 +317,6 @@ def _gen_placement_group_info(self, placement_group_info):
         def get_state(state):
             if state == gcs_pb2.PlacementGroupTableData.PENDING:
                 return "PENDING"
-            elif state == gcs_pb2.PlacementGroupTableData.PREPARED:
-                return "PREPARED"
             elif state == gcs_pb2.PlacementGroupTableData.CREATED:
                 return "CREATED"
             elif state == gcs_pb2.PlacementGroupTableData.RESCHEDULING:
diff --git a/python/ray/tests/test_placement_group_failover.py b/python/ray/tests/test_placement_group_failover.py
index 0a89f0f146a67..cccc803230811 100755
--- a/python/ray/tests/test_placement_group_failover.py
+++ b/python/ray/tests/test_placement_group_failover.py
@@ -5,7 +5,6 @@
 import ray.cluster_utils
 from ray._private.test_utils import get_other_nodes, wait_for_condition
 from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
-from ray.util import placement_group_table
 
 MB = 1024 * 1024
 
@@ -162,117 +161,5 @@ def _check_actor_with_pg_is_ready():
     )
 
 
-@pytest.mark.parametrize("kill_bad_node", ["before_gcs_restart", "after_gcs_restart"])
-def test_gcs_restart_when_pg_committing(
-    monkeypatch, ray_start_cluster_head_with_external_redis, kill_bad_node
-):
-    """
-    Tests that GCS restart preserves already-committed bundles for a PREPARED pg.
-    Timeline:
-    1. Create a placement group with 2 bundles, no nodes yet.
-        - [Test] PENDING
-    2. Create 2 actors in the pg, one for each bundle.
-    3. Create 1 good node, and 1 slow committing node.
-        - [Test] PREPARED
-        - [Test] There should be 1 alive actor.
-    4. Kill GCS.
-        - [Test] There should be 1 alive actor.
-    5. Switch on `kill_bad_node`:
-        1. `kill_bad_node` == "before_gcs_restart":
-            i. kill the slow committing node.
-            ii. restart GCS.
-        2. `kill_bad_node` == "after_gcs_restart":
-            i. restart GCS.
-                - [Test] PREPARED
-                - [Test] There should be 1 alive actor.
-            ii. kill the slow committing node.
-                - [Test] PREPARED -> RESCHEDULING
-                - [Test] There should be 1 alive actor.
-    6. Add a new, normal node.
-        - [Test] RESCHEDULING -> CREATED
-        - [Test] There should be 2 alive actors.
-    """
-    MY_RESOURCE_ONE = {"MyResource": 1}
-
-    @ray.remote(resources=MY_RESOURCE_ONE, num_cpus=0)
-    class Actor:
-        def ready(self):
-            return True
-
-    def alive_actors(actors):
-        """Returns a list of actors that are alive."""
-        ping_map = {actor.ready.remote(): actor for actor in actors}
-        pings = list(ping_map.keys())
-        ready, _ = ray.wait(pings, timeout=1)
-        assert all(ray.get(ready)), f"{ready=}"
-        return [ping_map[ping] for ping in ready]
-
-    cluster = ray_start_cluster_head_with_external_redis
-
-    # 1. Create a placement group with 2 bundles, no nodes yet.
-    bundles = [MY_RESOURCE_ONE, MY_RESOURCE_ONE]
-    pg = ray.util.placement_group(
-        name="pg_2_nodes", strategy="STRICT_SPREAD", bundles=bundles
-    )
-    assert placement_group_table(pg)["state"] == "PENDING"
-
-    # 2. Create 2 actors in the pg, one for each bundle.
-    actor0 = Actor.options(
-        scheduling_strategy=PlacementGroupSchedulingStrategy(
-            placement_group=pg, placement_group_bundle_index=0
-        )
-    ).remote()
-    actor1 = Actor.options(
-        scheduling_strategy=PlacementGroupSchedulingStrategy(
-            placement_group=pg, placement_group_bundle_index=1
-        )
-    ).remote()
-
-    actors = [actor0, actor1]
-    print(f"Created 2 actors: {actors}")
-
-    # 3. Create 1 good node, and 1 slow committing node
-    cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
-    with monkeypatch.context() as monkeypatch:
-        monkeypatch.setenv(
-            "RAY_testing_asio_delay_us",
-            "NodeManagerService.grpc_server.CommitBundleResources=500000000:500000000",
-        )
-        bad_node = cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
-
-    assert not pg.wait(timeout_seconds=1)
-    assert placement_group_table(pg)["state"] == "PREPARED"
-    # Wait for the actors to be ready. Exactly one of them is ready.
-    assert len(alive_actors(actors)) == 1
-
-    # 4. Kill GCS.
-    cluster.head_node.kill_gcs_server()
-    assert len(alive_actors(actors)) == 1
-
-    if kill_bad_node == "before_gcs_restart":
-        # 5.1. Kill the slow committing node.
-        cluster.remove_node(bad_node)
-        # 5.2. Restart GCS.
-        cluster.head_node.start_gcs_server()
-    else:
-        assert kill_bad_node == "after_gcs_restart"
-        # 5.1. Restart GCS.
-        cluster.head_node.start_gcs_server()
-        assert placement_group_table(pg)["state"] == "PREPARED"
-        assert len(alive_actors(actors)) == 1
-        # 5.2. Kill the slow committing node.
-        cluster.remove_node(bad_node)
-
-    time.sleep(1)
-    assert placement_group_table(pg)["state"] == "RESCHEDULING"
-    assert len(alive_actors(actors)) == 1
-
-    # 6. Add a new, normal node.
-    cluster.add_node(num_cpus=1, resources=MY_RESOURCE_ONE)
-    assert pg.wait()
-    assert placement_group_table(pg)["state"] == "CREATED"
-    ray.get([actor.ready.remote() for actor in actors])
-
-
 if __name__ == "__main__":
-    sys.exit(pytest.main(["-sv", __file__]))
+    sys.exit(pytest.main(["-v", __file__]))
diff --git a/python/ray/util/state/custom_types.py b/python/ray/util/state/custom_types.py
index a8349b2540ac6..2263bba5954bb 100644
--- a/python/ray/util/state/custom_types.py
+++ b/python/ray/util/state/custom_types.py
@@ -26,7 +26,6 @@ TypeActorStatus = Literal[tuple(ACTOR_STATUS)]
 PLACEMENT_GROUP_STATUS = [
     "PENDING",
-    "PREPARED",
     "CREATED",
     "REMOVED",
     "RESCHEDULING",
diff --git a/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
index 1fd3d4b5bb5fe..64e3ff009411e 100644
--- a/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
+++ b/src/mock/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -30,7 +30,9 @@ class MockGcsPlacementGroupSchedulerInterface
  public:
   MOCK_METHOD(void,
               ScheduleUnplacedBundles,
-              (const SchedulePgRequest &request),
+              (std::shared_ptr<GcsPlacementGroup> placement_group,
+               PGSchedulingFailureCallback failure_callback,
+               PGSchedulingSuccessfulCallback success_callback),
               (override));
   MOCK_METHOD((absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>),
               GetAndRemoveBundlesOnNode,
@@ -56,9 +58,8 @@ class MockGcsPlacementGroupSchedulerInterface
   MOCK_METHOD(void,
               Initialize,
               ((const absl::flat_hash_map<
-                    PlacementGroupID,
-                    std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles),
-               const std::vector<SchedulePgRequest> &prepared_pgs),
+                   PlacementGroupID,
+                   std::vector<std::shared_ptr<BundleSpecification>>> &group_to_bundles)),
               (override));
 };
 
@@ -92,7 +93,9 @@ class MockGcsPlacementGroupScheduler : public GcsPlacementGroupScheduler {
  public:
   MOCK_METHOD(void,
               ScheduleUnplacedBundles,
-              (const SchedulePgRequest &request),
+              (std::shared_ptr<GcsPlacementGroup> placement_group,
+               PGSchedulingFailureCallback failure_handler,
+               PGSchedulingSuccessfulCallback success_handler),
               (override));
   MOCK_METHOD(void,
               DestroyPlacementGroupBundleResourcesIfExists,
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
index 3f2839d60b7b2..f874c2446433e 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
@@ -24,36 +24,11 @@
 namespace ray {
 namespace gcs {
 
-namespace {
-
-ExponentialBackOff CreateDefaultBackoff() {
-  // std::chrono conversions are unwieldy but safer.
-  // ms -> ns
-  using std::chrono::duration_cast;
-  using std::chrono::milliseconds;
-  using std::chrono::nanoseconds;
-  const uint64_t initial_delay_ns =
-      duration_cast<nanoseconds>(
-          milliseconds(
-              RayConfig::instance().gcs_create_placement_group_retry_min_interval_ms()))
-          .count();
-  const uint64_t max_delay_ns =
-      duration_cast<nanoseconds>(
-          milliseconds(
-              RayConfig::instance().gcs_create_placement_group_retry_max_interval_ms()))
-          .count();
-  return ExponentialBackOff(
-      initial_delay_ns,
-      RayConfig::instance().gcs_create_placement_group_retry_multiplier(),
-      max_delay_ns);
-}
-}  // namespace
-
 void GcsPlacementGroup::UpdateState(
     rpc::PlacementGroupTableData::PlacementGroupState state) {
-  if (state == rpc::PlacementGroupTableData::CREATED) {
-    RAY_CHECK_EQ(placement_group_table_data_.state(),
-                 rpc::PlacementGroupTableData::PREPARED);
+  if (placement_group_table_data_.state() ==
+          rpc::PlacementGroupTableData_PlacementGroupState_PENDING &&
+      state == rpc::PlacementGroupTableData_PlacementGroupState_CREATED) {
     placement_group_table_data_.set_placement_group_final_bundle_placement_timestamp_ms(
         current_sys_time_ms());
@@ -305,9 +280,8 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
     std::shared_ptr<GcsPlacementGroup> placement_group,
     ExponentialBackOff backoff,
     bool is_feasible) {
-  RAY_LOG(DEBUG).WithField(placement_group->GetPlacementGroupID())
-      << "Failed to create placement group " << placement_group->GetName()
-      << ", try again.";
+  RAY_LOG(DEBUG) << "Failed to create placement group " << placement_group->GetName()
+                 << ", id: " << placement_group->GetPlacementGroupID() << ", try again.";
 
   auto stats = placement_group->GetMutableStats();
   if (!is_feasible) {
@@ -327,7 +301,7 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationFailed(
     // group by rescheduling the bundles of the dead node. This should have higher
     // priority than trying to place other placement groups.
     stats->set_scheduling_state(rpc::PlacementGroupStats::FAILED_TO_COMMIT_RESOURCES);
-    AddToPendingQueue(std::move(placement_group), /*rank=*/0);
+    AddToPendingQueue(std::move(placement_group), /* rank */ 0);
   } else if (state == rpc::PlacementGroupTableData::PENDING) {
     stats->set_scheduling_state(rpc::PlacementGroupStats::NO_RESOURCES);
     AddToPendingQueue(std::move(placement_group), std::nullopt, backoff);
@@ -430,18 +404,16 @@ void GcsPlacementGroupManager::SchedulePendingPlacementGroups() {
     stats->set_scheduling_attempt(stats->scheduling_attempt() + 1);
     stats->set_scheduling_started_time_ns(absl::GetCurrentTimeNanos());
     MarkSchedulingStarted(placement_group_id);
-    gcs_placement_group_scheduler_->ScheduleUnplacedBundles(SchedulePgRequest{
-        .placement_group = placement_group,
-        .failure_callback =
-            [this, backoff](std::shared_ptr<GcsPlacementGroup> placement_group,
-                            bool is_feasible) {
-              OnPlacementGroupCreationFailed(
-                  std::move(placement_group), backoff, is_feasible);
-            },
-        .success_callback =
-            [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
-              OnPlacementGroupCreationSuccess(placement_group);
-            }});
+    gcs_placement_group_scheduler_->ScheduleUnplacedBundles(
+        placement_group,
+        [this, backoff](std::shared_ptr<GcsPlacementGroup> placement_group,
+                        bool is_feasible) {
+          OnPlacementGroupCreationFailed(
+              std::move(placement_group), backoff, is_feasible);
+        },
+        [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
+          OnPlacementGroupCreationSuccess(std::move(placement_group));
+        });
     is_new_placement_group_scheduled = true;
   }
   // If the placement group is not registered == removed.
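For orientation, `CreateDefaultBackoff()` (removed above) and the restored inline `ExponentialBackOff` construction (next hunk) encode the same retry pacing: delays start at the configured minimum interval, grow geometrically by the multiplier, and are clamped at the configured maximum, with milliseconds converted to nanoseconds. A minimal standalone sketch, using illustrative constants in place of the real `RayConfig` entries:

#include <algorithm>
#include <cstdint>

// Illustrative stand-ins for gcs_create_placement_group_retry_{min,max}_interval_ms
// and gcs_create_placement_group_retry_multiplier; real values come from RayConfig.
constexpr uint64_t kMinDelayMs = 100;
constexpr uint64_t kMaxDelayMs = 5000;
constexpr double kMultiplier = 1.5;

// Delay (in nanoseconds) before retry number `attempt` (0-based).
uint64_t RetryDelayNs(uint64_t attempt) {
  double delay_ms = static_cast<double>(kMinDelayMs);
  for (uint64_t i = 0; i < attempt; ++i) {
    delay_ms = std::min(delay_ms * kMultiplier, static_cast<double>(kMaxDelayMs));
  }
  return static_cast<uint64_t>(delay_ms) * 1000 * 1000;  // ms -> ns
}

The only difference between the two versions below is where this math lives (a shared helper versus inline construction), not what it computes.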
@@ -759,7 +731,14 @@ void GcsPlacementGroupManager::AddToPendingQueue(
   pg->GetMutableStats()->set_highest_retry_delay_ms(absl::Nanoseconds(last_delay) /
                                                     absl::Milliseconds(1));
   if (!exp_backer) {
-    exp_backer = CreateDefaultBackoff();
+    auto initial_delay_ns =
+        1000000 *
+        RayConfig::instance().gcs_create_placement_group_retry_min_interval_ms();
+    exp_backer = ExponentialBackOff(
+        initial_delay_ns,
+        RayConfig::instance().gcs_create_placement_group_retry_multiplier(),
+        1000000 *
+            RayConfig::instance().gcs_create_placement_group_retry_max_interval_ms());
   } else {
     *rank += static_cast<int64_t>(exp_backer->Next());
   }
@@ -953,83 +932,54 @@ void GcsPlacementGroupManager::UpdatePlacementGroupLoad() {
 }
 
 void GcsPlacementGroupManager::Initialize(const GcsInitData &gcs_init_data) {
-  // Bundles that are PREPARED or COMMITTED that we want to keep. All others are
-  // going to be removed by raylet.
-  absl::flat_hash_map<NodeID, std::vector<rpc::Bundle>> bundles_in_use;
-  // Bundles that are COMMITTED that we want the Scheduler to track.
+  absl::flat_hash_map<NodeID, std::vector<rpc::Bundle>> node_to_bundles;
   absl::flat_hash_map<PlacementGroupID,
                       std::vector<std::shared_ptr<BundleSpecification>>>
-      commited_bundles;
-  // Bundles that are PREPARED. The scheduler will commit them asap.
-  std::vector<SchedulePgRequest> prepared_pgs;
-
+      group_to_bundles;
   std::vector<PlacementGroupID> groups_to_remove;
   const auto &jobs = gcs_init_data.Jobs();
   for (auto &item : gcs_init_data.PlacementGroups()) {
     auto placement_group =
         std::make_shared<GcsPlacementGroup>(item.second, placement_group_state_counter_);
-    const auto state = item.second.state();
-    const auto &pg_id = placement_group->GetPlacementGroupID();
-    if (state == rpc::PlacementGroupTableData::REMOVED) {
-      // ignore this pg...
-      continue;
-    }
-    registered_placement_groups_.emplace(item.first, placement_group);
-    if (!placement_group->GetName().empty()) {
-      named_placement_groups_[placement_group->GetRayNamespace()].emplace(
-          placement_group->GetName(), pg_id);
-    }
-    if (state == rpc::PlacementGroupTableData::PREPARED) {
-      RAY_CHECK(!placement_group->HasUnplacedBundles());
-      // The PG is PREPARED. Add to `bundles_in_use` and `prepared_pgs`.
-      for (const auto &bundle : item.second.bundles()) {
-        bundles_in_use[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
+    if (item.second.state() != rpc::PlacementGroupTableData::REMOVED) {
+      registered_placement_groups_.emplace(item.first, placement_group);
+      if (!placement_group->GetName().empty()) {
+        named_placement_groups_[placement_group->GetRayNamespace()].emplace(
+            placement_group->GetName(), placement_group->GetPlacementGroupID());
       }
-      prepared_pgs.emplace_back(SchedulePgRequest{
-          .placement_group = placement_group,
-          .failure_callback =
-              [this](std::shared_ptr<GcsPlacementGroup> placement_group,
-                     bool is_feasible) {
-                OnPlacementGroupCreationFailed(
-                    std::move(placement_group), CreateDefaultBackoff(), is_feasible);
-              },
-          .success_callback =
-              [this](std::shared_ptr<GcsPlacementGroup> placement_group) {
-                OnPlacementGroupCreationSuccess(placement_group);
-              },
-      });
-    }
-    if (state == rpc::PlacementGroupTableData::CREATED ||
-        state == rpc::PlacementGroupTableData::RESCHEDULING) {
-      const auto &bundles = item.second.bundles();
-      for (const auto &bundle : bundles) {
-        if (!NodeID::FromBinary(bundle.node_id()).IsNil()) {
-          bundles_in_use[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
-          commited_bundles[PlacementGroupID::FromBinary(
-                               bundle.bundle_id().placement_group_id())]
-              .emplace_back(std::make_shared<BundleSpecification>(bundle));
+
+      if (item.second.state() == rpc::PlacementGroupTableData::CREATED ||
+          item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
+        const auto &bundles = item.second.bundles();
+        for (const auto &bundle : bundles) {
+          if (!NodeID::FromBinary(bundle.node_id()).IsNil()) {
+            node_to_bundles[NodeID::FromBinary(bundle.node_id())].emplace_back(bundle);
+            group_to_bundles[PlacementGroupID::FromBinary(
+                                 bundle.bundle_id().placement_group_id())]
+                .emplace_back(std::make_shared<BundleSpecification>(bundle));
+          }
         }
       }
-    }
-    auto job_iter = jobs.find(placement_group->GetCreatorJobId());
-    auto is_job_dead = (job_iter == jobs.end() || job_iter->second.is_dead());
-    if (is_job_dead) {
-      placement_group->MarkCreatorJobDead();
-      if (placement_group->IsPlacementGroupLifetimeDone()) {
-        groups_to_remove.push_back(placement_group->GetPlacementGroupID());
-        continue;
+      auto job_iter = jobs.find(placement_group->GetCreatorJobId());
+      auto is_job_dead = (job_iter == jobs.end() || job_iter->second.is_dead());
+      if (is_job_dead) {
+        placement_group->MarkCreatorJobDead();
+        if (placement_group->IsPlacementGroupLifetimeDone()) {
+          groups_to_remove.push_back(placement_group->GetPlacementGroupID());
+          continue;
+        }
       }
-    }
-    if (state == rpc::PlacementGroupTableData::PENDING ||
-        state == rpc::PlacementGroupTableData::RESCHEDULING) {
-      AddToPendingQueue(std::move(placement_group));
+      if (item.second.state() == rpc::PlacementGroupTableData::PENDING ||
+          item.second.state() == rpc::PlacementGroupTableData::RESCHEDULING) {
+        AddToPendingQueue(std::move(placement_group));
+      }
     }
   }
 
   // Notify raylets to release unused bundles.
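Restated outside the diff, the recovery policy that the two versions of `Initialize` implement on GCS restart differs only in the PREPARED branch. A schematic sketch follows; the enum mirrors gcs.proto, while `RecoverOnRestart` and its comments are illustrative placeholders, not Ray's real interfaces:

// Schematic of the restart-recovery split shown in the hunk above.
enum class PgState { PENDING, PREPARED, CREATED, REMOVED, RESCHEDULING };

void RecoverOnRestart(PgState state) {
  if (state == PgState::REMOVED) {
    return;  // dropped entirely; raylets release its bundles
  }
  if (state == PgState::PREPARED) {
    // Pre-revert only: keep the prepared bundles and re-commit them asap.
  }
  if (state == PgState::CREATED || state == PgState::RESCHEDULING) {
    // Keep the bundles and track their committed locations.
  }
  if (state == PgState::PENDING || state == PgState::RESCHEDULING) {
    // Put the group back onto the pending queue.
  }
}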
-  gcs_placement_group_scheduler_->ReleaseUnusedBundles(bundles_in_use);
-  gcs_placement_group_scheduler_->Initialize(commited_bundles, prepared_pgs);
+  gcs_placement_group_scheduler_->ReleaseUnusedBundles(node_to_bundles);
+  gcs_placement_group_scheduler_->Initialize(group_to_bundles);
 
   for (const auto &placement_group_id : groups_to_remove) {
     RemovePlacementGroup(placement_group_id, [placement_group_id](Status status) {
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
index 6bc2737c14a65..4a40e27c8319f 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -32,13 +32,12 @@ GcsPlacementGroupScheduler::GcsPlacementGroupScheduler(
       gcs_table_storage_(std::move(gcs_table_storage)),
       gcs_node_manager_(gcs_node_manager),
       cluster_resource_scheduler_(cluster_resource_scheduler),
-      raylet_client_pool_(std::move(raylet_client_pool)) {}
+      raylet_client_pool_(raylet_client_pool) {}
 
 void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
-    const SchedulePgRequest &request) {
-  const auto &placement_group = request.placement_group;
-  const auto &failure_callback = request.failure_callback;
-  const auto &success_callback = request.success_callback;
+    std::shared_ptr<GcsPlacementGroup> placement_group,
+    PGSchedulingFailureCallback failure_callback,
+    PGSchedulingSuccessfulCallback success_callback) {
   // We need to ensure that the PrepareBundleResources won't be sent before the reply of
   // ReleaseUnusedBundles is returned.
   if (!nodes_of_releasing_unused_bundles_.empty()) {
@@ -296,17 +295,6 @@ void GcsPlacementGroupScheduler::CommitAllBundles(
     const std::shared_ptr<LeaseStatusTracker> &lease_status_tracker,
     const PGSchedulingFailureCallback &schedule_failure_handler,
     const PGSchedulingSuccessfulCallback &schedule_success_handler) {
-  // TOCTOU: this method is called after an async write to Redis. If the PG is cancelled
-  // after the write, we don't commit the bundles. Instead we just return them.
-  if (lease_status_tracker->GetLeasingState() == LeasingState::CANCELLED) {
-    DestroyPlacementGroupCommittedBundleResources(
-        lease_status_tracker->GetPlacementGroup()->GetPlacementGroupID());
-    ReturnBundleResources(lease_status_tracker->GetBundleLocations());
-    schedule_failure_handler(lease_status_tracker->GetPlacementGroup(),
-                             /*is_feasible=*/true);
-    return;
-  }
-
   const std::shared_ptr<BundleLocations> &prepared_bundle_locations =
       lease_status_tracker->GetPreparedBundleLocations();
   std::unordered_map<NodeID, std::vector<std::shared_ptr<const BundleSpecification>>>
@@ -393,17 +381,8 @@ void GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned(
         ->set_node_id(location.first.Binary());
   }
 
-  placement_group->UpdateState(rpc::PlacementGroupTableData::PREPARED);
-
-  RAY_CHECK_OK(gcs_table_storage_->PlacementGroupTable().Put(
-      placement_group_id,
-      placement_group->GetPlacementGroupTableData(),
-      [this, lease_status_tracker, schedule_failure_handler, schedule_success_handler](
-          Status status) {
-        RAY_CHECK_OK(status);
-        CommitAllBundles(
-            lease_status_tracker, schedule_failure_handler, schedule_success_handler);
-      }));
+  CommitAllBundles(
+      lease_status_tracker, schedule_failure_handler, schedule_success_handler);
 }
 
 void GcsPlacementGroupScheduler::OnAllBundleCommitRequestReturned(
@@ -548,8 +527,7 @@ void GcsPlacementGroupScheduler::ReleaseUnusedBundles(
 
 void GcsPlacementGroupScheduler::Initialize(
     const absl::flat_hash_map<PlacementGroupID,
                               std::vector<std::shared_ptr<BundleSpecification>>>
-        &committed_bundles,
-    const std::vector<SchedulePgRequest> &prepared_pgs) {
+        &group_to_bundles) {
   // We need to reinitialize the `committed_bundle_location_index_`, otherwise,
   // it will get an empty bundle set when raylet fo occurred after GCS server restart.
   auto &alive_nodes = gcs_node_manager_.GetAllAliveNodes();
   committed_bundle_location_index_.AddNodes(alive_nodes);
 
-  for (const auto &group : committed_bundles) {
+  for (const auto &group : group_to_bundles) {
     const auto &placement_group_id = group.first;
     std::shared_ptr<BundleLocations> committed_bundle_locations =
         std::make_shared<BundleLocations>();
@@ -573,19 +551,6 @@ void GcsPlacementGroupScheduler::Initialize(
         .GetBundleLocationIndex()
         .AddOrUpdateBundleLocations(committed_bundle_locations);
   }
-  for (const auto &req : prepared_pgs) {
-    const auto &pg = req.placement_group;
-    // The PG should have all bundles placed.
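The ordering at stake in `OnAllBundlePrepareRequestReturned` above is the heart of the reverted change: pre-revert, GCS durably recorded PREPARED after all prepares succeeded and only fanned out commits from the storage callback, re-checking for cancellation. A condensed sketch of that pre-revert control flow, where every type and helper is a placeholder for illustration (the real code uses `LeaseStatusTracker`, `gcs_table_storage_`, and raylet RPCs):

#include <functional>

enum class Leasing { PREPARING, CANCELLED };
struct Tracker {
  Leasing leasing_state = Leasing::PREPARING;
};

void PersistPreparedState(const std::function<void()> &on_done) { on_done(); }
void CommitAllBundlesSketch(Tracker &) {}
void ReturnPreparedBundles(Tracker &) {}

void OnAllPreparesReturned(Tracker &tracker) {
  // 1) Record PREPARED durably, so a restarted GCS can resume the commit phase
  //    instead of re-preparing (or leaking) the bundles.
  PersistPreparedState([&tracker]() {
    // 2) TOCTOU guard: the group may have been cancelled while the asynchronous
    //    write was in flight; if so, return the prepared resources.
    if (tracker.leasing_state == Leasing::CANCELLED) {
      ReturnPreparedBundles(tracker);
      return;
    }
    // 3) Only now fan out the commit requests to the raylets.
    CommitAllBundlesSketch(tracker);
  });
}

The revert collapses steps 1 and 2, committing directly once all prepares return.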
-    RAY_CHECK(!pg->HasUnplacedBundles());
-    const auto &prepared_bundles = pg->GetBundles();
-
-    const auto pg_id = pg->GetPlacementGroupID();
-    auto tracker = LeaseStatusTracker::CreatePrepared(pg, prepared_bundles);
-    RAY_CHECK(placement_group_leasing_in_progress_.emplace(pg_id, tracker).second);
-
-    RAY_LOG(DEBUG).WithField(pg_id) << "Recommitting prepared pg";
-    CommitAllBundles(tracker, req.failure_callback, req.success_callback);
-  }
 }
 
 void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
@@ -830,32 +795,6 @@ LeaseStatusTracker::LeaseStatusTracker(
   }
 }
 
-std::shared_ptr<LeaseStatusTracker> LeaseStatusTracker::CreatePrepared(
-    std::shared_ptr<GcsPlacementGroup> placement_group,
-    const std::vector<std::shared_ptr<const BundleSpecification>> &prepared_bundles) {
-  ScheduleMap schedule_map;
-  for (const auto &bundle : prepared_bundles) {
-    BundleID bundle_id = bundle->BundleId();
-    NodeID node_id = bundle->NodeId();
-    RAY_CHECK(!node_id.IsNil());
-    schedule_map[bundle_id] = node_id;
-  }
-
-  auto tracker = std::make_shared<LeaseStatusTracker>(
-      placement_group, /*unplaced_bundles=*/prepared_bundles, schedule_map);
-
-  for (const auto &bundle : prepared_bundles) {
-    const BundleID bundle_id = bundle->BundleId();
-    const NodeID node_id = schedule_map[bundle_id];
-    tracker->MarkPreparePhaseStarted(node_id, bundle);
-    tracker->MarkPrepareRequestReturned(node_id, bundle, Status::OK());
-  }
-
-  RAY_CHECK(tracker->AllPrepareRequestsReturned());
-  RAY_CHECK(tracker->AllPrepareRequestsSuccessful());
-  return tracker;
-}
-
 bool LeaseStatusTracker::MarkPreparePhaseStarted(
     const NodeID &node_id, const std::shared_ptr<const BundleSpecification> &bundle) {
   const auto &bundle_id = bundle->BundleId();
@@ -951,7 +890,7 @@ const std::vector<std::shared_ptr<const BundleSpecification>>
   return bundles_to_schedule_;
 }
 
-LeasingState LeaseStatusTracker::GetLeasingState() const { return leasing_state_; }
+const LeasingState LeaseStatusTracker::GetLeasingState() const { return leasing_state_; }
 
 void LeaseStatusTracker::MarkPlacementGroupScheduleCancelled() {
   UpdateLeasingState(LeasingState::CANCELLED);
diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
index ec7ac53941bda..180f6f38760e3 100644
--- a/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
+++ b/src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -49,19 +49,17 @@ using raylet_scheduling_policy::SchedulingResultStatus;
 
 using ScheduleMap = absl::flat_hash_map<BundleID, NodeID, pair_hash>;
 
-struct SchedulePgRequest {
-  /// The placement group to be scheduled.
-  std::shared_ptr<GcsPlacementGroup> placement_group;
-  // Called if the pg failed to schedule (prepare or commit).
-  PGSchedulingFailureCallback failure_callback;
-  // Called if the pg is successfully committed.
-  PGSchedulingSuccessfulCallback success_callback;
-};
-
 class GcsPlacementGroupSchedulerInterface {
  public:
   /// Schedule unplaced bundles of the specified placement group.
-  virtual void ScheduleUnplacedBundles(const SchedulePgRequest &request) = 0;
+  ///
+  /// \param placement_group The placement group to be scheduled.
+  /// \param failure_callback This function is called if the schedule is failed.
+  /// \param success_callback This function is called if the schedule is successful.
+  virtual void ScheduleUnplacedBundles(
+      std::shared_ptr<GcsPlacementGroup> placement_group,
+      PGSchedulingFailureCallback failure_callback,
+      PGSchedulingSuccessfulCallback success_callback) = 0;
 
   /// Get and remove bundles belong to the specified node.
   ///
@@ -103,12 +101,10 @@ class GcsPlacementGroupSchedulerInterface {
   /// This should be called when GCS server restarts after a failure.
   ///
   /// \param node_to_bundles Bundles used by each node.
-  /// \param prepared_pgs placement groups in state PREPARED. Need to be committed asap.
   virtual void Initialize(
       const absl::flat_hash_map<PlacementGroupID,
                                 std::vector<std::shared_ptr<BundleSpecification>>>
-          &group_to_bundles,
-      const std::vector<SchedulePgRequest> &prepared_pgs) = 0;
+          &group_to_bundles) = 0;
 
   virtual ~GcsPlacementGroupSchedulerInterface() {}
 };
@@ -133,11 +129,6 @@ class LeaseStatusTracker {
                      const ScheduleMap &schedule_map);
   ~LeaseStatusTracker() = default;
 
-  // Creates a LeaseStatusTracker that starts with PREPARED status.
-  static std::shared_ptr<LeaseStatusTracker> CreatePrepared(
-      std::shared_ptr<GcsPlacementGroup> placement_group,
-      const std::vector<std::shared_ptr<const BundleSpecification>> &unplaced_bundles);
-
   /// Indicate the tracker that prepare requests are sent to a specific node.
   ///
   /// \param node_id Id of a node where prepare request is sent.
@@ -220,7 +211,7 @@ class LeaseStatusTracker {
   /// Return the leasing state.
   ///
   /// \return Leasing state.
-  LeasingState GetLeasingState() const;
+  const LeasingState GetLeasingState() const;
 
   /// Mark that this leasing is cancelled.
   void MarkPlacementGroupScheduleCancelled();
@@ -307,7 +298,9 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
   /// \param placement_group to be scheduled.
   /// \param failure_callback This function is called if the schedule is failed.
   /// \param success_callback This function is called if the schedule is successful.
-  void ScheduleUnplacedBundles(const SchedulePgRequest &request) override;
+  void ScheduleUnplacedBundles(std::shared_ptr<GcsPlacementGroup> placement_group,
+                               PGSchedulingFailureCallback failure_handler,
+                               PGSchedulingSuccessfulCallback success_handler) override;
 
   /// Destroy the actual bundle resources or locked resources (for 2PC)
   /// on all nodes associated with this placement group.
@@ -353,12 +346,10 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
   /// This should be called when GCS server restarts after a failure.
   ///
   /// \param node_to_bundles Bundles used by each node.
-  /// \param prepared_pgs placement groups in state PREPARED. Need to be committed asap.
   void Initialize(
       const absl::flat_hash_map<PlacementGroupID,
                                 std::vector<std::shared_ptr<BundleSpecification>>>
-          &group_to_bundles,
-      const std::vector<SchedulePgRequest> &prepared_pgs) override;
+          &group_to_bundles) override;
 
   /// Add resources changed listener.
  void AddResourcesChangedListener(std::function<void()> listener);
 
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc
index 6cfd689ac168b..6ca0402880c09 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_mock_test.cc
@@ -72,12 +72,13 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityReschedule) {
       Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 1);
   auto pg = std::make_shared<GcsPlacementGroup>(req, "", counter_);
   auto cb = [](Status s) {};
-  SchedulePgRequest request;
+  PGSchedulingFailureCallback failure_callback;
+  PGSchedulingSuccessfulCallback success_callback;
   std::function<void(bool)> put_cb;
   EXPECT_CALL(*store_client_, AsyncPut(_, _, _, _, _))
       .WillOnce(DoAll(SaveArg<4>(&put_cb), Return(Status::OK())));
-  EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_))
-      .WillOnce(DoAll(SaveArg<0>(&request)));
+  EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_, _, _))
+      .WillOnce(DoAll(SaveArg<1>(&failure_callback), SaveArg<2>(&success_callback)));
   auto now = absl::GetCurrentTimeNanos();
   gcs_placement_group_manager_->RegisterPlacementGroup(pg, cb);
   auto &pending_queue = gcs_placement_group_manager_->pending_placement_groups_;
@@ -86,7 +87,7 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityReschedule) {
   ASSERT_GE(absl::GetCurrentTimeNanos(), pending_queue.begin()->first);
   put_cb(true);
   pg->UpdateState(rpc::PlacementGroupTableData::RESCHEDULING);
-  request.failure_callback(pg, true);
+  failure_callback(pg, true);
   ASSERT_EQ(1, pending_queue.size());
   ASSERT_GE(0, pending_queue.begin()->first);
 }
@@ -98,13 +99,15 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityFailed) {
       Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 1);
   auto pg = std::make_shared<GcsPlacementGroup>(req, "", counter_);
   auto cb = [](Status s) {};
-  SchedulePgRequest request;
+  PGSchedulingFailureCallback failure_callback;
+  PGSchedulingSuccessfulCallback success_callback;
   std::function<void(bool)> put_cb;
   EXPECT_CALL(*store_client_, AsyncPut(_, _, _, _, _))
       .WillOnce(DoAll(SaveArg<4>(&put_cb), Return(Status::OK())));
-  EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_))
+  EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_, _, _))
       .Times(2)
-      .WillRepeatedly(DoAll(SaveArg<0>(&request)));
+      .WillRepeatedly(
+          DoAll(SaveArg<1>(&failure_callback), SaveArg<2>(&success_callback)));
   auto now = absl::GetCurrentTimeNanos();
   gcs_placement_group_manager_->RegisterPlacementGroup(pg, cb);
   auto &pending_queue = gcs_placement_group_manager_->pending_placement_groups_;
@@ -114,7 +117,7 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityFailed) {
   put_cb(true);
   pg->UpdateState(rpc::PlacementGroupTableData::PENDING);
   now = absl::GetCurrentTimeNanos();
-  request.failure_callback(pg, true);
+  failure_callback(pg, true);
   auto exp_backer = ExponentialBackOff(
       1000000 *
           RayConfig::instance().gcs_create_placement_group_retry_min_interval_ms(),
       RayConfig::instance().gcs_create_placement_group_retry_multiplier(),
@@ -137,7 +140,7 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityFailed) {
   ASSERT_EQ(0, pending_queue.size());
   pg->UpdateState(rpc::PlacementGroupTableData::PENDING);
   now = absl::GetCurrentTimeNanos();
-  request.failure_callback(pg, true);
+  failure_callback(pg, true);
   next =
      RayConfig::instance().gcs_create_placement_group_retry_multiplier() * next;
   ASSERT_EQ(1, pending_queue.size());
   ASSERT_LE(now + next, pending_queue.begin()->first);
@@ -154,14 +157,16 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityOrder) {
       Mocker::GenCreatePlacementGroupRequest("", rpc::PlacementStrategy::SPREAD, 1);
   auto pg2 = std::make_shared<GcsPlacementGroup>(req2, "", counter_);
   auto cb = [](Status s) {};
-  SchedulePgRequest request;
+  PGSchedulingFailureCallback failure_callback;
+  PGSchedulingSuccessfulCallback success_callback;
   std::function<void(bool)> put_cb;
   EXPECT_CALL(*store_client_, AsyncPut(_, _, _, _, _))
       .Times(2)
       .WillRepeatedly(DoAll(SaveArg<4>(&put_cb), Return(Status::OK())));
-  EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_))
+  EXPECT_CALL(*gcs_placement_group_scheduler_, ScheduleUnplacedBundles(_, _, _))
       .Times(2)
-      .WillRepeatedly(DoAll(SaveArg<0>(&request)));
+      .WillRepeatedly(
+          DoAll(SaveArg<1>(&failure_callback), SaveArg<2>(&success_callback)));
   gcs_placement_group_manager_->RegisterPlacementGroup(pg1, cb);
   gcs_placement_group_manager_->RegisterPlacementGroup(pg2, cb);
   auto &pending_queue = gcs_placement_group_manager_->pending_placement_groups_;
@@ -170,7 +175,7 @@ TEST_F(GcsPlacementGroupManagerMockTest, PendingQueuePriorityOrder) {
   ASSERT_EQ(1, pending_queue.size());
   // PG1 is scheduled first, so PG2 is in pending queue
   ASSERT_EQ(pg2, pending_queue.begin()->second.second);
-  request.failure_callback(pg1, true);
+  failure_callback(pg1, true);
   ASSERT_EQ(2, pending_queue.size());
   gcs_placement_group_manager_->SchedulePendingPlacementGroups();
   // PG2 is scheduled for the next, so PG1 is in pending queue
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
index 268096815cbed..35dd8319f7e1b 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc
@@ -37,8 +37,12 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
  public:
   MockPlacementGroupScheduler() = default;
 
-  void ScheduleUnplacedBundles(const SchedulePgRequest &request) override {
-    placement_groups_.push_back(request.placement_group);
+  void ScheduleUnplacedBundles(
+      std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
+      std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>, bool)> failure_handler,
+      std::function<void(std::shared_ptr<gcs::GcsPlacementGroup>)> success_handler)
+      override {
+    placement_groups_.push_back(placement_group);
   }
 
   MOCK_METHOD1(DestroyPlacementGroupBundleResourcesIfExists,
@@ -50,12 +54,11 @@ class MockPlacementGroupScheduler : public gcs::GcsPlacementGroupSchedulerInterf
               ReleaseUnusedBundles,
               void(const absl::flat_hash_map<NodeID, std::vector<rpc::Bundle>>
                        &node_to_bundles));
-  MOCK_METHOD2(
+  MOCK_METHOD1(
       Initialize,
       void(const absl::flat_hash_map<PlacementGroupID,
                                      std::vector<std::shared_ptr<BundleSpecification>>>
-               &group_to_bundles,
-           const std::vector<SchedulePgRequest> &prepared_pgs));
+               &group_to_bundles));
 
   MOCK_METHOD((absl::flat_hash_map<PlacementGroupID, std::vector<int64_t>>),
               GetBundlesOnNode,
@@ -138,10 +141,6 @@ class GcsPlacementGroupManagerTest : public ::testing::Test {
       placement_group->GetMutableBundle(bundle_index)
           ->set_node_id(NodeID::FromRandom().Binary());
     }
-    // A placement group must first become PREPARED then it can become CREATED.
-    // Normally transition to PREPARED is performed by
-    // GcsPlacementGroupScheduler::OnAllBundlePrepareRequestReturned.
-    placement_group->UpdateState(rpc::PlacementGroupTableData::PREPARED);
     gcs_placement_group_manager_->OnPlacementGroupCreationSuccess(placement_group);
     RunIOService();
     promise.get_future().get();
@@ -577,8 +576,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestSchedulerReinitializeAfterGcsRestart) {
   EXPECT_CALL(*mock_placement_group_scheduler_, ReleaseUnusedBundles(_)).Times(1);
   EXPECT_CALL(
       *mock_placement_group_scheduler_,
-      Initialize(testing::Contains(testing::Key(placement_group->GetPlacementGroupID())),
-                 /*prepared_pgs=*/testing::IsEmpty()))
+      Initialize(testing::Contains(testing::Key(placement_group->GetPlacementGroupID()))))
       .Times(1);
   gcs_placement_group_manager_->Initialize(*gcs_init_data);
   ASSERT_EQ(placement_group->GetState(), rpc::PlacementGroupTableData::CREATED);
@@ -1001,8 +999,7 @@ TEST_F(GcsPlacementGroupManagerTest, TestCheckCreatorJobIsDeadWhenGcsRestart) {
                        gcs_init_data->PlacementGroups().end());
   EXPECT_CALL(
       *mock_placement_group_scheduler_,
-      Initialize(testing::Contains(testing::Key(placement_group->GetPlacementGroupID())),
-                 /*prepared_pgs=*/testing::IsEmpty()))
+      Initialize(testing::Contains(testing::Key(placement_group->GetPlacementGroupID()))))
       .Times(1);
   gcs_placement_group_manager_->Initialize(*gcs_init_data);
   // Make sure placement group is removed after gcs restart for the creator job is dead
diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
index 5d3f11ed39b05..cbbf4b7f89939 100644
--- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
+++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc
@@ -605,66 +605,16 @@ TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringCommit) {
   // Now, cancel the schedule request.
   ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
   ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources());
+  scheduler_->MarkScheduleCancelled(placement_group_id);
   WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1);
   WaitPendingDone(raylet_clients_[1]->commit_callbacks, 1);
-  // Here: PG is PREPARED. Grant 1 commit and then cancel.
   ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
-  scheduler_->MarkScheduleCancelled(placement_group_id);
   ASSERT_TRUE(raylet_clients_[1]->GrantCommitBundleResources());
   ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
   ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve());
   WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
 }
 
-TEST_F(GcsPlacementGroupSchedulerTest, PlacementGroupCancelledDuringPreparedPut) {
-  // After a PG is prepared by all nodes, GCS writes to Redis then commit-all.
-  // If a Cancel is happening during prepare, or during the Redis write, i.e. before the
-  // commit-all is called, the PG should be removed and no commits should be sent.
-  auto node0 = Mocker::GenNodeInfo(0);
-  auto node1 = Mocker::GenNodeInfo(1);
-  AddNode(node0);
-  AddNode(node1);
-  ASSERT_EQ(2, gcs_node_manager_->GetAllAliveNodes().size());
-
-  auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
-  auto placement_group = std::make_shared<gcs::GcsPlacementGroup>(
-      create_placement_group_request, "", counter_);
-
-  // Schedule the placement group successfully.
-  auto failure_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
-                                bool is_infeasible) {
-    absl::MutexLock lock(&placement_group_requests_mutex_);
-    failure_placement_groups_.emplace_back(std::move(placement_group));
-  };
-  auto success_handler = [this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
-    absl::MutexLock lock(&placement_group_requests_mutex_);
-    success_placement_groups_.emplace_back(std::move(placement_group));
-  };
-
-  scheduler_->ScheduleUnplacedBundles(placement_group, failure_handler, success_handler);
-  ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
-  scheduler_->MarkScheduleCancelled(placement_group->GetPlacementGroupID());
-  ASSERT_TRUE(raylet_clients_[1]->GrantPrepareBundleResources());
-
-  WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::FAILURE);
-
-  // Make sure the commit requests are not sent.
-  ASSERT_EQ(raylet_clients_[0]->commit_callbacks.size(), 0);
-  ASSERT_EQ(raylet_clients_[1]->commit_callbacks.size(), 0);
-
-  // Raylet receives the cancel request.
-  ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
-  ASSERT_TRUE(raylet_clients_[1]->GrantCancelResourceReserve());
-
-  // Make sure there's no more bundles on nodes.
-  auto bundles_on_node0 =
-      scheduler_->GetAndRemoveBundlesOnNode(NodeID::FromBinary(node0->node_id()));
-  ASSERT_EQ(0, bundles_on_node0.size());
-  auto bundles_on_node1 =
-      scheduler_->GetAndRemoveBundlesOnNode(NodeID::FromBinary(node1->node_id()));
-  ASSERT_EQ(0, bundles_on_node1.size());
-}
-
 TEST_F(GcsPlacementGroupSchedulerTest, TestPackStrategyReschedulingWhenNodeAdd) {
   ReschedulingWhenNodeAddTest(rpc::PlacementStrategy::PACK);
 }
@@ -1241,7 +1191,7 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestInitialize) {
       std::make_shared<BundleSpecification>(*placement_group->GetMutableBundle(0)));
   group_to_bundles[placement_group->GetPlacementGroupID()].emplace_back(
       std::make_shared<BundleSpecification>(*placement_group->GetMutableBundle(1)));
-  scheduler_->Initialize(group_to_bundles, /*prepared_pgs=*/{});
+  scheduler_->Initialize(group_to_bundles);
 
   auto bundles =
       scheduler_->GetAndRemoveBundlesOnNode(NodeID::FromBinary(node0->node_id()));
diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
index 8d5cbc7ed4b24..e565acb5ed586 100644
--- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
+++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h
@@ -383,18 +383,6 @@ struct GcsServerMocker {
 
     size_t GetWaitingRemovedBundlesSize() { return waiting_removed_bundles_.size(); }
 
-    using gcs::GcsPlacementGroupScheduler::ScheduleUnplacedBundles;
-    // Extra convenience overload for the mock tests to keep using the old interface.
-    void ScheduleUnplacedBundles(
-        const std::shared_ptr<gcs::GcsPlacementGroup> &placement_group,
-        gcs::PGSchedulingFailureCallback failure_callback,
-        gcs::PGSchedulingSuccessfulCallback success_callback) {
-      ScheduleUnplacedBundles(
-          gcs::SchedulePgRequest{.placement_group = placement_group,
-                                 .failure_callback = failure_callback,
-                                 .success_callback = success_callback});
-    };
-
    protected:
     friend class GcsPlacementGroupSchedulerTest;
     FRIEND_TEST(GcsPlacementGroupSchedulerTest, TestCheckingWildcardResource);
diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto
index ccc5807c6ff87..c9f9f60a43f2a 100644
--- a/src/ray/protobuf/gcs.proto
+++ b/src/ray/protobuf/gcs.proto
@@ -626,14 +626,12 @@ message PlacementGroupTableData {
   enum PlacementGroupState {
     // Placement Group is pending or scheduling
     PENDING = 0;
-    // Placement Group is scheduled, and all nodes have prepared the resources.
-    PREPARED = 1;
     // Placement Group is created.
-    CREATED = 2;
+    CREATED = 1;
     // Placement Group is already removed and won't be rescheduled.
-    REMOVED = 3;
+    REMOVED = 2;
     // Placement Group is rescheduling because the node it was placed on is dead.
-    RESCHEDULING = 4;
+    RESCHEDULING = 3;
   }
 
   // ID of the PlacementGroup.
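A note on the enum change above: removing `PREPARED = 1` renumbers the remaining values rather than reserving the slot. Since protobuf encodes enum fields by number, not by name, any state persisted under the old numbering decodes to a different state under the new one. A small C++ illustration of the hazard; the enum values mirror the proto, and nothing else here is from the patch:

// Protobuf persists the integer value, so the wire value 2 meant CREATED before
// this revert and means REMOVED after it. Whether that matters in practice
// depends on what PlacementGroupTableData was persisted before the revert.
enum class OldState { PENDING = 0, PREPARED = 1, CREATED = 2, REMOVED = 3, RESCHEDULING = 4 };
enum class NewState { PENDING = 0, CREATED = 1, REMOVED = 2, RESCHEDULING = 3 };

static_assert(static_cast<int>(OldState::CREATED) == static_cast<int>(NewState::REMOVED),
              "same wire value, different meaning");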