Skip to content

Commit

Permalink
add tx progress counters (ydb-platform#7407)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored and zverevgeny committed Sep 14, 2024
1 parent 1a7e6de commit 78abfb8
Show file tree
Hide file tree
Showing 23 changed files with 316 additions and 196 deletions.
25 changes: 7 additions & 18 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,9 @@ void TColumnShard::FillOlapStats(
resourceMetrics->Fill(*ev->Record.MutableTabletMetrics());
}

TTableStatsBuilder statsBuilder(*ev->Record.MutableTableStats());
statsBuilder.FillColumnTableStats(*Counters.GetColumnTablesCounters());
statsBuilder.FillTabletStats(*Counters.GetTabletCounters());
statsBuilder.FillBackgroundControllerStats(*Counters.GetBackgroundControllerCounters());
statsBuilder.FillScanCountersStats(Counters.GetScanCounters());
statsBuilder.FillExecutorStats(*Executor());
if (TablesManager.HasPrimaryIndex()) {
statsBuilder.FillColumnEngineStats(TablesManager.MutablePrimaryIndex().GetTotalStats());
TTableStatsBuilder statsBuilder(Counters, Executor(), TablesManager.MutablePrimaryIndex());
statsBuilder.FillTotalTableStats(*ev->Record.MutableTableStats());
}
}

Expand All @@ -355,6 +350,9 @@ void TColumnShard::FillColumnTableStats(
std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev
) {
auto tables = TablesManager.GetTables();
std::optional<TTableStatsBuilder> tableStatsBuilder =
TablesManager.HasPrimaryIndex() ? std::make_optional<TTableStatsBuilder>(Counters, Executor(), TablesManager.MutablePrimaryIndex())
: std::nullopt;

LOG_S_DEBUG("There are stats for " << tables.size() << " tables");
for (const auto& [pathId, _] : tables) {
Expand All @@ -372,17 +370,8 @@ void TColumnShard::FillColumnTableStats(
resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics());
}

TTableStatsBuilder statsBuilder(*periodicTableStats->MutableTableStats());
statsBuilder.FillColumnTableStats(*Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId));
statsBuilder.FillTabletStats(*Counters.GetTabletCounters());
statsBuilder.FillBackgroundControllerStats(*Counters.GetBackgroundControllerCounters(), pathId);
statsBuilder.FillScanCountersStats(Counters.GetScanCounters());
statsBuilder.FillExecutorStats(*Executor());
if (TablesManager.HasPrimaryIndex()) {
auto columnEngineStats = TablesManager.GetPrimaryIndexSafe().GetStats().FindPtr(pathId);
if (columnEngineStats && *columnEngineStats) {
statsBuilder.FillColumnEngineStats(**columnEngineStats);
}
if (tableStatsBuilder) {
tableStatsBuilder->FillTableStats(pathId, *(periodicTableStats->MutableTableStats()));
}

LOG_S_TRACE("Add stats for table, tableLocalID=" << pathId);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext
return;
}

