Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor CreatePartitions endpoint of Kafka API to use TUpdateSchemeActor #1005

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 32 additions & 61 deletions ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback)
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, const TString&)> sendResultCallback)
: UserToken(userToken)
, TopicPath(topicPath)
, DatabaseName(databaseName)
Expand Down Expand Up @@ -71,7 +71,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
};

void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override {
Expand Down Expand Up @@ -101,8 +101,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
}

void RaiseIssue(const NYql::TIssue& issue) override{
ReplyMessage = issue.GetMessage();
Y_UNUSED(issue);
Issue = issue;
}

void RaiseIssues(const NYql::TIssues& issues) override {
Expand Down Expand Up @@ -174,7 +173,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt

void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
Y_UNUSED(result);
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void SendResult(
Expand All @@ -184,15 +183,15 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt

Y_UNUSED(result);
Y_UNUSED(message);
processYdbStatusCode(status, std::optional(std::ref(message)));
ProcessYdbStatusCode(status);
};

void SendResult(
Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {

Y_UNUSED(message);
processYdbStatusCode(status, std::optional(std::ref(message)));
ProcessYdbStatusCode(status);
};

const Ydb::Operations::OperationParams& operation_params() const {
Expand All @@ -214,47 +213,33 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
const TString TopicPath;
const TString DatabaseName;
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback;
TString ReplyMessage;

void processYdbStatusCode(
Ydb::StatusIds::StatusCode& status,
std::optional<std::reference_wrapper<const google::protobuf::RepeatedPtrField<
NKikimr::NGRpcService::TYdbIssueMessageType>>> issueMessagesOpt = std::nullopt) {
const std::function<void(const TEvKafka::TEvTopicModificationResponse::EStatus status, const TString& message)> SendResultCallback;
NYql::TIssue Issue;

void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
const auto& message = Issue.GetMessage();
switch (status) {
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, message);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, message);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SCHEME_ERROR:
if (issueMessagesOpt.has_value()) {
auto& issueMessages = issueMessagesOpt.value().get();
bool hasPathNotExists = std::find_if(
issueMessages.begin(),
issueMessages.end(),
[](const auto& msg){ return msg.issue_code() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST; }
) != issueMessages.end();

if (hasPathNotExists) {
SendResultCallback(TEvKafka::TEvTopicModificationResponse:: EStatus::TOPIC_DOES_NOT_EXIST, ReplyMessage);
} else {
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
}
if (Issue.IssueCode == Ydb::PersQueue::ErrorCode::ACCESS_DENIED) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему такой выбор именно конвертации scheme_error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Обсудили, исправил

SendResultCallback(TEvKafka::TEvTopicModificationResponse:: EStatus::TOPIC_DOES_NOT_EXIST, message);
} else {
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, message);
}
break;
default:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, message);
}
}
};

class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest> {
using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>;
class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>{
using TBase = NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>;
public:

TCreatePartitionsActor(
Expand All @@ -267,7 +252,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase
userToken,
topicPath,
databaseName,
[this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
[this](const TEvKafka::TEvTopicModificationResponse::EStatus status, const TString& message) {
this->SendResult(status, message);
})
)
Expand All @@ -283,7 +268,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase

~TCreatePartitionsActor() = default;

void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
void SendResult(const TEvKafka::TEvTopicModificationResponse::EStatus status, const TString& message) {
THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse());
response->Status = status;
response->TopicPath = TopicPath;
Expand All @@ -292,39 +277,25 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase
Send(SelfId(), new TEvents::TEvPoison());
}

void FillProposeRequest(
NKikimr::TEvTxUserProxy::TEvProposeTransaction &proposal,
const TActorContext &ctx,
const TString &workingDir,
const TString &name) {

void ModifyPersqueueConfig(
const TActorContext& ctx,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(ctx);
Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);

NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
modifyScheme.SetWorkingDir(workingDir);
modifyScheme.SetOperationType(::NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);

auto pqDescr = modifyScheme.MutableAlterPersQueueGroup();
(*pqDescr).SetTotalGroupCount(PartionsNumber);
(*pqDescr).SetName(name);
};
groupConfig.SetTotalGroupCount(PartionsNumber);
}

void Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
SendProposeRequest(ctx);
Become(&TCreatePartitionsActor::StateWork);
SendDescribeProposeRequest(ctx);
Become(&TBase::StateWork);
};

void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
hFunc(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle);
default:
TBase::StateWork(ev);
}
}

void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }

private:
const TActorId Requester;
const TString TopicPath;
Expand Down
Loading