diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index b84d91909dad1..329fda454b125 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -783,7 +783,7 @@ void CoreWorker::Exit( exit_type, detail = std::move(detail), creation_task_exception_pb_bytes]() { - rpc::DrainAndResetServerCallExecutor(); + rpc::DrainServerCallExecutor(); Disconnect(exit_type, detail, creation_task_exception_pb_bytes); KillChildProcs(); Shutdown(); diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index 6039392ca032d..d3baeeb964d02 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -105,13 +105,14 @@ class GcsClientTest : public ::testing::TestWithParam { gcs_client_.reset(); server_io_service_->stop(); - rpc::DrainAndResetServerCallExecutor(); + rpc::DrainServerCallExecutor(); server_io_service_thread_->join(); gcs_server_->Stop(); gcs_server_.reset(); if (!no_redis_) { TestSetupUtil::FlushAllRedisServers(); } + rpc::ResetServerCallExecutor(); } void RestartGcsServer() { diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index 682830597bc3c..151a56efdc645 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -107,7 +107,7 @@ int main(int argc, char *argv[]) { int signal_number) { RAY_LOG(INFO) << "GCS server received SIGTERM, shutting down..."; main_service.stop(); - ray::rpc::DrainAndResetServerCallExecutor(); + ray::rpc::DrainServerCallExecutor(); gcs_server.Stop(); ray::stats::Shutdown(); }; diff --git a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc index cabad9872701a..cf5078762e1fd 100644 --- a/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc @@ -59,11 +59,12 @@ class GcsServerTest : public ::testing::Test { void TearDown() override { io_service_.stop(); - rpc::DrainAndResetServerCallExecutor(); + rpc::DrainServerCallExecutor(); gcs_server_->Stop(); thread_io_service_->join(); gcs_server_.reset(); ray::gcs::RedisCallbackManager::instance().Clear(); + rpc::ResetServerCallExecutor(); } bool AddJob(const rpc::AddJobRequest &request) { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index ca0b4e015cd7f..4b0593ea85158 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2028,7 +2028,7 @@ void NodeManager::HandleShutdownRaylet(rpc::ShutdownRayletRequest request, return; } auto shutdown_after_reply = []() { - rpc::DrainAndResetServerCallExecutor(); + rpc::DrainServerCallExecutor(); // Note that the callback is posted to the io service after the shutdown GRPC request // is replied. Otherwise, the RPC might not be replied to GCS before it shutsdown // itself. Implementation note: When raylet is shutdown by ray stop, the CLI sends a diff --git a/src/ray/rpc/server_call.cc b/src/ray/rpc/server_call.cc index b28317598e051..2f432999b29e0 100644 --- a/src/ray/rpc/server_call.cc +++ b/src/ray/rpc/server_call.cc @@ -30,8 +30,9 @@ std::unique_ptr &_GetServerCallExecutor() { boost::asio::thread_pool &GetServerCallExecutor() { return *_GetServerCallExecutor(); } -void DrainAndResetServerCallExecutor() { - GetServerCallExecutor().join(); +void DrainServerCallExecutor() { GetServerCallExecutor().join(); } + +void ResetServerCallExecutor() { _GetServerCallExecutor() = std::make_unique( ::RayConfig::instance().num_server_call_thread()); } diff --git a/src/ray/rpc/server_call.h b/src/ray/rpc/server_call.h index 31d078ff78f0b..8242c6b69fe8a 100644 --- a/src/ray/rpc/server_call.h +++ b/src/ray/rpc/server_call.h @@ -32,9 +32,14 @@ namespace rpc { /// This pool is shared across gRPC servers. boost::asio::thread_pool &GetServerCallExecutor(); -/// For testing -/// Drain the executor and reset it. -void DrainAndResetServerCallExecutor(); +/// Drain the executor. +void DrainServerCallExecutor(); + +/// Reset the server call executor. +/// Testing only. After you drain the executor +/// you need to regenerate the executor +/// because they are global. +void ResetServerCallExecutor(); /// Represents the callback function to be called when a `ServiceHandler` finishes /// handling a request.