Skip to content

Commit

Permalink
control inflight pings in hive (ydb-platform#6916)
Browse files Browse the repository at this point in the history
  • Loading branch information
vporyadke committed Jul 30, 2024
1 parent 921d67c commit 33ba19d
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 2 deletions.
36 changes: 35 additions & 1 deletion ydb/core/mind/hive/hive_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) {
for (auto* node : unimportantNodes) {
node->Ping();
}
ProcessNodePingQueue();
TVector<TTabletId> tabletsToReleaseFromParent;
TSideEffects sideEffects;
sideEffects.Reset(SelfId());
Expand Down Expand Up @@ -685,11 +686,13 @@ void THive::Cleanup() {

// Handles a TEvLocal::TEvStatus event.
// Logs the sender's node id and the record, removes the sender's node from
// the set of in-flight pings (which also lets queued pings proceed), and then
// runs the status transaction for the sender.
void THive::Handle(TEvLocal::TEvStatus::TPtr& ev) {
    const auto& sender = ev->Sender;
    auto& record = ev->Get()->Record;
    const auto senderNodeId = sender.NodeId();
    BLOG_D("Handle TEvLocal::TEvStatus for Node " << senderNodeId << ": " << record.ShortDebugString());
    // Free the in-flight slot before scheduling the transaction.
    RemoveFromPingInProgress(senderNodeId);
    Execute(CreateStatus(sender, record));
}

// Handles a TEvLocal::TEvSyncTablets event.
// Removes the sender's node from the set of in-flight pings (which also lets
// queued pings proceed) and then runs the sync-tablets transaction.
void THive::Handle(TEvLocal::TEvSyncTablets::TPtr& ev) {
    BLOG_D("THive::Handle::TEvSyncTablets");
    const auto& sender = ev->Sender;
    auto& record = ev->Get()->Record;
    // Free the in-flight slot before scheduling the transaction.
    RemoveFromPingInProgress(sender.NodeId());
    Execute(CreateSyncTablets(sender, record));
}

Expand Down Expand Up @@ -742,7 +745,10 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
// Handles an interconnect disconnect notification for a node.
// Releases the node's in-flight ping slot (which also lets queued pings
// proceed), drops the node from ConnectedNodes and decrements the
// connected-nodes counter, then runs the disconnect transaction.
void THive::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
    // Read the node id before ownership of the event is released below.
    TNodeId nodeId = ev->Get()->NodeId;
    BLOG_W("Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId);
    RemoveFromPingInProgress(nodeId);
    // Erase exactly once and use the result: an unconditional erase ahead of
    // this check would make it always fail and the counter would never drop.
    if (ConnectedNodes.erase(nodeId)) {
        UpdateCounterNodesConnected(-1);
    }
    Execute(CreateDisconnectNode(THolder<TEvInterconnect::TEvNodeDisconnected>(ev->Release().Release())));
}

Expand Down Expand Up @@ -912,6 +918,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
case TEvLocal::EvPing: {
TNodeId nodeId = ev->Cookie;
TNodeInfo* node = FindNode(nodeId);
NodePingsInProgress.erase(nodeId);
if (node != nullptr && ev->Sender == node->Local) {
if (node->IsDisconnecting()) {
// ping continuously until we are fully disconnected from the node
Expand All @@ -920,6 +927,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
KillNode(node->Id, node->Local);
}
}
ProcessNodePingQueue();
break;
}
};
Expand Down Expand Up @@ -1684,6 +1692,13 @@ void THive::UpdateCounterNodesConnected(i64 nodesConnectedDiff) {
}
}

// Publishes the current length of NodePingQueue into the
// COUNTER_PINGQUEUE_SIZE simple counter. Does nothing when TabletCounters
// is not set.
void THive::UpdateCounterPingQueueSize() {
    if (TabletCounters == nullptr) {
        return;
    }
    TabletCounters->Simple()[NHive::COUNTER_PINGQUEUE_SIZE].Set(NodePingQueue.size());
}

void THive::RecordTabletMove(const TTabletMoveInfo& moveInfo) {
TabletMoveHistory.PushBack(moveInfo);
TabletCounters->Cumulative()[NHive::COUNTER_TABLETS_MOVED].Increment(1);
Expand Down Expand Up @@ -2648,6 +2663,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui
Execute(CreateStartTablet(tabletId, local, cookie, external));
}

// Appends the given Local actor id to the back of NodePingQueue.
// Nothing is sent here; the queue is drained by ProcessNodePingQueue().
void THive::QueuePing(const TActorId& local) {
    NodePingQueue.emplace(local);
}

void THive::ProcessNodePingQueue() {
while (!NodePingQueue.empty() && NodePingsInProgress.size() < GetMaxPingsInFlight()) {
TActorId local = NodePingQueue.front();
TNodeId node = local.NodeId();
NodePingQueue.pop();
NodePingsInProgress.insert(node);
SendPing(local, node);
}
}

