Skip to content

Commit

Permalink
The values of PartNo must be added (#13869)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alek5andr-Kotov authored Jan 28, 2025
1 parent d0b724a commit 55a593f
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 1 deletion.
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2290,7 +2290,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)

if (!t.WriteInfo->BlobsFromHead.empty()) {
auto& first = t.WriteInfo->BlobsFromHead.front();
NewHead.PartNo = first.GetPartNo();
NewHead.PartNo += first.GetPartNo();

Parameters->HeadCleared = Parameters->HeadCleared || !t.WriteInfo->BodyKeys.empty();

Expand Down
52 changes: 52 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2377,6 +2377,58 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_48, TFixture)
UNIT_ASSERT_GT(topicDescription.GetTotalPartitionsCount(), 2);
}

Y_UNIT_TEST_F(Write_Random_Sized_Messages_In_Wide_Transactions, TFixture)
{
// The test verifies the simultaneous execution of several transactions. There is a topic
// with PARTITIONS_COUNT partitions. In each transaction, the test writes to all the partitions.
// The size of the messages is random. Such that both large blobs in the body and small ones in
// the head of the partition are obtained.

const size_t PARTITIONS_COUNT = 20;
const size_t TXS_COUNT = 100;

CreateTopic("topic_A", TEST_CONSUMER, PARTITIONS_COUNT);

TVector<NTable::TSession> sessions;
TVector<NTable::TTransaction> transactions;

// We open TXS_COUNT transactions and write messages to the topic.
for (size_t i = 0; i < TXS_COUNT; ++i) {
sessions.push_back(CreateTableSession());
auto& session = sessions.back();

transactions.push_back(BeginTx(session));
auto& tx = transactions.back();

for (size_t j = 0; j < PARTITIONS_COUNT; ++j) {
TString sourceId = TEST_MESSAGE_GROUP_ID;
sourceId += "_";
sourceId += ToString(i);
sourceId += "_";
sourceId += ToString(j);

size_t count = RandomNumber<size_t>(20) + 3;
WriteToTopic("topic_A", sourceId, TString(512 * 1000 * count, 'x'), &tx, j);

WaitForAcks("topic_A", sourceId);
}
}

// We are doing an asynchronous commit of transactions. They will be executed simultaneously.
TVector<NTable::TAsyncCommitTransactionResult> futures;

for (size_t i = 0; i < TXS_COUNT; ++i) {
futures.push_back(transactions[i].Commit());
}

// All transactions must be completed successfully.
for (size_t i = 0; i < TXS_COUNT; ++i) {
futures[i].Wait();
const auto& result = futures[i].GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}

}

}

0 comments on commit 55a593f

Please sign in to comment.