Skip to content

Commit

Permalink
Merge 17f4cc2 into 064fd75
Browse files Browse the repository at this point in the history
  • Loading branch information
ssmike authored Sep 17, 2024
2 parents 064fd75 + 17f4cc2 commit 5a9a365
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 4 deletions.
31 changes: 27 additions & 4 deletions ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,18 @@ struct TKiExploreTxResults {
}
}

void AddResult(const TExprBase& result) {
void PrepareForResult() {
if (QueryBlocks.empty()) {
AddQueryBlock();
}

if (!ConcurrentResults && QueryBlocks.back().Results.size() > 0) {
AddQueryBlock();
}
}

void AddResult(const TExprBase& result) {
PrepareForResult();

auto& curBlock = QueryBlocks.back();
curBlock.Results.push_back(result);
Expand Down Expand Up @@ -422,6 +426,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
const auto& tableData = tablesData->ExistingTable(cluster, table);
YQL_ENSURE(tableData.Metadata);

if (!write.ReturningColumns().Empty()) {
txRes.PrepareForResult();
}

if (tableOp == TYdbOperation::UpdateOn) {
auto inputColumnsSetting = GetSetting(write.Settings().Ref(), "input_columns");
YQL_ENSURE(inputColumnsSetting);
Expand All @@ -445,7 +453,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
.Update(node)
.Columns(write.ReturningColumns())
.Build()
.Settings().Build()
.Settings()
.Add().Name().Value("columns").Build().Value(write.ReturningColumns()).Build()
.Build()
.Done());
}

Expand Down Expand Up @@ -480,6 +490,11 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
for (const auto& item : updateStructType->GetItems()) {
updateColumns.emplace(item->GetName());
}

if (!update.ReturningColumns().Empty()) {
txRes.PrepareForResult();
}

txRes.AddUpdateOpToQueryBlock(node, tableData.Metadata, updateColumns);
if (!update.ReturningColumns().Empty()) {
txRes.AddResult(
Expand All @@ -491,7 +506,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
.Update(node)
.Columns(update.ReturningColumns())
.Build()
.Settings().Build()
.Settings()
.Add().Name().Value("columns").Build().Value(update.ReturningColumns()).Build()
.Build()
.Done());
}

Expand All @@ -513,6 +530,10 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
YQL_ENSURE(tablesData);
const auto& tableData = tablesData->ExistingTable(cluster, table);
YQL_ENSURE(tableData.Metadata);
if (!del.ReturningColumns().Empty()) {
txRes.PrepareForResult();
}

txRes.AddWriteOpToQueryBlock(node, tableData.Metadata, tableOp & KikimrReadOps());
if (!del.ReturningColumns().Empty()) {
txRes.AddResult(
Expand All @@ -524,7 +545,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
.Update(node)
.Columns(del.ReturningColumns())
.Build()
.Settings().Build()
.Settings()
.Add().Name().Value("columns").Build().Value(del.ReturningColumns()).Build()
.Build()
.Done());
}

Expand Down
49 changes: 49 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_returning_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,55 @@ Y_UNIT_TEST(ReturningSerial) {
}
}

Y_UNIT_TEST(ReturningColumnsOrder) {
auto kikimr = DefaultKikimrRunner();

auto client = kikimr.GetTableClient();
auto session = client.CreateSession().GetValueSync().GetSession();
auto db = kikimr.GetQueryClient();

const auto queryCreate = Q_(R"(
CREATE TABLE test1 (id Int32, v Text, PRIMARY KEY(id));
)");

auto resultCreate = session.ExecuteSchemeQuery(queryCreate).GetValueSync();
UNIT_ASSERT_C(resultCreate.IsSuccess(), resultCreate.GetIssues().ToString());

{
const auto query = Q_(R"(
UPSERT INTO test1 (id, v) VALUES (1, '321') RETURNING id, v;
REPLACE INTO test1 (id, v) VALUES (1, '111') RETURNING v, id;
)");

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1)));
}

auto settings = NYdb::NQuery::TExecuteQuerySettings()
.Syntax(NYdb::NQuery::ESyntax::YqlV1)
.ConcurrentResultSets(false);
{
auto result = db.ExecuteQuery(R"(
UPSERT INTO test1 (id, v) VALUES (1, '321') RETURNING id, v;
REPLACE INTO test1 (id, v) VALUES (1, '111') RETURNING v, id;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
CompareYson(R"([[[1];["321"]]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[["111"];[1]]])", FormatResultSetYson(result.GetResultSet(1)));
}
{
auto it = db.StreamExecuteQuery(R"(
UPSERT INTO test1 (id, v) VALUES (2, '321') RETURNING id, v;
REPLACE INTO test1 (id, v) VALUES (2, '111') RETURNING v, id;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
Cerr << StreamResultToYson(it);
}

}

Y_UNIT_TEST(ReturningTypes) {
auto kikimr = DefaultKikimrRunner();

Expand Down

0 comments on commit 5a9a365

Please sign in to comment.