Skip to content

Commit

Permalink
resolves #2078: sort YdbResults via ColumnHints
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Feb 23, 2024
1 parent 0293b47 commit 4b8732a
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 7 deletions.
23 changes: 19 additions & 4 deletions ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,12 +520,27 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
return false;
}

auto resultMeta = queryBindingProto.MutableResultSetMeta();
auto resultMetaColumns = queryBindingProto.MutableResultSetMeta()->Mutablecolumns();
for (size_t i = 0; i < kikimrProto.GetStruct().MemberSize(); i++) {
resultMetaColumns->Add();
}

THashMap<TString, int> columnOrder;
columnOrder.reserve(kikimrProto.GetStruct().MemberSize());
if (!txResult.GetColumnHints().empty()) {
YQL_ENSURE(txResult.GetColumnHints().size() == (int)kikimrProto.GetStruct().MemberSize());
for (int i = 0; i < txResult.GetColumnHints().size(); i++) {
const auto& hint = txResult.GetColumnHints().at(i);
columnOrder[TString(hint)] = i;
}
}

int id = 0;
for (const auto& column : kikimrProto.GetStruct().GetMember()) {
auto columnMeta = resultMeta->add_columns();
columnMeta->set_name(column.GetName());
ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type());
int bindingColumnId = columnOrder.count(column.GetName()) ? columnOrder.at(column.GetName()) : id++;
auto& columnMeta = resultMetaColumns->at(bindingColumnId);
columnMeta.Setname(column.GetName());
ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta.mutable_type());
}
}

Expand Down
62 changes: 59 additions & 3 deletions ydb/core/kqp/ut/pg/kqp_pg_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#include "ydb/public/sdk/cpp/client/ydb_proto/accessor.h"
#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>

#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>

#include <ydb/library/yql/parser/pg_catalog/catalog.h>
Expand All @@ -9,7 +13,6 @@


extern "C" {
#include "postgres.h"
#include "catalog/pg_type_d.h"
}

Expand Down Expand Up @@ -3853,7 +3856,7 @@ Y_UNIT_TEST_SUITE(KqpPg) {
INSERT INTO RecompileTable (id) VALUES (1);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

result = db.ExecuteQuery(R"(
DROP TABLE RecompileTable;
)", NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync();
Expand Down Expand Up @@ -4037,6 +4040,59 @@ Y_UNIT_TEST_SUITE(KqpPg) {
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(ExplainColumnsReorder) {
TPortManager tp;
ui16 mbusport = tp.GetPort(2134);
auto settings = Tests::TServerSettings(mbusport)
.SetDomainName("Root")
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new Tests::TServer(settings);

auto runtime = server->GetRuntime();
auto sender = runtime->AllocateEdgeActor();
auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));

InitRoot(server, sender);

auto createSession = [&]() {
runtime->Send(new IEventHandle(kqpProxy, sender, new TEvKqp::TEvCreateSessionRequest()));
auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvCreateSessionResponse>(sender);
auto record = reply->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(record.GetYdbStatus(), Ydb::StatusIds::SUCCESS);
return record.GetResponse().GetSessionId();
};

auto sendQuery = [&](const TString& queryText) {
auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
ev->Record.MutableRequest()->SetQuery(queryText);
ev->Record.MutableRequest()->SetSyntax(::Ydb::Query::Syntax::SYNTAX_PG);
ev->Record.MutableRequest()->SetKeepSession(false);
ActorIdToProto(sender, ev->Record.MutableRequestActorId());

runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
return runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
};

createSession();

auto reply = sendQuery(R"(
SELECT 2 y, 1 x;
)");

UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetResponse().GetYdbResults().size(), 1);
Cerr << reply->Get()->Record.GetRef().GetResponse().DebugString() << Endl;
auto ydbResults = reply->Get()->Record.GetRef().GetResponse().GetYdbResults();
TVector <TString> colNames = {"y", "x"};
UNIT_ASSERT_VALUES_EQUAL(colNames.size(), ydbResults.begin()->Getcolumns().size());
for (size_t i = 0; i < colNames.size(); i++) {
UNIT_ASSERT_VALUES_EQUAL(ydbResults.begin()->Getcolumns().at(i).Getname(), colNames[i]);
}
}
}

} // namespace NKqp
Expand Down

0 comments on commit 4b8732a

Please sign in to comment.