Skip to content

Commit

Permalink
[refactoring] Get rid of "GetPeer()" grpc request implementation copy…
Browse files Browse the repository at this point in the history
… paste and unused functions (#9814)
  • Loading branch information
dcherednik authored Sep 30, 2024
1 parent 5dc2621 commit fbe6aca
Show file tree
Hide file tree
Showing 14 changed files with 25 additions and 82 deletions.
12 changes: 6 additions & 6 deletions ydb/core/client/server/grpc_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class TSimpleRequest
}

TString GetPeer() const override {
return GetPeerName();
return TGrpcBaseAsyncContext::GetPeer();
}

TVector<TStringBuf> FindClientCert() const override {
Expand All @@ -253,7 +253,7 @@ class TSimpleRequest

void Finish(const TOut& resp, ui32 status) {
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s data# %s peer# %s", this,
Name, NYdbGrpc::FormatMessage<TOut>(resp).data(), GetPeerName().c_str());
Name, NYdbGrpc::FormatMessage<TOut>(resp).data(), GetPeer().c_str());
ResponseSize = resp.ByteSize();
ResponseStatus = status;
StateFunc = &TSimpleRequest::FinishDone;
Expand All @@ -266,7 +266,7 @@ class TSimpleRequest
TOut resp;
TString msg = "no resource";
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s nodata (no resources) peer# %s", this,
Name, GetPeerName().c_str());
Name, GetPeer().c_str());

StateFunc = &TSimpleRequest::FinishDoneWithoutProcessing;
OnBeforeCall();
Expand All @@ -280,7 +280,7 @@ class TSimpleRequest
OnAfterCall();

LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] received request Name# %s ok# %s data# %s peer# %s current inflight# %li", this,
Name, ok ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(Request, ok).data(), GetPeerName().c_str(), Server->GetCurrentInFlight());
Name, ok ? "true" : "false", NYdbGrpc::FormatMessage<TIn>(Request, ok).data(), GetPeer().c_str(), Server->GetCurrentInFlight());

if (Context.c_call() == nullptr) {
Y_ABORT_UNLESS(!ok);
Expand Down Expand Up @@ -318,7 +318,7 @@ class TSimpleRequest
bool FinishDone(bool ok) {
OnAfterCall();
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request Name# %s ok# %s peer# %s", this,
Name, ok ? "true" : "false", GetPeerName().c_str());
Name, ok ? "true" : "false", GetPeer().c_str());
Counters->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
TDuration::Seconds(RequestTimer.Passed()));
Server->DecRequest();
Expand All @@ -330,7 +330,7 @@ class TSimpleRequest
bool FinishDoneWithoutProcessing(bool ok) {
OnAfterCall();
LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request without processing Name# %s ok# %s peer# %s", this,
Name, ok ? "true" : "false", GetPeerName().c_str());
Name, ok ? "true" : "false", GetPeer().c_str());

return false;
}
Expand Down
12 changes: 0 additions & 12 deletions ydb/core/grpc_services/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,6 @@ class IRequestCtx
friend class TProtoResponseHelper;
public:
using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl;
virtual google::protobuf::Message* GetRequestMut() = 0;

virtual void SetRuHeader(ui64 ru) = 0;
virtual void AddServerHint(const TString& hint) = 0;
Expand Down Expand Up @@ -1166,13 +1165,6 @@ class TGRpcRequestWrapperImpl
return request;
}

template <typename T>
static TRequest* GetProtoRequestMut(const T& req) {
auto request = dynamic_cast<TRequest*>(req->GetRequestMut());
Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper");
return request;
}

const TRequest* GetProtoRequest() const {
return GetProtoRequest(this);
}
Expand Down Expand Up @@ -1272,10 +1264,6 @@ class TGRpcRequestWrapperImpl
return Ctx_->GetRequest();
}

google::protobuf::Message* GetRequestMut() override {
return Ctx_->GetRequestMut();
}

void SetRespHook(TRespHook&& hook) override {
RespHook = std::move(hook);
}
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/grpc_services/local_grpc/local_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,6 @@ class TContext
return &Request_;
}

//! Get mutable pointer to the request's message.
NProtoBuf::Message* GetRequestMut() override {
return &Request_;
}

