diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 191ee25c2cb1..b7b8cce6198e 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3127,7 +3127,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } - + { auto result = client.ExecuteQuery(R"( SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col3 = 1; @@ -4737,6 +4737,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { CheckDirEntry(kikimr, entriesToCheck); } } + Y_UNIT_TEST(CreateOrDropTopicOverTable) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); @@ -4808,6 +4809,65 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(AlterCdcTopic) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr{serverSettings}; + auto tableClient = kikimr.GetTableClient(); + + { + auto tcSession = tableClient.CreateSession().GetValueSync().GetSession(); + UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/TmpTable` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); + )").GetValueSync().IsSuccess()); + + UNIT_ASSERT(tcSession.ExecuteSchemeQuery(R"( + ALTER TABLE `/Root/TmpTable` ADD CHANGEFEED `feed` WITH ( + MODE = 'KEYS_ONLY', FORMAT = 'JSON' + ); + )").GetValueSync().IsSuccess()); + tcSession.Close(); + } + + auto pq = NYdb::NTopic::TTopicClient(kikimr.GetDriver(), + NYdb::NTopic::TTopicClientSettings().Database("/Root").AuthToken("root@builtin")); + + auto client = kikimr.GetQueryClient(NYdb::NQuery::TClientSettings{}.AuthToken("root@builtin")); + auto session = client.GetSession().GetValueSync().GetSession(); + { + + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/TmpTable/feed` ADD CONSUMER consumer21; + )"); + + RunQuery(query, session); + auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync(); + const auto& consumers = desc.GetTopicDescription().GetConsumers(); + UNIT_ASSERT_VALUES_EQUAL(consumers.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(consumers[0].GetConsumerName(), "consumer21"); + + } + { + const auto query = Q_(R"( + --!syntax_v1 + ALTER TOPIC `/Root/TmpTable/feed` SET (min_active_partitions = 10); + )"); + RunQuery(query, session, false); + auto desc = pq.DescribeTopic("/Root/TmpTable/feed").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(desc.GetTopicDescription().GetPartitioningSettings().GetMinActivePartitions(), 1); + } + + } + Y_UNIT_TEST(TableSink_OlapRWQueries) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); @@ -4914,7 +4974,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto it = client.StreamExecuteQuery(R"sql( SELECT r.Col3 FROM `/Root/DataShard` AS r - JOIN `/Root/ColumnShard` AS c + JOIN `/Root/ColumnShard` AS c ON r.Col1 = c.Col1; )sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 37795c0cf66d..01f7106a1191 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -548,6 +548,10 @@ namespace NKikimr::NGRpcProxy::V1 { return path; } + const TMaybe& GetCdcStreamName() const { + return CdcStreamName; + } + void SendDescribeProposeRequest(bool showPrivate = false) { return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate); } @@ -603,6 +607,10 @@ namespace NKikimr::NGRpcProxy::V1 { if (static_cast(this)->IsCdcStreamCompatible()) { Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1); PrivateTopicName = response.ListNodeEntry->Children.at(0).Name; + + if (response.Self) { + CdcStreamName = response.Self->Info.GetName(); + } SendDescribeProposeRequest(true); return true; } @@ -620,6 +628,8 @@ namespace NKikimr::NGRpcProxy::V1 { TIntrusiveConstPtr PQGroupInfo; TIntrusiveConstPtr Self; TMaybe PrivateTopicName; + TMaybe CdcStreamName; + }; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index a7a350f79ef1..f1bbf1644979 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1587,7 +1587,18 @@ void TAlterTopicActorInternal::HandleCacheNavigateResponse(TEvTxProxySchemeCache } TUpdateSchemeBase::HandleCacheNavigateResponse(ev); auto& schemeTx = Response->Response.ModifyScheme; - FillModifyScheme(schemeTx, ActorContext(), GetRequest().WorkingDir, GetRequest().Name); + std::pair pathPair; + try { + pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath()); + } catch (const std::exception &ex) { + Response->Response.Issues.AddIssue(NYql::ExceptionToIssue(ex)); + RespondWithCode(Ydb::StatusIds::BAD_REQUEST); + return; + } + + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + FillModifyScheme(schemeTx, ActorContext(), workingDir, name); } void TAlterTopicActorInternal::ModifyPersqueueConfig( @@ -1601,7 +1612,7 @@ void TAlterTopicActorInternal::ModifyPersqueueConfig( TString error; Y_UNUSED(selfInfo); - auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, false); + auto status = FillProposeRequestImpl(GetRequest().Request, groupConfig, appData, error, GetCdcStreamName().Defined()); if (!error.empty()) { Response->Response.Issues.AddIssue(error); }