Skip to content

Commit

Permalink
add missing counter stats
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 committed Jul 26, 2024
1 parent 6928da4 commit e676d99
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 50 deletions.
18 changes: 18 additions & 0 deletions ydb/core/tx/columnshard/background_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ void TBackgroundController::CheckDeadlinesIndexation() {
}
}

// Returns the finish time stored in LastCompactionFinishInstants for `pathId`,
// or TInstant::Zero() when the map has no entry for that path.
TInstant TBackgroundController::GetLastCompactionFinishInstant(ui64 pathId) const {
    const auto it = LastCompactionFinishInstants.find(pathId);
    return it.IsEnd() ? TInstant::Zero() : it->second;
}

// Returns the latest finish time stored in LastCompactionFinishInstants across
// all path ids, or TInstant::Zero() when the map is empty.
TInstant TBackgroundController::GetLastCompactionFinishInstant() const {
    TInstant latest = TInstant::Zero();
    for (const auto& entry : LastCompactionFinishInstants) {
        const TInstant& candidate = entry.second;
        if (latest < candidate) {
            latest = candidate;
        }
    }
    return latest;
}

void TBackgroundController::StartIndexing(const NOlap::TColumnEngineChanges& changes) {
LastIndexationInstant = TMonotonic::Now();
Y_ABORT_UNLESS(ActiveIndexationTasks.emplace(changes.GetTaskIdentifier(), TMonotonic::Now()).second);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/background_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class TBackgroundController {

using TCurrentCompaction = THashMap<ui64, NOlap::TPlanCompactionInfo>;
TCurrentCompaction ActiveCompactionInfo;
THashMap<ui64, TInstant> LastCompactionFinishInstants; // pathId to finish time

bool ActiveCleanupPortions = false;
bool ActiveCleanupTables = false;
Expand All @@ -29,13 +30,17 @@ class TBackgroundController {
bool StartCompaction(const NOlap::TPlanCompactionInfo& info);
// Unregisters the active compaction for the path of `info` (aborts if none was
// registered) and records the current time as that path's last finish instant.
// The stored instant never moves backwards.
void FinishCompaction(const NOlap::TPlanCompactionInfo& info) {
    const auto pathId = info.GetPathId();
    Y_ABORT_UNLESS(ActiveCompactionInfo.erase(pathId));

    // operator[] creates a default entry the first time a path finishes.
    TInstant& recorded = LastCompactionFinishInstants[pathId];
    const TInstant now = TInstant::Now();
    if (recorded < now) {
        recorded = now;
    }
}
// Read-only view of the compactions currently registered as active, keyed by
// ui64 (see TCurrentCompaction); entries are erased by path id in FinishCompaction.
const TCurrentCompaction& GetActiveCompaction() const {
return ActiveCompactionInfo;
}
// Number of entries in ActiveCompactionInfo, i.e. compactions currently
// registered as active. The container size is returned as ui32.
ui32 GetCompactionsCount() const {
return ActiveCompactionInfo.size();
}
TInstant GetLastCompactionFinishInstant(ui64 pathId) const;
TInstant GetLastCompactionFinishInstant() const;

void StartIndexing(const NOlap::TColumnEngineChanges& changes);
void FinishIndexing(const NOlap::TColumnEngineChanges& changes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->CSCounters.OnSuccessWriteResponse();
}

Self->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
}

}
162 changes: 124 additions & 38 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,18 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
InFlightReadsTracker.RemoveInFlightRequest(ev->Get()->RequestCookie, index);

ui64 txId = ev->Get()->TxId;
bool success = ev->Get()->Success;
if (ScanTxInFlight.contains(txId)) {
TDuration duration = TAppData::TimeProvider->Now() - ScanTxInFlight[txId];
IncCounter(COUNTER_SCAN_LATENCY, duration);
ScanTxInFlight.erase(txId);
SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
if (success) {
IncCounter(COUNTER_READ_SUCCESS);
} else {
IncCounter(COUNTER_READ_FAIL);
}
}
}

