Skip to content

Commit

Permalink
Reading from the topic with guarantees after autoscaling in the new S…
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored May 6, 2024
1 parent fcc1c3c commit cca5246
Show file tree
Hide file tree
Showing 22 changed files with 2,587 additions and 1,604 deletions.
136 changes: 89 additions & 47 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
if (!joinGroupRequest->GroupId.has_value() || (GroupId != "" && joinGroupRequest->GroupId.value() != GroupId)) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << joinGroupRequest->GroupId.value_or(""));
CloseReadSession(ctx);
return;
return;
}

switch (ReadStep) {
case WAIT_JOIN_GROUP: { // join first time
if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
return;
}

auto supportedProtocolFound = TryFillTopicsToRead(joinGroupRequest, TopicsToReadNames);
Expand All @@ -103,13 +103,13 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
if (!supportedProtocolFound) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INCONSISTENT_GROUP_PROTOCOL, TStringBuilder() << "unsupported assign protocol. Must be " << SUPPORTED_ASSIGN_STRATEGY);
CloseReadSession(ctx);
return;
return;
}

if (TopicsToReadNames.size() == 0) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, "empty topics to read list");
CloseReadSession(ctx);
return;
return;
}

JoinGroupCorellationId = ev->Get()->CorrelationId;
Expand All @@ -123,7 +123,7 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
CloseReadSession(ctx);
return;
}
ReadStep = WAIT_SYNC_GROUP;
ReadStep = WAIT_SYNC_GROUP;
SendJoinGroupResponseOk(ctx, ev->Get()->CorrelationId);
break;
}
Expand Down Expand Up @@ -158,7 +158,7 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
return;
}

if (syncGroupRequest->GroupId != GroupId) {
Expand All @@ -171,7 +171,7 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, ILLEGAL_GENERATION, TStringBuilder() << "illegal generationId# " << syncGroupRequest->GenerationId << ", must be " << GenerationId);
return;
}

ReadStep = READING;
SendSyncGroupResponseOk(ctx, ev->Get()->CorrelationId);
NeedRebalance = false;
Expand All @@ -197,14 +197,14 @@ void TKafkaReadSessionActor::HandleLeaveGroup(TEvKafka::TEvLeaveGroupRequest::TP
CloseReadSession(ctx);
return;
}

SendLeaveGroupResponseOk(ctx, ev->Get()->CorrelationId);
CloseReadSession(ctx);
}

void TKafkaReadSessionActor::HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr ev, const TActorContext& ctx) {
auto heartbeatRequest = ev->Get()->Request;

if (NextRequestError.Code != NONE_ERROR) {
SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message);
CloseReadSession(ctx);
Expand All @@ -224,7 +224,7 @@ void TKafkaReadSessionActor::HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr
}

LastHeartbeatTime = TInstant::Now();
EKafkaErrors error = NeedRebalance || GenerationId != heartbeatRequest->GenerationId ? EKafkaErrors::REBALANCE_IN_PROGRESS : EKafkaErrors::NONE_ERROR; // if REBALANCE_IN_PROGRESS, client rejoin
EKafkaErrors error = NeedRebalance || GenerationId != heartbeatRequest->GenerationId ? EKafkaErrors::REBALANCE_IN_PROGRESS : EKafkaErrors::NONE_ERROR; // if REBALANCE_IN_PROGRESS, client rejoin
SendHeartbeatResponseOk(ctx, ev->Get()->CorrelationId, error);
}

Expand Down Expand Up @@ -408,7 +408,7 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy

void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
NewPartitionsToLockOnTime.clear();

for (auto& [topicName, topicInfo] : TopicsInfo) {
if (topicInfo.TabletID == tabletId) {
auto partitionsIt = TopicPartitions.find(topicName);
Expand Down Expand Up @@ -439,13 +439,13 @@ void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) {
auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
topicConverterFactory
);

TopicsToConverter = topicHandler->GetReadTopicsList(TopicsToReadNames, false, Context->DatabasePath);
if (!TopicsToConverter.IsValid) {
SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, INVALID_REQUEST, TStringBuilder() << "topicsToConverter is not valid");
return;
}

