diff --git a/src/net.h b/src/net.h index f53113287..de3b3ef4b 100644 --- a/src/net.h +++ b/src/net.h @@ -50,6 +50,13 @@ static const unsigned int MAX_INV_SZ = 50000; static const unsigned int MAX_LOCATOR_SZ = 101; /** The maximum number of new addresses to accumulate before announcing. */ static const unsigned int MAX_ADDR_TO_SEND = 1000; +/** The maximum rate of address records we're willing to process on average. Can be bypassed using + * the NetPermissionFlags::Addr permission. */ +static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1}; +/** The soft limit of the address processing token bucket (the regular MAX_ADDR_RATE_PER_SECOND + * based increments won't go above this, but the MAX_ADDR_TO_SEND increment following GETADDR + * is exempt from this limit. */ +static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND}; /** Maximum length of incoming protocol messages (no message over 4 MB is currently acceptable). */ static const unsigned int MAX_PROTOCOL_MESSAGE_LENGTH = 4 * 1000 * 1000; /** Maximum length of strSubVer in `version` message */ diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 1c23136a8..291f64484 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -181,6 +181,15 @@ namespace { std::deque> vRelayExpiration GUARDED_BY(cs_main); std::atomic nTimeBestReceived(0); // Used only to inform the wallet of when we last received a block + /** Number of addrsses that can be processed from this peer. Start at 1 to + * permit self-announcement. */ + double m_addr_token_bucket{1.0}; + /** When m_addr_token_bucket was last updated */ + std::chrono::microseconds m_addr_token_timestamp{GetTimeMicros()}; + /** Total number of addresses that were dropped due to rate limiting. */ + std::atomic m_addr_rate_limited{0}; + /** Total number of addresses that were processed (excludes rate limited ones). */ + std::atomic m_addr_processed{0}; struct IteratorComparator { @@ -297,6 +306,7 @@ struct CNodeState { //! Time of last new block announcement int64_t m_last_block_announcement; + CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; nMisbehavior = 0; @@ -692,6 +702,8 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { if (queue.pindex) stats.vHeightInFlight.push_back(queue.pindex->nHeight); } + stats.m_addr_processed = m_addr_processed.load(); + stats.m_addr_rate_limited = m_addr_rate_limited.load(); return true; } @@ -1853,6 +1865,9 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr pfrom->fGetAddr = true; } connman->MarkAddressGood(pfrom->addr); + // When requesting a getaddr, accept an additional MAX_ADDR_TO_SEND addresses in response + // (bypassing the MAX_ADDR_PROCESSING_TOKEN_BUCKET limit). + m_addr_token_bucket += MAX_ADDR_TO_SEND; } std::string remoteAddr; @@ -1940,9 +1955,9 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr vRecv >> vAddr; // Don't want addr from older versions unless seeding - if (connman->GetAddressCount() > 1000) + if (connman->GetAddressCount() > MAX_ADDR_TO_SEND) return true; - if (vAddr.size() > 1000) + if (vAddr.size() > MAX_ADDR_TO_SEND) { LOCK(cs_main); Misbehaving(pfrom->GetId(), 20, strprintf("message addr size() = %u", vAddr.size())); @@ -1951,24 +1966,42 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // Store the new addresses std::vector vAddrOk; - int64_t nNow = GetAdjustedTime(); - int64_t nSince = nNow - 10 * 60; - for (CAddress& addr : vAddr) - { + // Update/increment addr rate limiting bucket. + const auto current_time(GetTimeMicros(true)); + if (m_addr_token_bucket < MAX_ADDR_PROCESSING_TOKEN_BUCKET) { + // Don't increment bucket if it's already full + const std::chrono::microseconds time_diff{std::max(current_time - m_addr_token_timestamp.count(), int64_t(0))}; + const double increment = std::chrono::duration_cast< std::chrono::duration>(time_diff).count() * MAX_ADDR_RATE_PER_SECOND; + m_addr_token_bucket = std::min(m_addr_token_bucket + increment, MAX_ADDR_PROCESSING_TOKEN_BUCKET); + } + m_addr_token_timestamp = std::chrono::microseconds(current_time); + + uint64_t num_proc = 0; + uint64_t num_rate_limit = 0; + Shuffle(vAddr.begin(), vAddr.end(), FastRandomContext()); + for (CAddress& addr : vAddr) { if (interruptMsgProc) return true; + // Apply rate limiting to all peers + if (m_addr_token_bucket < 1.0) { + ++num_rate_limit; + continue; + } else { + m_addr_token_bucket -= 1.0; + } // We only bother storing full nodes, though this may include - // things which we would not make an outbound connection to, in - // part because we may make feeler connections to them. + // things which we would not make an outbound connection to, if (!MayHaveUsefulAddressDB(addr.nServices) && !HasAllDesirableServiceFlags(addr.nServices)) continue; - if (addr.nTime <= 100000000 || addr.nTime > nNow + 10 * 60) - addr.nTime = nNow - 5 * 24 * 60 * 60; + if (addr.nTime <= 100000000 || addr.nTime > std::chrono::seconds(current_time).count() + 10 * 60) + addr.nTime = current_time - 5 * 24 * 60 * 60; pfrom->AddAddressKnown(addr); + ++num_proc; bool fReachable = IsReachable(addr); - if (addr.nTime > nSince && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) + const auto relay_time = std::chrono::duration_cast(std::chrono::microseconds(current_time)) - std::chrono::seconds(10 * 60); + if (addr.nTime > relay_time.count() && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) { // Relay to a limited number of other nodes RelayAddress(addr, fReachable, connman); @@ -1977,6 +2010,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (fReachable) vAddrOk.push_back(addr); } + m_addr_processed += num_proc; + m_addr_rate_limited += num_rate_limit; + LogPrint(BCLog::NET, "Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n", + vAddr.size(), + num_proc, + num_rate_limit, + pfrom->GetId()); + connman->AddNewAddresses(vAddrOk, pfrom->addr, 2 * 60 * 60); if (vAddr.size() < 1000) pfrom->fGetAddr = false; @@ -3341,7 +3382,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) pto->addrKnown.insert(addr.GetKey()); vAddr.push_back(addr); // receiver rejects addr messages larger than 1000 - if (vAddr.size() >= 1000) + if (vAddr.size() >= MAX_ADDR_TO_SEND) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); vAddr.clear(); diff --git a/src/net_processing.h b/src/net_processing.h index 11d49f513..4c7876b98 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -76,6 +76,8 @@ struct CNodeStateStats { int nSyncHeight = -1; int nCommonHeight = -1; std::vector vHeightInFlight; + uint64_t m_addr_processed = 0; + uint64_t m_addr_rate_limited = 0; }; /** Get statistics from node state */ diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index 2017873a1..87cb54bad 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -170,6 +170,8 @@ static UniValue getpeerinfo(const JSONRPCRequest& request) heights.push_back(height); } obj.pushKV("inflight", heights); + obj.pushKV("addr_processed", statestats.m_addr_processed); + obj.pushKV("addr_rate_limited", statestats.m_addr_rate_limited); } obj.pushKV("whitelisted", stats.fWhitelisted); diff --git a/test/functional/p2p_addr_relay.py b/test/functional/p2p_addr_relay.py new file mode 100644 index 000000000..dba18327c --- /dev/null +++ b/test/functional/p2p_addr_relay.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python3 +# Copyright (c) 2020-2021 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test addr relay +""" + +import random +import time + +from test_framework.messages import ( + CAddress, + msg_addr, + msg_getaddr, + msg_verack, + msg_headers, + NODE_NETWORK +) +from test_framework.mininode import ( + P2PInterface, + mininode_lock, + +) +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import ( + assert_equal, + assert_greater_than, + assert_greater_than_or_equal, + wait_until +) +#from test_framework.test_node import assert_equal + +ONE_MINUTE = 60 +TEN_MINUTES = 10 * ONE_MINUTE +ONE_HOUR = 60 * ONE_MINUTE +TWO_HOURS = 2 * ONE_HOUR +ONE_DAY = 24 * ONE_HOUR + +class AddrReceiver(P2PInterface): + num_ipv4_received = 0 + test_addr_contents = False + _tokens = 1 + send_getaddr = True + + def __init__(self, time_to_connect, test_addr_contents=False, send_getaddr=True): + super().__init__(time_to_connect) + self.test_addr_contents = test_addr_contents + self.send_getaddr = send_getaddr + + def on_addr(self, message): + for addr in message.addrs: + self.num_ipv4_received += 1 + if self.test_addr_contents: + # relay_tests checks the content of the addr messages match + # expectations based on the message creation in setup_addr_msg + assert_equal(addr.nServices, 1) + if not 8333 <= addr.port < 8343: + raise AssertionError("Invalid addr.port of {} (8333-8342 expected)".format(addr.port)) + assert addr.ip.startswith('123.123.') + + def on_getaddr(self, message): + # When the node sends us a getaddr, it increments the addr relay tokens for the connection by 1000 + self._tokens += 1000 + + @property + def tokens(self): + with mininode_lock: + return self._tokens + + def increment_tokens(self, n): + # When we move mocktime forward, the node increments the addr relay tokens for its peers + with mininode_lock: + self._tokens += n + + def addr_received(self): + return self.num_ipv4_received != 0 + + def on_version(self, message): + #self.send_version() + self.send_message(msg_verack()) + if (self.send_getaddr): + self.send_message(msg_getaddr()) + + def getaddr_received(self): + return self.message_count['getaddr'] > 0 + + +class AddrTest(BitcoinTestFramework): + counter = 0 + mocktime = int(time.time()) + + def set_test_params(self): + self.num_nodes = 1 + self.extra_args = [["-whitelist=127.0.0.1"]] + + def run_test(self): + self.oversized_addr_test() + self.relay_tests() + self.destination_rotates_once_in_24_hours_test() + self.blocksonly_mode_tests() + self.rate_limit_tests() + + def setup_addr_msg(self, num, sequential_ips=True): + addrs = [] + for i in range(num): + addr = CAddress() + addr.time = self.mocktime + random.randrange(-100, 100) + addr.nServices = NODE_NETWORK + if sequential_ips: + assert self.counter < 256 ** 2 # Don't allow the returned ip addresses to wrap. + addr.ip = f"123.123.{self.counter // 256}.{self.counter % 256}" + self.counter += 1 + else: + addr.ip = f"{random.randrange(128,169)}.{random.randrange(1,255)}.{random.randrange(1,255)}.{random.randrange(1,255)}" + addr.port = 8333 + i + addrs.append(addr) + + msg = msg_addr() + msg.addrs = addrs + return msg + + def send_addr_msg(self, source, msg, receivers): + source.send_and_ping(msg) + # invoke m_next_addr_send timer: + # `addr` messages are sent on an exponential distribution with mean interval of 30s. + # Setting the mocktime 600s forward gives a probability of (1 - e^-(600/30)) that + # the event will occur (i.e. this fails once in ~500 million repeats). + self.mocktime += 10 * 60 + self.nodes[0].setmocktime(self.mocktime) + for peer in receivers: + peer.sync_with_ping() + + def oversized_addr_test(self): + self.log.info('Send an addr message that is too large') + addr_source = self.nodes[0].add_p2p_connection(P2PInterface(self.nodes[0].time_to_connect)) + + msg = self.setup_addr_msg(1010) + for i in range(0,11): + addr_source.send_message(msg) + self.nodes[0].assert_debug_log(['Warning: not banning local peer']) + + self.nodes[0].disconnect_p2ps() + + def relay_tests(self): + self.log.info('Test address relay') + self.log.info('Check that addr message content is relayed and added to addrman') + addr_source = self.nodes[0].add_p2p_connection(P2PInterface(self.nodes[0].time_to_connect)) + num_receivers = 7 + receivers = [] + for _ in range(num_receivers): + receivers.append(self.nodes[0].add_p2p_connection(AddrReceiver(self.nodes[0].time_to_connect, test_addr_contents=True))) + + # Keep this with length <= 10. Addresses from larger messages are not + # relayed. + num_ipv4_addrs = 10 + msg = self.setup_addr_msg(num_ipv4_addrs) + self.send_addr_msg(addr_source, msg, receivers) + + # Every IPv4 address must be relayed to one/ two peers, other than the + # originating node (addr_source). + peerinfo = self.nodes[0].getpeerinfo()[0] + addrs_processed = peerinfo['addr_processed'] + addrs_rate_limited = peerinfo['addr_rate_limited'] + self.log.debug(f"addrs_processed = {addrs_processed}, addrs_rate_limited = {addrs_rate_limited}") + + assert_equal(addrs_processed, 1) + assert_equal(7, sum(r.tokens for r in receivers)) + assert_equal(addrs_rate_limited, 9) + + self.nodes[0].disconnect_p2ps() + + def sum_addr_messages(self, msgs_dict): + return sum(bytes_received for (msg, bytes_received) in msgs_dict.items() if msg in ['addr', 'getaddr']) + + def blocksonly_mode_tests(self): + self.log.info('Test addr relay in -blocksonly mode') + self.restart_node(0, ["-blocksonly", "-whitelist=127.0.0.1"]) + self.mocktime = int(time.time()) + + self.log.info('Check that we send getaddr messages') + full_outbound_peer = self.nodes[0].add_p2p_connection(AddrReceiver(self.nodes[0].time_to_connect)) + full_outbound_peer.sync_with_ping() + assert not full_outbound_peer.getaddr_received() + + self.log.info('Check that we relay address messages') + addr_source = self.nodes[0].add_p2p_connection(P2PInterface(self.nodes[0].time_to_connect)) + msg = self.setup_addr_msg(2) + self.send_addr_msg(addr_source, msg, [full_outbound_peer]) + + self.nodes[0].disconnect_p2ps() + + def send_addrs_and_test_rate_limiting(self, peer, new_addrs, total_addrs): + """Send an addr message and check that the number of addresses processed and rate-limited is as expected""" + + peer.send_and_ping(self.setup_addr_msg(new_addrs, sequential_ips=False)) + + peerinfo = self.nodes[0].getpeerinfo()[0] + addrs_processed = peerinfo['addr_processed'] + addrs_rate_limited = peerinfo['addr_rate_limited'] + self.log.debug(f"addrs_processed = {addrs_processed}, addrs_rate_limited = {addrs_rate_limited}") + + assert_equal(addrs_processed, min(total_addrs, peer.tokens)) + assert_equal(addrs_rate_limited, max(0, total_addrs - peer.tokens)) + + def rate_limit_tests(self): + self.mocktime = int(time.time()) + self.restart_node(0, []) + self.nodes[0].setmocktime(self.mocktime) + + self.log.info(f'Test rate limiting of addr processing for peers') + peer = self.nodes[0].add_p2p_connection(AddrReceiver(self.nodes[0].time_to_connect)) + + # Send 600 addresses. + self.send_addrs_and_test_rate_limiting(peer, new_addrs=600, total_addrs=600) + + # Send 600 more addresses. + self.send_addrs_and_test_rate_limiting(peer, new_addrs=600, total_addrs=1200) + + # Send 10 more. As we reached the processing limit for all nodes, no more addresses should be procesesd. + self.send_addrs_and_test_rate_limiting(peer, new_addrs=10, total_addrs=1210) + + # Advance the time by 100 seconds, permitting the processing of 10 more addresses. + # Send 200 and verify that 10 are processed. + self.mocktime += 100 + self.nodes[0].setmocktime(self.mocktime) + peer.increment_tokens(10) + + self.send_addrs_and_test_rate_limiting(peer, new_addrs=200, total_addrs=1410) + + # Advance the time by 1000 seconds, permitting the processing of 100 more addresses. + # Send 200 and verify that 100 are processed. + self.mocktime += 1000 + self.nodes[0].setmocktime(self.mocktime) + peer.increment_tokens(100) + + self.send_addrs_and_test_rate_limiting(peer, new_addrs=200, total_addrs=1610) + + self.nodes[0].disconnect_p2ps() + + def get_nodes_that_received_addr(self, peer, receiver_peer, addr_receivers, + time_interval_1, time_interval_2): + + # Clean addr response related to the initial getaddr. There is no way to avoid initial + # getaddr because the peer won't self-announce then. + for addr_receiver in addr_receivers: + addr_receiver.num_ipv4_received = 0 + + for _ in range(10): + self.mocktime += time_interval_1 + self.msg.addrs[0].time = self.mocktime + TEN_MINUTES + self.nodes[0].setmocktime(self.mocktime) + self.nodes[0].assert_debug_log(['received: addr (31 bytes) peer=0']) + peer.send_and_ping(self.msg) + self.mocktime += time_interval_2 + self.nodes[0].setmocktime(self.mocktime) + receiver_peer.sync_with_ping() + return [node for node in addr_receivers if node.addr_received()] + + def destination_rotates_once_in_24_hours_test(self): + self.restart_node(0, []) + + self.log.info('Test within 24 hours an addr relay destination is rotated at most once') + self.mocktime = int(time.time()) + self.msg = self.setup_addr_msg(1) + self.addr_receivers = [] + peer = self.nodes[0].add_p2p_connection(P2PInterface(self.nodes[0].time_to_connect)) + receiver_peer = self.nodes[0].add_p2p_connection(AddrReceiver(self.nodes[0].time_to_connect)) + addr_receivers = [self.nodes[0].add_p2p_connection(AddrReceiver(self.nodes[0].time_to_connect)) for _ in range(20)] + nodes_received_addr = self.get_nodes_that_received_addr(peer, receiver_peer, addr_receivers, 0, TWO_HOURS) # 10 intervals of 2 hours + # Per RelayAddress, we would announce these addrs to 1 0r 2 destinations per day. + assert_greater_than_or_equal(2, len(nodes_received_addr)) + self.nodes[0].disconnect_p2ps() + + +if __name__ == '__main__': + AddrTest().main() diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py index df8e62655..d2d3f1a89 100755 --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -42,6 +42,10 @@ BIP125_SEQUENCE_NUMBER = 0xfffffffd # Sequence number that is BIP 125 opt-in and BIP 68-opt-out +MAX_PROTOCOL_MESSAGE_LENGTH = 4000000 # Maximum length of incoming protocol messages +MAX_HEADERS_RESULTS = 2000 # Number of headers sent in one getheaders result +MAX_INV_SIZE = 50000 # Maximum number of entries in an 'inv' protocol message + NODE_NETWORK = (1 << 0) # NODE_GETUTXO = (1 << 1) NODE_BLOOM = (1 << 2) diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py index 18222b725..90ce6d916 100755 --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -23,7 +23,7 @@ import threading from test_framework.messages import CBlockHeader, MIN_VERSION_SUPPORTED, msg_addr, msg_block, MSG_BLOCK, msg_blocktxn, msg_cmpctblock, msg_feefilter, msg_getaddr, msg_getblocks, msg_getblocktxn, msg_getdata, msg_getheaders, msg_headers, msg_inv, msg_mempool, msg_ping, msg_pong, msg_reject, msg_sendcmpct, msg_sendheaders, msg_tx, MSG_TX, MSG_TYPE_MASK, msg_verack, msg_version, NODE_NETWORK, NODE_WITNESS, sha256, ToHex, msg_notfound -from test_framework.util import wait_until, NetworkDirName, MagicBytes +from test_framework.util import wait_until, NetworkDirName, MagicBytes, MAX_NODES, PORT_MIN, p2p_port logger = logging.getLogger("TestFramework.mininode") @@ -69,11 +69,21 @@ def __init__(self): # The underlying transport of the connection. # Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe self._transport = None + self.reconnect = False # set if reconnection needs to happen @property def is_connected(self): return self._transport is not None + def peer_connect_helper(self, dstaddr, dstport, net): + assert not self.is_connected + self.dstaddr = dstaddr + self.dstport = dstport + # The initial message to send after the connection was made: + self.on_connection_send_msg = None + self.recvbuf = b"" + self.magic_bytes = MagicBytes(net) + def peer_connect(self, dstaddr, dstport, net): assert not self.is_connected self.dstaddr = dstaddr @@ -89,6 +99,12 @@ def peer_connect(self, dstaddr, dstport, net): conn_gen = lambda: loop.call_soon_threadsafe(loop.create_task, conn_gen_unsafe) return conn_gen + def peer_accept_connection(self, connect_id, connect_cb=lambda: None, *, net): + self.peer_connect_helper('0', 0, net) + + logger.debug('Listening for Tapyrus Node with id: {}'.format(connect_id)) + return lambda: NetworkThread.listen(self, connect_cb, idx=connect_id) + def peer_disconnect(self): # Connection could have already been closed by other end. NetworkThread.network_event_loop.call_soon_threadsafe(lambda: self._transport and self._transport.abort()) @@ -173,7 +189,7 @@ def send_message(self, message): if not self.is_connected: raise IOError('Not connected') self._log_message("send", message) - tmsg = self._build_message(message) + tmsg = self.build_message(message) def maybe_write(): if not self._transport: @@ -186,9 +202,22 @@ def maybe_write(): self._transport.write(tmsg) NetworkThread.network_event_loop.call_soon_threadsafe(maybe_write) + + def send_raw_message(self, raw_message_bytes): + if not self.is_connected: + raise IOError('Not connected') + + def maybe_write(): + if not self._transport: + return + if self._transport.is_closing(): + return + self._transport.write(raw_message_bytes) + NetworkThread.network_event_loop.call_soon_threadsafe(maybe_write) + # Class utility methods - def _build_message(self, message): + def build_message(self, message): """Build a serialized P2P message""" command = message.command data = message.serialize() @@ -320,11 +349,19 @@ def on_version(self, message): self.nServices = message.nServices # Connection helper methods + def wait_for_connect(self, timeout=60): + test_function = lambda: self.is_connected + self.wait_until(test_function, timeout=timeout, check_connected=False) def wait_for_disconnect(self, timeout=60): test_function = lambda: not self.is_connected wait_until(test_function, timeout=timeout, lock=mininode_lock) + def wait_for_reconnect(self, timeout=60): + def test_function(): + return self.is_connected and self.last_message.get('version') + self.wait_until(test_function, timeout=timeout, check_connected=False) + # Message receiving helper methods def wait_for_block(self, blockhash, timeout=60): @@ -380,6 +417,10 @@ def wait_for_verack(self, timeout=60): wait_until(test_function, timeout=timeout, lock=mininode_lock) # Message sending helper functions + def send_version(self): + if self.on_connection_send_msg: + self.send_message(self.on_connection_send_msg) + self.on_connection_send_msg = None # Never used again def send_and_ping(self, message): self.send_message(message) @@ -409,6 +450,9 @@ def __init__(self): # There is only one event loop and no more than one thread must be created assert not self.network_event_loop + NetworkThread.listeners = {} + NetworkThread.protos = {} + NetworkThread.network_event_loop = asyncio.new_event_loop() def run(self): @@ -421,6 +465,55 @@ def close(self, timeout=10): wait_until(lambda: not self.network_event_loop.is_running(), timeout=timeout) self.network_event_loop.close() self.join(timeout) + NetworkThread.network_event_loop = None + + @classmethod + def listen(cls, p2p, callback, port=None, addr=None, idx=1): + """ Ensure a listening server is running on the given port, and run the + protocol specified by `p2p` on the next connection to it. Once ready + for connections, call `callback`.""" + + if port is None: + assert 0 < idx <= MAX_NODES + port = p2p_port(MAX_NODES - idx) + if addr is None: + addr = '127.0.0.1' + + def exception_handler(loop, context): + if not p2p.reconnect: + loop.default_exception_handler(context) + + cls.network_event_loop.set_exception_handler(exception_handler) + coroutine = cls.create_listen_server(addr, port, callback, p2p) + cls.network_event_loop.call_soon_threadsafe(cls.network_event_loop.create_task, coroutine) + + @classmethod + async def create_listen_server(cls, addr, port, callback, proto): + def peer_protocol(): + """Returns a function that does the protocol handling for a new + connection. To allow different connections to have different + behaviors, the protocol function is first put in the cls.protos + dict. When the connection is made, the function removes the + protocol function from that dict, and returns it so the event loop + can start executing it.""" + response = cls.protos.get((addr, port)) + # remove protocol function from dict only when reconnection doesn't need to happen/already happened + if not proto.reconnect: + cls.protos[(addr, port)] = None + return response + + if (addr, port) not in cls.listeners: + # When creating a listener on a given (addr, port) we only need to + # do it once. If we want different behaviors for different + # connections, we can accomplish this by providing different + # `proto` functions + + listener = await cls.network_event_loop.create_server(peer_protocol, addr, port) + logger.debug("Listening server on %s:%d should be started" % (addr, port)) + cls.listeners[(addr, port)] = listener + + cls.protos[(addr, port)] = proto + callback(addr, port) class P2PDataStore(P2PInterface): diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index fb6029a3e..c7fdf554c 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -22,6 +22,7 @@ from .authproxy import JSONRPCException from .util import ( append_config, + assert_equal, delete_cookie_file, get_rpc_proxy, rpc_url, @@ -312,6 +313,36 @@ def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, **kwargs): return p2p_conn + def assert_debug_log(self, expected_msgs, unexpected_msgs=None, timeout=2): + if unexpected_msgs is None: + unexpected_msgs = [] + assert_equal(type(expected_msgs), list) + assert_equal(type(unexpected_msgs), list) + + time_end = time.time() + timeout * self.timeout_factor + prev_size = self.debug_log_size(encoding="utf-8") # Must use same encoding that is used to read() below + + yield + + while True: + found = True + with open(self.debug_log_path, encoding="utf-8", errors="replace") as dl: + dl.seek(prev_size) + log = dl.read() + print_log = " - " + "\n - ".join(log.splitlines()) + for unexpected_msg in unexpected_msgs: + if re.search(re.escape(unexpected_msg), log, flags=re.MULTILINE): + self._raise_assertion_error('Unexpected message "{}" partially matches log:\n\n{}\n\n'.format(unexpected_msg, print_log)) + for expected_msg in expected_msgs: + if re.search(re.escape(expected_msg), log, flags=re.MULTILINE) is None: + found = False + if found: + return + if time.time() >= time_end: + break + time.sleep(0.05) + self._raise_assertion_error('Expected messages "{}" does not partially match log:\n\n{}\n\n'.format(str(expected_msgs), print_log)) + @property def p2p(self): """Return the first p2p connection diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 7a55db6fd..034e21d25 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -76,6 +76,7 @@ 'wallet_listtransactions.py', # vv Tests less than 60s vv 'p2p_sendheaders.py', + 'p2p_addr_relay.py', 'wallet_zapwallettxes.py', 'wallet_importmulti.py', 'rpc_txoutproof.py',