Expand Down Expand Up @@ -309,30 +315,108 @@ void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage&
metrics->TryUpdate(ctx);
}

void TColumnShard::ConfigureStats(const NOlap::TColumnEngineStats& indexStats,
::NKikimrTableStats::TTableStats* tabletStats) {
NOlap::TSnapshot lastIndexUpdate = TablesManager.GetPrimaryIndexSafe().LastUpdate();
auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted
std::optional<TColumnShard::TAggregatedTableStats> TColumnShard::CollectTableStats() const {
if (!TablesManager.HasPrimaryIndex()) {
return std::nullopt;
}

const TMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>>& columnEngineStats =
TablesManager.GetPrimaryIndexSafe().GetStats();
TAggregatedTableStats resultStats;

// TODO: Pull dataStats collected via BuildStats. They are shared between all pathIds.

if (activeIndexStats.Rows < 0 || activeIndexStats.Bytes < 0) {
LOG_S_WARN("Negative stats counter. Rows: " << activeIndexStats.Rows << " Bytes: " << activeIndexStats.Bytes
<< TabletID());
for (const auto& [pathId, tableInfo] : TablesManager.GetTables()) {
TColumnTableStats& tableStats = resultStats.StatsByPathId[pathId];
tableStats.AccessTime = tableInfo.GetLastAccessTime();
tableStats.UpdateTime = tableInfo.GetLastUpdateTime();
tableStats.LastFullCompaction = BackgroundController.GetLastCompactionFinishInstant(pathId);

activeIndexStats.Rows = (activeIndexStats.Rows < 0) ? 0 : activeIndexStats.Rows;
activeIndexStats.Bytes = (activeIndexStats.Bytes < 0) ? 0 : activeIndexStats.Bytes;
auto findEngineStats = columnEngineStats.FindPtr(pathId);
if (findEngineStats && *findEngineStats) {
NOlap::TColumnEngineStats::TPortionsStats portionsStats =
(*findEngineStats)->Active(); // data stats excluding inactive and evicted

if (portionsStats.Rows < 0 || portionsStats.Bytes < 0) {
LOG_S_WARN(
"Negative stats counter. Rows: " << portionsStats.Rows << " Bytes: " << portionsStats.Bytes
<< TabletID()
);

portionsStats.Rows = (portionsStats.Rows < 0) ? 0 : portionsStats.Rows;
portionsStats.Bytes = (portionsStats.Bytes < 0) ? 0 : portionsStats.Bytes;
}

// Count rows and bytes from portions only, ignoring data stored in InsertTable
tableStats.RowCount = portionsStats.Rows;
tableStats.DataSize = portionsStats.Bytes;
} else {
LOG_S_ERROR("CollectTableStats: missing column engine stats for pathId " << pathId);
}

if (resultStats.TotalStats.AccessTime < tableStats.AccessTime) {
resultStats.TotalStats.AccessTime = tableStats.AccessTime;
}
if (resultStats.TotalStats.UpdateTime < tableStats.UpdateTime) {
resultStats.TotalStats.UpdateTime = tableStats.UpdateTime;
}
if (resultStats.TotalStats.LastFullCompaction < tableStats.LastFullCompaction) {
resultStats.TotalStats.LastFullCompaction = tableStats.LastFullCompaction;
}
// TODO: When dataStats are included, don't aggregate rowCount and dataSize from individual pathIds.
resultStats.TotalStats.RowCount += tableStats.RowCount;
resultStats.TotalStats.DataSize += tableStats.DataSize;
}

tabletStats->SetRowCount(activeIndexStats.Rows);
tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get());
return resultStats;
}

void TColumnShard::ConfigureStats(const TColumnTableStats& inputStats, ::NKikimrTableStats::TTableStats* outputStats) {
Y_ABORT_UNLESS(outputStats);

// TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside)
// tabletStats->SetIndexSize(); // TODO: calc size of internal tables
outputStats->SetRowCount(inputStats.RowCount);
outputStats->SetDataSize(inputStats.DataSize);

tabletStats->SetLastAccessTime(LastAccessTime.MilliSeconds());
tabletStats->SetLastUpdateTime(lastIndexUpdate.GetPlanStep());
// TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside)
// tabletStats->SetIndexSize(ti.Stats.IndexSize); // TODO: calc size of internal tables
// tabletStats->SetInMemSize(ti.Stats.MemDataSize);

// TMap<ui8, std::tuple<ui64, ui64>> channels; // Channel -> (DataSize, IndexSize)
// for (size_t channel = 0; channel < ti.Stats.DataStats.DataSize.ByChannel.size(); channel++) {
// if (ti.Stats.DataStats.DataSize.ByChannel[channel]) {
// std::get<0>(channels[channel]) = ti.Stats.DataStats.DataSize.ByChannel[channel];
// }
// }
// for (size_t channel = 0; channel < ti.Stats.DataStats.IndexSize.ByChannel.size(); channel++) {
// if (ti.Stats.DataStats.IndexSize.ByChannel[channel]) {
// std::get<1>(channels[channel]) = ti.Stats.DataStats.IndexSize.ByChannel[channel];
// }
// }
// for (auto p : channels) {
// auto item = ev->Record.MutableTableStats()->AddChannels();
// item->SetChannel(p.first);
// item->SetDataSize(std::get<0>(p.second));
// item->SetIndexSize(std::get<1>(p.second));
// }

outputStats->SetLastAccessTime(inputStats.AccessTime.MilliSeconds());
outputStats->SetLastUpdateTime(inputStats.UpdateTime.MilliSeconds());

outputStats->SetRowUpdates(TabletCounters->Cumulative()[COUNTER_WRITE_SUCCESS].Get());
outputStats->SetRowDeletes(0);
outputStats->SetRowReads(0); // all reads are range reads
outputStats->SetRangeReads(TabletCounters->Cumulative()[COUNTER_READ_SUCCESS].Get());
outputStats->SetRangeReadRows(TabletCounters->Cumulative()[COUNTER_READ_INDEX_ROWS].Get());

// ev->Record.MutableTableStats()->SetPartCount(ti.Stats.PartCount);
// ev->Record.MutableTableStats()->SetSearchHeight(ti.Stats.SearchHeight);

outputStats->SetLastFullCompactionTs(inputStats.LastFullCompaction.Seconds());
outputStats->SetHasLoanedParts(Executor()->HasLoanedParts());
}

void TColumnShard::FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const {
Y_ABORT_UNLESS(tableStats);
tableStats->SetImmediateTxCompleted(TabletCounters->Cumulative()[COUNTER_IMMEDIATE_TX_COMPLETED].Get());
tableStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get());
tableStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get());
Expand All @@ -341,7 +425,11 @@ void TColumnShard::FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats
tableStats->SetInFlightTxCount(Executor()->GetStats().TxInFly);
}

