diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 178315afa537..aac7db2e1d7d 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -554,7 +554,7 @@ class TMonitoringProxy : public TActorBootstrapped { , TabletID(tabletId) , Inflight(inflight) { - for (auto& p: Partitions) { + for (auto& p : Partitions) { Results[p.first].push_back(Sprintf("Partition %u: NO DATA", p.first)); } } @@ -691,6 +691,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx) ClearNewConfig(); for (auto& p : Partitions) { //change config for already created partitions + if (p.first.IsSupportivePartition()) { + continue; + } + ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig())); } ChangePartitionConfigInflight += Partitions.size(); @@ -1873,13 +1877,19 @@ void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext& } ui32 cnt = 0; for (auto& p : Partitions) { - cnt += p.second.InitDone; + if (p.first.IsSupportivePartition()) { + continue; + } + + cnt += p.second.InitDone; } TActorId ans = CreateOffsetsProxyActor(TabletID(), ev->Sender, cnt, ctx); for (auto& p : Partitions) { - if (!p.second.InitDone) + if (!p.second.InitDone || p.first.IsSupportivePartition()) { continue; + } + THolder event = MakeHolder(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : ""); ctx.Send(p.second.Actor, event.Release()); @@ -1937,15 +1947,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& } ui32 cnt = 0; - for (auto& [_, partitionInfo] : Partitions) { - cnt += partitionInfo.InitDone; + for (auto& [partitionId, partitionInfo] : Partitions) { + if (partitionId.IsSupportivePartition()) { + continue; + } + + cnt += partitionInfo.InitDone; } TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx); for (auto& p : Partitions) { - if (!p.second.InitDone) { + if (!p.second.InitDone || p.first.IsSupportivePartition()) { continue; } + THolder event; if (ev->Get()->Record.GetConsumers().empty()) { event = MakeHolder(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "", @@ -4556,7 +4571,11 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target, void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, TDistributedTransaction& tx) { - for (auto& [_, partition] : Partitions) { + for (auto& [partitionId, partition] : Partitions) { + if (partitionId.IsSupportivePartition()) { + continue; + } + auto event = std::make_unique(tx.Step, tx.TxId); event->TopicConverter = tx.TopicConverter; @@ -4567,7 +4586,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx, } tx.PartitionRepliesCount = 0; - tx.PartitionRepliesExpected = Partitions.size(); + tx.PartitionRepliesExpected = OriginalPartitionsCount; } TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index f46e83fe1e00..cc00b25179ee 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -2023,6 +2023,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture) WriteMessagesInTx(0, 1); } +Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture) +{ + CreateTopic("topic_A", TEST_CONSUMER); + + NTable::TSession tableSession = CreateTableSession(); + NTable::TTransaction tx = BeginTx(tableSession); + + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx); + WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx); + + WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID); + + AddConsumer("topic_A", {"consumer"}); + + CommitTx(tx, EStatus::SUCCESS); + + auto messages = ReadFromTopic("topic_A", "consumer", TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); +} + } }