From 33ba19db52ae1e8a9ec0c112396cda791aa6ab5f Mon Sep 17 00:00:00 2001 From: vporyadke Date: Wed, 24 Jul 2024 11:42:21 +0300 Subject: [PATCH] control inflight pings in hive (#6916) --- ydb/core/mind/hive/hive_impl.cpp | 36 +++++++++++++++++++++++- ydb/core/mind/hive/hive_impl.h | 10 +++++++ ydb/core/mind/hive/node_info.cpp | 2 +- ydb/core/mind/hive/tx__register_node.cpp | 2 ++ ydb/core/protos/config.proto | 1 + ydb/core/protos/counters_hive.proto | 1 + 6 files changed, 50 insertions(+), 2 deletions(-) diff --git a/ydb/core/mind/hive/hive_impl.cpp b/ydb/core/mind/hive/hive_impl.cpp index b4d08484fc7c..0ae89f98c5a9 100644 --- a/ydb/core/mind/hive/hive_impl.cpp +++ b/ydb/core/mind/hive/hive_impl.cpp @@ -495,6 +495,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) { for (auto* node : unimportantNodes) { node->Ping(); } + ProcessNodePingQueue(); TVector tabletsToReleaseFromParent; TSideEffects sideEffects; sideEffects.Reset(SelfId()); @@ -685,11 +686,13 @@ void THive::Cleanup() { void THive::Handle(TEvLocal::TEvStatus::TPtr& ev) { BLOG_D("Handle TEvLocal::TEvStatus for Node " << ev->Sender.NodeId() << ": " << ev->Get()->Record.ShortDebugString()); + RemoveFromPingInProgress(ev->Sender.NodeId()); Execute(CreateStatus(ev->Sender, ev->Get()->Record)); } void THive::Handle(TEvLocal::TEvSyncTablets::TPtr& ev) { BLOG_D("THive::Handle::TEvSyncTablets"); + RemoveFromPingInProgress(ev->Sender.NodeId()); Execute(CreateSyncTablets(ev->Sender, ev->Get()->Record)); } @@ -742,7 +745,10 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) { void THive::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { TNodeId nodeId = ev->Get()->NodeId; BLOG_W("Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId); - ConnectedNodes.erase(nodeId); + RemoveFromPingInProgress(nodeId); + if (ConnectedNodes.erase(nodeId)) { + UpdateCounterNodesConnected(-1); + } Execute(CreateDisconnectNode(THolder(ev->Release().Release()))); } @@ -912,6 +918,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) { case TEvLocal::EvPing: { TNodeId nodeId = ev->Cookie; TNodeInfo* node = FindNode(nodeId); + NodePingsInProgress.erase(nodeId); if (node != nullptr && ev->Sender == node->Local) { if (node->IsDisconnecting()) { // ping continiousily until we fully disconnected from the node @@ -920,6 +927,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) { KillNode(node->Id, node->Local); } } + ProcessNodePingQueue(); break; } }; @@ -1684,6 +1692,13 @@ void THive::UpdateCounterNodesConnected(i64 nodesConnectedDiff) { } } +void THive::UpdateCounterPingQueueSize() { + if (TabletCounters != nullptr) { + auto& counter = TabletCounters->Simple()[NHive::COUNTER_PINGQUEUE_SIZE]; + counter.Set(NodePingQueue.size()); + } +} + void THive::RecordTabletMove(const TTabletMoveInfo& moveInfo) { TabletMoveHistory.PushBack(moveInfo); TabletCounters->Cumulative()[NHive::COUNTER_TABLETS_MOVED].Increment(1); @@ -2648,6 +2663,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui Execute(CreateStartTablet(tabletId, local, cookie, external)); } +void THive::QueuePing(const TActorId& local) { + NodePingQueue.push(local); +} + +void THive::ProcessNodePingQueue() { + while (!NodePingQueue.empty() && NodePingsInProgress.size() < GetMaxPingsInFlight()) { + TActorId local = NodePingQueue.front(); + TNodeId node = local.NodeId(); + NodePingQueue.pop(); + NodePingsInProgress.insert(node); + SendPing(local, node); + } +} + +void THive::RemoveFromPingInProgress(TNodeId node) { + NodePingsInProgress.erase(node); + ProcessNodePingQueue(); +} + void THive::SendPing(const TActorId& local, TNodeId id) { Send(local, new TEvLocal::TEvPing(HiveId, diff --git a/ydb/core/mind/hive/hive_impl.h b/ydb/core/mind/hive/hive_impl.h index 8a9cd3f2969a..d0bf197dff91 100644 --- a/ydb/core/mind/hive/hive_impl.h +++ b/ydb/core/mind/hive/hive_impl.h @@ -409,6 +409,8 @@ class THive : public TActor, public TTabletExecutedFlat, public THiveShar TOwnershipKeeper Keeper; TEventPriorityQueue EventQueue{*this}; std::vector ActorsWaitingToMoveTablets; + std::queue NodePingQueue; + std::unordered_set NodePingsInProgress; struct TPendingCreateTablet { NKikimrHive::TEvCreateTablet CreateTablet; @@ -642,6 +644,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId void UpdateCounterBootQueueSize(ui64 bootQueueSize); void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff); void UpdateCounterNodesConnected(i64 nodesConnectedDiff); + void UpdateCounterPingQueueSize(); void RecordTabletMove(const TTabletMoveInfo& info); bool DomainHasNodes(const TSubDomainKey &domainKey) const; void ProcessBootQueue(); @@ -670,7 +673,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId void UpdateRegisteredDataCenters(); void AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId); void RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId); + void QueuePing(const TActorId& local); void SendPing(const TActorId& local, TNodeId id); + void RemoveFromPingInProgress(TNodeId node); + void ProcessNodePingQueue(); void SendReconnect(const TActorId& local); static THolder BuildGroupParametersForChannel(const TLeaderTabletInfo& tablet, ui32 channelId); void KickTablet(const TTabletInfo& tablet); @@ -923,6 +929,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId return CurrentConfig.GetStorageBalancerInflight(); } + ui64 GetMaxPingsInFlight() const { + return CurrentConfig.GetMaxPingsInFlight(); + } + static void ActualizeRestartStatistics(google::protobuf::RepeatedField& restartTimestamps, ui64 barrier); static ui64 GetRestartsPerPeriod(const google::protobuf::RepeatedField& restartTimestamps, ui64 barrier); static bool IsSystemTablet(TTabletTypes::EType type); diff --git a/ydb/core/mind/hive/node_info.cpp b/ydb/core/mind/hive/node_info.cpp index bc06531bfcfd..120dc1207a38 100644 --- a/ydb/core/mind/hive/node_info.cpp +++ b/ydb/core/mind/hive/node_info.cpp @@ -337,7 +337,7 @@ void TNodeInfo::DeregisterInDomains() { void TNodeInfo::Ping() { Y_ABORT_UNLESS((bool)Local); BLOG_D("Node(" << Id << ") Ping(" << Local << ")"); - Hive.SendPing(Local, Id); + Hive.QueuePing(Local); } void TNodeInfo::SendReconnect(const TActorId& local) { diff --git a/ydb/core/mind/hive/tx__register_node.cpp b/ydb/core/mind/hive/tx__register_node.cpp index f471a13ba5d0..97a3e8f66d9f 100644 --- a/ydb/core/mind/hive/tx__register_node.cpp +++ b/ydb/core/mind/hive/tx__register_node.cpp @@ -77,7 +77,9 @@ class TTxRegisterNode : public TTransactionBase { BLOG_D("THive::TTxRegisterNode(" << Local.NodeId() << ")::Complete"); TNodeInfo* node = Self->FindNode(Local.NodeId()); if (node != nullptr && node->Local) { // we send ping on every RegisterNode because we want to re-sync tablets upon every reconnection + Self->NodePingsInProgress.erase(node->Id); node->Ping(); + Self->ProcessNodePingQueue(); } } }; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 6c2fe1a961ef..09d4e74e06d3 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1479,6 +1479,7 @@ message THiveConfig { optional double MinStorageScatterToBalance = 71 [default = 999]; // storage balancer trigger is disabled by default optional double MinGroupUsageToBalance = 72 [default = 0.1]; optional uint64 StorageBalancerInflight = 73 [default = 1]; + optional uint64 MaxPingsInFlight = 78 [default = 1000]; } message TColumnShardConfig { diff --git a/ydb/core/protos/counters_hive.proto b/ydb/core/protos/counters_hive.proto index 47dc89070b26..029b3d4a9a8b 100644 --- a/ydb/core/protos/counters_hive.proto +++ b/ydb/core/protos/counters_hive.proto @@ -29,6 +29,7 @@ enum ESimpleCounters { COUNTER_IMBALANCED_OBJECTS = 19 [(CounterOpts) = {Name: "ImbalancedObjects"}]; COUNTER_WORST_OBJECT_VARIANCE = 20 [(CounterOpts) = {Name: "WorstObjectVariance"}]; COUNTER_STORAGE_SCATTER = 21 [(CounterOpts) = {Name: "StorageScatter"}]; + COUNTER_PINGQUEUE_SIZE = 23 [(CounterOpts) = {Name: "PingQueueSize"}]; } enum ECumulativeCounters {