Skip to content

Commit

Permalink
Merge 7685dbf into 0c292ea
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Feb 14, 2025
2 parents 0c292ea + 7685dbf commit 820303f
Show file tree
Hide file tree
Showing 23 changed files with 542 additions and 79 deletions.
23 changes: 23 additions & 0 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,25 @@ bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Query::ExecuteQueryRequest& req) {
switch (req.exec_mode()) {
case Ydb::Query::EXEC_MODE_EXPLAIN:
return true;

case Ydb::Query::EXEC_MODE_EXECUTE:
switch (req.stats_mode()) {
case Ydb::Query::StatsMode::STATS_MODE_FULL:
case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
return true;
default:
return false;
}

default:
return false;
}
}

class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down Expand Up @@ -284,6 +303,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
req->pool_id());

ev->SetProgressStatsPeriod(TDuration::MilliSeconds(req->stats_period_ms()));
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -403,6 +423,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
if (NeedCollectDiagnostics(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_meta(kqpResponse.GetQueryDiagnostics());
}
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ using namespace Ydb;
using namespace Ydb::Table;
using namespace NKqp;

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteDataQueryRequest& req) {
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
return false;
}
}

using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
Ydb::Table::ExecuteDataQueryResponse>;

Expand Down Expand Up @@ -147,6 +157,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
req->has_operation_params() ? &req->operation_params() : nullptr);

ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;

ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId());
Expand All @@ -166,6 +178,9 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
if (from.HasQueryStats()) {
FillQueryStats(*to->mutable_query_stats(), from);
to->mutable_query_stats()->set_query_ast(from.GetQueryAst());
if (from.HasQueryDiagnostics()) {
to->mutable_query_stats()->set_query_meta(from.GetQueryDiagnostics());
}
return;
}
}
Expand Down
28 changes: 26 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ bool NeedReportPlan(const Ydb::Table::ExecuteScanQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteScanQueryRequest& req) {
switch (req.mode()) {
case ExecuteScanQueryRequest_Mode_MODE_EXPLAIN:
return true;

case ExecuteScanQueryRequest_Mode_MODE_EXEC:
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
break;
}

return false;

default:
return false;
}
}

bool CheckRequest(const Ydb::Table::ExecuteScanQueryRequest& req, TParseRequestError& error)
{
switch (req.mode()) {
Expand Down Expand Up @@ -228,7 +249,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
nullptr
);

ev->Record.MutableRequest()->SetCollectDiagnostics(req->Getcollect_full_diagnostics());
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -291,6 +312,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ

bool reportStats = NeedReportStats(*Request_->GetProtoRequest());
bool reportPlan = reportStats && NeedReportPlan(*Request_->GetProtoRequest());
bool collectDiagnostics = NeedCollectDiagnostics(*Request_->GetProtoRequest());

if (reportStats) {
if (kqpResponse.HasQueryStats()) {
Expand All @@ -308,7 +330,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
response.mutable_result()->mutable_query_stats()->set_query_ast(kqpResponse.GetQueryAst());
}

response.mutable_result()->set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
if (collectDiagnostics) {
response.mutable_result()->mutable_query_stats()->set_query_meta(kqpResponse.GetQueryDiagnostics());
}

Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,14 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
};

struct TEvCompileResponse: public TEventLocal<TEvCompileResponse, TKqpEvents::EvCompileResponse> {
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {}, const std::optional<TString>& replayMessage = std::nullopt)
TEvCompileResponse(const TKqpCompileResult::TConstPtr& compileResult, NLWTrace::TOrbit orbit = {})
: CompileResult(compileResult)
, ReplayMessage(replayMessage)
, Orbit(std::move(orbit)) {
}

TKqpCompileResult::TConstPtr CompileResult;
TKqpStatsCompile Stats;
std::optional<TString> ReplayMessage;
std::optional<TString> ReplayMessageUserView;

NLWTrace::TOrbit Orbit;
};
Expand Down
11 changes: 7 additions & 4 deletions ydb/core/kqp/common/compilation/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@ struct TKqpCompileResult {

TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues,
ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, TMaybe<TQueryAst> queryAst = {},
bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
: Status(status)
, Issues(issues)
, Query(std::move(query))
, Uid(uid)
, MaxReadType(maxReadType)
, QueryAst(std::move(queryAst))
, NeedToSplit(needToSplit)
, CommandTagName(commandTagName) {}
, CommandTagName(commandTagName)
, ReplayMessageUserView(replayMessageUserView) {}

static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status,
const NYql::TIssues& issues, ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {},
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {}, const TMaybe<TString>& replayMessageUserView = {})
{
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName);
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName, replayMessageUserView);
}

