Skip to content

Commit

Permalink
CS statistics (Rows + Bytes) (ydb-platform#14549)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored Feb 14, 2025
1 parent d0a16a7 commit f97d523
Show file tree
Hide file tree
Showing 8 changed files with 629 additions and 168 deletions.
32 changes: 28 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,36 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
auto* taskStats = dst->MutableTasks(0);
auto* tableStats = taskStats->AddTables();

auto& sourceStats = *taskStats->AddSources();

// sourceStats.SetInputIndex(0); // do not have real input index
sourceStats.SetIngressName("CS");

auto& ingressStats = *sourceStats.MutableIngress();

tableStats->SetTablePath(ScanData->TablePath);

if (auto* x = ScanData->BasicStats.get()) {
tableStats->SetReadRows(x->Rows);
tableStats->SetReadBytes(x->Bytes);
tableStats->SetAffectedPartitions(x->AffectedShards);
if (auto* stats = ScanData->BasicStats.get()) {
ingressStats.SetRows(stats->Rows);
ingressStats.SetBytes(stats->Bytes);
ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
ingressStats.SetLastMessageMs(stats->LastMessageMs);

for (auto& [shardId, stat] : stats->ExternalStats) {
auto& externalStat = *sourceStats.AddExternalPartitions();
externalStat.SetPartitionId(ToString(shardId));
externalStat.SetExternalRows(stat.ExternalRows);
externalStat.SetExternalBytes(stat.ExternalBytes);
externalStat.SetFirstMessageMs(stat.FirstMessageMs);
externalStat.SetLastMessageMs(stat.LastMessageMs);
}

taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);

tableStats->SetReadRows(stats->Rows);
tableStats->SetReadBytes(stats->Bytes);
tableStats->SetAffectedPartitions(stats->AffectedShards);
// TODO: CpuTime
}

Expand Down
127 changes: 114 additions & 13 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
FinishTimeMs.resize(taskCount);
StartTimeMs.resize(taskCount);
DurationUs.resize(taskCount);
WaitInputTimeUs.resize(taskCount);
WaitOutputTimeUs.resize(taskCount);

WaitInputTimeUs.Resize(taskCount);
WaitOutputTimeUs.Resize(taskCount);

SpillingComputeBytes.Resize(taskCount);
SpillingChannelBytes.Resize(taskCount);
Expand All @@ -215,6 +216,10 @@ void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) {
HistorySampleCount = historySampleCount;
CpuTimeUs.HistorySampleCount = historySampleCount;
MaxMemoryUsage.HistorySampleCount = historySampleCount;

WaitInputTimeUs.HistorySampleCount = historySampleCount;
WaitOutputTimeUs.HistorySampleCount = historySampleCount;

SpillingComputeBytes.HistorySampleCount = historySampleCount;
SpillingChannelBytes.HistorySampleCount = historySampleCount;
SpillingComputeTimeUs.HistorySampleCount = historySampleCount;
Expand Down Expand Up @@ -252,6 +257,12 @@ void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqSta
if (stageStats.HasMaxMemoryUsage()) {
MaxMemoryUsage.ExportHistory(baseTimeMs, *stageStats.MutableMaxMemoryUsage());
}
if (stageStats.HasWaitInputTimeUs()) {
WaitInputTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableWaitInputTimeUs());
}
if (stageStats.HasWaitOutputTimeUs()) {
WaitOutputTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableWaitOutputTimeUs());
}
if (stageStats.HasSpillingComputeBytes()) {
SpillingComputeBytes.ExportHistory(baseTimeMs, *stageStats.MutableSpillingComputeBytes());
}
Expand Down Expand Up @@ -346,8 +357,8 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
baseTimeMs = NonZeroMin(baseTimeMs, finishTimeMs);

SetNonZero(DurationUs[index], durationUs);
SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs());
SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs());
WaitInputTimeUs.SetNonZero(index, taskStats.GetWaitInputTimeUs());
WaitOutputTimeUs.SetNonZero(index, taskStats.GetWaitOutputTimeUs());

SpillingComputeBytes.SetNonZero(index, taskStats.GetSpillingComputeWriteBytes());
SpillingChannelBytes.SetNonZero(index, taskStats.GetSpillingChannelWriteBytes());
Expand Down Expand Up @@ -466,6 +477,13 @@ TProgressStatEntry operator - (const TProgressStatEntry& l, const TProgressStatE
};
}

void MergeAggr(NDqProto::TDqStatsAggr& aggr, const NDqProto::TDqStatsAggr& stat) noexcept {
aggr.SetMin(NonZeroMin(aggr.GetMin(), stat.GetMin()));
aggr.SetMax(std::max(aggr.GetMax(), stat.GetMax()));
aggr.SetSum(aggr.GetSum() + stat.GetSum());
aggr.SetCnt(aggr.GetCnt() + stat.GetCnt());
}

