Skip to content

Commit

Permalink
refactor CreatePartitions endpoint of Kafka API to use TUpdateSchemeA…
Browse files Browse the repository at this point in the history
…ctor
  • Loading branch information
siarheivesialou committed Jan 15, 2024
1 parent c5256a8 commit 8df7353
Showing 1 changed file with 32 additions and 61 deletions.
93 changes: 32 additions & 61 deletions ydb/core/kafka_proxy/actors/kafka_create_partitions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
TIntrusiveConstPtr<NACLib::TUserToken> userToken,
TString topicPath,
TString databaseName,
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, TString&)> sendResultCallback)
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus, const TString&)> sendResultCallback)
: UserToken(userToken)
, TopicPath(topicPath)
, DatabaseName(databaseName)
Expand Down Expand Up @@ -71,7 +71,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
};

void ReplyWithYdbStatus(Ydb::StatusIds::StatusCode status) override {
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void ReplyWithRpcStatus(grpc::StatusCode code, const TString& msg = "", const TString& details = "") override {
Expand Down Expand Up @@ -101,8 +101,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
}

void RaiseIssue(const NYql::TIssue& issue) override{
ReplyMessage = issue.GetMessage();
Y_UNUSED(issue);
Issue = issue;
}

void RaiseIssues(const NYql::TIssues& issues) override {
Expand Down Expand Up @@ -174,7 +173,7 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt

void SendResult(const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
Y_UNUSED(result);
processYdbStatusCode(status);
ProcessYdbStatusCode(status);
};

void SendResult(
Expand All @@ -184,15 +183,15 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt

Y_UNUSED(result);
Y_UNUSED(message);
processYdbStatusCode(status, std::optional(std::ref(message)));
ProcessYdbStatusCode(status);
};

void SendResult(
Ydb::StatusIds::StatusCode status,
const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {

Y_UNUSED(message);
processYdbStatusCode(status, std::optional(std::ref(message)));
ProcessYdbStatusCode(status);
};

const Ydb::Operations::OperationParams& operation_params() const {
Expand All @@ -214,47 +213,33 @@ class TKafkaCreatePartitionsRequest : public NKikimr::NGRpcService::IRequestOpCt
const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
const TString TopicPath;
const TString DatabaseName;
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message)> SendResultCallback;
TString ReplyMessage;

void processYdbStatusCode(
Ydb::StatusIds::StatusCode& status,
std::optional<std::reference_wrapper<const google::protobuf::RepeatedPtrField<
NKikimr::NGRpcService::TYdbIssueMessageType>>> issueMessagesOpt = std::nullopt) {
const std::function<void(TEvKafka::TEvTopicModificationResponse::EStatus status, const TString& message)> SendResultCallback;
NYql::TIssue Issue;

void ProcessYdbStatusCode(Ydb::StatusIds::StatusCode& status) {
const auto& message = Issue.GetMessage();
switch (status) {
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SUCCESS:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::OK, message);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_BAD_REQUEST:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::BAD_REQUEST, message);
break;
case Ydb::StatusIds::StatusCode::StatusIds_StatusCode_SCHEME_ERROR:
if (issueMessagesOpt.has_value()) {
auto& issueMessages = issueMessagesOpt.value().get();
bool hasPathNotExists = std::find_if(
issueMessages.begin(),
issueMessages.end(),
[](const auto& msg){ return msg.issue_code() == NKikimrIssues::TIssuesIds::PATH_NOT_EXIST; }
) != issueMessages.end();

if (hasPathNotExists) {
SendResultCallback(TEvKafka::TEvTopicModificationResponse:: EStatus::TOPIC_DOES_NOT_EXIST, ReplyMessage);
} else {
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
}
if (Issue.IssueCode == Ydb::PersQueue::ErrorCode::ACCESS_DENIED) {
SendResultCallback(TEvKafka::TEvTopicModificationResponse:: EStatus::TOPIC_DOES_NOT_EXIST, message);
} else {
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, message);
}
break;
default:
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, ReplyMessage);
SendResultCallback(TEvKafka::TEvTopicModificationResponse::EStatus::ERROR, message);
}
}
};

class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest> {
using TBase = NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>;
class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>{
using TBase = NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<TCreatePartitionsActor, TKafkaCreatePartitionsRequest>;
public:

TCreatePartitionsActor(
Expand All @@ -267,7 +252,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase
userToken,
topicPath,
databaseName,
[this](TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
[this](TEvKafka::TEvTopicModificationResponse::EStatus status, const TString& message) {
this->SendResult(status, message);
})
)
Expand All @@ -283,7 +268,7 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase

~TCreatePartitionsActor() = default;

void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, TString& message) {
void SendResult(TEvKafka::TEvTopicModificationResponse::EStatus status, const TString& message) {
THolder<TEvKafka::TEvTopicModificationResponse> response(new TEvKafka::TEvTopicModificationResponse());
response->Status = status;
response->TopicPath = TopicPath;
Expand All @@ -292,39 +277,25 @@ class TCreatePartitionsActor : public NKikimr::NGRpcProxy::V1::TPQGrpcSchemaBase
Send(SelfId(), new TEvents::TEvPoison());
}

void FillProposeRequest(
NKikimr::TEvTxUserProxy::TEvProposeTransaction &proposal,
const TActorContext &ctx,
const TString &workingDir,
const TString &name) {

void ModifyPersqueueConfig(
const TActorContext& ctx,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
Y_UNUSED(ctx);
Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);

NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
modifyScheme.SetWorkingDir(workingDir);
modifyScheme.SetOperationType(::NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);

auto pqDescr = modifyScheme.MutableAlterPersQueueGroup();
(*pqDescr).SetTotalGroupCount(PartionsNumber);
(*pqDescr).SetName(name);
};
groupConfig.SetTotalGroupCount(PartionsNumber);
}

void Bootstrap(const NActors::TActorContext& ctx) {
TBase::Bootstrap(ctx);
SendProposeRequest(ctx);
Become(&TCreatePartitionsActor::StateWork);
SendDescribeProposeRequest(ctx);
Become(&TBase::StateWork);
};

void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
hFunc(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle);
default:
TBase::StateWork(ev);
}
}

void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }

private:
const TActorId Requester;
const TString TopicPath;
Expand Down

0 comments on commit 8df7353

Please sign in to comment.