Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-2417 show ast in case of compilation error #1353

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,12 +394,18 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
return;
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());
Ydb::Query::ExecuteQueryResponsePart response;

auto& kqpResponse = record.GetResponse();
if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
}

Ydb::Query::ExecuteQueryResponsePart response;
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Request_->SetRuHeader(record.GetConsumedRu());

if (QueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE) {
for(int i = 0; i < kqpResponse.GetYdbResults().size(); i++) {
Expand All @@ -415,25 +421,15 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
hasTrailingMessage = true;
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
}

if (NeedReportStats(*Request_->GetProtoRequest())) {
hasTrailingMessage = true;
FillQueryStats(*response.mutable_exec_stats(), kqpResponse);
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
}

if (hasTrailingMessage) {
response.set_status(Ydb::StatusIds::SUCCESS);
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
}
}

if (!hasTrailingMessage) {
if (hasTrailingMessage) {
response.set_status(record.GetYdbStatus());
response.mutable_issues()->CopyFrom(issueMessage);
TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
ReplySerializedAndFinishStream(record.GetYdbStatus(), std::move(out));
} else {
NYql::TIssues issues;
NYql::IssuesFromMessage(issueMessage, issues);
ReplyFinishStream(record.GetYdbStatus(), issueMessage);
Expand Down
28 changes: 17 additions & 11 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,18 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
PassAway();
}

void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
preparingQuery.release(), AppData()->FunctionRegistry);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
}
}

void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
Y_ENSURE(!ev->Get()->QueryId);

Expand Down Expand Up @@ -403,17 +415,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

if (status == Ydb::StatusIds::SUCCESS) {
YQL_ENSURE(kqpResult.PreparingQuery);
{
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
}
}
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);

auto now = TInstant::Now();
auto duration = now - StartTime;
Expand All @@ -423,6 +425,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", self: " << ctx.SelfID
<< ", duration: " << duration);
} else {
if (kqpResult.PreparingQuery) {
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
}

LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"
<< ", self: " << ctx.SelfID
<< ", status: " << Ydb::StatusIds_StatusCode_Name(status)
Expand Down
47 changes: 42 additions & 5 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ class TAsyncValidateYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResul
, SqlVersion(sqlVersion) {}

void FillResult(TResult& validateResult) const override {
if (!validateResult.Success()) {
return;
}

YQL_ENSURE(SessionCtx->Query().PrepareOnly);
validateResult.PreparedQuery.reset(SessionCtx->Query().PreparingQuery.release());
validateResult.SqlVersion = SqlVersion;
Expand Down Expand Up @@ -211,6 +215,10 @@ class TAsyncExplainYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, UseDqExplain(useDqExplain) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

if (UseDqExplain) {
TVector<const TString> plans;
for (auto id : SessionCtx->Query().ExecutionOrder) {
Expand Down Expand Up @@ -253,6 +261,10 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, SqlVersion(sqlVersion) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
Expand Down Expand Up @@ -300,6 +312,10 @@ class TAsyncExecuteKqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, ExecuteCtx(executeCtx) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

YQL_ENSURE(ExecuteCtx.QueryResults.size() == 1);
queryResult = std::move(ExecuteCtx.QueryResults[0]);
queryResult.QueryPlan = queryResult.PreparingQuery->GetPhysicalQuery().GetQueryPlan();
Expand All @@ -320,13 +336,28 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
using TResult = IKqpHost::TQueryResult;

TAsyncPrepareYqlResult(TExprNode* queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion)
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion,
TIntrusivePtr<TKqlTransformContext> transformCtx)
: TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
, QueryCtx(queryCtx)
, ExprCtx(exprCtx)
, TransformCtx(transformCtx)
, QueryText(query.Text)
, SqlVersion(sqlVersion) {}

void FillResult(TResult& prepareResult) const override {
if (!prepareResult.Success()) {
auto exprRoot = GetExprRoot();
if (TransformCtx && TransformCtx->ExplainTransformerInput) {
exprRoot = TransformCtx->ExplainTransformerInput;
}
if (exprRoot) {
prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery);
prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*exprRoot, ExprCtx));
}
return;
}

YQL_ENSURE(QueryCtx->PrepareOnly);
YQL_ENSURE(QueryCtx->PreparingQuery);

Expand All @@ -344,6 +375,8 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

private:
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
NYql::TExprContext& ExprCtx;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
TString QueryText;
TMaybe<TSqlVersion> SqlVersion;
};
Expand Down Expand Up @@ -933,6 +966,7 @@ class TKqpHost : public IKqpHost {
, IsInternalCall(isInternalCall)
, FederatedQuerySetup(federatedQuerySetup)
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
, Config(config)
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
, FakeWorld(ExprCtx->NewWorld(TPosition()))
Expand Down Expand Up @@ -1265,7 +1299,7 @@ class TKqpHost : public IKqpHost {
}

return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
query.Text, sqlVersion);
query.Text, sqlVersion, TransformCtx);
}

IAsyncQueryResultPtr PrepareDataQueryAstInternal(const TKqpQueryRef& queryAst, const TPrepareSettings& settings,
Expand Down Expand Up @@ -1327,7 +1361,7 @@ class TKqpHost : public IKqpHost {
}

return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
query.Text, sqlVersion);
query.Text, sqlVersion, TransformCtx);
}

IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx,
Expand All @@ -1354,7 +1388,7 @@ class TKqpHost : public IKqpHost {
}

return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
query.Text, sqlVersion);
query.Text, sqlVersion, TransformCtx);
}

IAsyncQueryResultPtr PrepareScanQueryAstInternal(const TKqpQueryRef& queryAst, TExprContext& ctx) {
Expand Down Expand Up @@ -1502,7 +1536,8 @@ class TKqpHost : public IKqpHost {
}

void Init(EKikimrQueryType queryType) {
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);

ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();
Expand Down Expand Up @@ -1635,6 +1670,7 @@ class TKqpHost : public IKqpHost {
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;

TIntrusivePtr<TKikimrSessionContext> SessionCtx;
TKikimrConfiguration::TPtr Config;

TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistryHolder;
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
Expand All @@ -1648,6 +1684,7 @@ class TKqpHost : public IKqpHost {
TExprNode::TPtr FakeWorld;

TIntrusivePtr<TExecuteContext> ExecuteCtx;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
TIntrusivePtr<IKqpRunner> KqpRunner;
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/host/kqp_host_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> {
YQL_ENSURE(HasResult());

if (Status.GetValue() == NYql::IGraphTransformer::TStatus::Error) {
return NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
TResult result = NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
FillResult(result);
return result;
}

YQL_ENSURE(Status.GetValue() == NYql::IGraphTransformer::TStatus::Ok);
Expand Down Expand Up @@ -244,7 +246,7 @@ class IKqpRunner : public TThrRefBase {

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry);
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);

TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ class TKqpRunner : public IKqpRunner {
public:
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
: Gateway(gateway)
, Cluster(cluster)
, TypesCtx(*typesCtx)
, SessionCtx(sessionCtx)
, FunctionRegistry(funcRegistry)
, Config(sessionCtx->ConfigPtr())
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
, TransformCtx(transformCtx)
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
Expand Down Expand Up @@ -377,9 +377,9 @@ class TKqpRunner : public IKqpRunner {

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
{
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, funcRegistry);
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
}

} // namespace NKqp
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,10 +1720,17 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
FillColumnsMeta(phyQuery, response);
} else if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
// The compile timeout cause cancelation execution of request.
// So in case of cancel after we can reply with canceled status
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
} else {
if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
// The compile timeout cause cancelation execution of request.
// So in case of cancel after we can reply with canceled status
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
}

auto& preparedQuery = compileResult->PreparedQuery;
if (preparedQuery && QueryState->ReportStats() && QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
response.SetQueryAst(preparedQuery->GetPhysicalQuery().GetQueryAst());
}
}
}

Expand Down
23 changes: 23 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,29 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2);
}

Y_UNIT_TEST(ExecStatsAst) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();

auto settings = TExecuteQuerySettings()
.StatsMode(EStatsMode::Full);

std::vector<std::pair<TString, EStatus>> cases = {
{ "SELECT 42 AS test_ast_column", EStatus::SUCCESS },
{ "SELECT test_ast_column FROM TwoShard", EStatus::GENERIC_ERROR },
{ "SELECT UNWRAP(42 / 0) AS test_ast_column", EStatus::PRECONDITION_FAILED },
};

for (const auto& [sql, status] : cases) {
auto result = db.ExecuteQuery(sql, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), status, result.GetIssues().ToString());

UNIT_ASSERT(result.GetStats().Defined());
UNIT_ASSERT(result.GetStats()->GetAst().Defined());
UNIT_ASSERT_STRING_CONTAINS(*result.GetStats()->GetAst(), "test_ast_column");
}
}

Y_UNIT_TEST(Ddl) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down
15 changes: 8 additions & 7 deletions ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,21 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
Iterator_.ReadNext().Subscribe([self](TAsyncExecuteQueryPart partFuture) mutable {
auto part = partFuture.ExtractValue();

if (const auto& st = part.GetStats()) {
self->Stats_ = st;
}

if (!part.IsSuccess()) {
TMaybe<TExecStats> stats;
std::swap(self->Stats_, stats);

if (part.EOS()) {
TVector<NYql::TIssue> issues;
TVector<Ydb::ResultSet> resultProtos;
TMaybe<TExecStats> stats;
TMaybe<TTransaction> tx;

std::swap(self->Issues_, issues);
std::swap(self->ResultSets_, resultProtos);
std::swap(self->Stats_, stats);
std::swap(self->Tx_, tx);

TVector<TResultSet> resultSets;
Expand All @@ -160,7 +165,7 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
std::move(tx)
));
} else {
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, {}, {}));
self->Promise_.SetValue(TExecuteQueryResult(std::move(part), {}, std::move(stats), {}));
}

return;
Expand All @@ -185,10 +190,6 @@ struct TExecuteQueryBuffer : public TThrRefBase, TNonCopyable {
resultSet.mutable_rows()->Add(inRsProto.rows().begin(), inRsProto.rows().end());
}

if (const auto& st = part.GetStats()) {
self->Stats_ = st;
}

if (const auto& tx = part.GetTransaction()) {
self->Tx_ = tx;
}
Expand Down
Loading
Loading