void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {
if (value) {
if (aggr->GetMin() == 0) {
Expand All @@ -479,6 +497,19 @@ void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {
}
}

void MergeExternal(NDqProto::TDqExternalAggrStats& asyncAggr, const NDqProto::TDqExternalAggrStats& asyncStat) noexcept {
MergeAggr(*asyncAggr.MutableExternalRows(), asyncStat.GetExternalRows());
MergeAggr(*asyncAggr.MutableExternalBytes(), asyncStat.GetExternalBytes());
MergeAggr(*asyncAggr.MutableStorageRows(), asyncStat.GetStorageRows());
MergeAggr(*asyncAggr.MutableStorageBytes(), asyncStat.GetStorageBytes());
MergeAggr(*asyncAggr.MutableCpuTimeUs(), asyncStat.GetCpuTimeUs());
MergeAggr(*asyncAggr.MutableWaitInputTimeUs(), asyncStat.GetWaitInputTimeUs());
MergeAggr(*asyncAggr.MutableWaitOutputTimeUs(), asyncStat.GetWaitOutputTimeUs());
MergeAggr(*asyncAggr.MutableFirstMessageMs(), asyncStat.GetFirstMessageMs());
MergeAggr(*asyncAggr.MutableLastMessageMs(), asyncStat.GetLastMessageMs());
asyncAggr.SetPartitionCount(asyncAggr.GetPartitionCount() + asyncStat.GetExternalRows().GetCnt());
}

ui64 UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
ui64 baseTimeMs = 0;

Expand Down Expand Up @@ -652,9 +683,27 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
FillStageDurationUs(*stageStats);

for (auto& sourcesStat : task.GetSources()) {
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop()));
auto& ingress = (*stageStats->MutableIngress())[sourcesStat.GetIngressName()];
MergeExternal(*ingress.MutableExternal(), sourcesStat.GetExternal());

const auto& [it, inserted] = ExternalPartitionStats.emplace(stageStats->GetStageId(), sourcesStat.GetIngressName());
auto& externalPartitionStat = it->second;

for (auto& externalPartition : sourcesStat.GetExternalPartitions()) {
const auto& [it, inserted] = externalPartitionStat.Stat.emplace(externalPartition.GetPartitionId(),
TExternalPartitionStat(externalPartition.GetExternalRows(), externalPartition.GetExternalBytes(),
externalPartition.GetFirstMessageMs(), externalPartition.GetLastMessageMs()));
if (!inserted) {
it->second.ExternalRows += externalPartition.GetExternalRows();
it->second.ExternalBytes += externalPartition.GetExternalBytes();
it->second.FirstMessageMs = NonZeroMin(it->second.FirstMessageMs, externalPartition.GetFirstMessageMs());
it->second.LastMessageMs = std::max(it->second.LastMessageMs, externalPartition.GetLastMessageMs());
}
}

BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*ingress.MutableIngress(), sourcesStat.GetIngress()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*ingress.MutablePush(), sourcesStat.GetPush()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*ingress.MutablePop(), sourcesStat.GetPop()));
}
for (auto& inputChannelStat : task.GetInputChannels()) {
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush()));
Expand Down Expand Up @@ -823,9 +872,14 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask(
UpdateAggr(stageStats->MutableFinishTimeMs(), finishTimeMs);
BaseTimeMs = NonZeroMin(BaseTimeMs, finishTimeMs);

FillStageDurationUs(*stageStats);
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);

UpdateAggr(stageStats->MutableSpillingComputeBytes(), task.GetSpillingComputeWriteBytes());
UpdateAggr(stageStats->MutableSpillingChannelBytes(), task.GetSpillingChannelWriteBytes());
UpdateAggr(stageStats->MutableSpillingComputeTimeUs(), task.GetSpillingComputeReadTimeUs() + task.GetSpillingComputeWriteTimeUs());
UpdateAggr(stageStats->MutableSpillingChannelTimeUs(), task.GetSpillingChannelReadTimeUs() + task.GetSpillingChannelWriteTimeUs());

for (auto& tableStats: task.GetTables()) {
auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath());
Expand Down Expand Up @@ -1094,8 +1148,8 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportOffsetAggStats(stageStat.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs);
ExportOffsetAggStats(stageStat.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs);
ExportAggStats(stageStat.DurationUs, *stageStats.MutableDurationUs());
ExportAggStats(stageStat.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs());
ExportAggStats(stageStat.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs());
stageStat.WaitInputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitInputTimeUs());
stageStat.WaitOutputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitOutputTimeUs());

