Skip to content

Commit

Permalink
Collect Sink Stats (#12507)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Dec 12, 2024
1 parent be08b3f commit ec5a012
Show file tree
Hide file tree
Showing 9 changed files with 612 additions and 16 deletions.
1 change: 1 addition & 0 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.ReadHuge
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.ReadSmall
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteHuge
ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
ydb/core/kqp/ut/cost KqpCost.OlapWriteRow
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertViaLegacyScripting-Streaming
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/common/buffer/events.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <yql/essentials/public/issue/yql_issue.h>

Expand Down Expand Up @@ -28,6 +29,10 @@ struct TEvFlush : public TEventLocal<TEvFlush, TKqpBufferWriterEvents::EvFlush>
};

struct TEvResult : public TEventLocal<TEvResult, TKqpBufferWriterEvents::EvResult> {
TEvResult() = default;
TEvResult(NYql::NDqProto::TDqTaskStats&& stats) : Stats(std::move(stats)) {}

std::optional<NYql::NDqProto::TDqTaskStats> Stats;
};

struct TEvError : public TEventLocal<TEvError, TKqpBufferWriterEvents::EvError> {
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

void HandleFinalize(TEvKqpBuffer::TEvResult::TPtr&) {
void HandleFinalize(TEvKqpBuffer::TEvResult::TPtr& ev) {
if (ev->Get()->Stats) {
if (Stats) {
Stats->AddBufferStats(std::move(*ev->Get()->Stats));
}
}
MakeResponseAndPassAway();
}

Expand Down
20 changes: 20 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,26 @@ void TQueryExecutionStats::AddDatashardStats(NKikimrQueryStats::TTxStats&& txSta
}
}

void TQueryExecutionStats::AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskStats) {
for (auto& table : taskStats.GetTables()) {
NYql::NDqProto::TDqTableStats* tableAggr = nullptr;
if (auto it = TableStats.find(table.GetTablePath()); it != TableStats.end()) {
tableAggr = it->second;
} else {
tableAggr = Result->AddTables();
tableAggr->SetTablePath(table.GetTablePath());
TableStats.emplace(table.GetTablePath(), tableAggr);
}

tableAggr->SetReadRows(tableAggr->GetReadRows() + table.GetReadRows());
tableAggr->SetReadBytes(tableAggr->GetReadBytes() + table.GetReadBytes());
tableAggr->SetWriteRows(tableAggr->GetWriteRows() + table.GetWriteRows());
tableAggr->SetWriteBytes(tableAggr->GetWriteBytes() + table.GetWriteBytes());
tableAggr->SetEraseRows(tableAggr->GetEraseRows() + table.GetEraseRows());
tableAggr->SetAffectedPartitions(table.GetAffectedPartitions());
}
}

void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats) {
Y_ASSERT(stats.GetTasks().size() == 1);
const NYql::NDqProto::TDqTaskStats& taskStats = stats.GetTasks(0);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ struct TQueryExecutionStats {
TDuration collectLongTaskStatsTimeout = TDuration::Max()
);
void AddDatashardStats(NKikimrQueryStats::TTxStats&& txStats);
void AddBufferStats(NYql::NDqProto::TDqTaskStats&& taskStats);

void UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TDqComputeActorStats& stats);
void ExportExecStats(NYql::NDqProto::TDqExecutionStats& stats);
Expand Down
89 changes: 85 additions & 4 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
#include <ydb/core/protos/kqp_physical.pb.h>
#include <ydb/core/protos/query_stats.pb.h>
#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/data_events/payload_helper.h>
Expand Down Expand Up @@ -127,6 +128,17 @@ struct IKqpTableWriterCallbacks {
virtual void OnError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues) = 0;
};

struct TKqpTableWriterStatistics {
ui64 ReadRows = 0;
ui64 ReadBytes = 0;
ui64 WriteRows = 0;
ui64 WriteBytes = 0;
ui64 EraseRows = 0;
ui64 EraseBytes = 0;

THashSet<ui64> AffectedPartitions;
};


class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
using TBase = TActorBootstrapped<TKqpTableWriteActor>;
Expand Down Expand Up @@ -492,8 +504,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
return builder;
}()
<< ", Cookie=" << ev->Cookie);


UpdateStats(ev->Get()->Record.GetTxStats());

