From 1668126089c12ee0328d82bd39ab225cc01236c1 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Mon, 12 Feb 2024 10:39:56 +0000 Subject: [PATCH] LOGBROKER-8935: return error for compression.type config in Kafka API --- .../kafka_proxy/actors/control_plane_common.h | 14 ++++ .../actors/kafka_alter_configs_actor.cpp | 12 ++++ .../actors/kafka_create_topics_actor.cpp | 11 ++++ ydb/core/kafka_proxy/ut/ut_protocol.cpp | 65 ++++++++++++++++++- 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/ydb/core/kafka_proxy/actors/control_plane_common.h b/ydb/core/kafka_proxy/actors/control_plane_common.h index 2f5b38a860b0..22bf12d99239 100644 --- a/ydb/core/kafka_proxy/actors/control_plane_common.h +++ b/ydb/core/kafka_proxy/actors/control_plane_common.h @@ -95,6 +95,20 @@ inline TStringBuilder InputLogMessage( return stringBuilder; } +inline std::optional> ValidateTopicConfigName(TString configName) { + if (configName == COMPRESSION_TYPE) { + auto result = MakeHolder(); + result->Status = EKafkaErrors::INVALID_REQUEST; + result->Message = TStringBuilder() + << "Topic-level config '" + << COMPRESSION_TYPE + << "' is not allowed."; + return result; + } else { + return std::optional>(); + } +} + template inline std::unordered_set ExtractDuplicates( std::vector& source, diff --git a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp index 1cd233181f6d..f498a4ee0925 100644 --- a/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp @@ -117,7 +117,14 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { std::optional retentionMs; std::optional retentionBytes; + std::optional> unsupportedConfigResponse; + for (auto& config : resource.Configs) { + unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value()); + if (unsupportedConfigResponse.has_value()) { + break; + } + if (config.Name.value() == RETENTION_MS_CONFIG_NAME) { retentionMs = config.Value; } else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) { @@ -125,6 +132,11 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) { } } + if (unsupportedConfigResponse.has_value()) { + this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value(); + continue; + } + TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes); if (!convertedRetentions.IsValid) { diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp index 0c44b9f96889..3fad0055a1b1 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -164,7 +164,13 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { std::optional retentionMs; std::optional retentionBytes; + std::optional> unsupportedConfigResponse; + for (auto& config : topic.Configs) { + unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value()); + if (unsupportedConfigResponse.has_value()) { + break; + } if (config.Name.value() == RETENTION_MS_CONFIG_NAME) { retentionMs = config.Value; } else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) { @@ -172,6 +178,11 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { } } + if (unsupportedConfigResponse.has_value()) { + this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value(); + continue; + } + TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes); if (!convertedRetentions.IsValid) { diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index c989140471f0..4bc681ee730b 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -290,15 +290,19 @@ std::vector Read(std::shared_ptr< } struct TTopicConfig { + inline static const std::map DummyMap; + TTopicConfig( TString name, ui32 partionsNumber, std::optional retentionMs = std::nullopt, - std::optional retentionBytes = std::nullopt) + std::optional retentionBytes = std::nullopt, + const std::map& configs = DummyMap) : Name(name) , PartitionsNumber(partionsNumber) , RetentionMs(retentionMs) , RetentionBytes(retentionBytes) + , Configs(configs) { } @@ -306,6 +310,7 @@ struct TTopicConfig { ui32 PartitionsNumber; std::optional RetentionMs; std::optional RetentionBytes; + std::map Configs; }; class TTestClient { @@ -634,6 +639,13 @@ class TTestClient { addConfig(topicToCreate.RetentionMs, "retention.ms"); addConfig(topicToCreate.RetentionBytes, "retention.bytes"); + for (auto const& [name, value] : topicToCreate.Configs) { + NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config; + config.Name = name; + config.Value = value; + topic.Configs.push_back(config); + } + request.Topics.push_back(topic); } @@ -683,6 +695,12 @@ class TTestClient { addConfig(topicToModify.RetentionMs, "retention.ms"); addConfig(topicToModify.RetentionBytes, "retention.bytes"); + for (auto const& [name, value] : topicToModify.Configs) { + NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config; + config.Name = name; + config.Value = value; + resource.Configs.push_back(config); + } request.Resources.push_back(resource); } @@ -1778,6 +1796,30 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT(!result993.IsSuccess()); } + { + // Legal, but meaningless for Logbroker config + std::map configs { std::make_pair("flush.messages", "1") }; + auto msg = client.CreateTopics( { TTopicConfig("topic-987-test", 1, std::nullopt, std::nullopt, configs) }); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-987-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, NONE_ERROR); + + auto result = pqClient.DescribeTopic("/Root/topic-987-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + } + + { + // Both legal and illegal configs + std::map configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") }; + auto msg = client.CreateTopics( { TTopicConfig("topic-986-test", 1, std::nullopt, std::nullopt, configs) }); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-986-test"); + UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); + + auto result = pqClient.DescribeTopic("/Root/topic-986-test", describeTopicSettings).GetValueSync(); + UNIT_ASSERT(!result.IsSuccess()); + } + } // Y_UNIT_TEST(CreateTopicsScenario) Y_UNIT_TEST(CreatePartitionsScenario) { @@ -2086,6 +2128,27 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name); UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST); } + + { + // Legal, but meaningless for Logbroker config + std::map configs { std::make_pair("flush.messages", "1") }; + auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, NONE_ERROR); + } + + { + // Both legal and illegal configs + std::map configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") }; + auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) }); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST); + } + } Y_UNIT_TEST(LoginWithApiKey) {