Skip to content

Commit

Permalink
Merge a647a62 into f638ce2
Browse files Browse the repository at this point in the history
  • Loading branch information
nshestakov authored Apr 23, 2024
2 parents f638ce2 + a647a62 commit 44a7b34
Show file tree
Hide file tree
Showing 19 changed files with 2,330 additions and 1,539 deletions.
1,175 changes: 103 additions & 1,072 deletions ydb/core/persqueue/read_balancer.cpp

Large diffs are not rendered by default.

253 changes: 27 additions & 226 deletions ydb/core/persqueue/read_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ namespace NPQ {

using namespace NTabletFlatExecutor;

// Forward declaration: lets this header name NBalancing::TBalancer
// (it is used further down as a friend class and as a std::unique_ptr target).
namespace NBalancing {
class TBalancer;
}


// Static description of one topic partition as seen by the read balancer.
struct TPartitionInfo {
    // Id of the tablet associated with the partition (presumably the PQ tablet
    // hosting it — TODO confirm). Zero-initialized so that a default-initialized
    // object never carries an indeterminate id.
    ui64 TabletId = 0;
    // Key range of the partition, in the SchemeShard topic-tablet representation.
    NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;
};


class TMetricsTimeKeeper {
public:
TMetricsTimeKeeper(NMetrics::TResourceMetrics* metrics, const TActorContext& ctx)
Expand Down Expand Up @@ -67,36 +78,41 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) override;
TString GenerateStat();

void Handle(TEvPersQueue::TEvWakeupClient::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvWakeupReleasePartition::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvDescribe::TPtr &ev, const TActorContext& ctx);

void HandleOnInit(TEvPersQueue::TEvUpdateBalancerConfig::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvUpdateBalancerConfig::TPtr &ev, const TActorContext& ctx);

void HandleOnInit(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx);

void HandleOnInit(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvCheckACL::TPtr&, const TActorContext&);
void Handle(TEvPersQueue::TEvGetPartitionIdForWrite::TPtr&, const TActorContext&);

void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext&);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext&);

void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&);

void Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);

// Begin balancing
void Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPQ::TEvWakeupReleasePartition::TPtr &ev, const TActorContext& ctx);

void Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx); // from Partition/PQ
void Handle(TEvPersQueue::TEvReadingPartitionStartedRequest::TPtr& ev, const TActorContext& ctx); // from ReadSession
void Handle(TEvPersQueue::TEvReadingPartitionFinishedRequest::TPtr& ev, const TActorContext& ctx); // from ReadSession

void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext&);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext&);

void HandleOnInit(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx);

void Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr &ev, const TActorContext& ctx);
// End balancing

TStringBuilder GetPrefix() const;

TActorId GetPipeClient(const ui64 tabletId, const TActorContext&);
Expand All @@ -116,8 +132,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void GetACL(const TActorContext&);
void AnswerWaitingRequests(const TActorContext& ctx);

void Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvStatsWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx);
Expand All @@ -126,9 +140,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx);
void Handle(TPartitionScaleRequest::TEvPartitionScaleRequestDone::TPtr& ev, const TActorContext& ctx);

void RegisterSession(const TActorId& pipe, const TActorContext& ctx);
void UnregisterSession(const TActorId& pipe, const TActorContext& ctx);
void RebuildStructs();
// Returns the value TopicPartitionReserveSize() computes for TabletConfig.
ui64 PartitionReserveSize() {
    const ui64 reserveSize = TopicPartitionReserveSize(TabletConfig);
    return reserveSize;
}
Expand All @@ -154,8 +165,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa


// Per-consumer bookkeeping kept inside TPersQueueReadBalancer.
// NOTE(review): the field descriptions below are inferred from names and types only — confirm against read_balancer.cpp.
struct TConsumerInfo {
// Scaling-support mode of the consumer.
// NOTE(review): no default initializer — the value is indeterminate unless the object is value-initialized or the field is assigned.
NKikimrPQ::EConsumerScalingSupport ScalingSupport;

// Handles to dynamic counters; presumably the published aggregated values — TODO confirm.
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> AggregatedCounters;
// Owning pointer to a labeled-counters object; presumably the aggregation target — TODO confirm.
THolder<TTabletLabeledCountersBase> Aggr;
};
Expand All @@ -169,24 +178,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
std::vector<TEvPersQueue::TEvCheckACL::TPtr> WaitingACLRequests;
std::vector<TEvPersQueue::TEvDescribe::TPtr> WaitingDescribeRequests;

