Skip to content

Commit

Permalink
Merge 4493bdb into 591f564
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Feb 5, 2024
2 parents 591f564 + 4493bdb commit fc4102c
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 28 deletions.
28 changes: 17 additions & 11 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,18 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
PassAway();
}

void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
preparingQuery.release(), AppData()->FunctionRegistry);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
}
}

void Handle(TEvKqp::TEvContinueProcess::TPtr &ev, const TActorContext &ctx) {
Y_ENSURE(!ev->Get()->QueryId);

Expand Down Expand Up @@ -403,17 +415,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

if (status == Ydb::StatusIds::SUCCESS) {
YQL_ENSURE(kqpResult.PreparingQuery);
{
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
kqpResult.PreparingQuery.release(), AppData()->FunctionRegistry);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

if (AstResult) {
KqpCompileResult->Ast = AstResult->Ast;
}
}
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);

auto now = TInstant::Now();
auto duration = now - StartTime;
Expand All @@ -423,6 +425,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", self: " << ctx.SelfID
<< ", duration: " << duration);
} else {
if (kqpResult.PreparingQuery) {
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);
}

LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"
<< ", self: " << ctx.SelfID
<< ", status: " << Ydb::StatusIds_StatusCode_Name(status)
Expand Down
43 changes: 40 additions & 3 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ class TAsyncValidateYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResul
, SqlVersion(sqlVersion) {}

