Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reading from the topic with guarantees after autoscaling in the new SDK #4043

Merged
merged 49 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
220dc89
Rename TPipeInfo to TReadingSession
nshestakov Apr 15, 2024
6addc83
intermediate
nshestakov Apr 16, 2024
49cc258
intermediate
nshestakov Apr 17, 2024
a264f45
Merge branch 'main' into AutoscalingRead7
nshestakov Apr 17, 2024
12a7204
intermediate
nshestakov Apr 17, 2024
28fa828
intermediate (compiled)
nshestakov Apr 17, 2024
b141dd7
intermediate
nshestakov Apr 17, 2024
0f39b4a
intermediate
nshestakov Apr 17, 2024
37a0d1c
intermediate
nshestakov Apr 17, 2024
dbf851c
intermediate
nshestakov Apr 18, 2024
9afdc2e
intermediate
nshestakov Apr 18, 2024
ac0b0f3
intermediate
nshestakov Apr 18, 2024
b87ac53
Merge branch 'main' into AutoscalingRead7
nshestakov Apr 18, 2024
60f9a15
intermediate compiled
nshestakov Apr 18, 2024
44adbd1
intermediate
nshestakov Apr 18, 2024
b6a9872
intermediate
nshestakov Apr 19, 2024
d8a13ad
intermediate. autoscaling test passed
nshestakov Apr 19, 2024
717a0f9
intermediate
nshestakov Apr 22, 2024
ddc7bf9
Merge branch 'main' into AutoscalingRead7
nshestakov Apr 22, 2024
3e0d1c8
fix
nshestakov Apr 22, 2024
256d793
fix
nshestakov Apr 22, 2024
f190ba3
fix
nshestakov Apr 22, 2024
fc46b17
lastpipe
nshestakov Apr 22, 2024
35431a3
fix step
nshestakov Apr 23, 2024
7c537d0
add test
nshestakov Apr 23, 2024
0beff98
remove cerr
nshestakov Apr 23, 2024
329a3ba
families break up
nshestakov Apr 23, 2024
a647a62
fix
nshestakov Apr 23, 2024
48cdf89
small fixes
nshestakov Apr 24, 2024
4bf8f60
Merge branch 'main' into AutoscalingRead7
nshestakov Apr 24, 2024
5bc5a56
merge families
nshestakov Apr 24, 2024
6f29efc
merge families
nshestakov Apr 24, 2024
46dedd3
merge families
nshestakov Apr 24, 2024
44bd48b
fix
nshestakov Apr 25, 2024
db908f8
rename Session to SessionName
nshestakov Apr 25, 2024
4fee7be
batching for rebalance
nshestakov Apr 25, 2024
be1f58a
decrease log level
nshestakov Apr 25, 2024
74e4914
fix error
nshestakov Apr 25, 2024
8bc0630
remove reordering
nshestakov Apr 25, 2024
335c978
disable test
nshestakov Apr 25, 2024
2a20d22
improove TODO
nshestakov Apr 25, 2024
7490cc9
Merge branch 'main' into AutoscalingRead7
nshestakov Apr 25, 2024
4663e31
revert unnecessery changes
nshestakov Apr 26, 2024
d259667
codestyle
nshestakov Apr 26, 2024
1ebcb7b
Merge branch 'main' into AutoscalingRead7
nshestakov May 3, 2024
2afa0e6
fix order
nshestakov May 3, 2024
58370fe
IsCommon
nshestakov May 3, 2024
37f0ebf
fix
nshestakov May 6, 2024
72c5964
fix BalanceScenario test
nshestakov May 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 89 additions & 47 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
if (!joinGroupRequest->GroupId.has_value() || (GroupId != "" && joinGroupRequest->GroupId.value() != GroupId)) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_GROUP_ID, TStringBuilder() << "invalid groupId# " << joinGroupRequest->GroupId.value_or(""));
CloseReadSession(ctx);
return;
return;
}

switch (ReadStep) {
case WAIT_JOIN_GROUP: { // join first time
if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
return;
}

auto supportedProtocolFound = TryFillTopicsToRead(joinGroupRequest, TopicsToReadNames);
Expand All @@ -103,13 +103,13 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
if (!supportedProtocolFound) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INCONSISTENT_GROUP_PROTOCOL, TStringBuilder() << "unsupported assign protocol. Must be " << SUPPORTED_ASSIGN_STRATEGY);
CloseReadSession(ctx);
return;
return;
}

if (TopicsToReadNames.size() == 0) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, "empty topics to read list");
CloseReadSession(ctx);
return;
return;
}

