Skip to content

Commit

Permalink
Merge 0fd6a86 into 1acf95f
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Apr 7, 2024
2 parents 1acf95f + 0fd6a86 commit 5f053a0
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,18 +599,19 @@ void TKafkaReadSessionActor::HandleReleasePartition(TEvPersQueue::TEvReleasePart
}

NeedRebalance = true;
size_t partitionToReleaseIndex = 0;
size_t i = 0;
ui32 partitionToRelease = 0;
ui32 i = 0;

for (size_t partIndex = 0; partIndex < topicPartitionsIt->second.ReadingNow.size(); partIndex++) {
if (!topicPartitionsIt->second.ToRelease.contains(partIndex) && (group == 0 || partIndex + 1 == group)) {
for (auto curPartition : topicPartitionsIt->second.ReadingNow) {
if (!topicPartitionsIt->second.ToRelease.contains(curPartition) && (group == 0 || curPartition + 1 == group)) {
++i;
if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
partitionToReleaseIndex = partIndex;
if (rand() % i == 0) {
partitionToRelease = curPartition;
}
}
}
topicPartitionsIt->second.ToRelease.emplace(partitionToReleaseIndex);

topicPartitionsIt->second.ToRelease.emplace(partitionToRelease);
}
}

Expand Down

0 comments on commit 5f053a0

Please sign in to comment.