Skip to content

Commit

Permalink
Randomize order of sessions (#8359)
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Aug 28, 2024
1 parent 01f56bd commit e7b25a7
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 22 deletions.
49 changes: 36 additions & 13 deletions ydb/core/persqueue/read_balancer__balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
namespace NKikimr::NPQ::NBalancing {


struct LowLoadSessionComparator {
bool operator()(const TSession* lhs, const TSession* rhs) const;
};

using TLowLoadOrderedSessions = std::set<TSession*, LowLoadSessionComparator>;



//
// TPartition
//
Expand Down Expand Up @@ -868,6 +876,8 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c
CreateFamily({partitionId}, ctx);
}
}
} else {
OrderedSessions.reset();
}
}

Expand All @@ -886,6 +896,9 @@ std::vector<TPartitionFamily*> Snapshot(const std::unordered_map<size_t, const s
void TConsumer::UnregisterReadingSession(TSession* session, const TActorContext& ctx) {
auto pipe = session->Pipe;
Sessions.erase(session->Pipe);
if (!session->WithGroups()) {
OrderedSessions.reset();
}

for (auto* family : Snapshot(Families)) {
auto special = family->SpecialSessions.erase(pipe);
Expand Down Expand Up @@ -1156,11 +1169,11 @@ void TConsumer::ScheduleBalance(const TActorContext& ctx) {
ctx.Send(Balancer.TopicActor.SelfId(), new TEvPQ::TEvBalanceConsumer(ConsumerName));
}

TOrderedSessions OrderSessions(
TLowLoadOrderedSessions OrderSessions(
const std::unordered_map<TActorId, TSession*>& values,
std::function<bool (const TSession*)> predicate = [](const TSession*) { return true; }
) {
TOrderedSessions result;
TLowLoadOrderedSessions result;
for (auto& [_, v] : values) {
if (predicate(v)) {
result.insert(v);
Expand Down Expand Up @@ -1244,7 +1257,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
}
}

TOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) {
TLowLoadOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) {
return !session->WithGroups();
});

Expand All @@ -1253,7 +1266,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
auto families = OrderFamilies(UnreadableFamilies);
for (auto it = families.rbegin(); it != families.rend(); ++it) {
auto* family = *it;
TOrderedSessions specialSessions;
TLowLoadOrderedSessions specialSessions;
auto& sessions = (family->IsCommon()) ? commonSessions : (specialSessions = OrderSessions(family->SpecialSessions));

auto sit = sessions.begin();
Expand Down Expand Up @@ -1297,7 +1310,11 @@ void TConsumer::Balance(const TActorContext& ctx) {
GetPrefix() << "start rebalancing. familyCount=" << familyCount << ", sessionCount=" << commonSessions.size()
<< ", desiredFamilyCount=" << desiredFamilyCount << ", allowPlusOne=" << allowPlusOne);

for (auto it = commonSessions.rbegin(); it != commonSessions.rend(); ++it) {
if (!OrderedSessions) {
OrderedSessions.emplace();
OrderedSessions->insert(commonSessions.begin(), commonSessions.end());
}
for (auto it = OrderedSessions->begin(); it != OrderedSessions->end(); ++it) {
auto* session = *it;
auto targerFamilyCount = desiredFamilyCount + (allowPlusOne ? 1 : 0);
auto families = OrderFamilies(session->Families);
Expand All @@ -1308,7 +1325,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
}
}

if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) {
if (allowPlusOne) {
--allowPlusOne;
}
}
Expand Down Expand Up @@ -1397,7 +1414,8 @@ TSession::TSession(const TActorId& pipe)
, InactivePartitionCount(0)
, ReleasingPartitionCount(0)
, ActiveFamilyCount(0)
, ReleasingFamilyCount(0) {
, ReleasingFamilyCount(0)
, Order(RandomNumber<size_t>()) {
}

bool TSession::WithGroups() const { return !Partitions.empty(); }
Expand Down Expand Up @@ -1850,18 +1868,23 @@ bool TPartitionFamilyComparator::operator()(const TPartitionFamily* lhs, const T
}

bool SessionComparator::operator()(const TSession* lhs, const TSession* rhs) const {
if (lhs->Order != rhs->Order) {
return lhs->Order < rhs->Order;
}
return lhs->SessionName < rhs->SessionName;
}


bool LowLoadSessionComparator::operator()(const TSession* lhs, const TSession* rhs) const {
if (lhs->ActiveFamilyCount != rhs->ActiveFamilyCount) {
return lhs->ActiveFamilyCount < rhs->ActiveFamilyCount;
}
if (lhs->ActivePartitionCount != rhs->ActivePartitionCount) {
return lhs->ActivePartitionCount < rhs->ActivePartitionCount;
}
if (lhs->InactivePartitionCount != rhs->InactivePartitionCount) {
return lhs->InactivePartitionCount < rhs->InactivePartitionCount;
}
if (lhs->Partitions.size() != rhs->Partitions.size()) {
return lhs->Partitions.size() < rhs->Partitions.size();
}
if (lhs->Order != rhs->Order) {
return lhs->Order < rhs->Order;
}
return lhs->SessionName < rhs->SessionName;
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/read_balancer__balancing.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ struct TConsumer {
std::unordered_map<ui32, TPartitionFamily*> PartitionMapping;
// All reading sessions in which the family is currently being read.
std::unordered_map<TActorId, TSession*> Sessions;
std::optional<TOrderedSessions> OrderedSessions;

// Families is not reading now.
std::unordered_map<size_t, TPartitionFamily*> UnreadableFamilies;
Expand Down Expand Up @@ -276,6 +277,8 @@ struct TSession {
// The partition families that are being read by this session.
std::unordered_map<size_t, TPartitionFamily*> Families;

size_t Order;

// true if client connected to read from concret partitions
bool WithGroups() const;

Expand Down
7 changes: 4 additions & 3 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ void TPQReadService::TSession::SendEvent(IEventBase* ev) {
void TPQReadService::TSession::CreateActor(std::unique_ptr<NPersQueue::TTopicsListController>&& topicsHandler) {
auto classifier = Proxy->GetClassifier();

auto g(Guard(Lock));
auto* actor = new TReadSessionActor(this, *topicsHandler, Cookie, SchemeCache, NewSchemeCache, Counters,
classifier ? classifier->ClassifyAddress(GetPeerName()) : "unknown");
ui32 poolId = Proxy->ActorSystem->AppData<::NKikimr::TAppData>()->UserPoolId;
Expand Down Expand Up @@ -204,10 +205,10 @@ ui64 TPQReadService::NextCookie() {

void TPQReadService::ReleaseSession(ui64 cookie) {
auto g(Guard(Lock));
bool erased = Sessions.erase(cookie);
if (erased)
if (Sessions.erase(cookie)) {
g.Release();
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,-1,0));

}
}

void TPQReadService::CheckClusterChange(const TString& localCluster, const bool) {
Expand Down
12 changes: 6 additions & 6 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) {
std::unique_ptr<IEventBase> e;
e.reset(ev);

TGuard<TSpinLock> lock(Lock);
auto lock(Guard(Lock));
if (ActorId) {
lock.Release();
Proxy->ActorSystem->Send(ActorId, e.release());
}
}
Expand Down Expand Up @@ -161,11 +162,10 @@ ui64 TPQWriteServiceImpl::NextCookie() {


void TPQWriteServiceImpl::ReleaseSession(TSessionRef session) {
with_lock (Lock) {
bool erased = Sessions.erase(session->GetCookie());
if (erased) {
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0));
}
auto lock(Guard(Lock));
if (Sessions.erase(session->GetCookie())) {
lock.Release();
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0));
}
}

Expand Down

0 comments on commit e7b25a7

Please sign in to comment.