void Reply(NProtoBuf::Message* proto, ui32 status = 0) override {
Y_UNUSED(status);
TResp* resp = dynamic_cast<TResp*>(proto);
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/grpc_services/local_rpc/local_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,6 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl<TRpc, TCbWrapper, IsOperation> {
return &Request;
}

google::protobuf::Message* GetRequestMut() override {
return &Request;
}

void SetFinishAction(std::function<void()>&&) override {}

bool IsClientLost() const override { return false; }
Expand Down Expand Up @@ -417,10 +413,6 @@ class TStreamReadProcessor : public NGRpcService::NLocalGrpc::TContextBase {
return GetBaseRequest().GetRequest();
}

NProtoBuf::Message* GetRequestMut() override {
return GetBaseRequest().GetRequestMut();
}

TAsyncFinishResult GetFinishFuture() override {
return FinishPromise.GetFuture();
}
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/grpc_services/rpc_deferrable.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ class TRpcRequestWithOperationParamsActor : public TActorBootstrapped<TDerived>
return TRequest::GetProtoRequest(Request_);
}

typename TRequest::TRequest* GetProtoRequestMut() {
return TRequest::GetProtoRequestMut(Request_);
}

Ydb::Operations::OperationParams::OperationMode GetOperationMode() const {
return GetProtoRequest()->operation_params().operation_mode();
}
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/grpc_services/ydb_over_fq/rpc_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ class TRpcStreamingBase : public TActorBootstrapped<TDerived> {
return TReq::GetProtoRequest(Request_);
}

TRequest* GetProtoRequestMut() noexcept {
return TReq::GetProtoRequestMut(Request_);
}

IRequestNoOpCtx& Request() noexcept { return *Request_; }

private:
Expand Down
28 changes: 14 additions & 14 deletions ydb/core/grpc_streaming/grpc_streaming.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class TGRpcStreamingRequest final
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream accepted Name# %s ok# %s peer# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
this->GetPeerName().c_str());
this->GetPeer().c_str());

if (status == NYdbGrpc::EQueueEventStatus::ERROR) {
// Don't bother registering if accept failed
Expand Down Expand Up @@ -265,7 +265,7 @@ class TGRpcStreamingRequest final
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream done notification Name# %s ok# %s peer# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
this->GetPeerName().c_str());
this->GetPeer().c_str());

bool success = status == NYdbGrpc::EQueueEventStatus::OK;

Expand All @@ -285,7 +285,7 @@ class TGRpcStreamingRequest final
void Cancel() {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade cancel Name# %s peer# %s",
this, Name,
this->GetPeerName().c_str());
this->GetPeer().c_str());

this->Context.TryCancel();
}
Expand All @@ -298,7 +298,7 @@ class TGRpcStreamingRequest final
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade attach Name# %s actor# %s peer# %s",
this, Name,
actor.ToString().c_str(),
this->GetPeerName().c_str());
this->GetPeer().c_str());

auto guard = SingleThreaded.Enforce();

Expand All @@ -322,7 +322,7 @@ class TGRpcStreamingRequest final
bool Read() {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade read Name# %s peer# %s",
this, Name,
this->GetPeerName().c_str());
this->GetPeer().c_str());

auto guard = SingleThreaded.Enforce();

Expand Down Expand Up @@ -350,7 +350,7 @@ class TGRpcStreamingRequest final
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
NYdbGrpc::FormatMessage<TIn>(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK).c_str(),
this->GetPeerName().c_str());
this->GetPeer().c_str());

// Take current in-progress read first
auto read = std::move(ReadInProgress);
Expand All @@ -373,7 +373,7 @@ class TGRpcStreamingRequest final
Y_DEBUG_ABORT_UNLESS(flags & FlagFinishCalled);
if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (read done)",
this, Name, this->GetPeerName().c_str());
this, Name, this->GetPeer().c_str());
Server->DeregisterRequestCtx(this);
break;
}
Expand All @@ -391,14 +391,14 @@ class TGRpcStreamingRequest final
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s grpc status# (%d) message# %s",
this, Name,
NYdbGrpc::FormatMessage<TOut>(message).c_str(),
this->GetPeerName().c_str(),
this->GetPeer().c_str(),
static_cast<int>(status->error_code()),
status->error_message().c_str());
} else {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s",
this, Name,
NYdbGrpc::FormatMessage<TOut>(message).c_str(),
this->GetPeerName().c_str());
this->GetPeer().c_str());
}

Y_ABORT_UNLESS(!options.is_corked(),
Expand Down Expand Up @@ -453,7 +453,7 @@ class TGRpcStreamingRequest final
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] write finished Name# %s ok# %s peer# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
this->GetPeerName().c_str());
this->GetPeer().c_str());