// Lock state of a partition; stored in TPartitionInfo::State.
enum EPartitionState {
// Set by TPartitionInfo::Unlock(), which also clears the owning session.
EPS_FREE = 0,
// Set by TPartitionInfo::Lock(), which also records the owning session.
EPS_ACTIVE = 1
};

// Partition description together with its current lock state.
struct TPartitionInfo {
    ui64 TabletId;
    EPartitionState State;
    TActorId Session;   // session that holds the partition; default TActorId when free
    ui32 GroupId;
    NSchemeShard::TTopicTabletInfo::TKeyRange KeyRange;

    // Marks the partition as free and forgets the owning session.
    void Unlock() {
        State = EPS_FREE;
        Session = TActorId();
    }

    // Marks the partition as active and remembers `session` as its owner.
    void Lock(const TActorId& session) {
        State = EPS_ACTIVE;
        Session = session;
    }
};

std::unordered_map<ui32, TPartitionInfo> PartitionsInfo;
std::unordered_map<ui32, std::vector<ui32>> GroupsInfo;

struct TTabletInfo {
ui64 Owner;
Expand All @@ -200,204 +192,15 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
ui32 NextPartitionIdForWrite;
ui32 StartPartitionIdForWrite;
ui32 TotalGroups;
bool NoGroupsInBase;

private:
struct TClientInfo;

// Reading progress of a single partition.
// Only the declarations are visible here; the member functions are defined elsewhere.
struct TReadingPartitionStatus {
    // The client has committed a read offset equal to the EndOffset of the partition.
    bool Commited = false;
    // The read session has reached the EndOffset of the partition.
    bool ReadingFinished = false;
    // The read session is connected through a new SDK that guarantees read order.
    bool ScaleAwareSDK = false;
    // The read session reached the EndOffset of the partition on its first request.
    bool StartedReadingFromEndOffset = false;

    size_t Iteration = 0;
    ui64 Cookie = 0;

    TActorId LastPipe;

    // Generation of the PQ tablet and cookie used to synchronize commit information.
    // Zero-initialized like every other scalar of this struct, so that a
    // default-initialized status never holds indeterminate values.
    ui32 PartitionGeneration = 0;
    ui64 PartitionCookie = 0;

    // Returns true if reading of the partition has finished and the child partitions are readable.
    bool IsFinished() const;
    // Returns true if the child partitions cannot be balanced separately.
    bool NeedReleaseChildren() const;
    bool BalanceToOtherPipe() const;

    // Called when reading from the partition starts.
    // Returns true if reading of the partition had already been finished before.
    bool StartReading();
    // Called when reading from the partition stops.
    // Returns true if the child partitions cannot be balanced separately.
    bool StopReading();

    // Called when the partition is inactive and the committed offset equals EndOffset.
    // Returns true if the committed status changed.
    bool SetCommittedState(ui32 generation, ui64 cookie);
    // Called when reading of the partition has finished.
    // Returns true if the reading status changed.
    bool SetFinishedState(bool scaleAwareSDK, bool startedReadingFromEndOffset);
    // Called when the parent partition is being read.
    bool Reset();
};

// Bookkeeping for a single read session.
struct TSessionInfo {
    // Stores the identifying data of the session; all partition counters start at zero.
    TSessionInfo(const TString& session, const TActorId sender, const TString& clientNode, ui32 proxyNodeId, TInstant ts)
        : Session(session)
        , Sender(sender)
        , ClientNode(clientNode)
        , ProxyNodeId(proxyNodeId)
        , Timestamp(ts)
    {
    }

    TString Session;
    TActorId Sender;

