From d4324fcc949a03c1840c9e593e0401853b540091 Mon Sep 17 00:00:00 2001
From: Yi Cheng <74173148+iycheng@users.noreply.github.com>
Date: Tue, 9 May 2023 17:02:26 -0700
Subject: [PATCH] [core] Add object owner and copy metrics to node stats
 (#35119)

This PR adds object owner and copy metrics to the `GetNodeStats` RPC
endpoint. Inlined small objects are not counted as copies because they are
not stored in the object store; they are copied inline wherever they are
used, so there is no copy to count. Each inlined object is still counted
once toward ownership, for correctness, because it is actually owned by a
worker. The local copies are read directly from the local object manager,
while the owner counts must be aggregated by the caller from the stats of
each core worker.
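
For reference, the caller-side aggregation can look like the sketch below.
It mirrors the `get_owner_info` helper added to `test_metrics.py` in this
patch and relies on the private `ray._private.internal_api` helpers, so it
is only illustrative and not a stable API; the `address="auto"` value and
the choice of the first reported node are assumptions for the example.

```python
# Illustrative only: read the two new metrics for one node, the same way
# the test helper in this patch does. Assumes a running Ray cluster.
import ray

ray.init(address="auto")

node = ray.nodes()[0]  # any node reported by the cluster
stats = ray._private.internal_api.node_stats(
    node["NodeManagerAddress"], node["NodeManagerPort"], False
)

# Owner counts are reported per core worker, so the caller sums them.
num_owned = sum(s.num_owned_objects for s in stats.core_workers_stats)
# Primary copies come directly from the local object manager's store stats.
num_primary_copies = stats.store_stats.num_object_store_primary_copies
print(num_owned, num_primary_copies)
```
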
Signed-off-by: e428265
---
 python/ray/tests/test_metrics.py       | 125 +++++++++++++++++++++++++
 src/ray/core_worker/core_worker.cc     |   1 +
 src/ray/core_worker/reference_count.cc |   9 ++
 src/ray/core_worker/reference_count.h  |   5 +
 src/ray/protobuf/common.proto          |   2 +
 src/ray/protobuf/node_manager.proto    |   2 +
 src/ray/raylet/local_object_manager.cc |   3 +-
 src/ray/raylet/local_object_manager.h  |   6 +-
 src/ray/raylet/node_manager.cc         |   2 +-
 9 files changed, 150 insertions(+), 5 deletions(-)

diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py
index 4e629da92441..a9357461d0aa 100644
--- a/python/ray/tests/test_metrics.py
+++ b/python/ray/tests/test_metrics.py
@@ -100,6 +100,131 @@ def verify():
     wait_for_condition(verify)
 
 
+def get_owner_info(node_ids):
+    node_addrs = {
+        n["NodeID"]: (n["NodeManagerAddress"], n["NodeManagerPort"])
+        for n in ray.nodes()
+    }
+    # Force a global gc to clean up the object store.
+    ray._private.internal_api.global_gc()
+    owner_stats = {n: 0 for n in node_ids}
+    primary_copy_stats = {n: 0 for n in node_ids}
+
+    for node_id in node_ids:
+        node_stats = ray._private.internal_api.node_stats(
+            node_addrs[node_id][0], node_addrs[node_id][1], False
+        )
+        owner_stats[node_id] = sum(
+            [stats.num_owned_objects for stats in node_stats.core_workers_stats]
+        )
+        primary_copy_stats[
+            node_id
+        ] = node_stats.store_stats.num_object_store_primary_copies
+
+    print(owner_stats)
+    print(node_ids)
+    owner_stats = [owner_stats.get(node_id, 0) for node_id in node_ids]
+    primary_copy_stats = [primary_copy_stats.get(node_id, 0) for node_id in node_ids]
+    print("owner_stats", owner_stats)
+    print("primary_copy_stats", primary_copy_stats)
+
+    return owner_stats, primary_copy_stats
+
+
+def test_node_object_metrics(ray_start_cluster, monkeypatch):
+    NUM_NODES = 3
+    cluster = ray_start_cluster
+    for i in range(NUM_NODES):
+        cluster.add_node(True, resources={f"node_{i}": 1})
+        if i == 0:
+            ray.init(address=cluster.address)
+    node_ids = []
+
+    for i in range(NUM_NODES):
+
+        @ray.remote(resources={f"node_{i}": 1})
+        def get_node_id():
+            return ray.get_runtime_context().get_node_id()
+
+        node_ids.append(ray.get(get_node_id.remote()))
+
+    # Object store stats
+    # x is owned by node_0
+    # x is stored at node_0
+    x = ray.put([1])  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([1, 0, 0], [1, 0, 0]))
+
+    # Test nested with put
+    @ray.remote(resources={"node_1": 1})
+    def big_obj():
+        # b is owned by node_1
+        # b is stored at node_1
+        b = ray.put([1] * 1024 * 1024 * 10)
+        return b
+
+    # Object store stats
+    # big_obj is owned by node_0
+    # big_obj is stored in memory (no primary copy)
+    big_obj_ref = big_obj.remote()  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([2, 1, 0], [1, 1, 0]))
+
+    # Test nested with task (small output)
+    @ray.remote(resources={"node_1": 1})
+    def nest_task(s):
+        @ray.remote(resources={"node_2": 1})
+        def task():
+            return [1] * s
+
+        # t is owned by node_1
+        # if s is small,
+        # then it is stored in memory of node_1 (no primary copy)
+        # else it is stored in object store of node_1
+        t = task.remote()
+        return t
+
+    # nest_ref is owned by node_0
+    # nest_ref is stored in memory (no primary copy)
+    nest_ref = nest_task.remote(1)  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([3, 2, 0], [1, 1, 0]))
+
+    big_nest = nest_task.remote(1024 * 1024 * 10)  # noqa: F841
+
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([4, 3, 0], [1, 1, 1]))
+
+    # Test with an assigned owner
+    @ray.remote(resources={"node_2": 0.5}, num_cpus=0)
+    class A:
+        def ready(self):
+            return
+
+        def gen(self):
+            return ray.put(10)
+
+    # actor is owned by node_0
+    # actor is not an object, so no object store copies
+    actor = A.remote()  # noqa: F841
+    ray.get(actor.ready.remote())
+    # o is owned by actor (node_2)
+    # o is stored in object store of node_0
+    o = ray.put(1, _owner=actor)  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1]))
+
+    # Test with a detached owner
+    # A detached actor is owned by the GCS, so it's not counted in the owner stats
+    detached_actor = A.options(lifetime="detached", name="A").remote()
+    ray.get(detached_actor.ready.remote())
+    for i in range(3):
+        assert get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1])
+        import time
+
+        time.sleep(1)
+    # gen_obj is owned by node_0
+    # the inner object is owned by A (node_2)
+    # the inner object is stored in object store of node_2
+    gen_obj = detached_actor.gen.remote()  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([6, 3, 2], [2, 1, 2]))
+
+
 def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
     NUM_NODES = 3
     cluster = ray_start_cluster
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 4c27c9bf7feb..ff03b5b85508 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -3515,6 +3515,7 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request
   stats->set_task_queue_length(task_queue_length_);
   stats->set_num_executed_tasks(num_executed_tasks_);
   stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope());
+  stats->set_num_owned_objects(reference_counter_->NumObjectOwnedByUs());
   stats->set_ip_address(rpc_address_.ip_address());
   stats->set_port(rpc_address_.port());
   stats->set_pid(getpid());
diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc
index 970c9c990c65..ba5321828207 100644
--- a/src/ray/core_worker/reference_count.cc
+++ b/src/ray/core_worker/reference_count.cc
@@ -251,6 +251,7 @@ bool ReferenceCounter::AddOwnedObjectInternal(
   if (object_id_refs_.count(object_id) != 0) {
     return false;
   }
+  num_objects_owned_by_us_++;
   RAY_LOG(DEBUG) << "Adding owned object " << object_id;
   // If the entry doesn't exist, we initialize the direct reference count to zero
   // because this corresponds to a submitted task whose return ObjectID will be created
@@ -666,6 +667,9 @@ void ReferenceCounter::EraseReference(ReferenceTable::iterator it) {
     reconstructable_owned_objects_index_.erase(index_it);
   }
   freed_objects_.erase(it->first);
+  if (it->second.owned_by_us) {
+    num_objects_owned_by_us_--;
+  }
   object_id_refs_.erase(it);
   ShutdownIfNeeded();
 }
@@ -811,6 +815,11 @@ size_t ReferenceCounter::NumObjectIDsInScope() const {
   return object_id_refs_.size();
 }
 
+size_t ReferenceCounter::NumObjectOwnedByUs() const {
+  absl::MutexLock lock(&mutex_);
+  return num_objects_owned_by_us_;
+}
+
 std::unordered_set<ObjectID> ReferenceCounter::GetAllInScopeObjectIDs() const {
   absl::MutexLock lock(&mutex_);
   std::unordered_set<ObjectID> in_scope_object_ids;
diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h
index daf79082dd9a..c16ee0392119 100644
--- a/src/ray/core_worker/reference_count.h
+++ b/src/ray/core_worker/reference_count.h
@@ -315,6 +315,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
   /// Returns the total number of ObjectIDs currently in scope.
   size_t NumObjectIDsInScope() const LOCKS_EXCLUDED(mutex_);
 
+  size_t NumObjectOwnedByUs() const LOCKS_EXCLUDED(mutex_);
+
   /// Returns a set of all ObjectIDs currently in scope (i.e., nonzero reference count).
   std::unordered_set<ObjectID> GetAllInScopeObjectIDs() const LOCKS_EXCLUDED(mutex_);
 
@@ -1010,6 +1012,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
   /// due to node failure. These objects are still in scope and need to be
   /// recovered.
   std::vector<ObjectID> objects_to_recover_ GUARDED_BY(mutex_);
+
+  /// Keep track of objects owned by this worker.
+  size_t num_objects_owned_by_us_ GUARDED_BY(mutex_) = 0;
 };
 
 }  // namespace core
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 751194bc8f0d..6ac9b1411135 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -763,6 +763,8 @@ message CoreWorkerStats {
   WorkerType worker_type = 23;
   // Length of the number of objects without truncation.
   int64 objects_total = 24;
+  // Number of objects owned by the worker.
+  int64 num_owned_objects = 25;
 }
 
 // Resource usage reported by the node reporter.
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index d5861747faab..16194eea2af7 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -219,6 +219,8 @@ message ObjectStoreStats {
   // the node has more pull requests than available object store
   // memory.
   bool object_pulls_queued = 13;
+  // The number of primary copies of objects on the local node.
+  int64 num_object_store_primary_copies = 14;
 }
 
 message GetNodeStatsReply {
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index e745adff3083..6747e5c93564 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -593,7 +593,7 @@ void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> urls_to_delete,
       });
 }
 
-void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const {
+void LocalObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
   auto stats = reply->mutable_store_stats();
   stats->set_spill_time_total_s(spill_time_total_s_);
   stats->set_spilled_bytes_total(spilled_bytes_total_);
@@ -602,6 +602,7 @@ void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply)
   stats->set_restored_bytes_total(restored_bytes_total_);
   stats->set_restored_objects_total(restored_objects_total_);
   stats->set_object_store_bytes_primary_copy(pinned_objects_size_);
+  stats->set_num_object_store_primary_copies(local_objects_.size());
 }
 
 void LocalObjectManager::RecordMetrics() const {
diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h
index 72cb4db5d400..116776bd1d16 100644
--- a/src/ray/raylet/local_object_manager.h
+++ b/src/ray/raylet/local_object_manager.h
@@ -147,10 +147,10 @@ class LocalObjectManager {
   /// \return True if spilling is still in progress. False otherwise.
   bool IsSpillingInProgress();
 
-  /// Populate object spilling stats.
+  /// Populate object store stats.
   ///
-  /// \param Output parameter.
-  void FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const;
+  /// \param reply Output parameter.
+  void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const;
 
   /// Record object spilling stats to metrics.
   void RecordMetrics() const;
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 4fd965c5b940..f7249347a772 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2502,7 +2502,7 @@ void NodeManager::HandleGetNodeStats(rpc::GetNodeStatsRequest node_stats_request
                                      rpc::GetNodeStatsReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) {
   // Report object spilling stats.
-  local_object_manager_.FillObjectSpillingStats(reply);
+  local_object_manager_.FillObjectStoreStats(reply);
   // Report object store stats.
   object_manager_.FillObjectStoreStats(reply);
   // As a result of the HandleGetNodeStats, we are collecting information from all