JoinGroupCorellationId = ev->Get()->CorrelationId;
Expand All @@ -123,7 +123,7 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr
CloseReadSession(ctx);
return;
}
ReadStep = WAIT_SYNC_GROUP;
ReadStep = WAIT_SYNC_GROUP;
SendJoinGroupResponseOk(ctx, ev->Get()->CorrelationId);
break;
}
Expand Down Expand Up @@ -158,7 +158,7 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
return;
}

if (syncGroupRequest->GroupId != GroupId) {
Expand All @@ -171,7 +171,7 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, ILLEGAL_GENERATION, TStringBuilder() << "illegal generationId# " << syncGroupRequest->GenerationId << ", must be " << GenerationId);
return;
}

ReadStep = READING;
SendSyncGroupResponseOk(ctx, ev->Get()->CorrelationId);
NeedRebalance = false;
Expand All @@ -197,14 +197,14 @@ void TKafkaReadSessionActor::HandleLeaveGroup(TEvKafka::TEvLeaveGroupRequest::TP
CloseReadSession(ctx);
return;
}

SendLeaveGroupResponseOk(ctx, ev->Get()->CorrelationId);
CloseReadSession(ctx);
}

void TKafkaReadSessionActor::HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr ev, const TActorContext& ctx) {
auto heartbeatRequest = ev->Get()->Request;

if (NextRequestError.Code != NONE_ERROR) {
SendHeartbeatResponseFail(ctx, ev->Get()->CorrelationId, NextRequestError.Code, NextRequestError.Message);
CloseReadSession(ctx);
Expand All @@ -224,7 +224,7 @@ void TKafkaReadSessionActor::HandleHeartbeat(TEvKafka::TEvHeartbeatRequest::TPtr
}

LastHeartbeatTime = TInstant::Now();
EKafkaErrors error = NeedRebalance || GenerationId != heartbeatRequest->GenerationId ? EKafkaErrors::REBALANCE_IN_PROGRESS : EKafkaErrors::NONE_ERROR; // if REBALANCE_IN_PROGRESS, client rejoin
EKafkaErrors error = NeedRebalance || GenerationId != heartbeatRequest->GenerationId ? EKafkaErrors::REBALANCE_IN_PROGRESS : EKafkaErrors::NONE_ERROR; // if REBALANCE_IN_PROGRESS, client rejoin
SendHeartbeatResponseOk(ctx, ev->Get()->CorrelationId, error);
}

Expand Down Expand Up @@ -408,7 +408,7 @@ void TKafkaReadSessionActor::HandlePipeDestroyed(TEvTabletPipe::TEvClientDestroy

void TKafkaReadSessionActor::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
NewPartitionsToLockOnTime.clear();

for (auto& [topicName, topicInfo] : TopicsInfo) {
if (topicInfo.TabletID == tabletId) {
auto partitionsIt = TopicPartitions.find(topicName);
Expand Down Expand Up @@ -439,13 +439,13 @@ void TKafkaReadSessionActor::AuthAndFindBalancers(const TActorContext& ctx) {
auto topicHandler = std::make_unique<NPersQueue::TTopicsListController>(
topicConverterFactory
);

TopicsToConverter = topicHandler->GetReadTopicsList(TopicsToReadNames, false, Context->DatabasePath);
if (!TopicsToConverter.IsValid) {
SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, INVALID_REQUEST, TStringBuilder() << "topicsToConverter is not valid");
return;
}

ctx.Register(new NGRpcProxy::V1::TReadInitAndAuthActor(
ctx, ctx.SelfID, GroupId, Cookie, Session, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), MakeSchemeCacheID(), nullptr, Context->UserToken, TopicsToConverter,
topicHandler->GetLocalCluster(), false));
Expand Down Expand Up @@ -473,12 +473,12 @@ void TKafkaReadSessionActor::HandleAuthOk(NGRpcProxy::V1::TEvPQProxy::TEvAuthRes
}

Send(Context->ConnectionId, new TEvKafka::TEvReadSessionInfo(GroupId));

for (auto& [topicName, topicInfo] : TopicsInfo) {
topicInfo.PipeClient = CreatePipeClient(topicInfo.TabletID, ctx);
RegisterBalancerSession(topicInfo.FullConverter->GetInternalName(), topicInfo.PipeClient, topicInfo.Groups, ctx);
}

if (JoinGroupCorellationId != 0) {
SendJoinGroupResponseOk(ctx, JoinGroupCorellationId);
JoinGroupCorellationId = 0;
Expand All @@ -491,7 +491,7 @@ void TKafkaReadSessionActor::HandleAuthCloseSession(NGRpcProxy::V1::TEvPQProxy::
SendJoinGroupResponseFail(ctx, JoinGroupCorellationId, ConvertErrorCode(ev->Get()->ErrorCode), TStringBuilder() << "auth failed. " << ev->Get()->Reason);
JoinGroupCorellationId = 0;
}

CloseReadSession(ctx);
}

Expand Down Expand Up @@ -522,7 +522,7 @@ void TKafkaReadSessionActor::RegisterBalancerSession(const TString& topic, const
void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->Record;
KAFKA_LOG_D("partition lock is coming from PQRB topic# " << record.GetTopic() << ", partition# " << record.GetPartition());

Y_ABORT_UNLESS(record.GetSession() == Session);
Y_ABORT_UNLESS(record.GetClientId() == GroupId);
Y_ABORT_UNLESS(record.GetGeneration() > 0);
Expand All @@ -534,8 +534,8 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:

auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path));
if (converterIter == FullPathToConverter.end()) {
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
<< ", reason# path not recognized");
return;
}
Expand All @@ -544,8 +544,8 @@ void TKafkaReadSessionActor::HandleLockPartition(TEvPersQueue::TEvLockPartition:

auto topicInfoIt = TopicsInfo.find(topicName);
if (topicInfoIt == TopicsInfo.end() || (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient()))) {
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
KAFKA_LOG_I("ignored ev lock topic# " << record.GetTopic()
<< ", partition# " << record.GetPartition()
<< ", reason# topic is unknown");
return;
}
Expand All @@ -571,7 +571,7 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
Y_ABORT_UNLESS(topicInfoIt != TopicsInfo.end());

if (topicInfoIt->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) {
KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
<< ", reason# topic is unknown");
return;
}
Expand All @@ -582,54 +582,96 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
auto topicPartitionsIt = TopicPartitions.find(pathIt->second->GetInternalName());
Y_ABORT_UNLESS(record.GetCount() <= (topicPartitionsIt.IsEnd() ? 0 : topicPartitionsIt->second.ToLock.size() + topicPartitionsIt->second.ReadingNow.size()) + newPartitionsToLockCount);

