diff --git a/ydb/core/persqueue/read_balancer__balancing.cpp b/ydb/core/persqueue/read_balancer__balancing.cpp index 3cb751e1802b..f44c46cc3bad 100644 --- a/ydb/core/persqueue/read_balancer__balancing.cpp +++ b/ydb/core/persqueue/read_balancer__balancing.cpp @@ -6,6 +6,14 @@ namespace NKikimr::NPQ::NBalancing { +struct LowLoadSessionComparator { + bool operator()(const TSession* lhs, const TSession* rhs) const; +}; + +using TLowLoadOrderedSessions = std::set; + + + // // TPartition // @@ -868,6 +876,8 @@ void TConsumer::RegisterReadingSession(TSession* session, const TActorContext& c CreateFamily({partitionId}, ctx); } } + } else { + OrderedSessions.reset(); } } @@ -886,6 +896,9 @@ std::vector Snapshot(const std::unordered_mapPipe; Sessions.erase(session->Pipe); + if (!session->WithGroups()) { + OrderedSessions.reset(); + } for (auto* family : Snapshot(Families)) { auto special = family->SpecialSessions.erase(pipe); @@ -1156,11 +1169,11 @@ void TConsumer::ScheduleBalance(const TActorContext& ctx) { ctx.Send(Balancer.TopicActor.SelfId(), new TEvPQ::TEvBalanceConsumer(ConsumerName)); } -TOrderedSessions OrderSessions( +TLowLoadOrderedSessions OrderSessions( const std::unordered_map& values, std::function predicate = [](const TSession*) { return true; } ) { - TOrderedSessions result; + TLowLoadOrderedSessions result; for (auto& [_, v] : values) { if (predicate(v)) { result.insert(v); @@ -1244,7 +1257,7 @@ void TConsumer::Balance(const TActorContext& ctx) { } } - TOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) { + TLowLoadOrderedSessions commonSessions = OrderSessions(Sessions, [](auto* session) { return !session->WithGroups(); }); @@ -1253,7 +1266,7 @@ void TConsumer::Balance(const TActorContext& ctx) { auto families = OrderFamilies(UnreadableFamilies); for (auto it = families.rbegin(); it != families.rend(); ++it) { auto* family = *it; - TOrderedSessions specialSessions; + TLowLoadOrderedSessions specialSessions; auto& sessions = (family->IsCommon()) ? commonSessions : (specialSessions = OrderSessions(family->SpecialSessions)); auto sit = sessions.begin(); @@ -1297,7 +1310,11 @@ void TConsumer::Balance(const TActorContext& ctx) { GetPrefix() << "start rebalancing. familyCount=" << familyCount << ", sessionCount=" << commonSessions.size() << ", desiredFamilyCount=" << desiredFamilyCount << ", allowPlusOne=" << allowPlusOne); - for (auto it = commonSessions.rbegin(); it != commonSessions.rend(); ++it) { + if (!OrderedSessions) { + OrderedSessions.emplace(); + OrderedSessions->insert(commonSessions.begin(), commonSessions.end()); + } + for (auto it = OrderedSessions->begin(); it != OrderedSessions->end(); ++it) { auto* session = *it; auto targerFamilyCount = desiredFamilyCount + (allowPlusOne ? 1 : 0); auto families = OrderFamilies(session->Families); @@ -1308,7 +1325,7 @@ void TConsumer::Balance(const TActorContext& ctx) { } } - if (allowPlusOne && session->ActiveFamilyCount > desiredFamilyCount) { + if (allowPlusOne) { --allowPlusOne; } } @@ -1397,7 +1414,8 @@ TSession::TSession(const TActorId& pipe) , InactivePartitionCount(0) , ReleasingPartitionCount(0) , ActiveFamilyCount(0) - , ReleasingFamilyCount(0) { + , ReleasingFamilyCount(0) + , Order(RandomNumber()) { } bool TSession::WithGroups() const { return !Partitions.empty(); } @@ -1850,18 +1868,23 @@ bool TPartitionFamilyComparator::operator()(const TPartitionFamily* lhs, const T } bool SessionComparator::operator()(const TSession* lhs, const TSession* rhs) const { + if (lhs->Order != rhs->Order) { + return lhs->Order < rhs->Order; + } + return lhs->SessionName < rhs->SessionName; +} + + +bool LowLoadSessionComparator::operator()(const TSession* lhs, const TSession* rhs) const { if (lhs->ActiveFamilyCount != rhs->ActiveFamilyCount) { return lhs->ActiveFamilyCount < rhs->ActiveFamilyCount; } - if (lhs->ActivePartitionCount != rhs->ActivePartitionCount) { - return lhs->ActivePartitionCount < rhs->ActivePartitionCount; - } - if (lhs->InactivePartitionCount != rhs->InactivePartitionCount) { - return lhs->InactivePartitionCount < rhs->InactivePartitionCount; - } if (lhs->Partitions.size() != rhs->Partitions.size()) { return lhs->Partitions.size() < rhs->Partitions.size(); } + if (lhs->Order != rhs->Order) { + return lhs->Order < rhs->Order; + } return lhs->SessionName < rhs->SessionName; } diff --git a/ydb/core/persqueue/read_balancer__balancing.h b/ydb/core/persqueue/read_balancer__balancing.h index 6834524e2478..0b2b7d6576c7 100644 --- a/ydb/core/persqueue/read_balancer__balancing.h +++ b/ydb/core/persqueue/read_balancer__balancing.h @@ -185,6 +185,7 @@ struct TConsumer { std::unordered_map PartitionMapping; // All reading sessions in which the family is currently being read. std::unordered_map Sessions; + std::optional OrderedSessions; // Families is not reading now. std::unordered_map UnreadableFamilies; @@ -276,6 +277,8 @@ struct TSession { // The partition families that are being read by this session. std::unordered_map Families; + size_t Order; + // true if client connected to read from concret partitions bool WithGroups() const; diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp index a2f9d1d34dd2..e1f2383f5bb4 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read.cpp @@ -154,6 +154,7 @@ void TPQReadService::TSession::SendEvent(IEventBase* ev) { void TPQReadService::TSession::CreateActor(std::unique_ptr&& topicsHandler) { auto classifier = Proxy->GetClassifier(); + auto g(Guard(Lock)); auto* actor = new TReadSessionActor(this, *topicsHandler, Cookie, SchemeCache, NewSchemeCache, Counters, classifier ? classifier->ClassifyAddress(GetPeerName()) : "unknown"); ui32 poolId = Proxy->ActorSystem->AppData<::NKikimr::TAppData>()->UserPoolId; @@ -204,10 +205,10 @@ ui64 TPQReadService::NextCookie() { void TPQReadService::ReleaseSession(ui64 cookie) { auto g(Guard(Lock)); - bool erased = Sessions.erase(cookie); - if (erased) + if (Sessions.erase(cookie)) { + g.Release(); ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0,0,-1,0)); - + } } void TPQReadService::CheckClusterChange(const TString& localCluster, const bool) { diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp index 4d96d5f7aa6c..1c65728f26dc 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_write.cpp @@ -124,8 +124,9 @@ void TPQWriteServiceImpl::TSession::SendEvent(IEventBase* ev) { std::unique_ptr e; e.reset(ev); - TGuard lock(Lock); + auto lock(Guard(Lock)); if (ActorId) { + lock.Release(); Proxy->ActorSystem->Send(ActorId, e.release()); } } @@ -161,11 +162,10 @@ ui64 TPQWriteServiceImpl::NextCookie() { void TPQWriteServiceImpl::ReleaseSession(TSessionRef session) { - with_lock (Lock) { - bool erased = Sessions.erase(session->GetCookie()); - if (erased) { - ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0)); - } + auto lock(Guard(Lock)); + if (Sessions.erase(session->GetCookie())) { + lock.Release(); + ActorSystem->Send(MakeGRpcProxyStatusID(ActorSystem->NodeId), new TEvGRpcProxyStatus::TEvUpdateStatus(0, 0, -1, 0)); } }