Skip to content

Commit

Permalink
Ignore empty metrics in in-progress stats. Try-catch in-progress stat…
Browse files Browse the repository at this point in the history
…s conversion. (#633)
  • Loading branch information
Hor911 authored Dec 25, 2023
1 parent fa79cd4 commit fb722f7
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 148 deletions.
62 changes: 33 additions & 29 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,42 +155,46 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
}

// Publishes the public per-query counters derived from the query plan in `stats`.
// The plan is converted with GetV1StatFromV2Plan and GetPublicStat; every field of the
// result that is set is then written into the matching "query.*" named counter obtained
// from GetPublicCounters().
// In the try/catch form, an NJson::TJsonException raised along the way is logged through
// LOG_E and is not rethrown.
// NOTE(review): this span is rendered from a commit diff without +/- markers, so the
// unwrapped statements and the try-wrapped ones appear to be the before/after versions
// of the same body — confirm against the real file before changing code here.
void ReportPublicCounters(const Ydb::TableStats::QueryStats& stats) {
auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan()));
auto publicCounters = GetPublicCounters();
try {
auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan()));
auto publicCounters = GetPublicCounters();

if (stat.MemoryUsageBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.memory_usage_bytes");
counter = *stat.MemoryUsageBytes;
}
if (stat.MemoryUsageBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.memory_usage_bytes");
counter = *stat.MemoryUsageBytes;
}

if (stat.CpuUsageUs) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.cpu_usage_us", true);
counter = *stat.CpuUsageUs;
}
if (stat.CpuUsageUs) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.cpu_usage_us", true);
counter = *stat.CpuUsageUs;
}

if (stat.InputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.input_bytes", true);
counter = *stat.InputBytes;
}
if (stat.InputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.input_bytes", true);
counter = *stat.InputBytes;
}

if (stat.OutputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.output_bytes", true);
counter = *stat.OutputBytes;
}
if (stat.OutputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.output_bytes", true);
counter = *stat.OutputBytes;
}

if (stat.SourceInputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.source_input_records", true);
counter = *stat.SourceInputRecords;
}
if (stat.SourceInputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.source_input_records", true);
counter = *stat.SourceInputRecords;
}

if (stat.SinkOutputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.sink_output_records", true);
counter = *stat.SinkOutputRecords;
}
if (stat.SinkOutputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.sink_output_records", true);
counter = *stat.SinkOutputRecords;
}

if (stat.RunningTasks) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.running_tasks");
counter = *stat.RunningTasks;
if (stat.RunningTasks) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.running_tasks");
counter = *stat.RunningTasks;
}
// Only JSON exceptions are handled here; the failure is logged and the function returns normally.
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
}

Expand Down
160 changes: 88 additions & 72 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
EgressRows.resize(taskCount);
EgressBytes.resize(taskCount);

FirstRowTimeMs.resize(taskCount);
FinishTimeMs.resize(taskCount);
StartTimeMs.resize(taskCount);
DurationUs.resize(taskCount);
WaitInputTimeUs.resize(taskCount);
WaitOutputTimeUs.resize(taskCount);

for (auto& p : Ingress) p.second.Resize(taskCount);
for (auto& p : Egress) p.second.Resize(taskCount);
Expand All @@ -64,26 +66,32 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
MaxMemoryUsage.resize(taskCount);
}

// Overwrites `target` with `source` unless `source` is zero; a zero `source`
// leaves `target` holding whatever value it already had.
void SetNonZero(ui64& target, ui64 source) {
    if (source == 0) {
        return;
    }
    target = source;
}

// Stores the values reported in `asyncStats` into slot `index` of the per-task arrays
// held by `aggrAsyncStats` (bytes, rows, chunks, splits, message timestamps, wait time
// and wait periods), then derives ActiveTimeUs[index] from the first/last message times.
// Two forms of each store are visible: a plain assignment, and a SetNonZero call which
// leaves the slot untouched when the reported value is zero.
// NOTE(review): this span is rendered from a commit diff without +/- markers, so the
// plain assignments and the SetNonZero calls appear to be the before/after versions of
// the same statements — confirm against the real file before changing code here.
void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
aggrAsyncStats.Bytes[index] = asyncStats.GetBytes();
aggrAsyncStats.Rows[index] = asyncStats.GetRows();
aggrAsyncStats.Chunks[index] = asyncStats.GetChunks();
aggrAsyncStats.Splits[index] = asyncStats.GetSplits();
SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes());
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits());