void FillResult(TResult& validateResult) const override {
if (!validateResult.Success()) {
return;
}

YQL_ENSURE(SessionCtx->Query().PrepareOnly);
validateResult.PreparedQuery.reset(SessionCtx->Query().PreparingQuery.release());
validateResult.SqlVersion = SqlVersion;
Expand Down Expand Up @@ -211,6 +215,10 @@ class TAsyncExplainYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, UseDqExplain(useDqExplain) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

if (UseDqExplain) {
TVector<const TString> plans;
for (auto id : SessionCtx->Query().ExecutionOrder) {
Expand Down Expand Up @@ -253,6 +261,10 @@ class TAsyncExecuteYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, SqlVersion(sqlVersion) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
Expand Down Expand Up @@ -300,6 +312,10 @@ class TAsyncExecuteKqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
, ExecuteCtx(executeCtx) {}

void FillResult(TResult& queryResult) const override {
if (!queryResult.Success()) {
return;
}

YQL_ENSURE(ExecuteCtx.QueryResults.size() == 1);
queryResult = std::move(ExecuteCtx.QueryResults[0]);
queryResult.QueryPlan = queryResult.PreparingQuery->GetPhysicalQuery().GetQueryPlan();
Expand All @@ -320,13 +336,28 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult
using TResult = IKqpHost::TQueryResult;

TAsyncPrepareYqlResult(TExprNode* queryRoot, TExprContext& exprCtx, IGraphTransformer& transformer,
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion)
TIntrusivePtr<TKikimrQueryContext> queryCtx, const TKqpQueryRef& query, TMaybe<TSqlVersion> sqlVersion,
TIntrusivePtr<TKqlTransformContext> transformCtx = nullptr)
: TKqpAsyncResultBase(queryRoot, exprCtx, transformer)
, QueryCtx(queryCtx)
, ExprCtx(exprCtx)
, TransformCtx(transformCtx)
, QueryText(query.Text)
, SqlVersion(sqlVersion) {}

void FillResult(TResult& prepareResult) const override {
if (!prepareResult.Success()) {
auto exprRoot = GetExprRoot();
if (TransformCtx && TransformCtx->ExplainTransformerInput) {
exprRoot = TransformCtx->ExplainTransformerInput;
}
if (exprRoot) {
prepareResult.PreparingQuery = std::move(QueryCtx->PreparingQuery);
prepareResult.PreparingQuery->MutablePhysicalQuery()->SetQueryAst(KqpExprToPrettyString(*exprRoot, ExprCtx));
}
return;
}

YQL_ENSURE(QueryCtx->PrepareOnly);
YQL_ENSURE(QueryCtx->PreparingQuery);

Expand All @@ -344,6 +375,8 @@ class TAsyncPrepareYqlResult : public TKqpAsyncResultBase<IKqpHost::TQueryResult

private:
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
NYql::TExprContext& ExprCtx;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
TString QueryText;
TMaybe<TSqlVersion> SqlVersion;
};
Expand Down Expand Up @@ -933,6 +966,7 @@ class TKqpHost : public IKqpHost {
, IsInternalCall(isInternalCall)
, FederatedQuerySetup(federatedQuerySetup)
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
, Config(config)
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
, FakeWorld(ExprCtx->NewWorld(TPosition()))
Expand Down Expand Up @@ -1327,7 +1361,7 @@ class TKqpHost : public IKqpHost {
}

return MakeIntrusive<TAsyncPrepareYqlResult>(queryExpr.Get(), ctx, *YqlTransformer, SessionCtx->QueryPtr(),
query.Text, sqlVersion);
query.Text, sqlVersion, TransformCtx);
}

IAsyncQueryResultPtr PrepareScanQueryInternal(const TKqpQueryRef& query, bool isSql, TExprContext& ctx,
Expand Down Expand Up @@ -1502,7 +1536,8 @@ class TKqpHost : public IKqpHost {
}

void Init(EKikimrQueryType queryType) {
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, *FuncRegistry);
TransformCtx = MakeIntrusive<TKqlTransformContext>(Config, SessionCtx->QueryPtr(), SessionCtx->TablesPtr());
KqpRunner = CreateKqpRunner(Gateway, Cluster, TypesCtx, SessionCtx, TransformCtx, *FuncRegistry);

ExprCtx->NodesAllocationLimit = SessionCtx->Config()._KqpExprNodesAllocationLimit.Get().GetRef();
ExprCtx->StringsAllocationLimit = SessionCtx->Config()._KqpExprStringsAllocationLimit.Get().GetRef();
Expand Down Expand Up @@ -1635,6 +1670,7 @@ class TKqpHost : public IKqpHost {
std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;

TIntrusivePtr<TKikimrSessionContext> SessionCtx;
TKikimrConfiguration::TPtr Config;

TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistryHolder;
const NKikimr::NMiniKQL::IFunctionRegistry* FuncRegistry;
Expand All @@ -1648,6 +1684,7 @@ class TKqpHost : public IKqpHost {
TExprNode::TPtr FakeWorld;

TIntrusivePtr<TExecuteContext> ExecuteCtx;
TIntrusivePtr<TKqlTransformContext> TransformCtx;
TIntrusivePtr<IKqpRunner> KqpRunner;
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};

Expand Down
6 changes: 4 additions & 2 deletions ydb/core/kqp/host/kqp_host_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ class TKqpAsyncResultBase : public NYql::IKikimrAsyncResult<TResult> {
YQL_ENSURE(HasResult());

if (Status.GetValue() == NYql::IGraphTransformer::TStatus::Error) {
return NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
TResult result = NYql::NCommon::ResultFromErrors<TResult>(ExprCtx.IssueManager.GetIssues());
FillResult(result);
return result;
}

YQL_ENSURE(Status.GetValue() == NYql::IGraphTransformer::TStatus::Ok);
Expand Down Expand Up @@ -244,7 +246,7 @@ class IKqpRunner : public TThrRefBase {

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<NYql::TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry);
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry);

TAutoPtr<NYql::IGraphTransformer> CreateKqpExplainPreparedTransformer(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, TIntrusivePtr<TKqlTransformContext> transformCtx, const NMiniKQL::IFunctionRegistry* funcRegistry,
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,14 @@ class TKqpRunner : public IKqpRunner {
public:
TKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
: Gateway(gateway)
, Cluster(cluster)
, TypesCtx(*typesCtx)
, SessionCtx(sessionCtx)
, FunctionRegistry(funcRegistry)
, Config(sessionCtx->ConfigPtr())
, TransformCtx(MakeIntrusive<TKqlTransformContext>(Config, sessionCtx->QueryPtr(), sessionCtx->TablesPtr()))
, TransformCtx(transformCtx)
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
Expand Down Expand Up @@ -377,9 +377,9 @@ class TKqpRunner : public IKqpRunner {

TIntrusivePtr<IKqpRunner> CreateKqpRunner(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster,
const TIntrusivePtr<TTypeAnnotationContext>& typesCtx, const TIntrusivePtr<TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry)
const TIntrusivePtr<TKqlTransformContext>& transformCtx, const NMiniKQL::IFunctionRegistry& funcRegistry)
{
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, funcRegistry);
return new TKqpRunner(gateway, cluster, typesCtx, sessionCtx, transformCtx, funcRegistry);
}

} // namespace NKqp
Expand Down
15 changes: 11 additions & 4 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,10 +1720,17 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
FillColumnsMeta(phyQuery, response);
} else if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
// The compile timeout cause cancelation execution of request.
// So in case of cancel after we can reply with canceled status
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
} else {
if (compileResult->Status == Ydb::StatusIds::TIMEOUT && QueryState->QueryDeadlines.CancelAt) {
// The compile timeout cause cancelation execution of request.
// So in case of cancel after we can reply with canceled status
ev.SetYdbStatus(Ydb::StatusIds::CANCELLED);
}

auto& preparedQuery = compileResult->PreparedQuery;
if (preparedQuery && QueryState->ReportStats() && QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
response.SetQueryAst(preparedQuery->GetPhysicalQuery().GetQueryAst());
}
}
}

Expand Down
33 changes: 33 additions & 0 deletions ydb/tests/fq/s3/test_bindings.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,3 +586,36 @@ def test_count_for_pg_binding(self, kikimr, s3, client, pg_syntax):
else:
assert result_set.columns[0].type.type_id == ydb.Type.UINT64
assert result_set.rows[0].items[0].uint64_value == 1

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_ast_in_failed_query_compilation(self, kikimr, s3, client):
resource = boto3.resource(
"s3",
endpoint_url=s3.s3_url,
aws_access_key_id="key",
aws_secret_access_key="secret_key"
)

bucket = resource.Bucket("bindbucket")
bucket.create(ACL='public-read')
bucket.objects.all().delete()

connection_id = client.create_storage_connection("bb", "bindbucket").result.connection_id

data_column = ydb.Column(name="data", type=ydb.Type(type_id=ydb.Type.PrimitiveTypeId.STRING))
client.create_object_storage_binding(name="s3binding",
path="/",
format="raw",
connection_id=connection_id,
columns=[data_column])

sql = R'''
SELECT some_unknown_column FROM bindings.`s3binding`;
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)

ast = client.describe_query(query_id).result.query.ast.data
assert "(\'columns \'(\'\"some_unknown_column\"))" in ast, "Invalid query ast"
9 changes: 5 additions & 4 deletions ydb/tests/fq/yds/test_select_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-

import logging
import sys

from ydb.tests.tools.fq_runner.kikimr_utils import yq_v1, yq_all

Expand Down Expand Up @@ -120,11 +121,11 @@ def test_compile_error(self, client, yq_version):
assert "Failed to parse query" in describe_string, describe_string

@yq_all
def test_ast_in_failed_query(self, client):
sql = "SELECT unwrap(1 / 0)"
def test_ast_in_failed_query_runtime(self, client):
sql = "SELECT unwrap(42 / 0) AS error_column"

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.FAILED)

ast = str(client.describe_query(query_id).result.query.ast)
assert ast != "", "Query ast not found"
ast = client.describe_query(query_id).result.query.ast.data
assert "(\'\"error_column\" (Unwrap (/ (Int32 \'\"42\")" in ast, "Invalid query ast"

0 comments on commit fc4102c

Please sign in to comment.