std::shared_ptr<NYql::TAstParseResult> GetAst() const;
Expand All @@ -47,6 +48,8 @@ struct TKqpCompileResult {
bool NeedToSplit = false;
TMaybe<TString> CommandTagName = {};

TMaybe<TString> ReplayMessageUserView;

std::shared_ptr<const TPreparedQueryHolder> PreparedQuery;
};

Expand Down
8 changes: 5 additions & 3 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

replayMessage.InsertValue("query_id", Uid);
replayMessage.InsertValue("version", "1.0");
replayMessage.InsertValue("query_text", EscapeC(QueryId.Text));
NJson::TJsonValue queryParameterTypes(NJson::JSON_MAP);
if (QueryId.QueryParameterTypes) {
for (const auto& [paramName, paramType] : *QueryId.QueryParameterTypes) {
Expand All @@ -365,7 +364,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
replayMessage.InsertValue("query_syntax", ToString(Config->_KqpYqlSyntaxVersion.Get().GetRef()));
replayMessage.InsertValue("query_database", QueryId.Database);
replayMessage.InsertValue("query_cluster", QueryId.Cluster);
replayMessage.InsertValue("query_plan", queryPlan);
replayMessage.InsertValue("query_type", ToString(QueryId.Settings.QueryType));

if (CollectFullDiagnostics) {
Expand All @@ -380,6 +378,8 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
ReplayMessageUserView = NJson::WriteJson(replayMessage, /*formatOutput*/ false);
}

replayMessage.InsertValue("query_plan", queryPlan);
replayMessage.InsertValue("query_text", EscapeC(QueryId.Text));
replayMessage.InsertValue("table_metadata", TString(NJson::WriteJson(tablesMeta, false)));
replayMessage.InsertValue("table_meta_serialization_type", EMetaSerializationType::EncodedProto);

Expand All @@ -401,10 +401,12 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", issues: " << KqpCompileResult->Issues.ToString()
<< ", uid: " << KqpCompileResult->Uid);

if (ReplayMessageUserView) {
KqpCompileResult->ReplayMessageUserView = std::move(*ReplayMessageUserView);
}
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(KqpCompileResult);

responseEv->ReplayMessage = std::move(ReplayMessage);
responseEv->ReplayMessageUserView = std::move(ReplayMessageUserView);
ReplayMessage = std::nullopt;
ReplayMessageUserView = std::nullopt;
auto& stats = responseEv->Stats;
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

if (compileResult->NeedToSplit) {
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
ProcessQueue(ctx);
return;
}
Expand All @@ -635,7 +635,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
for (auto& request : requests) {
LWTRACK(KqpCompileServiceGetCompilation, request.Orbit, request.Query.UserSid, compileActorId.ToString());
Reply(request.Sender, compileResult, compileStats, ctx,
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan));
}
} else {
if (!hasTempTablesNameClashes) {
Expand All @@ -647,7 +647,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

LWTRACK(KqpCompileServiceGetCompilation, compileRequest.Orbit, compileRequest.Query.UserSid, compileActorId.ToString());
Reply(compileRequest.Sender, compileResult, compileStats, ctx,
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt));
compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
}
catch (const std::exception& e) {
LogException("TEvCompileResponse", ev->Sender, e, ctx);
Expand Down Expand Up @@ -809,7 +809,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
if (compileResult->GetAst() && QueryCache->FindByAst(query, *compileResult->GetAst(), keepInCache)) {
return false;
}
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst);
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst,
false, {}, compileResult->ReplayMessageUserView);
newCompileResult->AllowCache = compileResult->AllowCache;
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
Expand Down Expand Up @@ -865,7 +866,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult,
const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie,
NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional<TString>& replayMessage = std::nullopt)
NLWTrace::TOrbit orbit, NWilson::TSpan span)
{
const auto& query = compileResult->Query;
LWTRACK(KqpCompileServiceReply,
Expand All @@ -878,7 +879,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", queryUid: " << compileResult->Uid
<< ", status:" << compileResult->Status);

auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit), replayMessage);
auto responseEv = MakeHolder<TEvKqp::TEvCompileResponse>(compileResult, std::move(orbit));
responseEv->Stats = compileStats;

