Skip to content

Commit

Permalink
[core] Add resource idle time to resource report from node. (#36670)
Browse files Browse the repository at this point in the history
Signed-off-by: rickyyx <rickyx@anyscale.com>
  • Loading branch information
rickyyx authored Jun 27, 2023
1 parent 35aaa14 commit 1974443
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 19 deletions.
4 changes: 4 additions & 0 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ message ResourcesData {
int64 resources_normal_task_timestamp = 13;
// Whether this node has detected a resource deadlock (full of actors).
bool cluster_full_of_actors_detected = 14;
// The duration in ms during which all the node's resources are idle. If the
// node currently has any resource being used, this will 0.
int64 idle_duration_ms = 15;
// Last unused = 16
}

message ResourceUsageBatchData {
Expand Down
11 changes: 11 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ ResourceRequest ResourceMapToResourceRequest(
return res;
}

/// Convert a map of resources to a ResourceRequest data structure.
ResourceRequest ResourceMapToResourceRequest(
const absl::flat_hash_map<ResourceID, double> &resource_map,
bool requires_object_store_memory) {
ResourceRequest res({}, requires_object_store_memory);
for (auto entry : resource_map) {
res.Set(entry.first, FixedPoint(entry.second));
}
return res;
}

/// Convert a map of resources to a ResourceRequest data structure.
///
/// \param string_to_int_map: Map between names and ids maintained by the
Expand Down
5 changes: 5 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,4 +513,9 @@ ResourceRequest ResourceMapToResourceRequest(
const absl::flat_hash_map<std::string, double> &resource_map,
bool requires_object_store_memory);

/// Convert a map of resources to a ResourceRequest data structure.
ResourceRequest ResourceMapToResourceRequest(
const absl::flat_hash_map<ResourceID, double> &resource_map,
bool requires_object_store_memory);

} // namespace ray
76 changes: 72 additions & 4 deletions src/ray/raylet/scheduling/local_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,18 @@ LocalResourceManager::LocalResourceManager(
resource_change_subscriber_(resource_change_subscriber) {
local_resources_.available = TaskResourceInstances(node_resources.available);
local_resources_.total = TaskResourceInstances(node_resources.total);
const auto now = absl::Now();
for (const auto &resource_id : local_resources_.total.ResourceIds()) {
resources_last_idle_time_[resource_id] = now;
}
RAY_LOG(DEBUG) << "local resources: " << local_resources_.DebugString();
}

void LocalResourceManager::AddLocalResourceInstances(
scheduling::ResourceID resource_id, const std::vector<FixedPoint> &instances) {
local_resources_.available.Add(resource_id, instances);
local_resources_.total.Add(resource_id, instances);
resources_last_idle_time_[resource_id] = absl::Now();
OnResourceChanged();
}

Expand Down Expand Up @@ -67,7 +72,8 @@ uint64_t LocalResourceManager::GetNumCpus() const {
std::vector<FixedPoint> LocalResourceManager::AddAvailableResourceInstances(
const std::vector<FixedPoint> &available,
const std::vector<FixedPoint> &local_total,
std::vector<FixedPoint> &local_available) const {
std::vector<FixedPoint> &local_available,
bool *is_idle) const {
RAY_CHECK(available.size() == local_available.size())
<< available.size() << ", " << local_available.size();
std::vector<FixedPoint> overflow(available.size(), 0.);
Expand All @@ -77,6 +83,10 @@ std::vector<FixedPoint> LocalResourceManager::AddAvailableResourceInstances(
overflow[i] = (local_available[i] - local_total[i]);
local_available[i] = local_total[i];
}
// If any resource instance is not idle, the whole resource is not idle.
if (is_idle != nullptr) {
*is_idle = *is_idle && (local_available[i] == local_total[i]);
}
}

return overflow;
Expand Down Expand Up @@ -202,18 +212,26 @@ bool LocalResourceManager::AllocateTaskResourceInstances(
FreeTaskResourceInstances(task_allocation);
return false;
}

SetResourceNonIdle(resource_id);
}
return true;
}

void LocalResourceManager::FreeTaskResourceInstances(
std::shared_ptr<TaskResourceInstances> task_allocation) {
std::shared_ptr<TaskResourceInstances> task_allocation, bool record_idle_resource) {
RAY_CHECK(task_allocation != nullptr);
for (auto &resource_id : task_allocation->ResourceIds()) {
if (local_resources_.total.Has(resource_id)) {
bool is_idle = true;
AddAvailableResourceInstances(task_allocation->Get(resource_id),
local_resources_.total.GetMutable(resource_id),
local_resources_.available.GetMutable(resource_id));
local_resources_.available.GetMutable(resource_id),
&is_idle);

if (record_idle_resource && is_idle) {
SetResourceIdle(resource_id);
}
}
}
}
Expand All @@ -227,10 +245,16 @@ std::vector<double> LocalResourceManager::AddResourceInstances(
return resource_instances; // No overflow.
}

bool is_idle = true;
auto overflow =
AddAvailableResourceInstances(resource_instances_fp,
local_resources_.total.GetMutable(resource_id),
local_resources_.available.GetMutable(resource_id));
local_resources_.available.GetMutable(resource_id),
&is_idle);

if (is_idle) {
SetResourceIdle(resource_id);
}
OnResourceChanged();

