From fd4d415e05abbb660f69f3c5c15253afdc067b52 Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Fri, 19 Jul 2024 14:54:39 +0300 Subject: [PATCH 1/2] fix columns order in returning list (#6861) --- .../kqp/provider/yql_kikimr_opt_build.cpp | 12 ++++-- ydb/core/kqp/ut/opt/kqp_returning_ut.cpp | 40 +++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 5220fba92a72..79e8cfb340ea 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -445,7 +445,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T .Update(node) .Columns(write.ReturningColumns()) .Build() - .Settings().Build() + .Settings() + .Add().Name().Value("columns").Build().Value(write.ReturningColumns()).Build() + .Build() .Done()); } @@ -491,7 +493,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T .Update(node) .Columns(update.ReturningColumns()) .Build() - .Settings().Build() + .Settings() + .Add().Name().Value("columns").Build().Value(update.ReturningColumns()).Build() + .Build() .Done()); } @@ -524,7 +528,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T .Update(node) .Columns(del.ReturningColumns()) .Build() - .Settings().Build() + .Settings() + .Add().Name().Value("columns").Build().Value(del.ReturningColumns()).Build() + .Build() .Done()); } diff --git a/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp b/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp index 7035b4d6927b..837d81d654de 100644 --- a/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp @@ -134,6 +134,46 @@ Y_UNIT_TEST(ReturningSerial) { } } +Y_UNIT_TEST(ReturningColumnsOrder) { + auto kikimr = DefaultKikimrRunner(); + + auto client = kikimr.GetTableClient(); + auto session = client.CreateSession().GetValueSync().GetSession(); + auto db = kikimr.GetQueryClient(); + + const auto queryCreate = Q_(R"( + CREATE TABLE test1 (id Int32, v Text, PRIMARY KEY(id)); + )"); + + auto resultCreate = session.ExecuteSchemeQuery(queryCreate).GetValueSync(); + UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString()); + + { + const auto query = Q_(R"( + UPSERT INTO test1 (id, v) VALUES (1, '321') RETURNING id, v; + REPLACE INTO test1 (id, v) VALUES (1, '111') RETURNING v, id; + )"); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1))); + } + + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::YqlV1); + { + auto result = db.ExecuteQuery(R"( + UPSERT INTO test1 (id, v) VALUES (1, '321') RETURNING id, v; + REPLACE INTO test1 (id, v) VALUES (1, '111') RETURNING v, id; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1))); + } + +} + Y_UNIT_TEST(ReturningTypes) { auto kikimr = DefaultKikimrRunner(); From 17f4cc240b51525930a7d87125cd9ba3f740003a Mon Sep 17 00:00:00 2001 From: Mikhail Surin Date: Wed, 24 Jul 2024 14:57:21 +0300 Subject: [PATCH 2/2] Ensure returning is in the same block (#6980) --- .../kqp/provider/yql_kikimr_opt_build.cpp | 19 ++++++++++++++++++- ydb/core/kqp/ut/opt/kqp_returning_ut.cpp | 11 ++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 79e8cfb340ea..5c4d7633ba55 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -247,7 +247,7 @@ struct TKiExploreTxResults { } } - void AddResult(const TExprBase& result) { + void PrepareForResult() { if (QueryBlocks.empty()) { AddQueryBlock(); } @@ -255,6 +255,10 @@ struct TKiExploreTxResults { if (!ConcurrentResults && QueryBlocks.back().Results.size() > 0) { AddQueryBlock(); } + } + + void AddResult(const TExprBase& result) { + PrepareForResult(); auto& curBlock = QueryBlocks.back(); curBlock.Results.push_back(result); @@ -422,6 +426,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T const auto& tableData = tablesData->ExistingTable(cluster, table); YQL_ENSURE(tableData.Metadata); + if (!write.ReturningColumns().Empty()) { + txRes.PrepareForResult(); + } + if (tableOp == TYdbOperation::UpdateOn) { auto inputColumnsSetting = GetSetting(write.Settings().Ref(), "input_columns"); YQL_ENSURE(inputColumnsSetting); @@ -482,6 +490,11 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T for (const auto& item : updateStructType->GetItems()) { updateColumns.emplace(item->GetName()); } + + if (!update.ReturningColumns().Empty()) { + txRes.PrepareForResult(); + } + txRes.AddUpdateOpToQueryBlock(node, tableData.Metadata, updateColumns); if (!update.ReturningColumns().Empty()) { txRes.AddResult( @@ -517,6 +530,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T YQL_ENSURE(tablesData); const auto& tableData = tablesData->ExistingTable(cluster, table); YQL_ENSURE(tableData.Metadata); + if (!del.ReturningColumns().Empty()) { + txRes.PrepareForResult(); + } + txRes.AddWriteOpToQueryBlock(node, tableData.Metadata, tableOp & KikimrReadOps()); if (!del.ReturningColumns().Empty()) { txRes.AddResult( diff --git a/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp b/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp index 837d81d654de..7b8649ec4e2b 100644 --- a/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_returning_ut.cpp @@ -161,7 +161,8 @@ Y_UNIT_TEST(ReturningColumnsOrder) { } auto settings = NYdb::NQuery::TExecuteQuerySettings() - .Syntax(NYdb::NQuery::ESyntax::YqlV1); + .Syntax(NYdb::NQuery::ESyntax::YqlV1) + .ConcurrentResultSets(false); { auto result = db.ExecuteQuery(R"( UPSERT INTO test1 (id, v) VALUES (1, '321') RETURNING id, v; @@ -171,6 +172,14 @@ Y_UNIT_TEST(ReturningColumnsOrder) { CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0))); CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1))); } + { + auto it = db.StreamExecuteQuery(R"( + UPSERT INTO test1 (id, v) VALUES (2, '321') RETURNING id, v; + REPLACE INTO test1 (id, v) VALUES (2, '111') RETURNING v, id; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + Cerr << StreamResultToYson(it); + } }