Skip to content

Commit

Permalink
WaitTimes viz and analysis
Browse files Browse the repository at this point in the history
External stages

Precise WaitInputTime

External stats with cardinality

Cleanup
  • Loading branch information
Hor911 committed Feb 13, 2025
1 parent 41acb39 commit a35c6d0
Show file tree
Hide file tree
Showing 10 changed files with 681 additions and 191 deletions.
32 changes: 28 additions & 4 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,36 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
auto* taskStats = dst->MutableTasks(0);
auto* tableStats = taskStats->AddTables();

auto& sourceStats = *taskStats->AddSources();

// sourceStats.SetInputIndex(0); // do not have real input index
sourceStats.SetIngressName("CS");

auto& ingressStats = *sourceStats.MutableIngress();

tableStats->SetTablePath(ScanData->TablePath);

if (auto* x = ScanData->BasicStats.get()) {
tableStats->SetReadRows(x->Rows);
tableStats->SetReadBytes(x->Bytes);
tableStats->SetAffectedPartitions(x->AffectedShards);
if (auto* stats = ScanData->BasicStats.get()) {
ingressStats.SetRows(stats->Rows);
ingressStats.SetBytes(stats->Bytes);
ingressStats.SetFirstMessageMs(stats->FirstMessageMs);
ingressStats.SetLastMessageMs(stats->LastMessageMs);

for (auto& [shardId, stat] : stats->ExternalStats) {
auto& externalStat = *sourceStats.AddExternalPartitions();
externalStat.SetPartitionId(ToString(shardId));
externalStat.SetExternalRows(stat.ExternalRows);
externalStat.SetExternalBytes(stat.ExternalBytes);
externalStat.SetFirstMessageMs(stat.FirstMessageMs);
externalStat.SetLastMessageMs(stat.LastMessageMs);
}

taskStats->SetIngressRows(taskStats->GetIngressRows() + stats->Rows);
taskStats->SetIngressBytes(taskStats->GetIngressBytes() + stats->Bytes);

tableStats->SetReadRows(stats->Rows);
tableStats->SetReadBytes(stats->Bytes);
tableStats->SetAffectedPartitions(stats->AffectedShards);
// TODO: CpuTime
}

Expand Down
28 changes: 14 additions & 14 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,20 +425,22 @@ class TKqpExecuterBase : public TActor<TDerived> {

YQL_ENSURE(Stats);

if (state.HasStats() && Request.ProgressStatsPeriod) {
if (state.HasStats()) {
Stats->UpdateTaskStats(taskId, state.GetStats());
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions();
Stats->ExportExecStats(execStats);
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats);
execStats.AddTxPlansWithStats(planWithStats);
if (Request.ProgressStatsPeriod) {
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
auto progress = MakeHolder<TEvKqpExecuter::TEvExecuterProgress>();
auto& execStats = *progress->Record.MutableQueryStats()->AddExecutions();
Stats->ExportExecStats(execStats);
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), execStats);
execStats.AddTxPlansWithStats(planWithStats);
}
this->Send(Target, progress.Release());
LastProgressStats = now;
}
this->Send(Target, progress.Release());
LastProgressStats = now;
}
}

Expand Down Expand Up @@ -1351,7 +1353,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
if (isFullScan && !source.HasItemsLimit()) {
Counters->Counters->FullScansExecuted->Inc();
}

if (partitions.size() > 0 && source.GetSequentialInFlightShards() > 0 && partitions.size() > source.GetSequentialInFlightShards()) {
auto [startShard, shardInfo] = MakeVirtualTablePartition(source, stageInfo, HolderFactory(), TypeEnv());

Expand Down Expand Up @@ -1676,7 +1677,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
// not supported for scan queries
YQL_ENSURE(!readSettings.Reverse);
}

