Skip to content

Commit

Permalink
Added metrics for the number of active and inactive topic partitions (y…
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored May 21, 2024
1 parent ccdedf8 commit 680ee23
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 19 deletions.
65 changes: 47 additions & 18 deletions ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,8 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr
}

Balancer->UpdateConfig(newPartitionsIds, deletedPartitions, ctx);

UpdateConfigCounters();
}


Expand Down Expand Up @@ -799,50 +801,74 @@ void TPersQueueReadBalancer::CheckStat(const TActorContext& ctx) {
UpdateCounters(ctx);
}

void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
if (!AggregatedStats.Stats.size())
void TPersQueueReadBalancer::InitCounters(const TActorContext& ctx) {
if (!DatabasePath) {
return;
}

if (!DatabasePath)
if (DynamicCounters) {
return;

using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
THolder<TPartitionLabeledCounters> labeledCounters;
using TConsumerLabeledCounters = TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor>;
THolder<TConsumerLabeledCounters> labeledConsumerCounters;


labeledCounters.Reset(new TPartitionLabeledCounters("topic", 0, DatabasePath));
labeledConsumerCounters.Reset(new TConsumerLabeledCounters("topic|x|consumer", 0, DatabasePath));

auto counters = AppData(ctx)->Counters;
bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
}

TStringBuf name = TStringBuf(Path);
name.SkipPrefix(DatabasePath);
name.SkipPrefix("/");
counters = counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics")

bool isServerless = AppData(ctx)->FeatureFlags.GetEnableDbCounters(); //TODO: find out it via describe
DynamicCounters = AppData(ctx)->Counters->GetSubgroup("counters", isServerless ? "topics_serverless" : "topics")
->GetSubgroup("host", "")
->GetSubgroup("database", DatabasePath)
->GetSubgroup("cloud_id", CloudId)
->GetSubgroup("folder_id", FolderId)
->GetSubgroup("database_id", DatabaseId)
->GetSubgroup("topic", TString(name));

ActivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.active_count", false);
InactivePartitionCountCounter = DynamicCounters->GetExpiringNamedCounter("name", "topic.partition.inactive_count", false);
}

// Publishes the number of active and inactive topic partitions to the
// per-topic counters.
//
// Does nothing while DynamicCounters is unset; InitCounters assigns it
// together with ActivePartitionCountCounter and InactivePartitionCountCounter.
void TPersQueueReadBalancer::UpdateConfigCounters() {
    if (!DynamicCounters) {
        return;
    }

    const auto& partitions = TabletConfig.GetPartitions();
    const size_t inactiveCount = std::count_if(partitions.begin(), partitions.end(), [](const auto& p) {
        return p.GetStatus() == NKikimrPQ::ETopicPartitionStatus::Inactive;
    });

    // The total is taken from PartitionsInfo while the inactive count is taken
    // from TabletConfig. Clamp at zero so that a mismatch between the two
    // cannot wrap the unsigned subtraction and publish a huge bogus value.
    const size_t totalCount = PartitionsInfo.size();
    const size_t activeCount = totalCount > inactiveCount ? totalCount - inactiveCount : 0;

    ActivePartitionCountCounter->Set(activeCount);
    InactivePartitionCountCounter->Set(inactiveCount);
}

