From 6be6cb0751cd1f9e7e40b091da761ff1c69e8de5 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 23 Jun 2023 11:32:04 -0700 Subject: [PATCH] [Core] Introduce fail_on_unavailable option for hard NodeAffinitySchedulingStrategy (#36718) Add an experimental fail_on_unavailable option to try out application level scheduling Signed-off-by: Jiajun Yao Signed-off-by: e428265 --- python/ray/_raylet.pyx | 2 ++ python/ray/includes/common.pxd | 1 + python/ray/tests/test_scheduling_2.py | 23 +++++++++++++++++++ python/ray/util/scheduling_strategies.py | 9 +++++++- src/ray/common/task/task_spec.h | 6 ++++- src/ray/protobuf/common.proto | 1 + .../scheduling/cluster_resource_scheduler.cc | 4 +++- .../policy/node_affinity_scheduling_policy.cc | 3 ++- .../scheduling/policy/scheduling_options.h | 10 ++++++-- .../policy/scheduling_policy_test.cc | 7 ++++++ 10 files changed, 60 insertions(+), 6 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 21c6ad9c63b34..5f5ec5e328b02 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3059,6 +3059,8 @@ cdef class CoreWorker: python_scheduling_strategy.soft) c_node_affinity_scheduling_strategy[0].set_spill_on_unavailable( python_scheduling_strategy._spill_on_unavailable) + c_node_affinity_scheduling_strategy[0].set_fail_on_unavailable( + python_scheduling_strategy._fail_on_unavailable) else: raise ValueError( f"Invalid scheduling_strategy value " diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index be97d53ec7f0c..d04a488b892ae 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -178,6 +178,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil: void set_node_id(const c_string& node_id) void set_soft(c_bool soft) void set_spill_on_unavailable(c_bool spill_on_unavailable) + void set_fail_on_unavailable(c_bool fail_on_unavailable) cdef cppclass CSchedulingStrategy "ray::rpc::SchedulingStrategy": CSchedulingStrategy() void clear_scheduling_strategy() diff --git a/python/ray/tests/test_scheduling_2.py b/python/ray/tests/test_scheduling_2.py index 5b2d43874c275..983ca6435da72 100644 --- a/python/ray/tests/test_scheduling_2.py +++ b/python/ray/tests/test_scheduling_2.py @@ -472,6 +472,29 @@ def get_node_id_task(sleep_s=0): assert target_node_id != soft_node_id +def test_node_affinity_scheduling_strategy_fail_on_unavailable(ray_start_cluster): + cluster = ray_start_cluster + cluster.add_node(num_cpus=1) + ray.init(address=cluster.address) + + @ray.remote(num_cpus=1) + class Actor: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + a1 = Actor.remote() + target_node_id = ray.get(a1.get_node_id.remote()) + + a2 = Actor.options( + scheduling_strategy=NodeAffinitySchedulingStrategy( + target_node_id, soft=False, _fail_on_unavailable=True + ) + ).remote() + + with pytest.raises(ray.exceptions.ActorUnschedulableError): + ray.get(a2.get_node_id.remote()) + + @pytest.mark.parametrize("connect_to_client", [True, False]) def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client): cluster = ray_start_cluster diff --git a/python/ray/util/scheduling_strategies.py b/python/ray/util/scheduling_strategies.py index c966348858c27..a8858cef1958f 100644 --- a/python/ray/util/scheduling_strategies.py +++ b/python/ray/util/scheduling_strategies.py @@ -55,7 +55,13 @@ class NodeAffinitySchedulingStrategy: or be scheduled somewhere else if soft is True. """ - def __init__(self, node_id: str, soft: bool, _spill_on_unavailable: bool = False): + def __init__( + self, + node_id: str, + soft: bool, + _spill_on_unavailable: bool = False, + _fail_on_unavailable: bool = False, + ): # This will be removed once we standardize on node id being hex string. if not isinstance(node_id, str): node_id = node_id.hex() @@ -63,6 +69,7 @@ def __init__(self, node_id: str, soft: bool, _spill_on_unavailable: bool = False self.node_id = node_id self.soft = soft self._spill_on_unavailable = _spill_on_unavailable + self._fail_on_unavailable = _fail_on_unavailable SchedulingStrategyT = Union[ diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index 3d363cb89283a..b7e512d55bc40 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -47,7 +47,9 @@ inline bool operator==(const ray::rpc::SchedulingStrategy &lhs, (lhs.node_affinity_scheduling_strategy().soft() == rhs.node_affinity_scheduling_strategy().soft()) && (lhs.node_affinity_scheduling_strategy().spill_on_unavailable() == - rhs.node_affinity_scheduling_strategy().spill_on_unavailable()); + rhs.node_affinity_scheduling_strategy().spill_on_unavailable()) && + (lhs.node_affinity_scheduling_strategy().fail_on_unavailable() == + rhs.node_affinity_scheduling_strategy().fail_on_unavailable()); } case ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy: { return (lhs.placement_group_scheduling_strategy().placement_group_id() == @@ -118,6 +120,8 @@ struct hash { scheduling_strategy.node_affinity_scheduling_strategy().soft()); hash ^= static_cast( scheduling_strategy.node_affinity_scheduling_strategy().spill_on_unavailable()); + hash ^= static_cast( + scheduling_strategy.node_affinity_scheduling_strategy().fail_on_unavailable()); } else if (scheduling_strategy.scheduling_strategy_case() == ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy) { hash ^= std::hash()( diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 70c733c845399..499e5137b8847 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -53,6 +53,7 @@ message NodeAffinitySchedulingStrategy { bytes node_id = 1; bool soft = 2; bool spill_on_unavailable = 3; + bool fail_on_unavailable = 4; } message PlacementGroupSchedulingStrategy { diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 13c78ab425cab..834e367b86da9 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -160,7 +160,9 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode( scheduling_strategy.node_affinity_scheduling_strategy().node_id(), scheduling_strategy.node_affinity_scheduling_strategy().soft(), scheduling_strategy.node_affinity_scheduling_strategy() - .spill_on_unavailable())); + .spill_on_unavailable(), + scheduling_strategy.node_affinity_scheduling_strategy() + .fail_on_unavailable())); } else if (IsAffinityWithBundleSchedule(scheduling_strategy) && !is_local_node_with_raylet_) { // This scheduling strategy is only used for gcs scheduling for the time being. diff --git a/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc b/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc index 1f948ef070107..737aa33a80f8f 100644 --- a/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc +++ b/src/ray/raylet/scheduling/policy/node_affinity_scheduling_policy.cc @@ -24,7 +24,8 @@ scheduling::NodeID NodeAffinitySchedulingPolicy::Schedule( scheduling::NodeID target_node_id = scheduling::NodeID(options.node_affinity_node_id); if (nodes_.contains(target_node_id) && is_node_alive_(target_node_id) && nodes_.at(target_node_id).GetLocalView().IsFeasible(resource_request)) { - if (!options.node_affinity_spill_on_unavailable) { + if (!options.node_affinity_spill_on_unavailable && + !options.node_affinity_fail_on_unavailable) { return target_node_id; } else if (nodes_.at(target_node_id).GetLocalView().IsAvailable(resource_request)) { return target_node_id; diff --git a/src/ray/raylet/scheduling/policy/scheduling_options.h b/src/ray/raylet/scheduling/policy/scheduling_options.h index ddfc377694caa..d4d6e7fa31677 100644 --- a/src/ray/raylet/scheduling/policy/scheduling_options.h +++ b/src/ray/raylet/scheduling/policy/scheduling_options.h @@ -76,9 +76,13 @@ struct SchedulingOptions { bool require_node_available, std::string node_id, bool soft, - bool spill_on_unavailable = false) { + bool spill_on_unavailable = false, + bool fail_on_unavailable = false) { if (spill_on_unavailable) { - RAY_CHECK(soft); + RAY_CHECK(soft) << "spill_on_unavailable only works with soft == true"; + } + if (fail_on_unavailable) { + RAY_CHECK(!soft) << "fail_on_unavailable only works with soft == false"; } SchedulingOptions scheduling_options = Hybrid(avoid_local_node, require_node_available); @@ -86,6 +90,7 @@ struct SchedulingOptions { scheduling_options.node_affinity_node_id = node_id; scheduling_options.node_affinity_soft = soft; scheduling_options.node_affinity_spill_on_unavailable = spill_on_unavailable; + scheduling_options.node_affinity_fail_on_unavailable = fail_on_unavailable; return scheduling_options; } @@ -165,6 +170,7 @@ struct SchedulingOptions { std::string node_affinity_node_id; bool node_affinity_soft = false; bool node_affinity_spill_on_unavailable = false; + bool node_affinity_fail_on_unavailable = false; // The node where the task is preferred to be placed. By default, this node id // is empty, which means no preferred node. std::string preferred_node_id; diff --git a/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc b/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc index 56a56a3317ae4..8065d02254d5b 100644 --- a/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc +++ b/src/ray/raylet/scheduling/policy/scheduling_policy_test.cc @@ -103,6 +103,13 @@ TEST_F(SchedulingPolicyTest, NodeAffinityPolicyTest) { // Prefer the specified node even if it's not available right now. ASSERT_EQ(to_schedule, scheduling::NodeID("unavailable")); + to_schedule = scheduling_policy.Schedule( + req, + SchedulingOptions::NodeAffinity( + false, false, "unavailable", false, false, /*fail_on_unavailable=*/true)); + // The task is unschedulable since soft is false and fail_on_unavailable is true. + ASSERT_TRUE(to_schedule.IsNil()); + to_schedule = scheduling_policy.Schedule( req, SchedulingOptions::NodeAffinity(false, false, "unavailable", true, true)); // The task is scheduled somewhere else since soft is true and spill_on_unavailable is