Skip to content

Commit

Permalink
Merge 03d8bdc into 41baadd
Browse files Browse the repository at this point in the history
  • Loading branch information
eivanov89 authored Nov 21, 2024
2 parents 41baadd + 03d8bdc commit 41701aa
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 19 deletions.
8 changes: 4 additions & 4 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -973,13 +973,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
sslData.DoRequestClientCertificate = appConfig.GetClientCertificateAuthorization().GetRequestClientCertificate();
sslOpts.SetSslData(sslData);

GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts) });
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(sslOpts, Counters) });

fillFn(grpcConfig, *GRpcServers.back().second, sslOpts);
}

if (grpcConfig.GetPort()) {
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts) });
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(opts, Counters) });

fillFn(grpcConfig, *GRpcServers.back().second, opts);
}
Expand All @@ -996,7 +996,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
xopts.SetEndpointId(ex.GetEndpointId());
}

GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts) });
GRpcServers.push_back({ "grpc", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *GRpcServers.back().second, xopts);
}

Expand Down Expand Up @@ -1035,7 +1035,7 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
Y_ABORT_UNLESS(xopts.SslData->Cert, "Cert not set");
Y_ABORT_UNLESS(xopts.SslData->Key, "Key not set");

GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts) });
GRpcServers.push_back({ "grpcs", new NYdbGrpc::TGRpcServer(xopts, Counters) });
fillFn(ex, *GRpcServers.back().second, xopts);
}
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/grpc_streaming/grpc_streaming_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ class TGRpcTestServer {

Server->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);

TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());

NYdbGrpc::TServerOptions options;
options.SetPort(grpc);
GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options));
GRpcServer.Reset(new NYdbGrpc::TGRpcServer(options, counters));

auto* as = Server->GetRuntime()->GetAnyNodeActorSystem();
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters(MakeIntrusive<::NMonitoring::TDynamicCounters>());

GRpcServer->AddService(new TStreamingService<TImplActor>(as, counters));
GRpcServer->Start();
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/testlib/test_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ namespace Tests {
}

void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) {
GRpcServer.reset(new NYdbGrpc::TGRpcServer(options));
GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
auto& counters = GRpcServerRootCounters;

GRpcServer.reset(new NYdbGrpc::TGRpcServer(options), counters);
auto grpcService = new NGRpcProxy::TGRpcService();

auto system(Runtime->GetActorSystem(grpcServiceNodeId));
Expand Down Expand Up @@ -378,9 +381,6 @@ namespace Tests {
auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId);
system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon);

GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
auto& counters = GRpcServerRootCounters;

// Setup discovery for typically used services on the node
{
TIntrusivePtr<NGRpcService::TGrpcEndpointDescription> desc = new NGRpcService::TGrpcEndpointDescription();
Expand Down
32 changes: 25 additions & 7 deletions ydb/library/grpc/server/grpc_server.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#include "grpc_server.h"

#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/time_provider/monotonic.h>

#include <util/string/join.h>
#include <util/generic/yexception.h>
#include <util/system/thread.h>
Expand All @@ -18,19 +21,31 @@

namespace NYdbGrpc {

using NThreading::TFuture;

static void PullEvents(grpc::ServerCompletionQueue* cq) {
static void PullEvents(grpc::ServerCompletionQueue* cq, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) {
TThread::SetCurrentThreadName("grpc_server");
auto okCounter = counters->GetCounter("RequestExecuted", true);
auto errorCounter = counters->GetCounter("RequestDestroyed", true);
auto cpuTime = counters->GetCounter("ThreadCPU", true);

NMonotonic::TMonotonic lastCpuTimeTs = {};
while (true) {
void* tag; // uniquely identifies a request.
bool ok;

auto now = NMonotonic::TMonotonic::Now();
if (now - lastCpuTimeTs >= TDuration::Seconds(1)) {
lastCpuTimeTs = now;
*cpuTime = ThreadCPUTime();
}

if (cq->Next(&tag, &ok)) {
IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));

if (!ev->Execute(ok)) {
if (ev->Execute(ok)) {
okCounter->Inc();
} else {
ev->DestroyRequest();
errorCounter->Inc();
}
} else {
break;
Expand Down Expand Up @@ -103,8 +118,9 @@ void TGrpcServiceProtectiable::DecRequest() {
}
}

TGRpcServer::TGRpcServer(const TServerOptions& opts)
TGRpcServer::TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters)
: Options_(opts)
, Counters_(std::move(counters))
, Limiter_(Options_.MaxGlobalRequestInFlight)
{}

Expand Down Expand Up @@ -237,10 +253,12 @@ void TGRpcServer::Start() {
}

Ts.reserve(Options_.WorkerThreads);
auto grpcCounters = Counters_->GetSubgroup("counters", "grpc");
for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
auto* cq = &CQS_[i % CQS_.size()];
Ts.push_back(SystemThreadFactory()->Run([cq] {
PullEvents(cq->get());
auto workerCounters = grpcCounters->GetSubgroup("worker", ToString(i));
Ts.push_back(SystemThreadFactory()->Run([cq, workerCounters] {
PullEvents(cq->get(), std::move(workerCounters));
}));
}

Expand Down
7 changes: 6 additions & 1 deletion ydb/library/grpc/server/grpc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include <grpcpp/grpcpp.h>

namespace NMonitoring {
struct TDynamicCounters;
} // NMonitoring

namespace NYdbGrpc {

struct TSslData {
Expand Down Expand Up @@ -349,7 +353,7 @@ class TGrpcServiceBase: public TGrpcServiceProtectiable {
class TGRpcServer {
public:
using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
TGRpcServer(const TServerOptions& opts);
TGRpcServer(const TServerOptions& opts, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters);
~TGRpcServer();
void AddService(IGRpcServicePtr service);
void Start();
Expand All @@ -365,6 +369,7 @@ class TGRpcServer {
using IThreadRef = TAutoPtr<IThreadFactory::IThread>;

const TServerOptions Options_;
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_;
std::unique_ptr<grpc::Server> Server_;
std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
TVector<IThreadRef> Ts;
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/providers/dq/service/service_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <ydb/library/grpc/server/actors/logger.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>

#include <utility>

namespace NYql {
Expand Down Expand Up @@ -149,7 +151,7 @@ namespace NYql {
})
.SetLogger(CreateActorSystemLogger(*ActorSystem, 413)); // 413 - NKikimrServices::GRPC_SERVER

Server = MakeHolder<TGRpcServer>(options);
Server = MakeHolder<TGRpcServer>(options, MakeIntrusive<::NMonitoring::TDynamicCounters>());
Service = TIntrusivePtr<IGRpcService>(new TDqsGrpcService(*ActorSystem, MetricsRegistry->GetSensors(), dqTaskPreprocessorFactories));
Server->AddService(Service);
Server->Start();
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ PEERDIR(
library/cpp/build_info
ydb/library/grpc/server
ydb/library/grpc/server/actors
library/cpp/monlib/dynamic_counters
library/cpp/svnversion
library/cpp/threading/future
yql/essentials/sql
Expand Down

0 comments on commit 41701aa

Please sign in to comment.