Skip to content

Commit

Permalink
[/] statistics counters
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov committed Nov 22, 2024
1 parent a9308aa commit e62ecb5
Show file tree
Hide file tree
Showing 17 changed files with 28 additions and 399 deletions.
182 changes: 0 additions & 182 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,6 @@ void TPersQueue::ApplyNewConfig(const NKikimrPQ::TPQTabletConfig& newConfig,
ctx.Send(CacheActor, new TEvPQ::TEvChangeCacheConfig(cacheSize));
}

SetupTransactionCounters(ctx);

InitializeMeteringSink(ctx);
}

Expand Down Expand Up @@ -1007,7 +1005,6 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
}

EndInitTransactions();
SetupTransactionCounters(ctx);
EndReadConfig(ctx);
}

Expand Down Expand Up @@ -2961,160 +2958,6 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info)
State.Clear();
}

void TPersQueue::SetupTransactionCounters(const TActorContext& ctx)
{
if (!TxExecutionTime.empty() || !TopicConverter) {
return;
}

NMonitoring::TDynamicCounterPtr counters = AppData(ctx)->Counters;
TVector<NPersQueue::TPQLabelsInfo> labels;

TVector<std::pair<TString, TString>> subgroups;
subgroups.emplace_back("topic", TopicConverter->GetClientsideName());

SetupTransactionExecutionTimeCounter(counters, labels, subgroups);
SetupTransactionStartedCounter(counters, labels, subgroups);
SetupTransactionCompletedCounter(counters, labels, subgroups);
SetupTransactionResponseTime(counters, labels, subgroups);
}

TVector<std::pair<ui64, TString>> GetTransactionCounterIntervals()
{
TVector<std::pair<ui64, TString>> intervals;
intervals.emplace_back(1, "1ms");
intervals.emplace_back(2, "2ms");
intervals.emplace_back(5, "5ms");
intervals.emplace_back(10, "10ms");
intervals.emplace_back(20, "20ms");
intervals.emplace_back(50, "50ms");
for (unsigned k = 80; k <= 600; k += 30) {
intervals.emplace_back(k, TStringBuilder() << k << "ms");
}
intervals.emplace_back(1000, "1000ms");

return intervals;
}

void TPersQueue::SetupTransactionExecutionTimeCounter(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroupsSrc)
{
auto intervals = GetTransactionCounterIntervals();

const TVector<NKikimrPQ::TTransaction::EState> states {
NKikimrPQ::TTransaction::UNKNOWN,
NKikimrPQ::TTransaction::PREPARING,
NKikimrPQ::TTransaction::PREPARED,
NKikimrPQ::TTransaction::PLANNING,
NKikimrPQ::TTransaction::PLANNED,
NKikimrPQ::TTransaction::CALCULATING,
NKikimrPQ::TTransaction::CALCULATED,
NKikimrPQ::TTransaction::WAIT_RS,
NKikimrPQ::TTransaction::EXECUTING,
NKikimrPQ::TTransaction::EXECUTED,
NKikimrPQ::TTransaction::WAIT_RS_ACKS,
NKikimrPQ::TTransaction::DELETING,
};

auto subgroups = subgroupsSrc;
subgroups.emplace_back("name", "topic.transaction.execution_time");

for (auto state : states) {
// cluster
// service = data-stream | data-stream-serverless
// name = topic.transaction.execution_time
// state = ...
// bin = *
subgroups.emplace_back("state", NKikimrPQ::TTransaction_EState_Name(state));
TxExecutionTime[state] =
MakeHolder<NKikimr::NPQ::TPercentileCounter>(NPersQueue::GetCountersForTopic(counters, IsServerless),
labels,
subgroups,
"bin",
intervals,
true);
subgroups.pop_back();
}
}

void TPersQueue::SetupTransactionStartedCounter(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroups)
{
// cluster
// service = data-stream | data-stream-serverless
// name = topic.transaction.started
TxStarted = MakeHolder<NKikimr::NPQ::TMultiCounter>(NPersQueue::GetCountersForTopic(counters, IsServerless),
labels,
subgroups,
TVector<TString>(1, "topic.transaction.started"),
true,
"name");
}

void TPersQueue::SetupTransactionCompletedCounter(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroups)
{
// cluster
// service = data-stream | data-stream-serverless
// name = topic.transaction.completed
TxCompleted = MakeHolder<NKikimr::NPQ::TMultiCounter>(NPersQueue::GetCountersForTopic(counters, IsServerless),
labels,
subgroups,
TVector<TString>(1, "topic.transaction.completed"),
true,
"name");
}

void TPersQueue::SetupTransactionResponseTime(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroupsSrc)
{
auto intervals = GetTransactionCounterIntervals();

auto subgroups = subgroupsSrc;
subgroups.emplace_back("name", "topic.transaction.response_time");

// cluster
// service = data-stream | data-stream-serverless
// name = topic.transaction.response_time
// bin = *
TxResponseTime =
MakeHolder<NKikimr::NPQ::TPercentileCounter>(NPersQueue::GetCountersForTopic(counters, IsServerless),
labels,
subgroups,
"bin",
intervals,
true);
}

void TPersQueue::IncTxStarted()
{
if (!TxStarted) {
return;
}
TxStarted->Inc();
}

