diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 3a07f1dd871e..5102144fd98b 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -52,7 +52,9 @@ struct TContext { NKikimr::NPQ::TRlContext RlContext; bool Authenticated() { + return !RequireAuthentication || AuthenticationStep == SUCCESS; + } TActorId DiscoveryCacheActor; @@ -128,6 +130,8 @@ inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code) switch (code) { case Ydb::PersQueue::ErrorCode::ErrorCode::OK: return EKafkaErrors::NONE_ERROR; + case Ydb::PersQueue::ErrorCode::ErrorCode::UNKNOWN_READ_RULE: + return EKafkaErrors::GROUP_ID_NOT_FOUND; case Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST: return EKafkaErrors::INVALID_REQUEST; case Ydb::PersQueue::ErrorCode::ErrorCode::ERROR: diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 0a0b2aa29a44..223cc9013480 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -101,9 +101,9 @@ class TKafkaConnection: public TActorBootstrapped, public TNet void Bootstrap() { Context->ConnectionId = SelfId(); + Context->RequireAuthentication = NKikimr::AppData()->EnforceUserTokenRequirement; // if no authentication required, then we can use local database as our target - if (!NKikimr::AppData()->EnforceUserTokenRequirement) { - Context->RequireAuthentication = false; + if (!Context->RequireAuthentication) { Context->DatabasePath = NKikimr::AppData()->TenantName; } diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index cd6639a4f450..ed9556688ea0 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1066,7 +1066,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); CreateTopic(pqClient, topicName, minActivePartitions, {}); - CreateTopic(pqClient, topicName, minActivePartitions, {}); TTestClient client(testServer.Port); @@ -1482,7 +1481,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { topics.push_back(topicName); auto joinResponse = clientA.JoinGroup(topics, notExistsGroup); - UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast(EKafkaErrors::INVALID_REQUEST)); // because TReadInitAndAuthActor returns BAD_REQUEST on failed readRule check + UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast(EKafkaErrors::GROUP_ID_NOT_FOUND)); } { @@ -1649,7 +1648,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions); for (const auto& topic : msg->Topics) { for (const auto& partition : topic.Partitions) { - UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::INVALID_REQUEST)); + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::GROUP_ID_NOT_FOUND)); } } } @@ -1681,7 +1680,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions); for (const auto& topic : msg->Topics) { for (const auto& partition : topic.Partitions) { - UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::INVALID_REQUEST)); + UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast(EKafkaErrors::GROUP_ID_NOT_FOUND)); } } } diff --git a/ydb/public/api/protos/persqueue_error_codes_v1.proto b/ydb/public/api/protos/persqueue_error_codes_v1.proto index c11c3fad8daf..65ef172ff004 100644 --- a/ydb/public/api/protos/persqueue_error_codes_v1.proto +++ b/ydb/public/api/protos/persqueue_error_codes_v1.proto @@ -59,4 +59,6 @@ enum ErrorCode { INVALID_ARGUMENT = 500040; VALIDATION_ERROR = 500080; + + UNKNOWN_READ_RULE = 500032; } diff --git a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp index d26012063a7f..4af068fff99e 100644 --- a/ydb/services/persqueue_v1/actors/persqueue_utils.cpp +++ b/ydb/services/persqueue_v1/actors/persqueue_utils.cpp @@ -111,6 +111,7 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQ case READ_ERROR_TOO_BIG_OFFSET: case SET_OFFSET_ERROR_COMMIT_TO_FUTURE: case SET_OFFSET_ERROR_COMMIT_TO_PAST: + case UNKNOWN_READ_RULE: return Ydb::StatusIds::BAD_REQUEST; case WRONG_COOKIE: case CREATE_SESSION_ALREADY_LOCKED: diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index b263f5acdd0d..ca65e50c1844 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -203,7 +203,7 @@ bool TReadInitAndAuthActor::CheckTopicACL( if (!NPQ::HasConsumer(pqDescr.GetPQTabletConfig(), ClientId)) { CloseSession( TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "' in current cluster '" << LocalCluster << "'", - PersQueue::ErrorCode::BAD_REQUEST, ctx + PersQueue::ErrorCode::UNKNOWN_READ_RULE, ctx ); return false; }