for (ui32 c = 0; c < record.GetCount(); ++c) {
// if some partition not locked yet, then release it without rebalance
if (newPartitionsToLockCount > 0) {
newPartitionsToLockCount--;
InformBalancerAboutPartitionRelease(topicInfoIt->first, newPartitionsToLockIt->second.back().PartitionId, ctx);
newPartitionsToLockIt->second.pop_back();
continue;
}
if (!group) {
for (ui32 c = 0; c < record.GetCount(); ++c) {
// if some partition not locked yet, then release it without rebalance
if (newPartitionsToLockCount > 0) {
newPartitionsToLockCount--;
InformBalancerAboutPartitionRelease(topicInfoIt->first, newPartitionsToLockIt->second.back().PartitionId, ctx);
newPartitionsToLockIt->second.pop_back();
continue;
}

if (!topicPartitionsIt->second.ToLock.empty()) {
auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin();
topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt);
InformBalancerAboutPartitionRelease(topicInfoIt->first, *partitionToReleaseIt, ctx);
continue;
}

NeedRebalance = true;
ui32 partitionToRelease = 0;
ui32 i = 0;

for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition)) {
++i;
if (rand() % i == 0) {
partitionToRelease = curPartition;
}
}
}

if (!topicPartitionsIt->second.ToLock.empty()) {
auto partitionToReleaseIt = topicPartitionsIt->second.ToLock.begin();
topicPartitionsIt->second.ToLock.erase(partitionToReleaseIt);
InformBalancerAboutPartitionRelease(topicInfoIt->first, *partitionToReleaseIt, ctx);
continue;
topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
}
} else {
auto partitionToRelease = record.GetGroup() - 1;

if (newPartitionsToLockIt != NewPartitionsToLockOnTime.end()) {
auto& newPartitions = newPartitionsToLockIt->second;
for (auto& newPartition : newPartitions) {
if (newPartition.PartitionId == partitionToRelease) {
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);

NeedRebalance = true;
ui32 partitionToRelease = 0;
ui32 i = 0;
auto tmp = std::move(newPartitions);
newPartitions.reserve(tmp.size() - 1);

for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
++i;
if (rand() % i == 0) {
partitionToRelease = curPartition;
for (auto& t : tmp) {
if (t.PartitionId != partitionToRelease) {
newPartitions.push_back(t);
}
}

return;
}
}
}

topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
if (topicPartitionsIt != TopicPartitions.end()) {
if (topicPartitionsIt->second.ToLock.contains(partitionToRelease)) {
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
topicPartitionsIt->second.ToLock.erase(partitionToRelease);
return;
}

if (topicPartitionsIt->second.ReadingNow.contains(partitionToRelease) && !topicPartitionsIt->second.ToRelease.contains(partitionToRelease)) {
InformBalancerAboutPartitionRelease(topicInfoIt->first, partitionToRelease, ctx);
NeedRebalance = true;
topicPartitionsIt->second.ReadingNow.erase(partitionToRelease);
return;
}
}

KAFKA_LOG_I("ignored ev release topic# " << record.GetTopic()
<< ", reason# partition " << partitionToRelease << " isn`t locked");
}
}

