Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recompilation with query id from compilation result #9592

Merged
merged 1 commit into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ydb/core/kqp/common/compilation/result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@

namespace NKikimr::NKqp {

std::shared_ptr<NYql::TAstParseResult> TKqpCompileResult::GetAst() const {
if (QueryAst) {
return QueryAst->Ast;
}
return nullptr;
}

} // namespace NKikimr::NKqp
13 changes: 8 additions & 5 deletions ydb/core/kqp/common/compilation/result.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <ydb/core/kqp/common/simple/query_ast.h>
#include <ydb/core/kqp/common/simple/query_id.h>
#include <ydb/core/kqp/common/simple/helpers.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
Expand All @@ -14,24 +15,26 @@ struct TKqpCompileResult {
using TConstPtr = std::shared_ptr<const TKqpCompileResult>;

TKqpCompileResult(const TString& uid, const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues,
ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, std::shared_ptr<NYql::TAstParseResult> ast = {},
ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {}, TMaybe<TQueryAst> queryAst = {},
bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
: Status(status)
, Issues(issues)
, Query(std::move(query))
, Uid(uid)
, MaxReadType(maxReadType)
, Ast(std::move(ast))
, QueryAst(std::move(queryAst))
, NeedToSplit(needToSplit)
, CommandTagName(commandTagName) {}

static std::shared_ptr<TKqpCompileResult> Make(const TString& uid, const Ydb::StatusIds::StatusCode& status,
const NYql::TIssues& issues, ETableReadType maxReadType, TMaybe<TKqpQueryId> query = {},
std::shared_ptr<NYql::TAstParseResult> ast = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
TMaybe<TQueryAst> queryAst = {}, bool needToSplit = false, const TMaybe<TString>& commandTagName = {})
{
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(ast), needToSplit, commandTagName);
return std::make_shared<TKqpCompileResult>(uid, status, issues, maxReadType, std::move(query), std::move(queryAst), needToSplit, commandTagName);
}

std::shared_ptr<NYql::TAstParseResult> GetAst() const;

Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;

Expand All @@ -40,7 +43,7 @@ struct TKqpCompileResult {

ETableReadType MaxReadType;
bool AllowCache = true;
std::shared_ptr<NYql::TAstParseResult> Ast;
TMaybe<TQueryAst> QueryAst;
bool NeedToSplit = false;
TMaybe<TString> CommandTagName = {};

Expand Down
12 changes: 4 additions & 8 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

void ReplyError(Ydb::StatusIds::StatusCode status, const TIssues& issues) {
if (!KqpCompileResult) {
KqpCompileResult = TKqpCompileResult::Make(Uid, status, issues, ETableReadType::Other, std::move(QueryId));
KqpCompileResult = TKqpCompileResult::Make(Uid, status, issues, ETableReadType::Other, std::move(QueryId), std::move(QueryAst));
} else {
KqpCompileResult = TKqpCompileResult::Make(Uid, status, issues, ETableReadType::Other, std::move(KqpCompileResult->Query));
KqpCompileResult = TKqpCompileResult::Make(Uid, status, issues, ETableReadType::Other, std::move(KqpCompileResult->Query), std::move(KqpCompileResult->QueryAst));
}

Reply();
Expand Down Expand Up @@ -456,10 +456,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()) && allowCache;

if (QueryAst) {
KqpCompileResult->Ast = QueryAst->Ast;
}
}

void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
Expand All @@ -478,7 +474,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

if (kqpResult.NeedToSplit) {
KqpCompileResult = TKqpCompileResult::Make(
Uid, status, kqpResult.Issues(), ETableReadType::Other, std::move(QueryId), {}, true);
Uid, status, kqpResult.Issues(), ETableReadType::Other, std::move(QueryId), std::move(QueryAst), true);
Reply();
return;
}
Expand All @@ -496,7 +492,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

auto queryType = QueryId.Settings.QueryType;

KqpCompileResult = TKqpCompileResult::Make(Uid, status, kqpResult.Issues(), maxReadType, std::move(QueryId));
KqpCompileResult = TKqpCompileResult::Make(Uid, status, kqpResult.Issues(), maxReadType, std::move(QueryId), std::move(QueryAst));
KqpCompileResult->CommandTagName = kqpResult.CommandTagName;

if (status == Ydb::StatusIds::SUCCESS) {
Expand Down
29 changes: 15 additions & 14 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ class TKqpQueryCache {

void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) {
Y_ENSURE(compileResult->Query);
Y_ENSURE(compileResult->Ast);
Y_ENSURE(compileResult->GetAst());

AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->Ast), compileResult->Uid);
AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid);
}

bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution) {
if (!isPerStatementExecution) {
InsertQuery(compileResult);
}
if (isEnableAstCache && compileResult->Ast) {
if (isEnableAstCache && compileResult->GetAst()) {
InsertAst(compileResult);
}

Expand All @@ -76,8 +76,8 @@ class TKqpQueryCache {

auto queryId = *removedItem->Value.CompileResult->Query;
QueryIndex.erase(queryId);
if (removedItem->Value.CompileResult->Ast) {
AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->Ast));
if (removedItem->Value.CompileResult->GetAst()) {
AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst()));
}
auto indexIt = Index.find(*removedItem);
if (indexIt != Index.end()) {
Expand Down Expand Up @@ -190,8 +190,8 @@ class TKqpQueryCache {
Y_ABORT_UNLESS(item->Value.CompileResult->Query);
auto queryId = *item->Value.CompileResult->Query;
QueryIndex.erase(queryId);
if (item->Value.CompileResult->Ast) {
AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->Ast));
if (item->Value.CompileResult->GetAst()) {
AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst()));
}

