Skip to content

Commit

Permalink
Merge f8bfeb7 into 8a5b980
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 19, 2024
2 parents 8a5b980 + f8bfeb7 commit bd1ac2b
Show file tree
Hide file tree
Showing 4 changed files with 379 additions and 2 deletions.
3 changes: 2 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,8 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
QueryState->TxCtx->HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery);
QueryState->TxCtx->HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
QueryState->TxCtx->HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery);
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite && !Settings.TableService.GetEnableHtapTx()) {
if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite
&& !Settings.TableService.GetEnableHtapTx() && !QueryState->IsSplitted()) {
ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED,
"Write transactions between column and row tables are disabled at current time.");
return false;
Expand Down
235 changes: 234 additions & 1 deletion ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,8 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
.SetWithSampleTables(false)
.SetEnableTempTables(true);
TKikimrRunner kikimr(settings);

auto client = kikimr.GetQueryClient();
Expand Down Expand Up @@ -1779,6 +1780,238 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
CompareYson(R"([[8u]])", FormatResultSetYson(it.GetResultSet(0)));
}
}

Y_UNIT_TEST(CreateAsSelect_BadCases) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(false);
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(false);
appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true);
appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false)
.SetEnableTempTables(true);
TKikimrRunner kikimr(settings);

const TString query = R"(
CREATE TABLE `/Root/ColSrc` (
Col1 Uint64 NOT NULL,
Col2 Int32,
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
CREATE TABLE `/Root/RowSrc` (
Col1 Uint64 NOT NULL,
Col2 Int32,
PRIMARY KEY (Col1)
)
WITH (STORE = ROW, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
)";

auto client = kikimr.GetQueryClient();
auto result = client.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

{
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/ColSrc` (Col1, Col2) VALUES (1u, 1), (100u, 100), (10u, 10);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
auto prepareResult = client.ExecuteQuery(R"(
REPLACE INTO `/Root/RowSrc` (Col1, Col2) VALUES (1u, 1), (100u, 100), (10u, 10);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE OR REPLACE TABLE `/Root/RowDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT * FROM `/Root/RowSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "OR REPLACE feature is supported only for EXTERNAL DATA SOURCE and EXTERNAL TABLE", result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE IF NOT EXISTS TABLE `/Root/RowDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT * FROM `/Root/RowSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Unexpected token", result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/RowDst` (
INDEX idx GLOBAL ON Col2,
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT * FROM `/Root/RowSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Unexpected token", result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/RowDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT Col1, 1 / (Col2 - 100) As Col2 FROM `/Root/RowSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/RowDst` ORDER BY Col1;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[[1u];[0]];[[10u];[0]];[[100u];#]])", FormatResultSetYson(result.GetResultSet(0)));

result = client.ExecuteQuery(R"(
DROP TABLE `/Root/RowDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/RowDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT Col2 AS Col1, Col1 As Col2 FROM `/Root/RowSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/RowDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());


result = client.ExecuteQuery(R"(
DROP TABLE `/Root/RowDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/ColDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = COLUMN) AS
SELECT Col2 AS Col1, Col1 As Col2 FROM `/Root/ColSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "Can't set NULL or optional value to not null column: Col1.", result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/ColDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
// TODO: Wait for RENAME from columnshards
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
DROP TABLE `/Root/ColDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/ColDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = COLUMN) AS
SELECT Unwrap(Col2) AS Col1, Col1 As Col2 FROM `/Root/ColSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/ColDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
DROP TABLE `/Root/ColDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/RowlDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = COLUMN) AS
SELECT NotFound AS Col1, Col1 As Col2 FROM `/Root/RowSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "not found", result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/RowSrc` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT 1 AS Col1, 2 As Col2;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(result.GetIssues().ToString(), "path exist", result.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/RowDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT * FROM `/Root/ColSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/RowDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
}

{
auto result = client.ExecuteQuery(R"(
CREATE TABLE `/Root/ColDst` (
PRIMARY KEY (Col1)
)
WITH (STORE = ROW) AS
SELECT * FROM `/Root/RowSrc`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/ColDst`;
)", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
}
}
}

} // namespace NKqp
Expand Down
78 changes: 78 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4256,6 +4256,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(false);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
Expand Down Expand Up @@ -4846,6 +4847,83 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[101u;["test1"];10];[102u;["test2"];11];[103u;["test3"];12];[104u;#;13]])", FormatResultSetYson(result.GetResultSet(1)));
}
}

Y_UNIT_TEST(MixedReadQueryWithoutStreamLookup) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(false);
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(false);
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);

auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);

TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto client = kikimr.GetQueryClient();

{
auto createTable = client.ExecuteQuery(R"sql(
CREATE TABLE `/Root/DataShard` (
Col1 Uint64 NOT NULL,
Col2 Int32 NOT NULL,
Col3 String,
PRIMARY KEY (Col1, Col2)
) WITH (STORE = ROW);
CREATE TABLE `/Root/ColumnShard` (
Col1 Uint64 NOT NULL,
Col2 Int32 NOT NULL,
Col3 String,
PRIMARY KEY (Col1, Col2)
) WITH (STORE = COLUMN);
)sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
}

{
auto replaceValues = client.ExecuteQuery(R"sql(
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
(1u, 1, "row"), (1u, 2, "row"), (1u, 3, "row"), (2u, 3, "row");
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
}
{
auto replaceValues = client.ExecuteQuery(R"sql(
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
(1u, 1, "row"), (1u, 2, "row"), (1u, 3, "row"), (2u, 3, "row");
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT Col3 FROM `/Root/DataShard` WHERE Col1 = 1u
UNION ALL
SELECT Col3 FROM `/Root/ColumnShard` WHERE Col1 = 1u;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]]])");
}

{
auto it = client.StreamExecuteQuery(R"sql(
SELECT r.Col3
FROM `/Root/DataShard` AS r
JOIN `/Root/ColumnShard` AS c
ON r.Col1 = c.Col1;
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
CompareYson(
output,
R"([[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]]])");
}
}
}

} // namespace NKqp
Expand Down
Loading

0 comments on commit bd1ac2b

Please sign in to comment.