diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 6d2366b0cb79..f36c8b57e734 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -610,8 +610,6 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig, ctx.Send(CacheActor, new TEvPQ::TEvChangeCacheConfig(cacheSize)); } - SetupTransactionCounters(ctx); - InitializeMeteringSink(ctx); } @@ -1007,7 +1005,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult& } EndInitTransactions(); - SetupTransactionCounters(ctx); EndReadConfig(ctx); } @@ -2961,160 +2958,6 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info) State.Clear(); } -void TPersQueue::SetupTransactionCounters(const TActorContext& ctx) -{ - if (!TxExecutionTime.empty() || !TopicConverter) { - return; - } - - NMonitoring::TDynamicCounterPtr counters = AppData(ctx)->Counters; - TVector labels; - - TVector> subgroups; - subgroups.emplace_back("topic", TopicConverter->GetClientsideName()); - - SetupTransactionExecutionTimeCounter(counters, labels, subgroups); - SetupTransactionStartedCounter(counters, labels, subgroups); - SetupTransactionCompletedCounter(counters, labels, subgroups); - SetupTransactionResponseTime(counters, labels, subgroups); -} - -TVector> GetTransactionCounterIntervals() -{ - TVector> intervals; - intervals.emplace_back(1, "1ms"); - intervals.emplace_back(2, "2ms"); - intervals.emplace_back(5, "5ms"); - intervals.emplace_back(10, "10ms"); - intervals.emplace_back(20, "20ms"); - intervals.emplace_back(50, "50ms"); - for (unsigned k = 80; k <= 600; k += 30) { - intervals.emplace_back(k, TStringBuilder() << k << "ms"); - } - intervals.emplace_back(1000, "1000ms"); - - return intervals; -} - -void TPersQueue::SetupTransactionExecutionTimeCounter(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroupsSrc) -{ - auto intervals = GetTransactionCounterIntervals(); - - const TVector states { - NKikimrPQ::TTransaction::UNKNOWN, - NKikimrPQ::TTransaction::PREPARING, - NKikimrPQ::TTransaction::PREPARED, - NKikimrPQ::TTransaction::PLANNING, - NKikimrPQ::TTransaction::PLANNED, - NKikimrPQ::TTransaction::CALCULATING, - NKikimrPQ::TTransaction::CALCULATED, - NKikimrPQ::TTransaction::WAIT_RS, - NKikimrPQ::TTransaction::EXECUTING, - NKikimrPQ::TTransaction::EXECUTED, - NKikimrPQ::TTransaction::WAIT_RS_ACKS, - NKikimrPQ::TTransaction::DELETING, - }; - - auto subgroups = subgroupsSrc; - subgroups.emplace_back("name", "topic.transaction.execution_time"); - - for (auto state : states) { - // cluster - // service = data-stream | data-stream-serverless - // name = topic.transaction.execution_time - // state = ... - // bin = * - subgroups.emplace_back("state", NKikimrPQ::TTransaction_EState_Name(state)); - TxExecutionTime[state] = - MakeHolder(NPersQueue::GetCountersForTopic(counters, IsServerless), - labels, - subgroups, - "bin", - intervals, - true); - subgroups.pop_back(); - } -} - -void TPersQueue::SetupTransactionStartedCounter(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroups) -{ - // cluster - // service = data-stream | data-stream-serverless - // name = topic.transaction.started - TxStarted = MakeHolder(NPersQueue::GetCountersForTopic(counters, IsServerless), - labels, - subgroups, - TVector(1, "topic.transaction.started"), - true, - "name"); -} - -void TPersQueue::SetupTransactionCompletedCounter(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroups) -{ - // cluster - // service = data-stream | data-stream-serverless - // name = topic.transaction.completed - TxCompleted = MakeHolder(NPersQueue::GetCountersForTopic(counters, IsServerless), - labels, - subgroups, - TVector(1, "topic.transaction.completed"), - true, - "name"); -} - -void TPersQueue::SetupTransactionResponseTime(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroupsSrc) -{ - auto intervals = GetTransactionCounterIntervals(); - - auto subgroups = subgroupsSrc; - subgroups.emplace_back("name", "topic.transaction.response_time"); - - // cluster - // service = data-stream | data-stream-serverless - // name = topic.transaction.response_time - // bin = * - TxResponseTime = - MakeHolder(NPersQueue::GetCountersForTopic(counters, IsServerless), - labels, - subgroups, - "bin", - intervals, - true); -} - -void TPersQueue::IncTxStarted() -{ - if (!TxStarted) { - return; - } - TxStarted->Inc(); -} - -void TPersQueue::IncTxCompleted() -{ - if (!TxCompleted) { - return; - } - TxCompleted->Inc(); -} - -void TPersQueue::AccountTxResponseTime(const TDistributedTransaction& tx) -{ - if (!TxResponseTime) { - return; - } - auto now = TAppData::TimeProvider->Now(); - TxResponseTime->IncFor((now - *tx.BeginTime).MilliSeconds()); -} - void TPersQueue::CreatedHook(const TActorContext& ctx) { IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe @@ -3701,20 +3544,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx) EvProposeTransactionQueue.pop_front(); const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord(); - if (!Txs.contains(event.GetTxId())) { - if (TxStarted) { - TxStarted->Inc(); - } - } TDistributedTransaction& tx = Txs[event.GetTxId()]; - if (!tx.BeginTime.Defined()) { - tx.BeginTime = TAppData::TimeProvider->Now(); - } switch (tx.State) { case NKikimrPQ::TTransaction::UNKNOWN: - tx.ChangeStateTime = TAppData::TimeProvider->Now(); - tx.OnProposeTransaction(event, GetAllowedStep(), TabletID()); @@ -4197,20 +4030,7 @@ TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx, void TPersQueue::ChangeTxState(TDistributedTransaction& tx, TDistributedTransaction::EState newState) { - auto now = TAppData::TimeProvider->Now(); - - if (TxExecutionTime.contains(tx.State)) { - Y_ABORT_UNLESS(tx.ChangeStateTime.Defined(), - "PQ %" PRIu64 ", TxId %" PRIu64, - TabletID(), tx.TxId); - Y_ABORT_UNLESS(now >= *tx.ChangeStateTime, - "PQ %" PRIu64 ", TxId %" PRIu64, - TabletID(), tx.TxId); - TxExecutionTime[tx.State]->IncFor((now - *tx.ChangeStateTime).MilliSeconds()); - } - tx.State = newState; - tx.ChangeStateTime = now; PQ_LOG_D("TxId " << tx.TxId << ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State)); @@ -4479,8 +4299,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx, if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) { SendEvProposeTransactionResult(ctx, tx); - IncTxCompleted(); - AccountTxResponseTime(tx); PQ_LOG_D("complete TxId " << tx.TxId); switch (tx.Kind) { diff --git a/ydb/core/persqueue/pq_impl.h b/ydb/core/persqueue/pq_impl.h index a0bd1716423d..0c5f9f20826a 100644 --- a/ydb/core/persqueue/pq_impl.h +++ b/ydb/core/persqueue/pq_impl.h @@ -547,29 +547,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat { void DeleteWriteId(const TMaybe& writeId); void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const; - - void SetupTransactionCounters(const TActorContext& ctx); - void SetupTransactionExecutionTimeCounter(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroups); - void SetupTransactionStartedCounter(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroups); - void SetupTransactionCompletedCounter(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroups); - void SetupTransactionResponseTime(NMonitoring::TDynamicCounterPtr counters, - const TVector& labels, - const TVector>& subgroups); - - void IncTxStarted(); - void IncTxCompleted(); - void AccountTxResponseTime(const TDistributedTransaction& tx); - - THashMap> TxExecutionTime; - THolder TxResponseTime; - THolder TxStarted; - THolder TxCompleted; }; diff --git a/ydb/core/persqueue/sli_duration_counter.cpp b/ydb/core/persqueue/sli_duration_counter.cpp deleted file mode 100644 index c821ee46582f..000000000000 --- a/ydb/core/persqueue/sli_duration_counter.cpp +++ /dev/null @@ -1,21 +0,0 @@ -#include "sli_duration_counter.h" - -#include -#include - -namespace NKikimr::NPQ { - -TPercentileCounter CreateSLIDurationCounter(const NActors::TActorContext& ctx, - const TString& name, - const TVector& durations, - const TString& account, - ui32 border) -{ - return CreateSLIDurationCounter(GetServiceCounters(AppData(ctx)->Counters, "pqproxy|SLI"), - TVector{{{{"Account", account}}, {"total"}}}, - name, - border, - durations); -} - -} diff --git a/ydb/core/persqueue/sli_duration_counter.h b/ydb/core/persqueue/sli_duration_counter.h deleted file mode 100644 index 20ad40d19b85..000000000000 --- a/ydb/core/persqueue/sli_duration_counter.h +++ /dev/null @@ -1,24 +0,0 @@ -#pragma once - -#include "percentile_counter.h" - -#include - -#include -#include -#include - -namespace NKikimr::NPQ { - -TPercentileCounter CreateSLIDurationCounter(const NActors::TActorContext& ctx, - const TString& name, - const TVector& durations, - const TString& account, - ui32 border); - -inline -ui64 ToMilliSeconds(TDuration d) { - return d.MilliSeconds(); -} - -} diff --git a/ydb/core/persqueue/transaction.cpp b/ydb/core/persqueue/transaction.cpp index b6ace38cadfe..63fe2a184b20 100644 --- a/ydb/core/persqueue/transaction.cpp +++ b/ydb/core/persqueue/transaction.cpp @@ -60,13 +60,6 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction& } PartitionsData = std::move(tx.GetPartitions()); - - if (tx.HasBeginTime()) { - BeginTime = TInstant::MicroSeconds(tx.GetBeginTime()); - } - if (tx.HasChangeStateTime()) { - ChangeStateTime = TInstant::MicroSeconds(tx.GetChangeStateTime()); - } } TString TDistributedTransaction::LogPrefix() const @@ -427,13 +420,6 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) { *tx.MutablePartitions() = PartitionsData; - if (BeginTime.Defined()) { - tx.SetBeginTime(BeginTime->MicroSeconds()); - } - if (ChangeStateTime.Defined()) { - tx.SetChangeStateTime(ChangeStateTime->MicroSeconds()); - } - return tx; } diff --git a/ydb/core/persqueue/transaction.h b/ydb/core/persqueue/transaction.h index 7d412b9ea7a7..69600bd34e60 100644 --- a/ydb/core/persqueue/transaction.h +++ b/ydb/core/persqueue/transaction.h @@ -50,8 +50,6 @@ struct TDistributedTransaction { THashMap PredicateRecipients; TVector Operations; TMaybe WriteId; - TMaybe ChangeStateTime; - TMaybe BeginTime; EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN; diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 6363836541a6..2fce5dd8d66a 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include @@ -106,20 +105,19 @@ TString TEvPartitionWriter::TEvWriteResponse::ToString() const { return out; } -class TPartitionWriter: public TActorBootstrapped, private TRlHelpers { +class TPartitionWriter : public TActorBootstrapped, private TRlHelpers { using EErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrorCode; struct TUserWriteRequest { NKikimrClient::TPersQueueRequest Request; - TInstant BeginTime; }; - struct RequestHolder { + struct TRequestHolder { TUserWriteRequest Write; bool QuotaCheckEnabled; bool QuotaAccepted; - RequestHolder(TUserWriteRequest&& write, bool quotaCheckEnabled) + TRequestHolder(TUserWriteRequest&& write, bool quotaCheckEnabled) : Write(std::move(write)) , QuotaCheckEnabled(quotaCheckEnabled) , QuotaAccepted(false) { @@ -128,8 +126,6 @@ class TPartitionWriter: public TActorBootstrapped, private TRl struct TSentRequest { ui64 Cookie; - TInstant BeginTime; - TInstant SendTime; }; static constexpr size_t MAX_QUOTA_INFLIGHT = 3; @@ -171,22 +167,16 @@ class TPartitionWriter: public TActorBootstrapped, private TRl } void SendError(const TString& error) { - TInstant now = ActorContext().Now(); - for (auto request : std::exchange(PendingWrite, {})) { - RequestLatency.IncFor(ToMilliSeconds(now - request.BeginTime)); - PQTabletLatency.IncFor(ToMilliSeconds(now - request.SendTime)); - SendWriteResult(ErrorCode, error, MakeResponse(request.Cookie)); - } - for (const auto& [cookie, request] : std::exchange(PendingReserve, {})) { - RequestLatency.IncFor(ToMilliSeconds(now - request.Write.BeginTime)); + for (auto& [cookie] : std::exchange(PendingWrite, {})) { SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } - for (const auto& [cookie, request] : std::exchange(ReceivedReserve, {})) { - RequestLatency.IncFor(ToMilliSeconds(now - request.Write.BeginTime)); + for (const auto& [cookie, _] : std::exchange(PendingReserve, {})) { SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } - for (const auto& [cookie, request] : std::exchange(Pending, {})) { - RequestLatency.IncFor(ToMilliSeconds(now - request.BeginTime)); + for (const auto& [cookie, _] : std::exchange(ReceivedReserve, {})) { + SendWriteResult(ErrorCode, error, MakeResponse(cookie)); + } + for (const auto& [cookie, _] : std::exchange(Pending, {})) { SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } } @@ -256,9 +246,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl /// GetWriteId void GetWriteId(const TActorContext& ctx) { - GetWriteIdBegin = ctx.Now(); auto ev = MakeWriteIdRequest(); - GetWriteIdBegin = ctx.Now(); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); Become(&TThis::StateGetWriteId); } @@ -285,8 +273,6 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return InitResult("Invalid KQP session", record); } - GetWriteIdLatency.IncFor(ToMilliSeconds(ctx.Now() - GetWriteIdBegin)); - WriteId = NPQ::GetWriteId(record.GetResponse().GetTopicOperations()); LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, @@ -410,13 +396,11 @@ class TPartitionWriter: public TActorBootstrapped, private TRl Y_ABORT_UNLESS(HasWriteId()); Y_ABORT_UNLESS(HasSupportivePartitionId()); - SavePartitionIdBegin = ctx.Now(); - auto ev = MakeWriteIdRequest(); ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); } - void HandlePartitionIdSaved(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + void HandlePartitionIdSaved(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) { auto& record = ev->Get()->Record; switch (record.GetYdbStatus()) { case Ydb::StatusIds::SUCCESS: @@ -428,8 +412,6 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return InitResult("Invalid KQP session", record); } - SavePartitionIdLatency.IncFor(ToMilliSeconds(ctx.Now() - SavePartitionIdBegin)); - GetMaxSeqNo(); } @@ -576,8 +558,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl SetWriteId(*record.MutablePartitionRequest()); - TUserWriteRequest request{std::move(record), ActorContext().Now()}; - Pending.emplace(cookie, std::move(request)); + Pending.emplace(cookie, TUserWriteRequest(std::move(record))); return true; } @@ -621,7 +602,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); - PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), needToRequestQuota }); + PendingReserve.emplace(it->first, TRequestHolder{std::move(it->second), needToRequestQuota}); Pending.erase(it); if (needToRequestQuota && processed == MAX_QUOTA_INFLIGHT) { @@ -710,7 +691,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl ReceivedQuota.clear(); } - void Write(ui64 cookie, RequestHolder&& holder) { + void Write(ui64 cookie, TRequestHolder&& holder) { auto ev = MakeHolder(); ev->Record = std::move(holder.Write.Request); @@ -728,7 +709,7 @@ class TPartitionWriter: public TActorBootstrapped, private TRl NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); - PendingWrite.emplace_back(cookie, holder.Write.BeginTime, ActorContext().Now()); + PendingWrite.emplace_back(cookie); } void Handle(TEvPersQueue::TEvResponse::TPtr& ev) { @@ -786,11 +767,6 @@ class TPartitionWriter: public TActorBootstrapped, private TRl return WriteResult(EErrorCode::InternalError, error, std::move(record)); } - TInstant now = ActorContext().Now(); - - PQTabletLatency.IncFor(ToMilliSeconds(now - front.SendTime)); - RequestLatency.IncFor(ToMilliSeconds(now - front.BeginTime)); - WriteResult(std::move(record)); } } @@ -899,27 +875,6 @@ class TPartitionWriter: public TActorBootstrapped, private TRl PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config)); - RequestLatency = NPQ::CreateSLIDurationCounter(ctx, - "PartitionWriterLatency", - {2, 5, 10, 20, 50, 100, 150, 200, 250, 300, 400, 500, 750, 1'000, 1'500, 2'000}, - "", // account - 0); - GetWriteIdLatency = NPQ::CreateSLIDurationCounter(ctx, - "GetWriteIdLatency", - {2, 5, 10, 20, 50, 100, 150, 200, 250, 300, 400, 500, 750, 1'000, 1'500, 2'000}, - "", // account - 0); - SavePartitionIdLatency = NPQ::CreateSLIDurationCounter(ctx, - "SavePartitionIdLatency", - {2, 5, 10, 20, 50, 100, 150, 200, 250, 300, 400, 500, 750, 1'000, 1'500, 2'000}, - "", // account - 0); - PQTabletLatency = NPQ::CreateSLIDurationCounter(ctx, - "PQTabletLatency", - {2, 5, 10, 20, 50, 100, 150, 200, 250, 300, 400, 500, 750, 1'000, 1'500, 2'000}, - "", // account - 0); - if (Opts.Database && Opts.SessionId && Opts.TxId) { GetWriteId(ctx); } else { @@ -965,8 +920,8 @@ class TPartitionWriter: public TActorBootstrapped, private TRl ui64 MessageNo = 0; TMap Pending; - TMap PendingReserve; - TMap ReceivedReserve; + TMap PendingReserve; + TMap ReceivedReserve; TDeque PendingQuota; ui64 PendingQuotaAmount = 0; TDeque ReceivedQuota; @@ -995,14 +950,6 @@ class TPartitionWriter: public TActorBootstrapped, private TRl }; IRetryState::TPtr RetryState; - - TPercentileCounter RequestLatency; - TPercentileCounter GetWriteIdLatency; - TPercentileCounter SavePartitionIdLatency; - TPercentileCounter PQTabletLatency; - - TInstant GetWriteIdBegin; - TInstant SavePartitionIdBegin; }; // TPartitionWriter diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index 83ddeb42eddb..90ec81e4e109 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -48,7 +48,6 @@ SRCS( microseconds_sliding_window.cpp dread_cache_service/caching_service.cpp write_id.cpp - sli_duration_counter.cpp ) GENERATE_ENUM_SERIALIZATION(read_balancer__balancing.h) diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index ada1d7e49a01..6af81ad2544a 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -1151,9 +1151,6 @@ message TTransaction { optional NActorsProto.TActorId SourceActor = 14; optional TWriteId WriteId = 15; - - optional uint64 BeginTime = 17; - optional uint64 ChangeStateTime = 18; }; message TTabletTxInfo { diff --git a/ydb/services/persqueue_v1/actors/partition_writer.cpp b/ydb/services/persqueue_v1/actors/partition_writer.cpp index 99d21093ef51..7f180d6a25d3 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer.cpp +++ b/ydb/services/persqueue_v1/actors/partition_writer.cpp @@ -19,13 +19,11 @@ void TPartitionWriter::OnWriteRequest(THolderRecord.HasPartitionRequest()); if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { - SentRequests.emplace_back(ev->Record.GetPartitionRequest().GetCookie(), - ctx.Now()); + SentRequests.emplace_back(ev->Record.GetPartitionRequest().GetCookie()); ctx.Send(Actor, ev.Release()); } else { - QuotedRequests.emplace_back(std::move(ev), - ctx.Now()); + QuotedRequests.emplace_back(std::move(ev)); } } @@ -36,7 +34,7 @@ void TPartitionWriter::OnWriteAccepted(const NPQ::TEvPartitionWriter::TEvWriteAc const TSentRequest& front = SentRequests.front(); - AcceptedRequests.emplace_back(front.Cookie, front.BeginTime); + AcceptedRequests.emplace_back(front.Cookie); SentRequests.pop_front(); if (QuotedRequests.empty()) { @@ -47,25 +45,20 @@ void TPartitionWriter::OnWriteAccepted(const NPQ::TEvPartitionWriter::TEvWriteAc auto next = std::move(QuotedRequests.front()); QuotedRequests.pop_front(); - SentRequests.emplace_back(next.Write->Record.GetPartitionRequest().GetCookie(), - next.BeginTime); + SentRequests.emplace_back(next.Write->Record.GetPartitionRequest().GetCookie()); ctx.Send(Actor, next.Write.Release()); } } -TInstant TPartitionWriter::OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev) +void TPartitionWriter::OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev) { Y_ABORT_UNLESS(ev.IsSuccess()); Y_ABORT_UNLESS(!AcceptedRequests.empty()); Y_ABORT_UNLESS(ev.Record.GetPartitionResponse().GetCookie() == AcceptedRequests.front().Cookie); - TInstant beginTime = AcceptedRequests.front().BeginTime; - AcceptedRequests.pop_front(); - - return beginTime; } bool TPartitionWriter::HasPendingRequests() const diff --git a/ydb/services/persqueue_v1/actors/partition_writer.h b/ydb/services/persqueue_v1/actors/partition_writer.h index d936ea796a1c..a0dcbca075a0 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer.h +++ b/ydb/services/persqueue_v1/actors/partition_writer.h @@ -12,7 +12,7 @@ struct TPartitionWriter { void OnEvInitResult(const NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev); void OnWriteRequest(THolder&& ev, const TActorContext& ctx); void OnWriteAccepted(const NPQ::TEvPartitionWriter::TEvWriteAccepted& ev, const TActorContext& ctx); - TInstant OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev); + void OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev); bool HasPendingRequests() const; @@ -23,12 +23,10 @@ struct TPartitionWriter { struct TUserWriteRequest { THolder Write; - TInstant BeginTime; }; struct TSentRequest { ui64 Cookie; - TInstant BeginTime; }; // Quoted, but not sent requests diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp index 81a8b51d087d..2e7ddff5e107 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp @@ -18,12 +18,6 @@ void TPartitionWriterCacheActor::Bootstrap(const TActorContext& ctx) { RegisterDefaultPartitionWriter(ctx); - RequestLatency = NPQ::CreateSLIDurationCounter(ctx, - "SessionCacheLatency", - {2, 5, 10, 20, 50, 100, 150, 200, 250, 300, 400, 500, 750, 1'000, 1'500, 2'000}, - "", // account - 0); - this->Become(&TPartitionWriterCacheActor::StateWork); } @@ -181,9 +175,7 @@ void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvWriteRespons if (result.IsSuccess()) { ui64 cookie = result.Record.GetPartitionResponse().GetCookie(); if (cookie == p->second->AcceptedRequests.front().Cookie) { - TInstant beginTime = p->second->OnWriteResponse(result); - - RequestLatency.IncFor(NPQ::ToMilliSeconds(ctx.Now() - beginTime)); + p->second->OnWriteResponse(result); TryForwardToOwner(ev->Release().Release(), PendingWriteResponse, cookie, diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h index 1430571ae831..de3d4254b746 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h +++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h @@ -3,7 +3,6 @@ #include "events.h" #include "partition_writer.h" -#include #include namespace NKikimr::NGRpcProxy::V1 { @@ -75,8 +74,6 @@ class TPartitionWriterCacheActor : public NActors::TActorBootstrapped PendingWriteAccepted; TEventQueue PendingWriteResponse; - - NPQ::TPercentileCounter RequestLatency; }; } diff --git a/ydb/services/persqueue_v1/actors/write_request_info.h b/ydb/services/persqueue_v1/actors/write_request_info.h index 898c7d7d2e23..bf92e295bc2e 100644 --- a/ydb/services/persqueue_v1/actors/write_request_info.h +++ b/ydb/services/persqueue_v1/actors/write_request_info.h @@ -8,7 +8,6 @@ struct TWriteRequestInfoImpl : public TSimpleRefCount Write; - TInstant BeginTime; }; explicit TWriteRequestInfoImpl(ui64 cookie) @@ -35,9 +34,6 @@ struct TWriteRequestInfoImpl : public TSimpleRefCount diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.cpp b/ydb/services/persqueue_v1/actors/write_session_actor.cpp index 8f89112b55d0..bfed08e7a3e9 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.cpp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.cpp @@ -20,8 +20,6 @@ #include #include -#include - using namespace NActors; using namespace NKikimrClient; @@ -712,17 +710,6 @@ void TWriteSessionActor::ProceedPartition(const ui32 parti } Y_ABORT_UNLESS(FullConverter); - - RequestLatency = NPQ::CreateSLIDurationCounter(ctx, - "WriteSessionLatency", - {2, 5, 10, 20, 50, 100, 150, 200, 250, 300, 400, 500, 750, 1'000, 1'500, 2'000}, - FullConverter->GetAccount(), - 0); - QuotaLatency = NPQ::CreateSLIDurationCounter(ctx, - "QuotaLatency", - {2, 5, 10, 20, 50, 100, 150, 200, 250, 300, 400, 500, 750, 1'000, 1'500, 2'000}, - FullConverter->GetAccount(), - 0); } template @@ -1006,7 +993,7 @@ void TWriteSessionActor::ProcessWriteResponse( ui32 partitionCmdWriteResultIndex = 0; // TODO: Send single batch write response for all user write requests up to some max size/count - for (const auto& [userWriteRequest, beginTime] : writeRequest->UserWriteRequests) { + for (const auto& [userWriteRequest] : writeRequest->UserWriteRequests) { TServerMessage result; result.set_status(Ydb::StatusIds::SUCCESS); @@ -1059,8 +1046,6 @@ void TWriteSessionActor::ProcessWriteResponse( } - RequestLatency.IncFor(ToMilliSeconds(ctx.Now() - beginTime)); - if (!Request->GetStreamCtx()->Write(std::move(result))) { // TODO: Log gRPC write error code LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc write failed"); @@ -1222,7 +1207,7 @@ void TWriteSessionActor::PrepareRequest(THolder& } } - pendingRequest->UserWriteRequests.emplace_back(std::move(ev), ctx.Now()); + pendingRequest->UserWriteRequests.emplace_back(std::move(ev)); pendingRequest->ByteSize = request.ByteSize(); auto msgMetaEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicMessageMeta(); @@ -1246,7 +1231,6 @@ void TWriteSessionActor::PrepareRequest(THolder& if (!PendingQuotaRequest) { if (MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx)) { PendingQuotaRequest = std::move(PendingRequests.front()); - PendingQuotaRequest->QuotaTime = ctx.Now(); PendingRequests.pop_front(); } } @@ -1264,7 +1248,7 @@ void TWriteSessionActor::SendWriteRequest(typename TWriteR Y_ABORT_UNLESS(request->PartitionWriteRequest); i64 diff = 0; - for (const auto& [w, _] : request->UserWriteRequests) { + for (const auto& [w] : request->UserWriteRequests) { diff -= w->Request.ByteSize(); } @@ -1283,7 +1267,6 @@ void TWriteSessionActor::SendWriteRequest(typename TWriteR std::make_unique(sessionId, txId, std::move(request->PartitionWriteRequest)); - request->SendTime = ctx.Now(); ctx.Send(PartitionWriterCache, std::move(event)); BytesWrittenByUserAgent->Add(request->ByteSize); @@ -1542,14 +1525,11 @@ void TWriteSessionActor::Handle(TEvents::TEvWakeup::TPtr& counters->AddConsumedRequestUnits(PendingQuotaRequest->RequiredQuota); } - QuotaLatency.IncFor(ToMilliSeconds(ctx.Now() - PendingQuotaRequest->QuotaTime)); - SendWriteRequest(std::move(PendingQuotaRequest), ctx); if (!PendingRequests.empty()) { Y_ABORT_UNLESS(MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx)); PendingQuotaRequest = std::move(PendingRequests.front()); - PendingQuotaRequest->QuotaTime = ctx.Now(); PendingRequests.pop_front(); } @@ -1558,9 +1538,7 @@ void TWriteSessionActor::Handle(TEvents::TEvWakeup::TPtr& case EWakeupTag::RlNoResource: case EWakeupTag::RlInitNoResource: - QuotaLatency.IncFor(ToMilliSeconds(ctx.Now() - PendingQuotaRequest->QuotaTime)); if (PendingQuotaRequest) { - PendingQuotaRequest->QuotaTime = ctx.Now(); Y_ABORT_UNLESS(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)); } else { return CloseSession("Throughput limit exceeded", PersQueue::ErrorCode::OVERLOAD, ctx); diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 57c839aacbd3..aef94f37da55 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -269,8 +269,6 @@ class TWriteSessionActor TInstant StartTime; NKikimr::NPQ::TPercentileCounter InitLatency; NKikimr::NPQ::TMultiCounter SLIBigLatency; - NKikimr::NPQ::TPercentileCounter RequestLatency; - NKikimr::NPQ::TPercentileCounter QuotaLatency; TInitRequest InitRequest;