// Marks the ping to `node` as no longer in flight and immediately tries to
// send further queued pings, since a slot may have been freed.
// Erasing a node id that is not in NodePingsInProgress is a no-op.
void THive::RemoveFromPingInProgress(TNodeId node) {
NodePingsInProgress.erase(node);
ProcessNodePingQueue();
}

void THive::SendPing(const TActorId& local, TNodeId id) {
Send(local,
new TEvLocal::TEvPing(HiveId,
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/mind/hive/hive_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
TOwnershipKeeper Keeper;
TEventPriorityQueue<THive> EventQueue{*this};
std::vector<TActorId> ActorsWaitingToMoveTablets;
std::queue<TActorId> NodePingQueue;
std::unordered_set<TNodeId> NodePingsInProgress;

struct TPendingCreateTablet {
NKikimrHive::TEvCreateTablet CreateTablet;
Expand Down Expand Up @@ -642,6 +644,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void UpdateCounterBootQueueSize(ui64 bootQueueSize);
void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff);
void UpdateCounterNodesConnected(i64 nodesConnectedDiff);
void UpdateCounterPingQueueSize();
void RecordTabletMove(const TTabletMoveInfo& info);
bool DomainHasNodes(const TSubDomainKey &domainKey) const;
void ProcessBootQueue();
Expand Down Expand Up @@ -670,7 +673,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void UpdateRegisteredDataCenters();
void AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
void RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
// Enqueues a ping for the given Local actor; nothing is sent until
// ProcessNodePingQueue() runs.
void QueuePing(const TActorId& local);
// Sends a TEvLocal::TEvPing to `local`; `id` is the node id associated with
// the ping (NOTE(review): presumably passed as the event cookie - confirm).
void SendPing(const TActorId& local, TNodeId id);
// Removes `node` from the in-flight ping set and then processes the ping queue.
void RemoveFromPingInProgress(TNodeId node);
// Sends queued pings while the in-flight count is below GetMaxPingsInFlight().
void ProcessNodePingQueue();
void SendReconnect(const TActorId& local);
static THolder<TGroupFilter> BuildGroupParametersForChannel(const TLeaderTabletInfo& tablet, ui32 channelId);
void KickTablet(const TTabletInfo& tablet);
Expand Down Expand Up @@ -923,6 +929,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
return CurrentConfig.GetStorageBalancerInflight();
}

// Returns the MaxPingsInFlight setting from the current Hive config.
// ProcessNodePingQueue() uses it as the strict upper bound on the number of
// nodes in NodePingsInProgress.
// NOTE(review): a configured value of 0 would stop that loop from ever
// sending a ping - confirm that 0 is rejected or never set.
ui64 GetMaxPingsInFlight() const {
return CurrentConfig.GetMaxPingsInFlight();
}

static void ActualizeRestartStatistics(google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
static ui64 GetRestartsPerPeriod(const google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
static bool IsSystemTablet(TTabletTypes::EType type);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/mind/hive/node_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ void TNodeInfo::DeregisterInDomains() {
// Requests a ping of this node's Local service.
// The ping is only queued on the Hive; it is sent when the Hive processes
// its ping queue, so it stays subject to the in-flight ping limit. Sending it
// directly here as well would ping the node twice and bypass that limit.
// Aborts if Local is not set.
void TNodeInfo::Ping() {
    Y_ABORT_UNLESS((bool)Local);
    BLOG_D("Node(" << Id << ") Ping(" << Local << ")");
    Hive.QueuePing(Local);
}

void TNodeInfo::SendReconnect(const TActorId& local) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/mind/hive/tx__register_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class TTxRegisterNode : public TTransactionBase<THive> {
BLOG_D("THive::TTxRegisterNode(" << Local.NodeId() << ")::Complete");
TNodeInfo* node = Self->FindNode(Local.NodeId());
if (node != nullptr && node->Local) { // we send ping on every RegisterNode because we want to re-sync tablets upon every reconnection
Self->NodePingsInProgress.erase(node->Id);
node->Ping();
Self->ProcessNodePingQueue();
}
}
};
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1479,6 +1479,7 @@ message THiveConfig {
optional double MinStorageScatterToBalance = 71 [default = 999]; // storage balancer trigger is disabled by default
optional double MinGroupUsageToBalance = 72 [default = 0.1];
optional uint64 StorageBalancerInflight = 73 [default = 1];
optional uint64 MaxPingsInFlight = 78 [default = 1000];
}

message TColumnShardConfig {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_hive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ enum ESimpleCounters {
COUNTER_IMBALANCED_OBJECTS = 19 [(CounterOpts) = {Name: "ImbalancedObjects"}];
COUNTER_WORST_OBJECT_VARIANCE = 20 [(CounterOpts) = {Name: "WorstObjectVariance"}];
COUNTER_STORAGE_SCATTER = 21 [(CounterOpts) = {Name: "StorageScatter"}];
COUNTER_PINGQUEUE_SIZE = 23 [(CounterOpts) = {Name: "PingQueueSize"}];
}

enum ECumulativeCounters {
Expand Down

0 comments on commit 33ba19d

Please sign in to comment.