Skip to content

Commit

Permalink
Merge 9529a9e into 030b601
Browse files Browse the repository at this point in the history
  • Loading branch information
vporyadke authored Dec 17, 2024
2 parents 030b601 + 9529a9e commit be56391
Show file tree
Hide file tree
Showing 22 changed files with 345 additions and 127 deletions.
21 changes: 21 additions & 0 deletions ydb/core/mind/hive/data_center_info.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include "hive.h"
#include "hive_events.h"

#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

namespace NKikimr {
namespace NHive {

// Per-data-center bookkeeping kept by Hive: which nodes of the data center are
// registered and which follower tablets were created for it.
struct TDataCenterInfo {
    // Identifies one follower group of one tablet: (tablet id, follower group id).
    using TFullFollowerGroupId = std::pair<TTabletId, TFollowerGroupId>;
    using TFollowerIter = TList<TFollowerTabletInfo>::iterator; // list iterators are not invalidated

    // Ids of the nodes of this data center that are currently registered.
    std::unordered_set<TNodeId> RegisteredNodes;
    // True once a follower update for this data center has been scheduled, so
    // that it is not scheduled twice.
    // NOTE(review): presumably reset by the update handler - confirm.
    bool UpdateScheduled = false;
    // Followers created for this data center, grouped by the follower group
    // they belong to. The iterators point into the owning tablet's follower list.
    std::unordered_map<TFullFollowerGroupId, std::vector<TFollowerIter>> Followers;

    // Returns true when at least one node of this data center is registered.
    bool IsRegistered() const {
        return !RegisteredNodes.empty();
    }
};

} // NHive
} // NKikimr
11 changes: 11 additions & 0 deletions ydb/core/mind/hive/follower_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ struct TFollowerGroup {
}
}

// Returns how many followers of this group should live in data center `dc`.
// The answer is FollowerCount only when followers are counted per data center
// and the group's node filter allows `dc`; in every other case it is 0.
ui32 GetFollowerCountForDataCenter(const TDataCenterId& dc) const {
    // Short-circuit keeps the filter from being consulted when per-DC counting is off.
    if (FollowerCountPerDataCenter && NodeFilter.IsAllowedDataCenter(dc)) {
        return FollowerCount;
    }
    return 0;
}

// Stores `followerCount` as this group's FollowerCount.
void SetFollowerCount(ui32 followerCount) {
FollowerCount = followerCount;
}
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/mind/hive/hive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ NMetrics::EResource GetDominantResourceType(const TResourceNormalizedValues& nor
}

TNodeFilter::TNodeFilter(const THive& hive)
: Hive(hive)
: Hive(&hive)
{}

TArrayRef<const TSubDomainKey> TNodeFilter::GetEffectiveAllowedDomains() const {
const auto* objectDomainInfo = Hive.FindDomain(ObjectDomain);
const auto* objectDomainInfo = Hive->FindDomain(ObjectDomain);

if (!objectDomainInfo) {
return {AllowedDomains.begin(), AllowedDomains.end()};
Expand All @@ -100,6 +100,13 @@ TArrayRef<const TSubDomainKey> TNodeFilter::GetEffectiveAllowedDomains() const {
}
}

// Tells whether data center `dc` passes this filter.
// An empty AllowedDataCenters list means "no restriction", so every data
// center is accepted; otherwise `dc` must be present in the list.
bool TNodeFilter::IsAllowedDataCenter(TDataCenterId dc) const {
    const bool unrestricted = AllowedDataCenters.empty();
    return unrestricted
        || std::find(AllowedDataCenters.begin(), AllowedDataCenters.end(), dc) != AllowedDataCenters.end();
}

template <typename K, typename V>
std::unordered_map<V, K> MakeReverseMap(const std::unordered_map<K, V>& map) {
std::unordered_map<V, K> result;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/mind/hive/hive.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,13 @@ struct TNodeFilter {
TSubDomainKey ObjectDomain;
TTabletTypes::EType TabletType = TTabletTypes::TypeInvalid;

const THive& Hive;
const THive* Hive;

explicit TNodeFilter(const THive& hive);

TArrayRef<const TSubDomainKey> GetEffectiveAllowedDomains() const;

bool IsAllowedDataCenter(TDataCenterId dc) const;
};

} // NHive
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/mind/hive/hive_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct TEvPrivate {
EvDeleteNode,
EvCanMoveTablets,
EvRefreshScaleRecommendation,
EvUpdateDataCenterFollowers,
EvEnd
};

Expand Down Expand Up @@ -123,6 +124,12 @@ struct TEvPrivate {
// Local event with id EvCanMoveTablets; it declares no fields of its own.
struct TEvCanMoveTablets : TEventLocal<TEvCanMoveTablets, EvCanMoveTablets> {};

// Local event with id EvRefreshScaleRecommendation; it declares no fields of its own.
struct TEvRefreshScaleRecommendation : TEventLocal<TEvRefreshScaleRecommendation, EvRefreshScaleRecommendation> {};

// Local event with id EvUpdateDataCenterFollowers, carrying the data center
// whose followers should be updated.
struct TEvUpdateDataCenterFollowers : TEventLocal<TEvUpdateDataCenterFollowers, EvUpdateDataCenterFollowers> {
    // Data center this update request refers to.
    TDataCenterId DataCenter;

    // explicit: a data center id must never silently convert into an event.
    explicit TEvUpdateDataCenterFollowers(TDataCenterId dataCenter)
        : DataCenter(dataCenter)
    {}
};
};

} // NHive
Expand Down
154 changes: 86 additions & 68 deletions ydb/core/mind/hive/hive_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ TInstant THive::GetAllowedBootingTime() {
return result;
}

