Skip to content

Commit

Permalink
Report finished tasks per stage (#14934)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored Feb 24, 2025
1 parent 8f70b94 commit 68cd155
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 14 deletions.
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
if (state.HasStats()) {
ui64 cycleCount = GetCycleCountFast();

Stats->UpdateTaskStats(taskId, state.GetStats());
Stats->UpdateTaskStats(taskId, state.GetStats(), (NYql::NDqProto::EComputeState) state.GetState());
if (Request.ProgressStatsPeriod) {
auto now = TInstant::Now();
if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
Expand Down Expand Up @@ -471,6 +471,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
Stats->AddComputeActorStats(
computeActor.NodeId(),
std::move(*state.MutableStats()),
(NYql::NDqProto::EComputeState) state.GetState(),
TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs())
);

Expand Down
25 changes: 19 additions & 6 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
for (auto& [_, a] : Aggregations) a.Resize(taskCount);

MaxMemoryUsage.Resize(taskCount);
Finished.resize(taskCount);
}

void TStageExecutionStats::SetHistorySampleCount(ui32 historySampleCount) {
Expand Down Expand Up @@ -320,7 +321,7 @@ ui64 TStageExecutionStats::UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncSt
return baseTimeMs;
}

ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) {
ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs) {
auto taskId = taskStats.GetTaskId();
auto it = Task2Index.find(taskId);
ui64 baseTimeMs = 0;
Expand All @@ -339,6 +340,13 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
index = it->second;
}

if (state == NYql::NDqProto::COMPUTE_STATE_FINISHED) {
if (!Finished[index]) {
Finished[index] = true;
FinishedCount++;
}
}

CpuTimeUs.SetNonZero(index, taskStats.GetCpuTimeUs());
SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());

Expand Down Expand Up @@ -661,11 +669,15 @@ ui64 TQueryExecutionStats::EstimateFinishMem() {

void TQueryExecutionStats::AddComputeActorFullStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
const NYql::NDqProto::TDqComputeActorStats& stats
const NYql::NDqProto::TDqComputeActorStats& stats,
NYql::NDqProto::EComputeState state
) {
auto* stageStats = GetOrCreateStageStats(task, *TasksGraph, *Result);

stageStats->SetTotalTasksCount(stageStats->GetTotalTasksCount() + 1);
if (state == NYql::NDqProto::COMPUTE_STATE_FINISHED) {
stageStats->SetFinishedTasksCount(stageStats->GetFinishedTasksCount() + 1);
}
UpdateAggr(stageStats->MutableMaxMemoryUsage(), stats.GetMaxMemoryUsage()); // only 1 task per CA now
UpdateAggr(stageStats->MutableCpuTimeUs(), task.GetCpuTimeUs());
UpdateAggr(stageStats->MutableSourceCpuTimeUs(), task.GetSourceCpuTimeUs());
Expand Down Expand Up @@ -782,7 +794,7 @@ void TQueryExecutionStats::AddComputeActorProfileStatsByTask(
}

void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProto::TDqComputeActorStats&& stats,
TDuration collectLongTaskStatsTimeout) {
NYql::NDqProto::EComputeState state, TDuration collectLongTaskStatsTimeout) {
// Cerr << (TStringBuilder() << "::AddComputeActorStats " << stats.DebugString() << Endl);

Result->SetCpuTimeUs(Result->GetCpuTimeUs() + stats.GetCpuTimeUs());
Expand Down Expand Up @@ -847,7 +859,7 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt

if (CollectFullStats(StatsMode)) {
for (const auto& task : stats.GetTasks()) {
AddComputeActorFullStatsByTask(task, stats);
AddComputeActorFullStatsByTask(task, stats, state);
}
}

Expand Down Expand Up @@ -1049,7 +1061,7 @@ void TQueryExecutionStats::AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskSta
}
}

void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats) {
void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats, NYql::NDqProto::EComputeState state) {
Y_ASSERT(stats.GetTasks().size() == 1);
const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0);
Y_ASSERT(taskStats.GetTaskId() == taskId);
Expand All @@ -1059,7 +1071,7 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
it->second.SetHistorySampleCount(HistorySampleCount);
}
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
BaseTimeMs = NonZeroMin(BaseTimeMs, it->second.UpdateStats(taskStats, state, stats.GetMaxMemoryUsage(), stats.GetDurationUs()));
}

