From 8b4e2e39c45ef488856eadd855620329dc679f51 Mon Sep 17 00:00:00 2001 From: Andrey Serebryanskiy Date: Tue, 24 Dec 2024 11:41:50 +0300 Subject: [PATCH] Refactor tests in YDB Topics' Kafka proxy (#12902) --- ydb/core/kafka_proxy/ut/ut_protocol.cpp | 350 +++++++----------------- 1 file changed, 97 insertions(+), 253 deletions(-) diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 287b2019f59e..0d7c1fe587dd 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -312,6 +312,30 @@ std::vector Read(std::shared_ptr< return result; } +void AssertMessageAvaialbleThroughLogbrokerApiAndCommit(std::shared_ptr topicReader) { + auto responseFromLogbrokerApi = Read(topicReader); + UNIT_ASSERT_EQUAL(responseFromLogbrokerApi.size(), 1); + + UNIT_ASSERT_EQUAL(responseFromLogbrokerApi[0].GetMessages().size(), 1); + responseFromLogbrokerApi[0].GetMessages()[0].Commit(); +} + +void CreateTopic(NYdb::NTopic::TTopicClient& pqClient, TString& topicName, ui32 minActivePartitions, std::vector consumers) { + auto topicSettings = NYdb::NTopic::TCreateTopicSettings() + .PartitioningSettings(minActivePartitions, 100); + + for (auto& consumer : consumers) { + topicSettings.BeginAddConsumer(consumer).EndAddConsumer(); + } + + auto result = pqClient + .CreateTopic(topicName, topicSettings) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); +} + struct TTopicConfig { inline static const std::map DummyMap; @@ -740,6 +764,28 @@ class TTestClient { Write(So, &header, &request); } + void AuthenticateToKafka() { + { + auto msg = ApiVersions(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); + } + + { + auto msg = SaslHandshake(); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + auto msg = SaslAuthenticate("ouruser@/Root", "ourUserPassword"); + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + } + } + protected: ui32 NextCorrelation() { return Correlation++; @@ -771,6 +817,11 @@ class TTestClient { }; Y_UNIT_TEST_SUITE(KafkaProtocol) { + // this test imitates kafka producer behaviour: + // 1. get api version, + // 2. authenticate via sasl, + // 3. acquire producer id, + // 4. produce to topic several messages, read them and assert correct contents and metadata Y_UNIT_TEST(ProduceScenario) { TInsecureTestServer testServer("2"); @@ -778,17 +829,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { ui64 minActivePartitions = 10; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100) - .BeginAddConsumer("consumer-0").EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, minActivePartitions, {"consumer-0"}); auto settings = NTopic::TReadSessionSettings() .AppendTopics(NTopic::TTopicReadSettings(topicName)) @@ -804,6 +845,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); } + // authenticate { auto msg = client.SaslHandshake(); @@ -818,12 +860,14 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); } + // acquire producer id and epoch { auto msg = client.InitProducerId(); UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); } + // send test message { TString key = "record-key"; TString value = "record-value"; @@ -848,6 +892,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); + // read message from topic to assert delivery { std::vector>> topics {{topicName, {0}}}; auto msg = client.Fetch(topics); @@ -855,30 +900,35 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); auto record = msg->Responses[0].Partitions[0].Records->Records[0]; - auto data = record.Value.value(); - auto dataStr = TString(data.data(), data.size()); - UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + auto recordValue = record.Value.value(); + auto recordValuesAsStr = TString(recordValue.data(), recordValue.size()); + UNIT_ASSERT_VALUES_EQUAL(recordValuesAsStr, value); - auto headerKey = record.Headers[0].Key.value(); - auto headerKeyStr = TString(headerKey.data(), headerKey.size()); - UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + auto readRecordKey = record.Key.value(); + auto readRecordKeysAsStr = TString(readRecordKey.data(), readRecordKey.size()); + UNIT_ASSERT_VALUES_EQUAL(readRecordKeysAsStr, key); - auto headerValue = record.Headers[0].Value.value(); - auto headerValueStr = TString(headerValue.data(), headerValue.size()); - UNIT_ASSERT_VALUES_EQUAL(dataStr, value); + auto readHeaderKey = record.Headers[0].Key.value(); + auto readHeaderKeyStr = TString(readHeaderKey.data(), readHeaderKey.size()); + UNIT_ASSERT_VALUES_EQUAL(readHeaderKeyStr, headerKey); + + auto readHeaderValue = record.Headers[0].Value.value(); + auto readHeaderValueStr = TString(readHeaderValue.data(), readHeaderValue.size()); + UNIT_ASSERT_VALUES_EQUAL(readHeaderValueStr, headerValue); } - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); + // read by logbroker protocol + auto readMessages = Read(topicReader); + UNIT_ASSERT_EQUAL(readMessages.size(), 1); - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - auto& m0 = m[0].GetMessages()[0]; - m0.Commit(); + UNIT_ASSERT_EQUAL(readMessages[0].GetMessages().size(), 1); + auto& readMessage = readMessages[0].GetMessages()[0]; + readMessage.Commit(); - UNIT_ASSERT_STRINGS_EQUAL(m0.GetData(), value); - AssertMessageMeta(m0, "__key", key); - AssertMessageMeta(m0, headerKey, headerValue); + UNIT_ASSERT_STRINGS_EQUAL(readMessage.GetData(), value); + AssertMessageMeta(readMessage, "__key", key); + AssertMessageMeta(readMessage, headerKey, headerValue); } { @@ -899,12 +949,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - auto& m0 = m[0].GetMessages()[0]; - m0.Commit(); + AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader); } { @@ -932,21 +977,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[1].ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - { - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - m[0].GetMessages()[0].Commit(); - } - - { - auto m = Read(topicReader); - UNIT_ASSERT_EQUAL(m.size(), 1); - - UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1); - m[0].GetMessages()[0].Commit(); - } + AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader); + AssertMessageAvaialbleThroughLogbrokerApiAndCommit(topicReader); } { @@ -1013,38 +1045,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString headerValue = "header-value"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, minActivePartitions, {}); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); { // Check list offsets for empty topic @@ -1211,7 +1216,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(dataStr, value); } } - + + // create table and init cdc for it { NYdb::NTable::TTableClient tableClient(*testServer.Driver); tableClient.RetryOperationSync([&](TSession session) @@ -1293,31 +1299,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString notExistsGroup = "consumer-not-exists"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100) - .BeginAddConsumer(group).EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - } - - { - auto result = - pqClient - .CreateTopic(secondTopicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(minActivePartitions, 100) - .BeginAddConsumer(group).EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - - } + CreateTopic(pqClient, topicName, minActivePartitions, {group}); + CreateTopic(pqClient, secondTopicName, minActivePartitions, {group}); TTestClient clientA(testServer.Port); TTestClient clientB(testServer.Port); @@ -1533,53 +1516,12 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString headerValue = "header-value"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(firstTopicName, - NYdb::NTopic::TCreateTopicSettings() - .BeginAddConsumer(firstConsumerName).EndAddConsumer() - .BeginAddConsumer(secondConsumerName).EndAddConsumer() - .PartitioningSettings(minActivePartitions, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } - - { - auto result = - pqClient - .CreateTopic(secondTopicName, - NYdb::NTopic::TCreateTopicSettings() - .BeginAddConsumer(firstConsumerName).EndAddConsumer() - .BeginAddConsumer(secondConsumerName).EndAddConsumer() - .PartitioningSettings(minActivePartitions, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, firstTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); + CreateTopic(pqClient, secondTopicName, minActivePartitions, {firstConsumerName, secondConsumerName}); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto recordsCount = 5; { @@ -1766,34 +1708,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(CreateTopicsScenario) { TInsecureTestServer testServer("2"); - // TString key = "record-key"; - // TString value = "record-value"; - // TString headerKey = "header-key"; - // TString headerValue = "header-value"; - NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); { @@ -2007,49 +1926,12 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString headerValue = "header-value"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topic1Name, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(10, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } - - { - auto result = - pqClient - .CreateTopic(topic2Name, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(20, 100)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topic1Name, 10, {}); + CreateTopic(pqClient, topic2Name, 20, {}); TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); @@ -2167,25 +2049,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TTestClient client(testServer.Port); - { - auto msg = client.ApiVersions(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u); - } - - { - auto msg = client.SaslHandshake(); - - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); - UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); - } - - { - auto msg = client.SaslAuthenticate("ouruser@/Root", "ourUserPassword"); - UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast(EKafkaErrors::NONE_ERROR)); - } + client.AuthenticateToKafka(); auto describeTopicSettings = NTopic::TDescribeTopicSettings().IncludeStats(true); @@ -2326,17 +2190,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString topicName = "/Root/topic-0-test"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(10, 100) - .BeginAddConsumer("consumer-0").EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, 10, {"consumer-0"}); auto settings = NTopic::TReadSessionSettings() .AppendTopics(NTopic::TTopicReadSettings(topicName)) @@ -2375,17 +2229,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TString topicName = "/Root/topic-0-test"; NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); - { - auto result = - pqClient - .CreateTopic(topicName, - NYdb::NTopic::TCreateTopicSettings() - .PartitioningSettings(10, 100) - .BeginAddConsumer("consumer-0").EndAddConsumer()) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); - UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); - } + CreateTopic(pqClient, topicName, 10, {"consumer-0"}); auto settings = NTopic::TReadSessionSettings() .AppendTopics(NTopic::TTopicReadSettings(topicName))