for (auto&& i: partitions) {
const ui64 nodeId = ShardIdToNodeId.at(i.first);
nodeShards[nodeId].emplace_back(TShardInfoWithId(i.first, std::move(i.second)));
Expand Down
129 changes: 115 additions & 14 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,9 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
FinishTimeMs.resize(taskCount);
StartTimeMs.resize(taskCount);
DurationUs.resize(taskCount);
WaitInputTimeUs.resize(taskCount);
WaitOutputTimeUs.resize(taskCount);

WaitInputTimeUs.Resize(taskCount);
WaitOutputTimeUs.Resize(taskCount);

SpillingComputeBytes.Resize(taskCount);
SpillingChannelBytes.Resize(taskCount);
Expand All @@ -215,6 +216,10 @@ void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) {
HistorySampleCount = historySampleCount;
CpuTimeUs.HistorySampleCount = historySampleCount;
MaxMemoryUsage.HistorySampleCount = historySampleCount;

WaitInputTimeUs.HistorySampleCount = historySampleCount;
WaitOutputTimeUs.HistorySampleCount = historySampleCount;

SpillingComputeBytes.HistorySampleCount = historySampleCount;
SpillingChannelBytes.HistorySampleCount = historySampleCount;
SpillingComputeTimeUs.HistorySampleCount = historySampleCount;
Expand Down Expand Up @@ -252,6 +257,12 @@ void TStageExecutionStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqSta
if (stageStats.HasMaxMemoryUsage()) {
MaxMemoryUsage.ExportHistory(baseTimeMs, *stageStats.MutableMaxMemoryUsage());
}
if (stageStats.HasWaitInputTimeUs()) {
WaitInputTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableWaitInputTimeUs());
}
if (stageStats.HasWaitOutputTimeUs()) {
WaitOutputTimeUs.ExportHistory(baseTimeMs, *stageStats.MutableWaitOutputTimeUs());
}
if (stageStats.HasSpillingComputeBytes()) {
SpillingComputeBytes.ExportHistory(baseTimeMs, *stageStats.MutableSpillingComputeBytes());
}
Expand Down Expand Up @@ -346,8 +357,8 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
baseTimeMs = NonZeroMin(baseTimeMs, finishTimeMs);

SetNonZero(DurationUs[index], durationUs);
SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs());
SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs());
WaitInputTimeUs.SetNonZero(index, taskStats.GetWaitInputTimeUs());
WaitOutputTimeUs.SetNonZero(index, taskStats.GetWaitOutputTimeUs());

SpillingComputeBytes.SetNonZero(index, taskStats.GetSpillingComputeWriteBytes());
SpillingChannelBytes.SetNonZero(index, taskStats.GetSpillingChannelWriteBytes());
Expand Down Expand Up @@ -466,6 +477,13 @@ TProgressStatEntry operator - (const TProgressStatEntry& l, const TProgressStatE
};
}

void MergeAggr(NDqProto::TDqStatsAggr& aggr, const NDqProto::TDqStatsAggr& stat) noexcept {
aggr.SetMin(NonZeroMin(aggr.GetMin(), stat.GetMin()));
aggr.SetMax(std::max(aggr.GetMax(), stat.GetMax()));
aggr.SetSum(aggr.GetSum() + stat.GetSum());
aggr.SetCnt(aggr.GetCnt() + stat.GetCnt());
}

void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {
if (value) {
if (aggr->GetMin() == 0) {
Expand All @@ -479,6 +497,19 @@ void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {
}
}

void MergeExternal(NDqProto::TDqExternalAggrStats& asyncAggr, const NDqProto::TDqExternalAggrStats& asyncStat) noexcept {
MergeAggr(*asyncAggr.MutableExternalRows(), asyncStat.GetExternalRows());
MergeAggr(*asyncAggr.MutableExternalBytes(), asyncStat.GetExternalBytes());
MergeAggr(*asyncAggr.MutableStorageRows(), asyncStat.GetStorageRows());
MergeAggr(*asyncAggr.MutableStorageBytes(), asyncStat.GetStorageBytes());
MergeAggr(*asyncAggr.MutableCpuTimeUs(), asyncStat.GetCpuTimeUs());
MergeAggr(*asyncAggr.MutableWaitInputTimeUs(), asyncStat.GetWaitInputTimeUs());
MergeAggr(*asyncAggr.MutableWaitOutputTimeUs(), asyncStat.GetWaitOutputTimeUs());
MergeAggr(*asyncAggr.MutableFirstMessageMs(), asyncStat.GetFirstMessageMs());
MergeAggr(*asyncAggr.MutableLastMessageMs(), asyncStat.GetLastMessageMs());
asyncAggr.SetPartitionCount(asyncAggr.GetPartitionCount() + asyncStat.GetExternalRows().GetCnt());
}

ui64 UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
ui64 baseTimeMs = 0;

Expand Down Expand Up @@ -615,7 +646,7 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
const NYql::NDqProto::TDqComputeActorStats& stats
) {
auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);

// Cerr << task.DebugString() << Endl;
stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1);
UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now
UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
Expand Down Expand Up @@ -652,9 +683,27 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
FillStageDurationUs(*stageStats);