return FixedPointVectorToDouble(overflow);
Expand All @@ -256,6 +280,33 @@ std::vector<double> LocalResourceManager::SubtractResourceInstances(
return FixedPointVectorToDouble(underflow);
}

void LocalResourceManager::SetResourceNonIdle(const scheduling::ResourceID &resource_id) {
// We o
resources_last_idle_time_[resource_id] = absl::nullopt;
}

void LocalResourceManager::SetResourceIdle(const scheduling::ResourceID &resource_id) {
resources_last_idle_time_[resource_id] = absl::Now();
}

absl::optional<absl::Time> LocalResourceManager::GetResourceIdleTime() const {
// If all the resources are idle.
absl::Time all_idle_time = absl::InfinitePast();

for (const auto &iter : resources_last_idle_time_) {
const auto &idle_time_or_busy = iter.second;

if (idle_time_or_busy == absl::nullopt) {
// One resource is busy, entire resources should be considered non-idle.
return absl::nullopt;
}

// Update the all resource idle time to be the most recent idle time.
all_idle_time = std::max(all_idle_time, idle_time_or_busy.value());
}
return all_idle_time;
}

bool LocalResourceManager::AllocateLocalTaskResources(
const ResourceRequest &resource_request,
std::shared_ptr<TaskResourceInstances> task_allocation) {
Expand Down Expand Up @@ -312,6 +363,19 @@ void LocalResourceManager::UpdateAvailableObjectStoreMemResource() {
local_resources_.available.Set(ResourceID::ObjectStoreMemory(),
std::move(new_available));
OnResourceChanged();

// This is more of a discrete approximate of the last idle object store memory usage.
// TODO(rickyx): in order to know exactly when object store becomes idle/busy, we
// would need to plumb the info out of the object store directly.
if (used == 0.0) {
// Set it to idle as of now.
RAY_LOG(INFO) << "Object store memory is idle.";
resources_last_idle_time_[ResourceID::ObjectStoreMemory()] = absl::Now();
} else {
// Clear the idle info since we know it's being used.
RAY_LOG(INFO) << "Object store memory is not idle.";
resources_last_idle_time_[ResourceID::ObjectStoreMemory()] = absl::nullopt;
}
}
}

Expand Down Expand Up @@ -403,6 +467,10 @@ std::optional<syncer::RaySyncMessage> LocalResourceManager::CreateSyncMessage(

resources_data.set_resources_available_changed(true);

const auto now = absl::Now();
resources_data.set_idle_duration_ms(
absl::ToInt64Milliseconds(now - GetResourceIdleTime().value_or(now)));

msg.set_node_id(local_node_id_.Binary());
msg.set_version(version_);
msg.set_message_type(message_type);
Expand Down
21 changes: 19 additions & 2 deletions src/ray/raylet/scheduling/local_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,17 @@ class LocalResourceManager : public syncer::ReporterInterface {
/// \param available A list of available capacities for resource's instances.
/// \param local_total Local total resource instances.
/// \param local_available Local available resource instances being updated.
/// \param[out] is_idle Pointer to record if the resource are idle after the
/// addition. This is nullptr if not recording idle resources.
///
/// \return Overflow capacities of "local_available" after adding instance
/// capacities in "available", i.e.,
/// min(available + local_available, local_total)
std::vector<FixedPoint> AddAvailableResourceInstances(
const std::vector<FixedPoint> &available,
const std::vector<FixedPoint> &local_total,
std::vector<FixedPoint> &local_available) const;
std::vector<FixedPoint> &local_available,
bool *is_idle = nullptr) const;

/// Decrease the available capacities of the instances of a given resource.
///
Expand Down Expand Up @@ -246,14 +249,27 @@ class LocalResourceManager : public syncer::ReporterInterface {
/// added back to the node's local available resources.
///
/// \param task_allocation: Task's resources to be freed.
void FreeTaskResourceInstances(std::shared_ptr<TaskResourceInstances> task_allocation);
/// \param record_idle_resource: Whether to record the idle resource. This is false
/// when the resource was allocated partially so its idle state is actually not
/// affected.
void FreeTaskResourceInstances(std::shared_ptr<TaskResourceInstances> task_allocation,
bool record_idle_resource = true);

void UpdateAvailableObjectStoreMemResource();

void SetResourceIdle(const scheduling::ResourceID &resource_id);

void SetResourceNonIdle(const scheduling::ResourceID &resource_id);

absl::optional<absl::Time> GetResourceIdleTime() const;

/// Identifier of local node.
scheduling::NodeID local_node_id_;
/// Resources of local node.
NodeResourceInstances local_resources_;
/// A map storing when the resource was last idle.
absl::flat_hash_map<scheduling::ResourceID, absl::optional<absl::Time>>
resources_last_idle_time_;
/// Cached resources, used to compare with newest one in light heartbeat mode.
std::unique_ptr<NodeResources> last_report_resources_;
/// Function to get used object store memory.
Expand All @@ -277,6 +293,7 @@ class LocalResourceManager : public syncer::ReporterInterface {

friend class LocalResourceManagerTest;
FRIEND_TEST(LocalResourceManagerTest, BasicGetResourceUsageMapTest);
FRIEND_TEST(LocalResourceManagerTest, IdleResourceTimeTest);
};

} // end namespace ray
Loading

0 comments on commit 1974443

Please sign in to comment.