Skip to content

Commit

Permalink
The TEvProposePartitionConfig message is sent only to the main part…
Browse files Browse the repository at this point in the history
…itions (ydb-platform#9599)
  • Loading branch information
Alek5andr-Kotov committed Sep 24, 2024
1 parent 1283f9e commit bcf9cf5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 14 deletions.
35 changes: 27 additions & 8 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ class TMonitoringProxy : public TActorBootstrapped<TMonitoringProxy> {
, TabletID(tabletId)
, Inflight(inflight)
{
for (auto& p: Partitions) {
for (auto& p : Partitions) {
Results[p.first].push_back(Sprintf("Partition %u: NO DATA", p.first));
}
}
Expand Down Expand Up @@ -691,6 +691,10 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
ClearNewConfig();

for (auto& p : Partitions) { //change config for already created partitions
if (p.first.IsSupportivePartition()) {
continue;
}

ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig()));
}
ChangePartitionConfigInflight += Partitions.size();
Expand Down Expand Up @@ -1873,13 +1877,19 @@ void TPersQueue::Handle(TEvPersQueue::TEvOffsets::TPtr& ev, const TActorContext&
}
ui32 cnt = 0;
for (auto& p : Partitions) {
cnt += p.second.InitDone;
if (p.first.IsSupportivePartition()) {
continue;
}

cnt += p.second.InitDone;
}
TActorId ans = CreateOffsetsProxyActor(TabletID(), ev->Sender, cnt, ctx);

for (auto& p : Partitions) {
if (!p.second.InitDone)
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
continue;
}

THolder<TEvPQ::TEvPartitionOffsets> event = MakeHolder<TEvPQ::TEvPartitionOffsets>(ans, ev->Get()->Record.HasClientId() ?
ev->Get()->Record.GetClientId() : "");
ctx.Send(p.second.Actor, event.Release());
Expand Down Expand Up @@ -1937,15 +1947,20 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
}

ui32 cnt = 0;
for (auto& [_, partitionInfo] : Partitions) {
cnt += partitionInfo.InitDone;
for (auto& [partitionId, partitionInfo] : Partitions) {
if (partitionId.IsSupportivePartition()) {
continue;
}

cnt += partitionInfo.InitDone;
}

TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx);
for (auto& p : Partitions) {
if (!p.second.InitDone) {
if (!p.second.InitDone || p.first.IsSupportivePartition()) {
continue;
}

THolder<TEvPQ::TEvPartitionStatus> event;
if (ev->Get()->Record.GetConsumers().empty()) {
event = MakeHolder<TEvPQ::TEvPartitionStatus>(ans, ev->Get()->Record.HasClientId() ? ev->Get()->Record.GetClientId() : "",
Expand Down Expand Up @@ -4556,7 +4571,11 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
TDistributedTransaction& tx)
{
for (auto& [_, partition] : Partitions) {
for (auto& [partitionId, partition] : Partitions) {
if (partitionId.IsSupportivePartition()) {
continue;
}

auto event = std::make_unique<TEvPQ::TEvProposePartitionConfig>(tx.Step, tx.TxId);

event->TopicConverter = tx.TopicConverter;
Expand All @@ -4567,7 +4586,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,
}

tx.PartitionRepliesCount = 0;
tx.PartitionRepliesExpected = Partitions.size();
tx.PartitionRepliesExpected = OriginalPartitionsCount;
}

TActorId TPersQueue::GetPartitionQuoter(const TPartitionId& partition) {
Expand Down
34 changes: 28 additions & 6 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class TFixture : public NUnitTest::TBaseFixture {
std::optional<size_t> maxPartitionCount = std::nullopt);
void DescribeTopic(const TString& path);

void AddConsumer(const TString& topic, const TVector<TString>& consumers);
void AddConsumer(const TString& path,
const TVector<TString>& consumers);

void WriteToTopicWithInvalidTxId(bool invalidTxId);

Expand Down Expand Up @@ -205,6 +206,7 @@ void TFixture::SetUp(NUnitTest::TTestContext&)
{
NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings();
settings.SetEnableTopicServiceTx(true);
settings.SetEnablePQConfigTransactionsAtSchemeShard(true);

Setup = std::make_unique<TTopicSdkTestSetup>(TEST_CASE_NAME, settings);

Expand Down Expand Up @@ -352,6 +354,11 @@ void TFixture::CreateTopic(const TString& path,
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
}

void TFixture::DescribeTopic(const TString& path)
{
Setup->DescribeTopic(path);
}

void TFixture::AddConsumer(const TString& path,
const TVector<TString>& consumers)
{
Expand All @@ -366,11 +373,6 @@ void TFixture::AddConsumer(const TString& path,
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

void TFixture::DescribeTopic(const TString& path)
{
Setup->DescribeTopic(path);
}

const TDriver& TFixture::GetDriver() const
{
return *Driver;
Expand Down Expand Up @@ -2079,6 +2081,26 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_39, TFixture)
{
CreateTopic("topic_A", TEST_CONSUMER);

NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);

WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);

WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);

AddConsumer("topic_A", {"consumer"});

CommitTx(tx, EStatus::SUCCESS);

auto messages = ReadFromTopic("topic_A", "consumer", TDuration::Seconds(2));
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
}

Y_UNIT_TEST_F(ReadRuleGeneration, TFixture)
{
// There was a server
Expand Down

0 comments on commit bcf9cf5

Please sign in to comment.