if (span) {
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,6 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
return false;
}
Orbit = std::move(ev->Orbit);
if (ev->ReplayMessage) {
ReplayMessage = *ev->ReplayMessage;
}

return true;
}
Expand All @@ -160,6 +157,10 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TKqpCompileResult::TConstPtr comp
return false;
}

if (compileResult->ReplayMessageUserView && GetCollectDiagnostics()) {
ReplayMessage = *compileResult->ReplayMessageUserView;
}

YQL_ENSURE(CompileResult->PreparedQuery);
const ui32 compiledVersion = CompileResult->PreparedQuery->GetVersion();
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2698,15 +2698,15 @@ Y_UNIT_TEST_SUITE(KqpIndexes) {
UNIT_ASSERT_C(value.IsMap(), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_id"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("version"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(!value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_parameter_types"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("table_metadata"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value["table_metadata"].IsArray(), "Incorrect Diagnostics: table_metadata type should be an array");
UNIT_ASSERT_C(value.Has("created_at"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_syntax"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_database"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_cluster"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(!value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_type"), "Incorrect Diagnostics");
}

Expand Down
16 changes: 7 additions & 9 deletions ydb/core/kqp/ut/olap/helpers/query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

namespace NKikimr::NKqp {

TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo /*= nullptr*/, NJson::TJsonValue* diagnostics /*= nullptr*/) {
TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPartIterator& it, NJson::TJsonValue* statInfo /*= nullptr*/, NJson::TJsonValue* meta /*= nullptr*/) {
TVector<THashMap<TString, NYdb::TValue>> rows;
if (statInfo) {
*statInfo = NJson::JSON_NULL;
}
if (diagnostics) {
*diagnostics = NJson::JSON_NULL;
if (meta) {
*meta = NJson::JSON_NULL;
}
for (;;) {
auto streamPart = it.ReadNext().GetValueSync();
Expand All @@ -28,12 +28,10 @@ TVector<THashMap<TString, NYdb::TValue>> CollectRows(NYdb::NTable::TScanQueryPar
if (plan && statInfo) {
UNIT_ASSERT(NJson::ReadJsonFastTree(*plan, statInfo));
}
}

if (streamPart.HasDiagnostics()) {
auto diagnosticsString = TString{streamPart.GetDiagnostics()};
if (!diagnosticsString.empty() && diagnostics) {
UNIT_ASSERT(NJson::ReadJsonFastTree(diagnosticsString, diagnostics));
auto metaString = streamPart.GetQueryStats().GetMeta();
if (metaString && !metaString->empty() && meta) {
UNIT_ASSERT(NJson::ReadJsonFastTree(*metaString, meta));
}
}

Expand Down Expand Up @@ -70,4 +68,4 @@ TVector<THashMap<TString, NYdb::TValue>> ExecuteScanQuery(NYdb::NTable::TTableCl
return rows;
}

}
}
Loading

0 comments on commit 820303f

Please sign in to comment.