diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index 0293badd5a01..64126ef80031 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -586,6 +586,7 @@ void TRowDispatcher::UpdateMetrics() { } for (const auto& key : toDelete) { SetQueryMetrics(key, 0, 0, 0); + Metrics.Counters->RemoveSubgroup("query_id", key.QueryId); AggrStats.LastQueryStats.erase(key); } PrintStateToLog(); diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 46db3de18521..0baf82b93b16 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -93,20 +93,27 @@ class TTopicSession : public TActorBootstrapped { struct TClientsInfo : public IClientDataConsumer { using TPtr = TIntrusivePtr; - TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, NMonitoring::TDynamicCounterPtr& counters) + TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup) : Self(self) , LogPrefix(logPrefix) , HandlerSettings(handlerSettings) , Settings(ev->Get()->Record) , ReadActorId(ev->Sender) - , FilteredDataRate(counters->GetCounter("FilteredDataRate", true)) - , RestartSessionByOffsetsByQuery(counters->GetCounter("RestartSessionByOffsetsByQuery", true)) + , Counters(counters) { if (Settings.HasOffset()) { NextMessageOffset = Settings.GetOffset(); InitialOffset = Settings.GetOffset(); } Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod)); + auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); + auto topicGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(readGroup)); + FilteredDataRate = topicGroup->GetCounter("FilteredDataRate", true); + RestartSessionByOffsetsByQuery = counters->GetCounter("RestartSessionByOffsetsByQuery", true); + } + + ~TClientsInfo() { + Counters->RemoveSubgroup("query_id", Settings.GetQueryId()); } TActorId GetClientId() const override { @@ -188,6 +195,7 @@ class TTopicSession : public TActorBootstrapped { // Metrics ui64 InitialOffset = 0; TStats Stat; // Send (filtered) to read_actor + const ::NMonitoring::TDynamicCounterPtr Counters; NMonitoring::TDynamicCounters::TCounterPtr FilteredDataRate; // filtered NMonitoring::TDynamicCounters::TCounterPtr RestartSessionByOffsetsByQuery; }; @@ -707,10 +715,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { const TString& format = source.GetFormat(); ITopicFormatHandler::TSettings handlerSettings = {.ParsingFormat = format ? format : "raw"}; - auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); - auto readGroup = queryGroup->GetSubgroup("read_group", CleanupCounterValueString(ReadGroup)); - auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive(*this, LogPrefix, handlerSettings, ev, readGroup)}).first->second; - + auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup)}).first->second; auto formatIt = FormatHandlers.find(handlerSettings); if (formatIt == FormatHandlers.end()) { formatIt = FormatHandlers.insert({handlerSettings, CreateTopicFormatHandler(ActorContext(), FormatHandlerConfig, handlerSettings, Metrics.PartitionGroup)}).first; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 810d54cd8693..0989e24dd037 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -86,7 +86,7 @@ struct TRowDispatcherReadActorMetrics { } ~TRowDispatcherReadActorMetrics() { - SubGroup->RemoveSubgroup("id", TxId); + SubGroup->RemoveSubgroup("tx_id", TxId); } TString TxId; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 900b0f9916bb..6534679783c8 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -106,7 +106,7 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: } ~TMetrics() { - SubGroup->RemoveSubgroup("id", TxId); + SubGroup->RemoveSubgroup("tx_id", TxId); } TString TxId; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 18bb17683c4f..33dc408efb11 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -106,7 +106,7 @@ class TDqPqWriteActor : public NActors::TActor, public IDqCompu } ~TMetrics() { - SubGroup->RemoveSubgroup("id", TxId); + SubGroup->RemoveSubgroup("tx_id", TxId); } TString TxId;