Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore empty metrics in in-progress stats. Try-catch in-progress stats #633

Merged
merged 1 commit into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 33 additions & 29 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,42 +155,46 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
}

void ReportPublicCounters(const Ydb::TableStats::QueryStats& stats) {
auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan()));
auto publicCounters = GetPublicCounters();
try {
auto stat = GetPublicStat(GetV1StatFromV2Plan(stats.query_plan()));
auto publicCounters = GetPublicCounters();

if (stat.MemoryUsageBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.memory_usage_bytes");
counter = *stat.MemoryUsageBytes;
}
if (stat.MemoryUsageBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.memory_usage_bytes");
counter = *stat.MemoryUsageBytes;
}

if (stat.CpuUsageUs) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.cpu_usage_us", true);
counter = *stat.CpuUsageUs;
}
if (stat.CpuUsageUs) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.cpu_usage_us", true);
counter = *stat.CpuUsageUs;
}

if (stat.InputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.input_bytes", true);
counter = *stat.InputBytes;
}
if (stat.InputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.input_bytes", true);
counter = *stat.InputBytes;
}

if (stat.OutputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.output_bytes", true);
counter = *stat.OutputBytes;
}
if (stat.OutputBytes) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.output_bytes", true);
counter = *stat.OutputBytes;
}

if (stat.SourceInputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.source_input_records", true);
counter = *stat.SourceInputRecords;
}
if (stat.SourceInputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.source_input_records", true);
counter = *stat.SourceInputRecords;
}

if (stat.SinkOutputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.sink_output_records", true);
counter = *stat.SinkOutputRecords;
}
if (stat.SinkOutputRecords) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.sink_output_records", true);
counter = *stat.SinkOutputRecords;
}

if (stat.RunningTasks) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.running_tasks");
counter = *stat.RunningTasks;
if (stat.RunningTasks) {
auto& counter = *publicCounters->GetNamedCounter("name", "query.running_tasks");
counter = *stat.RunningTasks;
}
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
}

Expand Down
160 changes: 88 additions & 72 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
EgressRows.resize(taskCount);
EgressBytes.resize(taskCount);

FirstRowTimeMs.resize(taskCount);
FinishTimeMs.resize(taskCount);
StartTimeMs.resize(taskCount);
DurationUs.resize(taskCount);
WaitInputTimeUs.resize(taskCount);
WaitOutputTimeUs.resize(taskCount);

for (auto& p : Ingress) p.second.Resize(taskCount);
for (auto& p : Egress) p.second.Resize(taskCount);
Expand All @@ -64,26 +66,32 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
MaxMemoryUsage.resize(taskCount);
}

void SetNonZero(ui64& target, ui64 source) {
if (source) {
target = source;
}
}

void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
aggrAsyncStats.Bytes[index] = asyncStats.GetBytes();
aggrAsyncStats.Rows[index] = asyncStats.GetRows();
aggrAsyncStats.Chunks[index] = asyncStats.GetChunks();
aggrAsyncStats.Splits[index] = asyncStats.GetSplits();
SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes());
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits());

auto firstMessageMs = asyncStats.GetFirstMessageMs();
aggrAsyncStats.FirstMessageMs[index] = firstMessageMs;
aggrAsyncStats.PauseMessageMs[index] = asyncStats.GetPauseMessageMs();
aggrAsyncStats.ResumeMessageMs[index] = asyncStats.GetResumeMessageMs();
SetNonZero(aggrAsyncStats.FirstMessageMs[index], firstMessageMs);
SetNonZero(aggrAsyncStats.PauseMessageMs[index], asyncStats.GetPauseMessageMs());
SetNonZero(aggrAsyncStats.ResumeMessageMs[index], asyncStats.GetResumeMessageMs());
auto lastMessageMs = asyncStats.GetLastMessageMs();
aggrAsyncStats.LastMessageMs[index] = lastMessageMs;
aggrAsyncStats.WaitTimeUs[index] = asyncStats.GetWaitTimeUs();
aggrAsyncStats.WaitPeriods[index] = asyncStats.GetWaitPeriods();
SetNonZero(aggrAsyncStats.LastMessageMs[index], lastMessageMs);
SetNonZero(aggrAsyncStats.WaitTimeUs[index], asyncStats.GetWaitTimeUs());
SetNonZero(aggrAsyncStats.WaitPeriods[index], asyncStats.GetWaitPeriods());
if (firstMessageMs && lastMessageMs > firstMessageMs) {
aggrAsyncStats.ActiveTimeUs[index] = lastMessageMs - firstMessageMs;
}
}

