diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index d9ce0c8f5345..bf0e642eb5d9 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -2243,7 +2243,7 @@ TString AddSimplifiedPlan(const TString& planText, TIntrusivePtr& txPlans, TIntrusivePtr optCtx, const TString commonPlanInfo = "") { +TString SerializeTxPlans(const TVector& txPlans, TIntrusivePtr optCtx, const TString commonPlanInfo = "", const TString& queryStats = "") { NJsonWriter::TBuf writer; writer.SetIndentSpaces(2); @@ -2266,6 +2266,15 @@ TString SerializeTxPlans(const TVector& txPlans, TIntrusivePtr()); + + NJsonWriter::TBuf writer; + writer.BeginObject(); + + if (queryStats.HasCompilation()) { + const auto& compilation = queryStats.GetCompilation(); + + writer.WriteKey("Compilation"); + writer.BeginObject(); + writer.WriteKey("FromCache").WriteBool(compilation.GetFromCache()); + writer.WriteKey("DurationUs").WriteLongLong(compilation.GetDurationUs()); + writer.WriteKey("CpuTimeUs").WriteLongLong(compilation.GetCpuTimeUs()); + writer.EndObject(); + } + + writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs()); + writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs()); + writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs()); + writer.EndObject(); + + return SerializeTxPlans(txPlans, TIntrusivePtr(), "", writer.Str()); } TString SerializeScriptPlan(const TVector& queryPlans) { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 30b313fe9a7d..bb027f5bfd89 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -114,6 +114,7 @@ class TKqpQueryState : public TNonCopyable { bool IsDocumentApiRestricted_ = false; TInstant StartTime; + TInstant ContinueTime; NYql::TKikimrQueryDeadlines QueryDeadlines; TKqpQueryStats QueryStats; bool KeepSession = false; diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.cpp b/ydb/core/kqp/session_actor/kqp_query_stats.cpp index e26d6b5e7b8f..922b788419ea 100644 --- a/ydb/core/kqp/session_actor/kqp_query_stats.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_stats.cpp @@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) { NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const { NKqpProto::TKqpStatsQuery result; result.SetDurationUs(DurationUs); + result.SetQueuedTimeUs(QueuedTimeUs); if (Compilation) { result.MutableCompilation()->SetFromCache(Compilation->FromCache); diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.h b/ydb/core/kqp/session_actor/kqp_query_stats.h index f73ce6316f07..9cda3417beb9 100644 --- a/ydb/core/kqp/session_actor/kqp_query_stats.h +++ b/ydb/core/kqp/session_actor/kqp_query_stats.h @@ -8,6 +8,7 @@ namespace NKikimr::NKqp { struct TKqpQueryStats { ui64 DurationUs = 0; + ui64 QueuedTimeUs = 0; std::optional Compilation; ui64 WorkerCpuTimeUs = 0; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index f763fb6b976e..f8e692b52921 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -475,6 +475,7 @@ class TKqpSessionActor : public TActorBootstrapped { void Handle(NWorkload::TEvContinueRequest::TPtr& ev) { YQL_ENSURE(QueryState); + QueryState->ContinueTime = TInstant::Now(); if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) { LOG_T("Failed to place request in resource pool, feature flag is disabled"); @@ -1552,6 +1553,9 @@ class TKqpSessionActor : public TActorBootstrapped { stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds()); stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds()); + if (const auto continueTime = QueryState->ContinueTime) { + stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds(); + } if (QueryState->CompileResult) { stats->Compilation.emplace(); stats->Compilation->FromCache = (QueryState->CompileStats.FromCache); diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index f0f11628a068..c6b4ef3821c6 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor { UpdateConfigCounters(poolConfig); } + void CollectRequestLatency(TInstant continueTime) { + if (continueTime) { + RequestsLatencyMs->Collect((TInstant::Now() - continueTime).MilliSeconds()); + } + } + void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) { InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0)); QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0)); @@ -106,6 +112,7 @@ class TPoolHandlerActorBase : public TActor { const TActorId WorkerActorId; const TString SessionId; const TInstant StartTime = TInstant::Now(); + TInstant ContinueTime; EState State = EState::Pending; bool Started = false; // after TEvContinueRequest success @@ -267,6 +274,7 @@ class TPoolHandlerActorBase : public TActor { if (status == Ydb::StatusIds::SUCCESS) { LocalInFlight++; request->Started = true; + request->ContinueTime = TInstant::Now(); Counters.LocalInFly->Inc(); Counters.ContinueOk->Inc(); Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); @@ -387,7 +395,7 @@ class TPoolHandlerActorBase : public TActor { if (status == Ydb::StatusIds::SUCCESS) { Counters.CleanupOk->Inc(); - Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); + Counters.CollectRequestLatency(request->ContinueTime); LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } else { Counters.CleanupError->Inc(); @@ -401,7 +409,7 @@ class TPoolHandlerActorBase : public TActor { this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release()); Counters.Cancelled->Inc(); - Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds()); + Counters.CollectRequestLatency(request->ContinueTime); LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } diff --git a/ydb/core/protos/kqp_stats.proto b/ydb/core/protos/kqp_stats.proto index 24e09f2dcd90..df70faff1a13 100644 --- a/ydb/core/protos/kqp_stats.proto +++ b/ydb/core/protos/kqp_stats.proto @@ -73,6 +73,7 @@ message TKqpExecutionExtraStats { message TKqpStatsQuery { // Basic stats uint64 DurationUs = 1; + uint64 QueuedTimeUs = 9; TKqpStatsCompile Compilation = 2; reserved 3; // repeated TKqpStatsExecution Executions = 3;