Skip to content

Commit

Permalink
Merge d4aba5d into db99cc3
Browse files Browse the repository at this point in the history
  • Loading branch information
vporyadke authored Aug 21, 2024
2 parents db99cc3 + d4aba5d commit feb063e
Show file tree
Hide file tree
Showing 21 changed files with 320 additions and 109 deletions.
21 changes: 21 additions & 0 deletions ydb/core/mind/hive/data_center_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#include "hive.h"
#include "hive_events.h"

namespace NKikimr {
namespace NHive {

struct TDataCenterInfo {
using TFullFollowerGroupId = std::pair<TTabletId, TFollowerGroupId>;
using TFollowerIter = TList<TFollowerTabletInfo>::iterator; // list iterators are not invalidated

std::unordered_set<TNodeId> RegisteredNodes;
bool UpdateScheduled = false;
std::unordered_map<TFullFollowerGroupId, std::vector<TFollowerIter>> Followers;

bool IsRegistered() const {
return !RegisteredNodes.empty();
}
};

} // NHive
} // NKikimr
11 changes: 11 additions & 0 deletions ydb/core/mind/hive/follower_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ struct TFollowerGroup {
}
}

ui32 GetFollowerCountForDataCenter(const TDataCenterId& dc) const {
if (!FollowerCountPerDataCenter) {
return 0;
}
if (NodeFilter.IsAllowedDataCenter(dc)) {
return FollowerCount;
} else {
return 0;
}
}

void SetFollowerCount(ui32 followerCount) {
FollowerCount = followerCount;
}
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/mind/hive/hive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ NMetrics::EResource GetDominantResourceType(const TResourceNormalizedValues& nor
}

TNodeFilter::TNodeFilter(const THive& hive)
: Hive(hive)
: Hive(&hive)
{}

TArrayRef<const TSubDomainKey> TNodeFilter::GetEffectiveAllowedDomains() const {
const auto* objectDomainInfo = Hive.FindDomain(ObjectDomain);
const auto* objectDomainInfo = Hive->FindDomain(ObjectDomain);

if (!objectDomainInfo) {
return {AllowedDomains.begin(), AllowedDomains.end()};
Expand All @@ -100,6 +100,13 @@ TArrayRef<const TSubDomainKey> TNodeFilter::GetEffectiveAllowedDomains() const {
}
}

bool TNodeFilter::IsAllowedDataCenter(TDataCenterId dc) const {
if (AllowedDataCenters.empty()) {
return true;
}
return std::find(AllowedDataCenters.begin(), AllowedDataCenters.end(), dc) != AllowedDataCenters.end();
}

template <typename K, typename V>
std::unordered_map<V, K> MakeReverseMap(const std::unordered_map<K, V>& map) {
std::unordered_map<V, K> result;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/mind/hive/hive.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,13 @@ struct TNodeFilter {
TSubDomainKey ObjectDomain;
TTabletTypes::EType TabletType = TTabletTypes::TypeInvalid;

const THive& Hive;
const THive* Hive;

explicit TNodeFilter(const THive& hive);

TArrayRef<const TSubDomainKey> GetEffectiveAllowedDomains() const;

bool IsAllowedDataCenter(TDataCenterId dc) const;
};

} // NHive
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/mind/hive/hive_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct TEvPrivate {
EvStorageBalancerOut,
EvDeleteNode,
EvCanMoveTablets,
EvUpdateDataCenterFollowers,
EvEnd
};

Expand Down Expand Up @@ -120,6 +121,12 @@ struct TEvPrivate {
};

struct TEvCanMoveTablets : TEventLocal<TEvCanMoveTablets, EvCanMoveTablets> {};

struct TEvUpdateDataCenterFollowers : TEventLocal<TEvUpdateDataCenterFollowers, EvUpdateDataCenterFollowers> {
TDataCenterId DataCenter;

TEvUpdateDataCenterFollowers(TDataCenterId dataCenter) : DataCenter(dataCenter) {};
};
};

} // NHive
Expand Down
149 changes: 81 additions & 68 deletions ydb/core/mind/hive/hive_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ TInstant THive::GetAllowedBootingTime() {
return result;
}

void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb&, TSideEffects& sideEffects) {
TInstant now = TActivationContext::Now();
if (WarmUp) {
TInstant allowed = GetAllowedBootingTime();
Expand Down Expand Up @@ -257,10 +257,6 @@ void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb& db, TSideEffects& sideEffec
sideEffects.Send(actorToNotify, new TEvPrivate::TEvRestartComplete(tablet->GetFullTabletId(), "boot delay"));
}
tablet->ActorsToNotifyOnRestart.clear();
if (tablet->IsFollower()) {
TLeaderTabletInfo& leader = tablet->GetLeader();
UpdateTabletFollowersNumber(leader, db, sideEffects);
}
BootQueue.AddToWaitQueue(record); // waiting for new node
continue;
}
Expand Down Expand Up @@ -773,16 +769,11 @@ void THive::Handle(TEvInterconnect::TEvNodeInfo::TPtr &ev) {
}