void TColumnShard::FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev) {
void TColumnShard::FillOlapStats(
const TActorContext& ctx,
const std::optional<TAggregatedTableStats>& tableStats,
std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev
) {
ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
ev->Record.SetGeneration(Executor()->Generation());
ev->Record.SetRound(StatsReportRound++);
Expand All @@ -350,30 +438,27 @@ void TColumnShard::FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDa
if (auto* resourceMetrics = Executor()->GetResourceMetrics()) {
resourceMetrics->Fill(*ev->Record.MutableTabletMetrics());
}
auto* tabletStats = ev->Record.MutableTableStats();
FillTxTableStats(tabletStats);
if (TablesManager.HasPrimaryIndex()) {
const auto& indexStats = TablesManager.MutablePrimaryIndex().GetTotalStats();
ConfigureStats(indexStats, tabletStats);
auto* outputTableStats = ev->Record.MutableTableStats();
FillTxTableStats(outputTableStats);
if (tableStats) {
ConfigureStats(tableStats->TotalStats, outputTableStats);
}
}

void TColumnShard::FillColumnTableStats(const TActorContext& ctx,
std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev) {
if (!TablesManager.HasPrimaryIndex()) {
void TColumnShard::FillColumnTableStats(
const TActorContext& ctx,
const std::optional<TAggregatedTableStats>& tableStats,
std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev
) {
if (!tableStats) {
return;
}
const auto& tablesIndexStats = TablesManager.MutablePrimaryIndex().GetStats();
LOG_S_DEBUG("There are stats for " << tablesIndexStats.size() << " tables");
for (const auto& [tableLocalID, columnStats] : tablesIndexStats) {
if (!columnStats) {
LOG_S_ERROR("SendPeriodicStats: empty stats");
continue;
}

LOG_S_DEBUG("There are stats for " << tableStats->StatsByPathId.size() << " tables");
for (const auto& [pathId, columnStats] : tableStats->StatsByPathId) {
auto* periodicTableStats = ev->Record.AddTables();
periodicTableStats->SetDatashardId(TabletID());
periodicTableStats->SetTableLocalId(tableLocalID);
periodicTableStats->SetTableLocalId(pathId);

periodicTableStats->SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
periodicTableStats->SetGeneration(Executor()->Generation());
Expand All @@ -385,11 +470,11 @@ void TColumnShard::FillColumnTableStats(const TActorContext& ctx,
resourceMetrics->Fill(*periodicTableStats->MutableTabletMetrics());
}

auto* tableStats = periodicTableStats->MutableTableStats();
FillTxTableStats(tableStats);
ConfigureStats(*columnStats, tableStats);
auto* outputTableStats = periodicTableStats->MutableTableStats();
FillTxTableStats(outputTableStats);
ConfigureStats(columnStats, outputTableStats);

LOG_S_TRACE("Add stats for table, tableLocalID=" << tableLocalID);
LOG_S_TRACE("Add stats for table, tableLocalID=" << pathId);
}
}

Expand All @@ -416,10 +501,11 @@ void TColumnShard::SendPeriodicStats() {
StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig));
}

std::optional<TAggregatedTableStats> aggregatedStats = CollectTableStats();
auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), OwnerPathId);

FillOlapStats(ctx, ev);
FillColumnTableStats(ctx, ev);
FillOlapStats(ctx, aggregatedStats, ev);
FillColumnTableStats(ctx, aggregatedStats, ev);

NTabletPipe::SendData(ctx, StatsReportPipe, ev.release());
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvScan::TPtr& ev, const TActorContext
return;
}