stageStat.SpillingComputeBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeBytes());
stageStat.SpillingChannelBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelBytes());
Expand Down Expand Up @@ -1152,6 +1206,15 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
}
}

void TQueryExecutionStats::AdjustExternalAggr(NYql::NDqProto::TDqExternalAggrStats& stats) {
if (stats.HasFirstMessageMs()) {
AdjustDqStatsAggr(*stats.MutableFirstMessageMs());
}
if (stats.HasLastMessageMs()) {
AdjustDqStatsAggr(*stats.MutableLastMessageMs());
}
}

void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats) {
if (stats.HasFirstMessageMs()) {
AdjustDqStatsAggr(*stats.MutableFirstMessageMs());
Expand All @@ -1168,6 +1231,9 @@ void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& st
}

void TQueryExecutionStats::AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
if (stats.HasExternal()) {
AdjustExternalAggr(*stats.MutableExternal());
}
if (stats.HasIngress()) {
AdjustAsyncAggr(*stats.MutableIngress());
}
Expand All @@ -1184,13 +1250,15 @@ void TQueryExecutionStats::AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferS

void TQueryExecutionStats::AdjustDqStatsAggr(NYql::NDqProto::TDqStatsAggr& stats) {
if (auto min = stats.GetMin()) {
stats.SetMin(min - BaseTimeMs);
stats.SetMin(min > BaseTimeMs ? min - BaseTimeMs : 0);
}
if (auto max = stats.GetMax()) {
stats.SetMax(max - BaseTimeMs);
stats.SetMax(max > BaseTimeMs ? max - BaseTimeMs : 0);
}
if (auto cnt = stats.GetCnt()) {
stats.SetSum(stats.GetSum() - BaseTimeMs * cnt);
auto sum = stats.GetSum();
auto baseSum = BaseTimeMs * cnt;
stats.SetSum(sum > baseSum ? sum - baseSum : 0);
}
}

Expand Down Expand Up @@ -1222,6 +1290,39 @@ void TQueryExecutionStats::Finish() {
for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
auto stageStats = GetOrCreateStageStats(stageId, *TasksGraph, *Result);
stageStats->SetBaseTimeMs(BaseTimeMs);

if (ExternalPartitionStats.contains(stageStats->GetStageId())) {
auto& externalPartitionStat = ExternalPartitionStats[stageStats->GetStageId()];
auto& ingress = (*stageStats->MutableIngress())[externalPartitionStat.Name];
auto& external = *ingress.MutableExternal();
for (auto& [partitionId, partitionStat] : externalPartitionStat.Stat) {
auto& externalRows = *external.MutableExternalRows();
externalRows.SetMin(NonZeroMin(externalRows.GetMin(), partitionStat.ExternalRows));
externalRows.SetMax(std::max(externalRows.GetMax(), partitionStat.ExternalRows));
externalRows.SetSum(externalRows.GetSum() + partitionStat.ExternalRows);
externalRows.SetCnt(externalRows.GetCnt() + 1);

auto& externalBytes = *external.MutableExternalBytes();
externalBytes.SetMin(NonZeroMin(externalBytes.GetMin(), partitionStat.ExternalBytes));
externalBytes.SetMax(std::max(externalBytes.GetMax(), partitionStat.ExternalBytes));
externalBytes.SetSum(externalBytes.GetSum() + partitionStat.ExternalBytes);
externalBytes.SetCnt(externalBytes.GetCnt() + 1);

auto& firstMessageMs = *external.MutableFirstMessageMs();
firstMessageMs.SetMin(NonZeroMin(firstMessageMs.GetMin(), partitionStat.FirstMessageMs));
firstMessageMs.SetMax(std::max(firstMessageMs.GetMax(), partitionStat.FirstMessageMs));
firstMessageMs.SetSum(firstMessageMs.GetSum() + partitionStat.FirstMessageMs);
firstMessageMs.SetCnt(firstMessageMs.GetCnt() + 1);

auto& lastMessageMs = *external.MutableLastMessageMs();
lastMessageMs.SetMin(NonZeroMin(lastMessageMs.GetMin(), partitionStat.LastMessageMs));
lastMessageMs.SetMax(std::max(lastMessageMs.GetMax(), partitionStat.LastMessageMs));
lastMessageMs.SetSum(lastMessageMs.GetSum() + partitionStat.LastMessageMs);
lastMessageMs.SetCnt(lastMessageMs.GetCnt() + 1);
}
external.SetPartitionCount(external.GetPartitionCount() + externalPartitionStat.Stat.size());
}

AdjustBaseTime(stageStats);
auto it = StageStats.find(stageId.StageId);
if (it != StageStats.end()) {
Expand Down
34 changes: 32 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ struct TAsyncBufferStats {
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats);
};

struct TIngressStats : public TAsyncBufferStats {

TIngressStats() = default;
TIngressStats(ui32 taskCount) {
Resize(taskCount);
}

void Resize(ui32 taskCount);
};

struct TTableStats {

TTableStats() = default;
Expand Down Expand Up @@ -125,8 +135,8 @@ struct TStageExecutionStats {
std::vector<ui64> FinishTimeMs;
std::vector<ui64> StartTimeMs;
std::vector<ui64> DurationUs;
std::vector<ui64> WaitInputTimeUs;
std::vector<ui64> WaitOutputTimeUs;
TTimeSeriesStats WaitInputTimeUs;
TTimeSeriesStats WaitOutputTimeUs;

TTimeSeriesStats SpillingComputeBytes;
TTimeSeriesStats SpillingChannelBytes;
Expand All @@ -153,14 +163,34 @@ struct TStageExecutionStats {
ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs);
};

struct TExternalPartitionStat {
ui64 ExternalRows;
ui64 ExternalBytes;
ui64 FirstMessageMs;
ui64 LastMessageMs;
TExternalPartitionStat() = default;
TExternalPartitionStat(ui64 externalRows, ui64 externalBytes, ui64 firstMessageMs, ui64 lastMessageMs)
: ExternalRows(externalRows), ExternalBytes(externalBytes), FirstMessageMs(firstMessageMs), LastMessageMs(lastMessageMs)
{}
};

struct TIngressExternalPartitionStat {
TString Name;
std::map<TString, TExternalPartitionStat> Stat;
TIngressExternalPartitionStat() = default;
TIngressExternalPartitionStat(const TString& name) : Name(name) {}
};

struct TQueryExecutionStats {
private:
std::map<ui32, std::map<ui32, ui32>> ShardsCountByNode;
std::map<ui32, bool> UseLlvmByStageId;
std::map<ui32, TStageExecutionStats> StageStats;
std::map<ui32, TIngressExternalPartitionStat> ExternalPartitionStats; // FIXME: several ingresses
ui64 BaseTimeMs = 0;
void ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto::TDqAsyncStatsAggr& stats);
void ExportAggAsyncBufferStats(TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats);
void AdjustExternalAggr(NYql::NDqProto::TDqExternalAggrStats& stats);
void AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats);
void AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferStatsAggr& stats);
void AdjustDqStatsAggr(NYql::NDqProto::TDqStatsAggr& stats);
Expand Down
32 changes: 32 additions & 0 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2921,6 +2921,38 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
for (auto ingress : (*stat)->GetIngress()) {
auto& ingressInfo = ingressStats.AppendValue(NJson::JSON_MAP);
ingressInfo["Name"] = ingress.first;
if (ingress.second.HasExternal()) {
auto& node = ingressInfo.InsertValue("External", NJson::JSON_MAP);
auto& externalInfo = ingress.second.GetExternal();
if (externalInfo.HasExternalRows()) {
FillAggrStat(node, externalInfo.GetExternalRows(), "ExternalRows");
}
if (externalInfo.HasExternalBytes()) {
FillAggrStat(node, externalInfo.GetExternalBytes(), "ExternalBytes");
}
if (externalInfo.HasStorageRows()) {
FillAggrStat(node, externalInfo.GetStorageRows(), "StorageRows");
}
if (externalInfo.HasStorageBytes()) {
FillAggrStat(node, externalInfo.GetStorageBytes(), "StorageBytes");
}
if (externalInfo.HasCpuTimeUs()) {
FillAggrStat(node, externalInfo.GetCpuTimeUs(), "CpuTimeUs");
}
if (externalInfo.HasWaitInputTimeUs()) {
FillAggrStat(node, externalInfo.GetWaitInputTimeUs(), "WaitInputTimeUs");
}
if (externalInfo.HasWaitOutputTimeUs()) {
FillAggrStat(node, externalInfo.GetWaitOutputTimeUs(), "WaitOutputTimeUs");
}
if (externalInfo.HasFirstMessageMs()) {
FillAggrStat(node, externalInfo.GetFirstMessageMs(), "FirstMessageMs");
}
if (externalInfo.HasLastMessageMs()) {
FillAggrStat(node, externalInfo.GetLastMessageMs(), "LastMessageMs");
}
SetNonZero(node, "PartitionCount", externalInfo.GetPartitionCount());
}
if (ingress.second.HasIngress()) {
FillAsyncAggrStat(ingressInfo.InsertValue("Ingress", NJson::JSON_MAP), ingress.second.GetIngress());
}
Expand Down
Loading

0 comments on commit f97d523

Please sign in to comment.