void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
Expand Down Expand Up @@ -1219,6 +1231,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
for (auto& [stageId, stageStat] : StageStats) {
auto& stageStats = *protoStages[stageStat.StageId.StageId];
stageStats.SetTotalTasksCount(stageStat.Task2Index.size());
stageStats.SetFinishedTasksCount(stageStat.FinishedCount);

stageStats.SetBaseTimeMs(BaseTimeMs);
stageStat.CpuTimeUs.ExportAggStats(BaseTimeMs, *stageStats.MutableCpuTimeUs());
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "kqp_tasks_graph.h"
#include <util/generic/vector.h>
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/core/protos/query_stats.pb.h>
#include <ydb/library/yql/dq/runtime/dq_tasks_counters.h>

Expand Down Expand Up @@ -189,6 +190,8 @@ struct TStageExecutionStats {

ui32 HistorySampleCount = 0;
ui32 TaskCount = 0;
std::vector<bool> Finished;
ui32 FinishedCount = 0;

void Resize(ui32 taskCount);
ui32 EstimateMem() {
Expand All @@ -201,7 +204,7 @@ struct TStageExecutionStats {
void SetHistorySampleCount(ui32 historySampleCount);
void ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStageStats& stageStats);
ui64 UpdateAsyncStats(ui32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs);
ui64 UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, NYql::NDqProto::EComputeState state, ui64 maxMemoryUsage, ui64 durationUs);
};

struct TExternalPartitionStat {
Expand Down Expand Up @@ -277,6 +280,7 @@ struct TQueryExecutionStats {
void AddComputeActorStats(
ui32 nodeId,
NYql::NDqProto::TDqComputeActorStats&& stats,
NYql::NDqProto::EComputeState state,
TDuration collectLongTaskStatsTimeout = TDuration::Max()
);
void AddNodeShardsCount(const ui32 stageId, const ui32 nodeId, const ui32 shardsCount) {
Expand All @@ -295,7 +299,7 @@ struct TQueryExecutionStats {
void AddDatashardStats(NKikimrQueryStats::TTxStats&& txStats);
void AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskStats);

void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats);
void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats, NYql::NDqProto::EComputeState state);
void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats);
void FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats);
ui64 EstimateCollectMem();
Expand All @@ -305,7 +309,8 @@ struct TQueryExecutionStats {
private:
void AddComputeActorFullStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
const NYql::NDqProto::TDqComputeActorStats& stats);
const NYql::NDqProto::TDqComputeActorStats& stats,
NYql::NDqProto::EComputeState state);
void AddComputeActorProfileStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
const NYql::NDqProto::TDqComputeActorStats& stats,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class TKqpLiteralExecuter {

fakeComputeActorStats.SetDurationUs(elapsedMicros);

Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats));
Stats->AddComputeActorStats(OwnerActor.NodeId(), std::move(fakeComputeActorStats), NYql::NDqProto::COMPUTE_STATE_FINISHED);

Stats->ExecuterCpuTime = executerCpuTime;
Stats->FinishTs = Stats->StartTs + TDuration::MicroSeconds(elapsedMicros);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2845,6 +2845,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD

stats["PhysicalStageId"] = (*stat)->GetStageId();
stats["Tasks"] = (*stat)->GetTotalTasksCount();
stats["FinishedTasks"] = (*stat)->GetFinishedTasksCount();

stats["StageDurationUs"] = (*stat)->GetStageDurationUs();

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/protos/dq_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ message TDqStageStats {

uint32 TotalTasksCount = 5;
uint32 FailedTasksCount = 6;
uint32 FinishedTasksCount = 50;

TDqStatsAggr CpuTimeUs = 8;
TDqStatsAggr SourceCpuTimeUs = 25;
Expand Down
18 changes: 15 additions & 3 deletions ydb/public/lib/ydb_cli/common/plan2svg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,9 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no
stage->Tasks = tasksNode->GetIntegerSafe();
Tasks += stage->Tasks;
}
if (auto* finishedTasksNode = stage->StatsNode->GetValueByPath("FinishedTasks")) {
stage->FinishedTasks = finishedTasksNode->GetIntegerSafe();
}

if (auto* physicalStageIdNode = stage->StatsNode->GetValueByPath("PhysicalStageId")) {
stage->PhysicalStageId = physicalStageIdNode->GetIntegerSafe();
Expand Down Expand Up @@ -1554,11 +1557,20 @@ void TPlan::PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TS
<< "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << s->Tasks << "</text>" << Endl
<< "</g>" << Endl;
} else {
canvas
<< "<g><title>Stage " << s->PhysicalStageId << ", tasks: " << taskCount << "</title>" << Endl
canvas
<< "<g><title>Stage " << s->PhysicalStageId << ", tasks: " << s->Tasks << ", finished: " << s->FinishedTasks << "</title>" << Endl;
if (s->FinishedTasks && s->FinishedTasks <= s->Tasks) {
auto finishedHeight = s->Height * s->FinishedTasks / s->Tasks;
auto xx = Config.TaskLeft + Config.TaskWidth - Config.TaskWidth / 8;
canvas
<< "<line x1='" << xx << "' y1='" << s->OffsetY + offsetY + s->Height - finishedHeight
<< "' x2='" << xx << "' y2='" << s->OffsetY + offsetY + s->Height
<< "' stroke-width='" << Config.TaskWidth / 4 << "' stroke='" << Config.Palette.StageClone << "' stroke-dasharray='1,1' />" << Endl;
}
canvas
<< " <text text-anchor='end' font-family='Verdana' font-size='" << INTERNAL_TEXT_HEIGHT << "px' fill='" << Config.Palette.StageText
<< "' x='" << Config.TaskLeft + Config.TaskWidth - 2
<< "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << taskCount << "</text>" << Endl
<< "' y='" << s->OffsetY + s->Height / 2 + offsetY + INTERNAL_TEXT_HEIGHT / 2 << "'>" << s->Tasks << "</text>" << Endl
<< "</g>" << Endl;
}

Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/common/plan2svg.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class TStage {
ui32 PlanNodeId = 0;
ui32 PhysicalStageId = 0;
ui32 Tasks = 0;
ui32 FinishedTasks = 0;
const NJson::TJsonValue* StatsNode = nullptr;
ui64 MinTime = 0;
ui64 MaxTime = 0;
Expand Down

0 comments on commit 68cd155

Please sign in to comment.