void TPersQueue::IncTxCompleted()
{
if (!TxCompleted) {
return;
}
TxCompleted->Inc();
}

void TPersQueue::AccountTxResponseTime(const TDistributedTransaction& tx)
{
if (!TxResponseTime) {
return;
}
auto now = TAppData::TimeProvider->Now();
TxResponseTime->IncFor((now - *tx.BeginTime).MilliSeconds());
}

void TPersQueue::CreatedHook(const TActorContext& ctx)
{
IsServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
Expand Down Expand Up @@ -3701,20 +3544,10 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
EvProposeTransactionQueue.pop_front();

const NKikimrPQ::TEvProposeTransaction& event = front->GetRecord();
if (!Txs.contains(event.GetTxId())) {
if (TxStarted) {
TxStarted->Inc();
}
}
TDistributedTransaction& tx = Txs[event.GetTxId()];
if (!tx.BeginTime.Defined()) {
tx.BeginTime = TAppData::TimeProvider->Now();
}

switch (tx.State) {
case NKikimrPQ::TTransaction::UNKNOWN:
tx.ChangeStateTime = TAppData::TimeProvider->Now();

tx.OnProposeTransaction(event, GetAllowedStep(),
TabletID());

Expand Down Expand Up @@ -4197,20 +4030,7 @@ TDistributedTransaction* TPersQueue::GetTransaction(const TActorContext& ctx,
void TPersQueue::ChangeTxState(TDistributedTransaction& tx,
TDistributedTransaction::EState newState)
{
auto now = TAppData::TimeProvider->Now();

if (TxExecutionTime.contains(tx.State)) {
Y_ABORT_UNLESS(tx.ChangeStateTime.Defined(),
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);
Y_ABORT_UNLESS(now >= *tx.ChangeStateTime,
"PQ %" PRIu64 ", TxId %" PRIu64,
TabletID(), tx.TxId);
TxExecutionTime[tx.State]->IncFor((now - *tx.ChangeStateTime).MilliSeconds());
}

tx.State = newState;
tx.ChangeStateTime = now;

PQ_LOG_D("TxId " << tx.TxId <<
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
Expand Down Expand Up @@ -4479,8 +4299,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,

if (tx.PartitionRepliesCount == tx.PartitionRepliesExpected) {
SendEvProposeTransactionResult(ctx, tx);
IncTxCompleted();
AccountTxResponseTime(tx);
PQ_LOG_D("complete TxId " << tx.TxId);

switch (tx.Kind) {
Expand Down
23 changes: 0 additions & 23 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -547,29 +547,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void DeleteWriteId(const TMaybe<TWriteId>& writeId);

void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;

void SetupTransactionCounters(const TActorContext& ctx);
void SetupTransactionExecutionTimeCounter(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroups);
void SetupTransactionStartedCounter(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroups);
void SetupTransactionCompletedCounter(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroups);
void SetupTransactionResponseTime(NMonitoring::TDynamicCounterPtr counters,
const TVector<NPersQueue::TPQLabelsInfo>& labels,
const TVector<std::pair<TString, TString>>& subgroups);

void IncTxStarted();
void IncTxCompleted();
void AccountTxResponseTime(const TDistributedTransaction& tx);

THashMap<NKikimrPQ::TTransaction::EState, THolder<TPercentileCounter>> TxExecutionTime;
THolder<TPercentileCounter> TxResponseTime;
THolder<NKikimr::NPQ::TMultiCounter> TxStarted;
THolder<NKikimr::NPQ::TMultiCounter> TxCompleted;
};


Expand Down
21 changes: 0 additions & 21 deletions ydb/core/persqueue/sli_duration_counter.cpp

This file was deleted.

24 changes: 0 additions & 24 deletions ydb/core/persqueue/sli_duration_counter.h

This file was deleted.

14 changes: 0 additions & 14 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
}

PartitionsData = std::move(tx.GetPartitions());

if (tx.HasBeginTime()) {
BeginTime = TInstant::MicroSeconds(tx.GetBeginTime());
}
if (tx.HasChangeStateTime()) {
ChangeStateTime = TInstant::MicroSeconds(tx.GetChangeStateTime());
}
}

TString TDistributedTransaction::LogPrefix() const
Expand Down Expand Up @@ -427,13 +420,6 @@ NKikimrPQ::TTransaction TDistributedTransaction::Serialize(EState state) {

*tx.MutablePartitions() = PartitionsData;

if (BeginTime.Defined()) {
tx.SetBeginTime(BeginTime->MicroSeconds());
}
if (ChangeStateTime.Defined()) {
tx.SetChangeStateTime(ChangeStateTime->MicroSeconds());
}

return tx;
}

Expand Down
2 changes: 0 additions & 2 deletions ydb/core/persqueue/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ struct TDistributedTransaction {
THashMap<ui64, bool> PredicateRecipients;
TVector<NKikimrPQ::TPartitionOperation> Operations;
TMaybe<TWriteId> WriteId;
TMaybe<TInstant> ChangeStateTime;
TMaybe<TInstant> BeginTime;

EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
Expand Down
Loading

0 comments on commit e62ecb5

Please sign in to comment.