From 540503a95fe242dadfd1ebd53c405a212c584769 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 27 Aug 2024 22:33:36 +0500 Subject: [PATCH 1/2] Randomize order of sessions (#8329) --- .../persqueue/read_balancer__balancing.cpp | 36 +++++++++++++++---- ydb/core/persqueue/read_balancer__balancing.h | 2 ++ 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/ydb/core/persqueue/read_balancer__balancing.cpp b/ydb/core/persqueue/read_balancer__balancing.cpp index 3cb751e1802b..1bb53330efec 100644 --- a/ydb/core/persqueue/read_balancer__balancing.cpp +++ b/ydb/core/persqueue/read_balancer__balancing.cpp @@ -6,6 +6,14 @@ namespace NKikimr::NPQ::NBalancing { +struct LowLoadSessionComparator { + bool operator()(const TSession* lhs, const TSession* rhs) const; +}; + +using TLowLoadOrderedSessions = std::set; + + + // // TPartition // @@ -1156,11 +1164,11 @@ void TConsumer::ScheduleBalance(const TActorContext& ctx) { ctx.Send(Balancer.TopicActor.SelfId(), new TEvPQ::TEvBalanceConsumer(ConsumerName)); } -TOrderedSessions OrderSessions( +TLowLoadOrderedSessions OrderSessions( const std::unordered_map& values, std::function predicate = [](const TSession*) { return true; } ) { - TOrderedSessions result; + TLowLoadOrderedSessions result; for (auto& [_, v] : values) { if (predicate(v)) { result.insert(v); @@ -1244,7 +1252,7 @@ void TConsumer::Balance(const TActorContext& ctx) { } } - TOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) { + TLowLoadOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) { return !session->WithGroups(); }); @@ -1253,7 +1261,7 @@ void TConsumer::Balance(const TActorContext& ctx) { auto families = OrderFamilies(UnreadableFamilies); for (auto it = families.rbegin(); it != families.rend(); ++it) { auto* family = *it; - TOrderedSessions specialSessions; + TLowLoadOrderedSessions specialSessions; auto& sessions = (family->IsCommon()) ? commonSessions : (specialSessions = OrderSessions(family->SpecialSessions)); auto sit = sessions.begin(); @@ -1297,7 +1305,9 @@ void TConsumer::Balance(const TActorContext& ctx) { GetPrefix() << "start rebalancing. familyCount=" << familyCount << ", sessionCount=" << commonSessions.size() << ", desiredFamilyCount=" << desiredFamilyCount << ", allowPlusOne=" << allowPlusOne); - for (auto it = commonSessions.rbegin(); it != commonSessions.rend(); ++it) { + TOrderedSessions orderedSession; + orderedSession.insert(commonSessions.begin(), commonSessions.end()); + for (auto it = orderedSession.begin(); it != orderedSession.end(); ++it) { auto* session = *it; auto targerFamilyCount = desiredFamilyCount + (allowPlusOne ? 1 : 0); auto families = OrderFamilies(session->Families); @@ -1308,7 +1318,7 @@ void TConsumer::Balance(const TActorContext& ctx) { } } - if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) { + if (allowPlusOne) { --allowPlusOne; } } @@ -1397,7 +1407,8 @@ TSession::TSession(const TActorId& pipe) , InactivePartitionCount(0) , ReleasingPartitionCount(0) , ActiveFamilyCount(0) - , ReleasingFamilyCount(0) { + , ReleasingFamilyCount(0) + , Order(RandomNumber()) { } bool TSession::WithGroups() const { return !Partitions.empty(); } @@ -1850,6 +1861,14 @@ bool TPartitionFamilyComparator::operator()(const TPartitionFamily* lhs, const T } bool SessionComparator::operator()(const TSession* lhs, const TSession* rhs) const { + if (lhs->Order != rhs->Order) { + return lhs->Order < rhs->Order; + } + return lhs->SessionName < rhs->SessionName; +} + + +bool LowLoadSessionComparator::operator()(const TSession* lhs, const TSession* rhs) const { if (lhs->ActiveFamilyCount != rhs->ActiveFamilyCount) { return lhs->ActiveFamilyCount < rhs->ActiveFamilyCount; } @@ -1862,6 +1881,9 @@ bool SessionComparator::operator()(const TSession* lhs, const TSession* rhs) con if (lhs->Partitions.size() != rhs->Partitions.size()) { return lhs->Partitions.size() < rhs->Partitions.size(); } + if (lhs->Order != rhs->Order) { + return lhs->Order < rhs->Order; + } return lhs->SessionName < rhs->SessionName; } diff --git a/ydb/core/persqueue/read_balancer__balancing.h b/ydb/core/persqueue/read_balancer__balancing.h index 6834524e2478..6f98329f3350 100644 --- a/ydb/core/persqueue/read_balancer__balancing.h +++ b/ydb/core/persqueue/read_balancer__balancing.h @@ -276,6 +276,8 @@ struct TSession { // The partition families that are being read by this session. std::unordered_map Families; + size_t Order; + // true if client connected to read from concret partitions bool WithGroups() const; From 9a04a12c734bebbd1d9b368b8c65e3523f282134 Mon Sep 17 00:00:00 2001 From: Nikolay Shestakov Date: Tue, 27 Aug 2024 18:37:49 +0000 Subject: [PATCH 2/2] optimizations --- .../persqueue/read_balancer__balancing.cpp | 19 ++++++++++--------- ydb/core/persqueue/read_balancer__balancing.h | 1 + .../deprecated/persqueue_v0/grpc_pq_read.cpp | 7 ++++--- .../deprecated/persqueue_v0/grpc_pq_write.cpp | 12 ++++++------ 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/ydb/core/persqueue/read_balancer__balancing.cpp b/ydb/core/persqueue/read_balancer__balancing.cpp index 1bb53330efec..f44c46cc3bad 100644 --- a/ydb/core/persqueue/read_balancer__balancing.cpp +++ b/ydb/core/persqueue/read_balancer__balancing.cpp @@ -876,6 +876,8 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c CreateFamily({partitionId}, ctx); } } + } else { + OrderedSessions.reset(); } } @@ -894,6 +896,9 @@ std::vector Snapshot(const std::unordered_mapPipe; Sessions.erase(session->Pipe); + if (!session->WithGroups()) { + OrderedSessions.reset(); + } for (auto* family : Snapshot(Families)) { auto special = family->SpecialSessions.erase(pipe); @@ -1305,9 +1310,11 @@ void TConsumer::Balance(const TActorContext& ctx) { GetPrefix() << "start rebalancing. familyCount=" << familyCount << ", sessionCount=" << commonSessions.size() << ", desiredFamilyCount=" << desiredFamilyCount << ", allowPlusOne=" << allowPlusOne); - TOrderedSessions orderedSession; - orderedSession.insert(commonSessions.begin(), commonSessions.end()); - for (auto it = orderedSession.begin(); it != orderedSession.end(); ++it) { + if (!OrderedSessions) { + OrderedSessions.emplace(); + OrderedSessions->insert(commonSessions.begin(), commonSessions.end()); + } + for (auto it = OrderedSessions->begin(); it != OrderedSessions->end(); ++it) { auto* session = *it; auto targerFamilyCount = desiredFamilyCount + (allowPlusOne ? 1 : 0); auto families = OrderFamilies(session->Families); @@ -1872,12 +1879,6 @@ bool LowLoadSessionComparator::operator()(const TSession* lhs, const TSession* r if (lhs->ActiveFamilyCount != rhs->ActiveFamilyCount) { return lhs->ActiveFamilyCount < rhs->ActiveFamilyCount; } - if (lhs->ActivePartitionCount != rhs->ActivePartitionCount) { - return lhs->ActivePartitionCount < rhs->ActivePartitionCount; - } - if (lhs->InactivePartitionCount != rhs->InactivePartitionCount) { - return lhs->InactivePartitionCount < rhs->InactivePartitionCount; - } if (lhs->Partitions.size() != rhs->Partitions.size()) { return lhs->Partitions.size() < rhs->Partitions.size(); } diff --git a/ydb/core/persqueue/read_balancer__balancing.h b/ydb/core/persqueue/read_balancer__balancing.h index 6f98329f3350..0b2b7d6576c7 100644 --- a/ydb/core/persqueue/read_balancer__balancing.h +++ b/ydb/core/persqueue/read_balancer__balancing.h @@ -185,6 +185,7 @@ struct TConsumer { std::unordered_map PartitionMapping; // All reading sessions in which the family is currently being read. std::unordered_map Sessions; + std::optional OrderedSessions; // Families is not reading now. std::unordered_map UnreadableFamilies; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp index a8ca115db4d0..39e3457d3e07 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp @@ -154,6 +154,7 @@ void TPQReadService::TSession::SendEvent(IEventBase* ev) { void TPQReadService::TSession::CreateActor(std::unique_ptr&& topicsHandler) { auto classifier = Proxy->GetClassifier(); + auto g(Guard(Lock)); ActorId = Proxy->ActorSystem->Register( new TReadSessionActor(this, *topicsHandler, Cookie, SchemeCache, NewSchemeCache, Counters, classifier ? classifier->ClassifyAddress(GetPeerName()) @@ -204,10 +205,10 @@ ui64 TPQReadService::NextCookie() { void TPQReadService::ReleaseSession(ui64 cookie) { auto g(Guard(Lock)); - bool erased = Sessions.erase(cookie); - if (erased) + if (Sessions.erase(cookie)) { + g.Release(); ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,-1,0)); - + } } void TPQReadService::CheckClusterChange(const TString& localCluster, const bool) { diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp index 3d5eb3b33a48..ef7736b6930f 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp @@ -125,8 +125,9 @@ void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) { std::unique_ptr e; e.reset(ev); - TGuard lock(Lock); + auto lock(Guard(Lock)); if (ActorId) { + lock.Release(); Proxy->ActorSystem->Send(ActorId, e.release()); } } @@ -162,11 +163,10 @@ ui64 TPQWriteServiceImpl::NextCookie() { void TPQWriteServiceImpl::ReleaseSession(TSessionRef session) { - with_lock (Lock) { - bool erased = Sessions.erase(session->GetCookie()); - if (erased) { - ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0)); - } + auto lock(Guard(Lock)); + if (Sessions.erase(session->GetCookie())) { + lock.Release(); + ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0)); } }