diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 84d61e6b12..572574cd4f 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -212,10 +212,10 @@ namespace eosio { alignas(hardware_destructive_interference_size) std::mutex sync_mtx; - uint32_t sync_known_lib_num{0}; - uint32_t sync_last_requested_num{0}; - uint32_t sync_next_expected_num{0}; - connection_ptr sync_source; + uint32_t sync_known_lib_num{0}; // highest known lib num from currently connected peers + uint32_t sync_last_requested_num{0}; // end block number of the last requested range, inclusive + uint32_t sync_next_expected_num{0}; // the next block number we need from peer + connection_ptr sync_source; // connection we are currently syncing from const uint32_t sync_req_span{0}; const uint32_t sync_peer_limit{0}; @@ -242,13 +242,8 @@ namespace eosio { void sync_reassign_fetch( const connection_ptr& c, go_away_reason reason ); void rejected_block( const connection_ptr& c, uint32_t blk_num ); void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied ); - void sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied ); void recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency ); void sync_recv_notice( const connection_ptr& c, const notice_message& msg ); - inline void reset_last_requested_num() { - std::lock_guard g(sync_mtx); - sync_last_requested_num = 0; - } }; class dispatch_manager { @@ -824,6 +819,8 @@ namespace eosio { uint32_t peer_lib_num = 0; std::atomic sync_ordinal{0}; + // when syncing from a peer, the last block expected of the current range + uint32_t sync_last_requested_block{0}; alignas(hardware_destructive_interference_size) std::atomic trx_in_progress_size{0}; @@ -1205,7 +1202,7 @@ namespace eosio { return; if (s == connection_state::connected && curr != connection_state::connecting) return; - peer_dlog(this, "old connection state ${os} becoming ${ns}", ("os", state_str(curr))("ns", state_str(s))); + fc_dlog(logger, "old connection ${id} state ${os} becoming ${ns}", ("id", connection_id)("os", state_str(curr))("ns", state_str(s))); conn_state = s; } @@ -1317,6 +1314,7 @@ namespace eosio { if( !shutdown) my_impl->sync_master->sync_reset_lib_num( self->shared_from_this(), true ); peer_ilog( self, "closing" ); self->cancel_wait(); + self->sync_last_requested_block = 0; self->set_state(connection_state::closed); if( reconnect && !shutdown ) { @@ -1447,11 +1445,13 @@ namespace eosio { } return; } - const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count()); - if ( current_time > latest_blk_time + timeout ) { - peer_dlog(this, "half heartbeat timed out, sending handshake"); - send_handshake(); - return; + if (!my_impl->sync_master->syncing_from_peer()) { + const tstamp timeout = std::max(hb_timeout / 2, 2 * std::chrono::milliseconds(config::block_interval_ms).count()); + if (current_time > latest_blk_time + timeout) { + peer_wlog(this, "half heartbeat timed out, sending handshake"); + send_handshake(); + return; + } } } @@ -1545,6 +1545,7 @@ namespace eosio { peer_dlog( this, "cancel sync reason = ${m}, write queue size ${o} bytes", ("m", reason_str( reason ))("o", buffer_queue.write_queue_size()) ); cancel_wait(); + sync_last_requested_block = 0; flush_queues(); switch (reason) { case validation : @@ -1776,6 +1777,7 @@ namespace eosio { // called from connection strand void connection::request_sync_blocks(uint32_t start, uint32_t end) { + sync_last_requested_block = end; sync_request_message srm = {start,end}; enqueue( net_message(srm) ); sync_wait(); @@ -1860,12 +1862,12 @@ namespace eosio { } ); sync_known_lib_num = highest_lib_num; - // if closing the connection we are currently syncing from or not syncing, then reset our last requested and next expected. + // if closing the connection we are currently syncing from or not syncing, then request from a diff peer if( !sync_source || c == sync_source ) { sync_last_requested_num = 0; // if starting to sync need to always start from lib as we might be on our own fork uint32_t lib_num = my_impl->get_chain_lib_num(); - sync_next_expected_num = lib_num + 1; + sync_next_expected_num = std::max( lib_num + 1, sync_next_expected_num ); request_next_chunk( std::move(g) ); } } @@ -1922,11 +1924,11 @@ namespace eosio { void sync_manager::request_next_chunk( std::unique_lock g_sync, const connection_ptr& conn ) { auto chain_info = my_impl->get_chain_info(); - fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}", - ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) ); + fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, head: ${h}", + ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span)("h", chain_info.head_num) ); - if( chain_info.head_num < sync_last_requested_num && sync_source && sync_source->current() ) { - fc_dlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}", + if( chain_info.head_num + sync_req_span < sync_last_requested_num && sync_source && sync_source->current() ) { + fc_wlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}", ("h", chain_info.head_num)("r", sync_last_requested_num)("e", sync_next_expected_num) ("k", sync_known_lib_num)("s", sync_req_span)("c", sync_source->connection_id) ); return; @@ -1934,8 +1936,8 @@ namespace eosio { if (conn) { // p2p_high_latency_test.py test depends on this exact log statement. - peer_ilog(conn, "Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}", - ("cc", sync_last_requested_num)("t", sync_known_lib_num)("n", sync_next_expected_num)); + peer_ilog(conn, "Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}, head ${h}", + ("cc", sync_last_requested_num)("t", sync_known_lib_num)("n", sync_next_expected_num)("h", chain_info.head_num)); } /* ---------- @@ -1968,15 +1970,15 @@ namespace eosio { sync_source = new_sync_source; g_sync.unlock(); request_sent = true; - new_sync_source->strand.post( [new_sync_source, start, end]() { - peer_ilog( new_sync_source, "requesting range ${s} to ${e}", ("s", start)("e", end) ); + new_sync_source->strand.post( [new_sync_source, start, end, head_num=chain_info.head_num]() { + peer_ilog( new_sync_source, "requesting range ${s} to ${e}, head ${h}", ("s", start)("e", end)("h", head_num) ); new_sync_source->request_sync_blocks( start, end ); } ); } } if( !request_sent ) { g_sync.unlock(); - fc_dlog(logger, "Unable to request range, sending handshakes to everyone"); + fc_wlog(logger, "Unable to request range, sending handshakes to everyone"); send_handshakes(); } } @@ -1996,7 +1998,7 @@ namespace eosio { ("head", fork_head_block_num ) ); return( sync_last_requested_num < sync_known_lib_num || - fork_head_block_num < sync_last_requested_num ); + sync_next_expected_num < sync_last_requested_num ); } // called from c's connection strand @@ -2223,7 +2225,9 @@ namespace eosio { c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending; } sync_reset_lib_num(c, false); - start_sync(c, msg.known_trx.pending); + if (is_in_sync()) { + start_sync(c, msg.known_trx.pending); + } } } @@ -2244,35 +2248,19 @@ namespace eosio { } } - // called from connection strand - void sync_manager::sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied ) { - std::unique_lock g_sync( sync_mtx ); - if( blk_num <= sync_last_requested_num ) { - peer_dlog( c, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}", - ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) ); - if (blk_num != sync_next_expected_num && !blk_applied) { - auto sync_next_expected = sync_next_expected_num; - g_sync.unlock(); - peer_dlog( c, "expected block ${ne} but got ${bn}", ("ne", sync_next_expected)("bn", blk_num) ); - return; - } - sync_next_expected_num = blk_num + 1; - } - } - // called from c's connection strand void sync_manager::sync_recv_block(const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied) { - peer_dlog( c, "got block ${bn}", ("bn", blk_num) ); + peer_dlog( c, "${d} block ${bn}", ("d", blk_applied ? "applied" : "got")("bn", blk_num) ); if( app().is_quiting() ) { c->close( false, true ); return; } + c->latest_blk_time = c->get_time(); c->block_status_monitor_.accepted(); - sync_update_expected( c, blk_id, blk_num, blk_applied ); - std::unique_lock g_sync( sync_mtx ); stages state = sync_state; peer_dlog( c, "state ${s}", ("s", stage_str( state )) ); if( state == head_catchup ) { + std::unique_lock g_sync( sync_mtx ); peer_dlog( c, "sync_manager in head_catchup state" ); sync_source.reset(); g_sync.unlock(); @@ -2306,17 +2294,34 @@ namespace eosio { send_handshakes(); } } else if( state == lib_catchup ) { - if( blk_num >= sync_known_lib_num ) { + std::unique_lock g_sync( sync_mtx ); + if( blk_applied && blk_num >= sync_known_lib_num ) { peer_dlog( c, "All caught up with last known last irreversible block resending handshake" ); set_state( in_sync ); g_sync.unlock(); send_handshakes(); - } else if( blk_num >= sync_last_requested_num ) { - request_next_chunk( std::move( g_sync) ); } else { - g_sync.unlock(); - peer_dlog( c, "calling sync_wait" ); - c->sync_wait(); + if (!blk_applied) { + if (blk_num >= c->sync_last_requested_block) { + peer_dlog(c, "calling cancel_wait, block ${b}", ("b", blk_num)); + c->cancel_wait(); + } else { + peer_dlog(c, "calling sync_wait, block ${b}", ("b", blk_num)); + c->sync_wait(); + } + + sync_next_expected_num = blk_num + 1; + } + + uint32_t head = my_impl->get_chain_head_num(); + if (head + sync_req_span > sync_last_requested_num) { // don't allow to get too far head (one sync_req_span) + if (sync_next_expected_num > sync_last_requested_num && sync_last_requested_num < sync_known_lib_num) { + fc_dlog(logger, "Requesting range ahead, head: ${h} blk_num: ${bn} sync_next_expected_num ${nen} sync_last_requested_num: ${lrn}", + ("h", head)("bn", blk_num)("nen", sync_next_expected_num)("lrn", sync_last_requested_num)); + request_next_chunk(std::move(g_sync)); + } + } + } } } @@ -2880,8 +2885,7 @@ namespace eosio { ("num", bh.block_num())("id", blk_id.str().substr(8,16)) ("latency", (fc::time_point::now() - bh.timestamp).count()/1000) ("h", my_impl->get_chain_head_num())); - if( !my_impl->sync_master - ->syncing_from_peer() ) { // guard against peer thinking it needs to send us old blocks + if( !my_impl->sync_master->syncing_from_peer() ) { // guard against peer thinking it needs to send us old blocks uint32_t lib_num = my_impl->get_chain_lib_num(); if( blk_num < lib_num ) { std::unique_lock g( conn_mtx ); @@ -2890,7 +2894,6 @@ namespace eosio { peer_ilog( this, "received block ${n} less than ${which}lib ${lib}", ("n", blk_num)("which", blk_num < last_sent_lib ? "sent " : "") ("lib", blk_num < last_sent_lib ? last_sent_lib : lib_num) ); - my_impl->sync_master->reset_last_requested_num(); enqueue( (sync_request_message) {0, 0} ); send_handshake(); cancel_wait(); @@ -2898,6 +2901,8 @@ namespace eosio { pending_message_buffer.advance_read_ptr( message_length ); return true; } + } else { + my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, false); } auto ds = pending_message_buffer.create_datastream(); @@ -3542,7 +3547,7 @@ namespace eosio { c->strand.post( [sync_master = my_impl->sync_master.get(), dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() { dispatcher->add_peer_block( blk_id, c->connection_id ); - sync_master->sync_recv_block( c, blk_id, blk_num, false ); + sync_master->sync_recv_block( c, blk_id, blk_num, true ); }); return; }