From 548692ee5f73dac3e10b696b099a61a7d5efb183 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 18 May 2023 13:39:17 -0500 Subject: [PATCH 01/16] GH-1072 Use better heuristic for determining who to sync from --- plugins/net_plugin/net_plugin.cpp | 54 ++++++++++++++++++------------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 6edabb35b5..ef1df0212d 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -386,7 +386,7 @@ namespace eosio { vector connection_statuses() const; // return the next connection after current in collection that has blocks above - connection_ptr round_robin_next(const connection_ptr& current, uint32_t sync_known_lib_num) const; + connection_ptr round_robin_next(const connection_ptr& current, uint32_t sync_next_expected_num) const; template void for_each_connection(Function&& f) const; @@ -869,6 +869,7 @@ namespace eosio { bool connected() const; bool current() const; + bool should_sync_from(uint32_t sync_next_expected_num) const; /// @param reconnect true if we should try and reconnect immediately after close /// @param shutdown true only if plugin is shutting down @@ -1179,6 +1180,18 @@ namespace eosio { return (connected() && !syncing); } + bool connection::should_sync_from(uint32_t sync_next_expected_num) const { + if (!is_transactions_only_connection() && current()) { + if (no_retry == go_away_reason::no_reason) { + std::lock_guard g(conn_mtx); + if (last_handshake_recv.head_num >= sync_next_expected_num) { + return true; + } + } + } + return false; + } + void connection::flush_queues() { buffer_queue.clear_write_queue(); } @@ -1799,11 +1812,11 @@ namespace eosio { if (conn && conn->current() ) { new_sync_source = conn; } else { - new_sync_source = my_impl->connections.round_robin_next(new_sync_source, sync_known_lib_num); + new_sync_source = my_impl->connections.round_robin_next(new_sync_source, 
sync_next_expected_num); } // verify there is an available source - if( !new_sync_source || !new_sync_source->current() || new_sync_source->is_transactions_only_connection() ) { + if( !new_sync_source ) { fc_elog( logger, "Unable to continue syncing at this time"); if( !new_sync_source ) sync_source.reset(); sync_known_lib_num = chain_info.lib_num; @@ -4030,7 +4043,7 @@ namespace eosio { connections.clear(); } - connection_ptr connections_manager::round_robin_next( const connection_ptr& current, uint32_t sync_known_lib_num ) const { + connection_ptr connections_manager::round_robin_next( const connection_ptr& current, uint32_t sync_next_expected_num ) const { connection_ptr new_sync_source = current; std::shared_lock g( connections_mtx ); if( connections.empty() ) { @@ -4064,20 +4077,15 @@ namespace eosio { if( cptr != connections.end() ) { auto cstart_it = cptr; do { - // select the first one which is current and has lib above current and break out. - if( !(*cptr)->is_transactions_only_connection() && (*cptr)->current() ) { - std::lock_guard g_conn( (*cptr)->conn_mtx ); - // TODO: change to a better heuristic than lib >= sync_known_lib_num - if( (*cptr)->last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num ) { + // select the first one which we should be able to sync from + if ((*cptr)->should_sync_from(sync_next_expected_num)) { new_sync_source = *cptr; break; } - } - if( ++cptr == connections.end() ) - cptr = connections.begin(); + if( ++cptr == connections.end() ) + cptr = connections.begin(); } while( cptr != cstart_it ); } - // no need to check the result, either source advanced or the whole list was checked and the old source is reused. 
} return new_sync_source; @@ -4180,18 +4188,20 @@ namespace eosio { ++num_peers; } - if( !(*it)->socket_is_open() && !(*it)->connecting) { - if( !(*it)->incoming() ) { - if( !(*it)->resolve_and_connect() ) { + if (!(*it)->socket_is_open() && !(*it)->connecting) { + if (!(*it)->incoming()) { + if (!(*it)->resolve_and_connect()) { + it = connections.erase(it); + --num_peers; + ++num_rm; + continue; + } + } else { + --num_clients; + ++num_rm; it = connections.erase(it); - --num_peers; ++num_rm; continue; } - } else { - --num_clients; ++num_rm; - it = connections.erase(it); - continue; - } } ++it; } From 3f59cc85afa37422a2376bb9fb8453397713ed50 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 23 May 2023 09:23:45 -0500 Subject: [PATCH 02/16] GH-1072 Change default sync_fetch_span from 100 to 1000 --- plugins/net_plugin/net_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index ef1df0212d..57b7e7249f 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -308,7 +308,7 @@ namespace eosio { constexpr auto def_conn_retry_wait = 30; constexpr auto def_txn_expire_wait = std::chrono::seconds(3); constexpr auto def_resp_expected_wait = std::chrono::seconds(5); - constexpr auto def_sync_fetch_span = 100; + constexpr auto def_sync_fetch_span = 1000; constexpr auto def_keepalive_interval = 10000; constexpr auto message_header_size = sizeof(uint32_t); From 5027edaa58e6831c1ee58568267a1265880b5700 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 23 May 2023 14:20:31 -0500 Subject: [PATCH 03/16] Add sync-peer-limit option --- plugins/net_plugin/net_plugin.cpp | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 57b7e7249f..f5ecffc036 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -213,9 
+213,11 @@ namespace eosio { uint32_t sync_known_lib_num{0}; uint32_t sync_last_requested_num{0}; uint32_t sync_next_expected_num{0}; - uint32_t sync_req_span{0}; connection_ptr sync_source; + const uint32_t sync_req_span{0}; + const uint32_t sync_peer_limit{0}; + alignas(hardware_destructive_interference_size) std::atomic sync_state{in_sync}; @@ -228,7 +230,7 @@ namespace eosio { bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); public: - explicit sync_manager( uint32_t span ); + explicit sync_manager( uint32_t span, uint32_t sync_peer_limit ); static void send_handshakes(); bool syncing_with_peer() const { return sync_state == lib_catchup; } bool is_in_sync() const { return sync_state == in_sync; } @@ -1725,12 +1727,13 @@ namespace eosio { } //----------------------------------------------------------- - sync_manager::sync_manager( uint32_t req_span ) + sync_manager::sync_manager( uint32_t span, uint32_t sync_peer_limit ) :sync_known_lib_num( 0 ) ,sync_last_requested_num( 0 ) ,sync_next_expected_num( 1 ) - ,sync_req_span( req_span ) ,sync_source() + ,sync_req_span( span ) + ,sync_peer_limit( sync_peer_limit ) ,sync_state(in_sync) { } @@ -3706,7 +3709,10 @@ namespace eosio { ( "p2p-dedup-cache-expire-time-sec", bpo::value()->default_value(10), "Maximum time to track transaction for duplicate optimization") ( "net-threads", bpo::value()->default_value(my->thread_pool_size), "Number of worker threads in net_plugin thread pool" ) - ( "sync-fetch-span", bpo::value()->default_value(def_sync_fetch_span), "number of blocks to retrieve in a chunk from any individual peer during synchronization") + ( "sync-fetch-span", bpo::value()->default_value(def_sync_fetch_span), + "Number of blocks to retrieve in a chunk from any individual peer during synchronization") + ( "sync-peer-limit", bpo::value()->default_value(3), + "Number of peers to sync from") ( "use-socket-read-watermark", bpo::value()->default_value(false), "Enable experimental 
socket read watermark optimization") ( "peer-log-format", bpo::value()->default_value( "[\"${_name}\" - ${_cid} ${_ip}:${_port}] " ), "The string used to format peers when logging messages about them. Variables are escaped with ${}.\n" @@ -3736,7 +3742,9 @@ namespace eosio { peer_log_format = options.at( "peer-log-format" ).as(); - my->sync_master = std::make_unique( options.at( "sync-fetch-span" ).as()); + my->sync_master = std::make_unique( + options.at( "sync-fetch-span" ).as(), + options.at( "sync-peer-limit" ).as() ); my->txn_exp_period = def_txn_expire_wait; my->p2p_dedup_cache_expire_time_us = fc::seconds( options.at( "p2p-dedup-cache-expire-time-sec" ).as() ); From 2ebb2be892d944b23bf0f84fd89228e95a9cf939 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 23 May 2023 18:52:53 -0500 Subject: [PATCH 04/16] GH-1072 Add tracking of latency --- plugins/net_plugin/net_plugin.cpp | 69 ++++++++++++++++++++----------- 1 file changed, 45 insertions(+), 24 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 44b1b27b56..5b74eeb03d 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -62,6 +62,9 @@ namespace eosio { using connection_ptr = std::shared_ptr; using connection_wptr = std::weak_ptr; + static constexpr int64_t block_interval_ns = + std::chrono::duration_cast(std::chrono::milliseconds(config::block_interval_ms)).count(); + const std::string logger_name("net_plugin_impl"); fc::logger logger; std::string peer_log_format; @@ -206,9 +209,6 @@ namespace eosio { in_sync }; - static constexpr int64_t block_interval_ns = - std::chrono::duration_cast(std::chrono::milliseconds(config::block_interval_ms)).count(); - alignas(hardware_destructive_interference_size) std::mutex sync_mtx; uint32_t sync_known_lib_num{0}; @@ -240,7 +240,7 @@ namespace eosio { void rejected_block( const connection_ptr& c, uint32_t blk_num ); void sync_recv_block( const connection_ptr& c, const 
block_id_type& blk_id, uint32_t blk_num, bool blk_applied ); void sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied ); - void recv_handshake( const connection_ptr& c, const handshake_message& msg ); + void recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency ); void sync_recv_notice( const connection_ptr& c, const notice_message& msg ); inline void reset_last_requested_num() { std::lock_guard g(sync_mtx); @@ -752,13 +752,18 @@ namespace eosio { explicit connection( tcp::socket&& socket ); ~connection() = default; + connection( const connection& ) = delete; + connection( connection&& ) = delete; + connection& operator=( const connection& ) = delete; + connection& operator=( connection&& ) = delete; + bool start_session(); bool socket_is_open() const { return socket_open.load(); } // thread safe, atomic const string& peer_address() const { return peer_addr; } // thread safe, const void set_connection_type( const string& peer_addr ); - bool is_transactions_only_connection()const { return connection_type == transactions_only; } + bool is_transactions_only_connection()const { return connection_type == transactions_only; } // thread safe, atomic bool is_blocks_only_connection()const { return connection_type == blocks_only; } void set_heartbeat_timeout(std::chrono::milliseconds msec) { std::chrono::system_clock::duration dur = msec; @@ -768,7 +773,7 @@ namespace eosio { private: static const string unknown; - void update_endpoints(); + std::atomic net_latency_ns = std::numeric_limits::max(); std::optional peer_requested; // this peer is requesting info from us @@ -867,6 +872,7 @@ namespace eosio { bool process_next_block_message(uint32_t message_length); bool process_next_trx_message(uint32_t message_length); + void update_endpoints(); public: bool populate_handshake( handshake_message& hello ) const; @@ -921,7 +927,7 @@ namespace eosio { void enqueue_buffer( 
const std::shared_ptr>& send_buffer, go_away_reason close_after_send, bool to_sync_queue = false); - void cancel_sync(go_away_reason); + void cancel_sync(go_away_reason reason); void flush_queues(); bool enqueue_sync_block(); void request_sync_blocks(uint32_t start, uint32_t end); @@ -965,7 +971,10 @@ namespace eosio { void handle_message( const packed_transaction& msg ) = delete; // packed_transaction_ptr overload used instead void handle_message( packed_transaction_ptr trx ); - void process_signed_block( const block_id_type& id, signed_block_ptr msg, block_state_ptr bsp ); + // returns calculated number of blocks combined latency + uint32_t update_latency(const handshake_message& msg); + + void process_signed_block( const block_id_type& id, signed_block_ptr block, block_state_ptr bsp ); fc::variant_object get_logger_variant() const { fc::mutable_variant_object mvo; @@ -1189,14 +1198,17 @@ namespace eosio { } } + // thread safe, all atomics bool connection::connected() const { return socket_is_open() && !connecting && !closing; } + // thread safe, all atomics bool connection::current() const { return (connected() && !syncing); } + // thread safe bool connection::should_sync_from(uint32_t sync_next_expected_num) const { if (!is_transactions_only_connection() && current()) { if (no_retry == go_away_reason::no_reason) { @@ -1924,7 +1936,7 @@ namespace eosio { } // called from c's connection strand - void sync_manager::recv_handshake( const connection_ptr& c, const handshake_message& msg ) { + void sync_manager::recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency ) { if( c->is_transactions_only_connection() ) return; @@ -1932,20 +1944,6 @@ namespace eosio { sync_reset_lib_num(c, false); - auto current_time_ns = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds - if( network_latency_ns < 0 ) { 
- peer_wlog(c, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000)); - network_latency_ns = 0; - } - // number of blocks syncing node is behind from a peer node, round up - uint32_t nblk_behind_by_net_latency = std::lround( static_cast(network_latency_ns) / static_cast(block_interval_ns) ); - // 2x for time it takes for message to reach back to peer node - uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency; - // message in the log below is used in p2p_high_latency_test.py test - peer_dlog(c, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received", - ("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency)); - //-------------------------------- // sync need checks; (lib == last irreversible block) // @@ -2936,6 +2934,7 @@ namespace eosio { peer_dlog(this, "received chain_size_message"); } + // called from connection strand void connection::handle_message( const handshake_message& msg ) { peer_dlog( this, "received handshake_message" ); if( !is_valid( msg ) ) { @@ -3098,7 +3097,29 @@ namespace eosio { } } - my_impl->sync_master->recv_handshake( shared_from_this(), msg ); + uint32_t nblk_combined_latency = update_latency(msg); + my_impl->sync_master->recv_handshake( shared_from_this(), msg, nblk_combined_latency ); + } + + // called from connection strand + uint32_t connection::update_latency(const handshake_message& msg) { + auto current_time_ns = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds + if( network_latency_ns < 0 ) { + peer_wlog(this, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000)); + network_latency_ns = 0; + } + // number of blocks syncing node is behind from a peer node, round up + 
uint32_t nblk_behind_by_net_latency = std::lround( static_cast(network_latency_ns) / static_cast(block_interval_ns) ); + // 2x for time it takes for message to reach back to peer node + uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency; + // message in the log below is used in p2p_high_latency_test.py test + peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received", + ("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency)); + + net_latency_ns = network_latency_ns <= 0 ? std::numeric_limits::max() : network_latency_ns; + + return nblk_combined_latency; } void connection::handle_message( const go_away_message& msg ) { From 3ad3b7775c0432f690aff55a630377fc69f5bfbe Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 24 May 2023 18:21:50 -0500 Subject: [PATCH 05/16] GH-1072 Use latency and peer block range to determine who to sync from --- plugins/net_plugin/net_plugin.cpp | 142 +++++++++++++++++------------- 1 file changed, 81 insertions(+), 61 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 5b74eeb03d..3f6bca730b 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -221,12 +222,14 @@ namespace eosio { alignas(hardware_destructive_interference_size) std::atomic sync_state{in_sync}; + std::atomic sync_ordinal{0}; private: constexpr static auto stage_str( stages s ); bool set_state( stages newstate ); bool is_sync_required( uint32_t fork_head_block_num ); void request_next_chunk( std::unique_lock g_sync, const connection_ptr& conn = connection_ptr() ); + connection_ptr find_next_sync_node(); void start_sync( const connection_ptr& c, uint32_t target ); bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); @@ -371,9 
+374,6 @@ namespace eosio { std::optional status(const string& host) const; vector connection_statuses() const; - // return the next connection after current in collection that has blocks above - connection_ptr round_robin_next(const connection_ptr& current, uint32_t sync_next_expected_num) const; - template void for_each_connection(Function&& f) const; @@ -592,7 +592,8 @@ namespace eosio { constexpr uint16_t proto_heartbeat_interval = 4; // eosio 2.1: supports configurable heartbeat interval constexpr uint16_t proto_dup_goaway_resolution = 5; // eosio 2.1: support peer address based duplicate connection resolution constexpr uint16_t proto_dup_node_id_goaway = 6; // eosio 2.1: support peer node_id based duplicate connection resolution - constexpr uint16_t proto_leap_initial = 7; // leap client, needed because none of the 2.1 versions are supported + constexpr uint16_t proto_leap_initial = 7; // leap client, needed because none of the 2.1 versions are supported + constexpr uint16_t proto_block_range = 8; // include block range in notice_message #pragma GCC diagnostic pop constexpr uint16_t net_version_max = proto_leap_initial; @@ -770,6 +771,8 @@ namespace eosio { hb_timeout = dur.count(); } + uint64_t get_net_latency_ns() const { return net_latency_ns; } + private: static const string unknown; @@ -787,7 +790,9 @@ namespace eosio { blocks_only }; - std::atomic connection_type{both}; + std::atomic connection_type{both}; + std::atomic peer_start_block_num{0}; + std::atomic peer_head_block_num{0}; public: boost::asio::io_context::strand strand; @@ -808,6 +813,8 @@ namespace eosio { // kept in sync with last_handshake_recv.last_irreversible_block_num, only accessed from connection strand uint32_t peer_lib_num = 0; + std::atomic sync_ordinal{0}; + alignas(hardware_destructive_interference_size) std::atomic trx_in_progress_size{0}; @@ -862,7 +869,7 @@ namespace eosio { bool connected() const; bool current() const; - bool should_sync_from(uint32_t sync_next_expected_num) 
const; + bool should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const; /// @param reconnect true if we should try and reconnect immediately after close /// @param shutdown true only if plugin is shutting down @@ -1209,12 +1216,14 @@ namespace eosio { } // thread safe - bool connection::should_sync_from(uint32_t sync_next_expected_num) const { + bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const { if (!is_transactions_only_connection() && current()) { if (no_retry == go_away_reason::no_reason) { - std::lock_guard g(conn_mtx); - if (last_handshake_recv.head_num >= sync_next_expected_num) { - return true; + if (peer_start_block_num <= sync_next_expected_num) { + std::lock_guard g_conn( conn_mtx ); + if (last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num) { + return true; + } } } } @@ -1817,6 +1826,42 @@ namespace eosio { } } + connection_ptr sync_manager::find_next_sync_node() { + std::deque conns; + my_impl->connections.for_each_block_connection([&](const auto& c) { + if (c->should_sync_from(sync_next_expected_num, sync_known_lib_num)) { + conns.push_back(c); + } + }); + std::sort(conns.begin(), conns.end(), [](const connection_ptr& lhs, const connection_ptr& rhs) { + return lhs->get_net_latency_ns() < rhs->get_net_latency_ns(); + }); + + if (conns.empty()) { + return {}; + } + if (conns.size() == 1) { // only one available + ++sync_ordinal; + conns.front()->sync_ordinal = sync_ordinal.load(); + return conns.front(); + } + + ++sync_ordinal; + // example: sync_ordinal is 6 after inc above then there may be connections with 3,4,5. 
+ // Choose from the lowest sync_ordinal of the sync_peer_limit of lowest latency, note 0 means not synced from yet + size_t the_one = 0; + uint32_t lowest_ordinal = std::numeric_limits::max(); + for (size_t i = 0; i < conns.size() && i < sync_peer_limit && lowest_ordinal != 0; ++i) { + uint32_t sync_ord = conns[i]->sync_ordinal; + if (sync_ord < lowest_ordinal) { + the_one = i; + lowest_ordinal = sync_ordinal; + } + } + conns[the_one]->sync_ordinal = sync_ordinal.load(); + return conns[the_one]; + } + // call with g_sync locked, called from conn's connection strand void sync_manager::request_next_chunk( std::unique_lock g_sync, const connection_ptr& conn ) { auto chain_info = my_impl->get_chain_info(); @@ -1837,11 +1882,11 @@ namespace eosio { * otherwise select the next available from the list, round-robin style. */ - connection_ptr new_sync_source = sync_source; + connection_ptr new_sync_source; if (conn && conn->current() ) { new_sync_source = conn; } else { - new_sync_source = my_impl->connections.round_robin_next(new_sync_source, sync_next_expected_num); + new_sync_source = find_next_sync_node(); } // verify there is an available source @@ -1935,6 +1980,12 @@ namespace eosio { } } + inline block_id_type make_block_id( uint32_t block_num ) { + chain::block_id_type block_id; + block_id._hash[0] += fc::endian_reverse_u32(block_num); + return block_id; + } + // called from c's connection strand void sync_manager::recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency ) { @@ -1984,11 +2035,17 @@ namespace eosio { ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) ("h", chain_info.head_num)("l", chain_info.lib_num) ); if (msg.generation > 1 || c->protocol_version > proto_base) { + controller& cc = my_impl->chain_plug->chain(); notice_message note; note.known_trx.pending = chain_info.lib_num; note.known_trx.mode = last_irr_catch_up; note.known_blocks.mode = 
last_irr_catch_up; note.known_blocks.pending = chain_info.head_num; + note.known_blocks.ids.push_back(chain_info.head_id); + if (c->protocol_version >= proto_block_range) { + // begin, more efficient to encode a block num instead of retrieving actual block id + note.known_blocks.ids.push_back(make_block_id(cc.earliest_available_block_num())); + } c->enqueue( note ); } c->syncing = true; @@ -2007,11 +2064,16 @@ namespace eosio { ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) ("h", chain_info.head_num)("l", chain_info.lib_num) ); if (msg.generation > 1 || c->protocol_version > proto_base) { + controller& cc = my_impl->chain_plug->chain(); notice_message note; note.known_trx.mode = none; note.known_blocks.mode = catch_up; note.known_blocks.pending = chain_info.head_num; note.known_blocks.ids.push_back(chain_info.head_id); + if (c->protocol_version >= proto_block_range) { + // begin, more efficient to encode a block num instead of retrieving actual block id + note.known_blocks.ids.push_back(make_block_id(cc.earliest_available_block_num())); + } c->enqueue( note ); } c->syncing = false; @@ -3190,7 +3252,7 @@ namespace eosio { // peer_dlog( this, "received notice_message" ); connecting = false; - if( msg.known_blocks.ids.size() > 1 ) { + if( msg.known_blocks.ids.size() > 2 ) { peer_elog( this, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection", ("s", msg.known_blocks.ids.size()) ); close( false ); @@ -3231,6 +3293,12 @@ namespace eosio { } case last_irr_catch_up: case catch_up: { + if (msg.known_blocks.ids.size() > 1) { + peer_start_block_num = block_header::num_from_id(msg.known_blocks.ids[1]); + } + if (msg.known_blocks.ids.size() > 0) { + peer_head_block_num = block_header::num_from_id(msg.known_blocks.ids[0]); + } my_impl->sync_master->sync_recv_notice( shared_from_this(), msg ); break; } @@ -4082,54 +4150,6 @@ namespace eosio { connections.clear(); } - connection_ptr 
connections_manager::round_robin_next( const connection_ptr& current, uint32_t sync_next_expected_num ) const { - connection_ptr new_sync_source = current; - std::shared_lock g( connections_mtx ); - if( connections.empty() ) { - new_sync_source.reset(); - } else if( connections.size() == 1 ) { - if (!new_sync_source) { - new_sync_source = *connections.begin(); - } - } else { - // init to a linear array search - auto cptr = connections.begin(); - auto cend = connections.end(); - // do we remember the previous source? - if (current) { - //try to find it in the list - cptr = connections.find( current ); - cend = cptr; - if( cptr == connections.end() ) { - // not there - must have been closed! cend is now connections.end, so just flatten the ring. - new_sync_source.reset(); - cptr = connections.begin(); - } else { - // was found - advance the start to the next. cend is the old source. - if( ++cptr == connections.end() ) { - cptr = connections.begin(); - } - } - } - - // scan the list of peers looking for another able to provide sync blocks. - if( cptr != connections.end() ) { - auto cstart_it = cptr; - do { - // select the first one which we should be able to sync from - if ((*cptr)->should_sync_from(sync_next_expected_num)) { - new_sync_source = *cptr; - break; - } - if( ++cptr == connections.end() ) - cptr = connections.begin(); - } while( cptr != cstart_it ); - } - // no need to check the result, either source advanced or the whole list was checked and the old source is reused. 
- } - return new_sync_source; - } - std::optional connections_manager::status( const string& host )const { std::shared_lock g( connections_mtx ); auto con = find_connection_i( host ); From a632f751a6d6adc9d7d913b2c98d58c386de207a Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 24 May 2023 19:16:16 -0500 Subject: [PATCH 06/16] GH-1072 milliseconds not microseconds --- plugins/net_plugin/net_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 3f6bca730b..3c879708ab 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3835,7 +3835,7 @@ namespace eosio { "p2p-keepalive_interval-ms must be greater than 0" ); my->connections.init( std::chrono::milliseconds( options.at("p2p-keepalive-interval-ms").as() * 2 ), - fc::microseconds( options.at("max-cleanup-time-msec").as() ), + fc::milliseconds( options.at("max-cleanup-time-msec").as() ), std::chrono::seconds( options.at("connection-cleanup-period").as() ), options.at("max-clients").as() ); From 7b1e62cd9f2fb71b1b3a7d81ebb073757b1f3dac Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 May 2023 13:14:20 -0500 Subject: [PATCH 07/16] GH-1072 Use peer_head_block_num for should_sync_from instead of lib --- plugins/net_plugin/net_plugin.cpp | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 3c879708ab..e756cfedba 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -346,6 +346,7 @@ namespace eosio { void connection_monitor(const std::weak_ptr& from_connection); public: + size_t number_connections() const; void add_supplied_peers(const vector& peers ); // not thread safe, only call on startup @@ -1217,11 +1218,14 @@ namespace eosio { // thread safe bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t 
sync_known_lib_num) const { + fc_wlog(logger, "id: ${id} trx_only: ${t} current: ${c} socket_open: ${so} syncing: ${s} connecting: ${con} closing: ${close} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", + ("id", connection_id)("t", is_transactions_only_connection()) + ("c", current())("so", socket_is_open())("s", syncing.load())("con", connecting.load())("close", closing.load()) + ("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("lat", get_net_latency_ns()/1000)("g", reason_str(no_retry))); if (!is_transactions_only_connection() && current()) { if (no_retry == go_away_reason::no_reason) { - if (peer_start_block_num <= sync_next_expected_num) { - std::lock_guard g_conn( conn_mtx ); - if (last_handshake_recv.last_irreversible_block_num >= sync_known_lib_num) { + if (peer_start_block_num <= sync_next_expected_num) { // has blocks we want + if (peer_head_block_num >= sync_known_lib_num) { // is in sync return true; } } @@ -1833,9 +1837,13 @@ namespace eosio { conns.push_back(c); } }); - std::sort(conns.begin(), conns.end(), [](const connection_ptr& lhs, const connection_ptr& rhs) { - return lhs->get_net_latency_ns() < rhs->get_net_latency_ns(); - }); + if (conns.size() > sync_peer_limit) { + std::partial_sort(conns.begin(), conns.begin() + sync_peer_limit, conns.end(), [](const connection_ptr& lhs, const connection_ptr& rhs) { + return lhs->get_net_latency_ns() < rhs->get_net_latency_ns(); + }); + } + + fc_dlog(logger, "Valid sync peers ${s}, sync_ordinal ${so}", ("s", conns.size())("so", sync_ordinal.load())); if (conns.empty()) { return {}; @@ -1855,7 +1863,7 @@ namespace eosio { uint32_t sync_ord = conns[i]->sync_ordinal; if (sync_ord < lowest_ordinal) { the_one = i; - lowest_ordinal = sync_ordinal; + lowest_ordinal = sync_ord; } } conns[the_one]->sync_ordinal = sync_ordinal.load(); @@ -2998,7 +3006,6 @@ namespace eosio { // called from connection strand void connection::handle_message( const handshake_message& msg ) { 
- peer_dlog( this, "received handshake_message" ); if( !is_valid( msg ) ) { peer_elog( this, "bad handshake message"); no_retry = go_away_reason::fatal_other; @@ -3009,6 +3016,7 @@ namespace eosio { ("g", msg.generation)("lib", msg.last_irreversible_block_num)("head", msg.head_num) ); peer_lib_num = msg.last_irreversible_block_num; + peer_head_block_num = msg.head_num; std::unique_lock g_conn( conn_mtx ); last_handshake_recv = msg; g_conn.unlock(); @@ -4079,6 +4087,11 @@ namespace eosio { //---------------------------------------------------------------------------- + size_t connections_manager::number_connections() const { + std::lock_guard g(connections_mtx); + return connections.size(); + } + void connections_manager::add_supplied_peers(const vector& peers ) { std::lock_guard g(connections_mtx); supplied_peers.insert( peers.begin(), peers.end() ); From 24f5986eef855efab04bc9ca8cef8cc570b2afa1 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 25 May 2023 13:15:02 -0500 Subject: [PATCH 08/16] GH-1072 Drop incoming trx when syncing --- plugins/net_plugin/net_plugin.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index e756cfedba..ba78a0b55f 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3389,6 +3389,11 @@ namespace eosio { // called from connection strand void connection::handle_message( packed_transaction_ptr trx ) { const auto& tid = trx->id(); + if (my_impl->sync_master->syncing_with_peer()) { + peer_wlog(this, "syncing, dropping trx ${id}", ("id", tid)); + return; + } + peer_dlog( this, "received packed_transaction ${id}", ("id", tid) ); size_t trx_size = calc_trx_size( trx ); From 851af76905f91b57ffbe6cf7fef3714116cffe96 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 May 2023 07:59:26 -0500 Subject: [PATCH 09/16] GH-1072 Move trx sync check --- plugins/net_plugin/net_plugin.cpp | 10 +++++----- 1 file changed, 5 
insertions(+), 5 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index ba78a0b55f..6af53ead98 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -2876,10 +2876,14 @@ namespace eosio { // called from connection strand bool connection::process_next_trx_message(uint32_t message_length) { if( !my_impl->p2p_accept_transactions ) { - peer_dlog( this, "p2p-accept-transaction=false - dropping txn" ); + peer_dlog( this, "p2p-accept-transaction=false - dropping trx" ); pending_message_buffer.advance_read_ptr( message_length ); return true; } + if (my_impl->sync_master->syncing_with_peer()) { + peer_wlog(this, "syncing, dropping trx"); + return true; + } const unsigned long trx_in_progress_sz = this->trx_in_progress_size.load(); @@ -3389,10 +3393,6 @@ namespace eosio { // called from connection strand void connection::handle_message( packed_transaction_ptr trx ) { const auto& tid = trx->id(); - if (my_impl->sync_master->syncing_with_peer()) { - peer_wlog(this, "syncing, dropping trx ${id}", ("id", tid)); - return; - } peer_dlog( this, "received packed_transaction ${id}", ("id", tid) ); From 386f00bdc5074fdf4ffd00d279c1647de7e75257 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 26 May 2023 13:35:16 -0500 Subject: [PATCH 10/16] GH-1072 Use absolute value latency and improve logging --- plugins/net_plugin/net_plugin.cpp | 50 +++++++++++++++++++------------ 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 6af53ead98..c9d5226291 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -1218,7 +1218,7 @@ namespace eosio { // thread safe bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const { - fc_wlog(logger, "id: ${id} trx_only: ${t} current: ${c} socket_open: ${so} syncing: ${s} connecting: ${con} closing: ${close} 
peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", + fc_dlog(logger, "id: ${id} trx_only: ${t} current: ${c} socket_open: ${so} syncing: ${s} connecting: ${con} closing: ${close} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", ("id", connection_id)("t", is_transactions_only_connection()) ("c", current())("so", socket_is_open())("s", syncing.load())("con", connecting.load())("close", closing.load()) ("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("lat", get_net_latency_ns()/1000)("g", reason_str(no_retry))); @@ -1406,13 +1406,14 @@ namespace eosio { close(false); } return; - } else { - const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count()); - if ( current_time > latest_blk_time + timeout ) { - send_handshake(); - return; - } } + const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count()); + if ( current_time > latest_blk_time + timeout ) { + peer_dlog(this, "half heartbeat timed out, sending handshake"); + send_handshake(); + return; + } + } send_time(); @@ -1831,6 +1832,8 @@ namespace eosio { } connection_ptr sync_manager::find_next_sync_node() { + fc_dlog(logger, "Number connections ${s}, sync_next_expected_num: ${e}, sync_known_lib_num: ${l}", + ("s", my_impl->connections.number_connections())("e", sync_next_expected_num)("l", sync_known_lib_num)); std::deque conns; my_impl->connections.for_each_block_connection([&](const auto& c) { if (c->should_sync_from(sync_next_expected_num, sync_known_lib_num)) { @@ -1861,11 +1864,14 @@ namespace eosio { uint32_t lowest_ordinal = std::numeric_limits::max(); for (size_t i = 0; i < conns.size() && i < sync_peer_limit && lowest_ordinal != 0; ++i) { uint32_t sync_ord = conns[i]->sync_ordinal; + fc_dlog(logger, "compare sync ords, conn: ${lcid}, ord: ${l} < ${r}, latency: ${lat}us", + ("lcid", conns[i]->connection_id)("l", sync_ord)("r", 
lowest_ordinal)("lat", conns[i]->get_net_latency_ns()/1000)); if (sync_ord < lowest_ordinal) { the_one = i; lowest_ordinal = sync_ord; } } + fc_dlog(logger, "sync from ${c}", ("c", conns[the_one]->connection_id)); conns[the_one]->sync_ordinal = sync_ordinal.load(); return conns[the_one]; } @@ -1878,12 +1884,18 @@ namespace eosio { ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) ); if( chain_info.head_num < sync_last_requested_num && sync_source && sync_source->current() ) { - fc_ilog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}", + fc_dlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}", ("h", chain_info.head_num)("r", sync_last_requested_num)("e", sync_next_expected_num) ("k", sync_known_lib_num)("s", sync_req_span)("c", sync_source->connection_id) ); return; } + if (conn) { + // p2p_high_latency_test.py test depends on this exact log statement. + peer_ilog(conn, "Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}", + ("cc", sync_last_requested_num)("t", sync_known_lib_num)("n", sync_next_expected_num)); + } + /* ---------- * next chunk provider selection criteria * a provider is supplied and able to be used, use it. @@ -1926,6 +1938,7 @@ namespace eosio { } if( !request_sent ) { g_sync.unlock(); + fc_dlog(logger, "Unable to request range, sending handshakes to everyone"); send_handshakes(); } } @@ -1968,10 +1981,6 @@ namespace eosio { } sync_next_expected_num = std::max( chain_info.lib_num + 1, sync_next_expected_num ); - // p2p_high_latency_test.py test depends on this exact log statement. 
- peer_ilog( c, "Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}", - ("cc", sync_last_requested_num)("t", target)("n", sync_next_expected_num) ); - request_next_chunk( std::move( g_sync ), c ); } @@ -2164,6 +2173,7 @@ namespace eosio { verify_catchup( c, msg.known_blocks.pending, id ); } else { // we already have the block, so update peer with our view of the world + peer_dlog(c, "Already have block, sending handshake"); c->send_handshake(); } } @@ -2190,6 +2200,7 @@ namespace eosio { c->close(); } else { g.unlock(); + peer_dlog(c, "rejected block, sending handshake"); c->send_handshake(); } } @@ -2247,10 +2258,12 @@ namespace eosio { if( set_state_to_head_catchup ) { if( set_state( head_catchup ) ) { - send_handshakes(); + peer_dlog( c, "Switching to head_catchup, sending handshakes" ); + send_handshakes(); } } else { set_state( in_sync ); + peer_dlog( c, "Switching to in_sync, sending handshakes" ); send_handshakes(); } } else if( state == lib_catchup ) { @@ -2824,9 +2837,10 @@ namespace eosio { pending_message_buffer.advance_read_ptr( message_length ); return true; } - peer_dlog( this, "received block ${num}, id ${id}..., latency: ${latency}", + peer_dlog( this, "received block ${num}, id ${id}..., latency: ${latency}ms, head ${h}", ("num", bh.block_num())("id", blk_id.str().substr(8,16)) - ("latency", (fc::time_point::now() - bh.timestamp).count()/1000) ); + ("latency", (fc::time_point::now() - bh.timestamp).count()/1000) + ("h", my_impl->get_chain_head_num())); if( !my_impl->sync_master->syncing_with_peer() ) { // guard against peer thinking it needs to send us old blocks uint32_t lib_num = my_impl->get_chain_lib_num(); if( blk_num < lib_num ) { @@ -3181,7 +3195,7 @@ namespace eosio { int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds if( network_latency_ns < 0 ) { peer_wlog(this, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000)); - 
network_latency_ns = 0; + network_latency_ns = -network_latency_ns; // use absolute value because it might be this node with the skew } // number of blocks syncing node is behind from a peer node, round up uint32_t nblk_behind_by_net_latency = std::lround( static_cast(network_latency_ns) / static_cast(block_interval_ns) ); @@ -3191,7 +3205,7 @@ namespace eosio { peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received", ("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency)); - net_latency_ns = network_latency_ns <= 0 ? std::numeric_limits::max() : network_latency_ns; + net_latency_ns = network_latency_ns; return nblk_combined_latency; } @@ -3420,8 +3434,6 @@ namespace eosio { // called from connection strand void connection::handle_message( const block_id_type& id, signed_block_ptr ptr ) { - peer_dlog( this, "received signed_block ${num}, id ${id}", ("num", block_header::num_from_id(id))("id", id) ); - // post to dispatcher strand so that we don't have multiple threads validating the block header my_impl->dispatcher->strand.post([id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable { controller& cc = my_impl->chain_plug->chain(); From 99bf7bc0189821d755ab891c8be493e13e819c99 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 31 May 2023 11:07:07 -0500 Subject: [PATCH 11/16] GH-1072 += not needed --- plugins/net_plugin/net_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index c9d5226291..2601d91879 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -1999,7 +1999,7 @@ namespace eosio { inline block_id_type make_block_id( uint32_t block_num ) { chain::block_id_type block_id; - block_id._hash[0] += fc::endian_reverse_u32(block_num); + block_id._hash[0] = 
fc::endian_reverse_u32(block_num); return block_id; } From 04a5143529b33963a072e16d7d954a00a9d55795 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 31 May 2023 11:17:18 -0500 Subject: [PATCH 12/16] GH-1072 simplify --- plugins/net_plugin/net_plugin.cpp | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 2601d91879..78425cd07b 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -1902,17 +1902,13 @@ namespace eosio { * otherwise select the next available from the list, round-robin style. */ - connection_ptr new_sync_source; - if (conn && conn->current() ) { - new_sync_source = conn; - } else { - new_sync_source = find_next_sync_node(); - } + connection_ptr new_sync_source = (conn && conn->current()) ? conn : + find_next_sync_node(); // verify there is an available source if( !new_sync_source ) { fc_elog( logger, "Unable to continue syncing at this time"); - if( !new_sync_source ) sync_source.reset(); + sync_source.reset(); sync_known_lib_num = chain_info.lib_num; sync_last_requested_num = 0; set_state( in_sync ); // probably not, but we can't do anything else From f3c57978e120634fee01bcfa07c10444aae8395b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 31 May 2023 12:22:17 -0500 Subject: [PATCH 13/16] GH-1072 Add is_transactions_connection and is_blocks_connection to simply logic --- plugins/net_plugin/net_plugin.cpp | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 78425cd07b..df52dc9e8e 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -767,6 +767,8 @@ namespace eosio { void set_connection_type( const string& peer_addr ); bool is_transactions_only_connection()const { return connection_type == transactions_only; } // thread safe, atomic bool 
is_blocks_only_connection()const { return connection_type == blocks_only; } + bool is_transactions_connection() const { return connection_type != blocks_only; } // thread safe, atomic + bool is_blocks_connection() const { return connection_type != transactions_only; } // thread safe, atomic void set_heartbeat_timeout(std::chrono::milliseconds msec) { std::chrono::system_clock::duration dur = msec; hb_timeout = dur.count(); @@ -1093,8 +1095,9 @@ namespace eosio { void connections_manager::for_each_block_connection( Function&& f ) const { std::shared_lock g( connections_mtx ); for( auto& c : connections ) { - if( c->is_transactions_only_connection() ) continue; - f( c ); + if (c->is_blocks_connection()) { + f(c); + } } } @@ -1108,9 +1111,10 @@ namespace eosio { bool connections_manager::any_of_block_connections(UnaryPredicate&& p) const { std::shared_lock g( connections_mtx ); for( auto& c : connections ) { - if( c->is_transactions_only_connection() ) continue; - if (p(c)) - return true; + if( c->is_blocks_connection() ) { + if (p(c)) + return true; + } } return false; } @@ -1218,11 +1222,11 @@ namespace eosio { // thread safe bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const { - fc_dlog(logger, "id: ${id} trx_only: ${t} current: ${c} socket_open: ${so} syncing: ${s} connecting: ${con} closing: ${close} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", - ("id", connection_id)("t", is_transactions_only_connection()) + fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing: ${s} connecting: ${con} closing: ${close} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", + ("id", connection_id)("t", is_blocks_connection()) ("c", current())("so", socket_is_open())("s", syncing.load())("con", connecting.load())("close", closing.load()) ("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("lat", get_net_latency_ns()/1000)("g", 
reason_str(no_retry))); - if (!is_transactions_only_connection() && current()) { + if (is_blocks_connection() && current()) { if (no_retry == go_away_reason::no_reason) { if (peer_start_block_num <= sync_next_expected_num) { // has blocks we want if (peer_head_block_num >= sync_known_lib_num) { // is in sync @@ -1834,7 +1838,7 @@ namespace eosio { connection_ptr sync_manager::find_next_sync_node() { fc_dlog(logger, "Number connections ${s}, sync_next_expected_num: ${e}, sync_known_lib_num: ${l}", ("s", my_impl->connections.number_connections())("e", sync_next_expected_num)("l", sync_known_lib_num)); - std::deque conns; + deque conns; my_impl->connections.for_each_block_connection([&](const auto& c) { if (c->should_sync_from(sync_next_expected_num, sync_known_lib_num)) { conns.push_back(c); @@ -1857,6 +1861,7 @@ namespace eosio { return conns.front(); } + // keep track of which node was synced from last; round-robin among the current (sync_peer_limit) lowest latency peers ++sync_ordinal; // example: sync_ordinal is 6 after inc above then there may be connections with 3,4,5. 
// Choose from the lowest sync_ordinal of the sync_peer_limit of lowest latency, note 0 means not synced from yet @@ -2002,7 +2007,8 @@ namespace eosio { // called from c's connection strand void sync_manager::recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency ) { - if( c->is_transactions_only_connection() ) return; + if (!c->is_blocks_connection()) + return; auto chain_info = my_impl->get_chain_info(); @@ -2419,7 +2425,7 @@ namespace eosio { trx_buffer_factory buff_factory; const fc::time_point_sec now{fc::time_point::now()}; my_impl->connections.for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) { - if( cp->is_blocks_only_connection() || !cp->current() ) { + if( !cp->is_transactions_connection() || !cp->current() ) { return; } if( !add_peer_txn(trx->id(), trx->expiration(), cp->connection_id, now) ) { From d99c7daf31fc201b3e547db2d7b3f76663636754 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 31 May 2023 14:40:34 -0500 Subject: [PATCH 14/16] GH-1072 Use a connection_state enum for connection state instead of bool variables. Rename syncing to peer_syncing_from_us. Rename syncing_with_peer() to syncing_from_peer(). 
--- plugins/net_plugin/net_plugin.cpp | 96 ++++++++++++++++++++----------- 1 file changed, 63 insertions(+), 33 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index df52dc9e8e..d3e16736ba 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -236,7 +236,7 @@ namespace eosio { public: explicit sync_manager( uint32_t span, uint32_t sync_peer_limit ); static void send_handshakes(); - bool syncing_with_peer() const { return sync_state == lib_catchup; } + bool syncing_from_peer() const { return sync_state == lib_catchup; } bool is_in_sync() const { return sync_state == in_sync; } void sync_reset_lib_num( const connection_ptr& conn, bool closing ); void sync_reassign_fetch( const connection_ptr& c, go_away_reason reason ); @@ -750,6 +750,8 @@ namespace eosio { class connection : public std::enable_shared_from_this { public: + enum class connection_state { connecting, connected, closing, closed }; + explicit connection( const string& endpoint ); explicit connection( tcp::socket&& socket ); ~connection() = default; @@ -762,6 +764,9 @@ namespace eosio { bool start_session(); bool socket_is_open() const { return socket_open.load(); } // thread safe, atomic + connection_state state() const { return conn_state.load(); } // thread safe atomic + void set_state(connection_state s); + static std::string state_str(connection_state s); const string& peer_address() const { return peer_addr; } // thread safe, const void set_connection_type( const string& peer_addr ); @@ -786,6 +791,8 @@ namespace eosio { alignas(hardware_destructive_interference_size) std::atomic socket_open{false}; + std::atomic conn_state{connection_state::connecting}; + const string peer_addr; enum connection_types : char { both, @@ -826,9 +833,7 @@ namespace eosio { int16_t sent_handshake_count = 0; alignas(hardware_destructive_interference_size) - std::atomic connecting{true}; - std::atomic syncing{false}; - std::atomic 
closing{false}; + std::atomic peer_syncing_from_us{false}; std::atomic protocol_version = 0; uint16_t net_version = net_version_max; @@ -1179,11 +1184,36 @@ namespace eosio { } } + std::string connection::state_str(connection_state s) { + switch (s) { + case connection_state::connecting: + return "connecting"; + case connection_state::connected: + return "connected"; + case connection_state::closing: + return "closing"; + case connection_state::closed: + return "closed"; + } + return "unknown"; + } + + void connection::set_state(connection_state s) { + auto curr = state(); + if (curr == s) + return; + if (s == connection_state::connected && curr != connection_state::connecting) + return; + peer_dlog(this, "old connection state ${os} becoming ${ns}", ("os", state_str(curr))("ns", state_str(s))); + + conn_state = s; + } + connection_status connection::get_status()const { connection_status stat; stat.peer = peer_addr; - stat.connecting = connecting; - stat.syncing = syncing; + stat.connecting = state() == connection_state::connecting; + stat.syncing = peer_syncing_from_us; stat.is_bp_peer = is_bp_connection; std::lock_guard g( conn_mtx ); stat.last_handshake = last_handshake_recv; @@ -1212,19 +1242,19 @@ namespace eosio { // thread safe, all atomics bool connection::connected() const { - return socket_is_open() && !connecting && !closing; + return socket_is_open() && state() == connection_state::connected; } // thread safe, all atomics bool connection::current() const { - return (connected() && !syncing); + return (connected() && !peer_syncing_from_us); } // thread safe bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const { - fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing: ${s} connecting: ${con} closing: ${close} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", + fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing from 
us: ${s} state: ${con} peer_start_block: ${sb} peer_head: ${h} latency: ${lat}us no_retry: ${g}", ("id", connection_id)("t", is_blocks_connection()) - ("c", current())("so", socket_is_open())("s", syncing.load())("con", connecting.load())("close", closing.load()) + ("c", current())("so", socket_is_open())("s", peer_syncing_from_us.load())("con", state_str(state())) ("sb", peer_start_block_num.load())("h", peer_head_block_num.load())("lat", get_net_latency_ns()/1000)("g", reason_str(no_retry))); if (is_blocks_connection() && current()) { if (no_retry == go_away_reason::no_reason) { @@ -1243,7 +1273,7 @@ namespace eosio { } void connection::close( bool reconnect, bool shutdown ) { - closing = true; + set_state(connection_state::closing); strand.post( [self = shared_from_this(), reconnect, shutdown]() { connection::_close( self.get(), reconnect, shutdown ); }); @@ -1259,8 +1289,7 @@ namespace eosio { } self->socket.reset( new tcp::socket( my_impl->thread_pool.get_executor() ) ); self->flush_queues(); - self->connecting = false; - self->syncing = false; + self->peer_syncing_from_us = false; self->block_status_monitor_.reset(); ++self->consecutive_immediate_connection_close; bool has_last_req = false; @@ -1281,7 +1310,7 @@ namespace eosio { if( !shutdown) my_impl->sync_master->sync_reset_lib_num( self->shared_from_this(), true ); peer_ilog( self, "closing" ); self->cancel_wait(); - self->closing = false; + self->set_state(connection_state::closed); if( reconnect && !shutdown ) { my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() ); @@ -1374,11 +1403,11 @@ namespace eosio { } void connection::stop_send() { - syncing = false; + peer_syncing_from_us = false; } void connection::send_handshake() { - if (closing) + if (!connected()) return; strand.post( [c = shared_from_this()]() { std::unique_lock g_conn( c->conn_mtx ); @@ -1456,7 +1485,7 @@ namespace eosio { // called from connection strand void connection::do_queue_write() { - if( 
!buffer_queue.ready_to_send() || closing ) + if( !buffer_queue.ready_to_send() || !connected() ) return; connection_ptr c(shared_from_this()); @@ -1863,7 +1892,7 @@ namespace eosio { // keep track of which node was synced from last; round-robin among the current (sync_peer_limit) lowest latency peers ++sync_ordinal; - // example: sync_ordinal is 6 after inc above then there may be connections with 3,4,5. + // example: sync_ordinal is 6 after inc above then there may be connections with 3,4,5 (5 being the last synced from) // Choose from the lowest sync_ordinal of the sync_peer_limit of lowest latency, note 0 means not synced from yet size_t the_one = 0; uint32_t lowest_ordinal = std::numeric_limits::max(); @@ -2031,7 +2060,7 @@ namespace eosio { if (chain_info.head_id == msg.head_id) { peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 0, lib ${l}", ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16))("l", chain_info.lib_num) ); - c->syncing = false; + c->peer_syncing_from_us = false; notice_message note; note.known_blocks.mode = none; note.known_trx.mode = catch_up; @@ -2043,7 +2072,7 @@ namespace eosio { peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 1, head ${h}, lib ${l}", ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) ("h", chain_info.head_num)("l", chain_info.lib_num) ); - c->syncing = false; + c->peer_syncing_from_us = false; if (c->sent_handshake_count > 0) { c->send_handshake(); } @@ -2067,7 +2096,7 @@ namespace eosio { } c->enqueue( note ); } - c->syncing = true; + c->peer_syncing_from_us = true; return; } @@ -2075,7 +2104,7 @@ namespace eosio { peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. 
sync 3, head ${h}, lib ${l}", ("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) ("h", chain_info.head_num)("l", chain_info.lib_num) ); - c->syncing = false; + c->peer_syncing_from_us = false; verify_catchup(c, msg.head_num, msg.head_id); return; } else if(chain_info.head_num >= msg.head_num + nblk_combined_latency) { @@ -2095,7 +2124,7 @@ namespace eosio { } c->enqueue( note ); } - c->syncing = false; + c->peer_syncing_from_us = false; bool on_fork = true; try { controller& cc = my_impl->chain_plug->chain(); @@ -2372,13 +2401,13 @@ namespace eosio { void dispatch_manager::bcast_block(const signed_block_ptr& b, const block_id_type& id) { fc_dlog( logger, "bcast block ${b}", ("b", b->block_num()) ); - if( my_impl->sync_master->syncing_with_peer() ) return; + if(my_impl->sync_master->syncing_from_peer() ) return; block_buffer_factory buff_factory; const auto bnum = b->block_num(); my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) { - fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}", - ("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) ); + fc_dlog( logger, "socket_is_open ${s}, state ${c}, syncing ${ss}, connection ${cid}", + ("s", cp->socket_is_open())("c", connection::state_str(cp->state()))("ss", cp->peer_syncing_from_us.load())("cid", cp->connection_id) ); if( !cp->current() ) return; if( !add_peer_block( id, cp->connection_id ) ) { @@ -2567,7 +2596,7 @@ namespace eosio { } else { fc_elog( logger, "Unable to resolve ${host}:${port} ${error}", ("host", host)("port", port)( "error", err.message() ) ); - c->connecting = false; + c->set_state(connection_state::closed); ++c->consecutive_immediate_connection_close; } } ) ); @@ -2577,7 +2606,7 @@ namespace eosio { // called from connection strand void connection::connect( const std::shared_ptr& resolver, const 
tcp::resolver::results_type& endpoints ) { - connecting = true; + set_state(connection_state::connecting); pending_message_buffer.reset(); buffer_queue.clear_out_queue(); boost::asio::async_connect( *socket, endpoints, @@ -2843,7 +2872,8 @@ namespace eosio { ("num", bh.block_num())("id", blk_id.str().substr(8,16)) ("latency", (fc::time_point::now() - bh.timestamp).count()/1000) ("h", my_impl->get_chain_head_num())); - if( !my_impl->sync_master->syncing_with_peer() ) { // guard against peer thinking it needs to send us old blocks + if( !my_impl->sync_master + ->syncing_from_peer() ) { // guard against peer thinking it needs to send us old blocks uint32_t lib_num = my_impl->get_chain_lib_num(); if( blk_num < lib_num ) { std::unique_lock g( conn_mtx ); @@ -2896,7 +2926,7 @@ namespace eosio { pending_message_buffer.advance_read_ptr( message_length ); return true; } - if (my_impl->sync_master->syncing_with_peer()) { + if (my_impl->sync_master->syncing_from_peer()) { peer_wlog(this, "syncing, dropping trx"); return true; } @@ -3041,7 +3071,7 @@ namespace eosio { last_handshake_recv = msg; g_conn.unlock(); - connecting = false; + set_state(connection_state::connected); if (msg.generation == 1) { if( msg.node_id == my_impl->node_id) { peer_elog( this, "Self connection detected node_id ${id}. 
Closing connection", ("id", msg.node_id) ); @@ -3279,7 +3309,7 @@ namespace eosio { // notices of previously unknown blocks or txns, // peer_dlog( this, "received notice_message" ); - connecting = false; + set_state(connection_state::connected); if( msg.known_blocks.ids.size() > 2 ) { peer_elog( this, "Invalid notice_message, known_blocks.ids.size ${s}, closing connection", ("s", msg.known_blocks.ids.size()) ); @@ -4278,7 +4308,7 @@ namespace eosio { ++num_peers; } - if (!(*it)->socket_is_open() && !(*it)->connecting) { + if (!(*it)->socket_is_open() && (*it)->state() != connection::connection_state::connecting) { if (!(*it)->incoming()) { if (!(*it)->resolve_and_connect()) { it = connections.erase(it); From c0a0b72e51a434fbefe02a704489b591e5114134 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 31 May 2023 14:50:23 -0500 Subject: [PATCH 15/16] GH-1072 Resize list since those outside range are not needed. --- plugins/net_plugin/net_plugin.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index d3e16736ba..6fe815b9cb 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -1877,6 +1877,7 @@ namespace eosio { std::partial_sort(conns.begin(), conns.begin() + sync_peer_limit, conns.end(), [](const connection_ptr& lhs, const connection_ptr& rhs) { return lhs->get_net_latency_ns() < rhs->get_net_latency_ns(); }); + conns.resize(sync_peer_limit); } fc_dlog(logger, "Valid sync peers ${s}, sync_ordinal ${so}", ("s", conns.size())("so", sync_ordinal.load())); @@ -1896,7 +1897,7 @@ namespace eosio { // Choose from the lowest sync_ordinal of the sync_peer_limit of lowest latency, note 0 means not synced from yet size_t the_one = 0; uint32_t lowest_ordinal = std::numeric_limits::max(); - for (size_t i = 0; i < conns.size() && i < sync_peer_limit && lowest_ordinal != 0; ++i) { + for (size_t i = 0; i < conns.size() && lowest_ordinal != 0; ++i) { 
uint32_t sync_ord = conns[i]->sync_ordinal; fc_dlog(logger, "compare sync ords, conn: ${lcid}, ord: ${l} < ${r}, latency: ${lat}us", ("lcid", conns[i]->connection_id)("l", sync_ord)("r", lowest_ordinal)("lat", conns[i]->get_net_latency_ns()/1000)); From 94df7279b9c4409ecc89630f3b637e91843cff80 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 31 May 2023 15:29:32 -0500 Subject: [PATCH 16/16] GH-1072 Need to send handshake when connecting --- plugins/net_plugin/net_plugin.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 6fe815b9cb..84d61e6b12 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -876,6 +876,7 @@ namespace eosio { tstamp latest_blk_time{0}; bool connected() const; + bool closed() const; // socket is not open or is closed or closing, thread safe bool current() const; bool should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const; @@ -1245,6 +1246,12 @@ namespace eosio { return socket_is_open() && state() == connection_state::connected; } + bool connection::closed() const { + return !socket_is_open() + || state() == connection_state::closing + || state() == connection_state::closed; + } + // thread safe, all atomics bool connection::current() const { return (connected() && !peer_syncing_from_us); @@ -1407,7 +1414,7 @@ namespace eosio { } void connection::send_handshake() { - if (!connected()) + if (closed()) return; strand.post( [c = shared_from_this()]() { std::unique_lock g_conn( c->conn_mtx ); @@ -1485,7 +1492,7 @@ namespace eosio { // called from connection strand void connection::do_queue_write() { - if( !buffer_queue.ready_to_send() || !connected() ) + if( !buffer_queue.ready_to_send() || closed() ) return; connection_ptr c(shared_from_this());