LastAccessTime = TAppData::TimeProvider->Now();
TablesManager.RegisterAccess(record.GetLocalPathId());
ScanTxInFlight.insert({txId, LastAccessTime});
SetCounter(COUNTER_SCAN_IN_FLY, ScanTxInFlight.size());
Execute(new NOlap::NReader::TTxScan(this, ev), ctx);
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteDraft::TPtr& ev, const TActorConte

void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
CSCounters.OnStartWriteRequest();
LastAccessTime = TAppData::TimeProvider->Now();

const auto& record = Proto(ev->Get());
const ui64 tableId = record.GetTableId();
Expand All @@ -162,6 +161,9 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
const TString dedupId = record.GetDedupId();
const auto source = ev->Sender;

TablesManager.RegisterAccess(tableId);
TablesManager.RegisterUpdate(tableId);

std::optional<ui32> granuleShardingVersion;
if (record.HasGranuleShardingVersion()) {
granuleShardingVersion = record.GetGranuleShardingVersion();
Expand Down
20 changes: 17 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,19 @@ class TColumnShard
std::optional<ui32> GranuleShardingVersionId;
};

// Plain snapshot of the statistics reported for one column table (or for the
// whole shard when used as an aggregate). Filled by CollectTableStats and
// converted to the protobuf form by ConfigureStats.
struct TColumnTableStats {
// Row count taken from the column engine's active portion stats.
ui64 RowCount = 0;
// Byte size taken from the column engine's active portion stats.
ui64 DataSize = 0;
// Last access time as reported by the table info.
TInstant AccessTime;
// Last update time as reported by the table info.
TInstant UpdateTime;
// Last compaction finish time as reported by the background controller.
TInstant LastFullCompaction;
};

