diff --git a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp index 6f98326a5824..5bdd06dce583 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp @@ -17,7 +17,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt TIntrusiveConstPtr userToken, TString topicPath, TString databaseName, - const std::function sendResultCallback) + const std::function sendResultCallback) : UserToken(userToken) , TopicPath(topicPath) , DatabaseName(databaseName) @@ -71,7 +71,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt }; void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { - processYdbStatusCode(status); + ProcessYdbStatusCode(status); }; void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override { @@ -101,8 +101,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt } void RaiseIssue(const NYql::TIssue& issue) override{ - ReplyMessage = issue.GetMessage(); - Y_UNUSED(issue); + Issue = issue; } void RaiseIssues(const NYql::TIssues& issues) override { @@ -174,7 +173,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override { Y_UNUSED(result); - processYdbStatusCode(status); + ProcessYdbStatusCode(status); }; void SendResult( @@ -184,7 +183,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt Y_UNUSED(result); Y_UNUSED(message); - processYdbStatusCode(status, std::optional(std::ref(message))); + ProcessYdbStatusCode(status); }; void SendResult( @@ -192,7 +191,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt const google::protobuf::RepeatedPtrField& message) override { Y_UNUSED(message); - processYdbStatusCode(status, std::optional(std::ref(message))); + ProcessYdbStatusCode(status); }; const Ydb::Operations::OperationParams& operation_params() const { @@ -214,47 +213,16 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts; const TString TopicPath; const TString DatabaseName; - const std::function SendResultCallback; - TString ReplyMessage; - - void processYdbStatusCode( - Ydb::StatusIds::StatusCode& status, - std::optional>> issueMessagesOpt = std::nullopt) { - - switch (status) { - case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS: - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage); - break; - case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST: - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage); - break; - case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SCHEME_ERROR: - if (issueMessagesOpt.has_value()) { - auto& issueMessages = issueMessagesOpt.value().get(); - bool hasPathNotExists = std::find_if( - issueMessages.begin(), - issueMessages.end(), - [](const auto& msg){ return msg.issue_code() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST; } - ) != issueMessages.end(); - - if (hasPathNotExists) { - SendResultCallback(TEvKafka::TEvTopicModificationResponse:: EStatus::TOPIC_DOES_NOT_EXIST, ReplyMessage); - } else { - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); - } - } else { - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); - } - break; - default: - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); - } + const std::function SendResultCallback; + NYql::TIssue Issue; + + void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) { + SendResultCallback(ConvertErrorCode(status), Issue.GetMessage()); } }; -class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase { - using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase; +class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor{ + using TBase = NKikimr::NGRpcProxy::V1::TUpdateSchemeActor; public: TCreatePartitionsActor( @@ -267,7 +235,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase userToken, topicPath, databaseName, - [this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) { + [this](const EKafkaErrors status, const TString& message) { this->SendResult(status, message); }) ) @@ -283,7 +251,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase ~TCreatePartitionsActor() = default; - void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) { + void SendResult(const EKafkaErrors status, const TString& message) { THolder response(new TEvKafka::TEvTopicModificationResponse()); response->Status = status; response->TopicPath = TopicPath; @@ -292,39 +260,25 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase Send(SelfId(), new TEvents::TEvPoison()); } - void FillProposeRequest( - NKikimr::TEvTxUserProxy::TEvProposeTransaction &proposal, - const TActorContext &ctx, - const TString &workingDir, - const TString &name) { - + void ModifyPersqueueConfig( + const TActorContext& ctx, + NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, + const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, + const NKikimrSchemeOp::TDirEntry& selfInfo + ) { Y_UNUSED(ctx); + Y_UNUSED(pqGroupDescription); + Y_UNUSED(selfInfo); - NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme()); - modifyScheme.SetWorkingDir(workingDir); - modifyScheme.SetOperationType(::NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup); - - auto pqDescr = modifyScheme.MutableAlterPersQueueGroup(); - (*pqDescr).SetTotalGroupCount(PartionsNumber); - (*pqDescr).SetName(name); - }; + groupConfig.SetTotalGroupCount(PartionsNumber); + } void Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); - SendProposeRequest(ctx); - Become(&TCreatePartitionsActor::StateWork); + SendDescribeProposeRequest(ctx); + Become(&TBase::StateWork); }; - void StateWork(TAutoPtr& ev) { - switch (ev->GetTypeRewrite()) { - hFunc(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle); - default: - TBase::StateWork(ev); - } - } - - void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } - private: const TActorId Requester; const TString TopicPath; @@ -367,7 +321,7 @@ void TKafkaCreatePartitionsActor::Bootstrap(const NActors::TActorContext& ctx) { if (topicName == "") { auto result = MakeHolder(); - result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST; + result->Status = INVALID_REQUEST; result->Message = "Empty topic name"; this->TopicNamesToResponses[topicName] = TAutoPtr(result.Release()); continue; @@ -407,40 +361,33 @@ void TKafkaCreatePartitionsActor::Reply(const TActorContext& ctx) { for (auto& requestTopic : Message->Topics) { auto topicName = requestTopic.Name.value(); + if (!TopicNamesToResponses.contains(topicName)) { + continue; + } + TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic; responseTopic.Name = topicName; + responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message; - if (TopicNamesToResponses.contains(topicName)) { - responseTopic.ErrorMessage = TopicNamesToResponses[topicName]->Message; - } - - auto setError= [&responseTopic, &responseStatus](EKafkaErrors status) { - responseTopic.ErrorCode = status; + EKafkaErrors status = TopicNamesToResponses[topicName]->Status; + responseTopic.ErrorCode = status; + if(status != NONE_ERROR) { responseStatus = status; - }; + } - if (DuplicateTopicNames.contains(topicName)) { - setError(DUPLICATE_RESOURCE); - } else { - switch (TopicNamesToResponses[topicName]->Status) { - case TEvKafka::TEvTopicModificationResponse::OK: - responseTopic.ErrorCode = NONE_ERROR; - break; - case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST: - case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST: - setError(INVALID_REQUEST); - break; - case TEvKafka::TEvTopicModificationResponse::ERROR: - setError(UNKNOWN_SERVER_ERROR); - break; - case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG: - setError(INVALID_CONFIG); - break; - } - } response->Results.push_back(responseTopic); } + for (auto& topicName : DuplicateTopicNames) { + TCreatePartitionsResponseData::TCreatePartitionsTopicResult responseTopic; + responseTopic.Name = topicName; + responseTopic.ErrorMessage = "Duplicate topic in request."; + responseTopic.ErrorCode = INVALID_REQUEST; + + response->Results.push_back(responseTopic); + + responseStatus = INVALID_REQUEST; + } Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, responseStatus)); Die(ctx); diff --git a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp index ecb3ddd1b3a9..f70d6ec20d25 100644 --- a/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_create_topics_actor.cpp @@ -17,7 +17,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { TIntrusiveConstPtr userToken, TString topicPath, TString databaseName, - const std::function sendResultCallback) + const std::function sendResultCallback) : UserToken(userToken) , TopicPath(topicPath) , DatabaseName(databaseName) @@ -71,7 +71,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { }; void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override { - processYdbStatusCode(status); + ProcessYdbStatusCode(status); }; void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override { @@ -171,7 +171,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override { Y_UNUSED(result); - processYdbStatusCode(status); + ProcessYdbStatusCode(status); }; void SendResult( @@ -181,7 +181,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { Y_UNUSED(result); Y_UNUSED(message); - processYdbStatusCode(status); + ProcessYdbStatusCode(status); }; void SendResult( @@ -189,7 +189,7 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { const google::protobuf::RepeatedPtrField& message) override { Y_UNUSED(message); - processYdbStatusCode(status); + ProcessYdbStatusCode(status); }; const Ydb::Operations::OperationParams& operation_params() const { @@ -211,20 +211,11 @@ class TKafkaCreateTopicRequest : public NKikimr::NGRpcService::IRequestOpCtx { const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts; const TString TopicPath; const TString DatabaseName; - const std::function SendResultCallback; + const std::function SendResultCallback; TString ReplyMessage; - void processYdbStatusCode(Ydb::StatusIds::StatusCode& status) { - switch (status) { - case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS: - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage); - break; - case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST: - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage); - break; - default: - SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage); - } + void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) { + SendResultCallback(ConvertErrorCode(status), ReplyMessage); } }; @@ -244,7 +235,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBaseSendResult(status, message); }) ) @@ -259,7 +250,7 @@ class TCreateTopicActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase response(new TEvKafka::TEvTopicModificationResponse()); response->Status = status; response->TopicPath = TopicPath; @@ -378,7 +369,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { if (topicName == "") { auto result = MakeHolder(); - result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST; + result->Status = EKafkaErrors::INVALID_REQUEST; result->Message = "Empty topic name"; this->TopicNamesToResponses[topicName] = TAutoPtr(result.Release()); continue; @@ -395,7 +386,7 @@ void TKafkaCreateTopicsActor::Bootstrap(const NActors::TActorContext& ctx) { return true; } catch(std::invalid_argument) { auto result = MakeHolder(); - result->Status = TEvKafka::TEvTopicModificationResponse::EStatus::INVALID_CONFIG; + result->Status = EKafkaErrors::INVALID_CONFIG; result->Message = "Provided retention value is not a number"; this->TopicNamesToResponses[topic.Name.value()] = TAutoPtr(result.Release()); return false; @@ -489,26 +480,12 @@ void TKafkaCreateTopicsActor::Reply(const TActorContext& ctx) { if (DuplicateTopicNames.contains(topicName)) { setError(DUPLICATE_RESOURCE); } else { - switch (TopicNamesToResponses[topicName]->Status) { - case TEvKafka::TEvTopicModificationResponse::OK: - responseTopic.ErrorCode = NONE_ERROR; - addConfigIfRequired(TopicNamesToRetentions[topicName].first, RETENTION_MS_CONFIG_NAME); - addConfigIfRequired(TopicNamesToRetentions[topicName].second, RETENTION_BYTES_CONFIG_NAME); - break; - case TEvKafka::TEvTopicModificationResponse::BAD_REQUEST: - setError(INVALID_REQUEST); - break; - case TEvKafka::TEvTopicModificationResponse::TOPIC_DOES_NOT_EXIST: - KAFKA_LOG_ERROR("Create topics actor: Topic: [" << topicName << "]. Unexpected TOPIC_DOES_NOT_EXIST status received."); - setError(UNKNOWN_SERVER_ERROR); - break; - case TEvKafka::TEvTopicModificationResponse::ERROR: - setError(UNKNOWN_SERVER_ERROR); - break; - case TEvKafka::TEvTopicModificationResponse::INVALID_CONFIG: - setError(INVALID_CONFIG); - break; - } + auto status = TopicNamesToResponses[topicName]->Status; + if (status == EKafkaErrors::NONE_ERROR) { + addConfigIfRequired(TopicNamesToRetentions[topicName].first, RETENTION_MS_CONFIG_NAME); + addConfigIfRequired(TopicNamesToRetentions[topicName].second, RETENTION_BYTES_CONFIG_NAME); + } + setError(TopicNamesToResponses[topicName]->Status); } response->Topics.push_back(responseTopic); } diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index 1bec0c622ac3..d2237971c6d7 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -243,7 +243,7 @@ struct TEvTopicModificationResponse : public NActors::TEventLocal Read(std::shared_ptr< return result; } -struct TopicToCreate { - TopicToCreate( +struct TTopicConfig { + TTopicConfig( TString name, ui32 partionsNumber, std::optional retentionMs = std::nullopt, @@ -604,7 +604,7 @@ class TTestClient { return WriteAndRead(header, request); } - TMessagePtr CreateTopics(std::vector topicsToCreate, bool validateOnly = false) { + TMessagePtr CreateTopics(std::vector topicsToCreate, bool validateOnly = false) { Cerr << ">>>>> TCreateTopicsRequestData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_TOPICS, 7); @@ -634,7 +634,7 @@ class TTestClient { return WriteAndRead(header, request); } - TMessagePtr CreatePartitions(std::vector topicsToCreate, bool validateOnly = false) { + TMessagePtr CreatePartitions(std::vector topicsToCreate, bool validateOnly = false) { Cerr << ">>>>> TCreateTopicsRequestData\n"; TRequestHeaderData header = Header(NKafka::EApiKey::CREATE_PARTITIONS, 3); @@ -1576,8 +1576,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Creation of two topics auto msg = client.CreateTopics({ - TopicToCreate("topic-999-test", 12), - TopicToCreate("topic-998-test", 13) + TTopicConfig("topic-999-test", 12), + TTopicConfig("topic-998-test", 13) }); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 2); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-999-test"); @@ -1595,8 +1595,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Duplicate topics auto msg = client.CreateTopics({ - TopicToCreate("topic-997-test", 1), - TopicToCreate("topic-997-test", 1) + TTopicConfig("topic-997-test", 1), + TTopicConfig("topic-997-test", 1) }); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 2); @@ -1613,9 +1613,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // One OK, two duplicate topics auto msg = client.CreateTopics({ - TopicToCreate("topic-996-test", 1), - TopicToCreate("topic-995-test", 1), - TopicToCreate("topic-995-test", 1) + TTopicConfig("topic-996-test", 1), + TTopicConfig("topic-995-test", 1), + TTopicConfig("topic-995-test", 1) }); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 3); @@ -1636,11 +1636,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Existing topic - client.CreateTopics({ TopicToCreate("topic-994-test", 1) }); + client.CreateTopics({ TTopicConfig("topic-994-test", 1) }); auto result = pqClient.DescribeTopic("/Root/topic-994-test", describeTopicSettings).GetValueSync(); UNIT_ASSERT(result.IsSuccess()); - auto msg = client.CreateTopics({ TopicToCreate("topic-994-test", 1) }); + auto msg = client.CreateTopics({ TTopicConfig("topic-994-test", 1) }); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-994-test"); } @@ -1650,7 +1650,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { ui64 retentionMs = 168 * 60 * 60 * 1000; ui64 retentionBytes = 51'200'000'000ul; - auto msg = client.CreateTopics({ TopicToCreate("topic-993-test", 1, std::to_string(retentionMs), std::to_string(retentionBytes))}); + auto msg = client.CreateTopics({ TTopicConfig("topic-993-test", 1, std::to_string(retentionMs), std::to_string(retentionBytes))}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-993-test"); @@ -1662,7 +1662,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // retention.ms is not number - auto msg = client.CreateTopics({ TopicToCreate("topic-992-test", 1, "not_a_number", "42")}); + auto msg = client.CreateTopics({ TTopicConfig("topic-992-test", 1, "not_a_number", "42")}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-992-test"); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_CONFIG); @@ -1673,7 +1673,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // retention.bytes is not number - auto msg = client.CreateTopics({ TopicToCreate("topic-991-test", 1, "42", "not_a_number")}); + auto msg = client.CreateTopics({ TTopicConfig("topic-991-test", 1, "42", "not_a_number")}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-991-test"); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_CONFIG); @@ -1684,28 +1684,28 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Empty topic name - auto msg = client.CreateTopics({ TopicToCreate("", 1)}); + auto msg = client.CreateTopics({ TTopicConfig("", 1)}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); } { // Wrong topic name - auto msg = client.CreateTopics({ TopicToCreate("//////", 1)}); + auto msg = client.CreateTopics({ TTopicConfig("//////", 1)}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); } { // Wrong topic name - auto msg = client.CreateTopics({ TopicToCreate("/Root/", 1)}); + auto msg = client.CreateTopics({ TTopicConfig("/Root/", 1)}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); } { // Wrong topic name - auto msg = client.CreateTopics({ TopicToCreate("/Root//", 1)}); + auto msg = client.CreateTopics({ TTopicConfig("/Root//", 1)}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); } @@ -1715,7 +1715,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { ui64 retentionMs = 13 * 60 * 60 * 1000; ui64 retentionBytes = 11'000'000'000ul; - auto msg = client.CreateTopics({ TopicToCreate("topic-990-test", 1, std::to_string(retentionMs), std::to_string(retentionBytes))}); + auto msg = client.CreateTopics({ TTopicConfig("topic-990-test", 1, std::to_string(retentionMs), std::to_string(retentionBytes))}); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-990-test"); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].ErrorCode, INVALID_REQUEST); @@ -1727,7 +1727,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Set only ms retention ui64 retentionMs = 168 * 60 * 60 * 1000; - auto msg = client.CreateTopics({ TopicToCreate("topic-989-test", 1, std::to_string(retentionMs)) }); + auto msg = client.CreateTopics({ TTopicConfig("topic-989-test", 1, std::to_string(retentionMs)) }); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-989-test"); @@ -1739,7 +1739,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Validation only - auto msg = client.CreateTopics({ TopicToCreate("topic-988-test", 1)}, true); + auto msg = client.CreateTopics({ TTopicConfig("topic-988-test", 1)}, true); UNIT_ASSERT_VALUES_EQUAL(msg->Topics.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Topics[0].Name.value(), "topic-988-test"); @@ -1814,8 +1814,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Validate only auto msg = client.CreatePartitions({ - TopicToCreate(topic1Name, 11), - TopicToCreate(topic2Name, 21) + TTopicConfig(topic1Name, 11), + TTopicConfig(topic2Name, 21) }, true); UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2); UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), topic1Name); @@ -1833,8 +1833,8 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Increase partitions number auto msg = client.CreatePartitions({ - TopicToCreate(shortTopic1Name, 11), - TopicToCreate(shortTopic2Name, 21) + TTopicConfig(shortTopic1Name, 11), + TTopicConfig(shortTopic2Name, 21) }); UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2); @@ -1855,28 +1855,22 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Check with two same topic names auto msg = client.CreatePartitions({ - TopicToCreate(shortTopic1Name, 11), - TopicToCreate(shortTopic1Name, 11) + TTopicConfig(shortTopic1Name, 12), + TTopicConfig(shortTopic1Name, 12) }); - UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name); - UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, DUPLICATE_RESOURCE); - UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].Name.value(), shortTopic1Name); - UNIT_ASSERT_VALUES_EQUAL(msg->Results[1].ErrorCode, DUPLICATE_RESOURCE); - - auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); - UNIT_ASSERT(result1.IsSuccess()); - UNIT_ASSERT_EQUAL(result1.GetTopicDescription().GetPartitions().size(), 11); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, INVALID_REQUEST); - auto result2 = pqClient.DescribeTopic(topic2Name, describeTopicSettings).GetValueSync(); - UNIT_ASSERT(result2.IsSuccess()); - UNIT_ASSERT_EQUAL(result2.GetTopicDescription().GetPartitions().size(), 21); + auto result = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + UNIT_ASSERT_EQUAL(result.GetTopicDescription().GetPartitions().size(), 11); } { // Check with lesser partitions number - auto msg = client.CreatePartitions({ TopicToCreate(shortTopic1Name, 1) }); + auto msg = client.CreatePartitions({ TTopicConfig(shortTopic1Name, 1) }); UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), shortTopic1Name); @@ -1890,11 +1884,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { { // Check with nonexistent topic name auto topicName = "NonExTopicName"; - auto msg = client.CreatePartitions({ TopicToCreate(topicName, 1) }); + auto msg = client.CreatePartitions({ TTopicConfig(topicName, 1) }); UNIT_ASSERT_VALUES_EQUAL(msg->Results.size(), 1); UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].Name.value(), topicName); - UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, INVALID_REQUEST); + UNIT_ASSERT_VALUES_EQUAL(msg->Results[0].ErrorCode, UNKNOWN_TOPIC_OR_PARTITION); auto result1 = pqClient.DescribeTopic(topic1Name, describeTopicSettings).GetValueSync(); UNIT_ASSERT(result1.IsSuccess());