diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index c48037ac904bb..5991ab6ab0e14 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -146,7 +146,6 @@ CoreWorkerProcessImpl::CoreWorkerProcessImpl(const CoreWorkerOptions &options) CoreWorkerProcessImpl::~CoreWorkerProcessImpl() { RAY_LOG(INFO) << "Destructing CoreWorkerProcessImpl. pid: " << getpid(); - RAY_LOG(DEBUG) << "Stats stop in core worker."; // Shutdown stats module if worker process exits. stats::Shutdown(); if (options_.enable_logging) { diff --git a/src/ray/stats/metric_exporter.cc b/src/ray/stats/metric_exporter.cc index 396770a98e892..4921e9ea52cd0 100644 --- a/src/ray/stats/metric_exporter.cc +++ b/src/ray/stats/metric_exporter.cc @@ -111,6 +111,7 @@ OpenCensusProtoExporter::OpenCensusProtoExporter(const int port, const std::string address, const WorkerID &worker_id) : client_call_manager_(io_service), worker_id_(worker_id) { + absl::MutexLock l(&mu_); client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_)); }; @@ -202,15 +203,18 @@ void OpenCensusProtoExporter::ExportViewData( } } - client_->ReportOCMetrics( - request_proto, [](const Status &status, const rpc::ReportOCMetricsReply &reply) { - RAY_UNUSED(reply); - if (!status.ok()) { - RAY_LOG_EVERY_N(WARNING, 10000) - << "Export metrics to agent failed: " << status - << ". This won't affect Ray, but you can lose metrics from the cluster."; - } - }); + { + absl::MutexLock l(&mu_); + client_->ReportOCMetrics( + request_proto, [](const Status &status, const rpc::ReportOCMetricsReply &reply) { + RAY_UNUSED(reply); + if (!status.ok()) { + RAY_LOG_EVERY_N(WARNING, 10000) + << "Export metrics to agent failed: " << status + << ". This won't affect Ray, but you can lose metrics from the cluster."; + } + }); + } } } // namespace stats diff --git a/src/ray/stats/metric_exporter.h b/src/ray/stats/metric_exporter.h index d82342620223f..4212736da3a9d 100644 --- a/src/ray/stats/metric_exporter.h +++ b/src/ray/stats/metric_exporter.h @@ -117,8 +117,10 @@ class OpenCensusProtoExporter final : public opencensus::stats::StatsExporter::H private: /// Call Manager for gRPC client. rpc::ClientCallManager client_call_manager_; + /// Lock to protect the client + mutable absl::Mutex mu_; /// Client to call a metrics agent gRPC server. - std::unique_ptr client_; + std::unique_ptr client_ GUARDED_BY(&mu_); /// The worker ID of the current component. WorkerID worker_id_; }; diff --git a/src/ray/stats/stats.h b/src/ray/stats/stats.h index 6383fac518bc9..ce2f7093859bc 100644 --- a/src/ray/stats/stats.h +++ b/src/ray/stats/stats.h @@ -125,6 +125,7 @@ static inline void Shutdown() { metrics_io_service_pool = nullptr; exporter = nullptr; StatsConfig::instance().SetIsInitialized(false); + RAY_LOG(INFO) << "Stats module has shutdown."; } } // namespace stats