Counters.GetColumnTablesCounters()->GetPathIdCounter(record.GetLocalPathId())->OnAccess();
Counters.GetColumnTablesCounters()->GetPathIdCounter(record.GetLocalPathId())->OnReadEvent();
ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()});
Counters.GetTabletCounters()->SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
Execute(new NOlap::NReader::TTxScan(this, ev), ctx);
Expand Down
18 changes: 9 additions & 9 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
switch (overloadReason) {
case EOverloadStatus::Disk:
Counters.GetCSCounters().OnWriteOverloadDisk();
Counters.OnWriteOverloadDisk();
break;
case EOverloadStatus::InsertTable:
Counters.GetCSCounters().OnWriteOverloadInsertTable(writeData.GetSize());
Counters.OnWriteOverloadInsertTable(writeData.GetSize());
break;
case EOverloadStatus::OverloadMetadata:
Counters.GetCSCounters().OnWriteOverloadMetadata(writeData.GetSize());
Counters.OnWriteOverloadMetadata(writeData.GetSize());
break;
case EOverloadStatus::ShardTxInFly:
Counters.GetCSCounters().OnWriteOverloadShardTx(writeData.GetSize());
Counters.OnWriteOverloadShardTx(writeData.GetSize());
break;
case EOverloadStatus::ShardWritesInFly:
Counters.GetCSCounters().OnWriteOverloadShardWrites(writeData.GetSize());
Counters.OnWriteOverloadShardWrites(writeData.GetSize());
break;
case EOverloadStatus::ShardWritesSizeInFly:
Counters.GetCSCounters().OnWriteOverloadShardWritesSize(writeData.GetSize());
Counters.OnWriteOverloadShardWritesSize(writeData.GetSize());
break;
case EOverloadStatus::None:
Y_ABORT("invalid function usage");
Expand Down Expand Up @@ -86,7 +86,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto baseAggregations = wBuffer.GetAggregations();
wBuffer.InitReplyReceived(TMonotonic::Now());

auto wg = Counters.GetWritesMonitor()->OnFinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());
Counters.GetWritesMonitor()->OnFinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());

for (auto&& aggr : baseAggregations) {
const auto& writeMeta = aggr->GetWriteMeta();
Expand Down Expand Up @@ -158,7 +158,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
const TString dedupId = record.GetDedupId();
const auto source = ev->Sender;

Counters.GetColumnTablesCounters()->GetPathIdCounter(tableId)->OnUpdate();
Counters.GetColumnTablesCounters()->GetPathIdCounter(tableId)->OnWriteEvent();

std::optional<ui32> granuleShardingVersion;
if (record.HasGranuleShardingVersion()) {
Expand Down Expand Up @@ -397,7 +397,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto wg = Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());

std::optional<ui32> granuleShardingVersionId;
if (record.HasGranuleShardingVersionId()) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ NTabletPipe::TClientConfig GetPipeClientConfig() {
TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
: TActor(&TThis::StateInit)
, TTabletExecutedFlat(info, tablet, nullptr)
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
EPercentileCounters_descriptor, ETxTypes_descriptor>())
, Counters(*TabletCountersHolder)
, ProgressTxController(std::make_unique<TTxController>(*this))
, StoragesManager(std::make_shared<NOlap::TStoragesManager>(*this))
, DataLocksManager(std::make_shared<NOlap::NDataLocks::TManager>())
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod(
TSettings::DefaultPeriodicWakeupActivationPeriod))
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval(TSettings::DefaultStatsReportInterval))
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
EPercentileCounters_descriptor, ETxTypes_descriptor>())
, Counters(*TabletCountersHolder)
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ class TColumnShard
}

private:
std::unique_ptr<TTabletCountersBase> TabletCountersHolder;
TCountersManager Counters;

std::unique_ptr<TTxController> ProgressTxController;
std::unique_ptr<TOperationsManager> OperationsManager;
std::shared_ptr<NOlap::NDataSharing::TSessionsManager> SharingSessionsManager;
Expand Down Expand Up @@ -443,9 +446,6 @@ class TColumnShard
TActorId BufferizationWriteActorId;
TActorId StatsReportPipe;

std::unique_ptr<TTabletCountersBase> TabletCountersHolder;
TCountersManager Counters;

TInFlightReadsTracker InFlightReadsTracker;
TTablesManager TablesManager;
std::shared_ptr<NSubscriber::TManager> Subscribers;
Expand Down
41 changes: 0 additions & 41 deletions ydb/core/tx/columnshard/counters/aggregation/table_stats.cpp

This file was deleted.

48 changes: 30 additions & 18 deletions ydb/core/tx/columnshard/counters/aggregation/table_stats.h
Original file line number Diff line number Diff line change
@@ -1,37 +1,49 @@
#pragma once

