Skip to content

Commit

Permalink
[core][autoscaler] Fix idle resource status accounting from raylet #3…
Browse files Browse the repository at this point in the history
…8268 (#38285)


---------

Signed-off-by: rickyyx <rickyx@anyscale.com>
  • Loading branch information
rickyyx authored Aug 10, 2023
1 parent c8cde88 commit 8a434b4
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 9 deletions.
16 changes: 8 additions & 8 deletions python/ray/autoscaler/v2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ py_test(
name = "test_storage",
size = "small",
srcs = ["tests/test_storage.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_config",
size = "small",
srcs = ["tests/test_config.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = [
"//:ray_lib",
]
Expand All @@ -42,39 +42,39 @@ py_test(
name = "test_node_provider",
size = "small",
srcs = ["tests/test_node_provider.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_ray_installer",
size = "small",
srcs = ["tests/test_ray_installer.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_instance_launcher",
size = "small",
srcs = ["tests/test_instance_launcher.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_reconciler",
size = "small",
srcs = ["tests/test_reconciler.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = ["//:ray_lib",],
)

py_test(
name = "test_threaded_ray_installer",
size = "small",
srcs = ["tests/test_threaded_ray_installer.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = ["//:ray_lib",],
)

Expand All @@ -90,7 +90,7 @@ py_test(
name = "test_utils",
size = "small",
srcs = ["tests/test_utils.py"],
tags = ["team:core"],
tags = ["team:core", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

Expand Down
40 changes: 40 additions & 0 deletions python/ray/autoscaler/v2/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from ray._private.test_utils import run_string_as_driver_nonblocking, wait_for_condition
from ray.autoscaler.v2.sdk import get_cluster_status
from ray.cluster_utils import AutoscalingCluster
from ray.util.placement_group import placement_group, remove_placement_group
from ray.util.state.api import list_placement_groups, list_tasks


Expand Down Expand Up @@ -156,6 +157,45 @@ def pg_created():
cluster.shutdown()


def test_placement_group_removal_idle_node():
    """Nodes that hosted a placement group are reported idle after its removal.

    Starts an autoscaling cluster (a 2-CPU head plus up to two 2-CPU workers),
    schedules a STRICT_SPREAD placement group with three 2-CPU bundles, removes
    it, and then verifies that the cluster status reports three healthy nodes,
    all with status "IDLE" and an idle time of at least one second.
    """
    cluster = AutoscalingCluster(
        head_resources={"CPU": 2},
        worker_node_types={
            "type-1": {
                "resources": {"CPU": 2},
                "node_config": {},
                "min_workers": 0,
                "max_workers": 2,
            },
        },
    )
    try:
        cluster.start()
        ray.init("auto")

        # Three 2-CPU bundles with STRICT_SPREAD: one bundle per node.
        pg = placement_group([{"CPU": 2}] * 3, strategy="STRICT_SPREAD")
        ray.get(pg.ready())

        # Keep the placement group alive for a moment before removing it
        # (delay kept from the original test; presumably it lets the busy
        # state get reported before the resources are released).
        time.sleep(2)
        remove_placement_group(pg)

        def all_nodes_idle():
            # Assertion failures are retried by wait_for_condition until the
            # timeout, so a lagging status report does not fail the test.
            cluster_state = get_cluster_status()
            assert len(cluster_state.healthy_nodes) == 3
            for node in cluster_state.healthy_nodes:
                assert node.node_status == "IDLE"
                assert node.resource_usage.idle_time_ms >= 1000
            return True

        # Poll instead of sleeping a fixed second and checking only once.
        wait_for_condition(all_nodes_idle, timeout=20)
    finally:
        ray.shutdown()
        cluster.shutdown()


if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
Expand Down
11 changes: 11 additions & 0 deletions src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
available_gpu_instances.end(),
expected_available_gpu_instances.begin()));

ASSERT_FALSE(
resource_scheduler.GetLocalResourceManager().GetResourceIdleTime().has_value());

resource_scheduler.GetLocalResourceManager().AddResourceInstances(
ResourceID::GPU(), allocate_gpu_instances);
available_gpu_instances = resource_scheduler.GetLocalResourceManager()
Expand All @@ -1047,10 +1050,16 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
available_gpu_instances.end(),
expected_available_gpu_instances.begin()));

ASSERT_TRUE(
resource_scheduler.GetLocalResourceManager().GetResourceIdleTime().has_value());

allocate_gpu_instances = {1.5, 1.5, .5, 1.5};
std::vector<double> underflow =
resource_scheduler.GetLocalResourceManager().SubtractResourceInstances(
ResourceID::GPU(), allocate_gpu_instances);
ASSERT_FALSE(
resource_scheduler.GetLocalResourceManager().GetResourceIdleTime().has_value());

std::vector<double> expected_underflow{.5, .5, 0., .5};
ASSERT_TRUE(
std::equal(underflow.begin(), underflow.end(), expected_underflow.begin()));
Expand All @@ -1067,6 +1076,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
std::vector<double> overflow =
resource_scheduler.GetLocalResourceManager().AddResourceInstances(
ResourceID::GPU(), allocate_gpu_instances);
ASSERT_FALSE(
resource_scheduler.GetLocalResourceManager().GetResourceIdleTime().has_value());
std::vector<double> expected_overflow{.0, .0, .5, 0.};
ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin()));
available_gpu_instances = resource_scheduler.GetLocalResourceManager()
Expand Down
11 changes: 10 additions & 1 deletion src/ray/raylet/scheduling/local_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void LocalResourceManager::AddLocalResourceInstances(
/// Removes every local record of `resource_id`: its available and total
/// entries in `local_resources_` and its entry in the idle-time tracking map,
/// then signals the change via OnResourceChanged().
///
/// \param resource_id The resource to delete from this node's local view.
void LocalResourceManager::DeleteLocalResource(scheduling::ResourceID resource_id) {
local_resources_.available.Remove(resource_id);
local_resources_.total.Remove(resource_id);
// Also drop the idle-time entry so no idle state lingers for a deleted resource.
resources_last_idle_time_.erase(resource_id);
// Notify after all three containers are updated.
// NOTE(review): what OnResourceChanged() triggers is not visible here.
OnResourceChanged();
}

Expand Down Expand Up @@ -275,13 +276,21 @@ std::vector<double> LocalResourceManager::SubtractResourceInstances(
resource_instances_fp,
local_resources_.available.GetMutable(resource_id),
allow_going_negative);

// If any instance has a positive amount to subtract, the resource should be marked as
// non-idle.
for (const auto &to_subtract_instance : resource_instances_fp) {
if (to_subtract_instance > 0) {
SetResourceNonIdle(resource_id);
break;
}
}
OnResourceChanged();

return FixedPointVectorToDouble(underflow);
}

/// Marks `resource_id` as not idle by resetting its last-idle-time entry.
///
/// \param resource_id The resource whose idle timestamp is cleared.
void LocalResourceManager::SetResourceNonIdle(const scheduling::ResourceID &resource_id) {
// An entry of absl::nullopt means "no idle time recorded" for this resource.
// NOTE(review): presumably readers of this map treat nullopt as "in use" - confirm.
resources_last_idle_time_[resource_id] = absl::nullopt;
}

Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/scheduling/local_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class LocalResourceManager : public syncer::ReporterInterface {
FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest);
FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstanceWithoutCpuUnitTest);
FRIEND_TEST(ClusterResourceSchedulerTest, CustomResourceInstanceTest);
FRIEND_TEST(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest);

friend class LocalResourceManagerTest;
FRIEND_TEST(LocalResourceManagerTest, BasicGetResourceUsageMapTest);
Expand Down

0 comments on commit 8a434b4

Please sign in to comment.