From 95e5c4d7a44279923e344d9507610742a3047999 Mon Sep 17 00:00:00 2001
From: Jiajun Yao
Date: Tue, 19 Nov 2024 07:14:55 -0800
Subject: [PATCH] [Core] Support labels for ray.remote (#48715)

Signed-off-by: Jiajun Yao
Signed-off-by: hjiang
---
 python/ray/_private/ray_option_utils.py |  1 +
 python/ray/_private/worker.py           |  2 +-
 python/ray/_raylet.pyx                  | 36 ++++++++++++---
 python/ray/actor.py                     |  2 +
 python/ray/includes/common.pxd          |  6 ++-
 python/ray/remote_function.py           |  3 ++
 python/ray/tests/test_advanced.py       | 58 ++++++++++++++++++++-----
 src/ray/common/task/task_util.h         |  4 +-
 src/ray/core_worker/actor_handle.cc     | 11 +++--
 src/ray/core_worker/actor_handle.h      |  7 ++-
 src/ray/core_worker/actor_manager.h     |  1 +
 src/ray/core_worker/common.h            | 22 ++++++----
 src/ray/core_worker/core_worker.cc      | 17 +++++---
 src/ray/core_worker/core_worker.h       |  3 +-
 src/ray/core_worker/task_manager.cc     | 15 +++++--
 src/ray/core_worker/task_manager.h      |  4 +-
 src/ray/protobuf/common.proto           | 10 ++++-
 src/ray/protobuf/core_worker.proto      |  3 ++
 18 files changed, 159 insertions(+), 46 deletions(-)

diff --git a/python/ray/_private/ray_option_utils.py b/python/ray/_private/ray_option_utils.py
index 91345e5364467..61c898aff8c4c 100644
--- a/python/ray/_private/ray_option_utils.py
+++ b/python/ray/_private/ray_option_utils.py
@@ -147,6 +147,7 @@ def _validate_resources(resources: Optional[Dict[str, float]]) -> Optional[str]:
     ),
     "_metadata": Option((dict, type(None))),
     "enable_task_events": Option(bool, default_value=True),
+    "_labels": Option((dict, type(None))),
 }
diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py
index 118c556ec9668..e62375c5e5239 100644
--- a/python/ray/_private/worker.py
+++ b/python/ray/_private/worker.py
@@ -3549,7 +3549,7 @@ def method(self):
             for more details.
         _metadata: Extended options for Ray libraries. For example,
             _metadata={"workflows.io/options": } for Ray workflows.
-
+        _labels: The key-value labels of a task or actor.
     """
     # "callable" returns true for both function and class.
     if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
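For context, a minimal usage sketch of the `_labels` option registered above. The keyword mirrors the option added to ray_option_utils.py and exercised in the test further down; the function, actor, and label values here are illustrative only, assuming a locally initialized Ray instance:

    import ray

    ray.init()

    # Attach string key-value labels to a task via the new `_labels` option.
    @ray.remote(_labels={"team": "data", "stage": "preprocess"})
    def preprocess():
        return "done"

    # The same option is accepted for actors.
    @ray.remote(_labels={"team": "data"})
    class Worker:
        def ping(self):
            return "pong"

    ray.get(preprocess.remote())
    worker = Worker.remote()
    ray.get(worker.ping.remote())
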
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index 3ddf101189dc1..944adba8fc330 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -737,11 +737,26 @@ cdef class Language:
     JAVA = Language.from_native(LANGUAGE_JAVA)


+cdef int prepare_labels(
+        dict label_dict,
+        unordered_map[c_string, c_string] *label_map) except -1:
+
+    if label_dict is None:
+        return 0
+
+    for key, value in label_dict.items():
+        if not isinstance(key, str):
+            raise ValueError(f"Label key must be string, but got {type(key)}")
+        if not isinstance(value, str):
+            raise ValueError(f"Label value must be string, but got {type(value)}")
+        label_map[0][key.encode("utf-8")] = value.encode("utf-8")
+
+    return 0
+
 cdef int prepare_resources(
         dict resource_dict,
         unordered_map[c_string, double] *resource_map) except -1:
     cdef:
-        unordered_map[c_string, double] out
         c_string resource_name
         list unit_resources
@@ -4009,10 +4024,12 @@ cdef class CoreWorker:
                     c_string debugger_breakpoint,
                     c_string serialized_runtime_env_info,
                     int64_t generator_backpressure_num_objects,
-                    c_bool enable_task_events
+                    c_bool enable_task_events,
+                    labels,
                     ):
         cdef:
             unordered_map[c_string, double] c_resources
+            unordered_map[c_string, c_string] c_labels
             CRayFunction ray_function
             CTaskOptions task_options
             c_vector[unique_ptr[CTaskArg]] args_vector
@@ -4032,6 +4049,7 @@ cdef class CoreWorker:

         with self.profile_event(b"submit_task"):
             prepare_resources(resources, &c_resources)
+            prepare_labels(labels, &c_labels)
             ray_function = CRayFunction(
                 language.lang, function_descriptor.descriptor)
             prepare_args_and_increment_put_refs(
@@ -4043,7 +4061,9 @@ cdef class CoreWorker:
                 b"",
                 generator_backpressure_num_objects,
                 serialized_runtime_env_info,
-                enable_task_events)
+                enable_task_events,
+                c_labels,
+            )

             current_c_task_id = current_task.native()
@@ -4089,6 +4109,7 @@ cdef class CoreWorker:
                           int32_t max_pending_calls,
                           scheduling_strategy,
                           c_bool enable_task_events,
+                          labels,
                           ):
         cdef:
             CRayFunction ray_function
@@ -4101,6 +4122,7 @@ cdef class CoreWorker:
             CSchedulingStrategy c_scheduling_strategy
             c_vector[CObjectID] incremented_put_arg_ids
             optional[c_bool] is_detached_optional = nullopt
+            unordered_map[c_string, c_string] c_labels

         self.python_scheduling_strategy_to_c(
             scheduling_strategy, &c_scheduling_strategy)
@@ -4108,6 +4130,7 @@ cdef class CoreWorker:
         with self.profile_event(b"submit_task"):
             prepare_resources(resources, &c_resources)
             prepare_resources(placement_resources, &c_placement_resources)
+            prepare_labels(labels, &c_labels)
             ray_function = CRayFunction(
                 language.lang, function_descriptor.descriptor)
             prepare_args_and_increment_put_refs(
@@ -4136,7 +4159,8 @@ cdef class CoreWorker:
                         # async or threaded actors.
                         is_asyncio or max_concurrency > 1,
                         max_pending_calls,
-                        enable_task_events),
+                        enable_task_events,
+                        c_labels),
                     extension_data,
                     &c_actor_id)
@@ -4247,6 +4271,7 @@ cdef class CoreWorker:
             TaskID current_task = self.get_current_task_id()
             c_string serialized_retry_exception_allowlist
             c_string serialized_runtime_env = b"{}"
+            unordered_map[c_string, c_string] c_labels

         serialized_retry_exception_allowlist = serialize_retry_exception_allowlist(
             retry_exception_allowlist,
@@ -4275,7 +4300,8 @@ cdef class CoreWorker:
                     concurrency_group_name,
                     generator_backpressure_num_objects,
                     serialized_runtime_env,
-                    enable_task_events),
+                    enable_task_events,
+                    c_labels),
                 max_retries,
                 retry_exceptions,
                 serialized_retry_exception_allowlist,
diff --git a/python/ray/actor.py b/python/ray/actor.py
index 222f52c24b5f1..824de9efad734 100644
--- a/python/ray/actor.py
+++ b/python/ray/actor.py
@@ -924,6 +924,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
             scheduling_strategy: Strategy about how to schedule this actor.
             enable_task_events: True if tracing is enabled, i.e., task events from
                 the actor should be reported. Defaults to True.
+            _labels: The key-value labels of the actor.

         Returns:
             A handle to the newly created actor.
@@ -1197,6 +1198,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
             max_pending_calls=max_pending_calls,
             scheduling_strategy=scheduling_strategy,
             enable_task_events=enable_task_events,
+            labels=actor_options.get("_labels"),
         )

         if _actor_launch_hook:
diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd
index 7d4b6ece9e7ab..f5c6d4655ac99 100644
--- a/python/ray/includes/common.pxd
+++ b/python/ray/includes/common.pxd
@@ -329,7 +329,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
             unordered_map[c_string, double] &resources,
             c_string concurrency_group_name,
             int64_t generator_backpressure_num_objects,
-            c_string serialized_runtime_env, c_bool enable_task_events)
+            c_string serialized_runtime_env, c_bool enable_task_events,
+            const unordered_map[c_string, c_string] &labels)

     cdef cppclass CActorCreationOptions "ray::core::ActorCreationOptions":
         CActorCreationOptions()
@@ -347,7 +348,8 @@ cdef extern from "ray/core_worker/common.h" nogil:
             const c_vector[CConcurrencyGroup] &concurrency_groups,
             c_bool execute_out_of_order,
             int32_t max_pending_calls,
-            c_bool enable_task_events)
+            c_bool enable_task_events,
+            const unordered_map[c_string, c_string] &labels)

     cdef cppclass CPlacementGroupCreationOptions \
             "ray::core::PlacementGroupCreationOptions":
diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py
index 7c910c2ba068a..b44eae3d84ce6 100644
--- a/python/ray/remote_function.py
+++ b/python/ray/remote_function.py
@@ -239,6 +239,7 @@ def options(self, **task_options):
             _metadata: Extended options for Ray libraries. For example,
                 _metadata={"workflows.io/options": } for Ray workflows.
+            _labels: The key-value labels of a task.

         Examples:

@@ -450,6 +451,7 @@ def _remote(
         # Override enable_task_events to default for actor if not specified (i.e. None)
         enable_task_events = task_options.get("enable_task_events")
+        labels = task_options.get("_labels")

         def invocation(args, kwargs):
             if self._is_cross_language:
@@ -480,6 +482,7 @@ def invocation(args, kwargs):
                 serialized_runtime_env_info or "{}",
                 generator_backpressure_num_objects,
                 enable_task_events,
+                labels,
             )
             # Reset worker's debug context from the last "remote" command
             # (which applies only to this .remote call).
diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py
index a3cc669c8c90b..8642099b042be 100644
--- a/python/ray/tests/test_advanced.py
+++ b/python/ray/tests/test_advanced.py
@@ -125,11 +125,11 @@ def test_internal_get_local_ongoing_lineage_reconstruction_tasks(
     ray_start_cluster_enabled,
 ):
     cluster = ray_start_cluster_enabled
-    cluster.add_node(resources={"head": 1})
+    cluster.add_node(resources={"head": 2})
     ray.init(address=cluster.address)
-    worker1 = cluster.add_node(resources={"worker": 1})
+    worker1 = cluster.add_node(resources={"worker": 2})

-    @ray.remote(resources={"head": 1})
+    @ray.remote(num_cpus=0, resources={"head": 1})
     class Counter:
         def __init__(self):
             self.count = 0
@@ -138,7 +138,9 @@ def inc(self):
             self.count = self.count + 1
             return self.count

-    @ray.remote(max_retries=-1, num_cpus=0, resources={"worker": 1})
+    @ray.remote(
+        max_retries=-1, num_cpus=0, resources={"worker": 1}, _labels={"key1": "value1"}
+    )
     def task(counter):
         count = ray.get(counter.inc.remote())
         if count > 1:
@@ -146,10 +148,31 @@ def task(counter):
             time.sleep(100000)
         return [1] * 1024 * 1024

-    counter = Counter.remote()
-    obj = task.remote(counter)
+    @ray.remote(
+        max_restarts=-1,
+        max_task_retries=-1,
+        num_cpus=0,
+        resources={"worker": 1},
+        _labels={"key2": "value2"},
+    )
+    class Actor:
+        def run(self, counter):
+            count = ray.get(counter.inc.remote())
+            if count > 1:
+                # lineage reconstruction
+                time.sleep(100000)
+            return [1] * 1024 * 1024
+
+    counter1 = Counter.remote()
+    obj1 = task.remote(counter1)
     # Wait for task to finish
-    ray.wait([obj], fetch_local=False)
+    ray.wait([obj1], fetch_local=False)
+
+    counter2 = Counter.remote()
+    actor = Actor.remote()
+    obj2 = actor.run.remote(counter2)
+    # Wait for actor task to finish
+    ray.wait([obj2], fetch_local=False)

     assert len(get_local_ongoing_lineage_reconstruction_tasks()) == 0
@@ -158,16 +181,27 @@ def task(counter):

     def verify(expected_task_status):
         lineage_reconstruction_tasks = get_local_ongoing_lineage_reconstruction_tasks()
-        return (
-            len(lineage_reconstruction_tasks) == 1
-            and lineage_reconstruction_tasks[0][0].name == "task"
-            and lineage_reconstruction_tasks[0][0].resources == {"worker": 1.0}
+        lineage_reconstruction_tasks.sort(key=lambda task: task[0].name)
+        assert len(lineage_reconstruction_tasks) == 2
+        assert [
+            lineage_reconstruction_tasks[0][0].name,
+            lineage_reconstruction_tasks[1][0].name,
+        ] == ["Actor.run", "task"]
+        assert (
+            lineage_reconstruction_tasks[0][0].labels == {"key2": "value2"}
             and lineage_reconstruction_tasks[0][0].status == expected_task_status
             and lineage_reconstruction_tasks[0][1] == 1
         )
+        assert (
+            lineage_reconstruction_tasks[1][0].labels == {"key1": "value1"}
+            and lineage_reconstruction_tasks[1][0].status == expected_task_status
+            and lineage_reconstruction_tasks[1][1] == 1
+        )
+
+        return True

     wait_for_condition(lambda: verify(common_pb2.TaskStatus.PENDING_NODE_ASSIGNMENT))

-    cluster.add_node(resources={"worker": 1})
+    cluster.add_node(resources={"worker": 2})

     wait_for_condition(lambda: verify(common_pb2.TaskStatus.SUBMITTED_TO_WORKER))
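Before the C++ plumbing below, one behavioral note implied by prepare_labels in _raylet.pyx above: label keys and values must both be strings, and a non-string label is rejected when the task is submitted. A small illustrative sketch, assuming the dict-type check in ray_option_utils.py is the only validation performed at decoration time (the exact point at which the error surfaces may differ):

    import ray

    ray.init()

    @ray.remote(_labels={"priority": 1})  # value is an int, not a str
    def bad_task():
        return None

    try:
        # Label validation runs when the task is submitted to the core worker.
        bad_task.remote()
    except ValueError as e:
        # Expected message along the lines of:
        # "Label value must be string, but got <class 'int'>"
        print(e)
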
diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h
index 488c52069aa49..4ecfab358c7ba 100644
--- a/src/ray/common/task/task_util.h
+++ b/src/ray/common/task/task_util.h
@@ -135,7 +135,8 @@ class TaskSpecBuilder {
       const TaskID &submitter_task_id,
       const std::shared_ptr<rpc::RuntimeEnvInfo> runtime_env_info = nullptr,
       const std::string &concurrency_group_name = "",
-      bool enable_task_events = true) {
+      bool enable_task_events = true,
+      const std::unordered_map<std::string, std::string> &labels = {}) {
     message_->set_type(TaskType::NORMAL_TASK);
     message_->set_name(name);
     message_->set_language(language);
@@ -165,6 +166,7 @@ class TaskSpecBuilder {
     }
     message_->set_concurrency_group_name(concurrency_group_name);
     message_->set_enable_task_events(enable_task_events);
+    message_->mutable_labels()->insert(labels.begin(), labels.end());
     return *this;
   }
diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc
index 4228cd392351e..39257bbb7fcc6 100644
--- a/src/ray/core_worker/actor_handle.cc
+++ b/src/ray/core_worker/actor_handle.cc
@@ -33,7 +33,8 @@ rpc::ActorHandle CreateInnerActorHandle(
     const std::string &ray_namespace,
     int32_t max_pending_calls,
     bool execute_out_of_order,
-    absl::optional<bool> enable_task_events) {
+    absl::optional<bool> enable_task_events,
+    const std::unordered_map<std::string, std::string> &labels) {
   rpc::ActorHandle inner;
   inner.set_actor_id(actor_id.Data(), actor_id.Size());
   inner.set_owner_id(owner_id.Binary());
@@ -50,6 +51,7 @@ rpc::ActorHandle CreateInnerActorHandle(
   inner.set_execute_out_of_order(execute_out_of_order);
   inner.set_max_pending_calls(max_pending_calls);
   inner.set_enable_task_events(enable_task_events.value_or(kDefaultTaskEventEnabled));
+  inner.mutable_labels()->insert(labels.begin(), labels.end());
   return inner;
 }
@@ -82,6 +84,7 @@ rpc::ActorHandle CreateInnerActorHandleFromActorData(
   inner.set_execute_out_of_order(
       task_spec.actor_creation_task_spec().execute_out_of_order());
   inner.set_max_pending_calls(task_spec.actor_creation_task_spec().max_pending_calls());
+  inner.mutable_labels()->insert(task_spec.labels().begin(), task_spec.labels().end());
   return inner;
 }
 }  // namespace
@@ -100,7 +103,8 @@ ActorHandle::ActorHandle(
     const std::string &ray_namespace,
     int32_t max_pending_calls,
     bool execute_out_of_order,
-    absl::optional<bool> enable_task_events)
+    absl::optional<bool> enable_task_events,
+    const std::unordered_map<std::string, std::string> &labels)
     : ActorHandle(CreateInnerActorHandle(actor_id,
                                          owner_id,
                                          owner_address,
@@ -114,7 +118,8 @@ ActorHandle::ActorHandle(
                                          ray_namespace,
                                          max_pending_calls,
                                          execute_out_of_order,
-                                         enable_task_events)) {}
+                                         enable_task_events,
+                                         labels)) {}

 ActorHandle::ActorHandle(const std::string &serialized)
     : ActorHandle(CreateInnerActorHandleFromString(serialized)) {}
diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h
index 22f00c066dffc..98306cb6d6b61 100644
--- a/src/ray/core_worker/actor_handle.h
+++ b/src/ray/core_worker/actor_handle.h
@@ -45,7 +45,8 @@ class ActorHandle {
               const std::string &ray_namespace,
               int32_t max_pending_calls,
               bool execute_out_of_order = false,
-              absl::optional<bool> enable_task_events = absl::nullopt);
+              absl::optional<bool> enable_task_events = absl::nullopt,
+              const std::unordered_map<std::string, std::string> &labels = {});

   /// Constructs an ActorHandle from a serialized string.
   explicit ActorHandle(const std::string &serialized);
@@ -105,6 +106,10 @@ class ActorHandle {

   bool ExecuteOutOfOrder() const { return inner_.execute_out_of_order(); }

+  const ::google::protobuf::Map<std::string, std::string> &GetLabels() const {
+    return inner_.labels();
+  }
+
  private:
   // Protobuf-defined persistent state of the actor handle.
   const rpc::ActorHandle inner_;
diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h
index dcfe8e11a68d6..a42cdcc13d6bc 100644
--- a/src/ray/core_worker/actor_manager.h
+++ b/src/ray/core_worker/actor_manager.h
@@ -20,6 +20,7 @@
 #include "ray/core_worker/actor_creator.h"
 #include "ray/core_worker/actor_handle.h"
 #include "ray/core_worker/reference_count.h"
+#include "ray/core_worker/transport/actor_task_submitter.h"
 #include "ray/core_worker/transport/task_receiver.h"
 #include "ray/gcs/gcs_client/gcs_client.h"

 namespace ray {
diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h
index 3a160cd302d86..bc4c18c22bb10 100644
--- a/src/ray/core_worker/common.h
+++ b/src/ray/core_worker/common.h
@@ -67,14 +67,16 @@ struct TaskOptions {
               const std::string &concurrency_group_name = "",
               int64_t generator_backpressure_num_objects = -1,
               const std::string &serialized_runtime_env_info = "{}",
-              bool enable_task_events = kDefaultTaskEventEnabled)
+              bool enable_task_events = kDefaultTaskEventEnabled,
+              const std::unordered_map<std::string, std::string> &labels = {})
       : name(name),
         num_returns(num_returns),
         resources(resources),
         concurrency_group_name(concurrency_group_name),
         serialized_runtime_env_info(serialized_runtime_env_info),
         generator_backpressure_num_objects(generator_backpressure_num_objects),
-        enable_task_events(enable_task_events) {}
+        enable_task_events(enable_task_events),
+        labels(labels) {}

   /// The name of this task.
   std::string name;
@@ -95,6 +97,7 @@ struct TaskOptions {
   /// True if task events (worker::TaskEvent) from this task should be reported, default
   /// to true.
   bool enable_task_events = kDefaultTaskEventEnabled;
+  std::unordered_map<std::string, std::string> labels;
 };

 /// Options for actor creation tasks.
@@ -115,7 +118,8 @@ struct ActorCreationOptions {
                        const std::vector<ConcurrencyGroup> &concurrency_groups = {},
                        bool execute_out_of_order = false,
                        int32_t max_pending_calls = -1,
-                       bool enable_task_events = kDefaultTaskEventEnabled)
+                       bool enable_task_events = kDefaultTaskEventEnabled,
+                       const std::unordered_map<std::string, std::string> &labels = {})
       : max_restarts(max_restarts),
         max_task_retries(max_task_retries),
         max_concurrency(max_concurrency),
@@ -132,7 +136,8 @@ struct ActorCreationOptions {
         execute_out_of_order(execute_out_of_order),
         max_pending_calls(max_pending_calls),
         scheduling_strategy(scheduling_strategy),
-        enable_task_events(enable_task_events) {
+        enable_task_events(enable_task_events),
+        labels(labels) {
     // Check that resources is a subset of placement resources.
     for (auto &resource : resources) {
       auto it = this->placement_resources.find(resource.first);
@@ -187,6 +192,7 @@ struct ActorCreationOptions {
   /// True if task events (worker::TaskEvent) from this creation task should be reported
   /// default to true.
   const bool enable_task_events = kDefaultTaskEventEnabled;
+  const std::unordered_map<std::string, std::string> labels;
 };

 using PlacementStrategy = rpc::PlacementStrategy;
@@ -285,11 +291,11 @@ template <>
 struct hash<ray::rpc::LineageReconstructionTask> {
   size_t operator()(const ray::rpc::LineageReconstructionTask &task) const {
     size_t hash = std::hash<std::string>()(task.name());
-    for (const auto &resource : task.resources()) {
-      hash ^= std::hash<std::string>()(resource.first);
-      hash ^= std::hash<double>()(resource.second);
-    }
     hash ^= std::hash<size_t>()(task.status());
+    for (const auto &label : task.labels()) {
+      hash ^= std::hash<std::string>()(label.first);
+      hash ^= std::hash<std::string>()(label.second);
+    }
     return hash;
   }
 };
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index 4fa0432dbd086..09c0544eb2ca6 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -2164,7 +2164,8 @@ void CoreWorker::BuildCommonTaskSpec(
     const std::string &concurrency_group_name,
     bool include_job_config,
     int64_t generator_backpressure_num_objects,
-    bool enable_task_events) {
+    bool enable_task_events,
+    const std::unordered_map<std::string, std::string> &labels) {
   // Build common task spec.
   auto override_runtime_env_info =
       OverrideTaskOrActorRuntimeEnvInfo(serialized_runtime_env_info);
@@ -2210,7 +2211,8 @@ void CoreWorker::BuildCommonTaskSpec(
       main_thread_current_task_id,
       override_runtime_env_info,
       concurrency_group_name,
-      enable_task_events);
+      enable_task_events,
+      labels);
   // Set task arguments.
   for (const auto &arg : args) {
     builder.AddArg(*arg);
   }
@@ -2269,7 +2271,8 @@ std::vector<rpc::ObjectReference> CoreWorker::SubmitTask(
                       /*include_job_config*/ true,
                       /*generator_backpressure_num_objects*/
                       task_options.generator_backpressure_num_objects,
-                      /*enable_task_event*/ task_options.enable_task_events);
+                      /*enable_task_event*/ task_options.enable_task_events,
+                      task_options.labels);
   ActorID root_detached_actor_id;
   if (!worker_context_.GetRootDetachedActorID().IsNil()) {
     root_detached_actor_id = worker_context_.GetRootDetachedActorID();
@@ -2360,7 +2363,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
                       /*concurrency_group_name*/ "",
                       /*include_job_config*/ true,
                       /*generator_backpressure_num_objects*/ -1,
-                      /*enable_task_events*/ actor_creation_options.enable_task_events);
+                      /*enable_task_events*/ actor_creation_options.enable_task_events,
+                      actor_creation_options.labels);

   // If the namespace is not specified, get it from the job.
   const auto ray_namespace = (actor_creation_options.ray_namespace.empty()
@@ -2380,7 +2384,8 @@ Status CoreWorker::CreateActor(const RayFunction &function,
                                          ray_namespace,
                                          actor_creation_options.max_pending_calls,
                                          actor_creation_options.execute_out_of_order,
-                                         actor_creation_options.enable_task_events);
+                                         actor_creation_options.enable_task_events,
+                                         actor_creation_options.labels);
   std::string serialized_actor_handle;
   actor_handle->Serialize(&serialized_actor_handle);
   ActorID root_detached_actor_id;
@@ -3991,7 +3996,7 @@ void CoreWorker::ProcessSubscribeObjectLocations(

 std::unordered_map<rpc::LineageReconstructionTask, uint64_t>
 CoreWorker::GetLocalOngoingLineageReconstructionTasks() const {
-  return task_manager_->GetOngoingLineageReconstructionTasks();
+  return task_manager_->GetOngoingLineageReconstructionTasks(*actor_manager_);
 }

 Status CoreWorker::GetLocalObjectLocations(
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index b38cd7007aa02..72536ed1f1e03 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -1467,7 +1467,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
       const std::string &concurrency_group_name = "",
       bool include_job_config = false,
       int64_t generator_backpressure_num_objects = -1,
-      bool enable_task_events = true);
+      bool enable_task_events = true,
+      const std::unordered_map<std::string, std::string> &labels = {});

   void SetCurrentTaskId(const TaskID &task_id,
                         uint64_t attempt_number,
                         const std::string &task_name);
diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index bc5a78c7862ee..cb175bfd2ebb0 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -16,6 +16,7 @@

 #include "ray/common/buffer.h"
 #include "ray/common/common_protocol.h"
+#include "ray/core_worker/actor_manager.h"
 #include "ray/gcs/pb_util.h"
 #include "ray/util/exponential_backoff.h"
 #include "ray/util/util.h"
@@ -1475,7 +1476,8 @@ void TaskManager::SetTaskStatus(
 }

 std::unordered_map<rpc::LineageReconstructionTask, uint64_t>
-TaskManager::GetOngoingLineageReconstructionTasks() const {
+TaskManager::GetOngoingLineageReconstructionTasks(
+    const ActorManager &actor_manager) const {
   absl::MutexLock lock(&mu_);
   std::unordered_map<rpc::LineageReconstructionTask, uint64_t> result;
   for (const auto &task_it : submissible_tasks_) {
@@ -1491,9 +1493,16 @@ TaskManager::GetOngoingLineageReconstructionTasks() const {

     rpc::LineageReconstructionTask task;
     task.set_name(task_entry.spec.GetName());
-    auto resources = task_entry.spec.GetRequiredResources().GetResourceUnorderedMap();
-    task.mutable_resources()->insert(resources.begin(), resources.end());
     task.set_status(task_entry.GetStatus());
+    if (task_entry.spec.IsNormalTask()) {
+      task.mutable_labels()->insert(task_entry.spec.GetMessage().labels().begin(),
+                                    task_entry.spec.GetMessage().labels().end());
+    } else if (task_entry.spec.IsActorTask()) {
+      auto actor_handle = actor_manager.GetActorHandle(task_entry.spec.ActorId());
+      RAY_CHECK(actor_handle) << "Actor task must be submitted via actor handle";
+      const auto &labels = actor_handle->GetLabels();
+      task.mutable_labels()->insert(labels.begin(), labels.end());
+    }

     if (result.find(task) != result.end()) {
       result[task] += 1;
diff --git a/src/ray/core_worker/task_manager.h b/src/ray/core_worker/task_manager.h
index 196e18beb2776..49188e9a630fc 100644
--- a/src/ray/core_worker/task_manager.h
+++ b/src/ray/core_worker/task_manager.h
@@ -30,6 +30,8 @@
 namespace ray {
 namespace core {

+class ActorManager;
+
 class TaskFinisherInterface {
  public:
   virtual void CompletePendingTask(const TaskID &task_id,
@@ -603,7 +605,7 @@ class TaskManager : public TaskFinisherInterface, public TaskResubmissionInterfa
   /// Key is the lineage reconstruction task info.
   /// Value is the number of ongoing lineage reconstruction tasks of this type.
   std::unordered_map<rpc::LineageReconstructionTask, uint64_t>
-  GetOngoingLineageReconstructionTasks() const;
+  GetOngoingLineageReconstructionTasks(const ActorManager &actor_manager) const;

   /// Returns the generator ID that contains the dynamically allocated
   /// ObjectRefs, if the task is dynamic. Else, returns Nil.
diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto
index 604c92d0d4dbd..f18175a12f2c0 100644
--- a/src/ray/protobuf/common.proto
+++ b/src/ray/protobuf/common.proto
@@ -556,6 +556,8 @@ message TaskSpec {
   // this field contains the detached actor id.
   // Otherwise it's empty and is originated from a driver.
   bytes root_detached_actor_id = 40;
+  // The key-value labels for task and actor.
+  map<string, string> labels = 41;
 }

 message TaskInfoEntry {
@@ -986,6 +988,10 @@ message NamedActorInfo {

 message LineageReconstructionTask {
   string name = 1;
-  map<string, double> resources = 2;
-  TaskStatus status = 3;
+  TaskStatus status = 2;
+  // If the task is a normal task,
+  // this has the labels of the normal task.
+  // If the task is an actor task,
+  // this has the labels of the corresponding actor.
+  map<string, string> labels = 3;
 }
diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto
index caa869ce18e98..9b6dad5191dd3 100644
--- a/src/ray/protobuf/core_worker.proto
+++ b/src/ray/protobuf/core_worker.proto
@@ -70,6 +70,9 @@ message ActorHandle {

   // Whether task events will be reported from this actor.
   bool enable_task_events = 14;
+
+  // The key-value labels for actor.
+  map<string, string> labels = 15;
 }

 message PushTaskRequest {
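Finally, a sketch of how the reshaped LineageReconstructionTask message is consumed from Python, following the verify() pattern in the test above. The import path of the internal helper is an assumption here (the test's import block is not shown in this patch), so treat it as hypothetical:

    import ray
    # Assumed location of the internal helper used in test_advanced.py above;
    # adjust to wherever get_local_ongoing_lineage_reconstruction_tasks lives.
    from ray._private.internal_api import (
        get_local_ongoing_lineage_reconstruction_tasks,
    )

    ray.init()

    for task_info, count in get_local_ongoing_lineage_reconstruction_tasks():
        # `labels` replaces the old `resources` field: for a normal task these are
        # the task's own labels, for an actor task the labels of the owning actor.
        print(task_info.name, task_info.status, dict(task_info.labels), count)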