From dabd0b87a506a9ac4d41a5cacf85f5235c05b272 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 30 Jan 2024 06:43:01 +0000 Subject: [PATCH] more tests --- build/internal/ya.conf | 2 +- .../persqueue/ut/partition_chooser_ut.cpp | 110 +++++++++++------- ydb/core/persqueue/ut/splitmerge_ut.cpp | 36 +++++- ydb/core/persqueue/ut/ya.make | 2 +- 4 files changed, 105 insertions(+), 45 deletions(-) diff --git a/build/internal/ya.conf b/build/internal/ya.conf index 4a2f45951603..81be41d837e2 100644 --- a/build/internal/ya.conf +++ b/build/internal/ya.conf @@ -1,5 +1,5 @@ canonization_backend = "ydb-canondata.storage.yandexcloud.net" -bazel_remote_store = false +bazel_remote_store = true bazel_remote_baseuri = "http://cachesrv.ydb.tech:8081" bazel_remote_client_decompress = true test_fakeid = "r13102898" diff --git a/ydb/core/persqueue/ut/partition_chooser_ut.cpp b/ydb/core/persqueue/ut/partition_chooser_ut.cpp index 34775f7b6455..8eb16403ef17 100644 --- a/ydb/core/persqueue/ut/partition_chooser_ut.cpp +++ b/ydb/core/persqueue/ut/partition_chooser_ut.cpp @@ -18,8 +18,8 @@ using namespace NKikimrPQ; void AddPartition(NKikimrSchemeOp::TPersQueueGroupDescription& conf, ui32 id, - const std::optional&& boundaryFrom, - const std::optional&& boundaryTo, + const std::optional&& boundaryFrom = std::nullopt, + const std::optional&& boundaryTo = std::nullopt, std::vector children = {}) { auto* p = conf.AddPartitions(); p->SetPartitionId(id); @@ -621,49 +621,75 @@ Y_UNIT_TEST(TPartitionChooserActor_SplitMergeEnabled_PreferedPartition_OtherPart AssertTable(server, "A_Source_10", 0, 13); } -Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Test) { +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_NewSourceId_Test) { NPersQueue::TTestServer server = CreateServer(); - CreatePQTabletMock(server, 0, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 1, ETopicPartitionStatus::Active); - CreatePQTabletMock(server, 2, ETopicPartitionStatus::Active); + auto config = CreateConfig0(false); + AddPartition(config, 0); - { - auto r = ChoosePartition(server, SMDisabled, "A_Source"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - } - { - auto r = ChoosePartition(server, SMDisabled, "C_Source"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 2); - } - { - WriteToTable(server, "A_Source_w_0", 0); - auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - } - { - // Redefine partition for sourceId. Check that partition changed; - WriteToTable(server, "A_Source_w_0", 1); - auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - } - { - // Redefine partition for sourceId to inactive partition. Select new partition. - WriteToTable(server, "A_Source_w_0", 3); - auto r = ChoosePartition(server, SMDisabled, "A_Source_w_0"); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); - } - { - // Use prefered partition, and save it in table - auto r = ChoosePartition(server, SMDisabled, "A_Source_1", 1); - UNIT_ASSERT(r->Result); - UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); - } + auto r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_RegisteredSourceId_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0); + AddPartition(config, 1); + + WriteToTable(server, "A_Source", 0); + auto r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); + + WriteToTable(server, "A_Source", 1); + r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_Inactive_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0, {}, {}, {1}); + AddPartition(config, 1); + + WriteToTable(server, "A_Source", 0); + auto r = ChoosePartition(server, config, "A_Source"); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 1); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0); + AddPartition(config, 1); + + auto r = ChoosePartition(server, config, "A_Source", 0); + + UNIT_ASSERT(r->Result); + UNIT_ASSERT_VALUES_EQUAL(r->Result->Get()->PartitionId, 0); +} + +Y_UNIT_TEST(TPartitionChooserActor_SplitMergeDisabled_PreferedPartition_Inactive_Test) { + NPersQueue::TTestServer server = CreateServer(); + + auto config = CreateConfig0(false); + AddPartition(config, 0, {}, {}, {1}); + AddPartition(config, 1); + + auto r = ChoosePartition(server, config, "A_Source", 0); + + UNIT_ASSERT(r->Error); } } diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp index 48355fc3c8ff..2221298e8985 100644 --- a/ydb/core/persqueue/ut/splitmerge_ut.cpp +++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp @@ -217,6 +217,40 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { TTopicClient client = setup.MakeClient(); + auto writeSession = CreateWriteSession(client, "producer-1"); + + TTestReadSession ReadSession(client, 2); + + UNIT_ASSERT(writeSession->Write(Msg("message_1.1", 2))); + + ui64 txId = 1006; + SplitPartition(setup, ++txId, 0, "a"); + + UNIT_ASSERT(writeSession->Write(Msg("message_1.2", 3))); + + ReadSession.WaitAllMessages(); + + for(const auto& info : ReadSession.ReceivedMessages) { + if (info.Data == "message_1.1") { + UNIT_ASSERT_EQUAL(0, info.PartitionId); + UNIT_ASSERT_EQUAL(2, info.SeqNo); + } else if (info.Data == "message_1.2") { + UNIT_ASSERT(1 == info.PartitionId || 2 == info.PartitionId); + UNIT_ASSERT_EQUAL(3, info.SeqNo); + } else { + UNIT_ASSERT_C(false, "Unexpected message: " << info.Data); + } + } + + writeSession->Close(TDuration::Seconds(1)); + } + + Y_UNIT_TEST(PartitionSplit_PreferedPartition) { + TTopicSdkTestSetup setup = CreateSetup(); + setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 1, 100); + + TTopicClient client = setup.MakeClient(); + auto writeSession1 = CreateWriteSession(client, "producer-1"); auto writeSession2 = CreateWriteSession(client, "producer-2"); auto writeSession3 = CreateWriteSession(client, "producer-3", 0); @@ -273,7 +307,7 @@ Y_UNIT_TEST_SUITE(TopicSplitMerge) { writeSession3->Close(TDuration::Seconds(1)); } - Y_UNIT_TEST(PartitionMerge) { + Y_UNIT_TEST(PartitionMerge_PreferedPartition) { TTopicSdkTestSetup setup = CreateSetup(); setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2, 100); diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make index ae7bb7bc9d4a..8ebb85ef2ed1 100644 --- a/ydb/core/persqueue/ut/ya.make +++ b/ydb/core/persqueue/ut/ya.make @@ -10,7 +10,7 @@ IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) TIMEOUT(3000) ELSE() SIZE(MEDIUM) - TIMEOUT(300) + TIMEOUT(600) ENDIF() PEERDIR(