Skip to content

Commit

Permalink
YQ-3446 add queued time into query stats (ydb-platform#6965)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jul 30, 2024
1 parent c2fb3fa commit 6c0b232
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 4 deletions.
33 changes: 31 additions & 2 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr<NOpt::TKqpOptim
return planJson.GetStringRobust();
}

TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "") {
TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext> optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") {
NJsonWriter::TBuf writer;
writer.SetIndentSpaces(2);

Expand All @@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector<const TString>& txPlans, TIntrusivePtr<NO
writer.BeginObject();
writer.WriteKey("Node Type").WriteString("Query");
writer.WriteKey("PlanNodeType").WriteString("Query");

if (queryStats) {
NJson::TJsonValue queryStatsJson;
NJson::ReadJsonTree(queryStats, &queryStatsJson, true);

writer.WriteKey("Stats");
writer.WriteJsonValue(&queryStatsJson);
}

writer.WriteKey("Plans");
writer.BeginList();

Expand Down Expand Up @@ -2705,7 +2714,27 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
txPlans.push_back(txPlan);
}
}
return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>());

NJsonWriter::TBuf writer;
writer.BeginObject();

if (queryStats.HasCompilation()) {
const auto& compilation = queryStats.GetCompilation();

writer.WriteKey("Compilation");
writer.BeginObject();
writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache());
writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs());
writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs());
writer.EndObject();
}

writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
writer.EndObject();

return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());
}

TString SerializeScriptPlan(const TVector<const TString>& queryPlans) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TKqpQueryState : public TNonCopyable {
bool IsDocumentApiRestricted_ = false;

TInstant StartTime;
TInstant ContinueTime;
NYql::TKikimrQueryDeadlines QueryDeadlines;
TKqpQueryStats QueryStats;
bool KeepSession = false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
NKqpProto::TKqpStatsQuery result;
result.SetDurationUs(DurationUs);
result.SetQueuedTimeUs(QueuedTimeUs);

if (Compilation) {
result.MutableCompilation()->SetFromCache(Compilation->FromCache);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NKqp {

struct TKqpQueryStats {
ui64 DurationUs = 0;
ui64 QueuedTimeUs = 0;
std::optional<TKqpStatsCompile> Compilation;

ui64 WorkerCpuTimeUs = 0;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
YQL_ENSURE(QueryState);
QueryState->ContinueTime = TInstant::Now();

if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
LOG_T("Failed to place request in resource pool, feature flag is disabled");
Expand Down Expand Up @@ -1552,6 +1553,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
if (const auto continueTime = QueryState->ContinueTime) {
stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
}
if (QueryState->CompileResult) {
stats->Compilation.emplace();
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
UpdateConfigCounters(poolConfig);
}

// Records the time elapsed since `continueTime` (in milliseconds) into the
// RequestsLatencyMs counter. Nothing is collected when `continueTime`
// evaluates to false -- presumably a default-constructed instant for a
// request that was never continued (NOTE(review): confirm against callers).
void CollectRequestLatency(TInstant continueTime) {
    if (!continueTime) {
        return;
    }

    const auto elapsed = TInstant::Now() - continueTime;
    RequestsLatencyMs->Collect(elapsed.MilliSeconds());
}

void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
Expand Down Expand Up @@ -106,6 +112,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
const TActorId WorkerActorId;
const TString SessionId;
const TInstant StartTime = TInstant::Now();
TInstant ContinueTime;

EState State = EState::Pending;
bool Started = false; // after TEvContinueRequest success
Expand Down Expand Up @@ -267,6 +274,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
if (status == Ydb::StatusIds::SUCCESS) {
LocalInFlight++;
request->Started = true;
request->ContinueTime = TInstant::Now();
Counters.LocalInFly->Inc();
Counters.ContinueOk->Inc();
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Expand Down Expand Up @@ -387,7 +395,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {

if (status == Ydb::StatusIds::SUCCESS) {
Counters.CleanupOk->Inc();
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Counters.CollectRequestLatency(request->ContinueTime);
LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
} else {
Counters.CleanupError->Inc();
Expand All @@ -401,7 +409,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());

Counters.Cancelled->Inc();
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Counters.CollectRequestLatency(request->ContinueTime);
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message TKqpExecutionExtraStats {
message TKqpStatsQuery {
// Basic stats
uint64 DurationUs = 1;
uint64 QueuedTimeUs = 9;
TKqpStatsCompile Compilation = 2;

reserved 3; // repeated TKqpStatsExecution Executions = 3;
Expand Down

0 comments on commit 6c0b232

Please sign in to comment.