switch (ev->Get()->GetStatus()) {
case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: {
Expand Down Expand Up @@ -955,6 +966,56 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
Send(this->SelfId(), new TEvPrivate::TEvTerminate{});
}

void UpdateStats(const NKikimrQueryStats::TTxStats& txStats) {
for (const auto& tableAccessStats : txStats.GetTableAccessStats()) {
YQL_ENSURE(tableAccessStats.GetTableInfo().GetPathId() == TableId.PathId.LocalPathId);
Stats.ReadRows += tableAccessStats.GetSelectRow().GetRows();
Stats.ReadRows += tableAccessStats.GetSelectRange().GetRows();
Stats.ReadBytes += tableAccessStats.GetSelectRow().GetBytes();
Stats.ReadBytes += tableAccessStats.GetSelectRange().GetBytes();
Stats.WriteRows += tableAccessStats.GetUpdateRow().GetRows();
Stats.WriteBytes += tableAccessStats.GetUpdateRow().GetBytes();
Stats.EraseRows += tableAccessStats.GetEraseRow().GetRows();
Stats.EraseBytes += tableAccessStats.GetEraseRow().GetRows();
}

for (const auto& perShardStats : txStats.GetPerShardStats()) {
Stats.AffectedPartitions.insert(perShardStats.GetShardId());
}
}

void FillStats(NYql::NDqProto::TDqTaskStats* stats) {
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
for (size_t i = 0; i < stats->TablesSize(); ++i) {
auto* table = stats->MutableTables(i);
if (table->GetTablePath() == TablePath) {
tableStats = table;
}
}
if (!tableStats) {
tableStats = stats->AddTables();
tableStats->SetTablePath(TablePath);
}

tableStats->SetReadRows(tableStats->GetReadRows() + Stats.ReadRows);
tableStats->SetReadBytes(tableStats->GetReadBytes() + Stats.ReadBytes);
tableStats->SetWriteRows(tableStats->GetWriteRows() + Stats.WriteRows);
tableStats->SetWriteBytes(tableStats->GetWriteBytes() + Stats.WriteBytes);
tableStats->SetEraseRows(tableStats->GetEraseRows() + Stats.EraseRows);
tableStats->SetEraseBytes(tableStats->GetEraseBytes() + Stats.EraseBytes);

Stats.ReadRows = 0;
Stats.ReadBytes = 0;
Stats.WriteRows = 0;
Stats.WriteBytes = 0;
Stats.EraseRows = 0;
Stats.EraseBytes = 0;

tableStats->SetAffectedPartitions(
tableStats->GetAffectedPartitions() + Stats.AffectedPartitions.size());
Stats.AffectedPartitions.clear();
}

NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false);

TString LogPrefix;
Expand Down Expand Up @@ -987,6 +1048,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {

TIntrusivePtr<TKqpCounters> Counters;

TKqpTableWriterStatistics Stats;

NWilson::TSpan TableWriteActorSpan;
NWilson::TSpan TableWriteActorStateSpan;
};
Expand Down Expand Up @@ -1208,6 +1271,12 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
RuntimeError(message, statusCode, subIssues);
}

void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats, bool last, const NYql::NDq::TDqMeteringStats*) override {
if (last && WriteTableActor) {
WriteTableActor->FillStats(stats);
}
}

TString LogPrefix;
const NKikimrKqp::TKqpTableSinkSettings Settings;
TWriteActorSettings MessageSettings;
Expand Down Expand Up @@ -2019,7 +2088,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
if (TxManager->ConsumeCommitResult(shardId)) {
CA_LOG_D("Committed");
State = EState::FINISHED;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{
BuildStats()
});
ExecuterActorId = {};
Y_ABORT_UNLESS(GetTotalMemory() == 0);
return;
Expand All @@ -2036,7 +2107,9 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
"BufferWriteActorState::Writing", NWilson::EFlags::AUTO_END);
CA_LOG_D("Flushed");
State = EState::WRITING;
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{});
Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{
BuildStats()
});
ExecuterActorId = {};
Y_ABORT_UNLESS(GetTotalMemory() == 0);
}
Expand All @@ -2063,6 +2136,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
PassAway();
}

NYql::NDqProto::TDqTaskStats BuildStats() {
NYql::NDqProto::TDqTaskStats result;
for (const auto& [_, writeInfo] : WriteInfos) {
writeInfo.WriteTableActor->FillStats(&result);
}
return result;
}

private:
TString LogPrefix;
const TActorId SessionActorId;
Expand Down
Loading

0 comments on commit ec5a012

Please sign in to comment.