    // Partition counters of the session.
    // NOTE(review): exact meaning of each counter is inferred from its name — confirm in read_balancer.cpp.
    ui32 NumSuspended = 0;
    ui32 NumActive = 0;
    ui32 NumInactive = 0;

    TString ClientNode;
    ui32 ProxyNodeId;
    TInstant Timestamp;

    // Defined elsewhere; not visible in this header.
    void Unlock(bool inactive);
};

// State of one partition group of a client (consumer): the partitions of the
// group, the sessions reading it, and the operations that move partitions
// between sessions. Member functions are only declared here.
struct TClientGroupInfo {
    // Binds the group to the TClientInfo it is created for; the reference is
    // stored, not copied.
    TClientGroupInfo(TClientInfo& clientInfo)
        : ClientInfo(clientInfo) {}

    TClientInfo& ClientInfo;

    TString ClientId;
    TString Topic;
    // Zero-initialized: the constructor does not set it, and every other scalar
    // member of this struct already has a default value.
    ui64 TabletId = 0;
    TString Path;
    ui32 Generation = 0;
    ui64 SessionKeySalt = 0;
    // Non-owning pointer to a step counter kept elsewhere
    // (presumably TClientInfo::Step — TODO confirm).
    ui32* Step = nullptr;

    ui32 Group = 0;

    std::unordered_map<ui32, TPartitionInfo> PartitionsInfo; // partitionId -> info
    std::deque<ui32> FreePartitions;
    // Keyed by the pipe ActorId plus a random value; the random part is needed
    // for reordering sessions in different topics (groups?).
    std::unordered_map<std::pair<TActorId, ui64>, TSessionInfo> SessionsInfo;

    std::pair<TActorId, ui64> SessionKey(const TActorId pipe) const;
    bool EraseSession(const TActorId pipe);
    TSessionInfo* FindSession(const TActorId pipe);
    TSessionInfo* FindSession(ui32 partitionId);

    void ScheduleBalance(const TActorContext& ctx);
    void Balance(const TActorContext& ctx);

    void LockPartition(const TActorId pipe, TSessionInfo& sessionInfo, ui32 partition, const TActorContext& ctx);
    void ReleasePartition(const ui32 partitionId, const TActorContext& ctx);
    void ReleasePartition(const TActorId pipe, TSessionInfo& sessionInfo, const ui32 count, const TActorContext& ctx);
    void ReleasePartition(const TActorId pipe, TSessionInfo& sessionInfo, const std::set<ui32>& partitions, const TActorContext& ctx);
    THolder<TEvPersQueue::TEvReleasePartition> MakeEvReleasePartition(const TActorId pipe, const TSessionInfo& sessionInfo, const ui32 count, const std::set<ui32>& partitions);

    void FreePartition(ui32 partitionId);
    void ActivatePartition(ui32 partitionId);
    void InactivatePartition(ui32 partitionId);

    TStringBuilder GetPrefix() const;

    std::tuple<ui32, ui32, ui32> TotalPartitions() const;
    void ReleaseExtraPartitions(ui32 desired, ui32 allowPlusOne, const TActorContext& ctx);
    void LockMissingPartitions(ui32 desired,
                               ui32 allowPlusOne,
                               const std::function<bool (ui32 partitionId)> partitionPredicate,
                               const std::function<ssize_t (const TSessionInfo& sessionInfo)> actualExtractor,
                               const TActorContext& ctx);

    // NOTE(review): presumably set while a balance wake-up is pending (see ScheduleBalance) — confirm.
    bool WakeupScheduled = false;
};
friend class NBalancing::TBalancer;
std::unique_ptr<NBalancing::TBalancer> Balancer;

std::unique_ptr<TPartitionScaleManager> PartitionsScaleManager;

// State of one client (consumer): its partition groups and the per-partition
// reading status. Member functions are only declared here.
struct TClientInfo {
    constexpr static ui32 MAIN_GROUP = 0;

