Skip to content

Commit

Permalink
Merge 06e5119 into 4a1f83b
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Aug 2, 2024
2 parents 4a1f83b + 06e5119 commit 5578d6c
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 119 deletions.
23 changes: 4 additions & 19 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,9 @@ void TColumnShard::FillOlapStats(
resourceMetrics->Fill(*ev->Record.MutableTabletMetrics());
}

TTableStatsBuilder statsBuilder(*ev->Record.MutableTableStats());
statsBuilder.FillColumnTableStats(*Counters.GetColumnTablesCounters());
statsBuilder.FillTabletStats(*Counters.GetTabletCounters());
statsBuilder.FillBackgroundControllerStats(*Counters.GetBackgroundControllerCounters());
statsBuilder.FillScanCountersStats(Counters.GetScanCounters());
statsBuilder.FillExecutorStats(*Executor());
if (TablesManager.HasPrimaryIndex()) {
statsBuilder.FillColumnEngineStats(TablesManager.MutablePrimaryIndex().GetTotalStats());
TTableStatsBuilder statsBuilder(Counters, Executor(), TablesManager.MutablePrimaryIndex());
statsBuilder.FillTotalTableStats(*ev->Record.MutableTableStats());
}
}

Expand All @@ -343,6 +338,7 @@ void TColumnShard::FillColumnTableStats(
std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev
) {
auto tables = TablesManager.GetTables();
TTableStatsBuilder tableStatsBuilder(Counters, Executor(), TablesManager.MutablePrimaryIndex());

LOG_S_DEBUG("There are stats for " << tables.size() << " tables");
for (const auto& [pathId, _] : tables) {
Expand All @@ -360,18 +356,7 @@ void TColumnShard::FillColumnTableStats(
resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics());
}

TTableStatsBuilder statsBuilder(*periodicTableStats->MutableTableStats());
statsBuilder.FillColumnTableStats(*Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId));
statsBuilder.FillTabletStats(*Counters.GetTabletCounters());
statsBuilder.FillBackgroundControllerStats(*Counters.GetBackgroundControllerCounters(), pathId);
statsBuilder.FillScanCountersStats(Counters.GetScanCounters());
statsBuilder.FillExecutorStats(*Executor());
if (TablesManager.HasPrimaryIndex()) {
auto columnEngineStats = TablesManager.GetPrimaryIndexSafe().GetStats().FindPtr(pathId);
if (columnEngineStats && *columnEngineStats) {
statsBuilder.FillColumnEngineStats(**columnEngineStats);
}
}
tableStatsBuilder.FillTableStats(pathId, *(periodicTableStats->MutableTableStats()));

LOG_S_TRACE("Add stats for table, tableLocalID=" << pathId);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext
return;
}

Counters.GetColumnTablesCounters()->GetPathIdCounter(record.GetLocalPathId())->OnAccess();
Counters.GetColumnTablesCounters()->GetPathIdCounter(record.GetLocalPathId())->OnReadEvent();
ScanTxInFlight.insert({txId, TAppData::TimeProvider->Now()});
Counters.GetTabletCounters()->SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
Execute(new NOlap::NReader::TTxScan(this, ev), ctx);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto baseAggregations = wBuffer.GetAggregations();
wBuffer.InitReplyReceived(TMonotonic::Now());

auto wg = Counters.GetWritesMonitor()->OnFinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());
Counters.GetWritesMonitor()->OnFinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());

