Skip to content

Commit

Permalink
Merge 3496c5e into 1091a58
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jul 23, 2024
2 parents 1091a58 + 3496c5e commit eaa23fa
Show file tree
Hide file tree
Showing 19 changed files with 85 additions and 25 deletions.
1 change: 1 addition & 0 deletions ydb/core/grpc_services/rpc_kqp_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ void FillQueryStats(Ydb::TableStats::QueryStats& queryStats, const NKqpProto::TK
queryStats.set_process_cpu_time_us(kqpStats.GetWorkerCpuTimeUs());
queryStats.set_total_cpu_time_us(totalCpuTimeUs);
queryStats.set_total_duration_us(kqpStats.GetDurationUs());
queryStats.set_queued_time_us(kqpStats.GetQueuedTimeUs());
}

void FillQueryStats(Ydb::TableStats::QueryStats& queryStats, const NKikimrKqp::TQueryResponse& kqpResponse) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TKqpQueryState : public TNonCopyable {
bool IsDocumentApiRestricted_ = false;

TInstant StartTime;
TInstant ContinueTime;
NYql::TKikimrQueryDeadlines QueryDeadlines;
TKqpQueryStats QueryStats;
bool KeepSession = false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ ui64 CalcRequestUnit(const TKqpQueryStats& stats) {
NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const {
NKqpProto::TKqpStatsQuery result;
result.SetDurationUs(DurationUs);
result.SetQueuedTimeUs(QueuedTimeUs);

if (Compilation) {
result.MutableCompilation()->SetFromCache(Compilation->FromCache);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/session_actor/kqp_query_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace NKikimr::NKqp {

struct TKqpQueryStats {
ui64 DurationUs = 0;
ui64 QueuedTimeUs = 0;
std::optional<TKqpStatsCompile> Compilation;

ui64 WorkerCpuTimeUs = 0;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

void Handle(NWorkload::TEvContinueRequest::TPtr& ev) {
YQL_ENSURE(QueryState);
QueryState->ContinueTime = TInstant::Now();

if (ev->Get()->Status == Ydb::StatusIds::UNSUPPORTED) {
LOG_T("Failed to place request in resource pool, feature flag is disabled");
Expand Down Expand Up @@ -1552,6 +1553,9 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds());
stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds());
// Queued time is the interval between the query start (StartTime, the same
// reference point used for DurationUs above) and the moment the session actor
// handled TEvContinueRequest (ContinueTime). Measuring from ContinueTime to
// Now() would instead report the time spent running after leaving the queue.
// When ContinueTime was never stamped the stat is left untouched.
// NOTE(review): assumes StartTime is stamped before the request is placed
// into the resource pool queue — confirm against TKqpQueryState construction.
if (const auto continueTime = QueryState->ContinueTime) {
    stats->QueuedTimeUs = (continueTime - QueryState->StartTime).MicroSeconds();
}
if (QueryState->CompileResult) {
stats->Compilation.emplace();
stats->Compilation->FromCache = (QueryState->CompileStats.FromCache);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,15 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto db = kikimr.GetQueryClient();

TExecuteQuerySettings settings;
settings.StatsMode(EStatsMode::Full);

{ // Existing pool
settings.PoolId("default");

const TString query = "SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0 ORDER BY Key";
auto result = db.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
CheckQueryResult(result);
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStats()->GetQueuedTime(), TDuration::Zero());
}

{ // Not existing pool (check workload manager enabled)
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,16 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
auto db = kikimr.GetQueryClient();

TExecuteScriptSettings settings;
settings.StatsMode(EStatsMode::Full);

{ // Existing pool
settings.PoolId("default");

auto scripOp = db.ExecuteScript("SELECT 42", settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scripOp.Status().GetStatus(), EStatus::SUCCESS, scripOp.Status().GetIssues().ToString());
CheckScriptResults(scripOp, WaitScriptExecutionOperation(scripOp.Id(), kikimr.GetDriver()), db);
auto readyOp = WaitScriptExecutionOperation(scripOp.Id(), kikimr.GetDriver());
CheckScriptResults(scripOp, readyOp, db);
UNIT_ASSERT_VALUES_UNEQUAL(readyOp.Metadata().ExecStats.GetQueuedTime(), TDuration::Zero());
}

{ // Not existing pool (check workload manager enabled)
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
UpdateConfigCounters(poolConfig);
}

// Feeds the time elapsed since `continueTime` (in milliseconds) into the
// RequestsLatencyMs counter. Nothing is collected for a falsy `continueTime`.
void CollectRequestLatency(TInstant continueTime) {
    if (!continueTime) {
        return;
    }

    const auto elapsed = TInstant::Now() - continueTime;
    RequestsLatencyMs->Collect(elapsed.MilliSeconds());
}

void UpdateConfigCounters(const NResourcePool::TPoolSettings& poolConfig) {
InFlightLimit->Set(std::max(poolConfig.ConcurrentQueryLimit, 0));
QueueSizeLimit->Set(std::max(poolConfig.QueueSize, 0));
Expand Down Expand Up @@ -106,6 +112,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
const TActorId WorkerActorId;
const TString SessionId;
const TInstant StartTime = TInstant::Now();
TInstant ContinueTime;

EState State = EState::Pending;
bool Started = false; // after TEvContinueRequest success
Expand Down Expand Up @@ -267,6 +274,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
if (status == Ydb::StatusIds::SUCCESS) {
LocalInFlight++;
request->Started = true;
request->ContinueTime = TInstant::Now();
Counters.LocalInFly->Inc();
Counters.ContinueOk->Inc();
Counters.DelayedTimeMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Expand Down Expand Up @@ -387,7 +395,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {

if (status == Ydb::StatusIds::SUCCESS) {
Counters.CleanupOk->Inc();
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Counters.CollectRequestLatency(request->ContinueTime);
LOG_D("Reply cleanup success to " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
} else {
Counters.CleanupError->Inc();
Expand All @@ -401,7 +409,7 @@ class TPoolHandlerActorBase : public TActor<TDerived> {
this->Send(MakeKqpProxyID(this->SelfId().NodeId()), ev.release());

Counters.Cancelled->Inc();
Counters.RequestsLatencyMs->Collect((TInstant::Now() - request->StartTime).MilliSeconds());
Counters.CollectRequestLatency(request->ContinueTime);
LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup {
request->SetQuery(query);
request->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
request->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL);
request->SetDatabase(Settings_.DomainName_);
request->SetPoolId(settings.PoolId_);

Expand Down Expand Up @@ -567,6 +568,10 @@ const std::vector<Ydb::ResultSet>& TQueryRunnerResult::GetResultSets() const {
return ResultSets;
}

// Returns the query statistics carried by the stored response record.
const NKqpProto::TKqpStatsQuery& TQueryRunnerResult::GetQueryStats() const {
    const auto& record = Response.GetResponse();
    return record.GetQueryStats();
}

//// TQueryRunnerResultAsync

TQueryRunnerResult TQueryRunnerResultAsync::GetResult(TDuration timeout) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct TQueryRunnerResult {

const Ydb::ResultSet& GetResultSet(size_t resultIndex) const;
const std::vector<Ydb::ResultSet>& GetResultSets() const;
const NKqpProto::TKqpStatsQuery& GetQueryStats() const;
};

struct TQueryRunnerResultAsync {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h>

#include <ydb/core/protos/kqp_stats.pb.h>


namespace NKikimr::NKqp {

Expand Down Expand Up @@ -44,7 +46,9 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) {
.EnableResourcePools(false)
.Create();

TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId("another_pool_id")));
auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId("another_pool_id"));
TSampleQueries::TSelect42::CheckResult(result);
UNIT_ASSERT_VALUES_EQUAL(result.GetQueryStats().GetQueuedTimeUs(), 0);
}

TQueryRunnerResultAsync StartQueueSizeCheckRequests(TIntrusivePtr<IYdbSetup> ydb, const TQueryRunnerSettings& settings) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/kqp_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message TKqpExecutionExtraStats {
message TKqpStatsQuery {
// Basic stats
uint64 DurationUs = 1;
uint64 QueuedTimeUs = 9;
TKqpStatsCompile Compilation = 2;

reserved 3; // repeated TKqpStatsExecution Executions = 3;
Expand Down
1 change: 1 addition & 0 deletions ydb/public/api/protos/ydb_query_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ message QueryStats {
string query_ast = 5;
uint64 total_duration_us = 6;
uint64 total_cpu_time_us = 7;
uint64 queued_time_us = 8;
}
4 changes: 4 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_query/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ TDuration TExecStats::GetTotalDuration() const {
return TDuration::MicroSeconds(Impl_->Proto.total_duration_us());
}

// Converts the proto's queued_time_us field (microseconds) into a TDuration.
TDuration TExecStats::GetQueuedTime() const {
    const auto queuedUs = Impl_->Proto.queued_time_us();
    return TDuration::MicroSeconds(queuedUs);
}

// Converts the proto's total_cpu_time_us field (microseconds) into a TDuration.
TDuration TExecStats::GetTotalCpuTime() const {
    const auto cpuUs = Impl_->Proto.total_cpu_time_us();
    return TDuration::MicroSeconds(cpuUs);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/ydb_query/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TExecStats {
TMaybe<TString> GetAst() const;

TDuration GetTotalDuration() const;
TDuration GetQueuedTime() const;
TDuration GetTotalCpuTime() const;

private:
Expand Down
4 changes: 0 additions & 4 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp
Cout << "..." << colors.Default() << Endl;
}

TInstant startTime = TInstant::Now();
switch (executionCase) {
case TExecutionOptions::EExecutionCase::GenericScript:
if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
}
Cout << colors.Cyan() << "Script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl;
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
if (!runner.FetchScriptResults()) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch script results failed";
Expand All @@ -114,14 +112,12 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, NKqpRun::TKqp
if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
}
Cout << colors.Cyan() << "Generic request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl;
break;

case TExecutionOptions::EExecutionCase::YqlScript:
if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
}
Cout << colors.Cyan() << "Yql script request finished. Time: " << TInstant::Now() - startTime << colors.Default() << Endl;
break;

case TExecutionOptions::EExecutionCase::AsyncQuery:
Expand Down
24 changes: 22 additions & 2 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,19 @@ class TKqpRunner::TImpl {

bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, EQueryType queryType) {
StartScriptTraceOpt();
StartTime_ = TInstant::Now();

TString queryTypeStr;
TQueryMeta meta;
TRequestResult status;
switch (queryType) {
case EQueryType::ScriptQuery:
queryTypeStr = "Generic";
status = YdbSetup_.QueryRequest(query, action, traceId, meta, ResultSets_, GetProgressCallback());
break;

case EQueryType::YqlScriptQuery:
queryTypeStr = "Yql script";
status = YdbSetup_.YqlScriptRequest(query, action, traceId, meta, ResultSets_);
break;

Expand All @@ -153,8 +157,9 @@ class TKqpRunner::TImpl {
TYdbSetup::StopTraceOpt();

PrintScriptAst(meta.Ast);

PrintScriptProgress(ExecutionMeta_.Plan);
PrintScriptPlan(meta.Plan);
PrintScriptFinish(meta, queryTypeStr);

if (!status.IsSuccess()) {
Cerr << CerrColors_.Red() << "Failed to execute query, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl;
Expand Down Expand Up @@ -220,6 +225,7 @@ class TKqpRunner::TImpl {

private:
bool WaitScriptExecutionOperation() {
StartTime_ = TInstant::Now();
ExecutionMeta_ = TExecutionMeta();

TDuration getOperationPeriod = TDuration::Seconds(1);
Expand All @@ -245,8 +251,8 @@ class TKqpRunner::TImpl {
}

PrintScriptAst(ExecutionMeta_.Ast);

PrintScriptPlan(ExecutionMeta_.Plan);
PrintScriptFinish(ExecutionMeta_, "Script");

if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) {
Cerr << CerrColors_.Red() << "Failed to execute script, invalid final status, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl;
Expand Down Expand Up @@ -377,6 +383,19 @@ class TKqpRunner::TImpl {
}
}

// Prints a one-line completion summary for a request of the given type.
// The duration from `meta` is printed when it is set; otherwise the time
// elapsed since StartTime_ is printed as an estimate. The queued time is
// appended only when `meta` carries a non-zero value.
void PrintScriptFinish(const TQueryMeta& meta, const TString& queryType) const {
    Cout << CoutColors_.Cyan() << queryType << " request finished.";

    if (!meta.TotalDuration) {
        // No duration in the meta: fall back to the locally measured time.
        Cout << " Estimated duration: " << TInstant::Now() - StartTime_;
    } else {
        Cout << " Total duration: " << meta.TotalDuration;
    }

    const bool hasQueuedTime = static_cast<bool>(meta.QueuedTime);
    if (hasQueuedTime) {
        Cout << ", Queued time: " << meta.QueuedTime;
    }

    Cout << CoutColors_.Default() << Endl;
}

private:
TRunnerOptions Options_;

Expand All @@ -388,6 +407,7 @@ class TKqpRunner::TImpl {
TString ExecutionOperation_;
TExecutionMeta ExecutionMeta_;
std::vector<Ydb::ResultSet> ResultSets_;
TInstant StartTime_;
};


Expand Down
19 changes: 13 additions & 6 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred
TString YqlToken_;
};

// Copies the AST, plan and timing statistics from a query response into `meta`.
// The plan is overwritten only when the response carries a non-empty one, so a
// previously stored plan survives a response without a plan.
void FillQueryMeta(TQueryMeta& meta, const NKikimrKqp::TQueryResponse& response) {
    meta.Ast = response.GetQueryAst();

    const auto& responsePlan = response.GetQueryPlan();
    if (responsePlan) {
        meta.Plan = responsePlan;
    }

    const auto& queryStats = response.GetQueryStats();
    meta.TotalDuration = TDuration::MicroSeconds(queryStats.GetDurationUs());
    meta.QueuedTime = TDuration::MicroSeconds(queryStats.GetQueuedTimeUs());
}

} // anonymous namespace


Expand Down Expand Up @@ -424,10 +433,7 @@ TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryA
const auto& responseRecord = queryOperationResponse.GetResponse();

resultSets = std::move(queryResponse.ResultSets);
meta.Ast = responseRecord.GetQueryAst();
if (const auto& plan = responseRecord.GetQueryPlan()) {
meta.Plan = plan;
}
FillQueryMeta(meta, responseRecord);

return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues());
}
Expand All @@ -438,8 +444,7 @@ TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQu
auto yqlQueryOperationResponse = Impl_->YqlScriptRequest(query, action, traceId)->Get()->Record.GetRef();
const auto& responseRecord = yqlQueryOperationResponse.GetResponse();

meta.Ast = responseRecord.GetQueryAst();
meta.Plan = responseRecord.GetQueryPlan();
FillQueryMeta(meta, responseRecord);

resultSets.reserve(responseRecord.results_size());
for (const auto& result : responseRecord.results()) {
Expand All @@ -466,6 +471,8 @@ TRequestResult TYdbSetup::GetScriptExecutionOperationRequest(const TString& oper
if (deserializedMeta.exec_stats().query_plan() != "{}") {
meta.Plan = deserializedMeta.exec_stats().query_plan();
}
meta.TotalDuration = TDuration::MicroSeconds(deserializedMeta.exec_stats().total_duration_us());
meta.QueuedTime = TDuration::MicroSeconds(deserializedMeta.exec_stats().queued_time_us());
}

return TRequestResult(scriptExecutionOperation->Get()->Status, scriptExecutionOperation->Get()->Issues);
Expand Down
17 changes: 8 additions & 9 deletions ydb/tests/tools/kqprun/src/ydb_setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@ struct TSchemeMeta {
};


struct TExecutionMeta {
bool Ready = false;
NYdb::NQuery::EExecStatus ExecutionStatus = NYdb::NQuery::EExecStatus::Unspecified;

i32 ResultSetsCount = 0;

struct TQueryMeta {
TString Ast;
TString Plan;
TDuration TotalDuration;
TDuration QueuedTime;
};


struct TQueryMeta {
TString Ast;
TString Plan;
struct TExecutionMeta : public TQueryMeta {
bool Ready = false;
NYdb::NQuery::EExecStatus ExecutionStatus = NYdb::NQuery::EExecStatus::Unspecified;

i32 ResultSetsCount = 0;
};


Expand Down

0 comments on commit eaa23fa

Please sign in to comment.