diff --git a/ydb/core/client/server/grpc_server.cpp b/ydb/core/client/server/grpc_server.cpp index 02326b38d2bd..c1e53a4ea2b2 100644 --- a/ydb/core/client/server/grpc_server.cpp +++ b/ydb/core/client/server/grpc_server.cpp @@ -239,7 +239,7 @@ class TSimpleRequest } TString GetPeer() const override { - return GetPeerName(); + return TGrpcBaseAsyncContext::GetPeer(); } TVector FindClientCert() const override { @@ -253,7 +253,7 @@ class TSimpleRequest void Finish(const TOut& resp, ui32 status) { LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s data# %s peer# %s", this, - Name, NYdbGrpc::FormatMessage(resp).data(), GetPeerName().c_str()); + Name, NYdbGrpc::FormatMessage(resp).data(), GetPeer().c_str()); ResponseSize = resp.ByteSize(); ResponseStatus = status; StateFunc = &TSimpleRequest::FinishDone; @@ -266,7 +266,7 @@ class TSimpleRequest TOut resp; TString msg = "no resource"; LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] issuing response Name# %s nodata (no resources) peer# %s", this, - Name, GetPeerName().c_str()); + Name, GetPeer().c_str()); StateFunc = &TSimpleRequest::FinishDoneWithoutProcessing; OnBeforeCall(); @@ -280,7 +280,7 @@ class TSimpleRequest OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] received request Name# %s ok# %s data# %s peer# %s current inflight# %li", this, - Name, ok ? "true" : "false", NYdbGrpc::FormatMessage(Request, ok).data(), GetPeerName().c_str(), Server->GetCurrentInFlight()); + Name, ok ? "true" : "false", NYdbGrpc::FormatMessage(Request, ok).data(), GetPeer().c_str(), Server->GetCurrentInFlight()); if (Context.c_call() == nullptr) { Y_ABORT_UNLESS(!ok); @@ -318,7 +318,7 @@ class TSimpleRequest bool FinishDone(bool ok) { OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request Name# %s ok# %s peer# %s", this, - Name, ok ? "true" : "false", GetPeerName().c_str()); + Name, ok ? "true" : "false", GetPeer().c_str()); Counters->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus, TDuration::Seconds(RequestTimer.Passed())); Server->DecRequest(); @@ -330,7 +330,7 @@ class TSimpleRequest bool FinishDoneWithoutProcessing(bool ok) { OnAfterCall(); LOG_DEBUG(ActorSystem, NKikimrServices::GRPC_SERVER, "[%p] finished request without processing Name# %s ok# %s peer# %s", this, - Name, ok ? "true" : "false", GetPeerName().c_str()); + Name, ok ? "true" : "false", GetPeer().c_str()); return false; } diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 7885c14f066a..65bbceccbb55 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -439,7 +439,6 @@ class IRequestCtx friend class TProtoResponseHelper; public: using EStreamCtrl = NYdbGrpc::IRequestContextBase::EStreamCtrl; - virtual google::protobuf::Message* GetRequestMut() = 0; virtual void SetRuHeader(ui64 ru) = 0; virtual void AddServerHint(const TString& hint) = 0; @@ -1166,13 +1165,6 @@ class TGRpcRequestWrapperImpl return request; } - template - static TRequest* GetProtoRequestMut(const T& req) { - auto request = dynamic_cast(req->GetRequestMut()); - Y_ABORT_UNLESS(request != nullptr, "Wrong using of TGRpcRequestWrapper"); - return request; - } - const TRequest* GetProtoRequest() const { return GetProtoRequest(this); } @@ -1272,10 +1264,6 @@ class TGRpcRequestWrapperImpl return Ctx_->GetRequest(); } - google::protobuf::Message* GetRequestMut() override { - return Ctx_->GetRequestMut(); - } - void SetRespHook(TRespHook&& hook) override { RespHook = std::move(hook); } diff --git a/ydb/core/grpc_services/local_grpc/local_grpc.h b/ydb/core/grpc_services/local_grpc/local_grpc.h index 8c00724d81c1..6da8e538097e 100644 --- a/ydb/core/grpc_services/local_grpc/local_grpc.h +++ b/ydb/core/grpc_services/local_grpc/local_grpc.h @@ -130,11 +130,6 @@ class TContext return &Request_; } - //! Get mutable pointer to the request's message. - NProtoBuf::Message* GetRequestMut() override { - return &Request_; - } - void Reply(NProtoBuf::Message* proto, ui32 status = 0) override { Y_UNUSED(status); TResp* resp = dynamic_cast(proto); diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index eb4b4a8deee2..63b4cbe0687d 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -196,10 +196,6 @@ class TLocalRpcCtx : public TLocalRpcCtxImpl { return &Request; } - google::protobuf::Message* GetRequestMut() override { - return &Request; - } - void SetFinishAction(std::function&&) override {} bool IsClientLost() const override { return false; } @@ -417,10 +413,6 @@ class TStreamReadProcessor : public NGRpcService::NLocalGrpc::TContextBase { return GetBaseRequest().GetRequest(); } - NProtoBuf::Message* GetRequestMut() override { - return GetBaseRequest().GetRequestMut(); - } - TAsyncFinishResult GetFinishFuture() override { return FinishPromise.GetFuture(); } diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h index 8de3a317cdd0..b14564a44ffa 100644 --- a/ydb/core/grpc_services/rpc_deferrable.h +++ b/ydb/core/grpc_services/rpc_deferrable.h @@ -64,10 +64,6 @@ class TRpcRequestWithOperationParamsActor : public TActorBootstrapped return TRequest::GetProtoRequest(Request_); } - typename TRequest::TRequest* GetProtoRequestMut() { - return TRequest::GetProtoRequestMut(Request_); - } - Ydb::Operations::OperationParams::OperationMode GetOperationMode() const { return GetProtoRequest()->operation_params().operation_mode(); } diff --git a/ydb/core/grpc_services/ydb_over_fq/rpc_base.h b/ydb/core/grpc_services/ydb_over_fq/rpc_base.h index e8f7a402042f..4ecc392b1baa 100644 --- a/ydb/core/grpc_services/ydb_over_fq/rpc_base.h +++ b/ydb/core/grpc_services/ydb_over_fq/rpc_base.h @@ -71,10 +71,6 @@ class TRpcStreamingBase : public TActorBootstrapped { return TReq::GetProtoRequest(Request_); } - TRequest* GetProtoRequestMut() noexcept { - return TReq::GetProtoRequestMut(Request_); - } - IRequestNoOpCtx& Request() noexcept { return *Request_; } private: diff --git a/ydb/core/grpc_streaming/grpc_streaming.h b/ydb/core/grpc_streaming/grpc_streaming.h index c6d6fd57e3aa..bfb80c7b949b 100644 --- a/ydb/core/grpc_streaming/grpc_streaming.h +++ b/ydb/core/grpc_streaming/grpc_streaming.h @@ -224,7 +224,7 @@ class TGRpcStreamingRequest final LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream accepted Name# %s ok# %s peer# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str()); + this->GetPeer().c_str()); if (status == NYdbGrpc::EQueueEventStatus::ERROR) { // Don't bother registering if accept failed @@ -265,7 +265,7 @@ class TGRpcStreamingRequest final LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream done notification Name# %s ok# %s peer# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str()); + this->GetPeer().c_str()); bool success = status == NYdbGrpc::EQueueEventStatus::OK; @@ -285,7 +285,7 @@ class TGRpcStreamingRequest final void Cancel() { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade cancel Name# %s peer# %s", this, Name, - this->GetPeerName().c_str()); + this->GetPeer().c_str()); this->Context.TryCancel(); } @@ -298,7 +298,7 @@ class TGRpcStreamingRequest final LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade attach Name# %s actor# %s peer# %s", this, Name, actor.ToString().c_str(), - this->GetPeerName().c_str()); + this->GetPeer().c_str()); auto guard = SingleThreaded.Enforce(); @@ -322,7 +322,7 @@ class TGRpcStreamingRequest final bool Read() { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade read Name# %s peer# %s", this, Name, - this->GetPeerName().c_str()); + this->GetPeer().c_str()); auto guard = SingleThreaded.Enforce(); @@ -350,7 +350,7 @@ class TGRpcStreamingRequest final this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", NYdbGrpc::FormatMessage(ReadInProgress->Record, status == NYdbGrpc::EQueueEventStatus::OK).c_str(), - this->GetPeerName().c_str()); + this->GetPeer().c_str()); // Take current in-progress read first auto read = std::move(ReadInProgress); @@ -373,7 +373,7 @@ class TGRpcStreamingRequest final Y_DEBUG_ABORT_UNLESS(flags & FlagFinishCalled); if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (read done)", - this, Name, this->GetPeerName().c_str()); + this, Name, this->GetPeer().c_str()); Server->DeregisterRequestCtx(this); break; } @@ -391,14 +391,14 @@ class TGRpcStreamingRequest final LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s grpc status# (%d) message# %s", this, Name, NYdbGrpc::FormatMessage(message).c_str(), - this->GetPeerName().c_str(), + this->GetPeer().c_str(), static_cast(status->error_code()), status->error_message().c_str()); } else { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade write Name# %s data# %s peer# %s", this, Name, NYdbGrpc::FormatMessage(message).c_str(), - this->GetPeerName().c_str()); + this->GetPeer().c_str()); } Y_ABORT_UNLESS(!options.is_corked(), @@ -453,7 +453,7 @@ class TGRpcStreamingRequest final LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] write finished Name# %s ok# %s peer# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str()); + this->GetPeer().c_str()); auto event = MakeHolder(); event->Success = status == NYdbGrpc::EQueueEventStatus::OK; @@ -506,7 +506,7 @@ class TGRpcStreamingRequest final bool Finish(const grpc::Status& status) { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] facade finish Name# %s peer# %s grpc status# (%d) message# %s", this, Name, - this->GetPeerName().c_str(), + this->GetPeer().c_str(), static_cast(status.error_code()), status.error_message().c_str()); @@ -542,7 +542,7 @@ class TGRpcStreamingRequest final LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] stream finished Name# %s ok# %s peer# %s grpc status# (%d) message# %s", this, Name, status == NYdbGrpc::EQueueEventStatus::OK ? "true" : "false", - this->GetPeerName().c_str(), + this->GetPeer().c_str(), static_cast(Status->error_code()), Status->error_message().c_str()); @@ -577,7 +577,7 @@ class TGRpcStreamingRequest final while ((flags & FlagRegistered) && ReadQueue.load() == 0) { if (Flags.compare_exchange_weak(flags, flags & ~FlagRegistered, std::memory_order_acq_rel)) { LOG_DEBUG(ActorSystem, LoggerServiceId, "[%p] deregistering request Name# %s peer# %s (finish done)", - this, Name, this->GetPeerName().c_str()); + this, Name, this->GetPeer().c_str()); Server->DeregisterRequestCtx(this); break; } @@ -646,7 +646,7 @@ class TGRpcStreamingRequest final } TString GetPeerName() const override { - return Self->GetPeerName(); + return Self->GetPeer(); } TVector GetPeerMetaValues(TStringBuf key) const override { diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h index 597d808a269e..9895ab2b4f14 100644 --- a/ydb/core/kafka_proxy/actors/control_plane_common.h +++ b/ydb/core/kafka_proxy/actors/control_plane_common.h @@ -292,10 +292,6 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC return DummyAuditLogParts; }; - google::protobuf::Message* GetRequestMut() override { - return nullptr; - }; - void SetRuHeader(ui64 ru) override { Y_UNUSED(ru); }; diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp index 521f8598f9b3..2c6d5632325f 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp @@ -127,10 +127,6 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt return DummyAuditLogParts; }; - google::protobuf::Message* GetRequestMut() override { - return nullptr; - }; - void SetRuHeader(ui64 ru) override { Y_UNUSED(ru); }; diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp index 19624005a448..42ef99493631 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.cpp +++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp @@ -19,10 +19,6 @@ namespace NKikimr::NPublicHttp { return Request.get(); } - NProtoBuf::Message* TGrpcRequestContextWrapper::GetRequestMut() { - return Request.get(); - } - NYdbGrpc::TAuthState& TGrpcRequestContextWrapper::GetAuthState() { return AuthState; } diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h index 35f47e3bdd69..35d05be0bd88 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.h +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -24,7 +24,6 @@ class TGrpcRequestContextWrapper : public NYdbGrpc::IRequestContextBase { public: TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr request, TReplySender replySender); virtual const NProtoBuf::Message* GetRequest() const; - virtual NProtoBuf::Message* GetRequestMut(); virtual NYdbGrpc::TAuthState& GetAuthState(); virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0); virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0, EStreamCtrl ctrl = EStreamCtrl::CONT); diff --git a/ydb/library/grpc/server/grpc_async_ctx_base.h b/ydb/library/grpc/server/grpc_async_ctx_base.h index 5ece9974317b..4837cc5b1d4b 100644 --- a/ydb/library/grpc/server/grpc_async_ctx_base.h +++ b/ydb/library/grpc/server/grpc_async_ctx_base.h @@ -25,7 +25,7 @@ class TBaseAsyncContext: public ICancelableContext { { } - TString GetPeerName() const { + TString GetPeer() const { // Decode URL-encoded square brackets auto ip = Context.peer(); CGIUnescape(ip); diff --git a/ydb/library/grpc/server/grpc_request.h b/ydb/library/grpc/server/grpc_request.h index 3be0cb44ee9f..67435f502984 100644 --- a/ydb/library/grpc/server/grpc_request.h +++ b/ydb/library/grpc/server/grpc_request.h @@ -120,13 +120,6 @@ class TGRpcRequestImpl return bool(StreamAdaptor_); } - TString GetPeer() const override { - // Decode URL-encoded square brackets - auto ip = TString(this->Context.peer()); - CGIUnescape(ip); - return ip; - } - bool SslServer() const override { return Server_->SslServer(); } @@ -168,6 +161,10 @@ class TGRpcRequestImpl UnRef(); } + TString GetPeer() const override { + return TBaseAsyncContext::GetPeer(); + } + TInstant Deadline() const override { return TBaseAsyncContext::Deadline(); } @@ -193,11 +190,6 @@ class TGRpcRequestImpl return Request_; } - NProtoBuf::Message* GetRequestMut() override { - return Request_; - } - - TAuthState& GetAuthState() override { return AuthState_; } diff --git a/ydb/library/grpc/server/grpc_request_base.h b/ydb/library/grpc/server/grpc_request_base.h index 2ad89993c850..a43c1c41d618 100644 --- a/ydb/library/grpc/server/grpc_request_base.h +++ b/ydb/library/grpc/server/grpc_request_base.h @@ -58,9 +58,6 @@ class IRequestContextBase: public TThrRefBase { //! Get pointer to the request's message. virtual const NProtoBuf::Message* GetRequest() const = 0; - //! Get mutable pointer to the request's message. - virtual NProtoBuf::Message* GetRequestMut() = 0; - //! Get current auth state virtual TAuthState& GetAuthState() = 0;