
[Core] Introduce fail_on_unavailable option for hard NodeAffinitySchedulingStrategy #36718

Merged: 2 commits, Jun 23, 2023
2 changes: 2 additions & 0 deletions python/ray/_raylet.pyx
@@ -3121,6 +3121,8 @@ cdef class CoreWorker:
python_scheduling_strategy.soft)
c_node_affinity_scheduling_strategy[0].set_spill_on_unavailable(
python_scheduling_strategy._spill_on_unavailable)
c_node_affinity_scheduling_strategy[0].set_fail_on_unavailable(
python_scheduling_strategy._fail_on_unavailable)
else:
raise ValueError(
f"Invalid scheduling_strategy value "
1 change: 1 addition & 0 deletions python/ray/includes/common.pxd
@@ -178,6 +178,7 @@ cdef extern from "src/ray/protobuf/common.pb.h" nogil:
void set_node_id(const c_string& node_id)
void set_soft(c_bool soft)
void set_spill_on_unavailable(c_bool spill_on_unavailable)
void set_fail_on_unavailable(c_bool fail_on_unavailable)
cdef cppclass CSchedulingStrategy "ray::rpc::SchedulingStrategy":
CSchedulingStrategy()
void clear_scheduling_strategy()
23 changes: 23 additions & 0 deletions python/ray/tests/test_scheduling_2.py
@@ -472,6 +472,29 @@ def get_node_id_task(sleep_s=0):
assert target_node_id != soft_node_id


def test_node_affinity_scheduling_strategy_fail_on_unavailable(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)

@ray.remote(num_cpus=1)
class Actor:
def get_node_id(self):
return ray.get_runtime_context().get_node_id()

a1 = Actor.remote()
target_node_id = ray.get(a1.get_node_id.remote())

a2 = Actor.options(
Contributor:

Are all of these combinations actually tested? IIUC, the behavior is:

spill: True, fail: True -> makes no sense (maybe raise an exception?)
spill: True, fail: False -> spill to another node if one is available
spill: False, fail: True -> fail if the target node is not available
spill: False, fail: False -> not scheduled until the target node is available

Can you make sure all of these scenarios are tested?

Collaborator (Author):

Currently, invalid combinations trigger a check failure, since these options are private for now and not used directly by users. Once we make them public, we need to raise proper exceptions. All the valid combinations are tested (a usage sketch of them follows this diff).

scheduling_strategy=NodeAffinitySchedulingStrategy(
target_node_id, soft=False, _fail_on_unavailable=True
)
).remote()

with pytest.raises(ray.exceptions.ActorUnschedulableError):
Contributor:

Should we have a better error message (and a test) in this case? It would be great if the exception carried a message saying the task couldn't be scheduled because _fail_on_unavailable is set to true.

Collaborator (Author):

Yeah, I think we should if we make it public. For now it's fine without a detailed error message since the option is private; I'll only use it in Serve, where I don't need to inspect the error message.

ray.get(a2.get_node_id.remote())


@pytest.mark.parametrize("connect_to_client", [True, False])
def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
cluster = ray_start_cluster
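As referenced in the author's reply above, here is a minimal usage sketch of the valid option combinations, assuming a running Ray cluster. The helper task and actor (`get_node_id`, `Pinned`) are illustrative, not part of this PR, and the behavior comments follow the discussion in this thread.

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()

@ray.remote(num_cpus=1)
def get_node_id():
    return ray.get_runtime_context().get_node_id()

target_node_id = ray.get(get_node_id.remote())

# soft=False (hard affinity): the task stays pending until the target node
# can run it.
hard = NodeAffinitySchedulingStrategy(target_node_id, soft=False)

# soft=False + _fail_on_unavailable=True (new in this PR): fail immediately
# if the target node cannot run the task or actor right now.
fail_fast = NodeAffinitySchedulingStrategy(
    target_node_id, soft=False, _fail_on_unavailable=True
)

# soft=True: prefer the target node; fall back elsewhere only if it becomes
# infeasible or dies.
soft = NodeAffinitySchedulingStrategy(target_node_id, soft=True)

# soft=True + _spill_on_unavailable=True: prefer the target node, but spill
# to another node as soon as the target has no available resources.
spill = NodeAffinitySchedulingStrategy(
    target_node_id, soft=True, _spill_on_unavailable=True
)

# With fail_fast, an actor that cannot be placed on the target node right now
# surfaces as ActorUnschedulableError instead of hanging, as exercised in the
# test above.
@ray.remote(num_cpus=1)
class Pinned:
    def ping(self):
        return ray.get_runtime_context().get_node_id()

actor = Pinned.options(scheduling_strategy=fail_fast).remote()
try:
    ray.get(actor.ping.remote())
except ray.exceptions.ActorUnschedulableError as e:
    print("actor could not be scheduled:", e)
```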
9 changes: 8 additions & 1 deletion python/ray/util/scheduling_strategies.py
@@ -55,14 +55,21 @@ class NodeAffinitySchedulingStrategy:
or be scheduled somewhere else if soft is True.
"""

def __init__(self, node_id: str, soft: bool, _spill_on_unavailable: bool = False):
def __init__(
self,
node_id: str,
soft: bool,
_spill_on_unavailable: bool = False,
_fail_on_unavailable: bool = False,
):
# This will be removed once we standardize on node id being hex string.
if not isinstance(node_id, str):
node_id = node_id.hex()

self.node_id = node_id
self.soft = soft
self._spill_on_unavailable = _spill_on_unavailable
self._fail_on_unavailable = _fail_on_unavailable


SchedulingStrategyT = Union[
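A usage note on the constructor above: `node_id` can be passed either as a hex string or as an ID object, in which case it is converted via `.hex()`. A minimal sketch, assuming a running Ray cluster; `FakeNodeID` is a hypothetical stand-in used only to illustrate the conversion.

```python
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init()
node_id_hex = ray.get_runtime_context().get_node_id()

# A hex string is stored as-is.
from_str = NodeAffinitySchedulingStrategy(node_id_hex, soft=False)

# Any non-string object exposing .hex() (e.g. a Ray node ID object) is
# converted to its hex form by the constructor.
class FakeNodeID:
    def __init__(self, hex_str):
        self._hex = hex_str

    def hex(self):
        return self._hex

from_obj = NodeAffinitySchedulingStrategy(FakeNodeID(node_id_hex), soft=False)
assert from_str.node_id == from_obj.node_id == node_id_hex
```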
6 changes: 5 additions & 1 deletion src/ray/common/task/task_spec.h
@@ -47,7 +47,9 @@ inline bool operator==(const ray::rpc::SchedulingStrategy &lhs,
(lhs.node_affinity_scheduling_strategy().soft() ==
rhs.node_affinity_scheduling_strategy().soft()) &&
(lhs.node_affinity_scheduling_strategy().spill_on_unavailable() ==
rhs.node_affinity_scheduling_strategy().spill_on_unavailable());
rhs.node_affinity_scheduling_strategy().spill_on_unavailable()) &&
(lhs.node_affinity_scheduling_strategy().fail_on_unavailable() ==
rhs.node_affinity_scheduling_strategy().fail_on_unavailable());
}
case ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy: {
return (lhs.placement_group_scheduling_strategy().placement_group_id() ==
@@ -118,6 +120,8 @@ struct hash<ray::rpc::SchedulingStrategy> {
scheduling_strategy.node_affinity_scheduling_strategy().soft());
hash ^= static_cast<size_t>(
scheduling_strategy.node_affinity_scheduling_strategy().spill_on_unavailable());
hash ^= static_cast<size_t>(
scheduling_strategy.node_affinity_scheduling_strategy().fail_on_unavailable());
} else if (scheduling_strategy.scheduling_strategy_case() ==
ray::rpc::SchedulingStrategy::kPlacementGroupSchedulingStrategy) {
hash ^= std::hash<std::string>()(
1 change: 1 addition & 0 deletions src/ray/protobuf/common.proto
@@ -53,6 +53,7 @@ message NodeAffinitySchedulingStrategy {
bytes node_id = 1;
bool soft = 2;
bool spill_on_unavailable = 3;
bool fail_on_unavailable = 4;
}

message PlacementGroupSchedulingStrategy {
4 changes: 3 additions & 1 deletion src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -160,7 +160,9 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
scheduling_strategy.node_affinity_scheduling_strategy().node_id(),
scheduling_strategy.node_affinity_scheduling_strategy().soft(),
scheduling_strategy.node_affinity_scheduling_strategy()
.spill_on_unavailable()));
.spill_on_unavailable(),
scheduling_strategy.node_affinity_scheduling_strategy()
.fail_on_unavailable()));
} else if (IsAffinityWithBundleSchedule(scheduling_strategy) &&
!is_local_node_with_raylet_) {
// This scheduling strategy is only used for gcs scheduling for the time being.
@@ -24,7 +24,8 @@ scheduling::NodeID NodeAffinitySchedulingPolicy::Schedule(
scheduling::NodeID target_node_id = scheduling::NodeID(options.node_affinity_node_id);
if (nodes_.contains(target_node_id) && is_node_alive_(target_node_id) &&
nodes_.at(target_node_id).GetLocalView().IsFeasible(resource_request)) {
if (!options.node_affinity_spill_on_unavailable) {
if (!options.node_affinity_spill_on_unavailable &&
!options.node_affinity_fail_on_unavailable) {
return target_node_id;
} else if (nodes_.at(target_node_id).GetLocalView().IsAvailable(resource_request)) {
return target_node_id;
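For readability, a rough Python sketch of the decision flow above, based on the visible hunk and the behavior exercised in scheduling_policy_test.cc; the helpers (node_alive, is_feasible, is_available, hybrid_schedule) are hypothetical stand-ins for the raylet internals, just enough to make the sketch self-contained.

```python
# Hypothetical stand-ins for the raylet internals.
def node_alive(node): return node["alive"]
def is_feasible(node, req): return all(node["total"].get(r, 0) >= a for r, a in req.items())
def is_available(node, req): return all(node["available"].get(r, 0) >= a for r, a in req.items())
def hybrid_schedule(nodes, req):
    return next((nid for nid, n in nodes.items() if node_alive(n) and is_available(n, req)), None)

NIL_NODE = None  # stand-in for scheduling::NodeID::Nil()

def node_affinity_schedule(nodes, target, req, soft,
                           spill_on_unavailable=False, fail_on_unavailable=False):
    if target in nodes and node_alive(nodes[target]) and is_feasible(nodes[target], req):
        if not spill_on_unavailable and not fail_on_unavailable:
            # Plain hard/soft affinity: stick with the target node and queue
            # there even if it is busy right now.
            return target
        if is_available(nodes[target], req):
            # The spill/fail variants only take the target if it can run the
            # request immediately.
            return target
    if not soft:
        # Hard affinity: return Nil, which either fails the task right away
        # (fail_on_unavailable) or leaves it pending.
        return NIL_NODE
    # Soft affinity (including spill_on_unavailable): fall back to picking
    # some other node.
    return hybrid_schedule(nodes, req)
```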
10 changes: 8 additions & 2 deletions src/ray/raylet/scheduling/policy/scheduling_options.h
@@ -76,16 +76,21 @@ struct SchedulingOptions {
bool require_node_available,
std::string node_id,
bool soft,
bool spill_on_unavailable = false) {
bool spill_on_unavailable = false,
bool fail_on_unavailable = false) {
if (spill_on_unavailable) {
RAY_CHECK(soft);
RAY_CHECK(soft) << "spill_on_unavailable only works with soft == true";
}
if (fail_on_unavailable) {
RAY_CHECK(!soft) << "fail_on_unavailable only works with soft == false";
}
SchedulingOptions scheduling_options =
Hybrid(avoid_local_node, require_node_available);
scheduling_options.scheduling_type = SchedulingType::NODE_AFFINITY;
scheduling_options.node_affinity_node_id = node_id;
scheduling_options.node_affinity_soft = soft;
scheduling_options.node_affinity_spill_on_unavailable = spill_on_unavailable;
scheduling_options.node_affinity_fail_on_unavailable = fail_on_unavailable;
return scheduling_options;
}

@@ -165,6 +170,7 @@ struct SchedulingOptions {
std::string node_affinity_node_id;
bool node_affinity_soft = false;
bool node_affinity_spill_on_unavailable = false;
bool node_affinity_fail_on_unavailable = false;
// The node where the task is preferred to be placed. By default, this node id
// is empty, which means no preferred node.
std::string preferred_node_id;
7 changes: 7 additions & 0 deletions src/ray/raylet/scheduling/policy/scheduling_policy_test.cc
@@ -103,6 +103,13 @@ TEST_F(SchedulingPolicyTest, NodeAffinityPolicyTest) {
// Prefer the specified node even if it's not available right now.
ASSERT_EQ(to_schedule, scheduling::NodeID("unavailable"));

to_schedule = scheduling_policy.Schedule(
req,
SchedulingOptions::NodeAffinity(
false, false, "unavailable", false, false, /*fail_on_unavailable=*/true));
// The task is unschedulable since soft is false and fail_on_unavailable is true.
ASSERT_TRUE(to_schedule.IsNil());

to_schedule = scheduling_policy.Schedule(
req, SchedulingOptions::NodeAffinity(false, false, "unavailable", true, true));
// The task is scheduled somewhere else since soft is true and spill_on_unavailable is