From 56c00398f825ed3b4ef7af27d6afd9325746f91a Mon Sep 17 00:00:00 2001 From: Aleksandr Khoroshilov Date: Thu, 21 Dec 2023 13:45:38 +0000 Subject: [PATCH] Ignore empty metrics in in-progress stats. Try-catch in-progress stats conversion. --- .../libs/compute/ydb/status_tracker_actor.cpp | 62 +++---- .../kqp/executer_actor/kqp_executer_stats.cpp | 160 ++++++++++-------- .../kqp/executer_actor/kqp_executer_stats.h | 7 +- ydb/core/kqp/opt/kqp_query_plan.cpp | 39 +++-- .../dq/actors/compute/dq_compute_actor_impl.h | 26 ++- .../actors/compute/dq_compute_actor_stats.cpp | 63 ++++--- .../yql/dq/actors/protos/dq_stats.proto | 31 +++- .../dq/actors/task_controller_impl.h | 10 +- 8 files changed, 250 insertions(+), 148 deletions(-) diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index f21c7e89827c..17b03cef9cbb 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -155,42 +155,46 @@ class TStatusTrackerActor : public TBaseComputeActor { } void ReportPublicCounters(const Ydb::TableStats::QueryStats& stats) { - auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan())); - auto publicCounters = GetPublicCounters(); + try { + auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan())); + auto publicCounters = GetPublicCounters(); - if (stat.MemoryUsageBytes) { - auto& counter = *publicCounters->GetNamedCounter("name", "query.memory_usage_bytes"); - counter = *stat.MemoryUsageBytes; - } + if (stat.MemoryUsageBytes) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.memory_usage_bytes"); + counter = *stat.MemoryUsageBytes; + } - if (stat.CpuUsageUs) { - auto& counter = *publicCounters->GetNamedCounter("name", "query.cpu_usage_us", true); - counter = *stat.CpuUsageUs; - } + if (stat.CpuUsageUs) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.cpu_usage_us", true); + counter = 
*stat.CpuUsageUs; + } - if (stat.InputBytes) { - auto& counter = *publicCounters->GetNamedCounter("name", "query.input_bytes", true); - counter = *stat.InputBytes; - } + if (stat.InputBytes) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.input_bytes", true); + counter = *stat.InputBytes; + } - if (stat.OutputBytes) { - auto& counter = *publicCounters->GetNamedCounter("name", "query.output_bytes", true); - counter = *stat.OutputBytes; - } + if (stat.OutputBytes) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.output_bytes", true); + counter = *stat.OutputBytes; + } - if (stat.SourceInputRecords) { - auto& counter = *publicCounters->GetNamedCounter("name", "query.source_input_records", true); - counter = *stat.SourceInputRecords; - } + if (stat.SourceInputRecords) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.source_input_records", true); + counter = *stat.SourceInputRecords; + } - if (stat.SinkOutputRecords) { - auto& counter = *publicCounters->GetNamedCounter("name", "query.sink_output_records", true); - counter = *stat.SinkOutputRecords; - } + if (stat.SinkOutputRecords) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.sink_output_records", true); + counter = *stat.SinkOutputRecords; + } - if (stat.RunningTasks) { - auto& counter = *publicCounters->GetNamedCounter("name", "query.running_tasks"); - counter = *stat.RunningTasks; + if (stat.RunningTasks) { + auto& counter = *publicCounters->GetNamedCounter("name", "query.running_tasks"); + counter = *stat.RunningTasks; + } + } catch(const NJson::TJsonException& ex) { + LOG_E("Error statistics conversion: " << ex.what()); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index 70e969cee153..6b8b194b27d0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -52,9 +52,11 @@ void 
TStageExecutionStats::Resize(ui32 taskCount) { EgressRows.resize(taskCount); EgressBytes.resize(taskCount); - FirstRowTimeMs.resize(taskCount); FinishTimeMs.resize(taskCount); StartTimeMs.resize(taskCount); + DurationUs.resize(taskCount); + WaitInputTimeUs.resize(taskCount); + WaitOutputTimeUs.resize(taskCount); for (auto& p : Ingress) p.second.Resize(taskCount); for (auto& p : Egress) p.second.Resize(taskCount); @@ -64,26 +66,32 @@ void TStageExecutionStats::Resize(ui32 taskCount) { MaxMemoryUsage.resize(taskCount); } +void SetNonZero(ui64& target, ui64 source) { + if (source) { + target = source; + } +} + void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) { - aggrAsyncStats.Bytes[index] = asyncStats.GetBytes(); - aggrAsyncStats.Rows[index] = asyncStats.GetRows(); - aggrAsyncStats.Chunks[index] = asyncStats.GetChunks(); - aggrAsyncStats.Splits[index] = asyncStats.GetSplits(); + SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes()); + SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows()); + SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks()); + SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits()); auto firstMessageMs = asyncStats.GetFirstMessageMs(); - aggrAsyncStats.FirstMessageMs[index] = firstMessageMs; - aggrAsyncStats.PauseMessageMs[index] = asyncStats.GetPauseMessageMs(); - aggrAsyncStats.ResumeMessageMs[index] = asyncStats.GetResumeMessageMs(); + SetNonZero(aggrAsyncStats.FirstMessageMs[index], firstMessageMs); + SetNonZero(aggrAsyncStats.PauseMessageMs[index], asyncStats.GetPauseMessageMs()); + SetNonZero(aggrAsyncStats.ResumeMessageMs[index], asyncStats.GetResumeMessageMs()); auto lastMessageMs = asyncStats.GetLastMessageMs(); - aggrAsyncStats.LastMessageMs[index] = lastMessageMs; - aggrAsyncStats.WaitTimeUs[index] = asyncStats.GetWaitTimeUs(); - aggrAsyncStats.WaitPeriods[index] = asyncStats.GetWaitPeriods(); + 
SetNonZero(aggrAsyncStats.LastMessageMs[index], lastMessageMs); + SetNonZero(aggrAsyncStats.WaitTimeUs[index], asyncStats.GetWaitTimeUs()); + SetNonZero(aggrAsyncStats.WaitPeriods[index], asyncStats.GetWaitPeriods()); if (firstMessageMs && lastMessageMs > firstMessageMs) { aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs; } } -void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage) { +void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) { auto taskId = taskStats.GetTaskId(); auto it = Task2Index.find(taskId); @@ -98,44 +106,48 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS index = it->second; } - CpuTimeUs[index] = taskStats.GetCpuTimeUs(); - SourceCpuTimeUs[index] = taskStats.GetSourceCpuTimeUs(); + SetNonZero(CpuTimeUs[index], taskStats.GetCpuTimeUs()); + SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs()); - InputRows[index] = taskStats.GetInputRows(); - InputBytes[index] = taskStats.GetInputBytes(); - OutputRows[index] = taskStats.GetOutputRows(); - OutputBytes[index] = taskStats.GetOutputBytes(); - ResultRows[index] = taskStats.GetResultRows(); - ResultBytes[index] = taskStats.GetResultBytes(); - IngressRows[index] = taskStats.GetIngressRows(); - IngressBytes[index] = taskStats.GetIngressBytes(); - EgressRows[index] = taskStats.GetEgressRows(); - EgressBytes[index] = taskStats.GetEgressBytes(); + SetNonZero(InputRows[index], taskStats.GetInputRows()); + SetNonZero(InputBytes[index], taskStats.GetInputBytes()); + SetNonZero(OutputRows[index], taskStats.GetOutputRows()); + SetNonZero(OutputBytes[index], taskStats.GetOutputBytes()); + SetNonZero(ResultRows[index], taskStats.GetResultRows()); + SetNonZero(ResultBytes[index], taskStats.GetResultBytes()); + SetNonZero(IngressRows[index], taskStats.GetIngressRows()); + SetNonZero(IngressBytes[index], 
taskStats.GetIngressBytes()); + SetNonZero(EgressRows[index], taskStats.GetEgressRows()); + SetNonZero(EgressBytes[index], taskStats.GetEgressBytes()); - StartTimeMs[index] = taskStats.GetStartTimeMs(); // to be reviewed - FirstRowTimeMs[index] = taskStats.GetFirstRowTimeMs(); // to be reviewed - FinishTimeMs[index] = taskStats.GetFinishTimeMs(); // to be reviewed + SetNonZero(StartTimeMs[index], taskStats.GetStartTimeMs()); + SetNonZero(FinishTimeMs[index], taskStats.GetFinishTimeMs()); + SetNonZero(DurationUs[index], durationUs); + SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs()); + SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs()); for (auto& tableStat : taskStats.GetTables()) { auto tablePath = tableStat.GetTablePath(); auto [it, inserted] = Tables.try_emplace(tablePath, taskCount); auto& aggrTableStats = it->second; - aggrTableStats.ReadRows[index] = tableStat.GetReadRows(); - aggrTableStats.ReadBytes[index] = tableStat.GetReadBytes(); - aggrTableStats.WriteRows[index] = tableStat.GetWriteRows(); - aggrTableStats.WriteBytes[index] = tableStat.GetWriteBytes(); - aggrTableStats.EraseRows[index] = tableStat.GetEraseRows(); - aggrTableStats.EraseBytes[index] = tableStat.GetEraseBytes(); - aggrTableStats.AffectedPartitions[index] = tableStat.GetAffectedPartitions(); + SetNonZero(aggrTableStats.ReadRows[index], tableStat.GetReadRows()); + SetNonZero(aggrTableStats.ReadBytes[index], tableStat.GetReadBytes()); + SetNonZero(aggrTableStats.WriteRows[index], tableStat.GetWriteRows()); + SetNonZero(aggrTableStats.WriteBytes[index], tableStat.GetWriteBytes()); + SetNonZero(aggrTableStats.EraseRows[index], tableStat.GetEraseRows()); + SetNonZero(aggrTableStats.EraseBytes[index], tableStat.GetEraseBytes()); + SetNonZero(aggrTableStats.AffectedPartitions[index], tableStat.GetAffectedPartitions()); } for (auto& sourceStat : taskStats.GetSources()) { auto ingressName = sourceStat.GetIngressName(); - auto [it, inserted] = 
Ingress.try_emplace(ingressName, taskCount); - auto& asyncBufferStats = it->second; - UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()); - UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()); - UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()); + if (ingressName) { + auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount); + auto& asyncBufferStats = it->second; + UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress()); + UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush()); + UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop()); + } } for (auto& inputChannelStat : taskStats.GetInputChannels()) { @@ -156,14 +168,16 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS for (auto& sinkStat : taskStats.GetSinks()) { auto egressName = sinkStat.GetEgressName(); - auto [it, inserted] = Egress.try_emplace(egressName, taskCount); - auto& asyncBufferStats = it->second; - UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush()); - UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop()); - UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress()); + if (egressName) { + auto [it, inserted] = Egress.try_emplace(egressName, taskCount); + auto& asyncBufferStats = it->second; + UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush()); + UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop()); + UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress()); + } } - MaxMemoryUsage[index] = maxMemoryUsage; + SetNonZero(MaxMemoryUsage[index], maxMemoryUsage); } namespace { @@ -235,17 +249,6 @@ void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDq } } -void UpdateMinMax(NDqProto::TDqStatsMinMax* minMax, ui64 value) noexcept { - if (value) { - if (minMax->GetMin() == 0) { - minMax->SetMin(value); - } else { - minMax->SetMin(std::min(minMax->GetMin(), 
value)); - } - minMax->SetMax(std::max(minMax->GetMax(), value)); - } -} - NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageId, const TKqpTasksGraph& tasksGraph, NDqProto::TDqExecutionStats& execStats) { @@ -324,6 +327,16 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) { return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE; } +void TQueryExecutionStats::FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats) { + if (stats.HasStartTimeMs() && stats.HasFinishTimeMs()) { + auto startTimeMs = stats.GetStartTimeMs().GetMin(); + auto finishTimeMs = stats.GetFinishTimeMs().GetMin(); + if (startTimeMs && finishTimeMs > startTimeMs) { + stats.SetStageDurationUs((finishTimeMs - startTimeMs) * 1'000); + } + } +} + void TQueryExecutionStats::AddComputeActorFullStatsByTask( const NYql::NDqProto::TDqTaskStats& task, const NYql::NDqProto::TDqComputeActorStats& stats @@ -345,11 +358,12 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask( UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows()); UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes()); - UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed - - stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); + UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); + UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); + UpdateAggr(stageStats->MutableDurationUs(), stats.GetDurationUs()); + UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs()); + UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs()); + FillStageDurationUs(*stageStats); for (auto& 
sourcesStat : task.GetSources()) { UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress()); @@ -476,11 +490,12 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask( UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows()); UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes()); - UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed - UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed - - stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000); + UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); + UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); + // UpdateAggr(stageStats->MutableDurationUs(), ??? ); + UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs()); + UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs()); + FillStageDurationUs(*stageStats); for (auto& tableStats: task.GetTables()) { auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath()); @@ -598,7 +613,7 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD if (inserted) { it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId; } - it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage()); + it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs()); } void ExportAggStats(std::vector& data, NYql::NDqProto::TDqStatsMinMax& stats) { @@ -701,11 +716,12 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows()); ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes()); - 
ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs()); // to be reviewed - ExportAggStats(p.second.FirstRowTimeMs, *stageStats.MutableFirstRowTimeMs()); // to be reviewed - ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs()); // to be reviewed - - stageStats.SetDurationUs((stageStats.GetFinishTimeMs().GetMax() - stageStats.GetStartTimeMs().GetMin()) * 1'000); + ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs()); + ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs()); + ExportAggStats(p.second.DurationUs, *stageStats.MutableDurationUs()); + ExportAggStats(p.second.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs()); + ExportAggStats(p.second.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs()); + FillStageDurationUs(stageStats); for (auto& p2 : p.second.Tables) { auto& table = *stageStats.AddTables(); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index ac2c2d6efd76..bea43d882dab 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -84,9 +84,11 @@ struct TStageExecutionStats { std::vector EgressRows; std::vector EgressBytes; - std::vector FirstRowTimeMs; std::vector FinishTimeMs; std::vector StartTimeMs; + std::vector DurationUs; + std::vector WaitInputTimeUs; + std::vector WaitOutputTimeUs; std::map Tables; std::map Ingress; @@ -98,7 +100,7 @@ struct TStageExecutionStats { void Resize(ui32 taskCount); void UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats); - void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage); + void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs); }; struct TQueryExecutionStats { @@ -162,6 +164,7 @@ struct TQueryExecutionStats { void UpdateTaskStats(ui64 taskId, const 
NYql::NDqProto::TDqComputeActorStats& stats); void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats); + void FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats); void Finish(); diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 13bf2678a9e1..3dd70e1f2120 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -1813,16 +1813,24 @@ void FillAsyncAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqAsyncSt FillAggrStat(node, asyncAggr.GetActiveTimeUs(), "ActiveTimeUs"); } if (asyncAggr.HasFirstMessageMs() && asyncAggr.HasLastMessageMs()) { - auto& aggrStat = node.InsertValue("ActiveMessageMs", NJson::JSON_MAP); - aggrStat["Min"] = asyncAggr.GetFirstMessageMs().GetMin(); - aggrStat["Max"] = asyncAggr.GetLastMessageMs().GetMax(); - aggrStat["Count"] = asyncAggr.GetFirstMessageMs().GetCnt(); + auto firstMessageMs = asyncAggr.GetFirstMessageMs().GetMin(); + auto lastMessageMs = asyncAggr.GetLastMessageMs().GetMax(); + if (firstMessageMs && lastMessageMs > firstMessageMs) { + auto& aggrStat = node.InsertValue("ActiveMessageMs", NJson::JSON_MAP); + aggrStat["Min"] = firstMessageMs; + aggrStat["Max"] = lastMessageMs; + aggrStat["Count"] = asyncAggr.GetFirstMessageMs().GetCnt(); + } } if (asyncAggr.HasPauseMessageMs() && asyncAggr.HasResumeMessageMs()) { - auto& aggrStat = node.InsertValue("WaitMessageMs", NJson::JSON_MAP); - aggrStat["Min"] = asyncAggr.GetPauseMessageMs().GetMin(); - aggrStat["Max"] = asyncAggr.GetResumeMessageMs().GetMax(); - aggrStat["Count"] = asyncAggr.GetPauseMessageMs().GetCnt(); + auto firstMessageMs = asyncAggr.GetPauseMessageMs().GetMin(); + auto lastMessageMs = asyncAggr.GetResumeMessageMs().GetMax(); + if (firstMessageMs && lastMessageMs > firstMessageMs) { + auto& aggrStat = node.InsertValue("WaitMessageMs", NJson::JSON_MAP); + aggrStat["Min"] = firstMessageMs; + aggrStat["Max"] = lastMessageMs; + aggrStat["Count"] = 
asyncAggr.GetPauseMessageMs().GetCnt(); + } } } @@ -1878,10 +1886,6 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD SetNonZero(node, "EgressRows", taskStats.GetEgressRows()); SetNonZero(node, "EgressBytes", taskStats.GetEgressBytes()); - // equals to max if there was no first row - if(taskStats.GetFirstRowTimeMs() != std::numeric_limits::max()) { - SetNonZero(node, "FirstRowTimeMs", taskStats.GetFirstRowTimeMs()); // need to be reviewed - } SetNonZero(node, "StartTimeMs", taskStats.GetStartTimeMs()); // need to be reviewed SetNonZero(node, "FinishTimeMs", taskStats.GetFinishTimeMs()); // need to be reviewed @@ -1940,8 +1944,17 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD stats["Tasks"] = (*stat)->GetTotalTasksCount(); - stats["TotalDurationMs"] = (*stat)->GetDurationUs() / 1000; + stats["StageDurationUs"] = (*stat)->GetStageDurationUs(); + if ((*stat)->HasDurationUs()) { + FillAggrStat(stats, (*stat)->GetDurationUs(), "DurationUs"); + } + if ((*stat)->HasWaitInputTimeUs()) { + FillAggrStat(stats, (*stat)->GetWaitInputTimeUs(), "WaitInputTimeUs"); + } + if ((*stat)->HasWaitOutputTimeUs()) { + FillAggrStat(stats, (*stat)->GetWaitOutputTimeUs(), "WaitOutputTimeUs"); + } if ((*stat)->HasInputRows()) { FillAggrStat(stats, (*stat)->GetInputRows(), "InputRows"); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e5880760d2b5..afea078bf101 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -2020,6 +2020,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ui64 ingressBytes = 0; ui64 ingressRows = 0; + auto startTimeMs = protoTask->GetStartTimeMs(); if (RuntimeSettings.CollectFull()) { // in full/profile mode enumerate existing protos @@ -2033,6 +2034,12 @@ class TDqComputeActorBase : public 
NActors::TActorBootstrapped ingressBytes += ingressStats.Bytes; // ingress rows are usually not reported, so we count rows in task runner input ingressRows += ingressStats.Rows ? ingressStats.Rows : taskStats->Sources.at(inputIndex)->GetPopStats().Rows; + if (ingressStats.FirstMessageTs) { + auto firstMessageMs = ingressStats.FirstMessageTs.MilliSeconds(); + if (!startTimeMs || startTimeMs > firstMessageMs) { + startTimeMs = firstMessageMs; + } + } } } } else { @@ -2043,13 +2050,18 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped // ingress rows are usually not reported, so we count rows in task runner input ingressRows += ingressStats.Rows ? ingressStats.Rows : taskStats->Sources.at(inputIndex)->GetPopStats().Rows; } - }; + } + if (!startTimeMs) { + startTimeMs = taskStats->StartTs.MilliSeconds(); + } + protoTask->SetStartTimeMs(startTimeMs); protoTask->SetIngressBytes(ingressBytes); protoTask->SetIngressRows(ingressRows); ui64 egressBytes = 0; ui64 egressRows = 0; + auto finishTimeMs = protoTask->GetFinishTimeMs(); for (auto& [outputIndex, sinkInfo] : SinksMap) { if (auto* sink = GetSink(outputIndex, sinkInfo)) { @@ -2065,6 +2077,12 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped FillAsyncStats(*protoSink.MutableEgress(), egressStats); protoSink.SetMaxMemoryUsage(popStats.MaxMemoryUsage); protoSink.SetErrorsCount(sinkInfo.IssuesBuffer.GetAllAddedIssuesCount()); + if (egressStats.LastMessageTs) { + auto lastMessageMs = egressStats.LastMessageTs.MilliSeconds(); + if (!finishTimeMs || finishTimeMs > lastMessageMs) { + finishTimeMs = lastMessageMs; + } + } } egressBytes += egressStats.Bytes; // egress rows are usually not reported, so we count rows in task runner output @@ -2073,9 +2091,15 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } } + protoTask->SetFinishTimeMs(finishTimeMs); protoTask->SetEgressBytes(egressBytes); protoTask->SetEgressRows(egressRows); + if (startTimeMs && finishTimeMs > startTimeMs) { + // 
we may lose precision here a little bit ... rework sometime + dst->SetDurationUs((finishTimeMs - startTimeMs) * 1'000); + } + for (auto& [inputIndex, transformInfo] : InputTransformsMap) { auto* transform = GetInputTransform(inputIndex, transformInfo); if (transform && RuntimeSettings.CollectFull()) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp index 6183605b9135..408b1b51c587 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp @@ -23,6 +23,20 @@ void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats) { } } +void MergeMinTs(TInstant& current, const TInstant value) { + if (value) { + if (!current || current > value) { + current = value; + } + } +} + +void MergeMaxTs(TInstant& current, const TInstant value) { + if (current < value) { + current = value; + } +} + void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& taskStats, NDqProto::TDqTaskStats* protoTask, TCollectStatsLevel level) { @@ -34,8 +48,8 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& protoTask->SetStageId(stageId); protoTask->SetCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds() + taskStats.BuildCpuTime.MicroSeconds()); - protoTask->SetFinishTimeMs(taskStats.FinishTs.MilliSeconds()); // to be reviewed - protoTask->SetStartTimeMs(taskStats.StartTs.MilliSeconds()); // to be reviewed + TInstant finishTime = taskStats.FinishTs; + TInstant startTime; if (NActors::TlsActivationContext && NActors::TlsActivationContext->ActorSystem()) { protoTask->SetNodeId(NActors::TlsActivationContext->ActorSystem()->NodeId); @@ -107,18 +121,21 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& protoChannel.SetDeserializationTimeUs(pushStats.DeserializationTime.MicroSeconds()); 
protoChannel.SetMaxMemoryUsage(pushStats.MaxMemoryUsage); } + MergeMinTs(startTime, pushStats.FirstMessageTs); } break; case TCollectStatsLevel::Profile: for (auto& [channelId, inputChannel] : inputChannels) { - taskPushStats.MergeData(inputChannel->GetPushStats()); + const auto& pushStats = inputChannel->GetPushStats(); + taskPushStats.MergeData(pushStats); auto& protoChannel = *protoTask->AddInputChannels(); protoChannel.SetChannelId(channelId); protoChannel.SetSrcStageId(srcStageId); - FillAsyncStats(*protoChannel.MutablePush(), inputChannel->GetPushStats()); + FillAsyncStats(*protoChannel.MutablePush(), pushStats); FillAsyncStats(*protoChannel.MutablePop(), inputChannel->GetPopStats()); - protoChannel.SetDeserializationTimeUs(inputChannel->GetPushStats().DeserializationTime.MicroSeconds()); - protoChannel.SetMaxMemoryUsage(inputChannel->GetPushStats().MaxMemoryUsage); + protoChannel.SetDeserializationTimeUs(pushStats.DeserializationTime.MicroSeconds()); + protoChannel.SetMaxMemoryUsage(pushStats.MaxMemoryUsage); + MergeMinTs(startTime, pushStats.FirstMessageTs); } break; } @@ -128,15 +145,17 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& protoTask->SetInputBytes(taskPushStats.Bytes); // - // task runner is not aware of ingress/egress stats, fill in in CA + // task runner is not aware of ingress/egress stats, fill it in CA // if (StatsLevelCollectFull(level)) { for (auto& [inputIndex, sources] : taskStats.Sources) { + const auto& pushStats = sources->GetPushStats(); auto& protoSource = *protoTask->AddSources(); protoSource.SetInputIndex(inputIndex); - FillAsyncStats(*protoSource.MutablePush(), sources->GetPushStats()); + FillAsyncStats(*protoSource.MutablePush(), pushStats); FillAsyncStats(*protoSource.MutablePop(), sources->GetPopStats()); - protoSource.SetMaxMemoryUsage(sources->GetPushStats().MaxMemoryUsage); + protoSource.SetMaxMemoryUsage(pushStats.MaxMemoryUsage); + MergeMinTs(startTime, pushStats.FirstMessageTs); } } @@ 
-197,25 +216,28 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& protoChannel.SetSpilledRows(popStats.SpilledRows); protoChannel.SetSpilledBlobs(popStats.SpilledBlobs); } + MergeMaxTs(finishTime, popStats.LastMessageTs); } break; case TCollectStatsLevel::Profile: for (auto& [channelId, outputChannel] : outputChannels) { - taskPopStats.MergeData(outputChannel->GetPopStats()); + const auto& popStats = outputChannel->GetPopStats(); + taskPopStats.MergeData(popStats); if (dstStageId == 0) { - resultStats.MergeData(outputChannel->GetPopStats()); + resultStats.MergeData(popStats); } - auto& protoChannel = *protoTask->AddOutputChannels(); + auto& protoChannel = *protoTask->AddOutputChannels(); protoChannel.SetChannelId(channelId); protoChannel.SetDstStageId(dstStageId); FillAsyncStats(*protoChannel.MutablePush(), outputChannel->GetPushStats()); - FillAsyncStats(*protoChannel.MutablePop(), outputChannel->GetPopStats()); - protoChannel.SetMaxMemoryUsage(outputChannel->GetPopStats().MaxMemoryUsage); - protoChannel.SetMaxRowsInMemory(outputChannel->GetPopStats().MaxRowsInMemory); - protoChannel.SetSerializationTimeUs(outputChannel->GetPopStats().SerializationTime.MicroSeconds()); - protoChannel.SetSpilledBytes(outputChannel->GetPopStats().SpilledBytes); - protoChannel.SetSpilledRows(outputChannel->GetPopStats().SpilledRows); - protoChannel.SetSpilledBlobs(outputChannel->GetPopStats().SpilledBlobs); + FillAsyncStats(*protoChannel.MutablePop(), popStats); + protoChannel.SetMaxMemoryUsage(popStats.MaxMemoryUsage); + protoChannel.SetMaxRowsInMemory(popStats.MaxRowsInMemory); + protoChannel.SetSerializationTimeUs(popStats.SerializationTime.MicroSeconds()); + protoChannel.SetSpilledBytes(popStats.SpilledBytes); + protoChannel.SetSpilledRows(popStats.SpilledRows); + protoChannel.SetSpilledBlobs(popStats.SpilledBlobs); + MergeMaxTs(finishTime, popStats.LastMessageTs); } break; } @@ -225,6 +247,9 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, 
const TTaskRunnerStatsBase& protoTask->SetOutputBytes(taskPopStats.Bytes); protoTask->SetResultRows(resultStats.Rows); protoTask->SetResultBytes(resultStats.Bytes); + + protoTask->SetFinishTimeMs(finishTime.MilliSeconds()); + protoTask->SetStartTimeMs(startTime.MilliSeconds()); } } // namespace NDq diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index 5a20d775d636..41f674af2d1d 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -208,10 +208,14 @@ message TDqTaskStats { google.protobuf.Any Extra = 200; - // to be reviewed + // BASIC: StartTs from TR since we don't have better times + // FULL+: min(FirstMessageMs) from all Inputs uint64 StartTimeMs = 158; - uint64 FinishTimeMs = 5; // task finish time, timestamp in millis - uint64 FirstRowTimeMs = 4; // first row time, timestamp in millis + + // always FinishTs from TR + uint64 FinishTimeMs = 5; + + reserved 4; // was FirstRowTimeMs reserved 104; uint64 WaitInputTimeUs = 111; // wait input wall time (any input: channels, source, ...) uint64 WaitOutputTimeUs = 105; // wait output wall time (any output: channels, sinks, ...) 
@@ -223,7 +227,7 @@ message TDqTaskStats { message TDqComputeActorStats { // basic stats uint64 CpuTimeUs = 1; // total cpu time: tasks cpu time + self cpu time - uint64 DurationUs = 2; // compute actor duration, wall time (from FirstRowTime to FinishTime) + uint64 DurationUs = 2; // task.FinishTimeMs - task.StartTimeMs repeated TDqTaskStats Tasks = 3; // in the BASIC_MODE only basic fields are used uint64 MaxMemoryUsage = 4; @@ -312,12 +316,21 @@ message TDqStageStats { TDqStatsAggr EgressBytes = 30; TDqStatsAggr EgressRows = 31; - TDqStatsMinMax FirstRowTimeMs = 13; - TDqStatsMinMax FinishTimeMs = 14; - TDqStatsMinMax StartTimeMs = 21; - uint64 DurationUs = 15; // microseconds from min(task_first_row_time) to max(task_finish_time) + reserved 13; // FirstRowTimeMs + reserved 14; // FinishTimeMs + reserved 21; // StartTimeMs + uint64 StageDurationUs = 15; // FinishTimeMs.Max - StartTimeMs.Min + + // Sum has no meaning for times and should not be rendered + // important metric is Avg = Sum/Cnt + TDqStatsAggr FinishTimeMs = 32; + TDqStatsAggr StartTimeMs = 33; + TDqStatsAggr DurationUs = 34; // just aggregate over task.DurationUs + + TDqStatsAggr WaitInputTimeUs = 35; + TDqStatsAggr WaitOutputTimeUs = 36; - repeated TDqTableAggrStats Tables = 16; // is it required? 
+ repeated TDqTableAggrStats Tables = 16; repeated TDqComputeActorStats ComputeActors = 17; // more detailed stats oneof LlvmOptions { diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index 19701830b0b0..f4f3a94c5c9b 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -368,8 +368,7 @@ class TTaskControllerImpl: public NActors::TActor { ADD_COUNTER(CpuTimeUs) ADD_COUNTER(ComputeCpuTimeUs) ADD_COUNTER(SourceCpuTimeUs) - // ADD_COUNTER(FirstRowTimeMs) - // ADD_COUNTER(FinishTimeMs) + ADD_COUNTER(IngressRows) ADD_COUNTER(IngressBytes) ADD_COUNTER(EgressRows) @@ -380,8 +379,9 @@ class TTaskControllerImpl: public NActors::TActor { ADD_COUNTER(OutputBytes) ADD_COUNTER(ResultRows) ADD_COUNTER(ResultBytes) - // ADD_COUNTER(StartTimeMs) + ADD_COUNTER(StartTimeMs) + ADD_COUNTER(FinishTimeMs) ADD_COUNTER(WaitInputTimeUs) ADD_COUNTER(WaitOutputTimeUs) @@ -392,6 +392,10 @@ class TTaskControllerImpl: public NActors::TActor { TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "MkqlMaxMemoryUsage"), v); } + if (auto v = x.GetDurationUs()) { + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "DurationUs"), v); + } + for (const auto& stat : s.GetMkqlStats()) { TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, stat.GetName()), stat.GetValue()); }