Skip to content

Commit

Permalink
LOGBROKER-8935: return error for compression.type config in Kafka API (
Browse files Browse the repository at this point in the history
  • Loading branch information
siarheivesialou authored Feb 29, 2024
1 parent bb152f2 commit c54e54d
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 1 deletion.
14 changes: 14 additions & 0 deletions ydb/core/kafka_proxy/actors/control_plane_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ inline TStringBuilder InputLogMessage(
return stringBuilder;
}

inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTopicConfigName(TString configName) {
if (configName == COMPRESSION_TYPE) {
auto result = MakeHolder<TEvKafka::TEvTopicModificationResponse>();
result->Status = EKafkaErrors::INVALID_REQUEST;
result->Message = TStringBuilder()
<< "Topic-level config '"
<< COMPRESSION_TYPE
<< "' is not allowed.";
return result;
} else {
return std::optional<THolder<TEvKafka::TEvTopicModificationResponse>>();
}
}

template<class T>
inline std::unordered_set<TString> ExtractDuplicates(
std::vector<T>& source,
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_alter_configs_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,26 @@ void TKafkaAlterConfigsActor::Bootstrap(const NActors::TActorContext& ctx) {
std::optional<TString> retentionMs;
std::optional<TString> retentionBytes;

std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse;

for (auto& config : resource.Configs) {
unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value());
if (unsupportedConfigResponse.has_value()) {
break;
}

if (config.Name.value() == RETENTION_MS_CONFIG_NAME) {
retentionMs = config.Value;
} else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) {
retentionBytes = config.Value;
}
}

if (unsupportedConfigResponse.has_value()) {
this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value();
continue;
}

TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes);

if (!convertedRetentions.IsValid) {
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,25 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) {
std::optional<TString> retentionMs;
std::optional<TString> retentionBytes;

std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> unsupportedConfigResponse;

for (auto& config : topic.Configs) {
unsupportedConfigResponse = ValidateTopicConfigName(config.Name.value());
if (unsupportedConfigResponse.has_value()) {
break;
}
if (config.Name.value() == RETENTION_MS_CONFIG_NAME) {
retentionMs = config.Value;
} else if (config.Name.value() == RETENTION_BYTES_CONFIG_NAME) {
retentionBytes = config.Value;
}
}

if (unsupportedConfigResponse.has_value()) {
this->TopicNamesToResponses[topicName] = unsupportedConfigResponse.value();
continue;
}

TRetentionsConversionResult convertedRetentions = ConvertRetentions(retentionMs, retentionBytes);

if (!convertedRetentions.IsValid) {
Expand Down
65 changes: 64 additions & 1 deletion ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,22 +290,27 @@ std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<
}

struct TTopicConfig {
inline static const std::map<TString, TString> DummyMap;

TTopicConfig(
TString name,
ui32 partionsNumber,
std::optional<TString> retentionMs = std::nullopt,
std::optional<TString> retentionBytes = std::nullopt)
std::optional<TString> retentionBytes = std::nullopt,
const std::map<TString, TString>& configs = DummyMap)
: Name(name)
, PartitionsNumber(partionsNumber)
, RetentionMs(retentionMs)
, RetentionBytes(retentionBytes)
, Configs(configs)
{
}

TString Name;
ui32 PartitionsNumber;
std::optional<TString> RetentionMs;
std::optional<TString> RetentionBytes;
std::map<TString, TString> Configs;
};

class TTestClient {
Expand Down Expand Up @@ -634,6 +639,13 @@ class TTestClient {
addConfig(topicToCreate.RetentionMs, "retention.ms");
addConfig(topicToCreate.RetentionBytes, "retention.bytes");

for (auto const& [name, value] : topicToCreate.Configs) {
NKafka::TCreateTopicsRequestData::TCreatableTopic::TCreateableTopicConfig config;
config.Name = name;
config.Value = value;
topic.Configs.push_back(config);
}

request.Topics.push_back(topic);
}

Expand Down Expand Up @@ -683,6 +695,12 @@ class TTestClient {
addConfig(topicToModify.RetentionMs, "retention.ms");
addConfig(topicToModify.RetentionBytes, "retention.bytes");

for (auto const& [name, value] : topicToModify.Configs) {
NKafka::TAlterConfigsRequestData::TAlterConfigsResource::TAlterableConfig config;
config.Name = name;
config.Value = value;
resource.Configs.push_back(config);
}
request.Resources.push_back(resource);
}

Expand Down Expand Up @@ -1778,6 +1796,30 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT(!result993.IsSuccess());
}

{
// Legal, but meaningless for Logbroker config
std::map<TString, TString> configs { std::make_pair("flush.messages", "1") };
auto msg = client.CreateTopics( { TTopicConfig("topic-987-test", 1, std::nullopt, std::nullopt, configs) });
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-987-test");
UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, NONE_ERROR);

auto result = pqClient.DescribeTopic("/Root/topic-987-test", describeTopicSettings).GetValueSync();
UNIT_ASSERT(result.IsSuccess());
}

{
// Both legal and illegal configs
std::map<TString, TString> configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") };
auto msg = client.CreateTopics( { TTopicConfig("topic-986-test", 1, std::nullopt, std::nullopt, configs) });
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-986-test");
UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST);

auto result = pqClient.DescribeTopic("/Root/topic-986-test", describeTopicSettings).GetValueSync();
UNIT_ASSERT(!result.IsSuccess());
}

} // Y_UNIT_TEST(CreateTopicsScenario)

Y_UNIT_TEST(CreatePartitionsScenario) {
Expand Down Expand Up @@ -2086,6 +2128,27 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST);
}

{
// Legal, but meaningless for Logbroker config
std::map<TString, TString> configs { std::make_pair("flush.messages", "1") };
auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) });

UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, NONE_ERROR);
}

{
// Both legal and illegal configs
std::map<TString, TString> configs { std::make_pair("compression.type", "zstd"), std::make_pair("flush.messages", "1") };
auto msg = client.AlterConfigs({ TTopicConfig(shortTopic0Name, 1, std::nullopt, std::nullopt, configs) });

UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ResourceName.value(), shortTopic0Name);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].ErrorCode, INVALID_REQUEST);
}

}

Y_UNIT_TEST(LoginWithApiKey) {
Expand Down

0 comments on commit c54e54d

Please sign in to comment.