diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 178315afa537..aac7db2e1d7d 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -554,7 +554,7 @@ class TMonitoringProxy : public TActorBootstrapped { , TabletID(tabletId) , Inflight(inflight) { - for (auto& p: Partitions) { + for (auto& p : Partitions) { Results[p.first].push_back(Sprintf("Partition %u: NO DATA", p.first)); } } @@ -691,6 +691,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) ClearNewConfig(); for (auto& p : Partitions) { //change config for already created partitions + if (p.first.IsSupportivePartition()) { + continue; + } + ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig())); } ChangePartitionConfigInflight += Partitions.size(); @@ -1873,13 +1877,19 @@ void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext& } ui32 cnt = 0; for (auto& p : Partitions) { - cnt += p.second.InitDone; + if (p.first.IsSupportivePartition()) { + continue; + } + + cnt += p.second.InitDone; } TActorId ans = CreateOffsetsProxyActor(TabletID(), ev->Sender, cnt, ctx); for (auto& p : Partitions) { - if (!p.second.InitDone) + if (!p.second.InitDone || p.first.IsSupportivePartition()) { continue; + } + THolder event = MakeHolder(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : ""); ctx.Send(p.second.Actor, event.Release()); @@ -1937,15 +1947,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& } ui32 cnt = 0; - for (auto& [_, partitionInfo] : Partitions) { - cnt += partitionInfo.InitDone; + for (auto& [partitionId, partitionInfo] : Partitions) { + if (partitionId.IsSupportivePartition()) { + continue; + } + + cnt += partitionInfo.InitDone; } TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx); for (auto& p : Partitions) { - if (!p.second.InitDone) { + if (!p.second.InitDone || p.first.IsSupportivePartition()) { continue; } + THolder event; if (ev->Get()->Record.GetConsumers().empty()) { event = MakeHolder(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "", @@ -4556,7 +4571,11 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target, void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, TDistributedTransaction& tx) { - for (auto& [_, partition] : Partitions) { + for (auto& [partitionId, partition] : Partitions) { + if (partitionId.IsSupportivePartition()) { + continue; + } + auto event = std::make_unique(tx.Step, tx.TxId); event->TopicConverter = tx.TopicConverter; @@ -4567,7 +4586,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, } tx.PartitionRepliesCount = 0; - tx.PartitionRepliesExpected = Partitions.size(); + tx.PartitionRepliesExpected = OriginalPartitionsCount; } TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index 0379754e32ba..1d19c1bf4805 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -69,7 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture { std::optional maxPartitionCount = std::nullopt); void DescribeTopic(const TString& path); - void AddConsumer(const TString& topic, const TVector& consumers); + void AddConsumer(const TString& path, + const TVector& consumers); void WriteToTopicWithInvalidTxId(bool invalidTxId); @@ -205,6 +206,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&) { NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetEnableTopicServiceTx(true); + settings.SetEnablePQConfigTransactionsAtSchemeShard(true); Setup = std::make_unique(TEST_CASE_NAME, settings); @@ -352,6 +354,11 @@ void TFixture::CreateTopic(const TString& path, Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount); } +void TFixture::DescribeTopic(const TString& path) +{ + Setup->DescribeTopic(path); +} + void TFixture::AddConsumer(const TString& path, const TVector& consumers) { @@ -366,11 +373,6 @@ void TFixture::AddConsumer(const TString& path, UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } -void TFixture::DescribeTopic(const TString& path) -{ - Setup->DescribeTopic(path); -} - const TDriver& TFixture::GetDriver() const { return *Driver; @@ -2079,6 +2081,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture) WriteMessagesInTx(0, 1); } +Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture) +{ + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + AddConsumer("topic_A", {"consumer"}); + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", "consumer", TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); +} + Y_UNIT_TEST_F(ReadRuleGeneration, TFixture) { // There was a server