ctx.Register(new NGRpcProxy::V1::TReadInitAndAuthActor(
ctx, ctx.SelfID, GroupId, Cookie, Session, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), MakeSchemeCacheID(), nullptr, Context->UserToken, TopicsToConverter,
topicHandler->GetLocalCluster(), false));
Expand Down Expand Up @@ -473,12 +473,12 @@ void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthRes
}

Send(Context->ConnectionId, new TEvKafka::TEvReadSessionInfo(GroupId));

for (auto& [topicName, topicInfo] : TopicsInfo) {
topicInfo.PipeClient = CreatePipeClient(topicInfo.TabletID, ctx);
RegisterBalancerSession(topicInfo.FullConverter->GetInternalName(), topicInfo.PipeClient, topicInfo.Groups, ctx);
}

if (JoinGroupCorellationId != 0) {
SendJoinGroupResponseOk(ctx, JoinGroupCorellationId);
JoinGroupCorellationId = 0;
Expand All @@ -491,7 +491,7 @@ void TKafkaReadSessionActor::HandleAuthCloseSession(NGRpcProxy::V1::TEvPQProxy::
SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, ConvertErrorCode(ev->Get()->ErrorCode), TStringBuilder() << "auth failed. " << ev->Get()->Reason);
JoinGroupCorellationId = 0;
}

CloseReadSession(ctx);
}

Expand Down Expand Up @@ -522,7 +522,7 @@ void TKafkaReadSessionActor::RegisterBalancerSession(const TString& topic, const
void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
KAFKA_LOG_D("partition lock is coming from PQRB topic# " << record.GetTopic() << ", partition# " << record.GetPartition());

Y_ABORT_UNLESS(record.GetSession() == Session);
Y_ABORT_UNLESS(record.GetClientId() == GroupId);
Y_ABORT_UNLESS(record.GetGeneration() > 0);
Expand All @@ -534,8 +534,8 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:

auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path));
if (converterIter == FullPathToConverter.end()) {
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
<< ", reason# path not recognized");
return;
}
Expand All @@ -544,8 +544,8 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:

auto topicInfoIt = TopicsInfo.find(topicName);
if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) {
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
<< ", reason# topic is unknown");
return;
}
Expand All @@ -571,7 +571,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
Y_ABORT_UNLESS(topicInfoIt != TopicsInfo.end());

if (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) {
KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
<< ", reason# topic is unknown");
return;
}
Expand All @@ -582,54 +582,96 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);

for (ui32 c = 0; c < record.GetCount(); ++c) {
// if some partition not locked yet, then release it without rebalance
if (newPartitionsToLockCount > 0) {
newPartitionsToLockCount--;
InformBalancerAboutPartitionRelease(topicInfoIt->first, newPartitionsToLockIt->second.back().PartitionId, ctx);
newPartitionsToLockIt->second.pop_back();
continue;
}
if (!group) {
for (ui32 c = 0; c < record.GetCount(); ++c) {
// if some partition not locked yet, then release it without rebalance
if (newPartitionsToLockCount > 0) {
newPartitionsToLockCount--;
InformBalancerAboutPartitionRelease(topicInfoIt->first, newPartitionsToLockIt->second.back().PartitionId, ctx);
newPartitionsToLockIt->second.pop_back();
continue;
}

if (!topicPartitionsIt->second.ToLock.empty()) {
auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin();
topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt);
InformBalancerAboutPartitionRelease(topicInfoIt->first, *partitionToReleaseIt, ctx);
continue;
}

NeedRebalance = true;
ui32 partitionToRelease = 0;
ui32 i = 0;

for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition)) {
++i;
if (rand() % i == 0) {
partitionToRelease = curPartition;
}
}
}