// Result of CollectTableStats: per-table statistics plus their aggregate.
struct TAggregatedTableStats {
// Aggregate over all tables: timestamps are the maximum over tables,
// RowCount and DataSize are sums.
TColumnTableStats TotalStats;
// Statistics of each individual table, keyed by path id.
THashMap<ui64, TColumnTableStats> StatsByPathId;
};

class TWritesMonitor {
private:
TColumnShard& Owner;
Expand Down Expand Up @@ -586,10 +599,11 @@ class TColumnShard
void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage);
ui64 MemoryUsage() const;

std::optional<TAggregatedTableStats> CollectTableStats() const;
void SendPeriodicStats();
void FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev);
void FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev);
void ConfigureStats(const NOlap::TColumnEngineStats& indexStats, ::NKikimrTableStats::TTableStats* tabletStats);
void FillOlapStats(const TActorContext& ctx, const std::optional<TAggregatedTableStats>& tableStats, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev);
void FillColumnTableStats(const TActorContext& ctx, const std::optional<TAggregatedTableStats>& tableStats, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev);
void ConfigureStats(const TColumnTableStats& inputStats, ::NKikimrTableStats::TTableStats* outputStats);
void FillTxTableStats(::NKikimrTableStats::TTableStats* tableStats) const;

public:
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ struct TEvPrivate {
};

struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
explicit TEvReadFinished(ui64 requestCookie, ui64 txId = 0)
: RequestCookie(requestCookie), TxId(txId)
explicit TEvReadFinished(ui64 requestCookie, ui64 txId, bool success)
: RequestCookie(requestCookie), TxId(txId), Success(success)
{}

ui64 RequestCookie;
ui64 TxId;
bool Success;
};

struct TEvPeriodicWakeup : public TEventLocal<TEvPeriodicWakeup, EvPeriodicWakeup> {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ void TColumnShardScan::Finish(const NColumnShard::TScanCounters::EStatusFinish s
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " finished for tablet " << TabletId);

Send(ColumnShardActorId, new NColumnShard::TEvPrivate::TEvReadFinished(RequestCookie, TxId));
bool success = (status == NColumnShard::TScanCounters::EStatusFinish::Success);
Send(ColumnShardActorId, new NColumnShard::TEvPrivate::TEvReadFinished(RequestCookie, TxId, success));
AFL_VERIFY(StartInstant);
ScanCountersPool.OnScanDuration(status, TMonotonic::Now() - *StartInstant);
ReportStats();
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/hooks/testing/ro_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ class TReadOnlyController: public ICSController {

void WaitIndexation(const TDuration d) const {
TInstant start = TInstant::Now();
ui32 compactionsStart = GetInsertStartedCounter().Val();
ui32 indexationStart = GetInsertStartedCounter().Val();
while (Now() - start < d) {
if (compactionsStart != GetInsertStartedCounter().Val()) {
compactionsStart = GetInsertStartedCounter().Val();
if (indexationStart != GetInsertStartedCounter().Val()) {
indexationStart = GetInsertStartedCounter().Val();
start = TInstant::Now();
}
Cerr << "WAIT_INDEXATION: " << GetInsertStartedCounter().Val() << Endl;
Expand Down
Loading

0 comments on commit e676d99

Please sign in to comment.