From 02409f99512a5156fcba37c0686cdda85161729d Mon Sep 17 00:00:00 2001 From: Evgeniy Ivanov Date: Tue, 19 Nov 2024 20:25:40 +0100 Subject: [PATCH] Add GRPC thread CPU time metric --- ydb/core/driver_lib/run/run.cpp | 8 ++-- ydb/core/grpc_streaming/grpc_streaming_ut.cpp | 5 ++- ydb/core/http_proxy/ut/datastreams_fixture.h | 2 +- ydb/core/http_proxy/ut/proxy_counters.json | 32 ++++++++++++++- ydb/core/testlib/test_client.cpp | 8 ++-- ydb/library/grpc/server/grpc_server.cpp | 40 +++++++++++++++---- ydb/library/grpc/server/grpc_server.h | 10 ++++- .../yql/providers/dq/service/service_node.cpp | 4 +- ydb/library/yql/providers/dq/service/ya.make | 1 + 9 files changed, 88 insertions(+), 22 deletions(-) diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index dafe627e7278..ecbff23a49d2 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -973,13 +973,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate(); sslOpts.SetSslData(sslData); - GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts) }); + GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) }); fillFn(grpcConfig, *GRpcServers.back().second, sslOpts); } if (grpcConfig.GetPort()) { - GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts) }); + GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) }); fillFn(grpcConfig, *GRpcServers.back().second, opts); } @@ -996,7 +996,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { xopts.SetEndpointId(ex.GetEndpointId()); } - GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) }); + GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) }); fillFn(ex, *GRpcServers.back().second, xopts); } @@ -1035,7 +1035,7 @@ void TKikimrRunner::InitializeGRpc(const 
TKikimrRunConfig& runConfig) { Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set"); Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set"); - GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts) }); + GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) }); fillFn(ex, *GRpcServers.back().second, xopts); } } diff --git a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp index d6c723a9b27b..f0f63a51505b 100644 --- a/ydb/core/grpc_streaming/grpc_streaming_ut.cpp +++ b/ydb/core/grpc_streaming/grpc_streaming_ut.cpp @@ -95,12 +95,13 @@ class TGRpcTestServer { Server->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>()); + NYdbGrpc::TServerOptions options; options.SetPort(grpc); - GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options)); + GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options, counters)); auto* as = Server->GetRuntime()->GetAnyNodeActorSystem(); - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>()); GRpcServer->AddService(new TStreamingService(as, counters)); GRpcServer->Start(); diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h index 1bf4b3ff602c..bbf4b0c6c447 100644 --- a/ydb/core/http_proxy/ut/datastreams_fixture.h +++ b/ydb/core/http_proxy/ut/datastreams_fixture.h @@ -888,7 +888,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { actorId = as->Register(NKikimr::NHttpProxy::CreateHttpProxy(httpProxyConfig)); as->RegisterLocalService(MakeHttpProxyID(), actorId); - GRpcServer = MakeHolder(opts); + GRpcServer = MakeHolder(opts, Counters); GRpcServer->AddService(new NKikimr::NHttpProxy::TGRpcDiscoveryService(as, credentialsProvider, Counters)); GRpcServer->Start(); diff --git a/ydb/core/http_proxy/ut/proxy_counters.json 
b/ydb/core/http_proxy/ut/proxy_counters.json index 05c7fc5da792..7aa27fc765f7 100644 --- a/ydb/core/http_proxy/ut/proxy_counters.json +++ b/ydb/core/http_proxy/ut/proxy_counters.json @@ -105,7 +105,7 @@ "counters": "utils", "component": "grpc" }, - "value": 2, + "value": 6, "kind": "RATE" }, { @@ -117,6 +117,36 @@ "value": 13, "kind": "RATE" }, + { + "kind":"RATE", + "labels": + { + "counters":"grpc", + "sensor":"RequestDestroyed", + "worker":"0" + }, + "value":0 + }, + { + "kind":"RATE", + "labels": + { + "counters":"grpc", + "sensor":"RequestExecuted", + "worker":"0" + }, + "value":0 + }, + { + "kind":"RATE", + "labels": + { + "counters":"grpc", + "sensor":"ThreadCPU", + "worker":"0" + }, + "value":9 + }, { "labels": { "name": "api.grpc.request.bytes", diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index f8552fecb17e..4753095b82d3 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -346,7 +346,10 @@ namespace Tests { } void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) { - GRpcServer.reset(new NYdbGrpc::TGRpcServer(options)); + GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); + auto& counters = GRpcServerRootCounters; + + GRpcServer.reset(new NYdbGrpc::TGRpcServer(options, counters)); auto grpcService = new NGRpcProxy::TGRpcService(); auto system(Runtime->GetActorSystem(grpcServiceNodeId)); @@ -378,9 +381,6 @@ namespace Tests { auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId); system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon); - GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>(); - auto& counters = GRpcServerRootCounters; - // Setup discovery for typically used services on the node { TIntrusivePtr desc = new NGRpcService::TGrpcEndpointDescription(); diff --git a/ydb/library/grpc/server/grpc_server.cpp 
b/ydb/library/grpc/server/grpc_server.cpp index cb8432563f60..0bcb82fcd95e 100644 --- a/ydb/library/grpc/server/grpc_server.cpp +++ b/ydb/library/grpc/server/grpc_server.cpp @@ -1,5 +1,8 @@ #include "grpc_server.h" +#include +#include + #include #include #include @@ -18,19 +21,31 @@ namespace NYdbGrpc { -using NThreading::TFuture; - -static void PullEvents(grpc::ServerCompletionQueue* cq) { +static void PullEvents(grpc::ServerCompletionQueue* cq, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) { TThread::SetCurrentThreadName("grpc_server"); + auto okCounter = counters->GetCounter("RequestExecuted", true); + auto errorCounter = counters->GetCounter("RequestDestroyed", true); + auto cpuTime = counters->GetCounter("ThreadCPU", true); + + NMonotonic::TMonotonic lastCpuTimeTs = {}; while (true) { void* tag; // uniquely identifies a request. bool ok; + auto now = NMonotonic::TMonotonic::Now(); + if (now - lastCpuTimeTs >= TDuration::Seconds(1)) { + lastCpuTimeTs = now; + *cpuTime = ThreadCPUTime(); + } + if (cq->Next(&tag, &ok)) { IQueueEvent* const ev(static_cast(tag)); - if (!ev->Execute(ok)) { + if (ev->Execute(ok)) { + okCounter->Inc(); + } else { ev->DestroyRequest(); + errorCounter->Inc(); } } else { break; @@ -103,10 +118,17 @@ void TGrpcServiceProtectiable::DecRequest() { } } -TGRpcServer::TGRpcServer(const TServerOptions& opts) +TGRpcServer::TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) : Options_(opts) + , Counters_(std::move(counters)) , Limiter_(Options_.MaxGlobalRequestInFlight) - {} +{ + // TODO: remove after migration + if (!Counters_) { + // make a temp stub + Counters_.Reset(new ::NMonitoring::TDynamicCounters()); + } +} TGRpcServer::~TGRpcServer() { Y_ABORT_UNLESS(Ts.empty()); @@ -237,10 +259,12 @@ void TGRpcServer::Start() { } Ts.reserve(Options_.WorkerThreads); + auto grpcCounters = Counters_->GetSubgroup("counters", "grpc"); for (size_t i = 0; i < Options_.WorkerThreads; ++i) { auto* cq = 
&CQS_[i % CQS_.size()]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); + auto workerCounters = grpcCounters->GetSubgroup("worker", ToString(i)); + Ts.push_back(SystemThreadFactory()->Run([cq, workerCounters]() mutable { // mutable: lets std::move below really move the captured counters instead of copying a const capture + PullEvents(cq->get(), std::move(workerCounters)); })); } diff --git a/ydb/library/grpc/server/grpc_server.h b/ydb/library/grpc/server/grpc_server.h index 5ab48d2f24fe..459c94d6167d 100644 --- a/ydb/library/grpc/server/grpc_server.h +++ b/ydb/library/grpc/server/grpc_server.h @@ -18,6 +18,10 @@ #include +namespace NMonitoring { + struct TDynamicCounters; +} // NMonitoring + namespace NYdbGrpc { struct TSslData { @@ -349,8 +353,11 @@ class TGrpcServiceBase: public TGrpcServiceProtectiable { class TGRpcServer { public: using IGRpcServicePtr = TIntrusivePtr; - TGRpcServer(const TServerOptions& opts); + + // TODO: remove default nullptr after migration + TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = nullptr); ~TGRpcServer(); + void AddService(IGRpcServicePtr service); void Start(); // Send stop to registred services and call Shutdown on grpc server @@ -365,6 +372,7 @@ class TGRpcServer { using IThreadRef = TAutoPtr; const TServerOptions Options_; + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; std::unique_ptr Server_; std::vector> CQS_; TVector Ts; diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp index fd031666c18f..3a545c43b872 100644 --- a/ydb/library/yql/providers/dq/service/service_node.cpp +++ b/ydb/library/yql/providers/dq/service/service_node.cpp @@ -8,6 +8,8 @@ #include +#include + #include namespace NYql { @@ -149,7 +151,7 @@ namespace NYql { }) .SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER - Server = MakeHolder(options); + Server = MakeHolder(options, MakeIntrusive<::NMonitoring::TDynamicCounters>()); Service = TIntrusivePtr(new 
TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories)); Server->AddService(Service); Server->Start(); diff --git a/ydb/library/yql/providers/dq/service/ya.make b/ydb/library/yql/providers/dq/service/ya.make index 53f66ea05782..39c0f65102ed 100644 --- a/ydb/library/yql/providers/dq/service/ya.make +++ b/ydb/library/yql/providers/dq/service/ya.make @@ -14,6 +14,7 @@ PEERDIR( library/cpp/build_info ydb/library/grpc/server ydb/library/grpc/server/actors + library/cpp/monlib/dynamic_counters library/cpp/svnversion library/cpp/threading/future yql/essentials/sql