Skip to content

Commit

Permalink
control inflight pings in hive (#6916) (#7238)
Browse files Browse the repository at this point in the history
  • Loading branch information
vporyadke authored Aug 9, 2024
1 parent 7beef2e commit d65a0a9
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 1 deletion.
32 changes: 32 additions & 0 deletions ydb/core/mind/hive/hive_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) {
for (auto* node : unimportantNodes) {
node->Ping();
}
ProcessNodePingQueue();
TVector<TTabletId> tabletsToReleaseFromParent;
TSideEffects sideEffects;
sideEffects.Reset(SelfId());
Expand Down Expand Up @@ -685,11 +686,13 @@ void THive::Cleanup() {

void THive::Handle(TEvLocal::TEvStatus::TPtr& ev) {
BLOG_D("Handle TEvLocal::TEvStatus for Node " << ev->Sender.NodeId() << ": " << ev->Get()->Record.ShortDebugString());
RemoveFromPingInProgress(ev->Sender.NodeId());
Execute(CreateStatus(ev->Sender, ev->Get()->Record));
}

void THive::Handle(TEvLocal::TEvSyncTablets::TPtr& ev) {
BLOG_D("THive::Handle::TEvSyncTablets");
RemoveFromPingInProgress(ev->Sender.NodeId());
Execute(CreateSyncTablets(ev->Sender, ev->Get()->Record));
}

Expand Down Expand Up @@ -743,6 +746,7 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
void THive::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
TNodeId nodeId = ev->Get()->NodeId;
BLOG_W("Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId);
RemoveFromPingInProgress(nodeId);
if (ConnectedNodes.erase(nodeId)) {
UpdateCounterNodesConnected(-1);
}
Expand Down Expand Up @@ -915,6 +919,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
case TEvLocal::EvPing: {
TNodeId nodeId = ev->Cookie;
TNodeInfo* node = FindNode(nodeId);
NodePingsInProgress.erase(nodeId);
if (node != nullptr && ev->Sender == node->Local) {
if (node->IsDisconnecting()) {
// ping continiousily until we fully disconnected from the node
Expand All @@ -923,6 +928,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
KillNode(node->Id, node->Local);
}
}
ProcessNodePingQueue();
break;
}
};
Expand Down Expand Up @@ -1686,6 +1692,13 @@ void THive::UpdateCounterNodesConnected(i64 nodesConnectedDiff) {
}
}

void THive::UpdateCounterPingQueueSize() {
if (TabletCounters != nullptr) {
auto& counter = TabletCounters->Simple()[NHive::COUNTER_PINGQUEUE_SIZE];
counter.Set(NodePingQueue.size());
}
}

void THive::RecordTabletMove(const TTabletMoveInfo& moveInfo) {
TabletMoveHistory.PushBack(moveInfo);
TabletCounters->Cumulative()[NHive::COUNTER_TABLETS_MOVED].Increment(1);
Expand Down Expand Up @@ -2662,6 +2675,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui
Execute(CreateStartTablet(tabletId, local, cookie, external));
}

void THive::QueuePing(const TActorId& local) {
NodePingQueue.push(local);
}

void THive::ProcessNodePingQueue() {
while (!NodePingQueue.empty() && NodePingsInProgress.size() < GetMaxPingsInFlight()) {
TActorId local = NodePingQueue.front();
TNodeId node = local.NodeId();
NodePingQueue.pop();
NodePingsInProgress.insert(node);
SendPing(local, node);
}
}

void THive::RemoveFromPingInProgress(TNodeId node) {
NodePingsInProgress.erase(node);
ProcessNodePingQueue();
}

void THive::SendPing(const TActorId& local, TNodeId id) {
Send(local,
new TEvLocal::TEvPing(HiveId,
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/mind/hive/hive_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
TEventPriorityQueue<THive> EventQueue{*this};
ui64 OperationsLogIndex = 0;
std::vector<TActorId> ActorsWaitingToMoveTablets;
std::queue<TActorId> NodePingQueue;
std::unordered_set<TNodeId> NodePingsInProgress;

struct TPendingCreateTablet {
NKikimrHive::TEvCreateTablet CreateTablet;
Expand Down Expand Up @@ -649,6 +651,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void UpdateCounterBootQueueSize(ui64 bootQueueSize);
void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff);
void UpdateCounterNodesConnected(i64 nodesConnectedDiff);
void UpdateCounterPingQueueSize();
void RecordTabletMove(const TTabletMoveInfo& info);
bool DomainHasNodes(const TSubDomainKey &domainKey) const;
void ProcessBootQueue();
Expand Down Expand Up @@ -677,7 +680,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void UpdateRegisteredDataCenters();
void AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
void RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
void QueuePing(const TActorId& local);
void SendPing(const TActorId& local, TNodeId id);
void RemoveFromPingInProgress(TNodeId node);
void ProcessNodePingQueue();
void SendReconnect(const TActorId& local);
static THolder<TGroupFilter> BuildGroupParametersForChannel(const TLeaderTabletInfo& tablet, ui32 channelId);
void KickTablet(const TTabletInfo& tablet);
Expand Down Expand Up @@ -938,8 +944,13 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
return CurrentConfig.GetNodeUsageRangeToKick();
}

ui64 GetMaxPingsInFlight() const {
return CurrentConfig.GetMaxPingsInFlight();
}

bool GetLessSystemTabletsMoves() const {
return CurrentConfig.GetLessSystemTabletsMoves();

}

static void ActualizeRestartStatistics(google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/mind/hive/node_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ void TNodeInfo::DeregisterInDomains() {
void TNodeInfo::Ping() {
Y_ABORT_UNLESS((bool)Local);
BLOG_D("Node(" << Id << ") Ping(" << Local << ")");
Hive.SendPing(Local, Id);
Hive.QueuePing(Local);
}

void TNodeInfo::SendReconnect(const TActorId& local) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/mind/hive/tx__register_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ class TTxRegisterNode : public TTransactionBase<THive> {
BLOG_D("THive::TTxRegisterNode(" << Local.NodeId() << ")::Complete");
TNodeInfo* node = Self->FindNode(Local.NodeId());
if (node != nullptr && node->Local) { // we send ping on every RegisterNode because we want to re-sync tablets upon every reconnection
Self->NodePingsInProgress.erase(node->Id);
node->Ping();
Self->ProcessNodePingQueue();
}
}
};
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,7 @@ message THiveConfig {
optional bool EnableDestroyOperations = 74 [default = false];
optional double NodeUsageRangeToKick = 75 [default = 0.2];
optional bool LessSystemTabletsMoves = 77 [default = true];
optional uint64 MaxPingsInFlight = 78 [default = 1000];
}

message TBlobCacheConfig {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/counters_hive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ enum ESimpleCounters {
COUNTER_IMBALANCED_OBJECTS = 19 [(CounterOpts) = {Name: "ImbalancedObjects"}];
COUNTER_WORST_OBJECT_VARIANCE = 20 [(CounterOpts) = {Name: "WorstObjectVariance"}];
COUNTER_STORAGE_SCATTER = 21 [(CounterOpts) = {Name: "StorageScatter"}];
RESERVED22 = 22;
COUNTER_PINGQUEUE_SIZE = 23 [(CounterOpts) = {Name: "PingQueueSize"}];
}

enum ECumulativeCounters {
Expand Down

0 comments on commit d65a0a9

Please sign in to comment.