From 4d497f44efdb4234c48110561d0f3aedeeeb7446 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:47:27 +0000 Subject: [PATCH] partial bitcoin#27981: Fix potential network stalling bug To allow for the removal of a node from `vReceivableNodes`, the collections of node pointers have been made into `std::set`s. Marking as partial as it should be revisited when bitcoin#24356 is backported. --- src/net.cpp | 59 +++++++++++++++++++++++++++++++---------------------- src/net.h | 5 ++++- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 97511ad015dffe..79afe44b450253 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -940,7 +940,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr}; } -size_t CConnman::SocketSendData(CNode& node) +std::pair CConnman::SocketSendData(CNode& node) const { auto it = node.vSendMsg.begin(); size_t nSentSize = 0; @@ -998,7 +998,7 @@ size_t CConnman::SocketSendData(CNode& node) } node.vSendMsg.erase(node.vSendMsg.begin(), it); node.nSendMsgSize = node.vSendMsg.size(); - return nSentSize; + return {nSentSize, !node.vSendMsg.empty()}; } static bool ReverseCompareNodeMinPingTime(const NodeEvictionCandidate& a, const NodeEvictionCandidate& b) @@ -1715,10 +1715,10 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, recv_set.insert(hListenSocket.sock->Get()); } - for (CNode* pnode : nodes) - { + for (CNode* pnode : nodes) { bool select_recv = !pnode->fHasRecvData; bool select_send = !pnode->fCanSendData; + if (!select_recv && !select_send) continue; LOCK(pnode->m_sock_mutex); if (!pnode->m_sock) { @@ -2031,9 +2031,9 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, if (interruptNet) return; - std::vector vErrorNodes; - std::vector vReceivableNodes; - std::vector vSendableNodes; + std::set vErrorNodes; + std::set
vReceivableNodes; + std::set vSendableNodes; { LOCK(cs_mapSocketToNode); for (auto hSocket : error_set) { @@ -2042,7 +2042,7 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, continue; } it->second->AddRef(); - vErrorNodes.emplace_back(it->second); + vErrorNodes.emplace(it->second); } for (auto hSocket : recv_set) { if (error_set.count(hSocket)) { @@ -2077,7 +2077,6 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, { LOCK(cs_sendable_receivable_nodes); - vReceivableNodes.reserve(mapReceivableNodes.size()); for (auto it = mapReceivableNodes.begin(); it != mapReceivableNodes.end(); ) { if (!it->second->fHasRecvData) { it = mapReceivableNodes.erase(it); @@ -2092,7 +2091,7 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // receiving data (which should succeed as the socket signalled as receivable). if (!it->second->fPauseRecv && it->second->nSendMsgSize == 0 && !it->second->fDisconnect) { it->second->AddRef(); - vReceivableNodes.emplace_back(it->second); + vReceivableNodes.emplace(it->second); } ++it; } @@ -2103,7 +2102,6 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // also clean up mapNodesWithDataToSend from nodes that had messages to send in the last iteration // but don't have any in this iteration LOCK(cs_mapNodesWithDataToSend); - vSendableNodes.reserve(mapNodesWithDataToSend.size()); for (auto it = mapNodesWithDataToSend.begin(); it != mapNodesWithDataToSend.end(); ) { if (it->second->nSendMsgSize == 0) { // See comment in PushMessage @@ -2112,13 +2110,36 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, } else { if (it->second->fCanSendData) { it->second->AddRef(); - vSendableNodes.emplace_back(it->second); + vSendableNodes.emplace(it->second); } ++it; } } } + for (CNode* pnode : vSendableNodes) { + if (interruptNet) { + break; + } + + // Send data + auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode)); + if (bytes_sent) { + 
RecordBytesSent(bytes_sent); + + // If both receiving and (non-optimistic) sending were possible, we first attempt + // sending. If that succeeds, but does not fully drain the send queue, do not + // attempt to receive. This avoids needlessly queueing data if the remote peer + // is slow at receiving data, by means of TCP flow control. We only do this when + // sending actually succeeded to make sure progress is always made; otherwise a + // deadlock would be possible when both sides have data to send, but neither is + // receiving. + if (data_left) { + vReceivableNodes.erase(pnode); + } + } + } + for (CNode* pnode : vErrorNodes) { if (interruptNet) { @@ -2140,16 +2161,6 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, SocketRecvData(pnode); } - for (CNode* pnode : vSendableNodes) { - if (interruptNet) { - break; - } - - // Send data - size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode)); - if (bytes_sent) RecordBytesSent(bytes_sent); - } - for (auto& node : vErrorNodes) { node->Release(); } @@ -4187,7 +4198,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { LOCK(pnode->cs_vSend); - bool hasPendingData = !pnode->vSendMsg.empty(); + bool optimisticSend(pnode->vSendMsg.empty()); //log total amount of bytes per message type pnode->mapSendBytesPerMsgType[msg.m_type] += nTotalSize; @@ -4210,7 +4221,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) } // wake up select() call in case there was no pending data before (so it was not selecting this socket for sending) - if (!hasPendingData && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load())) + if (optimisticSend && (m_wakeup_pipe && m_wakeup_pipe->m_need_wakeup.load())) m_wakeup_pipe->Write(); } } diff --git a/src/net.h b/src/net.h index 4e1ea87b7fe9bd..70dd0d5fab7210 100644 --- a/src/net.h +++ b/src/net.h @@ -1387,8 +1387,11 @@ friend class CNode; NodeId GetNewNodeId(); - size_t SocketSendData(CNode& node) 
EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); + /** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */ + std::pair SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend); + size_t SocketRecvData(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); + void DumpAddresses(); // Network stats