diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index cd4bf4be82fa..6dba8582563e 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -542,9 +542,9 @@ void PrintQueryStats(const TDataQueryResult& result) { } } -void AssertTableStats(const TDataQueryResult& result, TStringBuf table, const TExpectedTableStats& expectedStats) { - auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - +void AssertTableStats(const Ydb::TableStats::QueryStats& stats, TStringBuf table, + const TExpectedTableStats& expectedStats) +{ ui64 actualReads = 0; ui64 actualUpdates = 0; ui64 actualDeletes = 0; @@ -575,6 +575,11 @@ void AssertTableStats(const TDataQueryResult& result, TStringBuf table, const TE } } +void AssertTableStats(const TDataQueryResult& result, TStringBuf table, const TExpectedTableStats& expectedStats) { + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + return AssertTableStats(stats, table, expectedStats); +} + TDataQueryResult ExecQueryAndTestResult(TSession& session, const TString& query, const NYdb::TParams& params, const TString& expectedYson) { diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 0210aa1aaca7..fce52247bd30 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -268,6 +268,12 @@ struct TExpectedTableStats { TMaybe ExpectedDeletes; }; +void AssertTableStats(const Ydb::TableStats::QueryStats& stats, TStringBuf table, + const TExpectedTableStats& expectedStats); + +void AssertTableStats(const NYdb::NTable::TDataQueryResult& result, TStringBuf table, + const TExpectedTableStats& expectedStats); + void AssertTableStats(const NYdb::NTable::TDataQueryResult& result, TStringBuf table, const TExpectedTableStats& expectedStats); diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index dfcfb1226692..ddf538058fd2 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -9,8 +9,8 @@ using namespace NYdb::NTable; namespace { -TParams BuildUpdateParams(TTableClient& client) { - return client.GetParamsBuilder() +TParams BuildUpdateParams() { + return TParamsBuilder() .AddParam("$items") .BeginList() .AddListItem() @@ -28,8 +28,8 @@ TParams BuildUpdateParams(TTableClient& client) { .Build(); } -TParams BuildInsertParams(TTableClient& client) { - return client.GetParamsBuilder() +TParams BuildInsertParams() { + return TParamsBuilder() .AddParam("$items") .BeginList() .AddListItem() @@ -47,8 +47,8 @@ TParams BuildInsertParams(TTableClient& client) { .Build(); } -TParams BuildDeleteParams(TTableClient& client) { - return client.GetParamsBuilder() +TParams BuildDeleteParams() { + return TParamsBuilder() .AddParam("$items") .BeginList() .AddListItem() @@ -64,8 +64,8 @@ TParams BuildDeleteParams(TTableClient& client) { .Build(); } -TParams BuildUpdateIndexParams(TTableClient& client) { - return client.GetParamsBuilder() +TParams BuildUpdateIndexParams() { + return TParamsBuilder() .AddParam("$items") .BeginList() .AddListItem() @@ -85,8 +85,8 @@ TParams BuildUpdateIndexParams(TTableClient& client) { .Build(); } -TParams BuildDeleteIndexParams(TTableClient& client) { - return client.GetParamsBuilder() +TParams BuildDeleteIndexParams() { + return TParamsBuilder() .AddParam("$items") .BeginList() .AddListItem() @@ -102,8 +102,8 @@ TParams BuildDeleteIndexParams(TTableClient& client) { .Build(); } -TParams BuildInsertIndexParams(TTableClient& client) { - return client.GetParamsBuilder() +TParams BuildInsertIndexParams() { + return TParamsBuilder() .AddParam("$items") .BeginList() .AddListItem() @@ -117,39 +117,55 @@ TParams BuildInsertIndexParams(TTableClient& client) { .Build(); } +std::tuple ExecQuery(const TKikimrRunner& db, bool queryService, + const TString& query, const NYdb::TParams& params) +{ + if (queryService) { + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .StatsMode(NYdb::NQuery::EStatsMode::Full); + + auto result = db.GetQueryClient().ExecuteQuery(query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), + params, settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + return std::make_tuple(stats, result.GetResultSets()); + } else { + auto session = db.GetTableClient().CreateSession().GetValueSync().GetSession(); + + auto settings = NYdb::NTable::TExecDataQuerySettings() + .CollectQueryStats(ECollectQueryStatsMode::Full); + + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), + params, settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + return std::make_tuple(stats, result.GetResultSets()); + } +} + } // namespace Y_UNIT_TEST_SUITE(KqpQueryPerf) { - Y_UNIT_TEST_TWIN(KvRead, EnableSourceRead) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(EnableSourceRead); - auto settings = TKikimrSettings() - .SetAppConfig(appConfig); - TKikimrRunner kikimr{settings}; - - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); + Y_UNIT_TEST_TWIN(KvRead, QueryService) { + TKikimrRunner kikimr; - auto params = db.GetParamsBuilder() + auto params = TParamsBuilder() .AddParam("$key").Uint64(102).Build() .Build(); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Full); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $key AS Uint64; SELECT * FROM EightShard WHERE Key = $key; - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - - // Cerr << stats.query_plan() << Endl; - - AssertTableStats(result, "/Root/EightShard", {.ExpectedReads = 1,}); + )"), params); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + if (QueryService) { + // TODO: Fix QS. + return; + } UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).affected_shards(), 1); @@ -158,36 +174,26 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { NJson::ReadJsonTree(stats.query_plan(), &plan, true); auto stages = FindPlanStages(plan); - UNIT_ASSERT_VALUES_EQUAL(stages.size(), EnableSourceRead ? 1 : 2); + UNIT_ASSERT_VALUES_EQUAL(stages.size(), 1); i64 totalTasks = 0; for (const auto& stage : stages) { totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("Tasks").GetIntegerSafe(); } - UNIT_ASSERT_VALUES_EQUAL(totalTasks, EnableSourceRead ? 1 : 2); + UNIT_ASSERT_VALUES_EQUAL(totalTasks, 1); } - Y_UNIT_TEST_TWIN(RangeLimitRead, EnableSourceRead) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(EnableSourceRead); - auto settings = TKikimrSettings() - .SetAppConfig(appConfig); - TKikimrRunner kikimr{settings}; - - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); + Y_UNIT_TEST_TWIN(RangeLimitRead, QueryService) { + TKikimrRunner kikimr; - auto params = db.GetParamsBuilder() + auto params = TParamsBuilder() .AddParam("$from").Int32(1).Build() .AddParam("$to").Int32(5).Build() .AddParam("$limit").Uint64(3).Build() .Build(); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Full); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $from AS Int32; DECLARE $to AS Int32; DECLARE $limit AS Uint64; @@ -196,13 +202,16 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { WHERE Key >= $from AND Key < $to ORDER BY Key LIMIT $limit; - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); // Cerr << stats.query_plan() << Endl; - AssertTableStats(result, "/Root/Join1", { + if (QueryService) { + // TODO: Fix QS. + return; + } + + AssertTableStats(stats, "/Root/Join1", { .ExpectedReads = 3, }); @@ -224,38 +233,32 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { UNIT_ASSERT_VALUES_EQUAL(totalTasks, 3); } - Y_UNIT_TEST_TWIN(RangeRead, EnableSourceRead) { - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(EnableSourceRead); - auto settings = TKikimrSettings() - .SetAppConfig(appConfig); - TKikimrRunner kikimr{settings}; - - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); + Y_UNIT_TEST_TWIN(RangeRead, QueryService) { + TKikimrRunner kikimr; - auto params = db.GetParamsBuilder() + auto params = TParamsBuilder() .AddParam("$from").Int32(2).Build() .AddParam("$to").Int32(7).Build() .Build(); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Full); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $from AS Int32; DECLARE $to AS Int32; SELECT * FROM Join1 WHERE Key > $from AND Key <= $to ORDER BY Key; - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); + - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); // Cerr << stats.query_plan() << Endl; - AssertTableStats(result, "/Root/Join1", { + if (QueryService) { + // TODO: Fix QS. + return; + } + + AssertTableStats(stats, "/Root/Join1", { .ExpectedReads = 5, }); @@ -272,30 +275,24 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { for (const auto& stage : stages) { totalTasks += stage.GetMapSafe().at("Stats").GetMapSafe().at("Tasks").GetIntegerSafe(); } - UNIT_ASSERT_VALUES_EQUAL(totalTasks, EnableSourceRead ? 2 : 3); + UNIT_ASSERT_VALUES_EQUAL(totalTasks, 2); } - Y_UNIT_TEST_TWIN(IndexLookupJoin, EnableStreamLookup) { + Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup); auto settings = TKikimrSettings() .SetAppConfig(appConfig); TKikimrRunner kikimr{settings}; - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, results] = ExecQuery(kikimr, QueryService, Q1_(R"( SELECT l.Key, r.Key1, r.Key2 FROM `/Root/Join1` AS l INNER JOIN `/Root/Join2` AS r ON l.Fk21 = r.Key1 ORDER BY l.Key, r.Key1, r.Key2; - )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), TParamsBuilder().Build()); + CompareYson(R"([ [[1];[101u];["One"]]; [[1];[101u];["Three"]]; @@ -310,45 +307,42 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { [[9];[101u];["One"]]; [[9];[101u];["Three"]]; [[9];[101u];["Two"]] - ])", FormatResultSetYson(result.GetResultSet(0))); + ])", FormatResultSetYson(results[0])); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + if (QueryService) { + // TODO: Fix QS. + return; + } - AssertTableStats(result, "/Root/Join1", { + AssertTableStats(stats, "/Root/Join1", { .ExpectedReads = 9, }); - AssertTableStats(result, "/Root/Join2", { + AssertTableStats(stats, "/Root/Join2", { .ExpectedReads = EnableStreamLookup ? 13 : 10, }); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), EnableStreamLookup ? 1 : 3); } - Y_UNIT_TEST(Upsert) { + Y_UNIT_TEST_TWIN(Upsert, QueryService) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - auto params = BuildUpdateParams(db); + auto params = BuildUpdateParams(); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; UPSERT INTO EightShard SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); + - AssertTableStats(result, "/Root/EightShard", { + AssertTableStats(stats, "/Root/EightShard", { .ExpectedReads = 0, .ExpectedUpdates = 2, }); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); // TODO: Get rid of additional precompute stage for adding optionality to row members UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); @@ -358,31 +352,23 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(Replace) { + Y_UNIT_TEST_TWIN(Replace, QueryService) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - auto params = BuildUpdateParams(db); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + auto params = BuildUpdateParams(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; REPLACE INTO EightShard SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); - AssertTableStats(result, "/Root/EightShard", { + AssertTableStats(stats, "/Root/EightShard", { .ExpectedReads = 0, .ExpectedUpdates = 2, }); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - // Single-phase REPLACE require additional runtime write callable UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); @@ -391,31 +377,23 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(UpdateOn) { + Y_UNIT_TEST_TWIN(UpdateOn, QueryService) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - auto params = BuildUpdateParams(db); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + auto params = BuildUpdateParams(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; UPDATE EightShard ON SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); - AssertTableStats(result, "/Root/EightShard", { + AssertTableStats(stats, "/Root/EightShard", { .ExpectedReads = 1, // Non-existing keys don't count in reads .ExpectedUpdates = 1, }); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - // Two-phase UPDATE ON require more complex runtime callables UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); @@ -424,31 +402,23 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(Insert) { + Y_UNIT_TEST_TWIN(Insert, QueryService) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - auto params = BuildInsertParams(db); + auto params = BuildInsertParams(); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; INSERT INTO EightShard SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); - AssertTableStats(result, "/Root/EightShard", { + AssertTableStats(stats, "/Root/EightShard", { .ExpectedReads = 0, // Non-existing keys don't count in reads .ExpectedUpdates = 2, }); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - // Three-phase INSERT require more complex runtime callables UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); @@ -457,31 +427,23 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(DeleteOn) { + Y_UNIT_TEST_TWIN(DeleteOn, QueryService) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - auto params = BuildDeleteParams(db); + auto params = BuildDeleteParams(); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; DELETE FROM EightShard ON SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); - AssertTableStats(result, "/Root/EightShard", { + AssertTableStats(stats, "/Root/EightShard", { .ExpectedReads = 0, .ExpectedDeletes = 2, }); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - // TODO: Get rid of additional precompute stage for adding optionality to row members UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); @@ -490,34 +452,31 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(Update) { + Y_UNIT_TEST_TWIN(Update, QueryService) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - auto params = db.GetParamsBuilder() + auto params = TParamsBuilder() .AddParam("$key").Uint64(201).Build() - .Build(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + .Build(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $key AS Uint64; UPDATE EightShard SET Data = Data + 1 WHERE Key = $key; - )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); + + if (QueryService) { + // TODO: Fix QS. + return; + } - AssertTableStats(result, "/Root/EightShard", { + AssertTableStats(stats, "/Root/EightShard", { .ExpectedReads = 1, .ExpectedUpdates = 1, }); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); for (const auto& phase : stats.query_phases()) { @@ -525,35 +484,32 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(Delete) { + Y_UNIT_TEST_TWIN(Delete, QueryService) { auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - auto params = db.GetParamsBuilder() + auto params = TParamsBuilder() .AddParam("$key").Uint64(201).Build() .AddParam("$text").String("Value1").Build() - .Build(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + .Build(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $key AS Uint64; DECLARE $text AS String; DELETE FROM EightShard WHERE Key = $key AND Text = $text; - )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + )"), params); + + if (QueryService) { + // TODO: Fix QS. + return; + } - AssertTableStats(result, "/Root/EightShard", { + AssertTableStats(stats, "/Root/EightShard", { .ExpectedReads = 1, .ExpectedDeletes = 1, }); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); for (const auto& phase : stats.query_phases()) { @@ -561,145 +517,114 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(IndexUpsert) { + Y_UNIT_TEST_TWIN(IndexUpsert, QueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); CreateSampleTablesWithIndex(session); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto params = BuildUpdateIndexParams(db); + auto params = BuildUpdateIndexParams(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; UPSERT INTO SecondaryWithDataColumns SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )"), params); + - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); } - Y_UNIT_TEST(IndexReplace) { + Y_UNIT_TEST_TWIN(IndexReplace, QueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); CreateSampleTablesWithIndex(session); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + auto params = BuildUpdateIndexParams(); - auto params = BuildUpdateIndexParams(db); - - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; REPLACE INTO SecondaryWithDataColumns SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )"), params); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); } - Y_UNIT_TEST(IndexUpdateOn) { + Y_UNIT_TEST_TWIN(IndexUpdateOn, QueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); CreateSampleTablesWithIndex(session); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto params = BuildUpdateIndexParams(db); + auto params = BuildUpdateIndexParams(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; UPDATE SecondaryWithDataColumns ON SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )"), params); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); } - Y_UNIT_TEST(IndexDeleteOn) { + Y_UNIT_TEST_TWIN(IndexDeleteOn, QueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); CreateSampleTablesWithIndex(session); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto params = BuildDeleteIndexParams(db); + auto params = BuildDeleteIndexParams(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; DELETE FROM SecondaryWithDataColumns ON SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )"), params); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 4); } - Y_UNIT_TEST(IndexInsert) { + Y_UNIT_TEST_TWIN(IndexInsert, QueryService) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); CreateSampleTablesWithIndex(session); - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); - - auto params = BuildInsertIndexParams(db); + auto params = BuildInsertIndexParams(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $items AS List>; INSERT INTO SecondaryWithDataColumns SELECT * FROM AS_TABLE($items); - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )"), params); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 5); } - Y_UNIT_TEST(IdxLookupJoin) { + Y_UNIT_TEST_TWIN(IdxLookupJoin, QueryService) { TKikimrSettings settings; TKikimrRunner kikimr(settings); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); - auto params = db.GetParamsBuilder() + auto params = TParamsBuilder() .AddParam("$key").Int32(3).Build() .Build(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $key AS Int32; SELECT * FROM Join1 AS t1 INNER JOIN Join2 AS t2 ON t1.Fk21 = t2.Key1 AND t1.Fk22 = t2.Key2 WHERE t1.Key = $key; - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )"), params); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); } else { @@ -707,20 +632,15 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(IdxLookupJoinThreeWay) { + Y_UNIT_TEST_TWIN(IdxLookupJoinThreeWay, QueryService) { TKikimrSettings settings; TKikimrRunner kikimr(settings); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); - auto params = db.GetParamsBuilder() + auto params = TParamsBuilder() .AddParam("$key").Int32(3).Build() .Build(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $key AS Int32; SELECT t1.Key, t3.Value @@ -728,10 +648,13 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { INNER JOIN Join2 AS t2 ON t1.Fk21 = t2.Key1 AND t1.Fk22 = t2.Key2 INNER JOIN KeyValue2 AS t3 ON t2.Name = t3.Key WHERE t1.Key = $key; - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + )"), params); + + if (QueryService) { + // TODO: Fix QS. + return; + } - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); } else { @@ -739,57 +662,40 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { } } - Y_UNIT_TEST(ComputeLength) { - auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + Y_UNIT_TEST_TWIN(ComputeLength, QueryService) { + TKikimrRunner kikimr; - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, results] = ExecQuery(kikimr, QueryService, Q1_(R"( SELECT COUNT(*) FROM EightShard; - )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[24u]])", FormatResultSetYson(result.GetResultSet(0))); + )"), TParamsBuilder().Build()); + + CompareYson(R"([[24u]])", FormatResultSetYson(results[0])); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); } - Y_UNIT_TEST(AggregateToScalar) { - auto kikimr = DefaultKikimrRunner(); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); + Y_UNIT_TEST_TWIN(AggregateToScalar, QueryService) { + TKikimrRunner kikimr; auto params = TParamsBuilder() .AddParam("$group").Uint32(1).Build() .Build(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, results] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $group AS Uint32; SELECT MIN(Name) AS MinName, SUM(Amount) AS TotalAmount FROM Test WHERE Group = $group; - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - CompareYson(R"([[["Anna"];[3800u]]])", FormatResultSetYson(result.GetResultSet(0))); + )"), params); + + CompareYson(R"([[["Anna"];[3800u]]])", FormatResultSetYson(results[0])); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); } - Y_UNIT_TEST(MultiDeleteFromTable) { + Y_UNIT_TEST_TWIN(MultiDeleteFromTable, QueryService) { TKikimrRunner kikimr; - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - - NYdb::NTable::TExecDataQuerySettings execSettings; - execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic); auto params = TParamsBuilder() .AddParam("$key1_1").Uint32(101).Build() @@ -798,7 +704,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { .AddParam("$key2_2").String("Two").Build() .Build(); - auto result = session.ExecuteDataQuery(Q1_(R"( + auto [stats, results] = ExecQuery(kikimr, QueryService, Q1_(R"( DECLARE $key1_1 AS Uint32; DECLARE $key1_2 AS String; DECLARE $key2_1 AS Uint32; @@ -809,51 +715,37 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { DELETE FROM Join2 ON SELECT * FROM $fetch1; DELETE FROM Join2 ON SELECT * FROM $fetch2; + )"), params); - )"), TTxControl::BeginTx().CommitTx(), params, execSettings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - auto checkResult = session.ExecuteDataQuery(Q1_(R"( + auto [_, checkResults] = ExecQuery(kikimr, QueryService, Q1_(R"( SELECT COUNT(*) FROM Join2; - )"), TTxControl::BeginTx().CommitTx()).ExtractValueSync(); - UNIT_ASSERT_C(checkResult.IsSuccess(), checkResult.GetIssues().ToString()); - CompareYson(R"([[7u]])", FormatResultSetYson(checkResult.GetResultSet(0))); + )"), TParamsBuilder().Build()); + + CompareYson(R"([[7u]])", FormatResultSetYson(checkResults[0])); - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); - AssertTableStats(result, "/Root/Join2", { + AssertTableStats(stats, "/Root/Join2", { .ExpectedReads = 3, .ExpectedDeletes = 3, }); } - Y_UNIT_TEST_TWIN(MultiRead, SourceRead) { - TKikimrSettings settings; - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(SourceRead); - settings.SetAppConfig(appConfig); + Y_UNIT_TEST_TWIN(MultiRead, QueryService) { + TKikimrRunner kikimr; - TKikimrRunner kikimr(settings); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); + auto [stats, _] = ExecQuery(kikimr, QueryService, Q1_(R"( + SELECT * FROM `/Root/KeyValueLargePartition` WHERE Key > 101; + SELECT * FROM `/Root/KeyValueLargePartition` Where Key < 201; + )"), TParamsBuilder().Build()); - { - auto settings = NYdb::NTable::TExecDataQuerySettings().CollectQueryStats(ECollectQueryStatsMode::Full); - auto result = session.ExecuteDataQuery(R"( - SELECT * FROM `/Root/KeyValueLargePartition` WHERE Key > 101; - SELECT * FROM `/Root/KeyValueLargePartition` Where Key < 201; - )", TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - size_t partitionsCount = 0; - auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - for (auto& phase : stats.query_phases()) { - for (auto& read : phase.table_access()) { - partitionsCount += read.partitions_count(); - } + size_t partitionsCount = 0; + for (auto& phase : stats.query_phases()) { + for (auto& read : phase.table_access()) { + partitionsCount += read.partitions_count(); } - UNIT_ASSERT_VALUES_EQUAL(partitionsCount, SourceRead ? 2 : 1); } + UNIT_ASSERT_VALUES_EQUAL(partitionsCount, 2); } }