void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage) {
void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs) {
auto taskId = taskStats.GetTaskId();
auto it = Task2Index.find(taskId);

Expand All @@ -98,44 +106,48 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
index = it->second;
}

CpuTimeUs[index] = taskStats.GetCpuTimeUs();
SourceCpuTimeUs[index] = taskStats.GetSourceCpuTimeUs();
SetNonZero(CpuTimeUs[index], taskStats.GetCpuTimeUs());
SetNonZero(SourceCpuTimeUs[index], taskStats.GetSourceCpuTimeUs());

InputRows[index] = taskStats.GetInputRows();
InputBytes[index] = taskStats.GetInputBytes();
OutputRows[index] = taskStats.GetOutputRows();
OutputBytes[index] = taskStats.GetOutputBytes();
ResultRows[index] = taskStats.GetResultRows();
ResultBytes[index] = taskStats.GetResultBytes();
IngressRows[index] = taskStats.GetIngressRows();
IngressBytes[index] = taskStats.GetIngressBytes();
EgressRows[index] = taskStats.GetEgressRows();
EgressBytes[index] = taskStats.GetEgressBytes();
SetNonZero(InputRows[index], taskStats.GetInputRows());
SetNonZero(InputBytes[index], taskStats.GetInputBytes());
SetNonZero(OutputRows[index], taskStats.GetOutputRows());
SetNonZero(OutputBytes[index], taskStats.GetOutputBytes());
SetNonZero(ResultRows[index], taskStats.GetResultRows());
SetNonZero(ResultBytes[index], taskStats.GetResultBytes());
SetNonZero(IngressRows[index], taskStats.GetIngressRows());
SetNonZero(IngressBytes[index], taskStats.GetIngressBytes());
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());

StartTimeMs[index] = taskStats.GetStartTimeMs(); // to be reviewed
FirstRowTimeMs[index] = taskStats.GetFirstRowTimeMs(); // to be reviewed
FinishTimeMs[index] = taskStats.GetFinishTimeMs(); // to be reviewed
SetNonZero(StartTimeMs[index], taskStats.GetStartTimeMs());
SetNonZero(FinishTimeMs[index], taskStats.GetFinishTimeMs());
SetNonZero(DurationUs[index], durationUs);
SetNonZero(WaitInputTimeUs[index], taskStats.GetWaitInputTimeUs());
SetNonZero(WaitOutputTimeUs[index], taskStats.GetWaitOutputTimeUs());

for (auto& tableStat : taskStats.GetTables()) {
auto tablePath = tableStat.GetTablePath();
auto [it, inserted] = Tables.try_emplace(tablePath, taskCount);
auto& aggrTableStats = it->second;
aggrTableStats.ReadRows[index] = tableStat.GetReadRows();
aggrTableStats.ReadBytes[index] = tableStat.GetReadBytes();
aggrTableStats.WriteRows[index] = tableStat.GetWriteRows();
aggrTableStats.WriteBytes[index] = tableStat.GetWriteBytes();
aggrTableStats.EraseRows[index] = tableStat.GetEraseRows();
aggrTableStats.EraseBytes[index] = tableStat.GetEraseBytes();
aggrTableStats.AffectedPartitions[index] = tableStat.GetAffectedPartitions();
SetNonZero(aggrTableStats.ReadRows[index], tableStat.GetReadRows());
SetNonZero(aggrTableStats.ReadBytes[index], tableStat.GetReadBytes());
SetNonZero(aggrTableStats.WriteRows[index], tableStat.GetWriteRows());
SetNonZero(aggrTableStats.WriteBytes[index], tableStat.GetWriteBytes());
SetNonZero(aggrTableStats.EraseRows[index], tableStat.GetEraseRows());
SetNonZero(aggrTableStats.EraseBytes[index], tableStat.GetEraseBytes());
SetNonZero(aggrTableStats.AffectedPartitions[index], tableStat.GetAffectedPartitions());
}

