diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py
index 4e629da924414..a9357461d0aa3 100644
--- a/python/ray/tests/test_metrics.py
+++ b/python/ray/tests/test_metrics.py
@@ -100,6 +100,131 @@ def verify():
     wait_for_condition(verify)


+def get_owner_info(node_ids):
+    node_addrs = {
+        n["NodeID"]: (n["NodeManagerAddress"], n["NodeManagerPort"])
+        for n in ray.nodes()
+    }
+    # Force a global gc to clean up the object store.
+    ray._private.internal_api.global_gc()
+    owner_stats = {n: 0 for n in node_ids}
+    primary_copy_stats = {n: 0 for n in node_ids}
+
+    for node_id in node_ids:
+        node_stats = ray._private.internal_api.node_stats(
+            node_addrs[node_id][0], node_addrs[node_id][1], False
+        )
+        owner_stats[node_id] = sum(
+            [stats.num_owned_objects for stats in node_stats.core_workers_stats]
+        )
+        primary_copy_stats[
+            node_id
+        ] = node_stats.store_stats.num_object_store_primary_copies
+
+    print(owner_stats)
+    print(node_ids)
+    owner_stats = [owner_stats.get(node_id, 0) for node_id in node_ids]
+    primary_copy_stats = [primary_copy_stats.get(node_id, 0) for node_id in node_ids]
+    print("owner_stats", owner_stats)
+    print("primary_copy_stats", primary_copy_stats)
+
+    return owner_stats, primary_copy_stats
+
+
+def test_node_object_metrics(ray_start_cluster, monkeypatch):
+    NUM_NODES = 3
+    cluster = ray_start_cluster
+    for i in range(NUM_NODES):
+        cluster.add_node(True, resources={f"node_{i}": 1})
+        if i == 0:
+            ray.init(address=cluster.address)
+    node_ids = []
+
+    for i in range(NUM_NODES):
+
+        @ray.remote(resources={f"node_{i}": 1})
+        def get_node_id():
+            return ray.get_runtime_context().get_node_id()
+
+        node_ids.append(ray.get(get_node_id.remote()))
+
+    # Object store stats
+    # x is owned by node_0
+    # x is stored at node_0
+    x = ray.put([1])  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([1, 0, 0], [1, 0, 0]))
+
+    # Test nested with put
+    @ray.remote(resources={"node_1": 1})
+    def big_obj():
+        # b is owned by node_1
+        # b is stored at node_1
+        b = ray.put([1] * 1024 * 1024 * 10)
+        return b
+
+    # Object store stats
+    # big_obj is owned by node_0
+    # big_obj is stored in memory (no primary copy)
+    big_obj_ref = big_obj.remote()  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([2, 1, 0], [1, 1, 0]))
+
+    # Test nested with task (small output)
+    @ray.remote(resources={"node_1": 1})
+    def nest_task(s):
+        @ray.remote(resources={"node_2": 1})
+        def task():
+            return [1] * s
+
+        # t is owned by node_1
+        # if s is small,
+        # then it's stored in the in-process memory of node_1 (no primary copy)
+        # else it's stored in the object store of node_1
+        t = task.remote()
+        return t
+
+    # nest_ref is owned by node_0
+    # nest_ref is stored in memory (no primary copy)
+    nest_ref = nest_task.remote(1)  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([3, 2, 0], [1, 1, 0]))
+
+    big_nest = nest_task.remote(1024 * 1024 * 10)  # noqa: F841
+
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([4, 3, 0], [1, 1, 1]))
+
+    # Test with an assigned owner
+    @ray.remote(resources={"node_2": 0.5}, num_cpus=0)
+    class A:
+        def ready(self):
+            return
+
+        def gen(self):
+            return ray.put(10)
+
+    # actor is owned by node_0
+    # actor is not an object, so no object store copies
+    actor = A.remote()  # noqa: F841
+    ray.get(actor.ready.remote())
+    # o is owned by actor (node_2)
+    # o is stored in object store of node_0
+    o = ray.put(1, _owner=actor)  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1]))
+
+    # Test with a detached owner
+    # A detached actor is owned by the GCS, so it is not counted in the owner stats
+    detached_actor = A.options(lifetime="detached", name="A").remote()
+    ray.get(detached_actor.ready.remote())
+    for i in range(3):
+        assert get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1])
+        import time
+
+        time.sleep(1)
+    # gen_obj is owned by node_0
+    # the inner object is owned by A (node_2)
+    # the inner object is stored in object store of node_2
+    gen_obj = detached_actor.gen.remote()  # noqa: F841
+    wait_for_condition(lambda: get_owner_info(node_ids) == ([6, 3, 2], [2, 1, 2]))
+
+
 def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
     NUM_NODES = 3
     cluster = ray_start_cluster
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 4c27c9bf7febb..ff03b5b855086 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -3515,6 +3515,7 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request
   stats->set_task_queue_length(task_queue_length_);
   stats->set_num_executed_tasks(num_executed_tasks_);
   stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope());
+  stats->set_num_owned_objects(reference_counter_->NumObjectOwnedByUs());
   stats->set_ip_address(rpc_address_.ip_address());
   stats->set_port(rpc_address_.port());
   stats->set_pid(getpid());
diff --git a/src/ray/core_worker/reference_count.cc b/src/ray/core_worker/reference_count.cc
index 970c9c990c658..ba5321828207a 100644
--- a/src/ray/core_worker/reference_count.cc
+++ b/src/ray/core_worker/reference_count.cc
@@ -251,6 +251,7 @@ bool ReferenceCounter::AddOwnedObjectInternal(
   if (object_id_refs_.count(object_id) != 0) {
     return false;
   }
+  num_objects_owned_by_us_++;
  RAY_LOG(DEBUG) << "Adding owned object " << object_id;
  // If the entry doesn't exist, we initialize the direct reference count to zero
  // because this corresponds to a submitted task whose return ObjectID will be created
@@ -666,6 +667,9 @@ void ReferenceCounter::EraseReference(ReferenceTable::iterator it) {
     reconstructable_owned_objects_index_.erase(index_it);
   }
   freed_objects_.erase(it->first);
+  if (it->second.owned_by_us) {
+    num_objects_owned_by_us_--;
+  }
   object_id_refs_.erase(it);
   ShutdownIfNeeded();
 }
@@ -811,6 +815,11 @@ size_t ReferenceCounter::NumObjectIDsInScope() const {
   return object_id_refs_.size();
 }

+size_t ReferenceCounter::NumObjectOwnedByUs() const {
+  absl::MutexLock lock(&mutex_);
+  return num_objects_owned_by_us_;
+}
+
 std::unordered_set<ObjectID> ReferenceCounter::GetAllInScopeObjectIDs() const {
   absl::MutexLock lock(&mutex_);
   std::unordered_set<ObjectID> in_scope_object_ids;
diff --git a/src/ray/core_worker/reference_count.h b/src/ray/core_worker/reference_count.h
index daf79082dd9a2..c16ee03921190 100644
--- a/src/ray/core_worker/reference_count.h
+++ b/src/ray/core_worker/reference_count.h
@@ -315,6 +315,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
   /// Returns the total number of ObjectIDs currently in scope.
   size_t NumObjectIDsInScope() const LOCKS_EXCLUDED(mutex_);

+  size_t NumObjectOwnedByUs() const LOCKS_EXCLUDED(mutex_);
+
   /// Returns a set of all ObjectIDs currently in scope (i.e., nonzero reference count).
   std::unordered_set<ObjectID> GetAllInScopeObjectIDs() const LOCKS_EXCLUDED(mutex_);

@@ -1010,6 +1012,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
   /// due to node failure. These objects are still in scope and need to be
   /// recovered.
   std::vector<ObjectID> objects_to_recover_ GUARDED_BY(mutex_);
+
+  /// Number of objects owned by this worker.
+  size_t num_objects_owned_by_us_ GUARDED_BY(mutex_) = 0;
 };

 }  // namespace core
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 751194bc8f0d9..6ac9b14111358 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -763,6 +763,8 @@ message CoreWorkerStats {
   WorkerType worker_type = 23;
   // Length of the number of objects without truncation.
   int64 objects_total = 24;
+  // Number of objects owned by the worker.
+  int64 num_owned_objects = 25;
 }

 // Resource usage reported by the node reporter.
diff --git a/src/ray/protobuf/node_manager.proto b/src/ray/protobuf/node_manager.proto
index d5861747faab2..16194eea2af7d 100644
--- a/src/ray/protobuf/node_manager.proto
+++ b/src/ray/protobuf/node_manager.proto
@@ -219,6 +219,8 @@ message ObjectStoreStats {
   // the node has more pull requests than available object store
   // memory.
   bool object_pulls_queued = 13;
+  // The number of primary copies of objects stored on the local node.
+  int64 num_object_store_primary_copies = 14;
 }

 message GetNodeStatsReply {
diff --git a/src/ray/raylet/local_object_manager.cc b/src/ray/raylet/local_object_manager.cc
index e745adff30838..6747e5c93564d 100644
--- a/src/ray/raylet/local_object_manager.cc
+++ b/src/ray/raylet/local_object_manager.cc
@@ -593,7 +593,7 @@ void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> urls_to_d
   });
 }

-void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const {
+void LocalObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
   auto stats = reply->mutable_store_stats();
   stats->set_spill_time_total_s(spill_time_total_s_);
   stats->set_spilled_bytes_total(spilled_bytes_total_);
@@ -602,6 +602,7 @@ void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply)
   stats->set_restored_bytes_total(restored_bytes_total_);
   stats->set_restored_objects_total(restored_objects_total_);
   stats->set_object_store_bytes_primary_copy(pinned_objects_size_);
+  stats->set_num_object_store_primary_copies(local_objects_.size());
 }

 void LocalObjectManager::RecordMetrics() const {
diff --git a/src/ray/raylet/local_object_manager.h b/src/ray/raylet/local_object_manager.h
index 72cb4db5d4009..116776bd1d16e 100644
--- a/src/ray/raylet/local_object_manager.h
+++ b/src/ray/raylet/local_object_manager.h
@@ -147,10 +147,10 @@ class LocalObjectManager {
   /// \return True if spilling is still in progress. False otherwise.
   bool IsSpillingInProgress();

-  /// Populate object spilling stats.
+  /// Populate object store stats.
   ///
-  /// \param Output parameter.
-  void FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const;
+  /// \param reply Output parameter.
+  void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const;

   /// Record object spilling stats to metrics.
   void RecordMetrics() const;
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 4fd965c5b940d..f7249347a7726 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2502,7 +2502,7 @@ void NodeManager::HandleGetNodeStats(rpc::GetNodeStatsRequest node_stats_request
                                      rpc::GetNodeStatsReply *reply,
                                      rpc::SendReplyCallback send_reply_callback) {
   // Report object spilling stats.
-  local_object_manager_.FillObjectSpillingStats(reply);
+  local_object_manager_.FillObjectStoreStats(reply);
   // Report object store stats.
   object_manager_.FillObjectStoreStats(reply);
   // As a result of the HandleGetNodeStats, we are collecting information from all
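
For context, the counters added above surface through the same `node_stats` RPC that `get_owner_info()` exercises in `test_metrics.py`. The sketch below mirrors that helper and is only a usage illustration, not part of this change; `get_owner_stats` is a hypothetical name, and `ray._private.internal_api` is a private API that may change between releases.

```python
import ray

ray.init()


def get_owner_stats():
    """Collect the new per-node counters, mirroring get_owner_info() in the test."""
    stats = {}
    for node in ray.nodes():
        reply = ray._private.internal_api.node_stats(
            node["NodeManagerAddress"], node["NodeManagerPort"], False
        )
        # num_owned_objects is reported per core worker; sum over the node.
        owned = sum(s.num_owned_objects for s in reply.core_workers_stats)
        # num_object_store_primary_copies counts objects pinned in the local plasma store.
        primary = reply.store_stats.num_object_store_primary_copies
        stats[node["NodeID"]] = (owned, primary)
    return stats


x = ray.put([1])  # owned by the driver; primary copy lives on the driver's node
print(get_owner_stats())
```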
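One subtle case the test covers is ownership assignment: `ray.put(value, _owner=actor)` makes the actor the owner (so the object counts toward `num_owned_objects` on the actor's node) while the primary copy stays in the object store of the node that called `ray.put()`, and a detached actor handle is owned by the GCS, so it does not increment `num_owned_objects` anywhere. A minimal sketch of that scenario, restating the test under the same assumptions (`Owner` and the detached actor name are placeholders):

```python
import ray

ray.init()


@ray.remote(num_cpus=0)
class Owner:
    def ready(self):
        return True


actor = Owner.remote()
ray.get(actor.ready.remote())

# The driver creates the object, but ownership is assigned to the actor, so it
# shows up in num_owned_objects on the actor's node. The primary copy still
# counts toward num_object_store_primary_copies on the caller's node.
obj = ray.put("payload", _owner=actor)

# A detached actor is owned by the GCS rather than by any worker, so creating
# it does not change num_owned_objects on any node.
detached = Owner.options(lifetime="detached", name="owner").remote()
ray.get(detached.ready.remote())
```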