Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Randomize order of sessions #8359

Merged
merged 3 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions ydb/core/persqueue/read_balancer__balancing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
namespace NKikimr::NPQ::NBalancing {


struct LowLoadSessionComparator {
bool operator()(const TSession* lhs, const TSession* rhs) const;
};

using TLowLoadOrderedSessions = std::set<TSession*, LowLoadSessionComparator>;



//
// TPartition
//
Expand Down Expand Up @@ -868,6 +876,8 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c
CreateFamily({partitionId}, ctx);
}
}
} else {
OrderedSessions.reset();
}
}

Expand All @@ -886,6 +896,9 @@ std::vector<TPartitionFamily*> Snapshot(const std::unordered_map<size_t, const s
void TConsumer::UnregisterReadingSession(TSession* session, const TActorContext& ctx) {
auto pipe = session->Pipe;
Sessions.erase(session->Pipe);
if (!session->WithGroups()) {
OrderedSessions.reset();
}

for (auto* family : Snapshot(Families)) {
auto special = family->SpecialSessions.erase(pipe);
Expand Down Expand Up @@ -1156,11 +1169,11 @@ void TConsumer::ScheduleBalance(const TActorContext& ctx) {
ctx.Send(Balancer.TopicActor.SelfId(), new TEvPQ::TEvBalanceConsumer(ConsumerName));
}

TOrderedSessions OrderSessions(
TLowLoadOrderedSessions OrderSessions(
const std::unordered_map<TActorId, TSession*>& values,
std::function<bool (const TSession*)> predicate = [](const TSession*) { return true; }
) {
TOrderedSessions result;
TLowLoadOrderedSessions result;
for (auto& [_, v] : values) {
if (predicate(v)) {
result.insert(v);
Expand Down Expand Up @@ -1244,7 +1257,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
}
}

TOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) {
TLowLoadOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) {
return !session->WithGroups();
});

Expand All @@ -1253,7 +1266,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
auto families = OrderFamilies(UnreadableFamilies);
for (auto it = families.rbegin(); it != families.rend(); ++it) {
auto* family = *it;
TOrderedSessions specialSessions;
TLowLoadOrderedSessions specialSessions;
auto& sessions = (family->IsCommon()) ? commonSessions : (specialSessions = OrderSessions(family->SpecialSessions));

auto sit = sessions.begin();
Expand Down Expand Up @@ -1297,7 +1310,11 @@ void TConsumer::Balance(const TActorContext& ctx) {
GetPrefix() << "start rebalancing. familyCount=" << familyCount << ", sessionCount=" << commonSessions.size()
<< ", desiredFamilyCount=" << desiredFamilyCount << ", allowPlusOne=" << allowPlusOne);

for (auto it = commonSessions.rbegin(); it != commonSessions.rend(); ++it) {
if (!OrderedSessions) {
OrderedSessions.emplace();
OrderedSessions->insert(commonSessions.begin(), commonSessions.end());
}
for (auto it = OrderedSessions->begin(); it != OrderedSessions->end(); ++it) {
auto* session = *it;
auto targerFamilyCount = desiredFamilyCount + (allowPlusOne ? 1 : 0);
auto families = OrderFamilies(session->Families);
Expand All @@ -1308,7 +1325,7 @@ void TConsumer::Balance(const TActorContext& ctx) {
}
}

if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) {
if (allowPlusOne) {
--allowPlusOne;
}
}
Expand Down Expand Up @@ -1397,7 +1414,8 @@ TSession::TSession(const TActorId& pipe)
, InactivePartitionCount(0)
, ReleasingPartitionCount(0)
, ActiveFamilyCount(0)
, ReleasingFamilyCount(0) {
, ReleasingFamilyCount(0)
, Order(RandomNumber<size_t>()) {
}

bool TSession::WithGroups() const { return !Partitions.empty(); }
Expand Down Expand Up @@ -1850,18 +1868,23 @@ bool TPartitionFamilyComparator::operator()(const TPartitionFamily* lhs, const T
}

bool SessionComparator::operator()(const TSession* lhs, const TSession* rhs) const {
if (lhs->Order != rhs->Order) {
return lhs->Order < rhs->Order;
}
return lhs->SessionName < rhs->SessionName;
}


bool LowLoadSessionComparator::operator()(const TSession* lhs, const TSession* rhs) const {
if (lhs->ActiveFamilyCount != rhs->ActiveFamilyCount) {
return lhs->ActiveFamilyCount < rhs->ActiveFamilyCount;
}
if (lhs->ActivePartitionCount != rhs->ActivePartitionCount) {
return lhs->ActivePartitionCount < rhs->ActivePartitionCount;
}
if (lhs->InactivePartitionCount != rhs->InactivePartitionCount) {
return lhs->InactivePartitionCount < rhs->InactivePartitionCount;
}
if (lhs->Partitions.size() != rhs->Partitions.size()) {
return lhs->Partitions.size() < rhs->Partitions.size();
}
if (lhs->Order != rhs->Order) {
return lhs->Order < rhs->Order;
}
return lhs->SessionName < rhs->SessionName;
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/read_balancer__balancing.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ struct TConsumer {
std::unordered_map<ui32, TPartitionFamily*> PartitionMapping;
// All reading sessions in which the family is currently being read.
std::unordered_map<TActorId, TSession*> Sessions;
std::optional<TOrderedSessions> OrderedSessions;

// Families is not reading now.
std::unordered_map<size_t, TPartitionFamily*> UnreadableFamilies;
Expand Down Expand Up @@ -276,6 +277,8 @@ struct TSession {
// The partition families that are being read by this session.
std::unordered_map<size_t, TPartitionFamily*> Families;

size_t Order;

// true if client connected to read from concret partitions
bool WithGroups() const;

Expand Down
7 changes: 4 additions & 3 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ void TPQReadService::TSession::SendEvent(IEventBase* ev) {
void TPQReadService::TSession::CreateActor(std::unique_ptr<NPersQueue::TTopicsListController>&& topicsHandler) {
auto classifier = Proxy->GetClassifier();

auto g(Guard(Lock));
auto* actor = new TReadSessionActor(this, *topicsHandler, Cookie, SchemeCache, NewSchemeCache, Counters,
classifier ? classifier->ClassifyAddress(GetPeerName()) : "unknown");
ui32 poolId = Proxy->ActorSystem->AppData<::NKikimr::TAppData>()->UserPoolId;
Expand Down Expand Up @@ -204,10 +205,10 @@ ui64 TPQReadService::NextCookie() {

void TPQReadService::ReleaseSession(ui64 cookie) {
auto g(Guard(Lock));
bool erased = Sessions.erase(cookie);
if (erased)
if (Sessions.erase(cookie)) {
g.Release();
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,-1,0));

}
}

void TPQReadService::CheckClusterChange(const TString& localCluster, const bool) {
Expand Down
12 changes: 6 additions & 6 deletions ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) {
std::unique_ptr<IEventBase> e;
e.reset(ev);

TGuard<TSpinLock> lock(Lock);
auto lock(Guard(Lock));
if (ActorId) {
lock.Release();
Proxy->ActorSystem->Send(ActorId, e.release());
}
}
Expand Down Expand Up @@ -161,11 +162,10 @@ ui64 TPQWriteServiceImpl::NextCookie() {


void TPQWriteServiceImpl::ReleaseSession(TSessionRef session) {
with_lock (Lock) {
bool erased = Sessions.erase(session->GetCookie());
if (erased) {
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0));
}
auto lock(Guard(Lock));
if (Sessions.erase(session->GetCookie())) {
lock.Release();
ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0));
}
}

Expand Down
Loading