for (auto& sourceStat : taskStats.GetSources()) {
auto ingressName = sourceStat.GetIngressName();
auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress());
UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop());
if (ingressName) {
auto [it, inserted] = Ingress.try_emplace(ingressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Ingress, sourceStat.GetIngress());
UpdateAsyncStats(index, asyncBufferStats.Push, sourceStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sourceStat.GetPop());
}
}

for (auto& inputChannelStat : taskStats.GetInputChannels()) {
Expand All @@ -156,14 +168,16 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS

for (auto& sinkStat : taskStats.GetSinks()) {
auto egressName = sinkStat.GetEgressName();
auto [it, inserted] = Egress.try_emplace(egressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop());
UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress());
if (egressName) {
auto [it, inserted] = Egress.try_emplace(egressName, taskCount);
auto& asyncBufferStats = it->second;
UpdateAsyncStats(index, asyncBufferStats.Push, sinkStat.GetPush());
UpdateAsyncStats(index, asyncBufferStats.Pop, sinkStat.GetPop());
UpdateAsyncStats(index, asyncBufferStats.Ingress, sinkStat.GetEgress());
}
}

MaxMemoryUsage[index] = maxMemoryUsage;
SetNonZero(MaxMemoryUsage[index], maxMemoryUsage);
}

namespace {
Expand Down Expand Up @@ -235,17 +249,6 @@ void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDq
}
}

void UpdateMinMax(NDqProto::TDqStatsMinMax* minMax, ui64 value) noexcept {
if (value) {
if (minMax->GetMin() == 0) {
minMax->SetMin(value);
} else {
minMax->SetMin(std::min(minMax->GetMin(), value));
}
minMax->SetMax(std::max(minMax->GetMax(), value));
}
}

NDqProto::TDqStageStats* GetOrCreateStageStats(const NYql::NDq::TStageId& stageId,
const TKqpTasksGraph& tasksGraph, NDqProto::TDqExecutionStats& execStats)
{
Expand Down Expand Up @@ -324,6 +327,16 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode) {
return statsMode >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE;
}

void TQueryExecutionStats::FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats) {
if (stats.HasStartTimeMs() && stats.HasFinishTimeMs()) {
auto startTimeMs = stats.GetStartTimeMs().GetMin();
auto finishTimeMs = stats.GetFinishTimeMs().GetMin();
if (startTimeMs && finishTimeMs > startTimeMs) {
stats.SetStageDurationUs((finishTimeMs - startTimeMs) * 1'000);
}
}
}

void TQueryExecutionStats::AddComputeActorFullStatsByTask(
const NYql::NDqProto::TDqTaskStats& task,
const NYql::NDqProto::TDqComputeActorStats& stats
Expand All @@ -345,11 +358,12 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows());
UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes());

UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed

stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000);
UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs());
UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs());
UpdateAggr(stageStats->MutableDurationUs(), stats.GetDurationUs());
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);