void TPersQueueReadBalancer::UpdateCounters(const TActorContext& ctx) {
if (!AggregatedStats.Stats.size())
return;

if (!DynamicCounters)
return;

using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
THolder<TPartitionLabeledCounters> labeledCounters;
using TConsumerLabeledCounters = TProtobufTabletLabeledCounters<EClientLabeledCounters_descriptor>;
THolder<TConsumerLabeledCounters> labeledConsumerCounters;

labeledCounters.Reset(new TPartitionLabeledCounters("topic", 0, DatabasePath));
labeledConsumerCounters.Reset(new TConsumerLabeledCounters("topic|x|consumer", 0, DatabasePath));

if (AggregatedCounters.empty()) {
for (ui32 i = 0; i < labeledCounters->GetCounters().Size(); ++i) {
TString name = labeledCounters->GetNames()[i];
TStringBuf nameBuf = name;
nameBuf.SkipPrefix("PQ/");
name = nameBuf;
AggregatedCounters.push_back(name.empty() ? nullptr : counters->GetExpiringNamedCounter("name", name, false));
AggregatedCounters.push_back(name.empty() ? nullptr : DynamicCounters->GetExpiringNamedCounter("name", name, false));
}
}

for (auto& [consumer, info]: Consumers) {
info.Aggr.Reset(new TTabletLabeledCountersBase{});
if (info.AggregatedCounters.empty()) {
auto clientCounters = counters->GetSubgroup("consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx));
auto clientCounters = DynamicCounters->GetSubgroup("consumer", NPersQueue::ConvertOldConsumerName(consumer, ctx));
for (ui32 i = 0; i < labeledConsumerCounters->GetCounters().Size(); ++i) {
TString name = labeledConsumerCounters->GetNames()[i];
TStringBuf nameBuf = name;
Expand Down Expand Up @@ -1106,6 +1132,9 @@ void TPersQueueReadBalancer::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated
if (attr.GetKey() == "cloud_id") CloudId = attr.GetValue();
if (attr.GetKey() == "database_id") DatabaseId = attr.GetValue();
}

InitCounters(ctx);
UpdateConfigCounters();
}

if (PartitionsScaleManager) {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/persqueue/read_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&, bool pipeReconnected = false);
void ClosePipe(const ui64 tabletId, const TActorContext&);
void CheckStat(const TActorContext&);

void InitCounters(const TActorContext&);
void UpdateCounters(const TActorContext&);
void UpdateConfigCounters();

void RespondWithACL(
const TEvPersQueue::TEvCheckACL::TPtr &request,
Expand Down Expand Up @@ -217,6 +220,10 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa

std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters;

NMonitoring::TDynamicCounterPtr DynamicCounters;
NMonitoring::TDynamicCounters::TCounterPtr ActivePartitionCountCounter;
NMonitoring::TDynamicCounters::TCounterPtr InactivePartitionCountCounter;

TString DatabasePath;
TString DatabaseId;
TString FolderId;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/read_balancer__txinit.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct TPersQueueReadBalancer::TTxInit : public ITransaction {
Self->Consumers[consumer.GetName()];
}
Self->PartitionGraph = MakePartitionGraph(Self->TabletConfig);
Self->UpdateConfigCounters();
}
Self->Inited = true;
if (!dataRowset.Next())
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/persqueue/ut/common/pq_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,15 @@ void PQBalancerPrepare(const TString topic, const TVector<std::pair<ui32, std::p
part->SetPartition(p.first);
part->SetGroup(p.second.second);
part->SetTabletId(p.second.first);
part->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Active);

auto tablet = request->Record.AddTablets();
tablet->SetTabletId(p.second.first);
tablet->SetOwner(1);
tablet->SetIdx(p.second.first);

auto* pp = request->Record.MutableTabletConfig()->AddPartitions();
pp->SetStatus(::NKikimrPQ::ETopicPartitionStatus::Active);
}
request->Record.SetTxId(12345);
request->Record.SetPathId(1);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/persqueue/ut/counters_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,10 @@ Y_UNIT_TEST(PartitionFirstClass) {
auto counters = tc.Runtime->GetAppData(0).Counters;
auto dbGroup = GetServiceCounters(counters, "topics_serverless", false);

auto group = dbGroup->GetSubgroup("host", "")->GetSubgroup("database", "/Root")->GetSubgroup("cloud_id", "cloud_id")->GetSubgroup("folder_id", "folder_id")
auto group = dbGroup->GetSubgroup("host", "")
->GetSubgroup("database", "/Root")
->GetSubgroup("cloud_id", "cloud_id")
->GetSubgroup("folder_id", "folder_id")
->GetSubgroup("database_id", "database_id")->GetSubgroup("topic", "topic");
group->GetNamedCounter("name", "topic.partition.uptime_milliseconds_min", false)->Set(30000);
group->GetNamedCounter("name", "topic.partition.write.lag_milliseconds_max", false)->Set(600);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/ut/resources/counters_topics.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
database_id=database_id:

topic=topic:
name=topic.partition.active_count: 1
name=topic.partition.alive_count: 1
name=topic.partition.inactive_count: 0
name=topic.partition.init_duration_milliseconds_max: 0
name=topic.partition.producers_count_max: 3
name=topic.partition.read.inflight_throttled_microseconds_max: 0
Expand Down

0 comments on commit 680ee23

Please sign in to comment.