From 530f2d7c5eedff4ece7f94ea119ff444318013aa Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Fri, 22 Oct 2021 10:52:36 -0700 Subject: [PATCH] [Pubsub] Wrap Redis-based publisher in GCS to allow incrementally switching to the GCS-based publisher (#19600) ## Why are these changes needed? The most significant change of the PR is the `GcsPublisher` wrapper added to `src/ray/gcs/pubsub/gcs_pub_sub.h`. It forwards publishing to the underlying `GcsPubSub` (Redis-based) or `pubsub::Publisher` (GCS-based) depending on the migration status, so it allows incremental migration by channel. - Since it was decided that we want to use typed ID and messages for GCS-based publishing, each member function of `GcsPublisher` accepts a typed message. Most of the modified files are from migrating publishing logic in GCS to use `GcsPublisher` instead of `GcsPubSub`. Later on, `GcsPublisher` member functions will be migrated to use GCS-based publishing. This change should make no functionality difference. If this looks ok, a similar change would be made for subscribers in GCS client. ## Related issue number --- BUILD.bazel | 17 +-- .../gcs/gcs_server/gcs_actor_distribution.cc | 6 +- .../gcs/gcs_server/gcs_actor_distribution.h | 4 +- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 40 +++--- src/ray/gcs/gcs_server/gcs_actor_manager.h | 12 +- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 6 +- src/ray/gcs/gcs_server/gcs_actor_scheduler.h | 6 +- src/ray/gcs/gcs_server/gcs_job_manager.cc | 10 +- src/ray/gcs/gcs_server/gcs_job_manager.h | 10 +- src/ray/gcs/gcs_server/gcs_node_manager.cc | 22 ++-- src/ray/gcs/gcs_server/gcs_node_manager.h | 10 +- src/ray/gcs/gcs_server/gcs_object_manager.cc | 8 +- src/ray/gcs/gcs_server/gcs_object_manager.h | 8 +- .../gcs/gcs_server/gcs_resource_manager.cc | 17 +-- src/ray/gcs/gcs_server/gcs_resource_manager.h | 6 +- src/ray/gcs/gcs_server/gcs_server.cc | 46 ++++--- src/ray/gcs/gcs_server/gcs_server.h | 5 +- src/ray/gcs/gcs_server/gcs_worker_manager.cc | 14 +- src/ray/gcs/gcs_server/gcs_worker_manager.h | 6 +- .../gcs/gcs_server/task_info_handler_impl.cc | 5 +- .../gcs/gcs_server/task_info_handler_impl.h | 6 +- .../gcs_server/test/gcs_actor_manager_test.cc | 7 +- .../test/gcs_actor_scheduler_mock_test.cc | 5 +- .../test/gcs_based_actor_scheduler_test.cc | 13 +- .../gcs_server/test/gcs_node_manager_test.cc | 9 +- .../test/gcs_object_manager_test.cc | 10 +- .../test/gcs_placement_group_manager_test.cc | 5 +- .../gcs_placement_group_scheduler_test.cc | 7 +- .../gcs_server/test/gcs_server_test_util.h | 2 +- .../test/raylet_based_actor_scheduler_test.cc | 9 +- src/ray/gcs/pubsub/gcs_pub_sub.cc | 99 +++++++++++--- src/ray/gcs/pubsub/gcs_pub_sub.h | 123 +++++++++++++++--- src/ray/gcs/pubsub/test/gcs_pub_sub_test.cc | 6 +- src/ray/pubsub/publisher.h | 24 ++-- 34 files changed, 347 insertions(+), 236 deletions(-) diff --git a/BUILD.bazel b/BUILD.bazel index 34f086a27709..949a93dfcd84 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -485,20 +485,17 @@ cc_binary( cc_library( name = "gcs_pub_sub_lib", - srcs = glob( - [ - "src/ray/gcs/pubsub/gcs_pub_sub.cc", - ], - ), - hdrs = glob( - [ - "src/ray/gcs/pubsub/gcs_pub_sub.h", - ], - ), + srcs = [ + "src/ray/gcs/pubsub/gcs_pub_sub.cc", + ], + hdrs = [ + "src/ray/gcs/pubsub/gcs_pub_sub.h", + ], copts = COPTS, strip_include_prefix = "src", deps = [ ":gcs", + ":pubsub_lib", ":ray_common", ":redis_client", ], diff --git a/src/ray/gcs/gcs_server/gcs_actor_distribution.cc b/src/ray/gcs/gcs_server/gcs_actor_distribution.cc index bec0fb7b89f7..17f6563bb83b 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_distribution.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_distribution.cc @@ -34,8 +34,8 @@ const ResourceSet &GcsActorWorkerAssignment::GetResources() const { bool GcsActorWorkerAssignment::IsShared() const { return is_shared_; } GcsBasedActorScheduler::GcsBasedActorScheduler( - instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table, - const GcsNodeManager &gcs_node_manager, std::shared_ptr gcs_pub_sub, + instrumented_io_context &io_context, GcsActorTable &gcs_actor_table, + const GcsNodeManager &gcs_node_manager, std::shared_ptr gcs_resource_manager, std::shared_ptr gcs_resource_scheduler, std::function)> schedule_failure_handler, @@ -43,7 +43,7 @@ GcsBasedActorScheduler::GcsBasedActorScheduler( schedule_success_handler, std::shared_ptr raylet_client_pool, rpc::ClientFactoryFn client_factory) - : GcsActorScheduler(io_context, gcs_actor_table, gcs_node_manager, gcs_pub_sub, + : GcsActorScheduler(io_context, gcs_actor_table, gcs_node_manager, schedule_failure_handler, schedule_success_handler, raylet_client_pool, client_factory), gcs_resource_manager_(std::move(gcs_resource_manager)), diff --git a/src/ray/gcs/gcs_server/gcs_actor_distribution.h b/src/ray/gcs/gcs_server/gcs_actor_distribution.h index 55f0f492e9a7..0411d4e1debf 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_distribution.h +++ b/src/ray/gcs/gcs_server/gcs_actor_distribution.h @@ -81,8 +81,8 @@ class GcsBasedActorScheduler : public GcsActorScheduler { /// \param client_factory Factory to create remote core worker client, default factor /// will be used if not set. explicit GcsBasedActorScheduler( - instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table, - const GcsNodeManager &gcs_node_manager, std::shared_ptr gcs_pub_sub, + instrumented_io_context &io_context, GcsActorTable &gcs_actor_table, + const GcsNodeManager &gcs_node_manager, std::shared_ptr gcs_resource_manager, std::shared_ptr gcs_resource_scheduler, std::function)> schedule_failure_handler, diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 48a469f1cb37..a4fd4e9e5386 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -109,8 +109,8 @@ void GcsActor::SetActorWorkerAssignment( GcsActorManager::GcsActorManager( boost::asio::io_context &io_context, std::shared_ptr scheduler, - std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub, RuntimeEnvManager &runtime_env_manager, + std::shared_ptr gcs_table_storage, + std::shared_ptr gcs_publisher, RuntimeEnvManager &runtime_env_manager, std::function destroy_owned_placement_group_if_needed, std::function get_ray_namespace, std::function, boost::posix_time::milliseconds)> @@ -119,7 +119,7 @@ GcsActorManager::GcsActorManager( : io_context_(io_context), gcs_actor_scheduler_(std::move(scheduler)), gcs_table_storage_(std::move(gcs_table_storage)), - gcs_pub_sub_(std::move(gcs_pub_sub)), + gcs_publisher_(std::move(gcs_publisher)), worker_client_factory_(worker_client_factory), destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed), get_ray_namespace_(get_ray_namespace), @@ -373,8 +373,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ absl::GetCurrentTimeNanos(), job_id); RAY_LOG(WARNING) << error_data_ptr->SerializeAsString(); - RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, job_id.Hex(), - error_data_ptr->SerializeAsString(), nullptr)); + RAY_CHECK_OK( + gcs_publisher_->PublishError(job_id.Hex(), *error_data_ptr, nullptr)); } actors_in_namespace.emplace(actor->GetName(), actor->GetActorID()); @@ -420,9 +420,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ // the actor state to DEAD to avoid race condition. return; } - RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor->GetActorID().Hex(), - actor->GetActorTableData().SerializeAsString(), - nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishActor(actor->GetActorID(), + actor->GetActorTableData(), nullptr)); // Invoke all callbacks for all registration requests of this actor (duplicated // requests are included) and remove all of them from // actor_to_register_callbacks_. @@ -491,8 +490,7 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION); const auto &actor_table_data = actor->GetActorTableData(); // Pub this state for dashboard showing. - RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(), - actor_table_data.SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishActor(actor_id, actor_table_data, nullptr)); RemoveUnresolvedActor(actor); // Update the registered actor as its creation task specification may have changed due @@ -668,9 +666,8 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) { RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor->GetActorID(), *actor_table_data, [this, actor_id, actor_table_data](Status status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - ACTOR_CHANNEL, actor_id.Hex(), - GenActorDataOnlyWithStates(*actor_table_data)->SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishActor( + actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr)); // Destroy placement group owned by this actor. destroy_owned_placement_group_if_needed_(actor_id); })); @@ -867,10 +864,8 @@ void GcsActorManager::ReconstructActor( RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, *mutable_actor_table_data, [this, actor_id, mutable_actor_table_data](Status status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - ACTOR_CHANNEL, actor_id.Hex(), - GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(), - nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishActor( + actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); })); gcs_actor_scheduler_->Schedule(actor); } else { @@ -907,10 +902,8 @@ void GcsActorManager::ReconstructActor( if (actor->IsDetached()) { DestroyActor(actor_id); } - RAY_CHECK_OK(gcs_pub_sub_->Publish( - ACTOR_CHANNEL, actor_id.Hex(), - GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(), - nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishActor( + actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr)); })); // The actor is dead, but we should not remove the entry from the // registered actors yet. If the actor is owned, we will destroy the actor @@ -958,9 +951,8 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &ac RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put( actor_id, actor_table_data, [this, actor_id, actor_table_data, actor, reply](Status status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - ACTOR_CHANNEL, actor_id.Hex(), - GenActorDataOnlyWithStates(actor_table_data)->SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishActor( + actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr)); // Invoke all callbacks for all registration requests of this actor (duplicated // requests are included) and remove all of them from // actor_to_create_callbacks_. diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 9050eb4dfc9f..6cc7167edb15 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -193,12 +193,12 @@ class GcsActorManager : public rpc::ActorInfoHandler { /// /// \param scheduler Used to schedule actor creation tasks. /// \param gcs_table_storage Used to flush actor data to storage. - /// \param gcs_pub_sub Used to publish gcs message. + /// \param gcs_publisher Used to publish gcs message. GcsActorManager( boost::asio::io_context &io_context, std::shared_ptr scheduler, - std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub, RuntimeEnvManager &runtime_env_manager, + std::shared_ptr gcs_table_storage, + std::shared_ptr gcs_publisher, RuntimeEnvManager &runtime_env_manager, std::function destroy_ownded_placement_group_if_needed, std::function get_ray_namespace, std::function, boost::posix_time::milliseconds)> @@ -494,11 +494,11 @@ class GcsActorManager : public rpc::ActorInfoHandler { boost::asio::io_context &io_context_; /// The scheduler to schedule all registered actors. - std::shared_ptr gcs_actor_scheduler_; + std::shared_ptr gcs_actor_scheduler_; /// Used to update actor information upon creation, deletion, etc. - std::shared_ptr gcs_table_storage_; + std::shared_ptr gcs_table_storage_; /// A publisher for publishing gcs messages. - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; /// Factory to produce clients to workers. This is used to communicate with /// actors and their owners. rpc::ClientFactoryFn worker_client_factory_; diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index cc4a426cea65..6311eb6b5c6d 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -24,9 +24,8 @@ namespace ray { namespace gcs { GcsActorScheduler::GcsActorScheduler( - instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table, - const gcs::GcsNodeManager &gcs_node_manager, - std::shared_ptr gcs_pub_sub, + instrumented_io_context &io_context, GcsActorTable &gcs_actor_table, + const GcsNodeManager &gcs_node_manager, std::function)> schedule_failure_handler, std::function, const rpc::PushTaskReply &reply)> schedule_success_handler, @@ -35,7 +34,6 @@ GcsActorScheduler::GcsActorScheduler( : io_context_(io_context), gcs_actor_table_(gcs_actor_table), gcs_node_manager_(gcs_node_manager), - gcs_pub_sub_(std::move(gcs_pub_sub)), schedule_failure_handler_(std::move(schedule_failure_handler)), schedule_success_handler_(std::move(schedule_success_handler)), raylet_client_pool_(raylet_client_pool), diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h index 55bd6b6bd73f..1c203b5e36ad 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.h +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.h @@ -100,8 +100,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { /// \param client_factory Factory to create remote core worker client, default factor /// will be used if not set. explicit GcsActorScheduler( - instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table, - const GcsNodeManager &gcs_node_manager, std::shared_ptr gcs_pub_sub, + instrumented_io_context &io_context, GcsActorTable &gcs_actor_table, + const GcsNodeManager &gcs_node_manager, std::function)> schedule_failure_handler, std::function, const rpc::PushTaskReply &reply)> schedule_success_handler, @@ -305,8 +305,6 @@ class GcsActorScheduler : public GcsActorSchedulerInterface { node_to_workers_when_creating_; /// Reference of GcsNodeManager. const GcsNodeManager &gcs_node_manager_; - /// A publisher for publishing gcs messages. - std::shared_ptr gcs_pub_sub_; /// The handler to handle the scheduling failures. std::function)> schedule_failure_handler_; /// The handler to handle the successful scheduling. diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.cc b/src/ray/gcs/gcs_server/gcs_job_manager.cc index 36a0dce664e6..45573ef0a2af 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_job_manager.cc @@ -46,9 +46,7 @@ void GcsJobManager::HandleAddJob(const rpc::AddJobRequest &request, RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id << ", driver pid = " << mutable_job_table_data.driver_pid(); } else { - RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(), - mutable_job_table_data.SerializeAsString(), - nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, mutable_job_table_data, nullptr)); if (mutable_job_table_data.config().has_runtime_env()) { runtime_env_manager_.AddURIReference( job_id.Hex(), mutable_job_table_data.config().runtime_env()); @@ -81,8 +79,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data, if (!status.ok()) { RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id; } else { - RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(), - job_table_data.SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, nullptr)); runtime_env_manager_.RemoveURIReference(job_id.Hex()); ClearJobInfos(job_id); RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id; @@ -159,8 +156,7 @@ void GcsJobManager::HandleReportJobError(const rpc::ReportJobErrorRequest &reque rpc::ReportJobErrorReply *reply, rpc::SendReplyCallback send_reply_callback) { auto job_id = JobID::FromBinary(request.job_error().job_id()); - RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, job_id.Hex(), - request.job_error().SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishError(job_id.Hex(), request.job_error(), nullptr)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); } diff --git a/src/ray/gcs/gcs_server/gcs_job_manager.h b/src/ray/gcs/gcs_server/gcs_job_manager.h index ddb0ffb2c0a7..716753a3a846 100644 --- a/src/ray/gcs/gcs_server/gcs_job_manager.h +++ b/src/ray/gcs/gcs_server/gcs_job_manager.h @@ -26,11 +26,11 @@ namespace gcs { /// This implementation class of `JobInfoHandler`. class GcsJobManager : public rpc::JobInfoHandler { public: - explicit GcsJobManager(std::shared_ptr gcs_table_storage, - std::shared_ptr gcs_pub_sub, + explicit GcsJobManager(std::shared_ptr gcs_table_storage, + std::shared_ptr gcs_publisher, RuntimeEnvManager &runtime_env_manager) : gcs_table_storage_(std::move(gcs_table_storage)), - gcs_pub_sub_(std::move(gcs_pub_sub)), + gcs_publisher_(std::move(gcs_publisher)), runtime_env_manager_(runtime_env_manager) {} void Initialize(const GcsInitData &gcs_init_data); @@ -60,8 +60,8 @@ class GcsJobManager : public rpc::JobInfoHandler { std::string GetRayNamespace(const JobID &job_id) const; private: - std::shared_ptr gcs_table_storage_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_table_storage_; + std::shared_ptr gcs_publisher_; /// Listeners which monitors the finish of jobs. std::vector)>> job_finished_listeners_; diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index c84e19372a6b..8962dbd6443d 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -25,9 +25,9 @@ namespace ray { namespace gcs { ////////////////////////////////////////////////////////////////////////////////////////// -GcsNodeManager::GcsNodeManager(std::shared_ptr gcs_pub_sub, - std::shared_ptr gcs_table_storage) - : gcs_pub_sub_(gcs_pub_sub), gcs_table_storage_(gcs_table_storage) {} +GcsNodeManager::GcsNodeManager(std::shared_ptr gcs_publisher, + std::shared_ptr gcs_table_storage) + : gcs_publisher_(gcs_publisher), gcs_table_storage_(gcs_table_storage) {} void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, rpc::RegisterNodeReply *reply, @@ -40,8 +40,7 @@ void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request, RAY_CHECK_OK(status); RAY_LOG(INFO) << "Finished registering node info, node id = " << node_id << ", address = " << request.node_info().node_manager_address(); - RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(), - request.node_info().SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, request.node_info(), nullptr)); AddNode(std::make_shared(request.node_info())); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; @@ -68,8 +67,7 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ send_reply_callback](const Status &status) { auto on_done = [this, node_id, node_info_delta, reply, send_reply_callback](const Status &status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id; }; @@ -159,14 +157,13 @@ std::shared_ptr GcsNodeManager::RemoveNode( << " has been marked dead because the detector" << " has missed too many heartbeats from it. This can happen when a " "raylet crashes unexpectedly or has lagging heartbeats."; - auto error_data_ptr = - gcs::CreateErrorTableData(type, error_message.str(), current_time_ms()); RAY_EVENT(ERROR, EL_RAY_NODE_REMOVED) .WithField("node_id", node_id.Hex()) .WithField("ip", removed_node->node_manager_address()) << error_message.str(); - RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, node_id.Hex(), - error_data_ptr->SerializeAsString(), nullptr)); + auto error_data_ptr = + gcs::CreateErrorTableData(type, error_message.str(), current_time_ms()); + RAY_CHECK_OK(gcs_publisher_->PublishError(node_id.Hex(), *error_data_ptr, nullptr)); } // Notify all listeners. @@ -189,8 +186,7 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id) { auto on_done = [this, node_id, node_info_delta](const Status &status) { auto on_done = [this, node_id, node_info_delta](const Status &status) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr)); }; RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done)); }; diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.h b/src/ray/gcs/gcs_server/gcs_node_manager.h index d7bff17ec5ef..f13538a8bb0f 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.h +++ b/src/ray/gcs/gcs_server/gcs_node_manager.h @@ -36,10 +36,10 @@ class GcsNodeManager : public rpc::NodeInfoHandler { public: /// Create a GcsNodeManager. /// - /// \param gcs_pub_sub GCS message publisher. + /// \param gcs_publisher GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. - explicit GcsNodeManager(std::shared_ptr gcs_pub_sub, - std::shared_ptr gcs_table_storage); + explicit GcsNodeManager(std::shared_ptr gcs_publisher, + std::shared_ptr gcs_table_storage); /// Handle register rpc request come from raylet. void HandleRegisterNode(const rpc::RegisterNodeRequest &request, @@ -138,9 +138,9 @@ class GcsNodeManager : public rpc::NodeInfoHandler { std::vector)>> node_removed_listeners_; /// A publisher for publishing gcs messages. - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; /// Storage for GCS tables. - std::shared_ptr gcs_table_storage_; + std::shared_ptr gcs_table_storage_; // Debug info. enum CountType { diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.cc b/src/ray/gcs/gcs_server/gcs_object_manager.cc index 818904d65b61..60ef1581b072 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_object_manager.cc @@ -96,8 +96,7 @@ void GcsObjectManager::HandleAddObjectLocation( notification.set_spilled_node_id(spilled_node_id.Binary()); } notification.set_size(size); - RAY_CHECK_OK(gcs_pub_sub_->Publish(OBJECT_CHANNEL, object_id.Hex(), - notification.SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishObject(object_id, notification, nullptr)); RAY_LOG(DEBUG) << "Finished adding object location, job id = " << object_id.TaskId().JobId() << ", object id = " << object_id << ", node id = " << node_id << ", task id = " << object_id.TaskId() @@ -136,9 +135,8 @@ void GcsObjectManager::HandleRemoveObjectLocation( auto on_done = [this, object_id, node_id, reply, send_reply_callback](const Status &status) { if (status.ok()) { - RAY_CHECK_OK(gcs_pub_sub_->Publish( - OBJECT_CHANNEL, object_id.Hex(), - gcs::CreateObjectLocationChange(node_id, false)->SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishObject( + object_id, *gcs::CreateObjectLocationChange(node_id, false), nullptr)); RAY_LOG(DEBUG) << "Finished removing object location, job id = " << object_id.TaskId().JobId() << ", object id = " << object_id << ", node id = " << node_id; diff --git a/src/ray/gcs/gcs_server/gcs_object_manager.h b/src/ray/gcs/gcs_server/gcs_object_manager.h index 6d4d39598cb6..8d77643fcc8e 100644 --- a/src/ray/gcs/gcs_server/gcs_object_manager.h +++ b/src/ray/gcs/gcs_server/gcs_object_manager.h @@ -25,10 +25,10 @@ namespace gcs { class GcsObjectManager : public rpc::ObjectInfoHandler { public: - explicit GcsObjectManager(std::shared_ptr gcs_table_storage, - std::shared_ptr &gcs_pub_sub, + explicit GcsObjectManager(std::shared_ptr gcs_table_storage, + std::shared_ptr &gcs_publisher, gcs::GcsNodeManager &gcs_node_manager) - : gcs_table_storage_(std::move(gcs_table_storage)), gcs_pub_sub_(gcs_pub_sub) { + : gcs_table_storage_(std::move(gcs_table_storage)), gcs_publisher_(gcs_publisher) { gcs_node_manager.AddNodeRemovedListener( [this](const std::shared_ptr &node) { // All of the related actors should be reconstructed when a node is removed from @@ -141,7 +141,7 @@ class GcsObjectManager : public rpc::ObjectInfoHandler { absl::flat_hash_map node_to_objects_ GUARDED_BY(mutex_); std::shared_ptr gcs_table_storage_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; // Debug info. enum CountType { diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 8910f9554957..9e36db6efafa 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -21,10 +21,10 @@ namespace ray { namespace gcs { GcsResourceManager::GcsResourceManager( - instrumented_io_context &main_io_service, std::shared_ptr gcs_pub_sub, + instrumented_io_context &main_io_service, std::shared_ptr gcs_publisher, std::shared_ptr gcs_table_storage, bool redis_broadcast_enabled) : periodical_runner_(main_io_service), - gcs_pub_sub_(gcs_pub_sub), + gcs_publisher_(gcs_publisher), gcs_table_storage_(gcs_table_storage), redis_broadcast_enabled_(redis_broadcast_enabled), max_broadcasting_batch_size_( @@ -89,9 +89,8 @@ void GcsResourceManager::HandleUpdateResources( node_resource_change.mutable_updated_resources()->insert(changed_resources->begin(), changed_resources->end()); if (redis_broadcast_enabled_) { - RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(), - node_resource_change.SerializeAsString(), - nullptr)); + RAY_CHECK_OK( + gcs_publisher_->PublishNodeResource(node_id, node_resource_change, nullptr)); } else { absl::MutexLock guard(&resource_buffer_mutex_); resources_buffer_proto_.add_batch()->mutable_change()->Swap( @@ -145,9 +144,8 @@ void GcsResourceManager::HandleDeleteResources( node_resource_change.add_deleted_resources(resource_name); } if (redis_broadcast_enabled_) { - RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_RESOURCE_CHANNEL, node_id.Hex(), - node_resource_change.SerializeAsString(), - nullptr)); + RAY_CHECK_OK( + gcs_publisher_->PublishNodeResource(node_id, node_resource_change, nullptr)); } else { absl::MutexLock guard(&resource_buffer_mutex_); resources_buffer_proto_.add_batch()->mutable_change()->Swap( @@ -421,8 +419,7 @@ void GcsResourceManager::SendBatchedResourceUsage() { rpc::ResourceUsageBatchData batch; GetResourceUsageBatchForBroadcast_Locked(batch); if (batch.ByteSizeLong() > 0) { - RAY_CHECK_OK(gcs_pub_sub_->Publish(RESOURCES_BATCH_CHANNEL, "", - batch.SerializeAsString(), nullptr)); + RAY_CHECK_OK(gcs_publisher_->PublishResourceBatch(batch, nullptr)); stats::OutboundHeartbeatSizeKB.Record(batch.ByteSizeLong() / 1024.0); } } diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index 016cb16acc53..a62d66642344 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -40,10 +40,10 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { /// Create a GcsResourceManager. /// /// \param main_io_service The main event loop. - /// \param gcs_pub_sub GCS message publisher. + /// \param gcs_publisher GCS message publisher. /// \param gcs_table_storage GCS table external storage accessor. explicit GcsResourceManager(instrumented_io_context &main_io_service, - std::shared_ptr gcs_pub_sub, + std::shared_ptr gcs_publisher, std::shared_ptr gcs_table_storage, bool redis_broadcast_enabled); @@ -199,7 +199,7 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler { GUARDED_BY(resource_buffer_mutex_); /// A publisher for publishing gcs messages. - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; /// Storage for GCS tables. std::shared_ptr gcs_table_storage_; /// Whether or not to broadcast resource usage via redis. diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 127ed2390790..711f04c159ec 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -26,6 +26,7 @@ #include "ray/gcs/gcs_server/gcs_worker_manager.h" #include "ray/gcs/gcs_server/stats_handler_impl.h" #include "ray/gcs/gcs_server/task_info_handler_impl.h" +#include "ray/pubsub/publisher.h" namespace ray { namespace gcs { @@ -60,12 +61,12 @@ void GcsServer::Start() { main_service_, redis_client_->GetPrimaryContext(), [this]() { Stop(); }); gcs_redis_failure_detector_->Start(); - // Init gcs pub sub instance. - gcs_pub_sub_ = std::make_shared(redis_client_); - + // Init GCS publisher instance. + std::unique_ptr inner_publisher; if (config_.grpc_pubsub_enabled) { - // Init grpc based pubsub - grpc_pubsub_publisher_ = std::make_shared( + // Init grpc based pubsub on GCS. + // TODO: Move this into GcsPublisher. + inner_publisher = std::make_unique( /*channels=*/std::vector< rpc::ChannelType>{rpc::ChannelType::WORKER_OBJECT_EVICTION, rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL, @@ -75,6 +76,8 @@ void GcsServer::Start() { /*subscriber_timeout_ms=*/RayConfig::instance().subscriber_timeout_ms(), /*publish_batch_size_=*/RayConfig::instance().publish_batch_size()); } + gcs_publisher_ = + std::make_shared(redis_client_, std::move(inner_publisher)); // Init gcs table storage. gcs_table_storage_ = std::make_shared(redis_client_); @@ -181,8 +184,9 @@ void GcsServer::Stop() { } void GcsServer::InitGcsNodeManager(const GcsInitData &gcs_init_data) { - RAY_CHECK(redis_client_ && gcs_table_storage_ && gcs_pub_sub_); - gcs_node_manager_ = std::make_shared(gcs_pub_sub_, gcs_table_storage_); + RAY_CHECK(redis_client_ && gcs_table_storage_ && gcs_publisher_); + gcs_node_manager_ = + std::make_shared(gcs_publisher_, gcs_table_storage_); // Initialize by gcs tables data. gcs_node_manager_->Initialize(gcs_init_data); // Register service. @@ -209,9 +213,9 @@ void GcsServer::InitGcsHeartbeatManager(const GcsInitData &gcs_init_data) { } void GcsServer::InitGcsResourceManager(const GcsInitData &gcs_init_data) { - RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); + RAY_CHECK(gcs_table_storage_ && gcs_publisher_); gcs_resource_manager_ = std::make_shared( - main_service_, gcs_pub_sub_, gcs_table_storage_, + main_service_, gcs_publisher_, gcs_table_storage_, !config_.grpc_based_resource_broadcast); // Initialize by gcs tables data. gcs_resource_manager_->Initialize(gcs_init_data); @@ -228,8 +232,8 @@ void GcsServer::InitGcsResourceScheduler() { } void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) { - RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); - gcs_job_manager_ = std::make_unique(gcs_table_storage_, gcs_pub_sub_, + RAY_CHECK(gcs_table_storage_ && gcs_publisher_); + gcs_job_manager_ = std::make_unique(gcs_table_storage_, gcs_publisher_, *runtime_env_manager_); gcs_job_manager_->Initialize(gcs_init_data); @@ -240,7 +244,7 @@ void GcsServer::InitGcsJobManager(const GcsInitData &gcs_init_data) { } void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { - RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_); + RAY_CHECK(gcs_table_storage_ && gcs_publisher_ && gcs_node_manager_); std::unique_ptr scheduler; auto schedule_failure_handler = [this](std::shared_ptr actor) { // When there are no available nodes to schedule the actor the @@ -260,17 +264,17 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) { if (RayConfig::instance().gcs_actor_scheduling_enabled()) { RAY_CHECK(gcs_resource_manager_ && gcs_resource_scheduler_); scheduler = std::make_unique( - main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_, + main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_resource_manager_, gcs_resource_scheduler_, schedule_failure_handler, schedule_success_handler, raylet_client_pool_, client_factory); } else { scheduler = std::make_unique( - main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, gcs_pub_sub_, + main_service_, gcs_table_storage_->ActorTable(), *gcs_node_manager_, schedule_failure_handler, schedule_success_handler, raylet_client_pool_, client_factory); } gcs_actor_manager_ = std::make_shared( - main_service_, std::move(scheduler), gcs_table_storage_, gcs_pub_sub_, + main_service_, std::move(scheduler), gcs_table_storage_, gcs_publisher_, *runtime_env_manager_, [this](const ActorID &actor_id) { gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenActorDead(actor_id); @@ -320,9 +324,9 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) { } void GcsServer::InitObjectManager(const GcsInitData &gcs_init_data) { - RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_ && gcs_node_manager_); + RAY_CHECK(gcs_table_storage_ && gcs_publisher_ && gcs_node_manager_); gcs_object_manager_.reset( - new GcsObjectManager(gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_)); + new GcsObjectManager(gcs_table_storage_, gcs_publisher_, *gcs_node_manager_)); // Initialize by gcs tables data. gcs_object_manager_->Initialize(gcs_init_data); // Register service. @@ -347,9 +351,9 @@ void GcsServer::StoreGcsServerAddressInRedis() { } void GcsServer::InitTaskInfoHandler() { - RAY_CHECK(gcs_table_storage_ && gcs_pub_sub_); + RAY_CHECK(gcs_table_storage_ && gcs_publisher_); task_info_handler_.reset( - new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_pub_sub_)); + new rpc::DefaultTaskInfoHandler(gcs_table_storage_, gcs_publisher_)); // Register service. task_info_service_.reset( new rpc::TaskInfoGrpcService(main_service_, *task_info_handler_)); @@ -424,7 +428,7 @@ void GcsServer::InitRuntimeEnvManager() { void GcsServer::InitGcsWorkerManager() { gcs_worker_manager_ = - std::make_unique(gcs_table_storage_, gcs_pub_sub_); + std::make_unique(gcs_table_storage_, gcs_publisher_); // Register service. worker_info_service_.reset( new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_)); @@ -508,7 +512,7 @@ void GcsServer::PrintDebugInfo() { << gcs_actor_manager_->DebugString() << "\n" << gcs_object_manager_->DebugString() << "\n" << gcs_placement_group_manager_->DebugString() << "\n" - << gcs_pub_sub_->DebugString() << "\n" + << gcs_publisher_->DebugString() << "\n" << ((rpc::DefaultTaskInfoHandler *)task_info_handler_.get())->DebugString(); if (config_.grpc_based_resource_broadcast) { diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 507ab2820cab..2f4b2d12269e 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -28,7 +28,6 @@ #include "ray/gcs/gcs_server/grpc_based_resource_broadcaster.h" #include "ray/gcs/pubsub/gcs_pub_sub.h" #include "ray/gcs/redis_client.h" -#include "ray/pubsub/publisher.h" #include "ray/rpc/client_call.h" #include "ray/rpc/gcs_server/gcs_rpc_server.h" #include "ray/rpc/node_manager/node_manager_client_pool.h" @@ -214,9 +213,7 @@ class GcsServer { /// Backend client. std::shared_ptr redis_client_; /// A publisher for publishing gcs messages. - std::shared_ptr gcs_pub_sub_; - /// Grpc based pubsub. - std::shared_ptr grpc_pubsub_publisher_; + std::shared_ptr gcs_publisher_; /// Grpc based pubsub's periodical runner. PeriodicalRunner pubsub_periodical_runner_; /// The gcs table storage. diff --git a/src/ray/gcs/gcs_server/gcs_worker_manager.cc b/src/ray/gcs/gcs_server/gcs_worker_manager.cc index 5844a8fb0f04..4614dfaaf8d5 100644 --- a/src/ray/gcs/gcs_server/gcs_worker_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_worker_manager.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "ray/gcs/gcs_server/gcs_worker_manager.h" + #include "ray/stats/stats.h" namespace ray { @@ -57,14 +58,11 @@ void GcsWorkerManager::HandleReportWorkerFailure( stats::UnintentionalWorkerFailures.Record(1); // Only publish worker_id and raylet_id in address as they are the only fields used // by sub clients. - auto worker_failure_delta = std::make_shared(); - worker_failure_delta->set_worker_id( - worker_failure_data->worker_address().worker_id()); - worker_failure_delta->set_raylet_id( - worker_failure_data->worker_address().raylet_id()); - RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_CHANNEL, worker_id.Hex(), - worker_failure_delta->SerializeAsString(), - nullptr)); + rpc::WorkerDeltaData worker_failure; + worker_failure.set_worker_id(worker_failure_data->worker_address().worker_id()); + worker_failure.set_raylet_id(worker_failure_data->worker_address().raylet_id()); + RAY_CHECK_OK( + gcs_publisher_->PublishWorkerFailure(worker_id, worker_failure, nullptr)); } GCS_RPC_SEND_REPLY(send_reply_callback, reply, status); }; diff --git a/src/ray/gcs/gcs_server/gcs_worker_manager.h b/src/ray/gcs/gcs_server/gcs_worker_manager.h index 60001aa1208a..4eff051f9531 100644 --- a/src/ray/gcs/gcs_server/gcs_worker_manager.h +++ b/src/ray/gcs/gcs_server/gcs_worker_manager.h @@ -25,8 +25,8 @@ namespace gcs { class GcsWorkerManager : public rpc::WorkerInfoHandler { public: explicit GcsWorkerManager(std::shared_ptr gcs_table_storage, - std::shared_ptr &gcs_pub_sub) - : gcs_table_storage_(gcs_table_storage), gcs_pub_sub_(gcs_pub_sub) {} + std::shared_ptr &gcs_publisher) + : gcs_table_storage_(gcs_table_storage), gcs_publisher_(gcs_publisher) {} void HandleReportWorkerFailure(const rpc::ReportWorkerFailureRequest &request, rpc::ReportWorkerFailureReply *reply, @@ -49,7 +49,7 @@ class GcsWorkerManager : public rpc::WorkerInfoHandler { private: std::shared_ptr gcs_table_storage_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; std::vector)>> worker_dead_listeners_; }; diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.cc b/src/ray/gcs/gcs_server/task_info_handler_impl.cc index 70cc80f0ee4e..9ae968617c95 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.cc +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.cc @@ -78,9 +78,8 @@ void DefaultTaskInfoHandler::HandleAddTaskLease(const AddTaskLeaseRequest &reque RAY_LOG(ERROR) << "Failed to add task lease, job id = " << task_id.JobId() << ", task id = " << task_id << ", node id = " << node_id; } else { - RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_LEASE_CHANNEL, task_id.Hex(), - request.task_lease_data().SerializeAsString(), - nullptr)); + RAY_CHECK_OK( + gcs_publisher_->PublishTaskLease(task_id, request.task_lease_data(), nullptr)); RAY_LOG(DEBUG) << "Finished adding task lease, job id = " << task_id.JobId() << ", task id = " << task_id << ", node id = " << node_id; } diff --git a/src/ray/gcs/gcs_server/task_info_handler_impl.h b/src/ray/gcs/gcs_server/task_info_handler_impl.h index c32eb4894ebc..9da6a5244ede 100644 --- a/src/ray/gcs/gcs_server/task_info_handler_impl.h +++ b/src/ray/gcs/gcs_server/task_info_handler_impl.h @@ -25,8 +25,8 @@ namespace rpc { class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { public: explicit DefaultTaskInfoHandler(std::shared_ptr gcs_table_storage, - std::shared_ptr &gcs_pub_sub) - : gcs_table_storage_(gcs_table_storage), gcs_pub_sub_(gcs_pub_sub) {} + std::shared_ptr &gcs_publisher) + : gcs_table_storage_(gcs_table_storage), gcs_publisher_(gcs_publisher) {} void HandleAddTask(const AddTaskRequest &request, AddTaskReply *reply, SendReplyCallback send_reply_callback) override; @@ -48,7 +48,7 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler { private: std::shared_ptr gcs_table_storage_; - std::shared_ptr &gcs_pub_sub_; + std::shared_ptr &gcs_publisher_; // Debug info. enum CountType { diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index f43d40dd392a..50c7273022de 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -102,11 +102,12 @@ class GcsActorManagerTest : public ::testing::Test { worker_client_ = std::make_shared(io_service_); runtime_env_mgr_ = std::make_unique([](auto, auto f) { f(true); }); - gcs_pub_sub_ = std::make_shared(redis_client_); + gcs_publisher_ = std::make_shared( + std::make_unique(redis_client_)); store_client_ = std::make_shared(io_service_); gcs_table_storage_ = std::make_shared(io_service_); gcs_actor_manager_.reset(new gcs::GcsActorManager( - io_service_, mock_actor_scheduler_, gcs_table_storage_, gcs_pub_sub_, + io_service_, mock_actor_scheduler_, gcs_table_storage_, gcs_publisher_, *runtime_env_mgr_, [](const ActorID &actor_id) {}, [this](const JobID &job_id) { return job_namespace_table_[job_id]; }, [this](std::function fn, boost::posix_time::milliseconds delay) { @@ -204,7 +205,7 @@ class GcsActorManagerTest : public ::testing::Test { std::shared_ptr worker_client_; std::unordered_map job_namespace_table_; std::unique_ptr gcs_actor_manager_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; std::shared_ptr redis_client_; std::unique_ptr runtime_env_mgr_; const std::chrono::milliseconds timeout_ms_{2000}; diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc index 8339bc4b0929..22a0906c9b1c 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_scheduler_mock_test.cc @@ -21,7 +21,6 @@ #include "mock/ray/gcs/gcs_server/gcs_node_manager.h" #include "mock/ray/raylet_client/raylet_client.h" #include "mock/ray/pubsub/subscriber.h" -#include "mock/ray/gcs/pubsub/gcs_pub_sub.h" #include "mock/ray/rpc/worker/core_worker_client.h" // clang-format on using namespace ::testing; @@ -39,13 +38,12 @@ class GcsActorSchedulerTest : public Test { store_client = std::make_shared(); actor_table = std::make_unique(store_client); gcs_node_manager = std::make_unique(); - pub_sub = std::make_shared(); raylet_client = std::make_shared(); core_worker_client = std::make_shared(); client_pool = std::make_shared( [this](const rpc::Address &) { return raylet_client; }); actor_scheduler = std::make_unique( - io_context, *actor_table, *gcs_node_manager, pub_sub, + io_context, *actor_table, *gcs_node_manager, [this](auto a) { schedule_failure_handler(a); }, [this](auto a, const rpc::PushTaskReply) { schedule_success_handler(a); }, client_pool, [this](const rpc::Address &) { return core_worker_client; }); @@ -62,7 +60,6 @@ class GcsActorSchedulerTest : public Test { std::unique_ptr actor_table; std::unique_ptr actor_scheduler; std::unique_ptr gcs_node_manager; - std::shared_ptr pub_sub; std::shared_ptr core_worker_client; std::shared_ptr client_pool; MockCallback schedule_failure_handler; diff --git a/src/ray/gcs/gcs_server/test/gcs_based_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_based_actor_scheduler_test.cc index 48793907f117..d9d46eb0bb60 100644 --- a/src/ray/gcs/gcs_server/test/gcs_based_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_based_actor_scheduler_test.cc @@ -29,23 +29,24 @@ class GcsBasedActorSchedulerTest : public ::testing::Test { void SetUp() override { raylet_client_ = std::make_shared(); worker_client_ = std::make_shared(); - gcs_pub_sub_ = std::make_shared(redis_client_); + gcs_publisher_ = std::make_shared( + std::make_unique(redis_client_)); gcs_table_storage_ = std::make_shared(redis_client_); gcs_node_manager_ = - std::make_shared(gcs_pub_sub_, gcs_table_storage_); + std::make_shared(gcs_publisher_, gcs_table_storage_); store_client_ = std::make_shared(io_service_); gcs_actor_table_ = std::make_shared(store_client_); raylet_client_pool_ = std::make_shared( [this](const rpc::Address &addr) { return raylet_client_; }); gcs_resource_manager_ = std::make_shared( - io_service_, gcs_pub_sub_, gcs_table_storage_, true); + io_service_, gcs_publisher_, gcs_table_storage_, true); auto resource_scheduler = std::make_shared(*gcs_resource_manager_); gcs_actor_scheduler_ = std::make_shared( - io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_pub_sub_, - gcs_resource_manager_, resource_scheduler, + io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_resource_manager_, + resource_scheduler, /*schedule_failure_handler=*/ [this](std::shared_ptr actor) { failure_actors_.emplace_back(std::move(actor)); @@ -99,7 +100,7 @@ class GcsBasedActorSchedulerTest : public ::testing::Test { std::shared_ptr gcs_actor_scheduler_; std::vector> success_actors_; std::vector> failure_actors_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; std::shared_ptr gcs_table_storage_; std::shared_ptr redis_client_; std::shared_ptr raylet_client_pool_; diff --git a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc index b85b594e9a46..592de4dad3ab 100644 --- a/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_node_manager_test.cc @@ -22,17 +22,18 @@ namespace ray { class GcsNodeManagerTest : public ::testing::Test { public: GcsNodeManagerTest() { - gcs_pub_sub_ = std::make_shared(redis_client_); + gcs_publisher_ = std::make_shared( + std::make_unique(redis_client_)); } protected: - std::shared_ptr gcs_pub_sub_; std::shared_ptr redis_client_; std::shared_ptr gcs_table_storage_; + std::shared_ptr gcs_publisher_; }; TEST_F(GcsNodeManagerTest, TestManagement) { - gcs::GcsNodeManager node_manager(gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(gcs_publisher_, gcs_table_storage_); // Test Add/Get/Remove functionality. auto node = Mocker::GenNodeInfo(); auto node_id = NodeID::FromBinary(node->node_id()); @@ -45,7 +46,7 @@ TEST_F(GcsNodeManagerTest, TestManagement) { } TEST_F(GcsNodeManagerTest, TestListener) { - gcs::GcsNodeManager node_manager(gcs_pub_sub_, gcs_table_storage_); + gcs::GcsNodeManager node_manager(gcs_publisher_, gcs_table_storage_); // Test AddNodeAddedListener. int node_count = 1000; std::vector> added_nodes; diff --git a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc index ea366d856958..c53bc2c48413 100644 --- a/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_object_manager_test.cc @@ -24,9 +24,9 @@ namespace ray { class MockedGcsObjectManager : public gcs::GcsObjectManager { public: explicit MockedGcsObjectManager(std::shared_ptr gcs_table_storage, - std::shared_ptr &gcs_pub_sub, + std::shared_ptr &gcs_publisher, gcs::GcsNodeManager &gcs_node_manager) - : gcs::GcsObjectManager(gcs_table_storage, gcs_pub_sub, gcs_node_manager) {} + : gcs::GcsObjectManager(gcs_table_storage, gcs_publisher, gcs_node_manager) {} public: void AddObjectsLocation(const NodeID &node_id, @@ -56,9 +56,9 @@ class GcsObjectManagerTest : public ::testing::Test { void SetUp() override { gcs_table_storage_ = std::make_shared(io_service_); gcs_node_manager_ = - std::make_shared(gcs_pub_sub_, gcs_table_storage_); + std::make_shared(gcs_publisher_, gcs_table_storage_); gcs_object_manager_ = std::make_shared( - gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_); + gcs_table_storage_, gcs_publisher_, *gcs_node_manager_); GenTestData(); } @@ -85,7 +85,7 @@ class GcsObjectManagerTest : public ::testing::Test { protected: instrumented_io_context io_service_; std::shared_ptr gcs_node_manager_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; std::shared_ptr gcs_object_manager_; std::shared_ptr gcs_table_storage_; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc index 8eeed97f7eca..3120d34a093d 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_manager_test.cc @@ -71,7 +71,8 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { public: GcsPlacementGroupManagerTest() : mock_placement_group_scheduler_(new MockPlacementGroupScheduler()) { - gcs_pub_sub_ = std::make_shared(redis_client_); + gcs_publisher_ = std::make_shared( + std::make_unique(redis_client_)); gcs_table_storage_ = std::make_shared(io_service_); gcs_resource_manager_ = std::make_shared(io_service_, nullptr, nullptr, true); @@ -147,7 +148,7 @@ class GcsPlacementGroupManagerTest : public ::testing::Test { instrumented_io_context io_service_; std::shared_ptr gcs_table_storage_; std::shared_ptr gcs_resource_manager_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; std::shared_ptr redis_client_; }; diff --git a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc index 286337811469..bbc6d106a217 100644 --- a/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_placement_group_scheduler_test.cc @@ -39,13 +39,14 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { raylet_clients_.push_back(std::make_shared()); } gcs_table_storage_ = std::make_shared(io_service_); - gcs_pub_sub_ = std::make_shared(redis_client_); + gcs_publisher_ = std::make_shared( + std::make_unique(redis_client_)); gcs_resource_manager_ = std::make_shared(io_service_, nullptr, nullptr, true); gcs_resource_scheduler_ = std::make_shared(*gcs_resource_manager_); gcs_node_manager_ = - std::make_shared(gcs_pub_sub_, gcs_table_storage_); + std::make_shared(gcs_publisher_, gcs_table_storage_); gcs_table_storage_ = std::make_shared(io_service_); store_client_ = std::make_shared(io_service_); raylet_client_pool_ = std::make_shared( @@ -219,7 +220,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test { GUARDED_BY(placement_group_requests_mutex_); std::vector> failure_placement_groups_ GUARDED_BY(placement_group_requests_mutex_); - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; std::shared_ptr gcs_table_storage_; std::shared_ptr redis_client_; std::shared_ptr raylet_client_pool_; diff --git a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h index 11f0783bb846..8ddc6025d5bb 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_test_util.h +++ b/src/ray/gcs/gcs_server/test/gcs_server_test_util.h @@ -451,7 +451,7 @@ struct GcsServerMocker { MockGcsPubSub(std::shared_ptr redis_client) : GcsPubSub(redis_client) {} - Status Publish(const std::string &channel, const std::string &id, + Status Publish(std::string_view channel, const std::string &id, const std::string &data, const gcs::StatusCallback &done) override { return Status::OK(); } diff --git a/src/ray/gcs/gcs_server/test/raylet_based_actor_scheduler_test.cc b/src/ray/gcs/gcs_server/test/raylet_based_actor_scheduler_test.cc index a4ffce3d0e09..242cbe8a197c 100644 --- a/src/ray/gcs/gcs_server/test/raylet_based_actor_scheduler_test.cc +++ b/src/ray/gcs/gcs_server/test/raylet_based_actor_scheduler_test.cc @@ -25,10 +25,11 @@ class RayletBasedActorSchedulerTest : public ::testing::Test { void SetUp() override { raylet_client_ = std::make_shared(); worker_client_ = std::make_shared(); - gcs_pub_sub_ = std::make_shared(redis_client_); + gcs_publisher_ = std::make_shared( + std::make_unique(redis_client_)); gcs_table_storage_ = std::make_shared(redis_client_); gcs_node_manager_ = - std::make_shared(gcs_pub_sub_, gcs_table_storage_); + std::make_shared(gcs_publisher_, gcs_table_storage_); store_client_ = std::make_shared(io_service_); gcs_actor_table_ = std::make_shared(store_client_); @@ -36,7 +37,7 @@ class RayletBasedActorSchedulerTest : public ::testing::Test { [this](const rpc::Address &addr) { return raylet_client_; }); gcs_actor_scheduler_ = std::make_shared( - io_service_, *gcs_actor_table_, *gcs_node_manager_, gcs_pub_sub_, + io_service_, *gcs_actor_table_, *gcs_node_manager_, /*schedule_failure_handler=*/ [this](std::shared_ptr actor) { failure_actors_.emplace_back(std::move(actor)); @@ -61,7 +62,7 @@ class RayletBasedActorSchedulerTest : public ::testing::Test { std::shared_ptr gcs_actor_scheduler_; std::vector> success_actors_; std::vector> failure_actors_; - std::shared_ptr gcs_pub_sub_; + std::shared_ptr gcs_publisher_; std::shared_ptr gcs_table_storage_; std::shared_ptr redis_client_; std::shared_ptr raylet_client_pool_; diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.cc b/src/ray/gcs/pubsub/gcs_pub_sub.cc index 68930f857281..e2c2fa9ea572 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.cc +++ b/src/ray/gcs/pubsub/gcs_pub_sub.cc @@ -14,10 +14,12 @@ #include "ray/gcs/pubsub/gcs_pub_sub.h" +#include "absl/strings/str_cat.h" + namespace ray { namespace gcs { -Status GcsPubSub::Publish(const std::string &channel, const std::string &id, +Status GcsPubSub::Publish(std::string_view channel, const std::string &id, const std::string &data, const StatusCallback &done) { rpc::PubSubMessage message; message.set_id(id); @@ -30,22 +32,20 @@ Status GcsPubSub::Publish(const std::string &channel, const std::string &id, }; return redis_client_->GetPrimaryContext()->PublishAsync( - GenChannelPattern(channel, boost::optional(id)), - message.SerializeAsString(), on_done); + GenChannelPattern(channel, id), message.SerializeAsString(), on_done); } -Status GcsPubSub::Subscribe(const std::string &channel, const std::string &id, +Status GcsPubSub::Subscribe(std::string_view channel, const std::string &id, const Callback &subscribe, const StatusCallback &done) { - return SubscribeInternal(channel, subscribe, done, false, - boost::optional(id)); + return SubscribeInternal(channel, subscribe, done, id); } -Status GcsPubSub::SubscribeAll(const std::string &channel, const Callback &subscribe, +Status GcsPubSub::SubscribeAll(std::string_view channel, const Callback &subscribe, const StatusCallback &done) { - return SubscribeInternal(channel, subscribe, done, true); + return SubscribeInternal(channel, subscribe, done, std::nullopt); } -Status GcsPubSub::Unsubscribe(const std::string &channel_name, const std::string &id) { +Status GcsPubSub::Unsubscribe(std::string_view channel_name, const std::string &id) { std::string pattern = GenChannelPattern(channel_name, id); absl::MutexLock lock(&mutex_); @@ -59,10 +59,9 @@ Status GcsPubSub::Unsubscribe(const std::string &channel_name, const std::string return ExecuteCommandIfPossible(channel->first, channel->second); } -Status GcsPubSub::SubscribeInternal(const std::string &channel_name, +Status GcsPubSub::SubscribeInternal(std::string_view channel_name, const Callback &subscribe, const StatusCallback &done, - bool is_sub_or_unsub_all, - const boost::optional &id) { + const std::optional &id) { std::string pattern = GenChannelPattern(channel_name, id); absl::MutexLock lock(&mutex_); @@ -74,7 +73,8 @@ Status GcsPubSub::SubscribeInternal(const std::string &channel_name, } // Add the SUBSCRIBE command to the queue. - channel->second.command_queue.push_back(Command(subscribe, done, is_sub_or_unsub_all)); + channel->second.command_queue.push_back( + Command(subscribe, done, /*is_sub_or_unsub_all=*/!id.has_value())); total_commands_queued_++; // Process the first command on the queue, if possible. @@ -178,19 +178,18 @@ Status GcsPubSub::ExecuteCommandIfPossible(const std::string &channel_key, return status; } -std::string GcsPubSub::GenChannelPattern(const std::string &channel, - const boost::optional &id) { - std::stringstream pattern; - pattern << channel << ":"; +std::string GcsPubSub::GenChannelPattern(std::string_view channel, + const std::optional &id) { + std::string pattern = absl::StrCat(channel, ":"); if (id) { - pattern << *id; + absl::StrAppend(&pattern, *id); } else { - pattern << "*"; + absl::StrAppend(&pattern, "*"); } - return pattern.str(); + return pattern; } -bool GcsPubSub::IsUnsubscribed(const std::string &channel, const std::string &id) { +bool GcsPubSub::IsUnsubscribed(std::string_view channel, const std::string &id) { std::string pattern = GenChannelPattern(channel, id); absl::MutexLock lock(&mutex_); @@ -206,5 +205,63 @@ std::string GcsPubSub::DebugString() const { return stream.str(); } +Status GcsPublisher::PublishObject(const ObjectID &id, + const rpc::ObjectLocationChange &message, + const StatusCallback &done) { + return pubsub_->Publish(OBJECT_CHANNEL, id.Hex(), message.SerializeAsString(), done); +} + +Status GcsPublisher::PublishActor(const ActorID &id, const rpc::ActorTableData &message, + const StatusCallback &done) { + return pubsub_->Publish(ACTOR_CHANNEL, id.Hex(), message.SerializeAsString(), done); +} + +Status GcsPublisher::PublishJob(const JobID &id, const rpc::JobTableData &message, + const StatusCallback &done) { + return pubsub_->Publish(JOB_CHANNEL, id.Hex(), message.SerializeAsString(), done); +} + +Status GcsPublisher::PublishNodeInfo(const NodeID &id, const rpc::GcsNodeInfo &message, + const StatusCallback &done) { + return pubsub_->Publish(NODE_CHANNEL, id.Hex(), message.SerializeAsString(), done); +} + +Status GcsPublisher::PublishNodeResource(const NodeID &id, + const rpc::NodeResourceChange &message, + const StatusCallback &done) { + return pubsub_->Publish(NODE_RESOURCE_CHANNEL, id.Hex(), message.SerializeAsString(), + done); +} + +Status GcsPublisher::PublishResourceBatch(const rpc::ResourceUsageBatchData &message, + const StatusCallback &done) { + return pubsub_->Publish(RESOURCES_BATCH_CHANNEL, "", message.SerializeAsString(), done); +} + +Status GcsPublisher::PublishWorkerFailure(const WorkerID &id, + const rpc::WorkerDeltaData &message, + const StatusCallback &done) { + return pubsub_->Publish(WORKER_CHANNEL, id.Hex(), message.SerializeAsString(), done); +} + +Status GcsPublisher::PublishTaskLease(const TaskID &id, const rpc::TaskLeaseData &message, + const StatusCallback &done) { + return pubsub_->Publish(TASK_LEASE_CHANNEL, id.Hex(), message.SerializeAsString(), + done); +} + +Status GcsPublisher::PublishError(const std::string &id, + const rpc::ErrorTableData &message, + const StatusCallback &done) { + return pubsub_->Publish(ERROR_INFO_CHANNEL, id, message.SerializeAsString(), done); +} + +std::string GcsPublisher::DebugString() const { + if (pubsub_) { + return pubsub_->DebugString(); + } + return "GcsPublisher {}"; +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 70828a367969..d272516ee789 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -14,26 +14,32 @@ #pragma once +#include +#include +#include + #include "absl/container/flat_hash_map.h" #include "absl/synchronization/mutex.h" #include "ray/gcs/callback.h" #include "ray/gcs/redis_client.h" #include "ray/gcs/redis_context.h" +#include "ray/pubsub/publisher.h" #include "src/ray/protobuf/gcs.pb.h" +#include "src/ray/protobuf/gcs_service.pb.h" namespace ray { namespace gcs { -#define JOB_CHANNEL "JOB" -#define NODE_CHANNEL "NODE" -#define NODE_RESOURCE_CHANNEL "NODE_RESOURCE" -#define ACTOR_CHANNEL "ACTOR" -#define WORKER_CHANNEL "WORKER" -#define OBJECT_CHANNEL "OBJECT" -#define TASK_CHANNEL "TASK" -#define TASK_LEASE_CHANNEL "TASK_LEASE" -#define RESOURCES_BATCH_CHANNEL "RESOURCES_BATCH" -#define ERROR_INFO_CHANNEL "ERROR_INFO" +inline constexpr std::string_view JOB_CHANNEL = "JOB"; +inline constexpr std::string_view NODE_CHANNEL = "NODE"; +inline constexpr std::string_view NODE_RESOURCE_CHANNEL = "NODE_RESOURCE"; +inline constexpr std::string_view ACTOR_CHANNEL = "ACTOR"; +inline constexpr std::string_view WORKER_CHANNEL = "WORKER"; +inline constexpr std::string_view OBJECT_CHANNEL = "OBJECT"; +inline constexpr std::string_view TASK_CHANNEL = "TASK"; +inline constexpr std::string_view TASK_LEASE_CHANNEL = "TASK_LEASE"; +inline constexpr std::string_view RESOURCES_BATCH_CHANNEL = "RESOURCES_BATCH"; +inline constexpr std::string_view ERROR_INFO_CHANNEL = "ERROR_INFO"; /// \class GcsPubSub /// @@ -44,7 +50,7 @@ class GcsPubSub { /// The callback is called when a subscription message is received. using Callback = std::function; - explicit GcsPubSub(std::shared_ptr redis_client) + explicit GcsPubSub(const std::shared_ptr &redis_client) : redis_client_(redis_client), total_commands_queued_(0) {} virtual ~GcsPubSub() = default; @@ -56,7 +62,7 @@ class GcsPubSub { /// \param data The data of message to be published to redis. /// \param done Callback that will be called when the message is published to redis. /// \return Status - virtual Status Publish(const std::string &channel, const std::string &id, + virtual Status Publish(std::string_view channel, const std::string &id, const std::string &data, const StatusCallback &done); /// Subscribe to messages with the specified ID under the specified channel. @@ -67,7 +73,7 @@ class GcsPubSub { /// received. /// \param done Callback that will be called when subscription is complete. /// \return Status - Status Subscribe(const std::string &channel, const std::string &id, + Status Subscribe(std::string_view channel, const std::string &id, const Callback &subscribe, const StatusCallback &done); /// Subscribe to messages with the specified channel. @@ -77,7 +83,7 @@ class GcsPubSub { /// received. /// \param done Callback that will be called when subscription is complete. /// \return Status - Status SubscribeAll(const std::string &channel, const Callback &subscribe, + Status SubscribeAll(std::string_view channel, const Callback &subscribe, const StatusCallback &done); /// Unsubscribe to messages with the specified ID under the specified channel. @@ -85,14 +91,14 @@ class GcsPubSub { /// \param channel The channel to unsubscribe from redis. /// \param id The id of message to be unsubscribed from redis. /// \return Status - Status Unsubscribe(const std::string &channel, const std::string &id); + Status Unsubscribe(std::string_view channel, const std::string &id); /// Check if the specified ID under the specified channel is unsubscribed. /// /// \param channel The channel to unsubscribe from redis. /// \param id The id of message to be unsubscribed from redis. /// \return Whether the specified ID under the specified channel is unsubscribed. - bool IsUnsubscribed(const std::string &channel, const std::string &id); + bool IsUnsubscribed(std::string_view channel, const std::string &id); std::string DebugString() const; @@ -155,12 +161,12 @@ class GcsPubSub { GcsPubSub::Channel &channel) EXCLUSIVE_LOCKS_REQUIRED(mutex_); - Status SubscribeInternal(const std::string &channel_name, const Callback &subscribe, - const StatusCallback &done, bool is_sub_or_unsub_all, - const boost::optional &id = boost::none); + Status SubscribeInternal(std::string_view channel_name, const Callback &subscribe, + const StatusCallback &done, + const std::optional &id); - std::string GenChannelPattern(const std::string &channel, - const boost::optional &id); + std::string GenChannelPattern(std::string_view channel, + const std::optional &id); std::shared_ptr redis_client_; @@ -172,5 +178,80 @@ class GcsPubSub { size_t total_commands_queued_ GUARDED_BY(mutex_); }; +/// \class GcsPublisher +/// +/// Supports publishing per-entity data and errors from GCS. Thread safe. +class GcsPublisher { + public: + /// Initializes GcsPublisher with both Redis and GCS based publishers. + /// Publish*() member functions below would be incrementally converted to use the GCS + /// based publisher, if available. + GcsPublisher(const std::shared_ptr &redis_client, + std::unique_ptr publisher) + : pubsub_(std::make_unique(redis_client)), + publisher_(std::move(publisher)) {} + + /// Test only. + /// Initializes GcsPublisher with GcsPubSub, usually a mock. + /// TODO: remove this constructor and inject mock / fake from the other constructor. + explicit GcsPublisher(std::unique_ptr pubsub) : pubsub_(std::move(pubsub)) {} + + /// Each publishing method below publishes to a different "channel". + /// ID is the entity which the message is associated with, e.g. ActorID for Actor data. + /// Subscribers receive typed messages for the ID that they subscribe to. + /// + /// The full stream of NodeResource and Error channels are needed by its subscribers. + /// But for other channels, subscribers should only need the latest data. + /// + /// TODO: Verify GCS pubsub satisfies the streaming semantics. + /// TODO: Implement optimization for channels where only latest data per ID is useful. + + /// Uses Redis pubsub. + Status PublishActor(const ActorID &id, const rpc::ActorTableData &message, + const StatusCallback &done); + + /// Uses Redis pubsub. + Status PublishJob(const JobID &id, const rpc::JobTableData &message, + const StatusCallback &done); + + /// Uses Redis pubsub. + Status PublishNodeInfo(const NodeID &id, const rpc::GcsNodeInfo &message, + const StatusCallback &done); + + /// Uses Redis pubsub. + Status PublishNodeResource(const NodeID &id, const rpc::NodeResourceChange &message, + const StatusCallback &done); + + /// Actually rpc::WorkerDeltaData is not a delta message. + /// Uses Redis pubsub. + Status PublishWorkerFailure(const WorkerID &id, const rpc::WorkerDeltaData &message, + const StatusCallback &done); + + /// Uses Redis pubsub. + Status PublishTaskLease(const TaskID &id, const rpc::TaskLeaseData &message, + const StatusCallback &done); + + /// Uses Redis pubsub. + Status PublishError(const std::string &id, const rpc::ErrorTableData &message, + const StatusCallback &done); + + /// TODO: remove once it is converted to GRPC-based push broadcasting. + /// Uses Redis pubsub. + Status PublishResourceBatch(const rpc::ResourceUsageBatchData &message, + const StatusCallback &done); + + /// TODO: This belongs to a deprecated codepath. Remove this and its callsites. + /// Uses Redis pubsub. + Status PublishObject(const ObjectID &id, const rpc::ObjectLocationChange &message, + const StatusCallback &done); + + /// Prints debugging info for the publisher. + std::string DebugString() const; + + private: + const std::unique_ptr pubsub_; + const std::unique_ptr publisher_; +}; + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/pubsub/test/gcs_pub_sub_test.cc b/src/ray/gcs/pubsub/test/gcs_pub_sub_test.cc index 71beab9e8c4b..239c863530bd 100644 --- a/src/ray/gcs/pubsub/test/gcs_pub_sub_test.cc +++ b/src/ray/gcs/pubsub/test/gcs_pub_sub_test.cc @@ -76,12 +76,12 @@ class GcsPubSubTest : public ::testing::Test { absl::MutexLock lock(&vector_mutex_); result.push_back(std::make_pair(id, data)); }; - RAY_CHECK_OK((pub_sub_->SubscribeAll(channel, subscribe, done))); + RAY_CHECK_OK(pub_sub_->SubscribeAll(channel, subscribe, done)); WaitReady(promise.get_future(), timeout_ms_); } - bool Unsubscribe(const std::string &channel, const std::string &id) { - return pub_sub_->Unsubscribe(channel, id).ok(); + void Unsubscribe(const std::string &channel, const std::string &id) { + RAY_CHECK_OK(pub_sub_->Unsubscribe(channel, id)); } bool Publish(const std::string &channel, const std::string &id, diff --git a/src/ray/pubsub/publisher.h b/src/ray/pubsub/publisher.h index 6ab030bb7902..4704a71f8743 100644 --- a/src/ray/pubsub/publisher.h +++ b/src/ray/pubsub/publisher.h @@ -156,11 +156,11 @@ class PublisherInterface { /// /// \param channel_type The type of the channel. /// \param subscriber_id The node id of the subscriber. - /// \param key_id_binary The key_id that the subscriber is subscribing to. + /// \param key_id The key_id that the subscriber is subscribing to. /// \return True if registration is new. False otherwise. virtual bool RegisterSubscription(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id, - const std::string &key_id_binary) = 0; + const std::string &key_id) = 0; /// Publish the given object id to subscribers. /// @@ -172,20 +172,20 @@ class PublisherInterface { /// It will invoke the failure callback on the subscriber side. /// /// \param channel_type The type of the channel. - /// \param key_id_binary The message id to publish. + /// \param key_id The message id to publish. virtual void PublishFailure(const rpc::ChannelType channel_type, - const std::string &key_id_binary) = 0; + const std::string &key_id) = 0; /// Unregister subscription. It means the given object id won't be published to the /// subscriber anymore. /// /// \param channel_type The type of the channel. /// \param subscriber_id The node id of the subscriber. - /// \param key_id_binary The key_id of the subscriber. + /// \param key_id The key_id of the subscriber. /// \return True if erased. False otherwise. virtual bool UnregisterSubscription(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id, - const std::string &key_id_binary) = 0; + const std::string &key_id) = 0; }; /// Protocol detail @@ -246,11 +246,11 @@ class Publisher : public PublisherInterface { /// /// \param channel_type The type of the channel. /// \param subscriber_id The node id of the subscriber. - /// \param key_id_binary The key_id that the subscriber is subscribing to. + /// \param key_id The key_id that the subscriber is subscribing to. /// \return True if the registration is new. False otherwise. bool RegisterSubscription(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id, - const std::string &key_id_binary) override; + const std::string &key_id) override; /// Publish the given object id to subscribers. /// @@ -262,20 +262,20 @@ class Publisher : public PublisherInterface { /// It will invoke the failure callback on the subscriber side. /// /// \param channel_type The type of the channel. - /// \param key_id_binary The message id to publish. + /// \param key_id The message id to publish. void PublishFailure(const rpc::ChannelType channel_type, - const std::string &key_id_binary) override; + const std::string &key_id) override; /// Unregister subscription. It means the given object id won't be published to the /// subscriber anymore. /// /// \param channel_type The type of the channel. /// \param subscriber_id The node id of the subscriber. - /// \param key_id_binary The key_id of the subscriber. + /// \param key_id The key_id of the subscriber. /// \return True if erased. False otherwise. bool UnregisterSubscription(const rpc::ChannelType channel_type, const SubscriberID &subscriber_id, - const std::string &key_id_binary) override; + const std::string &key_id) override; /// Remove the subscriber. Once the subscriber is removed, messages won't be published /// to it anymore.