Skip to content

Commit

Permalink
[Pubsub] Wrap Redis-based publisher in GCS to allow incrementally swi…
Browse files Browse the repository at this point in the history
…tching to the GCS-based publisher (#19600)

## Why are these changes needed?
The most significant change of the PR is the `GcsPublisher` wrapper added to `src/ray/gcs/pubsub/gcs_pub_sub.h`. It forwards publishing to the underlying `GcsPubSub` (Redis-based) or `pubsub::Publisher` (GCS-based) depending on the migration status, so it allows incremental migration by channel.
   -  Since it was decided that we want to use typed ID and messages for GCS-based publishing, each member function of `GcsPublisher` accepts a typed message.

Most of the modified files are from migrating publishing logic in GCS to use `GcsPublisher` instead of `GcsPubSub`.

Later on, `GcsPublisher` member functions will be migrated to use GCS-based publishing.

This change should make no functionality difference. If this looks ok, a similar change would be made for subscribers in GCS client.

## Related issue number
  • Loading branch information
mwtian authored Oct 22, 2021
1 parent 0760fe8 commit 530f2d7
Show file tree
Hide file tree
Showing 34 changed files with 347 additions and 236 deletions.
17 changes: 7 additions & 10 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -485,20 +485,17 @@ cc_binary(

cc_library(
name = "gcs_pub_sub_lib",
srcs = glob(
[
"src/ray/gcs/pubsub/gcs_pub_sub.cc",
],
),
hdrs = glob(
[
"src/ray/gcs/pubsub/gcs_pub_sub.h",
],
),
srcs = [
"src/ray/gcs/pubsub/gcs_pub_sub.cc",
],
hdrs = [
"src/ray/gcs/pubsub/gcs_pub_sub.h",
],
copts = COPTS,
strip_include_prefix = "src",
deps = [
":gcs",
":pubsub_lib",
":ray_common",
":redis_client",
],
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_actor_distribution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ const ResourceSet &GcsActorWorkerAssignment::GetResources() const {
bool GcsActorWorkerAssignment::IsShared() const { return is_shared_; }

GcsBasedActorScheduler::GcsBasedActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>, const rpc::PushTaskReply &reply)>
schedule_success_handler,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
rpc::ClientFactoryFn client_factory)
: GcsActorScheduler(io_context, gcs_actor_table, gcs_node_manager, gcs_pub_sub,
: GcsActorScheduler(io_context, gcs_actor_table, gcs_node_manager,
schedule_failure_handler, schedule_success_handler,
raylet_client_pool, client_factory),
gcs_resource_manager_(std::move(gcs_resource_manager)),
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ class GcsBasedActorScheduler : public GcsActorScheduler {
/// \param client_factory Factory to create remote core worker client, default factor
/// will be used if not set.
explicit GcsBasedActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<GcsResourceScheduler> gcs_resource_scheduler,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
Expand Down
40 changes: 16 additions & 24 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ void GcsActor::SetActorWorkerAssignment(
GcsActorManager::GcsActorManager(
boost::asio::io_context &io_context,
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub, RuntimeEnvManager &runtime_env_manager,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher, RuntimeEnvManager &runtime_env_manager,
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
std::function<std::string(const JobID &)> get_ray_namespace,
std::function<void(std::function<void(void)>, boost::posix_time::milliseconds)>
Expand All @@ -119,7 +119,7 @@ GcsActorManager::GcsActorManager(
: io_context_(io_context),
gcs_actor_scheduler_(std::move(scheduler)),
gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)),
gcs_publisher_(std::move(gcs_publisher)),
worker_client_factory_(worker_client_factory),
destroy_owned_placement_group_if_needed_(destroy_owned_placement_group_if_needed),
get_ray_namespace_(get_ray_namespace),
Expand Down Expand Up @@ -373,8 +373,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
absl::GetCurrentTimeNanos(), job_id);

RAY_LOG(WARNING) << error_data_ptr->SerializeAsString();
RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, job_id.Hex(),
error_data_ptr->SerializeAsString(), nullptr));
RAY_CHECK_OK(
gcs_publisher_->PublishError(job_id.Hex(), *error_data_ptr, nullptr));
}

actors_in_namespace.emplace(actor->GetName(), actor->GetActorID());
Expand Down Expand Up @@ -420,9 +420,8 @@ Status GcsActorManager::RegisterActor(const ray::rpc::RegisterActorRequest &requ
// the actor state to DEAD to avoid race condition.
return;
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor->GetActorID().Hex(),
actor->GetActorTableData().SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(actor->GetActorID(),
actor->GetActorTableData(), nullptr));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_register_callbacks_.
Expand Down Expand Up @@ -491,8 +490,7 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,
actor->GetMutableActorTableData()->set_state(rpc::ActorTableData::PENDING_CREATION);
const auto &actor_table_data = actor->GetActorTableData();
// Pub this state for dashboard showing.
RAY_CHECK_OK(gcs_pub_sub_->Publish(ACTOR_CHANNEL, actor_id.Hex(),
actor_table_data.SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(actor_id, actor_table_data, nullptr));
RemoveUnresolvedActor(actor);

// Update the registered actor as its creation task specification may have changed due
Expand Down Expand Up @@ -668,9 +666,8 @@ void GcsActorManager::DestroyActor(const ActorID &actor_id) {
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor->GetActorID(), *actor_table_data,
[this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*actor_table_data)->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*actor_table_data), nullptr));
// Destroy placement group owned by this actor.
destroy_owned_placement_group_if_needed_(actor_id);
}));
Expand Down Expand Up @@ -867,10 +864,8 @@ void GcsActorManager::ReconstructActor(
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, *mutable_actor_table_data,
[this, actor_id, mutable_actor_table_data](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
}));
gcs_actor_scheduler_->Schedule(actor);
} else {
Expand Down Expand Up @@ -907,10 +902,8 @@ void GcsActorManager::ReconstructActor(
if (actor->IsDetached()) {
DestroyActor(actor_id);
}
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(*mutable_actor_table_data)->SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(*mutable_actor_table_data), nullptr));
}));
// The actor is dead, but we should not remove the entry from the
// registered actors yet. If the actor is owned, we will destroy the actor
Expand Down Expand Up @@ -958,9 +951,8 @@ void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &ac
RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, actor_table_data,
[this, actor_id, actor_table_data, actor, reply](Status status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
ACTOR_CHANNEL, actor_id.Hex(),
GenActorDataOnlyWithStates(actor_table_data)->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr));
// Invoke all callbacks for all registration requests of this actor (duplicated
// requests are included) and remove all of them from
// actor_to_create_callbacks_.
Expand Down
12 changes: 6 additions & 6 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ class GcsActorManager : public rpc::ActorInfoHandler {
///
/// \param scheduler Used to schedule actor creation tasks.
/// \param gcs_table_storage Used to flush actor data to storage.
/// \param gcs_pub_sub Used to publish gcs message.
/// \param gcs_publisher Used to publish gcs message.
GcsActorManager(
boost::asio::io_context &io_context,
std::shared_ptr<GcsActorSchedulerInterface> scheduler,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub, RuntimeEnvManager &runtime_env_manager,
std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher, RuntimeEnvManager &runtime_env_manager,
std::function<void(const ActorID &)> destroy_ownded_placement_group_if_needed,
std::function<std::string(const JobID &)> get_ray_namespace,
std::function<void(std::function<void(void)>, boost::posix_time::milliseconds)>
Expand Down Expand Up @@ -494,11 +494,11 @@ class GcsActorManager : public rpc::ActorInfoHandler {

boost::asio::io_context &io_context_;
/// The scheduler to schedule all registered actors.
std::shared_ptr<gcs::GcsActorSchedulerInterface> gcs_actor_scheduler_;
std::shared_ptr<GcsActorSchedulerInterface> gcs_actor_scheduler_;
/// Used to update actor information upon creation, deletion, etc.
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
/// Factory to produce clients to workers. This is used to communicate with
/// actors and their owners.
rpc::ClientFactoryFn worker_client_factory_;
Expand Down
6 changes: 2 additions & 4 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ namespace ray {
namespace gcs {

GcsActorScheduler::GcsActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const gcs::GcsNodeManager &gcs_node_manager,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>, const rpc::PushTaskReply &reply)>
schedule_success_handler,
Expand All @@ -35,7 +34,6 @@ GcsActorScheduler::GcsActorScheduler(
: io_context_(io_context),
gcs_actor_table_(gcs_actor_table),
gcs_node_manager_(gcs_node_manager),
gcs_pub_sub_(std::move(gcs_pub_sub)),
schedule_failure_handler_(std::move(schedule_failure_handler)),
schedule_success_handler_(std::move(schedule_success_handler)),
raylet_client_pool_(raylet_client_pool),
Expand Down
6 changes: 2 additions & 4 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
/// \param client_factory Factory to create remote core worker client, default factor
/// will be used if not set.
explicit GcsActorScheduler(
instrumented_io_context &io_context, gcs::GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager, std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
instrumented_io_context &io_context, GcsActorTable &gcs_actor_table,
const GcsNodeManager &gcs_node_manager,
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler,
std::function<void(std::shared_ptr<GcsActor>, const rpc::PushTaskReply &reply)>
schedule_success_handler,
Expand Down Expand Up @@ -305,8 +305,6 @@ class GcsActorScheduler : public GcsActorSchedulerInterface {
node_to_workers_when_creating_;
/// Reference of GcsNodeManager.
const GcsNodeManager &gcs_node_manager_;
/// A publisher for publishing gcs messages.
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
/// The handler to handle the scheduling failures.
std::function<void(std::shared_ptr<GcsActor>)> schedule_failure_handler_;
/// The handler to handle the successful scheduling.
Expand Down
10 changes: 3 additions & 7 deletions src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ void GcsJobManager::HandleAddJob(const rpc::AddJobRequest &request,
RAY_LOG(ERROR) << "Failed to add job, job id = " << job_id
<< ", driver pid = " << mutable_job_table_data.driver_pid();
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(),
mutable_job_table_data.SerializeAsString(),
nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, mutable_job_table_data, nullptr));
if (mutable_job_table_data.config().has_runtime_env()) {
runtime_env_manager_.AddURIReference(
job_id.Hex(), mutable_job_table_data.config().runtime_env());
Expand Down Expand Up @@ -81,8 +79,7 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to mark job state, job id = " << job_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(JOB_CHANNEL, job_id.Hex(),
job_table_data.SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishJob(job_id, job_table_data, nullptr));
runtime_env_manager_.RemoveURIReference(job_id.Hex());
ClearJobInfos(job_id);
RAY_LOG(INFO) << "Finished marking job state, job id = " << job_id;
Expand Down Expand Up @@ -159,8 +156,7 @@ void GcsJobManager::HandleReportJobError(const rpc::ReportJobErrorRequest &reque
rpc::ReportJobErrorReply *reply,
rpc::SendReplyCallback send_reply_callback) {
auto job_id = JobID::FromBinary(request.job_error().job_id());
RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, job_id.Hex(),
request.job_error().SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishError(job_id.Hex(), request.job_error(), nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

Expand Down
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_server/gcs_job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ namespace gcs {
/// This implementation class of `JobInfoHandler`.
class GcsJobManager : public rpc::JobInfoHandler {
public:
explicit GcsJobManager(std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
explicit GcsJobManager(std::shared_ptr<GcsTableStorage> gcs_table_storage,
std::shared_ptr<GcsPublisher> gcs_publisher,
RuntimeEnvManager &runtime_env_manager)
: gcs_table_storage_(std::move(gcs_table_storage)),
gcs_pub_sub_(std::move(gcs_pub_sub)),
gcs_publisher_(std::move(gcs_publisher)),
runtime_env_manager_(runtime_env_manager) {}

void Initialize(const GcsInitData &gcs_init_data);
Expand Down Expand Up @@ -60,8 +60,8 @@ class GcsJobManager : public rpc::JobInfoHandler {
std::string GetRayNamespace(const JobID &job_id) const;

private:
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;

/// Listeners which monitors the finish of jobs.
std::vector<std::function<void(std::shared_ptr<JobID>)>> job_finished_listeners_;
Expand Down
22 changes: 9 additions & 13 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ namespace ray {
namespace gcs {

//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: gcs_pub_sub_(gcs_pub_sub), gcs_table_storage_(gcs_table_storage) {}
GcsNodeManager::GcsNodeManager(std::shared_ptr<GcsPublisher> gcs_publisher,
std::shared_ptr<GcsTableStorage> gcs_table_storage)
: gcs_publisher_(gcs_publisher), gcs_table_storage_(gcs_table_storage) {}

void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
rpc::RegisterNodeReply *reply,
Expand All @@ -40,8 +40,7 @@ void GcsNodeManager::HandleRegisterNode(const rpc::RegisterNodeRequest &request,
RAY_CHECK_OK(status);
RAY_LOG(INFO) << "Finished registering node info, node id = " << node_id
<< ", address = " << request.node_info().node_manager_address();
RAY_CHECK_OK(gcs_pub_sub_->Publish(NODE_CHANNEL, node_id.Hex(),
request.node_info().SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, request.node_info(), nullptr));
AddNode(std::make_shared<rpc::GcsNodeInfo>(request.node_info()));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Expand All @@ -68,8 +67,7 @@ void GcsNodeManager::HandleUnregisterNode(const rpc::UnregisterNodeRequest &requ
send_reply_callback](const Status &status) {
auto on_done = [this, node_id, node_info_delta, reply,
send_reply_callback](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(INFO) << "Finished unregistering node info, node id = " << node_id;
};
Expand Down Expand Up @@ -159,14 +157,13 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
<< " has been marked dead because the detector"
<< " has missed too many heartbeats from it. This can happen when a "
"raylet crashes unexpectedly or has lagging heartbeats.";
auto error_data_ptr =
gcs::CreateErrorTableData(type, error_message.str(), current_time_ms());
RAY_EVENT(ERROR, EL_RAY_NODE_REMOVED)
.WithField("node_id", node_id.Hex())
.WithField("ip", removed_node->node_manager_address())
<< error_message.str();
RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, node_id.Hex(),
error_data_ptr->SerializeAsString(), nullptr));
auto error_data_ptr =
gcs::CreateErrorTableData(type, error_message.str(), current_time_ms());
RAY_CHECK_OK(gcs_publisher_->PublishError(node_id.Hex(), *error_data_ptr, nullptr));
}

// Notify all listeners.
Expand All @@ -189,8 +186,7 @@ void GcsNodeManager::OnNodeFailure(const NodeID &node_id) {

auto on_done = [this, node_id, node_info_delta](const Status &status) {
auto on_done = [this, node_id, node_info_delta](const Status &status) {
RAY_CHECK_OK(gcs_pub_sub_->Publish(
NODE_CHANNEL, node_id.Hex(), node_info_delta->SerializeAsString(), nullptr));
RAY_CHECK_OK(gcs_publisher_->PublishNodeInfo(node_id, *node_info_delta, nullptr));
};
RAY_CHECK_OK(gcs_table_storage_->NodeResourceTable().Delete(node_id, on_done));
};
Expand Down
Loading

0 comments on commit 530f2d7

Please sign in to comment.