Skip to content

Commit

Permalink
fix(kqp): always take snapshot for queries with stream lookup (#8267)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulya-sidorina authored Aug 26, 2024
1 parent 4ac0ad1 commit da98ea2
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 1 deletion.
5 changes: 5 additions & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
return false;
}

// We need snapshot for stream lookup, besause it's used for dependent reads
if (hasStreamLookup) {
return true;
}

// We need snapshot when there are multiple table read phases, most
// likely it involves multiple tables and we would have to use a
// distributed commit otherwise. Taking snapshot helps as avoid TLI
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ bool IsCrossShardChannel(const TKqpTasksGraph& tasksGraph, const TChannel& chann
return false;
}

return targetShard != tasksGraph.GetTask(channel.SrcTask).Meta.ShardId;
ui64 srcShard = tasksGraph.GetTask(channel.SrcTask).Meta.ShardId;
return srcShard && targetShard != srcShard;
}

void TShardKeyRanges::AddPoint(TSerializedCellVec&& point) {
Expand Down
104 changes: 104 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,110 @@ using namespace NYdb;
using namespace NYdb::NTable;

Y_UNIT_TEST_SUITE(KqpNewEngine) {
Y_UNIT_TEST(StreamLookupWithView) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX);
appConfig.MutableFeatureFlags()->SetEnableViews(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
settings.SetAppConfig(appConfig);

auto kikimr = TKikimrRunner{settings};
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableViews(true);

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `object_table`
(
object_id utf8,
role utf8,
id utf8 not NULL,
primary key (id)
);
ALTER TABLE `object_table` ADD INDEX `object_id_index` GLOBAL ON (object_id);
ALTER TABLE `object_table` ADD INDEX `role_index` GLOBAL ON (role);
CREATE TABLE `role_table`
(
granted_by_role utf8,
granted_role utf8,
role_type utf8,
role utf8,
id utf8 not NULL,
primary key (id)
);
ALTER TABLE `role_table` ADD INDEX `granted_by_role_index` GLOBAL ON (granted_by_role);
ALTER TABLE `role_table` ADD INDEX `granted_role_index` GLOBAL ON (granted_role);
ALTER TABLE `role_table` ADD INDEX `role_index` GLOBAL ON (role);
CREATE TABLE `access_table`
(
endpoints utf8,
name utf8,
class utf8,
type utf8,
id utf8 not NULL,
primary key (id)
);
ALTER TABLE `access_table` ADD INDEX `endpoints_index` GLOBAL ON (endpoints);
ALTER TABLE `access_table` ADD INDEX `class_index` GLOBAL ON (class);
)").GetValueSync());

AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE VIEW granted_privilege WITH (security_invoker = TRUE) AS
SELECT DISTINCT
object_table.object_id AS object_id,
role_table.granted_role AS granted_role,
access_table.id AS id,
role_table.role AS role,
access_table.`type` AS object_type,
FROM `/Root/access_table` AS access_table
INNER JOIN `/Root/object_table` AS object_table ON access_table.id = object_table.object_id
INNER JOIN `/Root/role_table` AS role_table ON object_table.role = role_table.granted_role
)").GetValueSync());

auto result = session.ExecuteDataQuery(R"(
UPSERT INTO `access_table` (id, type) VALUES
("10", "OPERATION_PRIVILEGE");
UPSERT INTO `role_table` (id, granted_role, role_type) VALUES
("10", "admin", "USER_ROLE");
UPSERT INTO `object_table` (id, object_id, role) VALUES
("10", "10", "admin");
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
AssertSuccessResult(result);

auto testQueryParams = [&] (TString query, TParams params) {
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params).GetValueSync();
AssertSuccessResult(result);

Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
};

auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$jp1").Utf8("admin").Build()
.AddParam("$jp2").Utf8("10").Build()
.AddParam("$jp3").Uint64(2).Build()
.Build();

testQueryParams(R"(
--!syntax_v1
DECLARE $jp1 AS Text;
DECLARE $jp2 AS Text;
DECLARE $jp3 AS Uint64;
select g1_0.id from granted_privilege g1_0 where (
g1_0.role = 'admin'
) and g1_0.role=$jp1 and g1_0.object_type=$jp2 limit $jp3
)", params);
}

Y_UNIT_TEST(Select1) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
Expand Down

0 comments on commit da98ea2

Please sign in to comment.