// Reports the release of `partition` of `topic` by sending a
// TEvPersQueue::TEvPartitionReleased over the pipe client stored for that
// topic in TopicsInfo.
//
// topic     - key into TopicsInfo; the process aborts if it is not present.
// partition - partition id written into the request.
// ctx       - actor context used to send the data through the tablet pipe.
void TKafkaReadSessionActor::InformBalancerAboutPartitionRelease(const TString& topic, ui64 partition, const TActorContext& ctx) {
    KAFKA_LOG_I("released topic# " << topic
        << ", partition# " << partition);

    // Resolve the topic first: nothing can be sent without its pipe client and converter.
    auto topicIt = TopicsInfo.find(topic);
    Y_ABORT_UNLESS(topicIt != TopicsInfo.end());
    const auto& topicInfo = topicIt->second;

    auto request = MakeHolder<TEvPersQueue::TEvPartitionReleased>();
    auto& req = request->Record;
    req.SetSession(Session);
    ActorIdToProto(topicInfo.PipeClient, req.MutablePipeClient());
    req.SetClientId(GroupId);
    req.SetTopic(topicInfo.FullConverter->GetPrimaryPath());
    req.SetPartition(partition);

    // Ownership of the request is handed over to the pipe.
    NTabletPipe::SendData(ctx, topicInfo.PipeClient, request.Release());
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kafka_proxy/ut/ya.make
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
UNITTEST_FOR(ydb/core/kafka_proxy)

SIZE(medium)
TIMEOUT(600)

SRCS(
ut_kafka_functions.cpp
Expand Down
12 changes: 1 addition & 11 deletions ydb/core/persqueue/events/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ struct TEvPersQueue {
EvDescribeResponse,
EvGetReadSessionsInfo,
EvReadSessionsInfoResponse,
EvWakeupClient,
EvWakeupClient, // deprecated
EvUpdateACL,
EvCheckACL,
EvCheckACLResponse,
Expand Down Expand Up @@ -198,16 +198,6 @@ struct TEvPersQueue {
TEvPartitionClientInfoResponse() = default;
};

// Event derived from TEventLocal that carries a client name and a group number.
// NOTE(review): the EvWakeupClient enum entry is annotated "deprecated" in this
// listing — confirm whether this struct is still referenced anywhere.
struct TEvWakeupClient : TEventLocal<TEvWakeupClient, EvWakeupClient> {
// Stores copies of both arguments in the corresponding fields.
TEvWakeupClient(const TString& client, const ui32 group)
: Client(client)
, Group(group)
{}

TString Client; // copy of the `client` constructor argument
ui32 Group; // copy of the `group` constructor argument
};

struct TEvDescribe : public TEventPB<TEvDescribe, NKikimrPQ::TDescribe, EvDescribe> {
TEvDescribe()
{}
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ struct TEvPQ {
EvWakeupReleasePartition,
EvPartitionScaleStatusChanged,
EvPartitionScaleRequestDone,
EvBalanceConsumer,
EvEnd
};

Expand Down Expand Up @@ -1123,6 +1124,14 @@ struct TEvPQ {
Record.SetScaleStatus(scaleStatus);
}
};

// Event derived from TEventLocal that carries the name of a consumer.
// NOTE(review): presumably asks the receiver to rebalance that consumer —
// confirm against the handler of EvBalanceConsumer.
struct TEvBalanceConsumer : TEventLocal<TEvBalanceConsumer, EvBalanceConsumer> {
    // explicit: a TString must never be converted into an event implicitly.
    explicit TEvBalanceConsumer(const TString& consumerName)
        : ConsumerName(consumerName)
    {}

    TString ConsumerName; // copy of the `consumerName` constructor argument
};
};

} //NKikimr
Loading
Loading