for (auto&& aggr : baseAggregations) {
const auto& writeMeta = aggr->GetWriteMeta();
Expand Down Expand Up @@ -156,7 +156,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
const TString dedupId = record.GetDedupId();
const auto source = ev->Sender;

Counters.GetColumnTablesCounters()->GetPathIdCounter(tableId)->OnUpdate();
Counters.GetColumnTablesCounters()->GetPathIdCounter(tableId)->OnWriteEvent();

std::optional<ui32> granuleShardingVersion;
if (record.HasGranuleShardingVersion()) {
Expand Down Expand Up @@ -375,7 +375,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto wg = Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());

std::optional<ui32> granuleShardingVersionId;
if (record.HasGranuleShardingVersionId()) {
Expand Down
41 changes: 0 additions & 41 deletions ydb/core/tx/columnshard/counters/aggregation/table_stats.cpp

This file was deleted.

46 changes: 29 additions & 17 deletions ydb/core/tx/columnshard/counters/aggregation/table_stats.h
Original file line number Diff line number Diff line change
@@ -1,37 +1,49 @@
#pragma once

#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/protos/table_stats.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/columnshard/counters/column_tables.h>
#include <ydb/core/tx/columnshard/counters/tablet_counters.h>
#include <ydb/core/tx/columnshard/counters/background_controller.h>
#include <ydb/core/tx/columnshard/counters/counters_manager.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/protos/table_stats.pb.h>

namespace NKikimr::NColumnShard {

class TTableStatsBuilder {
private:
::NKikimrTableStats::TTableStats& TableStats;
TCountersManager& Counters;
const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor& Executor;
NOlap::IColumnEngine& ColumnEngine;

public:
TTableStatsBuilder(::NKikimrTableStats::TTableStats& tableStats)
: TableStats(tableStats) {
TTableStatsBuilder(
TCountersManager& counters, const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor* executor, NOlap::IColumnEngine& columnEngine)
: Counters(counters)
, Executor(*executor)
, ColumnEngine(columnEngine) {
}

void FillColumnTableStats(const TSingleColumnTableCounters& stats);
void FillColumnTableStats(const TColumnTablesCounters& stats);

void FillTabletStats(const TTabletCountersHandle& stats);
void FillTableStats(ui64 pathId, ::NKikimrTableStats::TTableStats& tableStats) {
Counters.FillTableStats(pathId, tableStats);

void FillBackgroundControllerStats(const TBackgroundControllerCounters& stats, ui64 pathId);
void FillBackgroundControllerStats(const TBackgroundControllerCounters& stats);
auto columnEngineStats = ColumnEngine.GetStats().FindPtr(pathId);
if (columnEngineStats && *columnEngineStats) {
auto activeStats = (*columnEngineStats)->Active();
tableStats.SetRowCount(activeStats.Rows);
tableStats.SetDataSize(activeStats.Bytes);
tableStats.SetPartCount(activeStats.Portions);
}
}

void FillScanCountersStats(const TScanCounters& stats);
void FillTotalTableStats(::NKikimrTableStats::TTableStats& tableStats) {
Counters.FillTotalTableStats(tableStats);

void FillExecutorStats(const NTabletFlatExecutor::NFlatExecutorSetup::IExecutor& executor);
tableStats.SetInFlightTxCount(Executor.GetStats().TxInFly);
tableStats.SetHasLoanedParts(Executor.HasLoanedParts());

void FillColumnEngineStats(const NOlap::TColumnEngineStats& stats);
auto activeStats = ColumnEngine.GetTotalStats().Active();
tableStats.SetRowCount(activeStats.Rows);
tableStats.SetDataSize(activeStats.Bytes);
tableStats.SetPartCount(activeStats.Portions);
}
};

} // namespace NKikimr::NColumnShard
4 changes: 1 addition & 3 deletions ydb/core/tx/columnshard/counters/aggregation/ya.make
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
LIBRARY()

SRCS(
table_stats.cpp
)
SRCS()

PEERDIR(
ydb/core/protos
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/counters/column_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class TSingleColumnTableCounters {
, TotalLastUpdateTime(owner.LastUpdateTime) {
}

void OnAccess() {
void OnReadEvent() {
UpdateLastAccessTime(TAppData::TimeProvider->Now());
}

void OnUpdate() {
void OnWriteEvent() {
TInstant now = TAppData::TimeProvider->Now();
UpdateLastUpdateTime(now);
UpdateLastAccessTime(now);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ namespace NKikimr::NColumnShard {
TCSCounters::TCSCounters(std::shared_ptr<const TTabletCountersHandle> tabletCounters)
: TBase("CS")
, TabletCounters(std::move(tabletCounters))
, Initialization(*this) {
, Initialization(*this)
, TxProgress(*this) {
Y_ABORT_UNLESS(TabletCounters);

StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");
Expand Down
83 changes: 80 additions & 3 deletions ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#pragma once
#include "common/owner.h"

#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <ydb/core/tx/columnshard/counters/tablet_counters.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <util/generic/hash_set.h>
#include <ydb/core/tx/columnshard/counters/tablet_counters.h>

namespace NKikimr::NColumnShard {

Expand All @@ -30,7 +30,6 @@ class TCSInitialization: public TCommonCountersOwner {
const NMonitoring::THistogramPtr HistogramSwitchToWorkFromCreateDurationMs;

public:

void OnTxInitFinished(const TDuration d) const {
HistogramTxInitDurationMs->Collect(d.MilliSeconds());
}
Expand Down Expand Up @@ -66,6 +65,83 @@ class TCSInitialization: public TCommonCountersOwner {
}
};

class TTxProgressCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
using TOpType = TString;

class TProgressCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;

public:
NMonitoring::TDynamicCounters::TCounterPtr RegisterTx;
NMonitoring::TDynamicCounters::TCounterPtr RegisterTxWithDeadline;
NMonitoring::TDynamicCounters::TCounterPtr StartProposeOnExecute;
NMonitoring::TDynamicCounters::TCounterPtr StartProposeOnComplete;
NMonitoring::TDynamicCounters::TCounterPtr FinishProposeOnExecute;
NMonitoring::TDynamicCounters::TCounterPtr FinishProposeOnComplete;
NMonitoring::TDynamicCounters::TCounterPtr FinishPlannedTx;
NMonitoring::TDynamicCounters::TCounterPtr AbortTx;

TProgressCounters(TBase& owner)
: TBase(owner)
, RegisterTx(owner.GetDeriviative("RegisterTx"))
, RegisterTxWithDeadline(owner.GetDeriviative("RegisterTxWithDeadline"))
, StartProposeOnExecute(owner.GetDeriviative("StartProposeOnExecute"))
, StartProposeOnComplete(owner.GetDeriviative("StartProposeOnComplete"))
, FinishProposeOnExecute(owner.GetDeriviative("FinishProposeOnExecute"))
, FinishProposeOnComplete(owner.GetDeriviative("FinishProposeOnComplete"))
, FinishPlannedTx(owner.GetDeriviative("FinishPlannedTx"))
, AbortTx(owner.GetDeriviative("AbortTx")) {
}
};

THashMap<TOpType, TProgressCounters> SubGroups;

public:
void OnRegisterTx(const TOpType& opType) {
GetSubGroup(opType).RegisterTx->Add(1);
}

void OnRegisterTxWithDeadline(const TOpType& opType) {
GetSubGroup(opType).RegisterTxWithDeadline->Add(1);
}

void OnStartProposeOnExecute(const TOpType& opType) {
GetSubGroup(opType).StartProposeOnExecute->Add(1);
}

void OnStartProposeOnComplete(const TOpType& opType) {
GetSubGroup(opType).StartProposeOnComplete->Add(1);
}

void OnFinishProposeOnExecute(const TOpType& opType) {
GetSubGroup(opType).FinishProposeOnExecute->Add(1);
}

void OnFinishProposeOnComplete(const TOpType& opType) {
GetSubGroup(opType).FinishProposeOnComplete->Add(1);
}

void OnFinishPlannedTx(const TOpType& opType) {
GetSubGroup(opType).FinishPlannedTx->Add(1);
}

void OnAbortTx(const TOpType& opType) {
GetSubGroup(opType).AbortTx->Add(1);
}

TTxProgressCounters(TCommonCountersOwner& owner)
: TBase(owner, "txProgress") { // TODO: fix parameter name?
}

private:
TProgressCounters& GetSubGroup(const TOpType& opType) {
return SubGroups.try_emplace(opType, *this).first->second;
}
};

class TCSCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
Expand Down Expand Up @@ -122,6 +198,7 @@ class TCSCounters: public TCommonCountersOwner {

public:
const TCSInitialization Initialization;
TTxProgressCounters TxProgress;

void OnStartWriteRequest() const {
WriteRequests->Add(1);
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/columnshard/counters/counters_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ class TCountersManager {
, ColumnTablesCounters(std::make_shared<TColumnTablesCounters>())
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>()) {
}

void FillTableStats(ui64 pathId, ::NKikimrTableStats::TTableStats& tableStats) {
ColumnTablesCounters->GetPathIdCounter(pathId)->FillStats(tableStats);
BackgroundControllerCounters->FillStats(pathId, tableStats);
}

void FillTotalTableStats(::NKikimrTableStats::TTableStats& tableStats) {
ColumnTablesCounters->FillStats(tableStats);
TabletCounters->FillStats(tableStats);
BackgroundControllerCounters->FillTotalStats(tableStats);
ScanCounters.FillStats(tableStats);
}
};

} // namespace NKikimr::NColumnShard
Loading

0 comments on commit 5578d6c

Please sign in to comment.