Skip to content

Commit

Permalink
[Core] Introduce fail_on_unavailable option for hard NodeAffinitySchedulingStrategy (ray-project#36718)
Browse files: browse the repository at this point in the history

Add an experimental `fail_on_unavailable` option to try out application-level scheduling.

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
  • Loading branch information
jjyao authored and arvind-chandra committed Aug 31, 2023
1 parent 6dedfa8 commit 6be6cb0
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 6 deletions.
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3059,6 +3059,8 @@ cdef class CoreWorker:
python_scheduling_strategy.soft)
c_node_affinity_scheduling_strategy[0].set_spill_on_unavailable(
python_scheduling_strategy._spill_on_unavailable)
c_node_affinity_scheduling_strategy[0].set_fail_on_unavailable(
python_scheduling_strategy._fail_on_unavailable)
else:
raise ValueError(
f"Invalid scheduling_strategy value "
Expand Down
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
void set_node_id(const c_string& node_id)
void set_soft(c_bool soft)
void set_spill_on_unavailable(c_bool spill_on_unavailable)
void set_fail_on_unavailable(c_bool fail_on_unavailable)
cdef cppclass CSchedulingStrategy "ray::rpc::SchedulingStrategy":
CSchedulingStrategy()
void clear_scheduling_strategy()
Expand Down
23 changes: 23 additions & 0 deletions python/ray/tests/test_scheduling_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,29 @@ def get_node_id_task(sleep_s=0):
assert target_node_id != soft_node_id


def test_node_affinity_scheduling_strategy_fail_on_unavailable(ray_start_cluster):
    """Check hard node affinity combined with ``_fail_on_unavailable=True``.

    A second 1-CPU actor is pinned (``soft=False``) to a 1-CPU node that
    already hosts a 1-CPU actor; calling it is expected to raise
    ``ActorUnschedulableError``.
    """
    cluster = ray_start_cluster
    # One node with exactly one CPU: a single 1-CPU actor uses all of it.
    cluster.add_node(num_cpus=1)
    ray.init(address=cluster.address)

    @ray.remote(num_cpus=1)
    class Actor:
        def get_node_id(self):
            return ray.get_runtime_context().get_node_id()

    # The first actor claims the only CPU and tells us which node it landed on.
    occupying_actor = Actor.remote()
    target_node_id = ray.get(occupying_actor.get_node_id.remote())

    # The second actor is pinned to that same node with the new option enabled.
    hard_affinity = NodeAffinitySchedulingStrategy(
        target_node_id, soft=False, _fail_on_unavailable=True
    )
    blocked_actor = Actor.options(scheduling_strategy=hard_affinity).remote()

    with pytest.raises(ray.exceptions.ActorUnschedulableError):
        ray.get(blocked_actor.get_node_id.remote())


@pytest.mark.parametrize("connect_to_client", [True, False])
def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
cluster = ray_start_cluster
Expand Down
9 changes: 8 additions & 1 deletion python/ray/util/scheduling_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,21 @@ class NodeAffinitySchedulingStrategy:
or be scheduled somewhere else if soft is True.
"""

def __init__(self, node_id: str, soft: bool, _spill_on_unavailable: bool = False):
def __init__(
self,
node_id: str,
soft: bool,
_spill_on_unavailable: bool = False,
_fail_on_unavailable: bool = False,
):
# This will be removed once we standardize on node id being hex string.
if not isinstance(node_id, str):
node_id = node_id.hex()

self.node_id = node_id
self.soft = soft
self._spill_on_unavailable = _spill_on_unavailable
self._fail_on_unavailable = _fail_on_unavailable


SchedulingStrategyT = Union[
Expand Down
6 changes: 5 additions & 1 deletion src/ray/common/task/task_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ inline bool operator==(const ray::rpc::SchedulingStrategy &lhs,
(lhs.node_affinity_scheduling_strategy().soft() ==
rhs.node_affinity_scheduling_strategy().soft()) &&
(lhs.node_affinity_scheduling_strategy().spill_on_unavailable() ==
rhs.node_affinity_scheduling_strategy().spill_on_unavailable());
rhs.node_affinity_scheduling_strategy().spill_on_unavailable()) &&
(lhs.node_affinity_scheduling_strategy().fail_on_unavailable() ==
rhs.node_affinity_scheduling_strategy().fail_on_unavailable());
}
case ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy: {
return (lhs.placement_group_scheduling_strategy().placement_group_id() ==
Expand Down Expand Up @@ -118,6 +120,8 @@ struct hash<ray::rpc::SchedulingStrategy> {
scheduling_strategy.node_affinity_scheduling_strategy().soft());
hash ^= static_cast<size_t>(
scheduling_strategy.node_affinity_scheduling_strategy().spill_on_unavailable());
hash ^= static_cast<size_t>(
scheduling_strategy.node_affinity_scheduling_strategy().fail_on_unavailable());
} else if (scheduling_strategy.scheduling_strategy_case() ==
ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy) {
hash ^= std::hash<std::string>()(
Expand Down
1 change: 1 addition & 0 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ message NodeAffinitySchedulingStrategy {
bytes node_id = 1;
bool soft = 2;
bool spill_on_unavailable = 3;
bool fail_on_unavailable = 4;
}

message PlacementGroupSchedulingStrategy {
Expand Down
4 changes: 3 additions & 1 deletion src/ray/raylet/scheduling/cluster_resource_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
scheduling_strategy.node_affinity_scheduling_strategy().node_id(),
scheduling_strategy.node_affinity_scheduling_strategy().soft(),
scheduling_strategy.node_affinity_scheduling_strategy()
.spill_on_unavailable()));
.spill_on_unavailable(),
scheduling_strategy.node_affinity_scheduling_strategy()
.fail_on_unavailable()));
} else if (IsAffinityWithBundleSchedule(scheduling_strategy) &&
!is_local_node_with_raylet_) {
// This scheduling strategy is only used for gcs scheduling for the time being.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ scheduling::NodeID NodeAffinitySchedulingPolicy::Schedule(
scheduling::NodeID target_node_id = scheduling::NodeID(options.node_affinity_node_id);
if (nodes_.contains(target_node_id) && is_node_alive_(target_node_id) &&
nodes_.at(target_node_id).GetLocalView().IsFeasible(resource_request)) {
if (!options.node_affinity_spill_on_unavailable) {
if (!options.node_affinity_spill_on_unavailable &&
!options.node_affinity_fail_on_unavailable) {
return target_node_id;
} else if (nodes_.at(target_node_id).GetLocalView().IsAvailable(resource_request)) {
return target_node_id;
Expand Down
10 changes: 8 additions & 2 deletions src/ray/raylet/scheduling/policy/scheduling_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,21 @@ struct SchedulingOptions {
bool require_node_available,
std::string node_id,
bool soft,
bool spill_on_unavailable = false) {
bool spill_on_unavailable = false,
bool fail_on_unavailable = false) {
if (spill_on_unavailable) {
RAY_CHECK(soft);
RAY_CHECK(soft) << "spill_on_unavailable only works with soft == true";
}
if (fail_on_unavailable) {
RAY_CHECK(!soft) << "fail_on_unavailable only works with soft == false";
}
SchedulingOptions scheduling_options =
Hybrid(avoid_local_node, require_node_available);
scheduling_options.scheduling_type = SchedulingType::NODE_AFFINITY;
scheduling_options.node_affinity_node_id = node_id;
scheduling_options.node_affinity_soft = soft;
scheduling_options.node_affinity_spill_on_unavailable = spill_on_unavailable;
scheduling_options.node_affinity_fail_on_unavailable = fail_on_unavailable;
return scheduling_options;
}

Expand Down Expand Up @@ -165,6 +170,7 @@ struct SchedulingOptions {
std::string node_affinity_node_id;
bool node_affinity_soft = false;
bool node_affinity_spill_on_unavailable = false;
bool node_affinity_fail_on_unavailable = false;
// The node where the task is preferred to be placed. By default, this node id
// is empty, which means no preferred node.
std::string preferred_node_id;
Expand Down
7 changes: 7 additions & 0 deletions src/ray/raylet/scheduling/policy/scheduling_policy_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ TEST_F(SchedulingPolicyTest, NodeAffinityPolicyTest) {
// Prefer the specified node even if it's not available right now.
ASSERT_EQ(to_schedule, scheduling::NodeID("unavailable"));

to_schedule = scheduling_policy.Schedule(
req,
SchedulingOptions::NodeAffinity(
false, false, "unavailable", false, false, /*fail_on_unavailable=*/true));
// The task is unschedulable since soft is false and fail_on_unavailable is true.
ASSERT_TRUE(to_schedule.IsNil());

to_schedule = scheduling_policy.Schedule(
req, SchedulingOptions::NodeAffinity(false, false, "unavailable", true, true));
// The task is scheduled somewhere else since soft is true and spill_on_unavailable is
Expand Down

0 comments on commit 6be6cb0

Please sign in to comment.