Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov committed Jan 30, 2024
1 parent 1ebe7f4 commit dabd0b8
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 45 deletions.
2 changes: 1 addition & 1 deletion build/internal/ya.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
canonization_backend = "ydb-canondata.storage.yandexcloud.net"
bazel_remote_store = false
bazel_remote_store = true
bazel_remote_baseuri = "http://cachesrv.ydb.tech:8081"
bazel_remote_client_decompress = true
test_fakeid = "r13102898"
Expand Down
110 changes: 68 additions & 42 deletions ydb/core/persqueue/ut/partition_chooser_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ using namespace NKikimrPQ;

void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf,
ui32 id,
const std::optional<TString>&& boundaryFrom,
const std::optional<TString>&& boundaryTo,
const std::optional<TString>&& boundaryFrom = std::nullopt,
const std::optional<TString>&& boundaryTo = std::nullopt,
std::vector<ui32> children = {}) {
auto* p = conf.AddPartitions();
p->SetPartitionId(id);
Expand Down Expand Up @@ -621,49 +621,75 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_OtherPart
AssertTable(server, "A_Source_10", 0, 13);
}

Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) {
Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_NewSourceId_Test) {
NPersQueue::TTestServer server = CreateServer();

CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active);
CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active);
auto config = CreateConfig0(false);
AddPartition(config, 0);

{
auto r = ChoosePartition(server, SMDisabled, "A_Source");
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
}
{
auto r = ChoosePartition(server, SMDisabled, "C_Source");
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2);
}
{
WriteToTable(server, "A_Source_w_0", 0);
auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0");
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
}
{
// Redefine partition for sourceId. Check that partition changed;
WriteToTable(server, "A_Source_w_0", 1);
auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0");
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
}
{
// Redefine partition for sourceId to inactive partition. Select new partition.
WriteToTable(server, "A_Source_w_0", 3);
auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0");
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
}
{
// Use prefered partition, and save it in table
auto r = ChoosePartition(server, SMDisabled, "A_Source_1", 1);
UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
}
auto r = ChoosePartition(server, config, "A_Source");

UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
}

Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_RegisteredSourceId_Test) {
NPersQueue::TTestServer server = CreateServer();

auto config = CreateConfig0(false);
AddPartition(config, 0);
AddPartition(config, 1);

WriteToTable(server, "A_Source", 0);
auto r = ChoosePartition(server, config, "A_Source");

UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);

WriteToTable(server, "A_Source", 1);
r = ChoosePartition(server, config, "A_Source");

UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
}

Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Inactive_Test) {
NPersQueue::TTestServer server = CreateServer();

auto config = CreateConfig0(false);
AddPartition(config, 0, {}, {}, {1});
AddPartition(config, 1);

WriteToTable(server, "A_Source", 0);
auto r = ChoosePartition(server, config, "A_Source");

UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1);
}

Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Test) {
NPersQueue::TTestServer server = CreateServer();

auto config = CreateConfig0(false);
AddPartition(config, 0);
AddPartition(config, 1);

auto r = ChoosePartition(server, config, "A_Source", 0);

UNIT_ASSERT(r->Result);
UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0);
}

Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Inactive_Test) {
NPersQueue::TTestServer server = CreateServer();

auto config = CreateConfig0(false);
AddPartition(config, 0, {}, {}, {1});
AddPartition(config, 1);

auto r = ChoosePartition(server, config, "A_Source", 0);

UNIT_ASSERT(r->Error);
}

}
36 changes: 35 additions & 1 deletion ydb/core/persqueue/ut/splitmerge_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,40 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {

TTopicClient client = setup.MakeClient();

auto writeSession = CreateWriteSession(client, "producer-1");

TTestReadSession ReadSession(client, 2);

UNIT_ASSERT(writeSession->Write(Msg("message_1.1", 2)));

ui64 txId = 1006;
SplitPartition(setup, ++txId, 0, "a");

UNIT_ASSERT(writeSession->Write(Msg("message_1.2", 3)));

ReadSession.WaitAllMessages();

for(const auto& info : ReadSession.ReceivedMessages) {
if (info.Data == "message_1.1") {
UNIT_ASSERT_EQUAL(0, info.PartitionId);
UNIT_ASSERT_EQUAL(2, info.SeqNo);
} else if (info.Data == "message_1.2") {
UNIT_ASSERT(1 == info.PartitionId || 2 == info.PartitionId);
UNIT_ASSERT_EQUAL(3, info.SeqNo);
} else {
UNIT_ASSERT_C(false, "Unexpected message: " << info.Data);
}
}

writeSession->Close(TDuration::Seconds(1));
}

Y_UNIT_TEST(PartitionSplit_PreferedPartition) {
TTopicSdkTestSetup setup = CreateSetup();
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100);

TTopicClient client = setup.MakeClient();

auto writeSession1 = CreateWriteSession(client, "producer-1");
auto writeSession2 = CreateWriteSession(client, "producer-2");
auto writeSession3 = CreateWriteSession(client, "producer-3", 0);
Expand Down Expand Up @@ -273,7 +307,7 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) {
writeSession3->Close(TDuration::Seconds(1));
}

Y_UNIT_TEST(PartitionMerge) {
Y_UNIT_TEST(PartitionMerge_PreferedPartition) {
TTopicSdkTestSetup setup = CreateSetup();
setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2, 100);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
TIMEOUT(3000)
ELSE()
SIZE(MEDIUM)
TIMEOUT(300)
TIMEOUT(600)
ENDIF()

PEERDIR(
Expand Down

0 comments on commit dabd0b8

Please sign in to comment.