Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kqp): always take snapshot for queries with stream lookup #8267

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig
return false;
}

// We need snapshot for stream lookup, besause it's used for dependent reads
if (hasStreamLookup) {
return true;
}

// We need snapshot when there are multiple table read phases, most
// likely it involves multiple tables and we would have to use a
// distributed commit otherwise. Taking snapshot helps as avoid TLI
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,8 @@ bool IsCrossShardChannel(const TKqpTasksGraph& tasksGraph, const TChannel& chann
return false;
}

return targetShard != tasksGraph.GetTask(channel.SrcTask).Meta.ShardId;
ui64 srcShard = tasksGraph.GetTask(channel.SrcTask).Meta.ShardId;
return srcShard && targetShard != srcShard;
}

void TShardKeyRanges::AddPoint(TSerializedCellVec&& point) {
Expand Down
104 changes: 104 additions & 0 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,110 @@ using namespace NYdb;
using namespace NYdb::NTable;

Y_UNIT_TEST_SUITE(KqpNewEngine) {
Y_UNIT_TEST(StreamLookupWithView) {
TKikimrSettings settings = TKikimrSettings().SetWithSampleTables(false);
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetIndexAutoChooseMode(NKikimrConfig::TTableServiceConfig_EIndexAutoChooseMode_MAX_USED_PREFIX);
appConfig.MutableFeatureFlags()->SetEnableViews(true);
settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
settings.SetAppConfig(appConfig);

auto kikimr = TKikimrRunner{settings};
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableViews(true);

auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1

CREATE TABLE `object_table`
(
object_id utf8,
role utf8,
id utf8 not NULL,
primary key (id)
);

ALTER TABLE `object_table` ADD INDEX `object_id_index` GLOBAL ON (object_id);
ALTER TABLE `object_table` ADD INDEX `role_index` GLOBAL ON (role);

CREATE TABLE `role_table`
(
granted_by_role utf8,
granted_role utf8,
role_type utf8,
role utf8,
id utf8 not NULL,
primary key (id)
);

ALTER TABLE `role_table` ADD INDEX `granted_by_role_index` GLOBAL ON (granted_by_role);
ALTER TABLE `role_table` ADD INDEX `granted_role_index` GLOBAL ON (granted_role);
ALTER TABLE `role_table` ADD INDEX `role_index` GLOBAL ON (role);

CREATE TABLE `access_table`
(
endpoints utf8,
name utf8,
class utf8,
type utf8,
id utf8 not NULL,
primary key (id)
);

ALTER TABLE `access_table` ADD INDEX `endpoints_index` GLOBAL ON (endpoints);
ALTER TABLE `access_table` ADD INDEX `class_index` GLOBAL ON (class);
)").GetValueSync());

AssertSuccessResult(session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE VIEW granted_privilege WITH (security_invoker = TRUE) AS
SELECT DISTINCT
object_table.object_id AS object_id,
role_table.granted_role AS granted_role,
access_table.id AS id,
role_table.role AS role,
access_table.`type` AS object_type,
FROM `/Root/access_table` AS access_table
INNER JOIN `/Root/object_table` AS object_table ON access_table.id = object_table.object_id
INNER JOIN `/Root/role_table` AS role_table ON object_table.role = role_table.granted_role
)").GetValueSync());

auto result = session.ExecuteDataQuery(R"(
UPSERT INTO `access_table` (id, type) VALUES
("10", "OPERATION_PRIVILEGE");
UPSERT INTO `role_table` (id, granted_role, role_type) VALUES
("10", "admin", "USER_ROLE");
UPSERT INTO `object_table` (id, object_id, role) VALUES
("10", "10", "admin");
)", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
AssertSuccessResult(result);

auto testQueryParams = [&] (TString query, TParams params) {
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), params).GetValueSync();
AssertSuccessResult(result);

Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
};

auto params = kikimr.GetTableClient().GetParamsBuilder()
.AddParam("$jp1").Utf8("admin").Build()
.AddParam("$jp2").Utf8("10").Build()
.AddParam("$jp3").Uint64(2).Build()
.Build();

testQueryParams(R"(
--!syntax_v1
DECLARE $jp1 AS Text;
DECLARE $jp2 AS Text;
DECLARE $jp3 AS Uint64;
select g1_0.id from granted_privilege g1_0 where (
g1_0.role = 'admin'
) and g1_0.role=$jp1 and g1_0.object_type=$jp2 limit $jp3
)", params);
}

Y_UNIT_TEST(Select1) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
Expand Down
Loading