diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index cb1ecf21424a..7b85e1748f5b 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -135,6 +135,8 @@ enum ECumulativeCounters { COUNTER_READING_EXPORTED_BLOBS = 79 [(CounterOpts) = {Name: "ReadingExportedBlobs"}]; COUNTER_READING_EXPORTED_BYTES = 80 [(CounterOpts) = {Name: "ReadingExportedBytes"}]; COUNTER_READING_EXPORTED_RANGES = 81 [(CounterOpts) = {Name: "ReadingExportedRanges"}]; + COUNTER_PLANNED_TX_COMPLETED = 82 [(CounterOpts) = {Name: "PlannedTxCompleted"}]; + COUNTER_IMMEDIATE_TX_COMPLETED = 83 [(CounterOpts) = {Name: "ImmediateTxCompleted"}]; } enum EPercentileCounters { diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h index bb38f2744061..b57a29d5b072 100644 --- a/ydb/core/tx/columnshard/background_controller.h +++ b/ydb/core/tx/columnshard/background_controller.h @@ -1,6 +1,7 @@ #pragma once #include "engines/changes/abstract/compaction_info.h" #include "engines/portions/meta.h" +#include namespace NKikimr::NOlap { class TColumnEngineChanges; @@ -15,11 +16,16 @@ class TBackgroundController { using TCurrentCompaction = THashMap; TCurrentCompaction ActiveCompactionInfo; + std::shared_ptr Counters; bool ActiveCleanupPortions = false; bool ActiveCleanupTables = false; bool ActiveCleanupInsertTable = false; YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero()); public: + TBackgroundController(std::shared_ptr counters) + : Counters(std::move(counters)) { + } + THashSet GetConflictTTLPortions() const; THashSet GetConflictCompactionPortions() const; @@ -29,6 +35,7 @@ class TBackgroundController { bool StartCompaction(const NOlap::TPlanCompactionInfo& info); void FinishCompaction(const NOlap::TPlanCompactionInfo& info) { Y_ABORT_UNLESS(ActiveCompactionInfo.erase(info.GetPathId())); + Counters->OnCompactionFinish(info.GetPathId()); } const TCurrentCompaction& GetActiveCompaction() const { return ActiveCompactionInfo; diff --git a/ydb/core/tx/columnshard/blobs_action/bs/write.cpp b/ydb/core/tx/columnshard/blobs_action/bs/write.cpp index 021abc972a0d..06ebc64eccf3 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/bs/write.cpp @@ -15,13 +15,10 @@ void TWriteAction::DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& self, co ui64 blobsWritten = BlobBatch.GetBlobCount(); ui64 bytesWritten = BlobBatch.GetTotalSize(); if (blobsWroteSuccessfully) { - self.IncCounter(NColumnShard::COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); - self.IncCounter(NColumnShard::COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); - // self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_UPSERTED, insertedBytes); - self.IncCounter(NColumnShard::COUNTER_WRITE_SUCCESS); + self.Counters.GetTabletCounters()->OnWriteSuccess(blobsWritten, bytesWritten); Manager->SaveBlobBatchOnComplete(std::move(BlobBatch)); } else { - self.IncCounter(NColumnShard::COUNTER_WRITE_FAIL); + self.Counters.GetTabletCounters()->OnWriteFailure(); } } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 8e12cf1b8a63..b7e9639894b2 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -70,7 +70,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { if (!InsertOneBlob(txc, i, writeId)) { LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix()); - Self->IncCounter(COUNTER_WRITE_DUPLICATE); + Self->Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_DUPLICATE); } } } @@ -140,10 +140,10 @@ void TTxWrite::Complete(const TActorContext& ctx) { } for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) { const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteMeta(); - Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant()); - Self->CSCounters.OnSuccessWriteResponse(); + Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant()); + Self->Counters.GetCSCounters().OnSuccessWriteResponse(); } - + Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED); } } diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 3548ccd7f2fe..50fb8ddbe76f 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -5,6 +5,7 @@ #include "engines/writer/buffer/actor.h" #include "engines/column_engine_logs.h" #include "bg_tasks/manager/manager.h" +#include "counters/aggregation/table_stats.h" #include #include @@ -56,21 +57,21 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) { TryRegisterMediatorTimeCast(); EnqueueProgressTx(ctx); } - CSCounters.OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); + Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); EnqueueBackgroundActivities(); BackgroundSessionsManager->Start(); ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup()); NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID()); AFL_VERIFY(!!StartInstant); - CSCounters.Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant); + Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant); } void TColumnShard::OnActivateExecutor(const TActorContext& ctx) { StartInstant = TMonotonic::Now(); - CSCounters.Initialization.OnActivateExecutor(TMonotonic::Now() - CreateInstant); + Counters.GetCSCounters().Initialization.OnActivateExecutor(TMonotonic::Now() - CreateInstant); const TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId()); AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "OnActivateExecutor"); - Executor()->RegisterExternalTabletCounters(TabletCountersPtr.release()); + Executor()->RegisterExternalTabletCounters(TabletCountersHolder.release()); const auto selfActorId = SelfId(); StoragesManager->Initialize(Executor()->Generation()); @@ -148,8 +149,8 @@ void TColumnShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const void TColumnShard::Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext &ctx) { Y_UNUSED(ctx); - IncCounter(COUNTER_SCANNED_ROWS, ev->Get()->Rows); - IncCounter(COUNTER_SCANNED_BYTES, ev->Get()->Bytes); + Counters.GetTabletCounters()->IncCounter(COUNTER_SCANNED_ROWS, ev->Get()->Rows); + Counters.GetTabletCounters()->IncCounter(COUNTER_SCANNED_BYTES, ev->Get()->Bytes); } void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorContext &ctx) { @@ -165,9 +166,10 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon ui64 txId = ev->Get()->TxId; if (ScanTxInFlight.contains(txId)) { TDuration duration = TAppData::TimeProvider->Now() - ScanTxInFlight[txId]; - IncCounter(COUNTER_SCAN_LATENCY, duration); + Counters.GetTabletCounters()->IncCounter(COUNTER_SCAN_LATENCY, duration); ScanTxInFlight.erase(txId); - SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size()); + Counters.GetTabletCounters()->SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size()); + Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED); } } @@ -217,10 +219,10 @@ void TColumnShard::UpdateInsertTableCounters() { auto& prepared = InsertTable->GetCountersPrepared(); auto& committed = InsertTable->GetCountersCommitted(); - SetCounter(COUNTER_PREPARED_RECORDS, prepared.Rows); - SetCounter(COUNTER_PREPARED_BYTES, prepared.Bytes); - SetCounter(COUNTER_COMMITTED_RECORDS, committed.Rows); - SetCounter(COUNTER_COMMITTED_BYTES, committed.Bytes); + Counters.GetTabletCounters()->SetCounter(COUNTER_PREPARED_RECORDS, prepared.Rows); + Counters.GetTabletCounters()->SetCounter(COUNTER_PREPARED_BYTES, prepared.Bytes); + Counters.GetTabletCounters()->SetCounter(COUNTER_COMMITTED_RECORDS, committed.Rows); + Counters.GetTabletCounters()->SetCounter(COUNTER_COMMITTED_BYTES, committed.Bytes); LOG_S_TRACE("InsertTable. Prepared: " << prepared.Bytes << " in " << prepared.Rows << " records, committed: " << committed.Bytes << " in " << committed.Rows @@ -233,33 +235,34 @@ void TColumnShard::UpdateIndexCounters() { } auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats(); - SetCounter(COUNTER_INDEX_TABLES, stats.Tables); - SetCounter(COUNTER_INDEX_COLUMN_RECORDS, stats.ColumnRecords); - SetCounter(COUNTER_INSERTED_PORTIONS, stats.GetInsertedStats().Portions); - SetCounter(COUNTER_INSERTED_BLOBS, stats.GetInsertedStats().Blobs); - SetCounter(COUNTER_INSERTED_ROWS, stats.GetInsertedStats().Rows); - SetCounter(COUNTER_INSERTED_BYTES, stats.GetInsertedStats().Bytes); - SetCounter(COUNTER_INSERTED_RAW_BYTES, stats.GetInsertedStats().RawBytes); - SetCounter(COUNTER_COMPACTED_PORTIONS, stats.GetCompactedStats().Portions); - SetCounter(COUNTER_COMPACTED_BLOBS, stats.GetCompactedStats().Blobs); - SetCounter(COUNTER_COMPACTED_ROWS, stats.GetCompactedStats().Rows); - SetCounter(COUNTER_COMPACTED_BYTES, stats.GetCompactedStats().Bytes); - SetCounter(COUNTER_COMPACTED_RAW_BYTES, stats.GetCompactedStats().RawBytes); - SetCounter(COUNTER_SPLIT_COMPACTED_PORTIONS, stats.GetSplitCompactedStats().Portions); - SetCounter(COUNTER_SPLIT_COMPACTED_BLOBS, stats.GetSplitCompactedStats().Blobs); - SetCounter(COUNTER_SPLIT_COMPACTED_ROWS, stats.GetSplitCompactedStats().Rows); - SetCounter(COUNTER_SPLIT_COMPACTED_BYTES, stats.GetSplitCompactedStats().Bytes); - SetCounter(COUNTER_SPLIT_COMPACTED_RAW_BYTES, stats.GetSplitCompactedStats().RawBytes); - SetCounter(COUNTER_INACTIVE_PORTIONS, stats.GetInactiveStats().Portions); - SetCounter(COUNTER_INACTIVE_BLOBS, stats.GetInactiveStats().Blobs); - SetCounter(COUNTER_INACTIVE_ROWS, stats.GetInactiveStats().Rows); - SetCounter(COUNTER_INACTIVE_BYTES, stats.GetInactiveStats().Bytes); - SetCounter(COUNTER_INACTIVE_RAW_BYTES, stats.GetInactiveStats().RawBytes); - SetCounter(COUNTER_EVICTED_PORTIONS, stats.GetEvictedStats().Portions); - SetCounter(COUNTER_EVICTED_BLOBS, stats.GetEvictedStats().Blobs); - SetCounter(COUNTER_EVICTED_ROWS, stats.GetEvictedStats().Rows); - SetCounter(COUNTER_EVICTED_BYTES, stats.GetEvictedStats().Bytes); - SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.GetEvictedStats().RawBytes); + const std::shared_ptr& counters = Counters.GetTabletCounters(); + counters->SetCounter(COUNTER_INDEX_TABLES, stats.Tables); + counters->SetCounter(COUNTER_INDEX_COLUMN_RECORDS, stats.ColumnRecords); + counters->SetCounter(COUNTER_INSERTED_PORTIONS, stats.GetInsertedStats().Portions); + counters->SetCounter(COUNTER_INSERTED_BLOBS, stats.GetInsertedStats().Blobs); + counters->SetCounter(COUNTER_INSERTED_ROWS, stats.GetInsertedStats().Rows); + counters->SetCounter(COUNTER_INSERTED_BYTES, stats.GetInsertedStats().Bytes); + counters->SetCounter(COUNTER_INSERTED_RAW_BYTES, stats.GetInsertedStats().RawBytes); + counters->SetCounter(COUNTER_COMPACTED_PORTIONS, stats.GetCompactedStats().Portions); + counters->SetCounter(COUNTER_COMPACTED_BLOBS, stats.GetCompactedStats().Blobs); + counters->SetCounter(COUNTER_COMPACTED_ROWS, stats.GetCompactedStats().Rows); + counters->SetCounter(COUNTER_COMPACTED_BYTES, stats.GetCompactedStats().Bytes); + counters->SetCounter(COUNTER_COMPACTED_RAW_BYTES, stats.GetCompactedStats().RawBytes); + counters->SetCounter(COUNTER_SPLIT_COMPACTED_PORTIONS, stats.GetSplitCompactedStats().Portions); + counters->SetCounter(COUNTER_SPLIT_COMPACTED_BLOBS, stats.GetSplitCompactedStats().Blobs); + counters->SetCounter(COUNTER_SPLIT_COMPACTED_ROWS, stats.GetSplitCompactedStats().Rows); + counters->SetCounter(COUNTER_SPLIT_COMPACTED_BYTES, stats.GetSplitCompactedStats().Bytes); + counters->SetCounter(COUNTER_SPLIT_COMPACTED_RAW_BYTES, stats.GetSplitCompactedStats().RawBytes); + counters->SetCounter(COUNTER_INACTIVE_PORTIONS, stats.GetInactiveStats().Portions); + counters->SetCounter(COUNTER_INACTIVE_BLOBS, stats.GetInactiveStats().Blobs); + counters->SetCounter(COUNTER_INACTIVE_ROWS, stats.GetInactiveStats().Rows); + counters->SetCounter(COUNTER_INACTIVE_BYTES, stats.GetInactiveStats().Bytes); + counters->SetCounter(COUNTER_INACTIVE_RAW_BYTES, stats.GetInactiveStats().RawBytes); + counters->SetCounter(COUNTER_EVICTED_PORTIONS, stats.GetEvictedStats().Portions); + counters->SetCounter(COUNTER_EVICTED_BLOBS, stats.GetEvictedStats().Blobs); + counters->SetCounter(COUNTER_EVICTED_ROWS, stats.GetEvictedStats().Rows); + counters->SetCounter(COUNTER_EVICTED_BYTES, stats.GetEvictedStats().Bytes); + counters->SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.GetEvictedStats().RawBytes); LOG_S_DEBUG("Index: tables " << stats.Tables << " inserted " << stats.GetInsertedStats().DebugString() @@ -278,8 +281,8 @@ ui64 TColumnShard::MemoryUsage() const { LongTxWrites.size() * (sizeof(TWriteId) + sizeof(TLongTxWriteInfo)) + LongTxWritesByUniqueId.size() * (sizeof(TULID) + sizeof(void*)) + (WaitingScans.size()) * (sizeof(NOlap::TSnapshot) + sizeof(void*)) + - TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + - TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); + Counters.GetTabletCounters()->GetValue(COUNTER_PREPARED_RECORDS) * sizeof(NOlap::TInsertedData) + + Counters.GetTabletCounters()->GetValue(COUNTER_COMMITTED_RECORDS) * sizeof(NOlap::TInsertedData); memory += TablesManager.GetMemoryUsage(); return memory; } @@ -290,13 +293,12 @@ void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage& return; } - ui64 storageBytes = - TabletCounters->Simple()[COUNTER_PREPARED_BYTES].Get() + - TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get() + - TabletCounters->Simple()[COUNTER_INSERTED_BYTES].Get() + - TabletCounters->Simple()[COUNTER_COMPACTED_BYTES].Get() + - TabletCounters->Simple()[COUNTER_SPLIT_COMPACTED_BYTES].Get() + - TabletCounters->Simple()[COUNTER_INACTIVE_BYTES].Get(); + ui64 storageBytes = Counters.GetTabletCounters()->GetValue(COUNTER_PREPARED_BYTES) + + Counters.GetTabletCounters()->GetValue(COUNTER_COMMITTED_BYTES) + + Counters.GetTabletCounters()->GetValue(COUNTER_INSERTED_BYTES) + + Counters.GetTabletCounters()->GetValue(COUNTER_COMPACTED_BYTES) + + Counters.GetTabletCounters()->GetValue(COUNTER_SPLIT_COMPACTED_BYTES) + + Counters.GetTabletCounters()->GetValue(COUNTER_INACTIVE_BYTES); ui64 memory = MemoryUsage(); @@ -312,36 +314,10 @@ void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage& metrics->TryUpdate(ctx); } -void TColumnShard::ConfigureStats(const NOlap::TColumnEngineStats& indexStats, - ::NKikimrTableStats::TTableStats* tabletStats) { - NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate(); - auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted - - if (activeIndexStats.Rows < 0 || activeIndexStats.Bytes < 0) { - LOG_S_WARN("Negative stats counter. Rows: " << activeIndexStats.Rows << " Bytes: " << activeIndexStats.Bytes - << TabletID()); - - activeIndexStats.Rows = (activeIndexStats.Rows < 0) ? 0 : activeIndexStats.Rows; - activeIndexStats.Bytes = (activeIndexStats.Bytes < 0) ? 0 : activeIndexStats.Bytes; - } - - tabletStats->SetRowCount(activeIndexStats.Rows); - tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get()); - - // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside) - // tabletStats->SetIndexSize(); // TODO: calc size of internal tables - - tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds()); - tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep()); -} - -void TColumnShard::FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const { - tableStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get()); - tableStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get()); - tableStats->SetInFlightTxCount(Executor()->GetStats().TxInFly); -} - -void TColumnShard::FillOlapStats(const TActorContext& ctx, std::unique_ptr& ev) { +void TColumnShard::FillOlapStats( + const TActorContext& ctx, + std::unique_ptr& ev +) { ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready ev->Record.SetGeneration(Executor()->Generation()); ev->Record.SetRound(StatsReportRound++); @@ -350,30 +326,29 @@ void TColumnShard::FillOlapStats(const TActorContext& ctx, std::unique_ptrGetResourceMetrics()) { resourceMetrics->Fill(*ev->Record.MutableTabletMetrics()); } - auto* tabletStats = ev->Record.MutableTableStats(); - FillTxTableStats(tabletStats); + + TTableStatsBuilder statsBuilder(*ev->Record.MutableTableStats()); + statsBuilder.FillColumnTableStats(*Counters.GetColumnTablesCounters()); + statsBuilder.FillTabletStats(*Counters.GetTabletCounters()); + statsBuilder.FillBackgroundControllerStats(*Counters.GetBackgroundControllerCounters()); + statsBuilder.FillScanCountersStats(Counters.GetScanCounters()); + statsBuilder.FillExecutorStats(*Executor()); if (TablesManager.HasPrimaryIndex()) { - const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats(); - ConfigureStats(indexStats, tabletStats); + statsBuilder.FillColumnEngineStats(TablesManager.MutablePrimaryIndex().GetTotalStats()); } } -void TColumnShard::FillColumnTableStats(const TActorContext& ctx, - std::unique_ptr& ev) { - if (!TablesManager.HasPrimaryIndex()) { - return; - } - const auto& tablesIndexStats = TablesManager.MutablePrimaryIndex().GetStats(); - LOG_S_DEBUG("There are stats for " << tablesIndexStats.size() << " tables"); - for (const auto& [tableLocalID, columnStats] : tablesIndexStats) { - if (!columnStats) { - LOG_S_ERROR("SendPeriodicStats: empty stats"); - continue; - } +void TColumnShard::FillColumnTableStats( + const TActorContext& ctx, + std::unique_ptr& ev +) { + auto tables = TablesManager.GetTables(); + LOG_S_DEBUG("There are stats for " << tables.size() << " tables"); + for (const auto& [pathId, _] : tables) { auto* periodicTableStats = ev->Record.AddTables(); periodicTableStats->SetDatashardId(TabletID()); - periodicTableStats->SetTableLocalId(tableLocalID); + periodicTableStats->SetTableLocalId(pathId); periodicTableStats->SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready periodicTableStats->SetGeneration(Executor()->Generation()); @@ -385,11 +360,20 @@ void TColumnShard::FillColumnTableStats(const TActorContext& ctx, resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics()); } - auto* tableStats = periodicTableStats->MutableTableStats(); - FillTxTableStats(tableStats); - ConfigureStats(*columnStats, tableStats); + TTableStatsBuilder statsBuilder(*periodicTableStats->MutableTableStats()); + statsBuilder.FillColumnTableStats(*Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)); + statsBuilder.FillTabletStats(*Counters.GetTabletCounters()); + statsBuilder.FillBackgroundControllerStats(*Counters.GetBackgroundControllerCounters(), pathId); + statsBuilder.FillScanCountersStats(Counters.GetScanCounters()); + statsBuilder.FillExecutorStats(*Executor()); + if (TablesManager.HasPrimaryIndex()) { + auto columnEngineStats = TablesManager.GetPrimaryIndexSafe().GetStats().FindPtr(pathId); + if (columnEngineStats && *columnEngineStats) { + statsBuilder.FillColumnEngineStats(**columnEngineStats); + } + } - LOG_S_TRACE("Add stats for table, tableLocalID=" << tableLocalID); + LOG_S_TRACE("Add stats for table, tableLocalID=" << pathId); } } diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index 03f8162f1e04..5fb6485094ac 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -168,9 +168,9 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) } Self->TablesManager = std::move(tManagerLocal); - Self->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size()); - Self->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size()); - Self->SetCounter(COUNTER_TABLE_TTLS, Self->TablesManager.GetTtl().PathsCount()); + Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TABLES, Self->TablesManager.GetTables().size()); + Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TABLE_PRESETS, Self->TablesManager.GetSchemaPresets().size()); + Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TABLE_TTLS, Self->TablesManager.GetTtl().PathsCount()); ACFL_DEBUG("step", "TTablesManager::Load_Finish"); } @@ -253,7 +253,7 @@ bool TTxInit::Execute(TTransactionContext& txc, const TActorContext& ctx) { } void TTxInit::Complete(const TActorContext& ctx) { - Self->CSCounters.Initialization.OnTxInitFinished(TMonotonic::Now() - StartInstant); + Self->Counters.GetCSCounters().Initialization.OnTxInitFinished(TMonotonic::Now() - StartInstant); Self->ProgressTxController->OnTabletInit(); Self->SwitchToWork(ctx); NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*Self); @@ -301,7 +301,7 @@ bool TTxUpdateSchema::Execute(TTransactionContext& txc, const TActorContext&) { void TTxUpdateSchema::Complete(const TActorContext& ctx) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("step", "TTxUpdateSchema.Complete"); - Self->CSCounters.Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant); + Self->Counters.GetCSCounters().Initialization.OnTxUpdateSchemaFinished(TMonotonic::Now() - StartInstant); if (NormalizerTasks.empty()) { AFL_VERIFY(Self->NormalizerController.IsNormalizationFinished())("details", Self->NormalizerController.DebugString()); Self->Execute(new TTxInit(Self), ctx); @@ -432,7 +432,7 @@ bool TTxInitSchema::Execute(TTransactionContext& txc, const TActorContext&) { } void TTxInitSchema::Complete(const TActorContext& ctx) { - Self->CSCounters.Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant); + Self->Counters.GetCSCounters().Initialization.OnTxInitSchemaFinished(TMonotonic::Now() - StartInstant); LOG_S_DEBUG("TxInitSchema.Complete at tablet " << Self->TabletID();); Self->Execute(new TTxUpdateSchema(Self), ctx); } diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp index 80dda8112e0b..a36426068aa1 100644 --- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp +++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp @@ -102,7 +102,7 @@ bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext& ctx) { Result = std::make_unique(Self->TabletID(), step); - Self->IncCounter(COUNTER_PLAN_STEP_ACCEPTED); + Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLAN_STEP_ACCEPTED); if (plannedCount > 0 || Self->ProgressTxController->HaveOutdatedTxs()) { Self->EnqueueProgressTx(ctx); diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index e7d90c111148..557693070328 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -29,7 +29,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); Y_ABORT_UNLESS(Self->ProgressTxInFlight); - Self->TabletCounters->Simple()[COUNTER_TX_COMPLETE_LAG].Set(Self->GetTxCompleteLag().MilliSeconds()); + Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds()); const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc); if (removedCount > 0) { @@ -55,6 +55,7 @@ class TColumnShard::TTxProgressTx: public TTransactionBase { TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId); AFL_VERIFY(TxOperator->ProgressOnExecute(*Self, NOlap::TSnapshot(step, txId), txc)); Self->ProgressTxController->FinishPlannedTx(txId, txc); + Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLANNED_TX_COMPLETED); } return true; } diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index d42ad4fc8d4f..9a844ff4858b 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -25,7 +25,7 @@ class TTxProposeTransaction: public NTabletFlatExecutor::TTransactionBaseIncCounter(COUNTER_PREPARE_REQUEST); + Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_REQUEST); auto& record = Proto(Ev->Get()); const auto txKind = record.GetTxKind(); diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index a749e3c4f571..ddd6a87dddeb 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -5,6 +5,7 @@ #include "engines/reader/transaction/tx_internal_scan.h" #include +#include namespace NKikimr::NColumnShard { @@ -29,9 +30,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext return; } - LastAccessTime = TAppData::TimeProvider->Now(); - ScanTxInFlight.insert({txId, LastAccessTime}); - SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size()); + Counters.GetColumnTablesCounters()->GetPathIdCounter(record.GetLocalPathId())->OnAccess(); + ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()}); + Counters.GetTabletCounters()->SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size()); Execute(new NOlap::NReader::TTxScan(this, ev), ctx); } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index c7a59351c639..436370507f60 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -13,30 +13,25 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, std::unique_ptr&& event, const TActorContext& ctx) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); switch (overloadReason) { case EOverloadStatus::Disk: - IncCounter(COUNTER_OUT_OF_SPACE); + Counters.GetCSCounters().OnWriteOverloadDisk(); break; case EOverloadStatus::InsertTable: - IncCounter(COUNTER_WRITE_OVERLOAD); - CSCounters.OnOverloadInsertTable(writeData.GetSize()); + Counters.GetCSCounters().OnWriteOverloadInsertTable(writeData.GetSize()); break; case EOverloadStatus::OverloadMetadata: - IncCounter(COUNTER_WRITE_OVERLOAD); - CSCounters.OnOverloadMetadata(writeData.GetSize()); + Counters.GetCSCounters().OnWriteOverloadMetadata(writeData.GetSize()); break; case EOverloadStatus::ShardTxInFly: - IncCounter(COUNTER_WRITE_OVERLOAD); - CSCounters.OnOverloadShardTx(writeData.GetSize()); + Counters.GetCSCounters().OnWriteOverloadShardTx(writeData.GetSize()); break; case EOverloadStatus::ShardWritesInFly: - IncCounter(COUNTER_WRITE_OVERLOAD); - CSCounters.OnOverloadShardWrites(writeData.GetSize()); + Counters.GetCSCounters().OnWriteOverloadShardWrites(writeData.GetSize()); break; case EOverloadStatus::ShardWritesSizeInFly: - IncCounter(COUNTER_WRITE_OVERLOAD); - CSCounters.OnOverloadShardWritesSize(writeData.GetSize()); + Counters.GetCSCounters().OnWriteOverloadShardWritesSize(writeData.GetSize()); break; case EOverloadStatus::None: Y_ABORT("invalid function usage"); @@ -57,7 +52,7 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) return EOverloadStatus::InsertTable; } - CSCounters.OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); + Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit()); if (TablesManager.GetPrimaryIndex() && TablesManager.GetPrimaryIndex()->IsOverloadedByMetadata(NOlap::IColumnEngine::GetMetadataLimit())) { return EOverloadStatus::OverloadMetadata; } @@ -69,12 +64,12 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "tx_in_fly")("sum", Executor()->GetStats().TxInFly)("limit", txLimit); return EOverloadStatus::ShardTxInFly; } - if (writesLimit && WritesMonitor.GetWritesInFlight() > writesLimit) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_in_fly")("sum", WritesMonitor.GetWritesInFlight())("limit", writesLimit); + if (writesLimit && Counters.GetWritesMonitor()->GetWritesInFlight() > writesLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_in_fly")("sum", Counters.GetWritesMonitor()->GetWritesInFlight())("limit", writesLimit); return EOverloadStatus::ShardWritesInFly; } - if (writesSizeLimit && WritesMonitor.GetWritesSizeInFlight() > writesSizeLimit) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_size_in_fly")("sum", WritesMonitor.GetWritesSizeInFlight())("limit", writesSizeLimit); + if (writesSizeLimit && Counters.GetWritesMonitor()->GetWritesSizeInFlight() > writesSizeLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_size_in_fly")("sum", Counters.GetWritesMonitor()->GetWritesSizeInFlight())("limit", writesSizeLimit); return EOverloadStatus::ShardWritesSizeInFly; } return EOverloadStatus::None; @@ -89,25 +84,25 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto baseAggregations = wBuffer.GetAggregations(); wBuffer.InitReplyReceived(TMonotonic::Now()); - auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size()); + auto wg = Counters.GetWritesMonitor()->OnFinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size()); for (auto&& aggr : baseAggregations) { const auto& writeMeta = aggr->GetWriteMeta(); if (!TablesManager.IsReadyForWrite(writeMeta.GetTableId())) { ACFL_ERROR("event", "absent_pathId")("path_id", writeMeta.GetTableId())("has_index", TablesManager.HasPrimaryIndex()); - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR); ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable); wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); continue; } if (putResult.GetPutStatus() != NKikimrProto::OK) { - CSCounters.OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant()); - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetCSCounters().OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant()); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR; if (putResult.GetPutStatus() == NKikimrProto::TIMEOUT || putResult.GetPutStatus() == NKikimrProto::DEADLINE) { @@ -128,17 +123,17 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo ev->Get()->GetErrorMessage() ? ev->Get()->GetErrorMessage() : "put data fails"); ctx.Send(writeMeta.GetSource(), result.release(), 0, operation->GetCookie()); } - CSCounters.OnFailedWriteResponse(EWriteFailReason::PutBlob); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::PutBlob); wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator()); } else { const TMonotonic now = TMonotonic::Now(); - CSCounters.OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant()); - CSCounters.OnWriteMiddle1PutBlobsSuccess(now - writeMeta.GetWriteMiddle1StartInstant()); - CSCounters.OnWriteMiddle2PutBlobsSuccess(now - writeMeta.GetWriteMiddle2StartInstant()); - CSCounters.OnWriteMiddle3PutBlobsSuccess(now - writeMeta.GetWriteMiddle3StartInstant()); - CSCounters.OnWriteMiddle4PutBlobsSuccess(now - writeMeta.GetWriteMiddle4StartInstant()); - CSCounters.OnWriteMiddle5PutBlobsSuccess(now - writeMeta.GetWriteMiddle5StartInstant()); - CSCounters.OnWriteMiddle6PutBlobsSuccess(now - writeMeta.GetWriteMiddle6StartInstant()); + Counters.GetCSCounters().OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant()); + Counters.GetCSCounters().OnWriteMiddle1PutBlobsSuccess(now - writeMeta.GetWriteMiddle1StartInstant()); + Counters.GetCSCounters().OnWriteMiddle2PutBlobsSuccess(now - writeMeta.GetWriteMiddle2StartInstant()); + Counters.GetCSCounters().OnWriteMiddle3PutBlobsSuccess(now - writeMeta.GetWriteMiddle3StartInstant()); + Counters.GetCSCounters().OnWriteMiddle4PutBlobsSuccess(now - writeMeta.GetWriteMiddle4StartInstant()); + Counters.GetCSCounters().OnWriteMiddle5PutBlobsSuccess(now - writeMeta.GetWriteMiddle5StartInstant()); + Counters.GetCSCounters().OnWriteMiddle6PutBlobsSuccess(now - writeMeta.GetWriteMiddle6StartInstant()); LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId() << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); @@ -152,8 +147,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorConte } void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { - CSCounters.OnStartWriteRequest(); - LastAccessTime = TAppData::TimeProvider->Now(); + Counters.GetCSCounters().OnStartWriteRequest(); const auto& record = Proto(ev->Get()); const ui64 tableId = record.GetTableId(); @@ -162,6 +156,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex const TString dedupId = record.GetDedupId(); const auto source = ev->Sender; + Counters.GetColumnTablesCounters()->GetPathIdCounter(tableId)->OnUpdate(); + std::optional granuleShardingVersion; if (record.HasGranuleShardingVersion()) { granuleShardingVersion = record.GetGranuleShardingVersion(); @@ -177,7 +173,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex writeMeta.SetWritePartId(record.GetWritePartId()); const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex) { - IncCounter(signalIndex); + Counters.GetTabletCounters()->IncCounter(signalIndex); ctx.Send(source, std::make_unique(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR)); return; @@ -185,7 +181,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled"); - CSCounters.OnFailedWriteResponse(EWriteFailReason::Disabled); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::Disabled); return returnFail(COUNTER_WRITE_FAIL); } @@ -193,7 +189,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index") << " at tablet " << TabletID()); - CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::NoTable); return returnFail(COUNTER_WRITE_FAIL); } @@ -202,7 +198,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (!arrowData->ParseFromProto(record)) { LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID()); - CSCounters.OnFailedWriteResponse(EWriteFailReason::IncorrectSchema); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::IncorrectSchema); return returnFail(COUNTER_WRITE_FAIL); } @@ -212,27 +208,27 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (overloadStatus != EOverloadStatus::None) { std::unique_ptr result = std::make_unique(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED); OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx); - CSCounters.OnFailedWriteResponse(EWriteFailReason::Overload); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::Overload); } else { if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) { LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId() << " longTx " << writeMeta.GetLongTxIdUnsafe().ToString() << " at tablet " << TabletID()); - IncCounter(COUNTER_WRITE_DUPLICATE); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_DUPLICATE); auto result = std::make_unique( TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS); ctx.Send(writeMeta.GetSource(), result.release()); - CSCounters.OnFailedWriteResponse(EWriteFailReason::LongTxDuplication); + Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::LongTxDuplication); return; } - WritesMonitor.RegisterWrite(writeData.GetSize()); + Counters.GetWritesMonitor()->OnStartWrite(writeData.GetSize()); LOG_S_DEBUG("Write (blob) " << writeData.GetSize() << " bytes into pathId " << writeMeta.GetTableId() << (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ") - << WritesMonitor.DebugString() + << Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID()); writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now()); std::shared_ptr task = std::make_shared(TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), @@ -304,7 +300,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto behaviour = TOperationsManager::GetBehaviour(*ev->Get()); if (behaviour == EOperationBehaviour::Undefined) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "invalid write event"); ctx.Send(source, result.release(), 0, cookie); return; @@ -313,7 +309,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor if (behaviour == EOperationBehaviour::CommitWriteLock) { auto commitOperation = std::make_shared(); if (!commitOperation->Parse(*ev->Get())) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "invalid commit event"); ctx.Send(source, result.release(), 0, cookie); } @@ -324,7 +320,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const ui64 lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId(); if (record.GetOperations().size() != 1) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "only single operation is supported"); ctx.Send(source, result.release(), 0, cookie); return; @@ -333,7 +329,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto& operation = record.GetOperations()[0]; const std::optional mType = TEnumOperator::DeserializeFromProto(operation.GetType()); if (!mType) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "operation " + NKikimrDataEvents::TEvWrite::TOperation::EOperationType_Name(operation.GetType()) + " is not supported"); ctx.Send(source, result.release(), 0, cookie); @@ -341,7 +337,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor } if (!operation.GetTableId().HasSchemaVersion()) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema version not set"); ctx.Send(source, result.release(), 0, cookie); return; @@ -349,7 +345,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchema(operation.GetTableId().GetSchemaVersion()); if (!schema) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "unknown schema version"); ctx.Send(source, result.release(), 0, cookie); return; @@ -358,7 +354,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto tableId = operation.GetTableId().GetTableId(); if (!TablesManager.IsReadyForWrite(tableId)) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "table not writable"); ctx.Send(source, result.release(), 0, cookie); return; @@ -366,7 +362,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor auto arrowData = std::make_shared(schema); if (!arrowData->Parse(operation, NEvWrite::TPayloadReader(*ev->Get()))) { - IncCounter(COUNTER_WRITE_FAIL); + Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL); auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "parsing data error"); ctx.Send(source, result.release(), 0, cookie); } @@ -379,7 +375,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor return; } - auto wg = WritesMonitor.RegisterWrite(arrowData->GetSize()); + auto wg = Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize()); std::optional granuleShardingVersionId; if (record.HasGranuleShardingVersionId()) { diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 69d54a68d1bd..27497b4e8fc6 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -41,7 +41,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorConte if (IsAnyChannelYellowStop()) { ACFL_ERROR("event", "TEvWriteIndex failed")("reason", "channel yellow stop"); - IncCounter(COUNTER_OUT_OF_SPACE); + Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE); ev->Get()->SetPutStatus(NKikimrProto::TRYLATER); NOlap::TChangesFinishContext context("out of disk space"); ev->Get()->IndexChanges->Abort(*this, context); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 40a81810df3f..586c545a4bb8 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -68,30 +68,24 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , ProgressTxController(std::make_unique(*this)) , StoragesManager(std::make_shared(*this)) , DataLocksManager(std::make_shared()) - , PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod(TSettings::DefaultPeriodicWakeupActivationPeriod)) + , PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod( + TSettings::DefaultPeriodicWakeupActivationPeriod)) , StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval(TSettings::DefaultStatsReportInterval)) + , TabletCountersHolder(new TProtobufTabletCounters()) + , Counters(*TabletCountersHolder) , InFlightReadsTracker(StoragesManager) , TablesManager(StoragesManager, info->TabletID) , Subscribers(std::make_shared(*this)) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique()) - , SubscribeCounters(std::make_shared()) - , InsertTaskSubscription(NOlap::TInsertColumnEngineChanges::StaticTypeName(), SubscribeCounters) - , CompactTaskSubscription(NOlap::TCompactColumnEngineChanges::StaticTypeName(), SubscribeCounters) - , TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), SubscribeCounters) - , ScanCounters("Scan") - , WritesMonitor(*this) - , NormalizerController(StoragesManager, SubscribeCounters) + , InsertTaskSubscription(NOlap::TInsertColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters()) + , CompactTaskSubscription(NOlap::TCompactColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters()) + , TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters()) + , BackgroundController(Counters.GetBackgroundControllerCounters()) + , NormalizerController(StoragesManager, Counters.GetSubscribeCounters()) , SysLocks(this) - , MaxReadStaleness(TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms())) -{ - TabletCountersPtr.reset(new TProtobufTabletCounters< - ESimpleCounters_descriptor, - ECumulativeCounters_descriptor, - EPercentileCounters_descriptor, - ETxTypes_descriptor - >()); - TabletCounters = TabletCountersPtr.get(); + , MaxReadStaleness(TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms())) { } void TColumnShard::OnDetach(const TActorContext& ctx) { @@ -427,9 +421,9 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl TablesManager.AddTableVersion(pathId, version, tableVerProto, db, Tiers); - SetCounter(COUNTER_TABLES, TablesManager.GetTables().size()); - SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size()); - SetCounter(COUNTER_TABLE_TTLS, TablesManager.GetTtl().PathsCount()); + Counters.GetTabletCounters()->SetCounter(COUNTER_TABLES, TablesManager.GetTables().size()); + Counters.GetTabletCounters()->SetCounter(COUNTER_TABLE_PRESETS, TablesManager.GetSchemaPresets().size()); + Counters.GetTabletCounters()->SetCounter(COUNTER_TABLE_TTLS, TablesManager.GetTtl().PathsCount()); } void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const NOlap::TSnapshot& version, @@ -515,7 +509,7 @@ void TColumnShard::EnqueueBackgroundActivities(const bool periodic) { ACFL_DEBUG("event", "EnqueueBackgroundActivities")("periodic", periodic); StoragesManager->GetOperatorVerified(NOlap::IStoragesManager::DefaultStorageId); StoragesManager->GetSharedBlobsManager()->GetStorageManagerVerified(NOlap::IStoragesManager::DefaultStorageId); - CSCounters.OnStartBackground(); + Counters.GetCSCounters().OnStartBackground(); if (!TablesManager.HasPrimaryIndex()) { AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("problem", "Background activities cannot be started: no index at tablet"); @@ -637,7 +631,7 @@ class TTTLChangesReadTask: public TChangesReadTask, public TMonitoringObjectsCou }; void TColumnShard::StartIndexTask(std::vector&& dataToIndex, const i64 bytesToIndex) { - CSCounters.IndexationInput(bytesToIndex); + Counters.GetCSCounters().IndexationInput(bytesToIndex); std::vector data; data.reserve(dataToIndex.size()); @@ -659,7 +653,8 @@ void TColumnShard::StartIndexTask(std::vector&& dat NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( ResourceSubscribeActor, std::make_shared( - std::make_shared(std::move(ev), SelfId(), TabletID(), IndexationCounters, GetLastCompletedTx()), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, InsertTaskSubscription)); + std::make_shared(std::move(ev), SelfId(), TabletID(), Counters.GetIndexationCounters(), GetLastCompletedTx()), + 0, indexChanges->CalcMemoryForUsage(), externalTaskId, InsertTaskSubscription)); } void TColumnShard::SetupIndexation() { @@ -689,7 +684,7 @@ void TColumnShard::SetupIndexation() { } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_indexation_tasks")("insert_overload_size", InsertTable->GetCountersCommitted().Bytes); - CSCounters.OnSetupIndexation(); + Counters.GetCSCounters().OnSetupIndexation(); ui64 bytesToIndex = 0; ui64 txBytesWrite = 0; std::vector dataToIndex; @@ -720,7 +715,7 @@ void TColumnShard::SetupCompaction() { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_compaction")("reason", "disabled"); return; } - CSCounters.OnSetupCompaction(); + Counters.GetCSCounters().OnSetupCompaction(); BackgroundController.CheckDeadlines(); while (BackgroundController.GetCompactionsCount() < TSettings::MAX_ACTIVE_COMPACTIONS) { @@ -739,7 +734,7 @@ void TColumnShard::SetupCompaction() { NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( ResourceSubscribeActor, std::make_shared( - std::make_shared(std::move(ev), SelfId(), TabletID(), CompactionCounters, GetLastCompletedTx()), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, CompactTaskSubscription)); + std::make_shared(std::move(ev), SelfId(), TabletID(), Counters.GetCompactionCounters(), GetLastCompletedTx()), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, CompactTaskSubscription)); } LOG_S_DEBUG("ActiveCompactions: " << BackgroundController.GetCompactionsCount() << " at tablet " << TabletID()); @@ -750,7 +745,7 @@ bool TColumnShard::SetupTtl(const THashMap& pathTtls) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_ttl")("reason", "disabled"); return false; } - CSCounters.OnSetupTtl(); + Counters.GetCSCounters().OnSetupTtl(); THashMap eviction = pathTtls; for (auto&& i : eviction) { ACFL_DEBUG("background", "ttl")("path", i.first)("info", i.second.GetDebugString()); @@ -774,7 +769,8 @@ bool TColumnShard::SetupTtl(const THashMap& pathTtls) { if (needWrites) { NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription( ResourceSubscribeActor, std::make_shared( - std::make_shared(std::move(ev), SelfId(), TabletID(), CompactionCounters, GetLastCompletedTx()), 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription)); + std::make_shared(std::move(ev), SelfId(), TabletID(), Counters.GetCompactionCounters(), GetLastCompletedTx()), + 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription)); } else { ev->SetPutStatus(NKikimrProto::OK); ActorContext().Send(SelfId(), std::move(ev)); @@ -784,7 +780,7 @@ bool TColumnShard::SetupTtl(const THashMap& pathTtls) { } void TColumnShard::SetupCleanupPortions() { - CSCounters.OnSetupCleanup(); + Counters.GetCSCounters().OnSetupCleanup(); if (!AppDataVerified().ColumnShardConfig.GetCleanupEnabled() || !NYDBTest::TControllers::GetColumnShardController()->IsBackgroundEnabled(NYDBTest::ICSController::EBackground::Cleanup)) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_cleanup")("reason", "disabled"); return; @@ -813,7 +809,7 @@ void TColumnShard::SetupCleanupPortions() { } void TColumnShard::SetupCleanupTables() { - CSCounters.OnSetupCleanup(); + Counters.GetCSCounters().OnSetupCleanup(); if (BackgroundController.IsCleanupTablesActive()) { ACFL_DEBUG("background", "cleanup")("skip_reason", "in_progress"); return; diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 52d9342a1f88..1511c60e6e5e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -13,6 +13,7 @@ #include "transactions/tx_controller.h" #include "inflight_request_tracker.h" #include "counters/columnshard.h" +#include "counters/counters_manager.h" #include "resource_subscriber/counters.h" #include "resource_subscriber/task.h" #include "normalizer/abstract/abstract.h" @@ -276,14 +277,6 @@ class TColumnShard putStatus.OnYellowChannels(Executor()); } - void SetCounter(NColumnShard::ESimpleCounters counter, ui64 num) const { - TabletCounters->Simple()[counter].Set(num); - } - - void IncCounter(NColumnShard::ECumulativeCounters counter, ui64 num = 1) const { - TabletCounters->Cumulative()[counter].Increment(num); - } - void ActivateTiering(const ui64 pathId, const TString& useTiering); void OnTieringModified(const std::optional pathId = {}); public: @@ -297,25 +290,17 @@ class TColumnShard None /* "none" */ }; - void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const { - TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds()); - } - - void IncCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const { - TabletCounters->Simple()[counter].Add(num); - } - // For syslocks void IncCounter(NDataShard::ECumulativeCounters counter, ui64 num = 1) const { - TabletCounters->Cumulative()[counter].Increment(num); + Counters.GetTabletCounters()->IncCounter(counter, num); } void IncCounter(NDataShard::EPercentileCounters counter, ui64 num) const { - TabletCounters->Percentile()[counter].IncrementFor(num); + Counters.GetTabletCounters()->IncCounter(counter, num); } void IncCounter(NDataShard::EPercentileCounters counter, const TDuration& latency) const { - TabletCounters->Percentile()[counter].IncrementFor(latency.MilliSeconds()); + Counters.GetTabletCounters()->IncCounter(counter, latency); } inline TRowVersion LastCompleteTxVersion() const { @@ -431,56 +416,6 @@ class TColumnShard std::optional GranuleShardingVersionId; }; - class TWritesMonitor { - private: - TColumnShard& Owner; - YDB_READONLY(ui64, WritesInFlight, 0); - YDB_READONLY(ui64, WritesSizeInFlight, 0); - - public: - class TGuard: public TNonCopyable { - friend class TWritesMonitor; - private: - TWritesMonitor& Owner; - - explicit TGuard(TWritesMonitor& owner) - : Owner(owner) - {} - - public: - ~TGuard() { - Owner.UpdateCounters(); - } - }; - - TWritesMonitor(TColumnShard& owner) - : Owner(owner) - {} - - TGuard RegisterWrite(const ui64 dataSize) { - ++WritesInFlight; - WritesSizeInFlight += dataSize; - return TGuard(*this); - } - - TGuard FinishWrite(const ui64 dataSize, const ui32 writesCount = 1) { - Y_ABORT_UNLESS(WritesInFlight > 0); - Y_ABORT_UNLESS(WritesSizeInFlight >= dataSize); - WritesInFlight -= writesCount; - WritesSizeInFlight -= dataSize; - return TGuard(*this); - } - - TString DebugString() const { - return TStringBuilder() << "{object=write_monitor;count=" << WritesInFlight << ";size=" << WritesSizeInFlight << "}"; - } - - private: - void UpdateCounters() { - Owner.SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight); - } - }; - ui64 CurrentSchemeShardId = 0; TMessageSeqNo LastSchemaSeqNo; std::optional ProcessingParams; @@ -500,32 +435,25 @@ class TColumnShard const TDuration PeriodicWakeupActivationPeriod; TDuration FailActivationDelay = TDuration::Seconds(1); const TDuration StatsReportInterval; - TInstant LastAccessTime; TInstant LastStatsReport; TActorId ResourceSubscribeActor; TActorId BufferizationWriteActorId; TActorId StatsReportPipe; + std::unique_ptr TabletCountersHolder; + TCountersManager Counters; + TInFlightReadsTracker InFlightReadsTracker; TTablesManager TablesManager; std::shared_ptr Subscribers; std::shared_ptr Tiers; - std::unique_ptr TabletCountersPtr; - TTabletCountersBase* TabletCounters; std::unique_ptr PipeClientCache; std::unique_ptr InsertTable; - std::shared_ptr SubscribeCounters; NOlap::NResourceBroker::NSubscribe::TTaskContext InsertTaskSubscription; NOlap::NResourceBroker::NSubscribe::TTaskContext CompactTaskSubscription; NOlap::NResourceBroker::NSubscribe::TTaskContext TTLTaskSubscription; - const TScanCounters ScanCounters; - const TIndexationCounters CompactionCounters = TIndexationCounters("GeneralCompaction"); - const TIndexationCounters IndexationCounters = TIndexationCounters("Indexation"); - const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction"); - const TCSCounters CSCounters; - TWritesMonitor WritesMonitor; bool ProgressTxInFlight = false; THashMap ScanTxInFlight; THashMap LongTxWrites; @@ -594,8 +522,6 @@ class TColumnShard void SendPeriodicStats(); void FillOlapStats(const TActorContext& ctx, std::unique_ptr& ev); void FillColumnTableStats(const TActorContext& ctx, std::unique_ptr& ev); - void ConfigureStats(const NOlap::TColumnEngineStats& indexStats, ::NKikimrTableStats::TTableStats* tabletStats); - void FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const; public: ui64 TabletTxCounter = 0; diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index da31b1938e5a..bee98ec14965 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -142,8 +142,9 @@ struct TEvPrivate { struct TEvReadFinished : public TEventLocal { explicit TEvReadFinished(ui64 requestCookie, ui64 txId = 0) - : RequestCookie(requestCookie), TxId(txId) - {} + : RequestCookie(requestCookie) + , TxId(txId) { + } ui64 RequestCookie; ui64 TxId; diff --git a/ydb/core/tx/columnshard/counters/aggregation/table_stats.cpp b/ydb/core/tx/columnshard/counters/aggregation/table_stats.cpp new file mode 100644 index 000000000000..2bb4fd761cd6 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/aggregation/table_stats.cpp @@ -0,0 +1,41 @@ +#include "table_stats.h" + +namespace NKikimr::NColumnShard { + +void TTableStatsBuilder::FillColumnTableStats(const TSingleColumnTableCounters& stats) { + stats.FillStats(TableStats); +} + +void TTableStatsBuilder::FillColumnTableStats(const TColumnTablesCounters& stats) { + stats.FillStats(TableStats); +} + +void TTableStatsBuilder::FillTabletStats(const TTabletCountersHandle& stats) { + stats.FillStats(TableStats); +} + +void TTableStatsBuilder::FillBackgroundControllerStats(const TBackgroundControllerCounters& stats, ui64 pathId) { + stats.FillStats(pathId, TableStats); +} + +void TTableStatsBuilder::FillBackgroundControllerStats(const TBackgroundControllerCounters& stats) { + stats.FillTotalStats(TableStats); +} + +void TTableStatsBuilder::FillScanCountersStats(const TScanCounters& stats) { + stats.FillStats(TableStats); +} + +void TTableStatsBuilder::FillExecutorStats(const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor& executor) { + TableStats.SetInFlightTxCount(executor.GetStats().TxInFly); + TableStats.SetHasLoanedParts(executor.HasLoanedParts()); +} + +void TTableStatsBuilder::FillColumnEngineStats(const NOlap::TColumnEngineStats& stats) { + auto activeStats = stats.Active(); // data stats excluding inactive and evicted + TableStats.SetRowCount(activeStats.Rows); + TableStats.SetDataSize(activeStats.Bytes); + TableStats.SetPartCount(activeStats.Portions); +} + +} // namespace NKikimr::NColumnShard \ No newline at end of file diff --git a/ydb/core/tx/columnshard/counters/aggregation/table_stats.h b/ydb/core/tx/columnshard/counters/aggregation/table_stats.h new file mode 100644 index 000000000000..86870a2ef2d6 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/aggregation/table_stats.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NColumnShard { + +class TTableStatsBuilder { +private: + ::NKikimrTableStats::TTableStats& TableStats; + +public: + TTableStatsBuilder(::NKikimrTableStats::TTableStats& tableStats) + : TableStats(tableStats) { + } + + void FillColumnTableStats(const TSingleColumnTableCounters& stats); + void FillColumnTableStats(const TColumnTablesCounters& stats); + + void FillTabletStats(const TTabletCountersHandle& stats); + + void FillBackgroundControllerStats(const TBackgroundControllerCounters& stats, ui64 pathId); + void FillBackgroundControllerStats(const TBackgroundControllerCounters& stats); + + void FillScanCountersStats(const TScanCounters& stats); + + void FillExecutorStats(const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor& executor); + + void FillColumnEngineStats(const NOlap::TColumnEngineStats& stats); +}; + +} // namespace NKikimr::NColumnShard \ No newline at end of file diff --git a/ydb/core/tx/columnshard/counters/aggregation/ya.make b/ydb/core/tx/columnshard/counters/aggregation/ya.make new file mode 100644 index 000000000000..f9236d2f1bcb --- /dev/null +++ b/ydb/core/tx/columnshard/counters/aggregation/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + table_stats.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/base +) + +END() diff --git a/ydb/core/tx/columnshard/counters/background_controller.cpp b/ydb/core/tx/columnshard/counters/background_controller.cpp new file mode 100644 index 000000000000..fcc89f6ee940 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/background_controller.cpp @@ -0,0 +1,18 @@ +#include "background_controller.h" + +#include +#include + +namespace NKikimr::NColumnShard { + +void TBackgroundControllerCounters::OnCompactionFinish(ui64 pathId) { + TInstant now = TAppData::TimeProvider->Now(); + TInstant& lastFinish = LastCompactionFinishByPathId[pathId]; + lastFinish = std::max(lastFinish, now); + + if (LastCompactionFinish < now) { + LastCompactionFinish = now; + } +} + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/counters/background_controller.h b/ydb/core/tx/columnshard/counters/background_controller.h new file mode 100644 index 000000000000..d46d7cdacf91 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/background_controller.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NColumnShard { + +class TBackgroundControllerCounters { +private: + THashMap LastCompactionFinishByPathId; + TInstant LastCompactionFinish; + +public: + void OnCompactionFinish(ui64 pathId); + + void FillStats(ui64 pathId, ::NKikimrTableStats::TTableStats& output) const { + output.SetLastFullCompactionTs(GetLastCompactionFinishInstant(pathId).value_or(TInstant::Zero()).Seconds()); + } + + void FillTotalStats(::NKikimrTableStats::TTableStats& output) const { + output.SetLastFullCompactionTs(LastCompactionFinish.Seconds()); + } + +private: + std::optional GetLastCompactionFinishInstant(const ui64 pathId) const { + auto findInstant = LastCompactionFinishByPathId.FindPtr(pathId); + if (!findInstant) { + return std::nullopt; + } + return *findInstant; + } +}; + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/counters/column_tables.cpp b/ydb/core/tx/columnshard/counters/column_tables.cpp new file mode 100644 index 000000000000..51b9ecf6283e --- /dev/null +++ b/ydb/core/tx/columnshard/counters/column_tables.cpp @@ -0,0 +1,13 @@ +#include "column_tables.h" + +namespace NKikimr::NColumnShard { + +std::shared_ptr TColumnTablesCounters::GetPathIdCounter(ui64 pathId) { + auto findCounter = PathIdCounters.FindPtr(pathId); + if (findCounter) { + return *findCounter; + } + return PathIdCounters.emplace(pathId, std::make_shared(*this)).first->second; +} + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/counters/column_tables.h b/ydb/core/tx/columnshard/counters/column_tables.h new file mode 100644 index 000000000000..61a546b1cd99 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/column_tables.h @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace NKikimr::NColumnShard { + +class TSingleColumnTableCounters; + +class TColumnTablesCounters { +private: + YDB_READONLY_CONST(std::shared_ptr, LastAccessTime); + YDB_READONLY_CONST(std::shared_ptr, LastUpdateTime); + + THashMap> PathIdCounters; + + friend class TSingleColumnTableCounters; + +public: + TColumnTablesCounters() + : LastAccessTime(std::make_shared()) + , LastUpdateTime(std::make_shared()) { + } + + void FillStats(::NKikimrTableStats::TTableStats& output) const { + output.SetLastAccessTime(LastAccessTime->MilliSeconds()); + output.SetLastUpdateTime(LastUpdateTime->MilliSeconds()); + } + + std::shared_ptr GetPathIdCounter(ui64 pathId); +}; + +class TSingleColumnTableCounters { +private: + YDB_READONLY(TInstant, PathIdLastAccessTime, TInstant::Zero()); + YDB_READONLY(TInstant, PathIdLastUpdateTime, TInstant::Zero()); + + const std::shared_ptr TotalLastAccessTime; + const std::shared_ptr TotalLastUpdateTime; + +public: + TSingleColumnTableCounters(TColumnTablesCounters& owner) + : TotalLastAccessTime(owner.LastAccessTime) + , TotalLastUpdateTime(owner.LastUpdateTime) { + } + + void OnAccess() { + UpdateLastAccessTime(TAppData::TimeProvider->Now()); + } + + void OnUpdate() { + TInstant now = TAppData::TimeProvider->Now(); + UpdateLastUpdateTime(now); + UpdateLastAccessTime(now); + } + + void FillStats(::NKikimrTableStats::TTableStats& output) const { + output.SetLastAccessTime(PathIdLastAccessTime.MilliSeconds()); + output.SetLastUpdateTime(PathIdLastUpdateTime.MilliSeconds()); + } + +private: + void UpdateLastAccessTime(TInstant value) { + if (PathIdLastAccessTime < value) { + PathIdLastAccessTime = value; + } + if (*TotalLastAccessTime < value) { + *TotalLastAccessTime = value; + } + } + + void UpdateLastUpdateTime(TInstant value) { + if (PathIdLastUpdateTime < value) { + PathIdLastUpdateTime = value; + } + if (*TotalLastUpdateTime < value) { + *TotalLastUpdateTime = value; + } + } +}; + +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp index 33b9b2fa47a0..7bac541998f6 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.cpp +++ b/ydb/core/tx/columnshard/counters/columnshard.cpp @@ -6,9 +6,12 @@ namespace NKikimr::NColumnShard { -TCSCounters::TCSCounters() +TCSCounters::TCSCounters(std::shared_ptr tabletCounters) : TBase("CS") + , TabletCounters(std::move(tabletCounters)) , Initialization(*this) { + Y_ABORT_UNLESS(TabletCounters); + StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count"); TooEarlyBackgroundCount = TBase::GetDeriviative("TooEarlyBackground/Count"); SetupCompactionCount = TBase::GetDeriviative("SetupCompaction/Count"); diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index 07b2bbe0ca60..b08ad71355e3 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -4,6 +4,7 @@ #include #include +#include namespace NKikimr::NColumnShard { @@ -69,6 +70,8 @@ class TCSCounters: public TCommonCountersOwner { private: using TBase = TCommonCountersOwner; + std::shared_ptr TabletCounters; + NMonitoring::TDynamicCounters::TCounterPtr StartBackgroundCount; NMonitoring::TDynamicCounters::TCounterPtr TooEarlyBackgroundCount; NMonitoring::TDynamicCounters::TCounterPtr SetupCompactionCount; @@ -183,27 +186,36 @@ class TCSCounters: public TCommonCountersOwner { SplitCompactionGranulePortionsCount->SetValue(portionsCount); } - void OnOverloadInsertTable(const ui64 size) const { + void OnWriteOverloadDisk() const { + TabletCounters->IncCounter(COUNTER_OUT_OF_SPACE); + } + + void OnWriteOverloadInsertTable(const ui64 size) const { + TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD); OverloadInsertTableBytes->Add(size); OverloadInsertTableCount->Add(1); } - void OnOverloadMetadata(const ui64 size) const { + void OnWriteOverloadMetadata(const ui64 size) const { + TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD); OverloadMetadataBytes->Add(size); OverloadMetadataCount->Add(1); } - void OnOverloadShardTx(const ui64 size) const { + void OnWriteOverloadShardTx(const ui64 size) const { + TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD); OverloadShardTxBytes->Add(size); OverloadShardTxCount->Add(1); } - void OnOverloadShardWrites(const ui64 size) const { + void OnWriteOverloadShardWrites(const ui64 size) const { + TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD); OverloadShardWritesBytes->Add(size); OverloadShardWritesCount->Add(1); } - void OnOverloadShardWritesSize(const ui64 size) const { + void OnWriteOverloadShardWritesSize(const ui64 size) const { + TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD); OverloadShardWritesSizeBytes->Add(size); OverloadShardWritesSizeCount->Add(1); } @@ -254,7 +266,7 @@ class TCSCounters: public TCommonCountersOwner { SetupCleanupCount->Add(1); } - TCSCounters(); + TCSCounters(std::shared_ptr tabletCounters); }; } diff --git a/ydb/core/tx/columnshard/counters/counters_manager.h b/ydb/core/tx/columnshard/counters/counters_manager.h new file mode 100644 index 000000000000..ca3f603d1592 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/counters_manager.h @@ -0,0 +1,47 @@ +#pragma once + +#include "columnshard.h" +#include "indexation.h" +#include "scan.h" +#include "column_tables.h" +#include "writes_monitor.h" +#include "tablet_counters.h" +#include "background_controller.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NColumnShard { + +class TCountersManager { +private: + YDB_READONLY_DEF(std::shared_ptr, TabletCounters); + YDB_READONLY_DEF(std::shared_ptr, WritesMonitor); + + YDB_READONLY_DEF(std::shared_ptr, BackgroundControllerCounters); + YDB_READONLY_DEF(std::shared_ptr, ColumnTablesCounters); + + YDB_READONLY(TCSCounters, CSCounters, TCSCounters(TabletCounters)); + YDB_READONLY(TIndexationCounters, EvictionCounters, TIndexationCounters("Eviction")); + YDB_READONLY(TIndexationCounters, IndexationCounters, TIndexationCounters("Indexation")); + YDB_READONLY(TIndexationCounters, CompactionCounters, TIndexationCounters("GeneralCompaction")); + YDB_READONLY(TScanCounters, ScanCounters, TScanCounters("Scan")); + YDB_READONLY_DEF(std::shared_ptr, SubscribeCounters); + +public: + TCountersManager(TTabletCountersBase& tabletCounters) + : TabletCounters(std::make_shared(tabletCounters)) + , WritesMonitor(std::make_shared(tabletCounters)) + , BackgroundControllerCounters(std::make_shared()) + , ColumnTablesCounters(std::make_shared()) + , SubscribeCounters(std::make_shared()) { + } +}; + +} // namespace NKikimr::NColumnShard \ No newline at end of file diff --git a/ydb/core/tx/columnshard/counters/scan.cpp b/ydb/core/tx/columnshard/counters/scan.cpp index 075aa0e880ec..cdfd42aa9bc4 100644 --- a/ydb/core/tx/columnshard/counters/scan.cpp +++ b/ydb/core/tx/columnshard/counters/scan.cpp @@ -88,12 +88,14 @@ TScanCounters::TScanCounters(const TString& module) ScanIntervalState = std::make_shared(*this); ResourcesSubscriberCounters = std::make_shared(); ScanDurationByStatus.resize((ui32)EStatusFinish::COUNT); + ScansFinishedByStatus.resize((ui32)EStatusFinish::COUNT); ui32 idx = 0; for (auto&& i : GetEnumAllValues()) { if (i == EStatusFinish::COUNT) { continue; } ScanDurationByStatus[(ui32)i] = TBase::GetHistogram("ScanDuration/" + ::ToString(i) + "/Milliseconds", NMonitoring::ExponentialHistogram(18, 2, 1)); + ScansFinishedByStatus[(ui32)i] = TBase::GetDeriviative("ScansFinished/" + ::ToString(i)); AFL_VERIFY(idx == (ui32)i); ++idx; } @@ -103,4 +105,8 @@ NKikimr::NColumnShard::TScanAggregations TScanCounters::BuildAggregations() { return TScanAggregations(GetModuleId()); } +void TScanCounters::FillStats(::NKikimrTableStats::TTableStats& output) const { + output.SetRangeReads(ScansFinishedByStatus[(ui32)EStatusFinish::Success]->Val()); +} + } diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h index efaec82c563e..053599d1580c 100644 --- a/ydb/core/tx/columnshard/counters/scan.h +++ b/ydb/core/tx/columnshard/counters/scan.h @@ -1,6 +1,7 @@ #pragma once #include "common/owner.h" #include "common/histogram.h" +#include #include #include #include @@ -127,6 +128,7 @@ class TScanCounters: public TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr AckWaitingDuration; std::vector ScanDurationByStatus; + std::vector ScansFinishedByStatus; NMonitoring::TDynamicCounters::TCounterPtr NoScanRecords; NMonitoring::TDynamicCounters::TCounterPtr NoScanIntervals; @@ -212,9 +214,10 @@ class TScanCounters: public TCommonCountersOwner { LogScanIntervals->Add(1); } - void OnScanDuration(const EStatusFinish status, const TDuration d) const { + void OnScanFinished(const EStatusFinish status, const TDuration d) const { AFL_VERIFY((ui32)status < ScanDurationByStatus.size()); ScanDurationByStatus[(ui32)status]->Collect(d.MilliSeconds()); + ScansFinishedByStatus[(ui32)status]->Add(1); } void AckWaitingInfo(const TDuration d) const { @@ -257,6 +260,8 @@ class TScanCounters: public TCommonCountersOwner { } TScanAggregations BuildAggregations(); + + void FillStats(::NKikimrTableStats::TTableStats& output) const; }; class TCounterGuard: TNonCopyable { diff --git a/ydb/core/tx/columnshard/counters/tablet_counters.h b/ydb/core/tx/columnshard/counters/tablet_counters.h new file mode 100644 index 000000000000..7a01bc9becb3 --- /dev/null +++ b/ydb/core/tx/columnshard/counters/tablet_counters.h @@ -0,0 +1,123 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NColumnShard { + +class TTabletCountersHandle { +private: + TTabletCountersBase& TabletCounters; + +public: + TTabletCountersHandle(TTabletCountersBase& stats) + : TabletCounters(stats) { + } + + void SetCounter(NColumnShard::ESimpleCounters counter, ui64 num) const { + TabletCounters.Simple()[counter].Set(num); + } + + void IncCounter(NColumnShard::ECumulativeCounters counter, ui64 num = 1) const { + TabletCounters.Cumulative()[counter].Increment(num); + } + + void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const { + TabletCounters.Percentile()[counter].IncrementFor(latency.MicroSeconds()); + } + + void IncCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const { + TabletCounters.Simple()[counter].Add(num); + } + + void IncCounter(NDataShard::ECumulativeCounters counter, ui64 num = 1) const { + TabletCounters.Cumulative()[counter].Increment(num); + } + + void IncCounter(NDataShard::EPercentileCounters counter, ui64 num) const { + TabletCounters.Percentile()[counter].IncrementFor(num); + } + + void IncCounter(NDataShard::EPercentileCounters counter, const TDuration& latency) const { + TabletCounters.Percentile()[counter].IncrementFor(latency.MilliSeconds()); + } + + ui64 GetValue(NColumnShard::ESimpleCounters counter) const { + return TabletCounters.Simple()[counter].Get(); + } + + ui64 GetValue(NColumnShard::ECumulativeCounters counter) const { + return TabletCounters.Cumulative()[counter].Get(); + } + + const TTabletPercentileCounter& GetValue(NColumnShard::EPercentileCounters counter) const { + return TabletCounters.Percentile()[counter]; + } + + ui64 GetValue(NDataShard::ESimpleCounters counter) const { + return TabletCounters.Simple()[counter].Get(); + } + + ui64 GetValue(NDataShard::ECumulativeCounters counter) const { + return TabletCounters.Cumulative()[counter].Get(); + } + + const TTabletPercentileCounter& GetCounter(NDataShard::EPercentileCounters counter) const { + return TabletCounters.Percentile()[counter]; + } + + void OnWriteSuccess(const ui64 blobsWritten, const ui64 bytesWritten) const { + IncCounter(NColumnShard::COUNTER_UPSERT_BLOBS_WRITTEN, blobsWritten); + IncCounter(NColumnShard::COUNTER_UPSERT_BYTES_WRITTEN, bytesWritten); + // self.Stats.GetTabletCounters().IncCounter(NColumnShard::COUNTER_RAW_BYTES_UPSERTED, insertedBytes); + IncCounter(NColumnShard::COUNTER_WRITE_SUCCESS); + } + + void OnWriteFailure() const { + IncCounter(NColumnShard::COUNTER_WRITE_FAIL); + } + + void OnScanStarted(const NOlap::TSelectInfo::TStats& countersDelta) const { + IncCounter(NColumnShard::COUNTER_READ_INDEX_PORTIONS, countersDelta.Portions); + IncCounter(NColumnShard::COUNTER_READ_INDEX_BLOBS, countersDelta.Blobs); + IncCounter(NColumnShard::COUNTER_READ_INDEX_ROWS, countersDelta.Rows); + IncCounter(NColumnShard::COUNTER_READ_INDEX_BYTES, countersDelta.Bytes); + } + + void OnWriteCommitted(const NOlap::TInsertionSummary::TCounters& countersDelta) const { + IncCounter(COUNTER_BLOBS_COMMITTED, countersDelta.Rows); + IncCounter(COUNTER_BYTES_COMMITTED, countersDelta.Bytes); + IncCounter(COUNTER_RAW_BYTES_COMMITTED, countersDelta.RawBytes); + } + + void OnCompactionWriteIndexCompleted(bool success, const ui64 blobsWritten, const ui64 bytesWritten) const { + IncCounter(success ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL); + IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, blobsWritten); + IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, bytesWritten); + } + + void OnInsertionWriteIndexCompleted(const ui64 blobsWritten, const ui64 bytesWritten, const TDuration duration) const { + IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, blobsWritten); + IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, bytesWritten); + IncCounter(NColumnShard::COUNTER_INDEXING_TIME, duration.MilliSeconds()); + } + + void FillStats(::NKikimrTableStats::TTableStats& output) const { + output.SetRowUpdates(GetValue(COUNTER_WRITE_SUCCESS)); + output.SetRowDeletes(0); // manual deletes are not supported + output.SetRowReads(0); // all reads are range reads + output.SetRangeReadRows(GetValue(COUNTER_READ_INDEX_ROWS)); + + output.SetImmediateTxCompleted(GetValue(COUNTER_IMMEDIATE_TX_COMPLETED)); + output.SetTxRejectedByOverload(GetValue(COUNTER_WRITE_OVERLOAD)); + output.SetTxRejectedBySpace(GetValue(COUNTER_OUT_OF_SPACE)); + output.SetPlannedTxCompleted(GetValue(COUNTER_PLANNED_TX_COMPLETED)); + output.SetTxCompleteLagMsec(GetValue(COUNTER_TX_COMPLETE_LAG)); + } +}; + +} diff --git a/ydb/core/tx/columnshard/counters/writes_monitor.h b/ydb/core/tx/columnshard/counters/writes_monitor.h new file mode 100644 index 000000000000..78c5ddddd11f --- /dev/null +++ b/ydb/core/tx/columnshard/counters/writes_monitor.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NColumnShard { + +class TWritesMonitor { +private: + TTabletCountersBase& Stats; + + YDB_READONLY(ui64, WritesInFlight, 0); + YDB_READONLY(ui64, WritesSizeInFlight, 0); + +public: + class TGuard: public TNonCopyable { + friend class TWritesMonitor; + + private: + TWritesMonitor& Owner; + + explicit TGuard(TWritesMonitor& owner) + : Owner(owner) { + } + + public: + ~TGuard() { + Owner.UpdateCounters(); + } + }; + + TWritesMonitor(TTabletCountersBase& stats) + : Stats(stats) { + } + + TGuard OnStartWrite(const ui64 dataSize) { + ++WritesInFlight; + WritesSizeInFlight += dataSize; + return TGuard(*this); + } + + TGuard OnFinishWrite(const ui64 dataSize, const ui32 writesCount = 1) { + Y_ABORT_UNLESS(WritesInFlight > 0); + Y_ABORT_UNLESS(WritesSizeInFlight >= dataSize); + WritesInFlight -= writesCount; + WritesSizeInFlight -= dataSize; + return TGuard(*this); + } + + TString DebugString() const { + return TStringBuilder() << "{object=write_monitor;count=" << WritesInFlight << ";size=" << WritesSizeInFlight + << "}"; + } + +private: + void UpdateCounters() { + Stats.Simple()[COUNTER_WRITES_IN_FLY].Set(WritesInFlight); + } +}; + +} diff --git a/ydb/core/tx/columnshard/counters/ya.make b/ydb/core/tx/columnshard/counters/ya.make index 65797cb34752..0156663ffe27 100644 --- a/ydb/core/tx/columnshard/counters/ya.make +++ b/ydb/core/tx/columnshard/counters/ya.make @@ -1,6 +1,8 @@ LIBRARY() SRCS( + background_controller.cpp + column_tables.cpp indexation.cpp scan.cpp engine_logs.cpp @@ -13,6 +15,7 @@ SRCS( PEERDIR( library/cpp/monlib/dynamic_counters + ydb/core/tx/columnshard/counters/aggregation ydb/core/tx/columnshard/counters/common ydb/core/base ) diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp index 7dfe36689d48..7d37981a9039 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.cpp @@ -46,7 +46,7 @@ void TColumnEngineChanges::WriteIndexOnComplete(NColumnShard::TColumnShard* self DoWriteIndexOnComplete(self, context); if (self) { OnFinish(*self, context); - self->IncCounter(GetCounterIndex(context.FinishedSuccessfully)); + self->Counters.GetTabletCounters()->IncCounter(GetCounterIndex(context.FinishedSuccessfully)); } } diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp index 3aa29ed01a13..28aef14597e3 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp +++ b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp @@ -41,9 +41,9 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::T } } if (self) { - self->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size()); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size()); for (auto&& p : PortionsToDrop) { - self->IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.GetTotalRawBytes()); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.GetTotalRawBytes()); } } } diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index a94d160158e6..2441ce4248b8 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -53,7 +53,7 @@ void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { void TCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { TBase::DoWriteIndexOnComplete(self, context); if (self) { - self->IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds()); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_COMPACTION_TIME, context.Duration.MilliSeconds()); } } diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 3a92f179abf4..ea7b6ddc2eb4 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -195,17 +195,15 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc void TGeneralCompactColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) { TBase::DoWriteIndexOnComplete(self, context); if (self) { - self->IncCounter( - context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL); - self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); - self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten); + self->Counters.GetTabletCounters()->OnCompactionWriteIndexCompleted( + context.FinishedSuccessfully, context.BlobsWritten, context.BytesWritten); } } void TGeneralCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); auto& g = *GranuleMeta; - self.CSCounters.OnSplitCompactionInfo( + self.Counters.GetCSCounters().OnSplitCompactionInfo( g.GetAdditiveSummary().GetCompacted().GetTotalPortionsSize(), g.GetAdditiveSummary().GetCompacted().GetPortionsCount()); } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 4ad95829076b..9d1058e880a5 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -36,9 +36,7 @@ void TInsertColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnSha if (!DataToIndex.empty()) { self->UpdateInsertTableCounters(); } - self->IncCounter(NColumnShard::COUNTER_INDEXING_BLOBS_WRITTEN, context.BlobsWritten); - self->IncCounter(NColumnShard::COUNTER_INDEXING_BYTES_WRITTEN, context.BytesWritten); - self->IncCounter(NColumnShard::COUNTER_INDEXING_TIME, context.Duration.MilliSeconds()); + self->Counters.GetTabletCounters()->OnInsertionWriteIndexCompleted(context.BlobsWritten, context.BytesWritten, context.Duration); } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 5b3d988abfba..e24fa9e8988c 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -42,13 +42,13 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self case NOlap::TPortionMeta::EProduced::UNSPECIFIED: Y_ABORT_UNLESS(false); // unexpected case NOlap::TPortionMeta::EProduced::INSERTED: - self->IncCounter(NColumnShard::COUNTER_INDEXING_PORTIONS_WRITTEN); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_INDEXING_PORTIONS_WRITTEN); break; case NOlap::TPortionMeta::EProduced::COMPACTED: - self->IncCounter(NColumnShard::COUNTER_COMPACTION_PORTIONS_WRITTEN); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_COMPACTION_PORTIONS_WRITTEN); break; case NOlap::TPortionMeta::EProduced::SPLIT_COMPACTED: - self->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_PORTIONS_WRITTEN); break; case NOlap::TPortionMeta::EProduced::EVICTED: Y_ABORT("Unexpected evicted case"); @@ -58,19 +58,19 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self break; } } - self->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size()); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size()); THashSet blobsDeactivated; for (auto& [_, portionInfo] : PortionsToRemove) { for (auto& rec : portionInfo.Records) { blobsDeactivated.emplace(portionInfo.GetBlobId(rec.BlobRange.GetBlobIdxVerified())); } - self->IncCounter(NColumnShard::COUNTER_RAW_BYTES_DEACTIVATED, portionInfo.GetTotalRawBytes()); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_RAW_BYTES_DEACTIVATED, portionInfo.GetTotalRawBytes()); } - self->IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, blobsDeactivated.size()); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, blobsDeactivated.size()); for (auto& blobId : blobsDeactivated) { - self->IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize()); + self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize()); } } { diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index c88a69aabc9c..b4c1f22f601d 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -402,7 +402,7 @@ void TColumnShardScan::Finish(const NColumnShard::TScanCounters::EStatusFinish s Send(ColumnShardActorId, new NColumnShard::TEvPrivate::TEvReadFinished(RequestCookie, TxId)); AFL_VERIFY(StartInstant); - ScanCountersPool.OnScanDuration(status, TMonotonic::Now() - *StartInstant); + ScanCountersPool.OnScanFinished(status, TMonotonic::Now() - *StartInstant); ReportStats(); PassAway(); } diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp index 5decb79c2203..33d2dc3e73cb 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp @@ -80,13 +80,13 @@ void TTxInternalScan::Complete(const TActorContext& ctx) { auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() << "Table " << request.GetPathId() << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage()); NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); - Self->ScanCounters.OnScanDuration(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero()); + Self->Counters.GetScanCounters().OnScanFinished(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero()); ctx.Send(scanComputeActor, ev.Release()); return; } auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(), TComputeShardingPolicy(), ScanId, TxId, ScanGen, *requestCookie, Self->TabletID(), TDuration::Max(), ReadMetadataRange, - NKikimrDataEvents::FORMAT_ARROW, Self->ScanCounters)); + NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters())); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxInternalScan started")("actor_id", scanActor)("trace_detailed", detailedInfo); } diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp index ab0d41db1931..6febbcb380fa 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp @@ -191,7 +191,7 @@ void TTxScan::Complete(const TActorContext& ctx) { auto dataFormat = request.GetDataFormat(); const TDuration timeout = TDuration::MilliSeconds(request.GetTimeoutMs()); if (scanGen > 1) { - Self->IncCounter(NColumnShard::COUNTER_SCAN_RESTARTED); + Self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_SCAN_RESTARTED); } const NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build() ("tx_id", txId)("scan_id", scanId)("gen", scanGen)("table", table)("snapshot", snapshot)("tablet", Self->TabletID())("timeout", timeout); @@ -226,22 +226,18 @@ void TTxScan::Complete(const TActorContext& ctx) { auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() << "Table " << table << " (shard " << Self->TabletID() << ") scan failed, reason: " << requestCookie.GetErrorMessage()); NYql::IssueToMessage(issue, ev->Record.MutableIssues()->Add()); - Self->ScanCounters.OnScanDuration(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero()); + Self->Counters.GetScanCounters().OnScanFinished(NColumnShard::TScanCounters::EStatusFinish::CannotAddInFlight, TDuration::Zero()); ctx.Send(scanComputeActor, ev.Release()); return; } - auto statsDelta = Self->InFlightReadsTracker.GetSelectStatsDelta(); - Self->IncCounter(NColumnShard::COUNTER_READ_INDEX_PORTIONS, statsDelta.Portions); - Self->IncCounter(NColumnShard::COUNTER_READ_INDEX_BLOBS, statsDelta.Blobs); - Self->IncCounter(NColumnShard::COUNTER_READ_INDEX_ROWS, statsDelta.Rows); - Self->IncCounter(NColumnShard::COUNTER_READ_INDEX_BYTES, statsDelta.Bytes); + Self->Counters.GetTabletCounters()->OnScanStarted(Self->InFlightReadsTracker.GetSelectStatsDelta()); TComputeShardingPolicy shardingPolicy; AFL_VERIFY(shardingPolicy.DeserializeFromProto(request.GetComputeShardingPolicy())); auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(), - shardingPolicy, scanId, txId, scanGen, *requestCookie, Self->TabletID(), timeout, ReadMetadataRange, dataFormat, Self->ScanCounters)); + shardingPolicy, scanId, txId, scanGen, *requestCookie, Self->TabletID(), timeout, ReadMetadataRange, dataFormat, Self->Counters.GetScanCounters())); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "TTxScan started")("actor_id", scanActor)("trace_detailed", detailedInfo); } diff --git a/ydb/core/tx/columnshard/hooks/testing/ro_controller.h b/ydb/core/tx/columnshard/hooks/testing/ro_controller.h index c271878ea838..3d9576b20e7d 100644 --- a/ydb/core/tx/columnshard/hooks/testing/ro_controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/ro_controller.h @@ -82,10 +82,10 @@ class TReadOnlyController: public ICSController { void WaitIndexation(const TDuration d) const { TInstant start = TInstant::Now(); - ui32 compactionsStart = GetInsertStartedCounter().Val(); + ui32 insertsStart = GetInsertStartedCounter().Val(); while (Now() - start < d) { - if (compactionsStart != GetInsertStartedCounter().Val()) { - compactionsStart = GetInsertStartedCounter().Val(); + if (insertsStart != GetInsertStartedCounter().Val()) { + insertsStart = GetInsertStartedCounter().Val(); start = TInstant::Now(); } Cerr << "WAIT_INDEXATION: " << GetInsertStartedCounter().Val() << Endl; diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index 4ff54d395e73..cffffbe7b506 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -49,12 +49,9 @@ namespace NKikimr::NColumnShard { return owner.TablesManager.HasTable(pathId); }; - auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), { gWriteId }, + const auto counters = owner.InsertTable->Commit(dbTable, snapshot.GetPlanStep(), snapshot.GetTxId(), { gWriteId }, pathExists); - - owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); - owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); - owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); + owner.Counters.GetTabletCounters()->OnWriteCommitted(counters); } owner.UpdateInsertTableCounters(); } diff --git a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp index 785ca04ba8f9..45f785c10223 100644 --- a/ydb/core/tx/columnshard/resource_subscriber/counters.cpp +++ b/ydb/core/tx/columnshard/resource_subscriber/counters.cpp @@ -1,9 +1,12 @@ #include "counters.h" +#include + namespace NKikimr::NOlap::NResourceBroker::NSubscribe { std::shared_ptr TSubscriberCounters::GetTypeCounters(const TString& resourceType) { + TGuard lock(Mutex); auto it = ResourceTypeCounters.find(resourceType); if (it == ResourceTypeCounters.end()) { it = ResourceTypeCounters.emplace(resourceType, std::make_shared(*this, resourceType)).first; diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h index 2db77ae315a7..3f1d1ea5d0ee 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h @@ -58,9 +58,9 @@ namespace NKikimr::NColumnShard { auto counters = owner.InsertTable->Commit(dbTable, version.GetPlanStep(), version.GetTxId(), WriteIds, pathExists); - owner.IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); - owner.IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); - owner.IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); + owner.Counters.GetTabletCounters()->IncCounter(COUNTER_BLOBS_COMMITTED, counters.Rows); + owner.Counters.GetTabletCounters()->IncCounter(COUNTER_BYTES_COMMITTED, counters.Bytes); + owner.Counters.GetTabletCounters()->IncCounter(COUNTER_RAW_BYTES_COMMITTED, counters.RawBytes); NIceDb::TNiceDb db(txc.DB); for (TWriteId writeId : WriteIds) { diff --git a/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp b/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp index c7d39da52740..1fd63abaf084 100644 --- a/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp @@ -7,7 +7,7 @@ void IProposeTxOperator::DoSendReply(TColumnShard& owner, const TActorContext& c std::unique_ptr evResult = std::make_unique( owner.TabletID(), txInfo.TxKind, txInfo.TxId, GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage()); if (IsFail()) { - owner.IncCounter(COUNTER_PREPARE_ERROR); + owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_ERROR); AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())("tx_id", txInfo.TxId); } else { evResult->Record.SetMinStep(txInfo.MinStep); @@ -15,7 +15,7 @@ void IProposeTxOperator::DoSendReply(TColumnShard& owner, const TActorContext& c if (owner.ProcessingParams) { evResult->Record.MutableDomainCoordinators()->CopyFrom(owner.ProcessingParams->GetCoordinators()); } - owner.IncCounter(COUNTER_PREPARE_SUCCESS); + owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_SUCCESS); } ctx.Send(txInfo.Source, evResult.release()); } diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp index 7b48c913e0fd..12d43621ddbf 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp @@ -736,6 +736,10 @@ Y_UNIT_TEST_SUITE(TOlap) { UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0); UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0); + UNIT_ASSERT_GT(tabletStats.GetPartCount(), 0); + UNIT_ASSERT_GT(tabletStats.GetRowUpdates(), 0); + UNIT_ASSERT_GT(tabletStats.GetImmediateTxCompleted(), 0); + UNIT_ASSERT_GT(tabletStats.GetPlannedTxCompleted(), 0); } { @@ -745,6 +749,10 @@ Y_UNIT_TEST_SUITE(TOlap) { UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0); UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0); + UNIT_ASSERT_GT(tabletStats.GetPartCount(), 0); + UNIT_ASSERT_GT(tabletStats.GetRowUpdates(), 0); + UNIT_ASSERT_GT(tabletStats.GetImmediateTxCompleted(), 0); + UNIT_ASSERT_GT(tabletStats.GetPlannedTxCompleted(), 0); } #if 0