void THive::Handle(TEvInterconnect::TEvNodesInfo::TPtr &ev) {
THashSet<TDataCenterId> dataCenters;
for (const TEvInterconnect::TNodeInfo& node : ev->Get()->Nodes) {
NodesInfo[node.NodeId] = node;
dataCenters.insert(node.Location.GetDataCenterId());
}
dataCenters.erase(0); // remove default data center id if exists
if (!dataCenters.empty()) {
if (DataCenters != dataCenters.size()) {
DataCenters = dataCenters.size();
BLOG_D("TEvInterconnect::TEvNodesInfo DataCenters=" << DataCenters << " RegisteredDataCenters=" << RegisteredDataCenters);
auto dataCenterId = node.Location.GetDataCenterId();
if (dataCenterId != 0) {
DataCenters[dataCenterId]; // just create entry in hash map
}
}
Execute(CreateLoadEverything());
Expand Down Expand Up @@ -2725,81 +2716,97 @@ void THive::SendReconnect(const TActorId& local) {
}

ui32 THive::GetDataCenters() {
return DataCenters ? DataCenters : 1;
}

ui32 THive::GetRegisteredDataCenters() {
return RegisteredDataCenters ? RegisteredDataCenters : 1;
}

void THive::UpdateRegisteredDataCenters() {
if (RegisteredDataCenters != RegisteredDataCenterNodes.size()) {
BLOG_D("THive (UpdateRegisteredDC) DataCenters=" << DataCenters << " RegisteredDataCenters=" << RegisteredDataCenters << "->" << RegisteredDataCenterNodes.size());
RegisteredDataCenters = RegisteredDataCenterNodes.size();
}
return DataCenters.size() ? DataCenters.size() : 1;
}

void THive::AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId) {
BLOG_D("AddRegisteredDataCentersNode(" << dataCenterId << ", " << nodeId << ")");
if (dataCenterId != 0) { // ignore default data center id if exists
if (RegisteredDataCenterNodes[dataCenterId].insert(nodeId).second) {
if (RegisteredDataCenters != RegisteredDataCenterNodes.size()) {
UpdateRegisteredDataCenters();
}
auto& dataCenter = DataCenters[dataCenterId];
bool wasRegistered = dataCenter.IsRegistered();
dataCenter.RegisteredNodes.insert(nodeId);
if (!wasRegistered && !dataCenter.UpdateScheduled) {
dataCenter.UpdateScheduled = true;
Schedule(TDuration::Seconds(1), new TEvPrivate::TEvUpdateDataCenterFollowers(dataCenterId));
}
}
}

void THive::RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId) {
BLOG_D("RemoveRegisteredDataCentersNode(" << dataCenterId << ", " << nodeId << ")");
if (dataCenterId != 0) { // ignore default data center id if exists
RegisteredDataCenterNodes[dataCenterId].erase(nodeId);
if (RegisteredDataCenterNodes[dataCenterId].size() == 0) {
RegisteredDataCenterNodes.erase(dataCenterId);
}
if (RegisteredDataCenters != RegisteredDataCenterNodes.size()) {
UpdateRegisteredDataCenters();
auto& dataCenter = DataCenters[dataCenterId];
bool wasRegistered = dataCenter.IsRegistered();
dataCenter.RegisteredNodes.erase(nodeId);
if (wasRegistered && !dataCenter.IsRegistered() && !dataCenter.UpdateScheduled) {
dataCenter.UpdateScheduled = true;
Schedule(TDuration::Seconds(1), new TEvPrivate::TEvUpdateDataCenterFollowers(dataCenterId));
}
}
}

void THive::UpdateTabletFollowersNumber(TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
BLOG_D("UpdateTabletFollowersNumber Tablet " << tablet.ToString() << " RegisteredDataCenters=" << GetRegisteredDataCenters());
for (TFollowerGroup& group : tablet.FollowerGroups) {
ui32 followerCount = tablet.GetActualFollowerCount(group.Id);
ui32 requiredFollowerCount = group.GetComputedFollowerCount(GetRegisteredDataCenters());

while (followerCount < requiredFollowerCount) {
BLOG_D("UpdateTabletFollowersNumber Tablet " << tablet.ToString() << " is increasing number of followers (" << followerCount << "<" << requiredFollowerCount << ")");

TFollowerTabletInfo& follower = tablet.AddFollower(group);
follower.Statistics.SetLastAliveTimestamp(TlsActivationContext->Now().MilliSeconds());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Update(
NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup.Id),
NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0),
NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics));
follower.InitTabletMetrics();
follower.BecomeStopped();
++followerCount;
}
void THive::CreateTabletFollowers(TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
BLOG_D("CreateTabletFollowers Tablet " << tablet.ToString());

while (followerCount > requiredFollowerCount) {
BLOG_D("UpdateTabletFollowersNumber Tablet " << tablet.ToString() << " is decreasing number of followers (" << followerCount << ">" << requiredFollowerCount << ")");
// In case tablet already has followers (happens if tablet is modified through CreateTablet), delete them
// But create new ones before deleting old ones, to avoid issues with reusing ids
decltype(tablet.Followers)::iterator oldFollowersIt;
if (tablet.Followers.empty()) {
oldFollowersIt = tablet.Followers.end();
} else {
oldFollowersIt = std::prev(tablet.Followers.end());
}

auto itFollower = tablet.Followers.rbegin();
while (itFollower != tablet.Followers.rend() && itFollower->FollowerGroup.Id != group.Id) {
++itFollower;
for (TFollowerGroup& group : tablet.FollowerGroups) {
if (group.FollowerCountPerDataCenter) {
for (auto& [dataCenterId, dataCenter] : DataCenters) {
if (!dataCenter.IsRegistered()) {
continue;
}
for (ui32 i = 0; i < group.GetFollowerCountForDataCenter(dataCenterId); ++i) {
TFollowerTabletInfo& follower = tablet.AddFollower(group);
follower.NodeFilter.AllowedDataCenters = {dataCenterId};
follower.Statistics.SetLastAliveTimestamp(TlsActivationContext->Now().MilliSeconds());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Update(
NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup.Id),
NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0),
NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics),
NIceDb::TUpdate<Schema::TabletFollowerTablet::DataCenter>(dataCenterId));
follower.InitTabletMetrics();
follower.BecomeStopped();
dataCenter.Followers[{tablet.Id, group.Id}].push_back(std::prev(tablet.Followers.end()));
BLOG_D("Created follower " << follower.GetFullTabletId() << " for dc " << dataCenterId);
}
}
if (itFollower == tablet.Followers.rend()) {
break;
} else {
for (ui32 i = 0; i < group.GetRawFollowerCount(); ++i) {
TFollowerTabletInfo& follower = tablet.AddFollower(group);
follower.Statistics.SetLastAliveTimestamp(TlsActivationContext->Now().MilliSeconds());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Update(
NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup.Id),
NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0),
NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics));
follower.InitTabletMetrics();
follower.BecomeStopped();
BLOG_D("Created follower " << follower.GetFullTabletId());
}
TFollowerTabletInfo& follower = *itFollower;
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Delete();
db.Table<Schema::Metrics>().Key(tablet.Id, follower.Id).Delete();
follower.InitiateStop(sideEffects);
tablet.Followers.erase(std::prev(itFollower.base()));
UpdateCounterTabletsTotal(-1);
--followerCount;

}
}

