diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 2636fe5a96d84..3b846da7bc1f9 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -552,6 +552,10 @@ message ResourcesData { int64 resources_normal_task_timestamp = 13; // Whether this node has detected a resource deadlock (full of actors). bool cluster_full_of_actors_detected = 14; + // The duration in ms during which all the node's resources are idle. If the + // node currently has any resource being used, this will 0. + int64 idle_duration_ms = 15; + // Last unused = 16 } message ResourceUsageBatchData { diff --git a/src/ray/raylet/scheduling/cluster_resource_data.cc b/src/ray/raylet/scheduling/cluster_resource_data.cc index 3a6c6247905c3..c110125e79c66 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.cc +++ b/src/ray/raylet/scheduling/cluster_resource_data.cc @@ -35,6 +35,17 @@ ResourceRequest ResourceMapToResourceRequest( return res; } +/// Convert a map of resources to a ResourceRequest data structure. +ResourceRequest ResourceMapToResourceRequest( + const absl::flat_hash_map &resource_map, + bool requires_object_store_memory) { + ResourceRequest res({}, requires_object_store_memory); + for (auto entry : resource_map) { + res.Set(entry.first, FixedPoint(entry.second)); + } + return res; +} + /// Convert a map of resources to a ResourceRequest data structure. /// /// \param string_to_int_map: Map between names and ids maintained by the diff --git a/src/ray/raylet/scheduling/cluster_resource_data.h b/src/ray/raylet/scheduling/cluster_resource_data.h index bd2f07690a505..1f7a3b6fee648 100644 --- a/src/ray/raylet/scheduling/cluster_resource_data.h +++ b/src/ray/raylet/scheduling/cluster_resource_data.h @@ -513,4 +513,9 @@ ResourceRequest ResourceMapToResourceRequest( const absl::flat_hash_map &resource_map, bool requires_object_store_memory); +/// Convert a map of resources to a ResourceRequest data structure. +ResourceRequest ResourceMapToResourceRequest( + const absl::flat_hash_map &resource_map, + bool requires_object_store_memory); + } // namespace ray diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index d5c5bc1bc474f..18bacd16a7283 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -33,6 +33,10 @@ LocalResourceManager::LocalResourceManager( resource_change_subscriber_(resource_change_subscriber) { local_resources_.available = TaskResourceInstances(node_resources.available); local_resources_.total = TaskResourceInstances(node_resources.total); + const auto now = absl::Now(); + for (const auto &resource_id : local_resources_.total.ResourceIds()) { + resources_last_idle_time_[resource_id] = now; + } RAY_LOG(DEBUG) << "local resources: " << local_resources_.DebugString(); } @@ -40,6 +44,7 @@ void LocalResourceManager::AddLocalResourceInstances( scheduling::ResourceID resource_id, const std::vector &instances) { local_resources_.available.Add(resource_id, instances); local_resources_.total.Add(resource_id, instances); + resources_last_idle_time_[resource_id] = absl::Now(); OnResourceChanged(); } @@ -67,7 +72,8 @@ uint64_t LocalResourceManager::GetNumCpus() const { std::vector LocalResourceManager::AddAvailableResourceInstances( const std::vector &available, const std::vector &local_total, - std::vector &local_available) const { + std::vector &local_available, + bool *is_idle) const { RAY_CHECK(available.size() == local_available.size()) << available.size() << ", " << local_available.size(); std::vector overflow(available.size(), 0.); @@ -77,6 +83,10 @@ std::vector LocalResourceManager::AddAvailableResourceInstances( overflow[i] = (local_available[i] - local_total[i]); local_available[i] = local_total[i]; } + // If any resource instance is not idle, the whole resource is not idle. + if (is_idle != nullptr) { + *is_idle = *is_idle && (local_available[i] == local_total[i]); + } } return overflow; @@ -202,18 +212,26 @@ bool LocalResourceManager::AllocateTaskResourceInstances( FreeTaskResourceInstances(task_allocation); return false; } + + SetResourceNonIdle(resource_id); } return true; } void LocalResourceManager::FreeTaskResourceInstances( - std::shared_ptr task_allocation) { + std::shared_ptr task_allocation, bool record_idle_resource) { RAY_CHECK(task_allocation != nullptr); for (auto &resource_id : task_allocation->ResourceIds()) { if (local_resources_.total.Has(resource_id)) { + bool is_idle = true; AddAvailableResourceInstances(task_allocation->Get(resource_id), local_resources_.total.GetMutable(resource_id), - local_resources_.available.GetMutable(resource_id)); + local_resources_.available.GetMutable(resource_id), + &is_idle); + + if (record_idle_resource && is_idle) { + SetResourceIdle(resource_id); + } } } } @@ -227,10 +245,16 @@ std::vector LocalResourceManager::AddResourceInstances( return resource_instances; // No overflow. } + bool is_idle = true; auto overflow = AddAvailableResourceInstances(resource_instances_fp, local_resources_.total.GetMutable(resource_id), - local_resources_.available.GetMutable(resource_id)); + local_resources_.available.GetMutable(resource_id), + &is_idle); + + if (is_idle) { + SetResourceIdle(resource_id); + } OnResourceChanged(); return FixedPointVectorToDouble(overflow); @@ -256,6 +280,33 @@ std::vector LocalResourceManager::SubtractResourceInstances( return FixedPointVectorToDouble(underflow); } +void LocalResourceManager::SetResourceNonIdle(const scheduling::ResourceID &resource_id) { + // We o + resources_last_idle_time_[resource_id] = absl::nullopt; +} + +void LocalResourceManager::SetResourceIdle(const scheduling::ResourceID &resource_id) { + resources_last_idle_time_[resource_id] = absl::Now(); +} + +absl::optional LocalResourceManager::GetResourceIdleTime() const { + // If all the resources are idle. + absl::Time all_idle_time = absl::InfinitePast(); + + for (const auto &iter : resources_last_idle_time_) { + const auto &idle_time_or_busy = iter.second; + + if (idle_time_or_busy == absl::nullopt) { + // One resource is busy, entire resources should be considered non-idle. + return absl::nullopt; + } + + // Update the all resource idle time to be the most recent idle time. + all_idle_time = std::max(all_idle_time, idle_time_or_busy.value()); + } + return all_idle_time; +} + bool LocalResourceManager::AllocateLocalTaskResources( const ResourceRequest &resource_request, std::shared_ptr task_allocation) { @@ -312,6 +363,19 @@ void LocalResourceManager::UpdateAvailableObjectStoreMemResource() { local_resources_.available.Set(ResourceID::ObjectStoreMemory(), std::move(new_available)); OnResourceChanged(); + + // This is more of a discrete approximate of the last idle object store memory usage. + // TODO(rickyx): in order to know exactly when object store becomes idle/busy, we + // would need to plumb the info out of the object store directly. + if (used == 0.0) { + // Set it to idle as of now. + RAY_LOG(INFO) << "Object store memory is idle."; + resources_last_idle_time_[ResourceID::ObjectStoreMemory()] = absl::Now(); + } else { + // Clear the idle info since we know it's being used. + RAY_LOG(INFO) << "Object store memory is not idle."; + resources_last_idle_time_[ResourceID::ObjectStoreMemory()] = absl::nullopt; + } } } @@ -403,6 +467,10 @@ std::optional LocalResourceManager::CreateSyncMessage( resources_data.set_resources_available_changed(true); + const auto now = absl::Now(); + resources_data.set_idle_duration_ms( + absl::ToInt64Milliseconds(now - GetResourceIdleTime().value_or(now))); + msg.set_node_id(local_node_id_.Binary()); msg.set_version(version_); msg.set_message_type(message_type); diff --git a/src/ray/raylet/scheduling/local_resource_manager.h b/src/ray/raylet/scheduling/local_resource_manager.h index 36e94ffe23b36..411445f1c1d84 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -172,6 +172,8 @@ class LocalResourceManager : public syncer::ReporterInterface { /// \param available A list of available capacities for resource's instances. /// \param local_total Local total resource instances. /// \param local_available Local available resource instances being updated. + /// \param[out] is_idle Pointer to record if the resource are idle after the + /// addition. This is nullptr if not recording idle resources. /// /// \return Overflow capacities of "local_available" after adding instance /// capacities in "available", i.e., @@ -179,7 +181,8 @@ class LocalResourceManager : public syncer::ReporterInterface { std::vector AddAvailableResourceInstances( const std::vector &available, const std::vector &local_total, - std::vector &local_available) const; + std::vector &local_available, + bool *is_idle = nullptr) const; /// Decrease the available capacities of the instances of a given resource. /// @@ -246,14 +249,27 @@ class LocalResourceManager : public syncer::ReporterInterface { /// added back to the node's local available resources. /// /// \param task_allocation: Task's resources to be freed. - void FreeTaskResourceInstances(std::shared_ptr task_allocation); + /// \param record_idle_resource: Whether to record the idle resource. This is false + /// when the resource was allocated partially so its idle state is actually not + /// affected. + void FreeTaskResourceInstances(std::shared_ptr task_allocation, + bool record_idle_resource = true); void UpdateAvailableObjectStoreMemResource(); + void SetResourceIdle(const scheduling::ResourceID &resource_id); + + void SetResourceNonIdle(const scheduling::ResourceID &resource_id); + + absl::optional GetResourceIdleTime() const; + /// Identifier of local node. scheduling::NodeID local_node_id_; /// Resources of local node. NodeResourceInstances local_resources_; + /// A map storing when the resource was last idle. + absl::flat_hash_map> + resources_last_idle_time_; /// Cached resources, used to compare with newest one in light heartbeat mode. std::unique_ptr last_report_resources_; /// Function to get used object store memory. @@ -277,6 +293,7 @@ class LocalResourceManager : public syncer::ReporterInterface { friend class LocalResourceManagerTest; FRIEND_TEST(LocalResourceManagerTest, BasicGetResourceUsageMapTest); + FRIEND_TEST(LocalResourceManagerTest, IdleResourceTimeTest); }; } // end namespace ray diff --git a/src/ray/raylet/scheduling/local_resource_manager_test.cc b/src/ray/raylet/scheduling/local_resource_manager_test.cc index 22fedf3329f03..42e5bafd255bc 100644 --- a/src/ray/raylet/scheduling/local_resource_manager_test.cc +++ b/src/ray/raylet/scheduling/local_resource_manager_test.cc @@ -26,10 +26,9 @@ class LocalResourceManagerTest : public ::testing::Test { } NodeResources CreateNodeResources( - absl::flat_hash_map resource_usage_map) { + absl::flat_hash_map resource_usage_map) { NodeResources resources; - for (auto &[resource, total] : resource_usage_map) { - auto resource_id = ResourceID(resource); + for (auto &[resource_id, total] : resource_usage_map) { resources.available.Set(resource_id, total); resources.total.Set(resource_id, total); } @@ -59,13 +58,13 @@ TEST_F(LocalResourceManagerTest, BasicGetResourceUsageMapTest) { auto pg_index_1_resource = "CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"; manager = std::make_unique( local_node_id, - CreateNodeResources({{"CPU", 8.0}, - {"GPU", 2.0}, - {"CUSTOM", 4.0}, - {node_ip_resource, 1.0}, - {pg_wildcard_resource, 4.0}, - {pg_index_0_resource, 2.0}, - {pg_index_1_resource, 2.0}}), + CreateNodeResources({{ResourceID::CPU(), 8.0}, + {ResourceID::GPU(), 2.0}, + {ResourceID("CUSTOM"), 4.0}, + {ResourceID(node_ip_resource), 1.0}, + {ResourceID(pg_wildcard_resource), 4.0}, + {ResourceID(pg_index_0_resource), 2.0}, + {ResourceID(pg_index_1_resource), 2.0}}), nullptr, nullptr, nullptr); @@ -98,11 +97,17 @@ TEST_F(LocalResourceManagerTest, BasicGetResourceUsageMapTest) { /// Test when there's the allocation. /// { - const absl::flat_hash_map task_spec = { - {"CPU", 1.}, {"GPU", 0.5}, {"CUSTOM", 2.0}, {node_ip_resource, 0.01}}; std::shared_ptr task_allocation = std::make_shared(); - ASSERT_TRUE(manager->AllocateLocalTaskResources(task_spec, task_allocation)); + + ResourceRequest resource_request = + ResourceMapToResourceRequest({{ResourceID::CPU(), 1.}, + {ResourceID::GPU(), 0.5}, + {ResourceID("CUSTOM"), 2.0}, + {ResourceID(node_ip_resource), 0.01}}, + false); + + ASSERT_TRUE(manager->AllocateLocalTaskResources(resource_request, task_allocation)); auto resource_usage_map = manager->GetResourceUsageMap(); ResourceUsageMapDebugString(resource_usage_map); @@ -122,4 +127,139 @@ TEST_F(LocalResourceManagerTest, BasicGetResourceUsageMapTest) { } } +TEST_F(LocalResourceManagerTest, IdleResourceTimeTest) { + auto node_ip_resource = "node:127.0.0.1"; + auto pg_wildcard_resource = "CPU_group_4482dec0faaf5ead891ff1659a9501000000"; + auto pg_index_0_resource = "CPU_group_0_4482dec0faaf5ead891ff1659a9501000000"; + auto pg_index_1_resource = "CPU_group_1_4482dec0faaf5ead891ff1659a9501000000"; + auto used_object_store = std::make_unique(0); + manager = std::make_unique( + local_node_id, + CreateNodeResources({{ResourceID::CPU(), 8.0}, + {ResourceID::GPU(), 2.0}, + {ResourceID("CUSTOM"), 4.0}, + {ResourceID::ObjectStoreMemory(), 100.0}, + {ResourceID(node_ip_resource), 1.0}, + {ResourceID(pg_wildcard_resource), 4.0}, + {ResourceID(pg_index_0_resource), 2.0}, + {ResourceID(pg_index_1_resource), 2.0}}), + /* get_used_object_store_memory */ + [&used_object_store]() { return *used_object_store; }, + nullptr, + nullptr); + + /// Test when the resource is all idle when initialized. + { + auto idle_time = manager->GetResourceIdleTime(); + // Sleep for a while. + absl::SleepFor(absl::Seconds(1)); + + ASSERT_NE(idle_time, absl::nullopt); + ASSERT_NE(*idle_time, absl::InfinitePast()); + auto dur = absl::ToInt64Seconds(absl::Now() - *idle_time); + ASSERT_GE(dur, 1); + } + + /// Test that allocate some resources make it non-idle. + { + std::shared_ptr task_allocation = + std::make_shared(); + ResourceRequest resource_request = ResourceMapToResourceRequest( + {{ResourceID::CPU(), 1.}, {ResourceID("CUSTOM"), 1.0}}, false); + + manager->AllocateLocalTaskResources(resource_request, task_allocation); + + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_EQ(idle_time, absl::nullopt); + } + + /// Test that deallocate some resources (not all) should not make it idle. + { + std::shared_ptr task_allocation = + std::make_shared(ResourceRequest( + ResourceMapToResourceRequest({{ResourceID::CPU(), 1.0}}, false))); + manager->FreeTaskResourceInstances(task_allocation, /* record_idle_resource */ true); + + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_EQ(idle_time, absl::nullopt); + } + + // Test that deallocate all used resources make it idle. + { + std::shared_ptr task_allocation = + std::make_shared( + ResourceMapToResourceRequest({{ResourceID("CUSTOM"), 1.}}, false)); + manager->FreeTaskResourceInstances(task_allocation, /* record_idle_resource */ + true); + + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_TRUE(idle_time.has_value()); + auto dur = absl::Now() - *idle_time; + ASSERT_GE(dur, absl::ZeroDuration()); + } + + { + // Sleep for a while should have the right idle time. + absl::SleepFor(absl::Seconds(1)); + { + // Test allocates same resource have the right idle time. + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_TRUE(idle_time.has_value()); + ASSERT_GE(absl::Now() - *idle_time, absl::Seconds(1)); + } + + // Allocate the resource + { + std::shared_ptr task_allocation = + std::make_shared(); + ResourceRequest resource_request = + ResourceMapToResourceRequest({{ResourceID::CPU(), 1.}}, false); + + manager->AllocateLocalTaskResources(resource_request, task_allocation); + } + + // Should not be idle. + { + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_EQ(idle_time, absl::nullopt); + } + + // Deallocate the resource + { + std::shared_ptr task_allocation = + std::make_shared( + ResourceMapToResourceRequest({{ResourceID::CPU(), 1.}}, false)); + manager->FreeTaskResourceInstances(task_allocation, /* record_idle_resource */ + true); + } + + // Check the idle time should be reset (not longer than 1 secs). + { + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_TRUE(idle_time.has_value()); + auto dur = absl::Now() - *idle_time; + ASSERT_GE(dur, absl::ZeroDuration()); + ASSERT_LE(dur, absl::Seconds(1)); + } + } + + // Test object store resource is also making node non-idle when used. + { + *used_object_store = 1; + manager->UpdateAvailableObjectStoreMemResource(); + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_EQ(idle_time, absl::nullopt); + } + + // Free object store memory usage should make node resource idle. + { + *used_object_store = 0; + manager->UpdateAvailableObjectStoreMemResource(); + auto idle_time = manager->GetResourceIdleTime(); + ASSERT_TRUE(idle_time.has_value()); + auto dur = absl::Now() - *idle_time; + ASSERT_GE(dur, absl::ZeroDuration()); + } +} + } // namespace ray