for (auto& sourcesStat : task.GetSources()) {
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePush(), sourcesStat.GetPush()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutablePop(), sourcesStat.GetPop()));
auto& ingress = (*stageStats->MutableIngress())[sourcesStat.GetIngressName()];
MergeExternal(*ingress.MutableExternal(), sourcesStat.GetExternal());

const auto& [it, inserted] = ExternalPartitionStats.emplace(stageStats->GetStageId(), sourcesStat.GetIngressName());
auto& externalPartitionStat = it->second;

for (auto& externalPartition : sourcesStat.GetExternalPartitions()) {
const auto& [it, inserted] = externalPartitionStat.Stat.emplace(externalPartition.GetPartitionId(),
TExternalPartitionStat(externalPartition.GetExternalRows(), externalPartition.GetExternalBytes(),
externalPartition.GetFirstMessageMs(), externalPartition.GetLastMessageMs()));
if (!inserted) {
it->second.ExternalRows += externalPartition.GetExternalRows();
it->second.ExternalBytes += externalPartition.GetExternalBytes();
it->second.FirstMessageMs = NonZeroMin(it->second.FirstMessageMs, externalPartition.GetFirstMessageMs());
it->second.LastMessageMs = std::max(it->second.LastMessageMs, externalPartition.GetLastMessageMs());
}
}

BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*ingress.MutableIngress(), sourcesStat.GetIngress()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*ingress.MutablePush(), sourcesStat.GetPush()));
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*ingress.MutablePop(), sourcesStat.GetPop()));
}
for (auto& inputChannelStat : task.GetInputChannels()) {
BaseTimeMs = NonZeroMin(BaseTimeMs, UpdateAsyncAggr(*(*stageStats->MutableInput())[inputChannelStat.GetSrcStageId()].MutablePush(), inputChannelStat.GetPush()));
Expand Down Expand Up @@ -823,9 +872,14 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask(
UpdateAggr(stageStats->MutableFinishTimeMs(), finishTimeMs);
BaseTimeMs = NonZeroMin(BaseTimeMs, finishTimeMs);

FillStageDurationUs(*stageStats);
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);

UpdateAggr(stageStats->MutableSpillingComputeBytes(), task.GetSpillingComputeWriteBytes());
UpdateAggr(stageStats->MutableSpillingChannelBytes(), task.GetSpillingChannelWriteBytes());
UpdateAggr(stageStats->MutableSpillingComputeTimeUs(), task.GetSpillingComputeReadTimeUs() + task.GetSpillingComputeWriteTimeUs());
UpdateAggr(stageStats->MutableSpillingChannelTimeUs(), task.GetSpillingChannelReadTimeUs() + task.GetSpillingChannelWriteTimeUs());

for (auto& tableStats: task.GetTables()) {
auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath());
Expand Down Expand Up @@ -1094,8 +1148,8 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportOffsetAggStats(stageStat.StartTimeMs, *stageStats.MutableStartTimeMs(), BaseTimeMs);
ExportOffsetAggStats(stageStat.FinishTimeMs, *stageStats.MutableFinishTimeMs(), BaseTimeMs);
ExportAggStats(stageStat.DurationUs, *stageStats.MutableDurationUs());
ExportAggStats(stageStat.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs());
ExportAggStats(stageStat.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs());
stageStat.WaitInputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitInputTimeUs());
stageStat.WaitOutputTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableWaitOutputTimeUs());

stageStat.SpillingComputeBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingComputeBytes());
stageStat.SpillingChannelBytes.ExportAggStats(BaseTimeMs, *stageStats.MutableSpillingChannelBytes());
Expand Down Expand Up @@ -1152,6 +1206,15 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
}
}

void TQueryExecutionStats::AdjustExternalAggr(NYql::NDqProto::TDqExternalAggrStats& stats) {
if (stats.HasFirstMessageMs()) {
AdjustDqStatsAggr(*stats.MutableFirstMessageMs());
}
if (stats.HasLastMessageMs()) {
AdjustDqStatsAggr(*stats.MutableLastMessageMs());
}
}