if (oldFollowersIt == tablet.Followers.end()) {
return;
}
auto endIt = std::next(oldFollowersIt);
for (auto followerIt = tablet.Followers.begin(); followerIt != endIt; ++followerIt) {
TFollowerTabletInfo& follower = *followerIt;
BLOG_D("Deleting follower " << follower.GetFullTabletId());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Delete();
db.Table<Schema::Metrics>().Key(tablet.Id, follower.Id).Delete();
follower.InitiateStop(sideEffects);
UpdateCounterTabletsTotal(-1);
}
tablet.Followers.erase(tablet.Followers.begin(), endIt);
}

TDuration THive::GetBalancerCooldown(EBalancerType balancerType) const {
Expand Down Expand Up @@ -3024,6 +3031,7 @@ void THive::ProcessEvent(std::unique_ptr<IEventHandle> event) {
hFunc(TEvHive::TEvUpdateDomain, Handle);
hFunc(TEvPrivate::TEvDeleteNode, Handle);
hFunc(TEvHive::TEvRequestTabletDistribution, Handle);
hFunc(TEvPrivate::TEvUpdateDataCenterFollowers, Handle);
}
}

Expand Down Expand Up @@ -3126,6 +3134,7 @@ STFUNC(THive::StateWork) {
fFunc(TEvPrivate::TEvProcessStorageBalancer::EventType, EnqueueIncomingEvent);
fFunc(TEvPrivate::TEvDeleteNode::EventType, EnqueueIncomingEvent);
fFunc(TEvHive::TEvRequestTabletDistribution::EventType, EnqueueIncomingEvent);
fFunc(TEvPrivate::TEvUpdateDataCenterFollowers::EventType, EnqueueIncomingEvent);
hFunc(TEvPrivate::TEvProcessIncomingEvent, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
Expand Down Expand Up @@ -3423,6 +3432,10 @@ void THive::Handle(TEvHive::TEvRequestTabletDistribution::TPtr& ev) {
Send(ev->Sender, response.release());
}

void THive::Handle(TEvPrivate::TEvUpdateDataCenterFollowers::TPtr& ev) {
Execute(CreateUpdateDcFollowers(ev->Get()->DataCenter));
}

TVector<TNodeId> THive::GetNodesForWhiteboardBroadcast(size_t maxNodesToReturn) {
TVector<TNodeId> nodes;
TNodeId selfNodeId = SelfId().NodeId();
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/mind/hive/hive_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "sequencer.h"
#include "boot_queue.h"
#include "object_distribution.h"
#include "data_center_info.h"

#define DEPRECATED_CTX (ActorContext())
#define DEPRECATED_NOW (TActivationContext::Now())
Expand Down Expand Up @@ -239,6 +240,7 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
friend class TTxUpdateTabletGroups;
friend class TTxMonEvent_TabletAvailability;
friend class TLoggedMonTransaction;
friend class TTxUpdateDcFollowers;

friend class TDeleteTabletActor;

Expand Down Expand Up @@ -301,6 +303,7 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
ITransaction* CreateRequestTabletOwners(TEvHive::TEvRequestTabletOwners::TPtr event);
ITransaction* CreateUpdateTabletsObject(TEvHive::TEvUpdateTabletsObject::TPtr event);
ITransaction* CreateUpdateDomain(TSubDomainKey subdomainKey, TEvHive::TEvUpdateDomain::TPtr event = {});
ITransaction* CreateUpdateDcFollowers(const TDataCenterId& dc);

public:
TDomainsView DomainsView;
Expand Down Expand Up @@ -329,8 +332,6 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
ui32 ConfigurationGeneration = 0;
ui64 TabletsTotal = 0;
ui64 TabletsAlive = 0;
ui32 DataCenters = 1;
ui32 RegisteredDataCenters = 1;
TObjectDistributions ObjectDistributions;
double StorageScatter = 0;
std::set<TTabletTypes::EType> SeenTabletTypes;
Expand Down Expand Up @@ -448,7 +449,7 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
TDuration NodeBrokerEpoch;
std::unordered_map<TTabletTypes::EType, NKikimrConfig::THiveTabletLimit> TabletLimit; // built from CurrentConfig
std::unordered_map<TTabletTypes::EType, NKikimrHive::TDataCentersPreference> DefaultDataCentersPreference;
std::unordered_map<TDataCenterId, std::unordered_set<TNodeId>> RegisteredDataCenterNodes;
std::unordered_map<TDataCenterId, TDataCenterInfo> DataCenters;
std::unordered_set<TNodeId> ConnectedNodes;

// normalized to be sorted list of unique values
Expand Down Expand Up @@ -574,6 +575,7 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
void Handle(TEvHive::TEvUpdateDomain::TPtr& ev);
void Handle(TEvPrivate::TEvDeleteNode::TPtr& ev);
void Handle(TEvHive::TEvRequestTabletDistribution::TPtr& ev);
void Handle(TEvPrivate::TEvUpdateDataCenterFollowers::TPtr& ev);

protected:
void RestartPipeTx(ui64 tabletId);
Expand Down Expand Up @@ -678,8 +680,6 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void FillTabletInfo(NKikimrHive::TEvResponseHiveInfo& response, ui64 tabletId, const TLeaderTabletInfo* info, const NKikimrHive::TEvRequestHiveInfo& req);
void ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui64 cookie, bool external);
ui32 GetDataCenters();
ui32 GetRegisteredDataCenters();
void UpdateRegisteredDataCenters();
void AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
void RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
void QueuePing(const TActorId& local);
Expand All @@ -692,7 +692,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void StopTablet(const TActorId& local, const TTabletInfo& tablet);
void StopTablet(const TActorId& local, TFullTabletId tabletId);
void ExecuteProcessBootQueue(NIceDb::TNiceDb& db, TSideEffects& sideEffects);
void UpdateTabletFollowersNumber(TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects);
void CreateTabletFollowers(TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects);
TDuration GetBalancerCooldown(EBalancerType balancerType) const;
void UpdateObjectCount(const TLeaderTabletInfo& tablet, const TNodeInfo& node, i64 diff);
ui64 GetObjectImbalance(TFullObjectId object);
Expand Down
Loading

0 comments on commit feb063e

Please sign in to comment.