void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb&, TSideEffects& sideEffects) {
TInstant now = TActivationContext::Now();
if (WarmUp) {
TInstant allowed = GetAllowedBootingTime();
Expand Down Expand Up @@ -261,10 +261,6 @@ void THive::ExecuteProcessBootQueue(NIceDb::TNiceDb& db, TSideEffects& sideEffec
}
tablet->ActorsToNotifyOnRestart.clear();
tablet->InWaitQueue = true;
if (tablet->IsFollower()) {
TLeaderTabletInfo& leader = tablet->GetLeader();
UpdateTabletFollowersNumber(leader, db, sideEffects); // this may delete tablet
}
BootQueue.AddToWaitQueue(record); // waiting for new node
continue;
}
Expand Down Expand Up @@ -778,16 +774,11 @@ void THive::Handle(TEvInterconnect::TEvNodeInfo::TPtr &ev) {
}

void THive::Handle(TEvInterconnect::TEvNodesInfo::TPtr &ev) {
THashSet<TDataCenterId> dataCenters;
for (const TEvInterconnect::TNodeInfo& node : ev->Get()->Nodes) {
NodesInfo[node.NodeId] = node;
dataCenters.insert(node.Location.GetDataCenterId());
}
dataCenters.erase(0); // remove default data center id if exists
if (!dataCenters.empty()) {
if (DataCenters != dataCenters.size()) {
DataCenters = dataCenters.size();
BLOG_D("TEvInterconnect::TEvNodesInfo DataCenters=" << DataCenters << " RegisteredDataCenters=" << RegisteredDataCenters);
auto dataCenterId = node.Location.GetDataCenterId();
if (dataCenterId != 0) {
DataCenters[dataCenterId]; // just create entry in hash map
}
}
Execute(CreateLoadEverything());
Expand Down Expand Up @@ -1604,6 +1595,11 @@ void THive::DeleteTablet(TTabletId tabletId) {
}
Y_ENSURE_LOG(nt->second.LockedTablets.count(&tablet) == 0, " Deleting tablet found on node " << nt->first << " in locked set");
}
for (const auto& followerGroup : tablet.FollowerGroups) {
for (auto& [_, dataCenter] : DataCenters) {
dataCenter.Followers.erase({tabletId, followerGroup.Id});
}
}
const i64 tabletsTotalDiff = -1 - (tablet.Followers.size());
UpdateCounterTabletsTotal(tabletsTotalDiff);
UpdateDomainTabletsTotal(tablet.ObjectDomain, tabletsTotalDiff);
Expand Down Expand Up @@ -2733,81 +2729,97 @@ void THive::SendReconnect(const TActorId& local) {
}

ui32 THive::GetDataCenters() {
return DataCenters ? DataCenters : 1;
}

ui32 THive::GetRegisteredDataCenters() {
return RegisteredDataCenters ? RegisteredDataCenters : 1;
}

void THive::UpdateRegisteredDataCenters() {
if (RegisteredDataCenters != RegisteredDataCenterNodes.size()) {
BLOG_D("THive (UpdateRegisteredDC) DataCenters=" << DataCenters << " RegisteredDataCenters=" << RegisteredDataCenters << "->" << RegisteredDataCenterNodes.size());
RegisteredDataCenters = RegisteredDataCenterNodes.size();
}
return DataCenters.size() ? DataCenters.size() : 1;
}

void THive::AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId) {
BLOG_D("AddRegisteredDataCentersNode(" << dataCenterId << ", " << nodeId << ")");
if (dataCenterId != 0) { // ignore default data center id if exists
if (RegisteredDataCenterNodes[dataCenterId].insert(nodeId).second) {
if (RegisteredDataCenters != RegisteredDataCenterNodes.size()) {
UpdateRegisteredDataCenters();
}
auto& dataCenter = DataCenters[dataCenterId];
bool wasRegistered = dataCenter.IsRegistered();
dataCenter.RegisteredNodes.insert(nodeId);
if (!wasRegistered && !dataCenter.UpdateScheduled) {
dataCenter.UpdateScheduled = true;
Schedule(TDuration::Seconds(1), new TEvPrivate::TEvUpdateDataCenterFollowers(dataCenterId));
}
}
}

void THive::RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId) {
BLOG_D("RemoveRegisteredDataCentersNode(" << dataCenterId << ", " << nodeId << ")");
if (dataCenterId != 0) { // ignore default data center id if exists
RegisteredDataCenterNodes[dataCenterId].erase(nodeId);
if (RegisteredDataCenterNodes[dataCenterId].size() == 0) {
RegisteredDataCenterNodes.erase(dataCenterId);
}
if (RegisteredDataCenters != RegisteredDataCenterNodes.size()) {
UpdateRegisteredDataCenters();
auto& dataCenter = DataCenters[dataCenterId];
bool wasRegistered = dataCenter.IsRegistered();
dataCenter.RegisteredNodes.erase(nodeId);
if (wasRegistered && !dataCenter.IsRegistered() && !dataCenter.UpdateScheduled) {
dataCenter.UpdateScheduled = true;
Schedule(TDuration::Seconds(1), new TEvPrivate::TEvUpdateDataCenterFollowers(dataCenterId));
}
}
}

void THive::UpdateTabletFollowersNumber(TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
BLOG_D("UpdateTabletFollowersNumber Tablet " << tablet.ToString() << " RegisteredDataCenters=" << GetRegisteredDataCenters());
for (TFollowerGroup& group : tablet.FollowerGroups) {
ui32 followerCount = tablet.GetActualFollowerCount(group.Id);
ui32 requiredFollowerCount = group.GetComputedFollowerCount(GetRegisteredDataCenters());

while (followerCount < requiredFollowerCount) {
BLOG_D("UpdateTabletFollowersNumber Tablet " << tablet.ToString() << " is increasing number of followers (" << followerCount << "<" << requiredFollowerCount << ")");

TFollowerTabletInfo& follower = tablet.AddFollower(group);
follower.Statistics.SetLastAliveTimestamp(TlsActivationContext->Now().MilliSeconds());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Update(
NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup.Id),
NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0),
NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics));
follower.InitTabletMetrics();
follower.BecomeStopped();
++followerCount;
}
void THive::CreateTabletFollowers(TLeaderTabletInfo& tablet, NIceDb::TNiceDb& db, TSideEffects& sideEffects) {
BLOG_D("CreateTabletFollowers Tablet " << tablet.ToString());

while (followerCount > requiredFollowerCount) {
BLOG_D("UpdateTabletFollowersNumber Tablet " << tablet.ToString() << " is decreasing number of followers (" << followerCount << ">" << requiredFollowerCount << ")");
// In case tablet already has followers (happens if tablet is modified through CreateTablet), delete them
// But create new ones before deleting old ones, to avoid issues with reusing ids
decltype(tablet.Followers)::iterator oldFollowersIt;
if (tablet.Followers.empty()) {
oldFollowersIt = tablet.Followers.end();
} else {
oldFollowersIt = std::prev(tablet.Followers.end());
}

auto itFollower = tablet.Followers.rbegin();
while (itFollower != tablet.Followers.rend() && itFollower->FollowerGroup.Id != group.Id) {
++itFollower;
for (TFollowerGroup& group : tablet.FollowerGroups) {
if (group.FollowerCountPerDataCenter) {
for (auto& [dataCenterId, dataCenter] : DataCenters) {
if (!dataCenter.IsRegistered()) {
continue;
}
for (ui32 i = 0; i < group.GetFollowerCountForDataCenter(dataCenterId); ++i) {
TFollowerTabletInfo& follower = tablet.AddFollower(group);
follower.NodeFilter.AllowedDataCenters = {dataCenterId};
follower.Statistics.SetLastAliveTimestamp(TlsActivationContext->Now().MilliSeconds());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Update(
NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup.Id),
NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0),
NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics),
NIceDb::TUpdate<Schema::TabletFollowerTablet::DataCenter>(dataCenterId));
follower.InitTabletMetrics();
follower.BecomeStopped();
dataCenter.Followers[{tablet.Id, group.Id}].push_back(std::prev(tablet.Followers.end()));
BLOG_D("Created follower " << follower.GetFullTabletId() << " for dc " << dataCenterId);
}
}
if (itFollower == tablet.Followers.rend()) {
break;
} else {
for (ui32 i = 0; i < group.GetRawFollowerCount(); ++i) {
TFollowerTabletInfo& follower = tablet.AddFollower(group);
follower.Statistics.SetLastAliveTimestamp(TlsActivationContext->Now().MilliSeconds());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Update(
NIceDb::TUpdate<Schema::TabletFollowerTablet::GroupID>(follower.FollowerGroup.Id),
NIceDb::TUpdate<Schema::TabletFollowerTablet::FollowerNode>(0),
NIceDb::TUpdate<Schema::TabletFollowerTablet::Statistics>(follower.Statistics));
follower.InitTabletMetrics();
follower.BecomeStopped();
BLOG_D("Created follower " << follower.GetFullTabletId());
}
TFollowerTabletInfo& follower = *itFollower;
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Delete();
db.Table<Schema::Metrics>().Key(tablet.Id, follower.Id).Delete();
follower.InitiateStop(sideEffects);
tablet.Followers.erase(std::prev(itFollower.base()));
UpdateCounterTabletsTotal(-1);
--followerCount;

}
}

if (oldFollowersIt == tablet.Followers.end()) {
return;
}
auto endIt = std::next(oldFollowersIt);
for (auto followerIt = tablet.Followers.begin(); followerIt != endIt; ++followerIt) {
TFollowerTabletInfo& follower = *followerIt;
BLOG_D("Deleting follower " << follower.GetFullTabletId());
db.Table<Schema::TabletFollowerTablet>().Key(tablet.Id, follower.Id).Delete();
db.Table<Schema::Metrics>().Key(tablet.Id, follower.Id).Delete();
follower.InitiateStop(sideEffects);
UpdateCounterTabletsTotal(-1);
}
tablet.Followers.erase(tablet.Followers.begin(), endIt);
}

TDuration THive::GetBalancerCooldown(EBalancerType balancerType) const {
Expand Down Expand Up @@ -3035,6 +3047,7 @@ void THive::ProcessEvent(std::unique_ptr<IEventHandle> event) {
hFunc(TEvHive::TEvRequestScaleRecommendation, Handle);
hFunc(TEvPrivate::TEvRefreshScaleRecommendation, Handle);
hFunc(TEvHive::TEvConfigureScaleRecommender, Handle);
hFunc(TEvPrivate::TEvUpdateDataCenterFollowers, Handle);
}
}

Expand Down Expand Up @@ -3140,6 +3153,7 @@ STFUNC(THive::StateWork) {
fFunc(TEvHive::TEvRequestScaleRecommendation::EventType, EnqueueIncomingEvent);
fFunc(TEvPrivate::TEvRefreshScaleRecommendation::EventType, EnqueueIncomingEvent);
fFunc(TEvHive::TEvConfigureScaleRecommender::EventType, EnqueueIncomingEvent);
fFunc(TEvPrivate::TEvUpdateDataCenterFollowers::EventType, EnqueueIncomingEvent);
hFunc(TEvPrivate::TEvProcessIncomingEvent, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
Expand Down Expand Up @@ -3564,6 +3578,10 @@ void THive::Handle(TEvHive::TEvConfigureScaleRecommender::TPtr& ev) {
Execute(CreateConfigureScaleRecommender(ev));
}

// Handles TEvUpdateDataCenterFollowers by executing the transaction produced
// by CreateUpdateDcFollowers for the data center named in the event.
void THive::Handle(TEvPrivate::TEvUpdateDataCenterFollowers::TPtr& ev) {
    auto* request = ev->Get();
    Execute(CreateUpdateDcFollowers(request->DataCenter));
}

TVector<TNodeId> THive::GetNodesForWhiteboardBroadcast(size_t maxNodesToReturn) {
TVector<TNodeId> nodes;
TNodeId selfNodeId = SelfId().NodeId();
Expand Down
Loading

0 comments on commit be56391

Please sign in to comment.