#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/protos/table_stats.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/columnshard/counters/column_tables.h>
#include <ydb/core/tx/columnshard/counters/tablet_counters.h>
#include <ydb/core/tx/columnshard/counters/background_controller.h>
#include <ydb/core/tx/columnshard/counters/counters_manager.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/protos/table_stats.pb.h>

namespace NKikimr::NColumnShard {

class TTableStatsBuilder {
private:
::NKikimrTableStats::TTableStats& TableStats;
TCountersManager& Counters;
const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor& Executor;
NOlap::IColumnEngine& ColumnEngine;

public:
TTableStatsBuilder(::NKikimrTableStats::TTableStats& tableStats)
: TableStats(tableStats) {
TTableStatsBuilder(
TCountersManager& counters, const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor* executor, NOlap::IColumnEngine& columnEngine)
: Counters(counters)
, Executor(*executor)
, ColumnEngine(columnEngine) {
}

void FillColumnTableStats(const TSingleColumnTableCounters& stats);
void FillColumnTableStats(const TColumnTablesCounters& stats);

void FillTabletStats(const TTabletCountersHandle& stats);
void FillTableStats(ui64 pathId, ::NKikimrTableStats::TTableStats& tableStats) {
Counters.FillTableStats(pathId, tableStats);

void FillBackgroundControllerStats(const TBackgroundControllerCounters& stats, ui64 pathId);
void FillBackgroundControllerStats(const TBackgroundControllerCounters& stats);
auto columnEngineStats = ColumnEngine.GetStats().FindPtr(pathId);
if (columnEngineStats && *columnEngineStats) {
auto activeStats = (*columnEngineStats)->Active();
tableStats.SetRowCount(activeStats.Rows);
tableStats.SetDataSize(activeStats.Bytes);
tableStats.SetPartCount(activeStats.Portions);
}
}

void FillScanCountersStats(const TScanCounters& stats);
void FillTotalTableStats(::NKikimrTableStats::TTableStats& tableStats) {
Counters.FillTotalTableStats(tableStats);

void FillExecutorStats(const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor& executor);
tableStats.SetInFlightTxCount(Executor.GetStats().TxInFly);
tableStats.SetHasLoanedParts(Executor.HasLoanedParts());

void FillColumnEngineStats(const NOlap::TColumnEngineStats& stats);
auto activeStats = ColumnEngine.GetTotalStats().Active();
tableStats.SetRowCount(activeStats.Rows);
tableStats.SetDataSize(activeStats.Bytes);
tableStats.SetPartCount(activeStats.Portions);
}
};

} // namespace NKikimr::NColumnShard
} // namespace NKikimr::NColumnShard
4 changes: 1 addition & 3 deletions ydb/core/tx/columnshard/counters/aggregation/ya.make
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
LIBRARY()

SRCS(
table_stats.cpp
)
SRCS()

PEERDIR(
ydb/core/protos
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/counters/column_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class TSingleColumnTableCounters {
, TotalLastUpdateTime(owner.LastUpdateTime) {
}

void OnAccess() {
void OnReadEvent() {
UpdateLastAccessTime(TAppData::TimeProvider->Now());
}

void OnUpdate() {
void OnWriteEvent() {
TInstant now = TAppData::TimeProvider->Now();
UpdateLastUpdateTime(now);
UpdateLastAccessTime(now);
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@

namespace NKikimr::NColumnShard {

TCSCounters::TCSCounters(std::shared_ptr<const TTabletCountersHandle> tabletCounters)
TCSCounters::TCSCounters()
: TBase("CS")
, TabletCounters(std::move(tabletCounters))
, Initialization(*this) {
Y_ABORT_UNLESS(TabletCounters);

, Initialization(*this)
, TxProgress(*this) {
StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");
TooEarlyBackgroundCount = TBase::GetDeriviative("TooEarlyBackground/Count");
SetupCompactionCount = TBase::GetDeriviative("SetupCompaction/Count");
Expand Down
Loading

0 comments on commit 78abfb8

Please sign in to comment.