// The first/last message times are kept in locals because they are reused below.
auto firstMessageMs = asyncStats.GetFirstMessageMs();
aggrAsyncStats.FirstMessageMs[index] = firstMessageMs;
aggrAsyncStats.PauseMessageMs[index] = asyncStats.GetPauseMessageMs();
aggrAsyncStats.ResumeMessageMs[index] = asyncStats.GetResumeMessageMs();
SetNonZero(aggrAsyncStats.FirstMessageMs[index], firstMessageMs);
SetNonZero(aggrAsyncStats.PauseMessageMs[index], asyncStats.GetPauseMessageMs());
SetNonZero(aggrAsyncStats.ResumeMessageMs[index], asyncStats.GetResumeMessageMs());
auto lastMessageMs = asyncStats.GetLastMessageMs();
aggrAsyncStats.LastMessageMs[index] = lastMessageMs;
aggrAsyncStats.WaitTimeUs[index] = asyncStats.GetWaitTimeUs();
aggrAsyncStats.WaitPeriods[index] = asyncStats.GetWaitPeriods();
SetNonZero(aggrAsyncStats.LastMessageMs[index], lastMessageMs);
SetNonZero(aggrAsyncStats.WaitTimeUs[index], asyncStats.GetWaitTimeUs());
SetNonZero(aggrAsyncStats.WaitPeriods[index], asyncStats.GetWaitPeriods());
// Active time is only written when a first message was seen and the last one is strictly later.
// NOTE(review): the difference of two *Ms values is stored into a field named ActiveTimeUs
// without any scaling — confirm the intended unit.
if (firstMessageMs && lastMessageMs > firstMessageMs) {
aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs;
}
}

void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage) {
void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) {
auto taskId = taskStats.GetTaskId();
auto it = Task2Index.find(taskId);

Expand All @@ -98,44 +106,48 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
index = it->second;
}

CpuTimeUs[index] = taskStats.GetCpuTimeUs();
SourceCpuTimeUs[index] = taskStats.GetSourceCpuTimeUs();
SetNonZero(CpuTimeUs[index], taskStats.GetCpuTimeUs());
SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());

InputRows[index] = taskStats.GetInputRows();
InputBytes[index] = taskStats.GetInputBytes();
OutputRows[index] = taskStats.GetOutputRows();
OutputBytes[index] = taskStats.GetOutputBytes();
ResultRows[index] = taskStats.GetResultRows();
ResultBytes[index] = taskStats.GetResultBytes();
IngressRows[index] = taskStats.GetIngressRows();
IngressBytes[index] = taskStats.GetIngressBytes();
EgressRows[index] = taskStats.GetEgressRows();
EgressBytes[index] = taskStats.GetEgressBytes();
SetNonZero(InputRows[index], taskStats.GetInputRows());
SetNonZero(InputBytes[index], taskStats.GetInputBytes());
SetNonZero(OutputRows[index], taskStats.GetOutputRows());
SetNonZero(OutputBytes[index], taskStats.GetOutputBytes());
SetNonZero(ResultRows[index], taskStats.GetResultRows());
SetNonZero(ResultBytes[index], taskStats.GetResultBytes());
SetNonZero(IngressRows[index], taskStats.GetIngressRows());
SetNonZero(IngressBytes[index], taskStats.GetIngressBytes());
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());

StartTimeMs[index] = taskStats.GetStartTimeMs(); // to be reviewed
FirstRowTimeMs[index] = taskStats.GetFirstRowTimeMs(); // to be reviewed
FinishTimeMs[index] = taskStats.GetFinishTimeMs(); // to be reviewed
SetNonZero(StartTimeMs[index], taskStats.GetStartTimeMs());
SetNonZero(FinishTimeMs[index], taskStats.GetFinishTimeMs());
SetNonZero(DurationUs[index], durationUs);
SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs());
SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs());

for (auto& tableStat : taskStats.GetTables()) {
auto tablePath = tableStat.GetTablePath();
auto [it, inserted] = Tables.try_emplace(tablePath, taskCount);
auto& aggrTableStats = it->second;
aggrTableStats.ReadRows[index] = tableStat.GetReadRows();
aggrTableStats.ReadBytes[index] = tableStat.GetReadBytes();
aggrTableStats.WriteRows[index] = tableStat.GetWriteRows();
aggrTableStats.WriteBytes[index] = tableStat.GetWriteBytes();
aggrTableStats.EraseRows[index] = tableStat.GetEraseRows();
aggrTableStats.EraseBytes[index] = tableStat.GetEraseBytes();
aggrTableStats.AffectedPartitions[index] = tableStat.GetAffectedPartitions();
SetNonZero(aggrTableStats.ReadRows[index], tableStat.GetReadRows());
SetNonZero(aggrTableStats.ReadBytes[index], tableStat.GetReadBytes());
SetNonZero(aggrTableStats.WriteRows[index], tableStat.GetWriteRows());
SetNonZero(aggrTableStats.WriteBytes[index], tableStat.GetWriteBytes());
SetNonZero(aggrTableStats.EraseRows[index], tableStat.GetEraseRows());
SetNonZero(aggrTableStats.EraseBytes[index], tableStat.GetEraseBytes());
SetNonZero(aggrTableStats.AffectedPartitions[index], tableStat.GetAffectedPartitions());
}

for (auto& sourceStat : taskStats.GetSources()) {
auto ingressName = sourceStat.GetIngressName();
auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress());
UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop());
if (ingressName) {
auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress());
UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop());
}
}

