diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 12ed99ef6c30..2c3419e3b197 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -195,6 +195,11 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig return false; } + // We need snapshot for stream lookup, besause it's used for dependent reads + if (hasStreamLookup) { + return true; + } + // We need snapshot when there are multiple table read phases, most // likely it involves multiple tables and we would have to use a // distributed commit otherwise. Taking snapshot helps as avoid TLI diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index b89b83e45785..6ebb045bd53a 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -536,7 +536,8 @@ bool IsCrossShardChannel(const TKqpTasksGraph& tasksGraph, const TChannel& chann return false; } - return targetShard != tasksGraph.GetTask(channel.SrcTask).Meta.ShardId; + ui64 srcShard = tasksGraph.GetTask(channel.SrcTask).Meta.ShardId; + return srcShard && targetShard != srcShard; } void TShardKeyRanges::AddPoint(TSerializedCellVec&& point) { diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index b153add48ff5..7e99ddcee3f3 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -11,6 +11,110 @@ using namespace NYdb; using namespace NYdb::NTable; Y_UNIT_TEST_SUITE(KqpNewEngine) { + Y_UNIT_TEST(StreamLookupWithView) { + TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false); + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX); + appConfig.MutableFeatureFlags()->SetEnableViews(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + settings.SetAppConfig(appConfig); + + auto kikimr = TKikimrRunner{settings}; + kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableViews(true); + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + + CREATE TABLE `object_table` + ( + object_id utf8, + role utf8, + id utf8 not NULL, + primary key (id) + ); + + ALTER TABLE `object_table` ADD INDEX `object_id_index` GLOBAL ON (object_id); + ALTER TABLE `object_table` ADD INDEX `role_index` GLOBAL ON (role); + + CREATE TABLE `role_table` + ( + granted_by_role utf8, + granted_role utf8, + role_type utf8, + role utf8, + id utf8 not NULL, + primary key (id) + ); + + ALTER TABLE `role_table` ADD INDEX `granted_by_role_index` GLOBAL ON (granted_by_role); + ALTER TABLE `role_table` ADD INDEX `granted_role_index` GLOBAL ON (granted_role); + ALTER TABLE `role_table` ADD INDEX `role_index` GLOBAL ON (role); + + CREATE TABLE `access_table` + ( + endpoints utf8, + name utf8, + class utf8, + type utf8, + id utf8 not NULL, + primary key (id) + ); + + ALTER TABLE `access_table` ADD INDEX `endpoints_index` GLOBAL ON (endpoints); + ALTER TABLE `access_table` ADD INDEX `class_index` GLOBAL ON (class); + )").GetValueSync()); + + AssertSuccessResult(session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE VIEW granted_privilege WITH (security_invoker = TRUE) AS + SELECT DISTINCT + object_table.object_id AS object_id, + role_table.granted_role AS granted_role, + access_table.id AS id, + role_table.role AS role, + access_table.`type` AS object_type, + FROM `/Root/access_table` AS access_table + INNER JOIN `/Root/object_table` AS object_table ON access_table.id = object_table.object_id + INNER JOIN `/Root/role_table` AS role_table ON object_table.role = role_table.granted_role + )").GetValueSync()); + + auto result = session.ExecuteDataQuery(R"( + UPSERT INTO `access_table` (id, type) VALUES + ("10", "OPERATION_PRIVILEGE"); + UPSERT INTO `role_table` (id, granted_role, role_type) VALUES + ("10", "admin", "USER_ROLE"); + UPSERT INTO `object_table` (id, object_id, role) VALUES + ("10", "10", "admin"); + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + AssertSuccessResult(result); + + auto testQueryParams = [&] (TString query, TParams params) { + auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params).GetValueSync(); + AssertSuccessResult(result); + + Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl; + }; + + auto params = kikimr.GetTableClient().GetParamsBuilder() + .AddParam("$jp1").Utf8("admin").Build() + .AddParam("$jp2").Utf8("10").Build() + .AddParam("$jp3").Uint64(2).Build() + .Build(); + + testQueryParams(R"( + --!syntax_v1 + DECLARE $jp1 AS Text; + DECLARE $jp2 AS Text; + DECLARE $jp3 AS Uint64; + select g1_0.id from granted_privilege g1_0 where ( + g1_0.role = 'admin' + ) and g1_0.role=$jp1 and g1_0.object_type=$jp2 limit $jp3 + )", params); + } + Y_UNIT_TEST(Select1) { auto settings = TKikimrSettings() .SetWithSampleTables(false);