Skip to content

Commit

Permalink
Merge 3a1a787 into ee906a5
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 17, 2024
2 parents ee906a5 + 3a1a787 commit 2f09885
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 4 deletions.
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2564,8 +2564,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty();
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.empty() && EvWriteTxs.empty());
const bool localComputeTasks = !DatashardTxs.empty();
const bool mayRunTasksLocally = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.empty());
const bool localComputeTasks = !DatashardTxs.empty() && !HasOlapTable;
const bool mayRunTasksLocally = !((HasExternalSources || HasDatashardSourceScan) && DatashardTxs.empty()) && !HasOlapTable;

Planner = CreateKqpPlanner({
.TasksGraph = TasksGraph,
Expand Down
120 changes: 118 additions & 2 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3174,7 +3174,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1)));
}

/*{
{
auto result = client.ExecuteQuery(R"(
DELETE FROM `/Root/ColumnShard` ON SELECT * FROM `/Root/DataShard` WHERE Col1 > 9;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
Expand Down Expand Up @@ -3223,7 +3223,123 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
DELETE FROM `/Root/ColumnShard` WHERE Col2 = "not found";
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}*/
}
}

Y_UNIT_TEST_TWIN(TableSink_HtapComplex, withOltpSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink);
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

const TString query = R"(
CREATE TABLE `/Root/ColumnSrc` (
Col1 Uint64 NOT NULL,
Col2 String NOT NULL,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
CREATE TABLE `/Root/RowSrc` (
Col1 Uint64 NOT NULL,
Col2 String NOT NULL,
Col3 Int32 NOT NULL,
PRIMARY KEY (Col1)
)
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
CREATE TABLE `/Root/ColumnDst` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
CREATE TABLE `/Root/RowDst` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32,
PRIMARY KEY (Col1)
)
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto client = kikimr.GetQueryClient();

{
auto result = client.ExecuteQuery(R"(
UPSERT INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, "test", 13);
UPSERT INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES
(10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, "test", 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
$data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3
FROM `/Root/ColumnSrc`as c
JOIN `/Root/RowSrc` as r
ON c.Col1 + 10 = r.Col3;
UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data;
REPLACE INTO `/Root/RowDst` SELECT * FROM $data;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/ColumnDst`;
SELECT COUNT(*) FROM `/Root/RowDst`;
DELETE FROM `/Root/ColumnDst` WHERE 1=1;
DELETE FROM `/Root/RowDst` WHERE 1=1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(1)));
}

{
auto result = client.ExecuteQuery(R"(
$prepare = SELECT *
FROM `/Root/ColumnSrc`
WHERE Col2 LIKE 'test?';
$data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3
FROM `/Root/RowSrc`as c
LEFT OUTER JOIN $prepare as r
ON c.Col1 + 10 = r.Col3;
UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data;
REPLACE INTO `/Root/RowDst` SELECT * FROM $data;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/ColumnDst`;
SELECT COUNT(*) FROM `/Root/RowDst`;
DELETE FROM `/Root/ColumnDst` WHERE 1=1;
DELETE FROM `/Root/RowDst` WHERE 1=1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1)));
}
}

Y_UNIT_TEST_TWIN(TableSink_HtapInteractive, withOltpSink) {
Expand Down

0 comments on commit 2f09885

Please sign in to comment.