Index.erase(it);
Expand Down Expand Up @@ -327,6 +327,8 @@ struct TKqpCompileRequest {
NYql::TExprContext* SplitCtx;
NYql::TExprNode::TPtr SplitExpr;

bool FindInCache = true;

bool IsIntrestedInResult() const {
return IntrestedInResult->load();
}
Expand Down Expand Up @@ -764,8 +766,6 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
}

if (compileResult || request.Query) {
QueryCache.EraseByUid(request.Uid);

Counters->ReportCompileRequestCompile(dbCounters);

NWilson::TSpan compileServiceSpan(TWilsonKqp::CompileService, ev->Get() ? std::move(ev->TraceId) : NWilson::TTraceId(), "CompileService");
Expand All @@ -790,12 +790,13 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
);
}
}
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, query,
TKqpCompileRequest compileRequest(ev->Sender, request.Uid, compileResult ? *compileResult->Query : *request.Query,
compileSettings, request.UserToken, dbCounters, request.GUCSettings, request.ApplicationName,
ev->Cookie, std::move(ev->Get()->IntrestedInResult),
ev->Get()->UserRequestContext,
ev->Get() ? std::move(ev->Get()->Orbit) : NLWTrace::TOrbit(),
std::move(compileServiceSpan), std::move(ev->Get()->TempTablesState));
compileRequest.FindInCache = false;

if (TableServiceConfig.GetEnableAstCache() && request.QueryAst) {
return CompileByAst(*request.QueryAst, compileRequest, ctx);
Expand Down Expand Up @@ -976,7 +977,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", ast: " << queryAst.Ast->Root->ToString());
auto compileResult = QueryCache.FindByAst(compileRequest.Query, *queryAst.Ast, compileRequest.CompileSettings.KeepInCache);

if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) {
if (!compileRequest.FindInCache || HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) {
compileResult = nullptr;
}

Expand All @@ -987,7 +988,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
<< ", sender: " << compileRequest.Sender
<< ", queryUid: " << compileResult->Uid);

compileResult->Ast->PgAutoParamValues = std::move(queryAst.Ast->PgAutoParamValues);
compileResult->GetAst()->PgAutoParamValues = std::move(queryAst.Ast->PgAutoParamValues);

ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan));
return;
Expand Down Expand Up @@ -1061,10 +1062,10 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
if (QueryCache.FindByQuery(query, keepInCache)) {
return false;
}
if (compileResult->Ast && QueryCache.FindByAst(query, *compileResult->Ast, keepInCache)) {
if (compileResult->GetAst() && QueryCache.FindByAst(query, *compileResult->GetAst(), keepInCache)) {
return false;
}
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->Ast);
auto newCompileResult = TKqpCompileResult::Make(CreateGuidAsString(), compileResult->Status, compileResult->Issues, compileResult->MaxReadType, std::move(query), compileResult->QueryAst);
newCompileResult->AllowCache = compileResult->AllowCache;
newCompileResult->PreparedQuery = compileResult->PreparedQuery;
LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Insert preparing query with params, queryId: " << query.SerializeToString());
Expand Down
10 changes: 2 additions & 8 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
CommandTagName = CompileResult->CommandTagName;
}
for (const auto& param : PreparedQuery->GetParameters()) {
const auto& ast = CompileResult->Ast;
const auto& ast = CompileResult->GetAst();
if (!ast || !ast->PgAutoParamValues || !ast->PgAutoParamValues->contains(param.GetName())) {
ResultParams.push_back(param);
}
Expand Down Expand Up @@ -275,15 +275,9 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

TMaybe<TQueryAst> statementAst;
if (!Statements.empty()) {
YQL_ENSURE(CurrentStatementId < Statements.size());
statementAst = Statements[CurrentStatementId];
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, CompileResult->Uid, query, isQueryActionPrepare,
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState,
statementAst);
CompileResult->QueryAst);
}

std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildSplitRequest(std::shared_ptr<std::atomic<bool>> cookie, const TGUCSettings::TPtr& gUCSettingsPtr) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
try {
const auto& parameters = QueryState->GetYdbParameters();
QueryState->QueryData->ParseParameters(parameters);
if (QueryState->CompileResult && QueryState->CompileResult->Ast && QueryState->CompileResult->Ast->PgAutoParamValues) {
for(const auto& [name, param] : *QueryState->CompileResult->Ast->PgAutoParamValues) {
if (QueryState->CompileResult && QueryState->CompileResult->GetAst() && QueryState->CompileResult->GetAst()->PgAutoParamValues) {
for(const auto& [name, param] : *QueryState->CompileResult->GetAst()->PgAutoParamValues) {
if (!parameters.contains(name)) {
QueryState->QueryData->AddTypedValueParam(name, param);
}
Expand Down
Loading