Skip to content

Commit

Permalink
[Core] Support labels for ray.remote (ray-project#48715)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
Signed-off-by: hjiang <dentinyhao@gmail.com>
  • Loading branch information
jjyao authored and dentiny committed Dec 7, 2024
1 parent e2fe6c7 commit 95e5c4d
Show file tree
Hide file tree
Showing 18 changed files with 159 additions and 46 deletions.
1 change: 1 addition & 0 deletions python/ray/_private/ray_option_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
),
"_metadata": Option((dict, type(None))),
"enable_task_events": Option(bool, default_value=True),
"_labels": Option((dict, type(None))),
}


Expand Down
2 changes: 1 addition & 1 deletion python/ray/_private/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3549,7 +3549,7 @@ def method(self):
for more details.
_metadata: Extended options for Ray libraries. For example,
_metadata={"workflows.io/options": <workflow options>} for Ray workflows.
_labels: The key-value labels of a task or actor.
"""
# "callable" returns true for both function and class.
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
Expand Down
36 changes: 31 additions & 5 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -737,11 +737,26 @@ cdef class Language:
JAVA = Language.from_native(LANGUAGE_JAVA)


cdef int prepare_labels(
dict label_dict,
unordered_map[c_string, c_string] *label_map) except -1:

if label_dict is None:
return 0

for key, value in label_dict.items():
if not isinstance(key, str):
raise ValueError(f"Label key must be string, but got {type(key)}")
if not isinstance(value, str):
raise ValueError(f"Label value must be string, but got {type(value)}")
label_map[0][key.encode("utf-8")] = value.encode("utf-8")

return 0

cdef int prepare_resources(
dict resource_dict,
unordered_map[c_string, double] *resource_map) except -1:
cdef:
unordered_map[c_string, double] out
c_string resource_name
list unit_resources

Expand Down Expand Up @@ -4009,10 +4024,12 @@ cdef class CoreWorker:
c_string debugger_breakpoint,
c_string serialized_runtime_env_info,
int64_t generator_backpressure_num_objects,
c_bool enable_task_events
c_bool enable_task_events,
labels,
):
cdef:
unordered_map[c_string, double] c_resources
unordered_map[c_string, c_string] c_labels
CRayFunction ray_function
CTaskOptions task_options
c_vector[unique_ptr[CTaskArg]] args_vector
Expand All @@ -4032,6 +4049,7 @@ cdef class CoreWorker:

with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
prepare_labels(labels, &c_labels)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args_and_increment_put_refs(
Expand All @@ -4043,7 +4061,9 @@ cdef class CoreWorker:
b"",
generator_backpressure_num_objects,
serialized_runtime_env_info,
enable_task_events)
enable_task_events,
c_labels,
)

current_c_task_id = current_task.native()

Expand Down Expand Up @@ -4089,6 +4109,7 @@ cdef class CoreWorker:
int32_t max_pending_calls,
scheduling_strategy,
c_bool enable_task_events,
labels,
):
cdef:
CRayFunction ray_function
Expand All @@ -4101,13 +4122,15 @@ cdef class CoreWorker:
CSchedulingStrategy c_scheduling_strategy
c_vector[CObjectID] incremented_put_arg_ids
optional[c_bool] is_detached_optional = nullopt
unordered_map[c_string, c_string] c_labels

self.python_scheduling_strategy_to_c(
scheduling_strategy, &c_scheduling_strategy)

with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
prepare_resources(placement_resources, &c_placement_resources)
prepare_labels(labels, &c_labels)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args_and_increment_put_refs(
Expand Down Expand Up @@ -4136,7 +4159,8 @@ cdef class CoreWorker:
# async or threaded actors.
is_asyncio or max_concurrency > 1,
max_pending_calls,
enable_task_events),
enable_task_events,
c_labels),
extension_data,
&c_actor_id)

Expand Down Expand Up @@ -4247,6 +4271,7 @@ cdef class CoreWorker:
TaskID current_task = self.get_current_task_id()
c_string serialized_retry_exception_allowlist
c_string serialized_runtime_env = b"{}"
unordered_map[c_string, c_string] c_labels

serialized_retry_exception_allowlist = serialize_retry_exception_allowlist(
retry_exception_allowlist,
Expand Down Expand Up @@ -4275,7 +4300,8 @@ cdef class CoreWorker:
concurrency_group_name,
generator_backpressure_num_objects,
serialized_runtime_env,
enable_task_events),
enable_task_events,
c_labels),
max_retries,
retry_exceptions,
serialized_retry_exception_allowlist,
Expand Down
2 changes: 2 additions & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
scheduling_strategy: Strategy about how to schedule this actor.
enable_task_events: True if tracing is enabled, i.e., task events from
the actor should be reported. Defaults to True.
_labels: The key-value labels of the actor.
Returns:
A handle to the newly created actor.
Expand Down Expand Up @@ -1197,6 +1198,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
max_pending_calls=max_pending_calls,
scheduling_strategy=scheduling_strategy,
enable_task_events=enable_task_events,
labels=actor_options.get("_labels"),
)

if _actor_launch_hook:
Expand Down
6 changes: 4 additions & 2 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
unordered_map[c_string, double] &resources,
c_string concurrency_group_name,
int64_t generator_backpressure_num_objects,
c_string serialized_runtime_env, c_bool enable_task_events)
c_string serialized_runtime_env, c_bool enable_task_events,
const unordered_map[c_string, c_string] &labels)

cdef cppclass CActorCreationOptions "ray::core::ActorCreationOptions":
CActorCreationOptions()
Expand All @@ -347,7 +348,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
const c_vector[CConcurrencyGroup] &concurrency_groups,
c_bool execute_out_of_order,
int32_t max_pending_calls,
c_bool enable_task_events)
c_bool enable_task_events,
const unordered_map[c_string, c_string] &labels)

cdef cppclass CPlacementGroupCreationOptions \
"ray::core::PlacementGroupCreationOptions":
Expand Down
3 changes: 3 additions & 0 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ def options(self, **task_options):
_metadata: Extended options for Ray libraries. For example,
_metadata={"workflows.io/options": <workflow options>} for
Ray workflows.
_labels: The key-value labels of a task.
Examples:
Expand Down Expand Up @@ -450,6 +451,7 @@ def _remote(

# Override enable_task_events to default for actor if not specified (i.e. None)
enable_task_events = task_options.get("enable_task_events")
labels = task_options.get("_labels")

def invocation(args, kwargs):
if self._is_cross_language:
Expand Down Expand Up @@ -480,6 +482,7 @@ def invocation(args, kwargs):
serialized_runtime_env_info or "{}",
generator_backpressure_num_objects,
enable_task_events,
labels,
)
# Reset worker's debug context from the last "remote" command
# (which applies only to this .remote call).
Expand Down
58 changes: 46 additions & 12 deletions python/ray/tests/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ def test_internal_get_local_ongoing_lineage_reconstruction_tasks(
ray_start_cluster_enabled,
):
cluster = ray_start_cluster_enabled
cluster.add_node(resources={"head": 1})
cluster.add_node(resources={"head": 2})
ray.init(address=cluster.address)
worker1 = cluster.add_node(resources={"worker": 1})
worker1 = cluster.add_node(resources={"worker": 2})

@ray.remote(resources={"head": 1})
@ray.remote(num_cpus=0, resources={"head": 1})
class Counter:
def __init__(self):
self.count = 0
Expand All @@ -138,18 +138,41 @@ def inc(self):
self.count = self.count + 1
return self.count

@ray.remote(max_retries=-1, num_cpus=0, resources={"worker": 1})
@ray.remote(
max_retries=-1, num_cpus=0, resources={"worker": 1}, _labels={"key1": "value1"}
)
def task(counter):
count = ray.get(counter.inc.remote())
if count > 1:
# lineage reconstruction
time.sleep(100000)
return [1] * 1024 * 1024

counter = Counter.remote()
obj = task.remote(counter)
@ray.remote(
max_restarts=-1,
max_task_retries=-1,
num_cpus=0,
resources={"worker": 1},
_labels={"key2": "value2"},
)
class Actor:
def run(self, counter):
count = ray.get(counter.inc.remote())
if count > 1:
# lineage reconstruction
time.sleep(100000)
return [1] * 1024 * 1024

counter1 = Counter.remote()
obj1 = task.remote(counter1)
# Wait for task to finish
ray.wait([obj], fetch_local=False)
ray.wait([obj1], fetch_local=False)

counter2 = Counter.remote()
actor = Actor.remote()
obj2 = actor.run.remote(counter2)
# Wait for actor task to finish
ray.wait([obj2], fetch_local=False)

assert len(get_local_ongoing_lineage_reconstruction_tasks()) == 0

Expand All @@ -158,16 +181,27 @@ def task(counter):

def verify(expected_task_status):
lineage_reconstruction_tasks = get_local_ongoing_lineage_reconstruction_tasks()
return (
len(lineage_reconstruction_tasks) == 1
and lineage_reconstruction_tasks[0][0].name == "task"
and lineage_reconstruction_tasks[0][0].resources == {"worker": 1.0}
lineage_reconstruction_tasks.sort(key=lambda task: task[0].name)
assert len(lineage_reconstruction_tasks) == 2
assert [
lineage_reconstruction_tasks[0][0].name,
lineage_reconstruction_tasks[1][0].name,
] == ["Actor.run", "task"]
assert (
lineage_reconstruction_tasks[0][0].labels == {"key2": "value2"}
and lineage_reconstruction_tasks[0][0].status == expected_task_status
and lineage_reconstruction_tasks[0][1] == 1
)
assert (
lineage_reconstruction_tasks[1][0].labels == {"key1": "value1"}
and lineage_reconstruction_tasks[1][0].status == expected_task_status
and lineage_reconstruction_tasks[1][1] == 1
)

return True

wait_for_condition(lambda: verify(common_pb2.TaskStatus.PENDING_NODE_ASSIGNMENT))
cluster.add_node(resources={"worker": 1})
cluster.add_node(resources={"worker": 2})
wait_for_condition(lambda: verify(common_pb2.TaskStatus.SUBMITTED_TO_WORKER))


Expand Down
4 changes: 3 additions & 1 deletion src/ray/common/task/task_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ class TaskSpecBuilder {
const TaskID &submitter_task_id,
const std::shared_ptr<rpc::RuntimeEnvInfo> runtime_env_info = nullptr,
const std::string &concurrency_group_name = "",
bool enable_task_events = true) {
bool enable_task_events = true,
const std::unordered_map<std::string, std::string> &labels = {}) {
message_->set_type(TaskType::NORMAL_TASK);
message_->set_name(name);
message_->set_language(language);
Expand Down Expand Up @@ -165,6 +166,7 @@ class TaskSpecBuilder {
}
message_->set_concurrency_group_name(concurrency_group_name);
message_->set_enable_task_events(enable_task_events);
message_->mutable_labels()->insert(labels.begin(), labels.end());
return *this;
}

Expand Down
11 changes: 8 additions & 3 deletions src/ray/core_worker/actor_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ rpc::ActorHandle CreateInnerActorHandle(
const std::string &ray_namespace,
int32_t max_pending_calls,
bool execute_out_of_order,
absl::optional<bool> enable_task_events) {
absl::optional<bool> enable_task_events,
const std::unordered_map<std::string, std::string> &labels) {
rpc::ActorHandle inner;
inner.set_actor_id(actor_id.Data(), actor_id.Size());
inner.set_owner_id(owner_id.Binary());
Expand All @@ -50,6 +51,7 @@ rpc::ActorHandle CreateInnerActorHandle(
inner.set_execute_out_of_order(execute_out_of_order);
inner.set_max_pending_calls(max_pending_calls);
inner.set_enable_task_events(enable_task_events.value_or(kDefaultTaskEventEnabled));
inner.mutable_labels()->insert(labels.begin(), labels.end());
return inner;
}

Expand Down Expand Up @@ -82,6 +84,7 @@ rpc::ActorHandle CreateInnerActorHandleFromActorData(
inner.set_execute_out_of_order(
task_spec.actor_creation_task_spec().execute_out_of_order());
inner.set_max_pending_calls(task_spec.actor_creation_task_spec().max_pending_calls());
inner.mutable_labels()->insert(task_spec.labels().begin(), task_spec.labels().end());
return inner;
}
} // namespace
Expand All @@ -100,7 +103,8 @@ ActorHandle::ActorHandle(
const std::string &ray_namespace,
int32_t max_pending_calls,
bool execute_out_of_order,
absl::optional<bool> enable_task_events)
absl::optional<bool> enable_task_events,
const std::unordered_map<std::string, std::string> &labels)
: ActorHandle(CreateInnerActorHandle(actor_id,
owner_id,
owner_address,
Expand All @@ -114,7 +118,8 @@ ActorHandle::ActorHandle(
ray_namespace,
max_pending_calls,
execute_out_of_order,
enable_task_events)) {}
enable_task_events,
labels)) {}

ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}
Expand Down
7 changes: 6 additions & 1 deletion src/ray/core_worker/actor_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class ActorHandle {
const std::string &ray_namespace,
int32_t max_pending_calls,
bool execute_out_of_order = false,
absl::optional<bool> enable_task_events = absl::nullopt);
absl::optional<bool> enable_task_events = absl::nullopt,
const std::unordered_map<std::string, std::string> &labels = {});

/// Constructs an ActorHandle from a serialized string.
explicit ActorHandle(const std::string &serialized);
Expand Down Expand Up @@ -105,6 +106,10 @@ class ActorHandle {

bool ExecuteOutOfOrder() const { return inner_.execute_out_of_order(); }

const ::google::protobuf::Map<std::string, std::string> &GetLabels() const {
return inner_.labels();
}

private:
// Protobuf-defined persistent state of the actor handle.
const rpc::ActorHandle inner_;
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "ray/core_worker/actor_creator.h"
#include "ray/core_worker/actor_handle.h"
#include "ray/core_worker/reference_count.h"
#include "ray/core_worker/transport/actor_task_submitter.h"
#include "ray/core_worker/transport/task_receiver.h"
#include "ray/gcs/gcs_client/gcs_client.h"
namespace ray {
Expand Down
Loading

0 comments on commit 95e5c4d

Please sign in to comment.