for (auto& inputChannelStat : taskStats.GetInputChannels()) {
Expand All @@ -156,14 +168,16 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS

for (auto& sinkStat : taskStats.GetSinks()) {
auto egressName = sinkStat.GetEgressName();
auto [it, inserted] = Egress.try_emplace(egressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop());
UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress());
if (egressName) {
auto [it, inserted] = Egress.try_emplace(egressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop());
UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress());
}
}

MaxMemoryUsage[index] = maxMemoryUsage;
SetNonZero(MaxMemoryUsage[index], maxMemoryUsage);
}

namespace {
Expand Down Expand Up @@ -235,17 +249,6 @@ void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDq
}
}

// Folds `value` into the running min/max pair held by `minMax`.
// A zero `value` is ignored entirely. A stored minimum of zero is treated as
// "not set yet" and is replaced by `value` rather than compared against it.
void UpdateMinMax(NDqProto::TDqStatsMinMax* minMax, ui64 value) noexcept {
    if (!value) {
        return;
    }

    const auto currentMin = minMax->GetMin();
    minMax->SetMin(currentMin == 0 ? value : std::min(currentMin, value));
    minMax->SetMax(std::max(minMax->GetMax(), value));
}

NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageId,
const TKqpTasksGraph& tasksGraph, NDqProto::TDqExecutionStats& execStats)
{
Expand Down Expand Up @@ -324,6 +327,16 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) {
return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE;
}

// Derives StageDurationUs for `stats` from its aggregated start/finish times.
//
// The duration runs from the earliest task start (StartTimeMs min) to the latest
// task finish (FinishTimeMs max), converted from milliseconds to microseconds.
// This is the same interval the call sites computed inline before this helper
// existed; reading the minimum finish time instead would end the stage at the
// first task to complete and under-report the duration.
//
// Nothing is written when either aggregate is absent, when the start time is
// zero, or when the finish time is not strictly later than the start time.
void TQueryExecutionStats::FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats) {
    if (!stats.HasStartTimeMs() || !stats.HasFinishTimeMs()) {
        return;
    }

    const auto startTimeMs = stats.GetStartTimeMs().GetMin();
    const auto finishTimeMs = stats.GetFinishTimeMs().GetMax();
    if (startTimeMs && finishTimeMs > startTimeMs) {
        stats.SetStageDurationUs((finishTimeMs - startTimeMs) * 1'000);
    }
}

void TQueryExecutionStats::AddComputeActorFullStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
const NYql::NDqProto::TDqComputeActorStats& stats
Expand All @@ -345,11 +358,12 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows());
UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes());

UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed

stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000);
UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs());
UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs());
UpdateAggr(stageStats->MutableDurationUs(), stats.GetDurationUs());
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);

for (auto& sourcesStat : task.GetSources()) {
UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress());
Expand Down Expand Up @@ -476,11 +490,12 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask(
UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows());
UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes());

UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed

stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000);
UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs());
UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs());
// UpdateAggr(stageStats->MutableDurationUs(), ??? );
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);

for (auto& tableStats: task.GetTables()) {
auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath());
Expand Down Expand Up @@ -598,7 +613,7 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
if (inserted) {
it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
}
it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage());
it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs());
}

void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
Expand Down Expand Up @@ -701,11 +716,12 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows());
ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes());

ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs()); // to be reviewed
ExportAggStats(p.second.FirstRowTimeMs, *stageStats.MutableFirstRowTimeMs()); // to be reviewed
ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs()); // to be reviewed

stageStats.SetDurationUs((stageStats.GetFinishTimeMs().GetMax() - stageStats.GetStartTimeMs().GetMin()) * 1'000);
ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs());
ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs());
ExportAggStats(p.second.DurationUs, *stageStats.MutableDurationUs());
ExportAggStats(p.second.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs());
ExportAggStats(p.second.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs());
FillStageDurationUs(stageStats);

for (auto& p2 : p.second.Tables) {
auto& table = *stageStats.AddTables();
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ struct TStageExecutionStats {
std::vector<ui64> EgressRows;
std::vector<ui64> EgressBytes;

std::vector<ui64> FirstRowTimeMs;
std::vector<ui64> FinishTimeMs;
std::vector<ui64> StartTimeMs;
std::vector<ui64> DurationUs;
std::vector<ui64> WaitInputTimeUs;
std::vector<ui64> WaitOutputTimeUs;

std::map<TString, TTableStats> Tables;
std::map<TString, TAsyncBufferStats> Ingress;
Expand All @@ -98,7 +100,7 @@ struct TStageExecutionStats {

void Resize(ui32 taskCount);
void UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage);
void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs);
};

struct TQueryExecutionStats {
Expand Down Expand Up @@ -162,6 +164,7 @@ struct TQueryExecutionStats {

void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats);
void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats);
void FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats);

void Finish();

Expand Down
Loading

0 comments on commit fb722f7

Please sign in to comment.