Skip to content

Commit

Permalink
Kafka api better client errors (ydb-platform#13929)
Browse files Browse the repository at this point in the history
  • Loading branch information
a-serebryanskiy authored Feb 4, 2025
1 parent 4ce5a23 commit f6e8a74
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 7 deletions.
4 changes: 4 additions & 0 deletions ydb/core/kafka_proxy/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ struct TContext {
NKikimr::NPQ::TRlContext RlContext;

bool Authenticated() {

return !RequireAuthentication || AuthenticationStep == SUCCESS;

}

TActorId DiscoveryCacheActor;
Expand Down Expand Up @@ -128,6 +130,8 @@ inline EKafkaErrors ConvertErrorCode(Ydb::PersQueue::ErrorCode::ErrorCode code)
switch (code) {
case Ydb::PersQueue::ErrorCode::ErrorCode::OK:
return EKafkaErrors::NONE_ERROR;
case Ydb::PersQueue::ErrorCode::ErrorCode::UNKNOWN_READ_RULE:
return EKafkaErrors::GROUP_ID_NOT_FOUND;
case Ydb::PersQueue::ErrorCode::ErrorCode::BAD_REQUEST:
return EKafkaErrors::INVALID_REQUEST;
case Ydb::PersQueue::ErrorCode::ErrorCode::ERROR:
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kafka_proxy/kafka_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet

void Bootstrap() {
Context->ConnectionId = SelfId();
Context->RequireAuthentication = NKikimr::AppData()->EnforceUserTokenRequirement;
// if no authentication required, then we can use local database as our target
if (!NKikimr::AppData()->EnforceUserTokenRequirement) {
Context->RequireAuthentication = false;
if (!Context->RequireAuthentication) {
Context->DatabasePath = NKikimr::AppData()->TenantName;
}

Expand Down
7 changes: 3 additions & 4 deletions ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,6 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {

NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
CreateTopic(pqClient, topicName, minActivePartitions, {});
CreateTopic(pqClient, topicName, minActivePartitions, {});

TTestClient client(testServer.Port);

Expand Down Expand Up @@ -1482,7 +1481,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
topics.push_back(topicName);

auto joinResponse = clientA.JoinGroup(topics, notExistsGroup);
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST)); // because TReadInitAndAuthActor returns BAD_REQUEST on failed readRule check
UNIT_ASSERT_VALUES_EQUAL(joinResponse->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::GROUP_ID_NOT_FOUND));
}

{
Expand Down Expand Up @@ -1649,7 +1648,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions);
for (const auto& topic : msg->Topics) {
for (const auto& partition : topic.Partitions) {
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST));
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::GROUP_ID_NOT_FOUND));
}
}
}
Expand Down Expand Up @@ -1681,7 +1680,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
UNIT_ASSERT_VALUES_EQUAL(msg->Topics.back().Partitions.size(), minActivePartitions);
for (const auto& topic : msg->Topics) {
for (const auto& partition : topic.Partitions) {
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::INVALID_REQUEST));
UNIT_ASSERT_VALUES_EQUAL(partition.ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::GROUP_ID_NOT_FOUND));
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/public/api/protos/persqueue_error_codes_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,6 @@ enum ErrorCode {

INVALID_ARGUMENT = 500040;
VALIDATION_ERROR = 500080;

UNKNOWN_READ_RULE = 500032;
}
1 change: 1 addition & 0 deletions ydb/services/persqueue_v1/actors/persqueue_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Ydb::StatusIds::StatusCode ConvertPersQueueInternalCodeToStatus(const Ydb::PersQ
case READ_ERROR_TOO_BIG_OFFSET:
case SET_OFFSET_ERROR_COMMIT_TO_FUTURE:
case SET_OFFSET_ERROR_COMMIT_TO_PAST:
case UNKNOWN_READ_RULE:
return Ydb::StatusIds::BAD_REQUEST;
case WRONG_COOKIE:
case CREATE_SESSION_ALREADY_LOCKED:
Expand Down
2 changes: 1 addition & 1 deletion ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ bool TReadInitAndAuthActor::CheckTopicACL(
if (!NPQ::HasConsumer(pqDescr.GetPQTabletConfig(), ClientId)) {
CloseSession(
TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "' in current cluster '" << LocalCluster << "'",
PersQueue::ErrorCode::BAD_REQUEST, ctx
PersQueue::ErrorCode::UNKNOWN_READ_RULE, ctx
);
return false;
}
Expand Down

0 comments on commit f6e8a74

Please sign in to comment.