[core] Add object owner and copy metrics to node stats (ray-project#35119)

This PR adds object owner and copy metrics to the `GetNodeStats` RPC endpoint.

Inlined small objects are not counted as copies: they are never stored in the object store, and when used they are copied inline, so there is no object-store copy to count.

Each inlined object is still counted as one owned object, however, because it is in fact owned by a worker and the ownership count must stay correct.
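For example (a hypothetical sketch, not part of this PR's diff; the `nest_task` case in the new test exercises the same behavior): a task whose return value is small enough to be inlined bumps the caller's owned-object count without adding a primary copy on any node.

```python
import ray

@ray.remote
def small_return():
    # A tiny return value is inlined into the task reply rather than
    # being placed in the object store.
    return 1

ref = small_return.remote()
# The caller now owns `ref` (+1 num_owned_objects on its core worker),
# but no node reports an additional primary copy for it.
```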

Local copies are read directly from the local object manager, while owner counts require the caller to aggregate the metrics from every core worker.
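A minimal sketch of that caller-side aggregation (assuming a running cluster; it mirrors the `get_owner_info` helper added in the test below):

```python
import ray
import ray._private.internal_api

ray.init(address="auto")
for node in ray.nodes():
    stats = ray._private.internal_api.node_stats(
        node["NodeManagerAddress"], node["NodeManagerPort"], False
    )
    # Owner counts must be summed over every core worker on the node...
    owned = sum(s.num_owned_objects for s in stats.core_workers_stats)
    # ...while the primary-copy count comes straight from the local
    # object manager via the store stats.
    copies = stats.store_stats.num_object_store_primary_copies
    print(node["NodeID"], owned, copies)
```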

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
fishbone authored and arvind-chandra committed Aug 31, 2023
1 parent fb3542a commit d4324fc
Showing 9 changed files with 150 additions and 5 deletions.
125 changes: 125 additions & 0 deletions python/ray/tests/test_metrics.py
@@ -100,6 +100,131 @@ def verify():
wait_for_condition(verify)


def get_owner_info(node_ids):
node_addrs = {
n["NodeID"]: (n["NodeManagerAddress"], n["NodeManagerPort"])
for n in ray.nodes()
}
# Force a global gc to clean up the object store.
ray._private.internal_api.global_gc()
owner_stats = {n: 0 for n in node_ids}
primary_copy_stats = {n: 0 for n in node_ids}

for node_id in node_ids:
node_stats = ray._private.internal_api.node_stats(
node_addrs[node_id][0], node_addrs[node_id][1], False
)
owner_stats[node_id] = sum(
[stats.num_owned_objects for stats in node_stats.core_workers_stats]
)
primary_copy_stats[
node_id
] = node_stats.store_stats.num_object_store_primary_copies

print(owner_stats)
print(node_ids)
owner_stats = [owner_stats.get(node_id, 0) for node_id in node_ids]
primary_copy_stats = [primary_copy_stats.get(node_id, 0) for node_id in node_ids]
print("owner_stats", owner_stats)
print("primary_copy_stats", primary_copy_stats)

return owner_stats, primary_copy_stats


def test_node_object_metrics(ray_start_cluster, monkeypatch):
NUM_NODES = 3
cluster = ray_start_cluster
for i in range(NUM_NODES):
cluster.add_node(True, resources={f"node_{i}": 1})
if i == 0:
ray.init(address=cluster.address)
node_ids = []

for i in range(NUM_NODES):

@ray.remote(resources={f"node_{i}": 1})
def get_node_id():
return ray.get_runtime_context().get_node_id()

node_ids.append(ray.get(get_node_id.remote()))

# Object store stats
# x is owned by node_0
# x is stored at node_0
x = ray.put([1]) # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([1, 0, 0], [1, 0, 0]))

# Test nested with put
@ray.remote(resources={"node_1": 1})
def big_obj():
# b is owned by node_1
# b is stored at node_1
b = ray.put([1] * 1024 * 1024 * 10)
return b

# Object store stats
# big_obj is owned by node_0
# big_obj is stored in memory (no primary copy)
big_obj_ref = big_obj.remote() # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([2, 1, 0], [1, 1, 0]))

# Test nested with task (small output)
@ray.remote(resources={"node_1": 1})
def nest_task(s):
@ray.remote(resources={"node_2": 1})
def task():
return [1] * s

# t is owned by node_1
# if s is small,
# then it is stored in the in-memory store of node_1 (no primary copy)
# else it is stored in the object store of node_1
t = task.remote()
return t

# nest_ref is owned by node_0
# nest_ref is stored in memory (no primary copy)
nest_ref = nest_task.remote(1) # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([3, 2, 0], [1, 1, 0]))

big_nest = nest_task.remote(1024 * 1024 * 10) # noqa: F841

wait_for_condition(lambda: get_owner_info(node_ids) == ([4, 3, 0], [1, 1, 1]))

# Test with an assigned owner
@ray.remote(resources={"node_2": 0.5}, num_cpus=0)
class A:
def ready(self):
return

def gen(self):
return ray.put(10)

# actor is owned by node_0
# actor is not an object, so no object store copies
actor = A.remote() # noqa: F841
ray.get(actor.ready.remote())
# o is owned by actor (node_2)
# o is stored in object store of node_0
o = ray.put(1, _owner=actor) # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1]))

# Test with a detached owner
# A detached actor is owned by the GCS, so it's not counted in the owner stats
detached_actor = A.options(lifetime="detached", name="A").remote()
ray.get(detached_actor.ready.remote())
import time

for i in range(3):
    assert get_owner_info(node_ids) == ([5, 3, 1], [2, 1, 1])
    time.sleep(1)
# gen_obj is owned by node_0
# the inner object is owned by A (node_2)
# the inner object is stored in object store of node_2
gen_obj = detached_actor.gen.remote() # noqa: F841
wait_for_condition(lambda: get_owner_info(node_ids) == ([6, 3, 2], [2, 1, 2]))


def test_multi_node_metrics_export_port_discovery(ray_start_cluster):
NUM_NODES = 3
cluster = ray_start_cluster
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker.cc
@@ -3515,6 +3515,7 @@ void CoreWorker::HandleGetCoreWorkerStats(rpc::GetCoreWorkerStatsRequest request
stats->set_task_queue_length(task_queue_length_);
stats->set_num_executed_tasks(num_executed_tasks_);
stats->set_num_object_refs_in_scope(reference_counter_->NumObjectIDsInScope());
stats->set_num_owned_objects(reference_counter_->NumObjectOwnedByUs());
stats->set_ip_address(rpc_address_.ip_address());
stats->set_port(rpc_address_.port());
stats->set_pid(getpid());
9 changes: 9 additions & 0 deletions src/ray/core_worker/reference_count.cc
@@ -251,6 +251,7 @@ bool ReferenceCounter::AddOwnedObjectInternal(
if (object_id_refs_.count(object_id) != 0) {
return false;
}
num_objects_owned_by_us_++;
RAY_LOG(DEBUG) << "Adding owned object " << object_id;
// If the entry doesn't exist, we initialize the direct reference count to zero
// because this corresponds to a submitted task whose return ObjectID will be created
@@ -666,6 +667,9 @@ void ReferenceCounter::EraseReference(ReferenceTable::iterator it) {
reconstructable_owned_objects_index_.erase(index_it);
}
freed_objects_.erase(it->first);
if (it->second.owned_by_us) {
num_objects_owned_by_us_--;
}
object_id_refs_.erase(it);
ShutdownIfNeeded();
}
@@ -811,6 +815,11 @@ size_t ReferenceCounter::NumObjectIDsInScope() const {
return object_id_refs_.size();
}

size_t ReferenceCounter::NumObjectOwnedByUs() const {
absl::MutexLock lock(&mutex_);
return num_objects_owned_by_us_;
}

std::unordered_set<ObjectID> ReferenceCounter::GetAllInScopeObjectIDs() const {
absl::MutexLock lock(&mutex_);
std::unordered_set<ObjectID> in_scope_object_ids;
5 changes: 5 additions & 0 deletions src/ray/core_worker/reference_count.h
@@ -315,6 +315,8 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// Returns the total number of ObjectIDs currently in scope.
size_t NumObjectIDsInScope() const LOCKS_EXCLUDED(mutex_);

/// Returns the number of objects owned by this worker.
size_t NumObjectOwnedByUs() const LOCKS_EXCLUDED(mutex_);

/// Returns a set of all ObjectIDs currently in scope (i.e., nonzero reference count).
std::unordered_set<ObjectID> GetAllInScopeObjectIDs() const LOCKS_EXCLUDED(mutex_);

@@ -1010,6 +1012,9 @@ class ReferenceCounter : public ReferenceCounterInterface,
/// due to node failure. These objects are still in scope and need to be
/// recovered.
std::vector<ObjectID> objects_to_recover_ GUARDED_BY(mutex_);

/// The number of objects owned by this worker.
size_t num_objects_owned_by_us_ GUARDED_BY(mutex_) = 0;
};

} // namespace core
2 changes: 2 additions & 0 deletions src/ray/protobuf/common.proto
@@ -763,6 +763,8 @@ message CoreWorkerStats {
WorkerType worker_type = 23;
// Length of the number of objects without truncation.
int64 objects_total = 24;
// Number of objects owned by the worker.
int64 num_owned_objects = 25;
}

// Resource usage reported by the node reporter.
2 changes: 2 additions & 0 deletions src/ray/protobuf/node_manager.proto
@@ -219,6 +219,8 @@ message ObjectStoreStats {
// the node has more pull requests than available object store
// memory.
bool object_pulls_queued = 13;
// The number of primary copies of objects in the local node.
int64 num_object_store_primary_copies = 14;
}

message GetNodeStatsReply {
3 changes: 2 additions & 1 deletion src/ray/raylet/local_object_manager.cc
@@ -593,7 +593,7 @@ void LocalObjectManager::DeleteSpilledObjects(std::vector<std::string> urls_to_d
});
}

-void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const {
+void LocalObjectManager::FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const {
auto stats = reply->mutable_store_stats();
stats->set_spill_time_total_s(spill_time_total_s_);
stats->set_spilled_bytes_total(spilled_bytes_total_);
@@ -602,6 +602,7 @@ void LocalObjectManager::FillObjectSpillingStats(rpc::GetNodeStatsReply *reply)
stats->set_restored_bytes_total(restored_bytes_total_);
stats->set_restored_objects_total(restored_objects_total_);
stats->set_object_store_bytes_primary_copy(pinned_objects_size_);
stats->set_num_object_store_primary_copies(local_objects_.size());
}

void LocalObjectManager::RecordMetrics() const {
6 changes: 3 additions & 3 deletions src/ray/raylet/local_object_manager.h
@@ -147,10 +147,10 @@ class LocalObjectManager {
/// \return True if spilling is still in progress. False otherwise.
bool IsSpillingInProgress();

-  /// Populate object spilling stats.
+  /// Populate object store stats.
   ///
-  /// \param Output parameter.
-  void FillObjectSpillingStats(rpc::GetNodeStatsReply *reply) const;
+  /// \param reply Output parameter.
+  void FillObjectStoreStats(rpc::GetNodeStatsReply *reply) const;

/// Record object spilling stats to metrics.
void RecordMetrics() const;
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
@@ -2502,7 +2502,7 @@ void NodeManager::HandleGetNodeStats(rpc::GetNodeStatsRequest node_stats_request
rpc::GetNodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
// Report object spilling stats.
-  local_object_manager_.FillObjectSpillingStats(reply);
+  local_object_manager_.FillObjectStoreStats(reply);
// Report object store stats.
object_manager_.FillObjectStoreStats(reply);
// As a result of the HandleGetNodeStats, we are collecting information from all
