From 99034f5af552dcd669d33fb914f37b11587cae02 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Wed, 3 Nov 2021 00:11:40 -0700
Subject: [PATCH] =?UTF-8?q?Revert=20"Revert=20"[core]=20Fix=20wrong=20loca?=
 =?UTF-8?q?l=20resource=20view=20in=20raylet=20(#1991=E2=80=A6=20(#19996)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This reverts commit f1eedb15b610295b9c34555565369b8d4e5c96e7.

## Why are these changes needed?

The self node should not consume node resource updates from GCS, since the raylet maintains its own local resource view.

## Related issue number

#19438
---
 python/ray/tests/test_placement_group_3.py | 52 +++++++++++++++++
 .../gcs/gcs_server/gcs_resource_manager.cc |  2 +-
 src/ray/raylet/node_manager.cc             | 56 ++++++++++++-------
 3 files changed, 89 insertions(+), 21 deletions(-)

diff --git a/python/ray/tests/test_placement_group_3.py b/python/ray/tests/test_placement_group_3.py
index cee2b819c1a61..57009bbd90f4e 100644
--- a/python/ray/tests/test_placement_group_3.py
+++ b/python/ray/tests/test_placement_group_3.py
@@ -646,5 +646,57 @@ def check_bundle_leaks():
     wait_for_condition(check_bundle_leaks)
 
 
+def test_placement_group_local_resource_view(monkeypatch, ray_start_cluster):
+    """Please refer to https://github.com/ray-project/ray/pull/19911
+    for more details.
+    """
+    with monkeypatch.context() as m:
+        # Increase the broadcasting interval so that node resource updates
+        # arrive at the raylet only after all local resources are allocated.
+        m.setenv("RAY_raylet_report_resources_period_milliseconds", "2000")
+        m.setenv("RAY_grpc_based_resource_broadcast", "true")
+        cluster = ray_start_cluster
+
+        cluster.add_node(num_cpus=16, object_store_memory=1e9)
+        cluster.wait_for_nodes()
+        cluster.add_node(num_cpus=16, num_gpus=1)
+        cluster.wait_for_nodes()
+        NUM_CPU_BUNDLES = 30
+
+        @ray.remote(num_cpus=1)
+        class Worker(object):
+            def __init__(self, i):
+                self.i = i
+
+            def work(self):
+                time.sleep(0.1)
+                print("work ", self.i)
+
+        @ray.remote(num_cpus=1, num_gpus=1)
+        class Trainer(object):
+            def __init__(self, i):
+                self.i = i
+
+            def train(self):
+                time.sleep(0.2)
+                print("train ", self.i)
+
+        ray.init(address="auto")
+        bundles = [{"CPU": 1, "GPU": 1}]
+        bundles += [{"CPU": 1} for _ in range(NUM_CPU_BUNDLES)]
+        pg = placement_group(bundles, strategy="PACK")
+        ray.get(pg.ready())
+
+        # All local resources get allocated here; this ensures the local
+        # view stays consistent and node resource updates are discarded.
+        workers = [
+            Worker.options(placement_group=pg).remote(i)
+            for i in range(NUM_CPU_BUNDLES)
+        ]
+        trainer = Trainer.options(placement_group=pg).remote(0)
+        ray.get([workers[i].work.remote() for i in range(NUM_CPU_BUNDLES)])
+        ray.get(trainer.train.remote())
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-sv", __file__]))
diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc
index 9e36db6efafae..4266c635bf981 100644
--- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc
+++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -58,7 +58,7 @@ void GcsResourceManager::HandleUpdateResources(
     const rpc::UpdateResourcesRequest &request, rpc::UpdateResourcesReply *reply,
     rpc::SendReplyCallback send_reply_callback) {
   NodeID node_id = NodeID::FromBinary(request.node_id());
-  RAY_LOG(INFO) << "Updating resources, node id = " << node_id;
+  RAY_LOG(DEBUG) << "Updating resources, node id = " << node_id;
   auto changed_resources = std::make_shared<std::unordered_map<std::string, double>>();
   for (const auto &entry : request.resources()) {
     changed_resources->emplace(entry.first, entry.second.resource_capacity());
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 7f1b6a2d0677b..4237562065490 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -890,7 +890,15 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
                                         const ResourceSet &createUpdatedResources) {
   RAY_LOG(DEBUG) << "[ResourceCreateUpdated] received callback from node id " << node_id
                  << " with created or updated resources: "
-                 << createUpdatedResources.ToString() << ". Updating resource map.";
+                 << createUpdatedResources.ToString() << ". Updating resource map."
+                 << " skip=" << (node_id == self_node_id_);
+
+  // Skip updating the local node since it always has the latest information.
+  // Updating the local node could result in an inconsistent view in the cluster
+  // resource scheduler, which could make tasks hang.
+  if (node_id == self_node_id_) {
+    return;
+  }
 
   // Update local_available_resources_ and SchedulingResources
   for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) {
@@ -900,11 +908,7 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id,
                                   new_resource_capacity);
   }
   RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map.";
-
-  if (node_id == self_node_id_) {
-    // The resource update is on the local node, check if we can reschedule tasks.
-    cluster_task_manager_->ScheduleAndDispatchTasks();
-  }
+  cluster_task_manager_->ScheduleAndDispatchTasks();
 }
 
 void NodeManager::ResourceDeleted(const NodeID &node_id,
@@ -916,7 +920,14 @@ void NodeManager::ResourceDeleted(const NodeID &node_id,
     }
     RAY_LOG(DEBUG) << "[ResourceDeleted] received callback from node id " << node_id
                    << " with deleted resources: " << oss.str()
-                   << ". Updating resource map.";
+                   << ". Updating resource map. skip=" << (node_id == self_node_id_);
+  }
+
+  // Skip updating the local node since it always has the latest information.
+  // Updating the local node could result in an inconsistent view in the cluster
+  // resource scheduler, which could make tasks hang.
+  if (node_id == self_node_id_) {
+    return;
   }
 
   // Update local_available_resources_ and SchedulingResources
@@ -1474,39 +1485,44 @@ void NodeManager::HandleUpdateResourceUsage(
     rpc::SendReplyCallback send_reply_callback) {
   rpc::ResourceUsageBroadcastData resource_usage_batch;
   resource_usage_batch.ParseFromString(request.serialized_resource_usage_batch());
-
-  if (resource_usage_batch.seq_no() != next_resource_seq_no_) {
+  // When next_resource_seq_no_ == 0 it means the raylet just started.
+  // TODO: Fetch a snapshot from gcs for lightweight resource broadcasting
+  if (next_resource_seq_no_ != 0 &&
+      resource_usage_batch.seq_no() != next_resource_seq_no_) {
+    // TODO (Alex): Ideally we would be really robust, and potentially eagerly
+    // pull a full resource "snapshot" from gcs to make sure our state doesn't
+    // diverge from GCS.
     RAY_LOG(WARNING)
         << "Raylet may have missed a resource broadcast. This either means that GCS has "
           "restarted, the network is heavily congested and is dropping, reordering, or "
          "duplicating packets. Expected seq#: "
        << next_resource_seq_no_ << ", but got: " << resource_usage_batch.seq_no()
        << ".";
-    // TODO (Alex): Ideally we would be really robust, and potentially eagerly
-    // pull a full resource "snapshot" from gcs to make sure our state doesn't
-    // diverge from GCS.
+    if (resource_usage_batch.seq_no() < next_resource_seq_no_) {
+      RAY_LOG(WARNING) << "Discarding the resource update since the local version is newer";
+      return;
+    }
   }
   next_resource_seq_no_ = resource_usage_batch.seq_no() + 1;
 
   for (const auto &resource_change_or_data : resource_usage_batch.batch()) {
     if (resource_change_or_data.has_data()) {
       const auto &resource_usage = resource_change_or_data.data();
-      const NodeID &node_id = NodeID::FromBinary(resource_usage.node_id());
-      if (node_id == self_node_id_) {
-        // Skip messages from self.
-        continue;
+      auto node_id = NodeID::FromBinary(resource_usage.node_id());
+      // Skip messages from self.
+      if (node_id != self_node_id_) {
+        UpdateResourceUsage(node_id, resource_usage);
       }
-      UpdateResourceUsage(node_id, resource_usage);
     } else if (resource_change_or_data.has_change()) {
       const auto &resource_notification = resource_change_or_data.change();
-      auto id = NodeID::FromBinary(resource_notification.node_id());
+      auto node_id = NodeID::FromBinary(resource_notification.node_id());
       if (resource_notification.updated_resources_size() != 0) {
         ResourceSet resource_set(
             MapFromProtobuf(resource_notification.updated_resources()));
-        ResourceCreateUpdated(id, resource_set);
+        ResourceCreateUpdated(node_id, resource_set);
       }
       if (resource_notification.deleted_resources_size() != 0) {
-        ResourceDeleted(id,
+        ResourceDeleted(node_id,
                         VectorFromProtobuf(resource_notification.deleted_resources()));
       }
     }
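
Reviewer note: the patch combines two guards in the raylet's broadcast handler, which can be hard to see across three hunks. Below is a minimal Python sketch (not Ray source; the names `LocalResourceView` and `apply_batch` are illustrative) of the same logic: ignore broadcast entries about the local node, and drop stale batches whose sequence number is behind the local expectation.

```python
class LocalResourceView:
    """Toy model of the raylet's handling of GCS resource broadcasts."""

    def __init__(self, self_node_id):
        self.self_node_id = self_node_id
        self.next_seq_no = 0  # 0 means "just started": accept any seq_no.
        self.remote_usage = {}  # node_id -> resource usage dict

    def apply_batch(self, seq_no, updates):
        """updates: list of (node_id, usage) pairs from one broadcast."""
        if self.next_seq_no != 0 and seq_no != self.next_seq_no:
            print(f"missed a broadcast: expected {self.next_seq_no}, got {seq_no}")
            if seq_no < self.next_seq_no:
                # The local view is newer; discard to avoid rolling back state.
                return
        self.next_seq_no = seq_no + 1
        for node_id, usage in updates:
            # Never overwrite the local node: the raylet itself is the
            # source of truth for its own resources.
            if node_id != self.self_node_id:
                self.remote_usage[node_id] = usage


if __name__ == "__main__":
    view = LocalResourceView(self_node_id="node-a")
    view.apply_batch(5, [("node-a", {"CPU": 16}), ("node-b", {"CPU": 8})])
    assert "node-a" not in view.remote_usage  # self update skipped
    view.apply_batch(4, [("node-b", {"CPU": 0})])  # stale seq_no: discarded
    assert view.remote_usage["node-b"] == {"CPU": 8}
```

The new test exercises exactly this scenario: the slowed-down broadcast arrives after the placement group has consumed all local resources, and without the self-node guard the stale view would make the scheduled tasks hang.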