void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& stats) {
if (stats.HasFirstMessageMs()) {
AdjustDqStatsAggr(*stats.MutableFirstMessageMs());
Expand All @@ -1168,6 +1231,9 @@ void TQueryExecutionStats::AdjustAsyncAggr(NYql::NDqProto::TDqAsyncStatsAggr& st
}

void TQueryExecutionStats::AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
if (stats.HasExternal()) {
AdjustExternalAggr(*stats.MutableExternal());
}
if (stats.HasIngress()) {
AdjustAsyncAggr(*stats.MutableIngress());
}
Expand All @@ -1184,13 +1250,15 @@ void TQueryExecutionStats::AdjustAsyncBufferAggr(NYql::NDqProto::TDqAsyncBufferS

void TQueryExecutionStats::AdjustDqStatsAggr(NYql::NDqProto::TDqStatsAggr& stats) {
if (auto min = stats.GetMin()) {
stats.SetMin(min - BaseTimeMs);
stats.SetMin(min > BaseTimeMs ? min - BaseTimeMs : 0);
}
if (auto max = stats.GetMax()) {
stats.SetMax(max - BaseTimeMs);
stats.SetMax(max > BaseTimeMs ? max - BaseTimeMs : 0);
}
if (auto cnt = stats.GetCnt()) {
stats.SetSum(stats.GetSum() - BaseTimeMs * cnt);
auto sum = stats.GetSum();
auto baseSum = BaseTimeMs * cnt;
stats.SetSum(sum > baseSum ? sum - baseSum : 0);
}
}

Expand Down Expand Up @@ -1222,6 +1290,39 @@ void TQueryExecutionStats::Finish() {
for (auto& [stageId, stagetype] : TasksGraph->GetStagesInfo()) {
auto stageStats = GetOrCreateStageStats(stageId, *TasksGraph, *Result);
stageStats->SetBaseTimeMs(BaseTimeMs);

if (ExternalPartitionStats.contains(stageStats->GetStageId())) {
auto& externalPartitionStat = ExternalPartitionStats[stageStats->GetStageId()];
auto& ingress = (*stageStats->MutableIngress())[externalPartitionStat.Name];
auto& external = *ingress.MutableExternal();
for (auto& [partitionId, partitionStat] : externalPartitionStat.Stat) {
auto& externalRows = *external.MutableExternalRows();
externalRows.SetMin(NonZeroMin(externalRows.GetMin(), partitionStat.ExternalRows));
externalRows.SetMax(std::max(externalRows.GetMax(), partitionStat.ExternalRows));
externalRows.SetSum(externalRows.GetSum() + partitionStat.ExternalRows);
externalRows.SetCnt(externalRows.GetCnt() + 1);

auto& externalBytes = *external.MutableExternalBytes();
externalBytes.SetMin(NonZeroMin(externalBytes.GetMin(), partitionStat.ExternalBytes));
externalBytes.SetMax(std::max(externalBytes.GetMax(), partitionStat.ExternalBytes));
externalBytes.SetSum(externalBytes.GetSum() + partitionStat.ExternalBytes);
externalBytes.SetCnt(externalBytes.GetCnt() + 1);

auto& firstMessageMs = *external.MutableFirstMessageMs();
firstMessageMs.SetMin(NonZeroMin(firstMessageMs.GetMin(), partitionStat.FirstMessageMs));
firstMessageMs.SetMax(std::max(firstMessageMs.GetMax(), partitionStat.FirstMessageMs));
firstMessageMs.SetSum(firstMessageMs.GetSum() + partitionStat.FirstMessageMs);
firstMessageMs.SetCnt(firstMessageMs.GetCnt() + 1);

auto& lastMessageMs = *external.MutableLastMessageMs();
lastMessageMs.SetMin(NonZeroMin(lastMessageMs.GetMin(), partitionStat.LastMessageMs));
lastMessageMs.SetMax(std::max(lastMessageMs.GetMax(), partitionStat.LastMessageMs));
lastMessageMs.SetSum(lastMessageMs.GetSum() + partitionStat.LastMessageMs);
lastMessageMs.SetCnt(lastMessageMs.GetCnt() + 1);
}
external.SetPartitionCount(external.GetPartitionCount() + externalPartitionStat.Stat.size());
}

AdjustBaseTime(stageStats);
auto it = StageStats.find(stageId.StageId);
if (it != StageStats.end()) {
Expand Down
Loading

0 comments on commit a35c6d0

Please sign in to comment.