if (!topicPartitionsIt->second.ToLock.empty()) {
auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin();
topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt);
InformBalancerAboutPartitionRelease(topicInfoIt->first, *partitionToReleaseIt, ctx);
continue;
topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
}
} else {
auto partitionToRelease = record.GetGroup() - 1;

if (newPartitionsToLockIt != NewPartitionsToLockOnTime.end()) {
auto& newPartitions = newPartitionsToLockIt->second;
for (auto& newPartition : newPartitions) {
if (newPartition.PartitionId == partitionToRelease) {
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);

NeedRebalance = true;
ui32 partitionToRelease = 0;
ui32 i = 0;
auto tmp = std::move(newPartitions);
newPartitions.reserve(tmp.size() - 1);

for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
++i;
if (rand() % i == 0) {
partitionToRelease = curPartition;
for (auto& t : tmp) {
if (t.PartitionId != partitionToRelease) {
newPartitions.push_back(t);
}
}

return;
}
}
}

topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
if (topicPartitionsIt != TopicPartitions.end()) {
if (topicPartitionsIt->second.ToLock.contains(partitionToRelease)) {
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
topicPartitionsIt->second.ToLock.erase(partitionToRelease);
return;
}

if (topicPartitionsIt->second.ReadingNow.contains(partitionToRelease) && !topicPartitionsIt->second.ToRelease.contains(partitionToRelease)) {
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
NeedRebalance = true;
topicPartitionsIt->second.ReadingNow.erase(partitionToRelease);
return;
}
}

KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
<< ", reason# partition " << partitionToRelease << " isn`t locked");
}
}

void TKafkaReadSessionActor::InformBalancerAboutPartitionRelease(const TString& topic, ui64 partition, const TActorContext& ctx) {
KAFKA_LOG_I("released topic# " << topic
<< ", partition# " << partition);
<< ", partition# " << partition);
auto request = MakeHolder<TEvPersQueue::TEvPartitionReleased>();

auto topicIt = TopicsInfo.find(topic);
Y_ABORT_UNLESS(topicIt != TopicsInfo.end());

auto& req = request->Record;
req.SetSession(Session);
ActorIdToProto(topicIt->second.PipeClient, req.MutablePipeClient());
req.SetClientId(GroupId);
req.SetTopic(topicIt->second.FullConverter->GetPrimaryPath());
req.SetPartition(partition);

NTabletPipe::SendData(ctx, topicIt->second.PipeClient, request.Release());
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kafka_proxy/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
UNITTEST_FOR(ydb/core/kafka_proxy)

SIZE(medium)
TIMEOUT(600)

SRCS(
ut_kafka_functions.cpp
Expand Down
12 changes: 1 addition & 11 deletions ydb/core/persqueue/events/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct TEvPersQueue {
EvDescribeResponse,
EvGetReadSessionsInfo,
EvReadSessionsInfoResponse,
EvWakeupClient,
EvWakeupClient, // deprecated
EvUpdateACL,
EvCheckACL,
EvCheckACLResponse,
Expand Down Expand Up @@ -198,16 +198,6 @@ struct TEvPersQueue {
TEvPartitionClientInfoResponse() = default;
};

struct TEvWakeupClient : TEventLocal<TEvWakeupClient, EvWakeupClient> {
TEvWakeupClient(const TString& client, const ui32 group)
: Client(client)
, Group(group)
{}

TString Client;
ui32 Group;
};

struct TEvDescribe : public TEventPB<TEvDescribe, NKikimrPQ::TDescribe, EvDescribe> {
TEvDescribe()
{}
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ struct TEvPQ {
EvWakeupReleasePartition,
EvPartitionScaleStatusChanged,
EvPartitionScaleRequestDone,
EvBalanceConsumer,
EvEnd
};

Expand Down Expand Up @@ -1123,6 +1124,14 @@ struct TEvPQ {
Record.SetScaleStatus(scaleStatus);
}
};

struct TEvBalanceConsumer : TEventLocal<TEvBalanceConsumer, EvBalanceConsumer> {
TEvBalanceConsumer(const TString& consumerName)
: ConsumerName(consumerName)
{}

TString ConsumerName;
};
};

} //NKikimr
Loading

0 comments on commit cca5246

Please sign in to comment.