for (auto& sourcesStat : task.GetSources()) {
UpdateAsyncAggr(*(*stageStats->MutableIngress())[sourcesStat.GetIngressName()].MutableIngress(), sourcesStat.GetIngress());
Expand Down Expand Up @@ -476,11 +490,12 @@ void TQueryExecutionStats::AddDatashardFullStatsByTask(
UpdateAggr(stageStats->MutableOutputRows(), task.GetOutputRows());
UpdateAggr(stageStats->MutableOutputBytes(), task.GetOutputBytes());

UpdateMinMax(stageStats->MutableStartTimeMs(), task.GetStartTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFirstRowTimeMs(), task.GetFirstRowTimeMs()); // to be reviewed
UpdateMinMax(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs()); // to be reviewed

stageStats->SetDurationUs((stageStats->GetFinishTimeMs().GetMax() - stageStats->GetStartTimeMs().GetMin()) * 1'000);
UpdateAggr(stageStats->MutableStartTimeMs(), task.GetStartTimeMs());
UpdateAggr(stageStats->MutableFinishTimeMs(), task.GetFinishTimeMs());
// UpdateAggr(stageStats->MutableDurationUs(), ??? );
UpdateAggr(stageStats->MutableWaitInputTimeUs(), task.GetWaitInputTimeUs());
UpdateAggr(stageStats->MutableWaitOutputTimeUs(), task.GetWaitOutputTimeUs());
FillStageDurationUs(*stageStats);

for (auto& tableStats: task.GetTables()) {
auto* tableAggrStats = GetOrCreateTableAggrStats(stageStats, tableStats.GetTablePath());
Expand Down Expand Up @@ -598,7 +613,7 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
if (inserted) {
it->second.StageId = TasksGraph->GetTask(taskStats.GetTaskId()).StageId;
}
it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage());
it->second.UpdateStats(taskStats, stats.GetMaxMemoryUsage(), stats.GetDurationUs());
}

void ExportAggStats(std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
Expand Down Expand Up @@ -701,11 +716,12 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows());
ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes());

ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs()); // to be reviewed
ExportAggStats(p.second.FirstRowTimeMs, *stageStats.MutableFirstRowTimeMs()); // to be reviewed
ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs()); // to be reviewed

stageStats.SetDurationUs((stageStats.GetFinishTimeMs().GetMax() - stageStats.GetStartTimeMs().GetMin()) * 1'000);
ExportAggStats(p.second.StartTimeMs, *stageStats.MutableStartTimeMs());
ExportAggStats(p.second.FinishTimeMs, *stageStats.MutableFinishTimeMs());
ExportAggStats(p.second.DurationUs, *stageStats.MutableDurationUs());
ExportAggStats(p.second.WaitInputTimeUs, *stageStats.MutableWaitInputTimeUs());
ExportAggStats(p.second.WaitOutputTimeUs, *stageStats.MutableWaitOutputTimeUs());
FillStageDurationUs(stageStats);

for (auto& p2 : p.second.Tables) {
auto& table = *stageStats.AddTables();
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,11 @@ struct TStageExecutionStats {
std::vector<ui64> EgressRows;
std::vector<ui64> EgressBytes;

std::vector<ui64> FirstRowTimeMs;
std::vector<ui64> FinishTimeMs;
std::vector<ui64> StartTimeMs;
std::vector<ui64> DurationUs;
std::vector<ui64> WaitInputTimeUs;
std::vector<ui64> WaitOutputTimeUs;

std::map<TString, TTableStats> Tables;
std::map<TString, TAsyncBufferStats> Ingress;
Expand All @@ -98,7 +100,7 @@ struct TStageExecutionStats {

void Resize(ui32 taskCount);
void UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats);
void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage);
void UpdateStats(const NYql::NDqProto::TDqTaskStats& taskStats, ui64 maxMemoryUsage, ui64 durationUs);
};

struct TQueryExecutionStats {
Expand Down Expand Up @@ -162,6 +164,7 @@ struct TQueryExecutionStats {

void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats);
void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats);
void FillStageDurationUs(NYql::NDqProto::TDqStageStats& stats);

void Finish();

Expand Down
Loading
Loading