auto event = MakeHolder<typename IContext::TEvWriteFinished>();
event->Success = status == NYdbGrpc::EQueueEventStatus::OK;
Expand Down Expand Up @@ -506,7 +506,7 @@ class TGRpcStreamingRequest final
bool Finish(const grpc::Status& status) {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade finish Name# %s peer# %s grpc status# (%d) message# %s",
this, Name,
this->GetPeerName().c_str(),
this->GetPeer().c_str(),
static_cast<int>(status.error_code()),
status.error_message().c_str());

Expand Down Expand Up @@ -542,7 +542,7 @@ class TGRpcStreamingRequest final
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream finished Name# %s ok# %s peer# %s grpc status# (%d) message# %s",
this, Name,
status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false",
this->GetPeerName().c_str(),
this->GetPeer().c_str(),
static_cast<int>(Status->error_code()),
Status->error_message().c_str());

Expand Down Expand Up @@ -577,7 +577,7 @@ class TGRpcStreamingRequest final
while ((flags & FlagRegistered) && ReadQueue.load() == 0) {
if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) {
LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (finish done)",
this, Name, this->GetPeerName().c_str());
this, Name, this->GetPeer().c_str());
Server->DeregisterRequestCtx(this);
break;
}
Expand Down Expand Up @@ -646,7 +646,7 @@ class TGRpcStreamingRequest final
}

TString GetPeerName() const override {
return Self->GetPeerName();
return Self->GetPeer();
}

TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kafka_proxy/actors/control_plane_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,6 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
return DummyAuditLogParts;
};

google::protobuf::Message* GetRequestMut() override {
return nullptr;
};

void SetRuHeader(ui64 ru) override {
Y_UNUSED(ru);
};
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
return DummyAuditLogParts;
};

google::protobuf::Message* GetRequestMut() override {
return nullptr;
};

void SetRuHeader(ui64 ru) override {
Y_UNUSED(ru);
};
Expand Down
4 changes: 0 additions & 4 deletions ydb/core/public_http/grpc_request_context_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ namespace NKikimr::NPublicHttp {
return Request.get();
}

NProtoBuf::Message* TGrpcRequestContextWrapper::GetRequestMut() {
return Request.get();
}

NYdbGrpc::TAuthState& TGrpcRequestContextWrapper::GetAuthState() {
return AuthState;
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/public_http/grpc_request_context_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase {
public:
TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr<NProtoBuf::Message> request, TReplySender replySender);
virtual const NProtoBuf::Message* GetRequest() const;
virtual NProtoBuf::Message* GetRequestMut();
virtual NYdbGrpc::TAuthState& GetAuthState();
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0);
virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/grpc/server/grpc_async_ctx_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TBaseAsyncContext: public ICancelableContext {
{
}

TString GetPeerName() const {
TString GetPeer() const {
// Decode URL-encoded square brackets
auto ip = Context.peer();
CGIUnescape(ip);
Expand Down
16 changes: 4 additions & 12 deletions ydb/library/grpc/server/grpc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ class TGRpcRequestImpl
return bool(StreamAdaptor_);
}

TString GetPeer() const override {
// Decode URL-encoded square brackets
auto ip = TString(this->Context.peer());
CGIUnescape(ip);
return ip;
}

bool SslServer() const override {
return Server_->SslServer();
}
Expand Down Expand Up @@ -168,6 +161,10 @@ class TGRpcRequestImpl
UnRef();
}

TString GetPeer() const override {
return TBaseAsyncContext<TService>::GetPeer();
}

TInstant Deadline() const override {
return TBaseAsyncContext<TService>::Deadline();
}
Expand All @@ -193,11 +190,6 @@ class TGRpcRequestImpl
return Request_;
}

NProtoBuf::Message* GetRequestMut() override {
return Request_;
}


TAuthState& GetAuthState() override {
return AuthState_;
}
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/grpc/server/grpc_request_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ class IRequestContextBase: public TThrRefBase {
//! Get pointer to the request's message.
virtual const NProtoBuf::Message* GetRequest() const = 0;

//! Get mutable pointer to the request's message.
virtual NProtoBuf::Message* GetRequestMut() = 0;

//! Get current auth state
virtual TAuthState& GetAuthState() = 0;

Expand Down

0 comments on commit fbe6aca

Please sign in to comment.