    // Stores a reference to the owning balancer and the scaling-support mode of
    // the consumer; both are immutable afterwards.
    TClientInfo(const TPersQueueReadBalancer& balancer, NKikimrPQ::EConsumerScalingSupport scalingSupport)
        : Balancer(balancer)
        , ScalingSupport_(scalingSupport) {
    }

    const TPersQueueReadBalancer& Balancer;
    const NKikimrPQ::EConsumerScalingSupport ScalingSupport_;

    std::unordered_map<ui32, TClientGroupInfo> ClientGroupsInfo; // group -> info
    std::unordered_map<ui32, TReadingPartitionStatus> ReadingPartitionStatus; // partitionId -> status

    size_t SessionsWithGroup = 0;

    TString ClientId;
    TString Topic;
    // Zero-initialized: the constructor does not set it, and the neighbouring
    // scalars (Generation, Step) already have default values.
    ui64 TabletId = 0;
    TString Path;
    ui32 Generation = 0;
    ui32 Step = 0;

    bool ScalingSupport() const;

    void KillSessionsWithoutGroup(const TActorContext& ctx);
    void MergeGroups(const TActorContext& ctx);
    TClientGroupInfo& AddGroup(const ui32 group);
    void FillEmptyGroup(const ui32 group, const std::unordered_map<ui32, TPartitionInfo>& partitionsInfo);
    void AddSession(const ui32 group, const std::unordered_map<ui32, TPartitionInfo>& partitionsInfo,
                    const TActorId& sender, const NKikimrPQ::TRegisterReadSession& record);

    bool ProccessReadingFinished(ui32 partitionId, const TActorContext& ctx);

    TStringBuilder GetPrefix() const;

    void UnlockPartition(ui32 partitionId, const TActorContext& ctx);

    TReadingPartitionStatus& GetPartitionReadingStatus(ui32 partitionId);

    bool IsReadeable(ui32 partitionId) const;
    bool IsFinished(ui32 partitionId) const;
    bool SetCommittedState(ui32 partitionId, ui32 generation, ui64 cookie);

    TClientGroupInfo* FindGroup(ui32 partitionId);
};

std::unordered_map<TString, TClientInfo> ClientsInfo; //map from userId -> to info

private:
struct TPipeInfo {
TPipeInfo()
: ServerActors(0)
{}

TString ClientId; // The consumer name
TString Session;
TActorId Sender;
std::vector<ui32> Groups; // groups which are reading
ui32 ServerActors; // the number of pipes connected from SessionActor to ReadBalancer

// true if client connected to read from concret partitions
bool WithGroups() { return !Groups.empty(); }

void Init(const TString& clientId, const TString& session, const TActorId& sender, const std::vector<ui32>& groups) {
ClientId = clientId;
Session = session;
Sender = sender;
Groups = groups;
}
};

std::unordered_map<TActorId, TPipeInfo> PipesInfo;

NMetrics::TResourceMetrics *ResourceMetrics;

Expand Down Expand Up @@ -486,7 +289,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa

switch (ev->GetTypeRewrite()) {
HFunc(TEvPersQueue::TEvUpdateBalancerConfig, HandleOnInit);
HFunc(TEvPersQueue::TEvWakeupClient, Handle);
HFunc(TEvPersQueue::TEvDescribe, Handle);
HFunc(TEvPersQueue::TEvRegisterReadSession, HandleOnInit);
HFunc(TEvPersQueue::TEvGetReadSessionsInfo, Handle);
Expand All @@ -513,7 +315,6 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
HFunc(TEvPersQueue::TEvCheckACL, Handle);
HFunc(TEvPersQueue::TEvGetPartitionIdForWrite, Handle);
HFunc(TEvPersQueue::TEvUpdateBalancerConfig, Handle);
HFunc(TEvPersQueue::TEvWakeupClient, Handle);
HFunc(TEvPersQueue::TEvDescribe, Handle);
HFunc(TEvPersQueue::TEvRegisterReadSession, Handle);
HFunc(TEvPersQueue::TEvGetReadSessionsInfo, Handle);
Expand Down
Loading

0 comments on commit 44a7b34

Please sign in to comment.