Merge pull request #1225 from AntelopeIO/GH-1072-sync-ahead
P2P sync ahead while applying received blocks
heifner authored Jun 5, 2023
2 parents c697250 + 8756419 commit d04757a
Showing 1 changed file with 62 additions and 57 deletions: plugins/net_plugin/net_plugin.cpp
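In brief, per the diff below: during lib_catchup the sync manager previously waited until a requested span of blocks had been applied before asking for the next span. sync_recv_block now distinguishes blocks merely received (blk_applied == false) from blocks actually applied (blk_applied == true), tracks the end of the in-flight range per connection (sync_last_requested_block), and requests the next span as soon as the current one is fully received, capped at one sync_req_span beyond the applied head. A condensed model of that flow (a sketch paraphrasing the diff, not the actual plugin code; struct and function names here are illustrative):

#include <cstdint>

// Sketch of the new lib_catchup logic; member names mirror the diff, but the
// real code also holds sync_mtx and runs on the connection's strand.
struct sync_ahead_model {
   uint32_t sync_known_lib_num      = 0;   // highest lib num known from connected peers
   uint32_t sync_last_requested_num = 0;   // end of the last requested range, inclusive
   uint32_t sync_next_expected_num  = 1;   // next block number we still need
   uint32_t sync_req_span           = 100; // size of each requested range
   uint32_t head                    = 0;   // advances as blocks are applied

   // Called once when a block arrives (applied == false) and again after it
   // has been applied (applied == true). Returns true when the next range
   // should be requested.
   bool on_block(uint32_t blk_num, bool applied) {
      if (!applied)
         sync_next_expected_num = blk_num + 1;                // progress is receipt-driven
      return head + sync_req_span > sync_last_requested_num   // stay within one span of head
          && sync_next_expected_num > sync_last_requested_num // current range fully received
          && sync_last_requested_num < sync_known_lib_num;    // more blocks remain
   }
};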
@@ -212,10 +212,10 @@ namespace eosio {

alignas(hardware_destructive_interference_size)
std::mutex sync_mtx;
- uint32_t sync_known_lib_num{0};
- uint32_t sync_last_requested_num{0};
- uint32_t sync_next_expected_num{0};
- connection_ptr sync_source;
+ uint32_t sync_known_lib_num{0}; // highest known lib num from currently connected peers
+ uint32_t sync_last_requested_num{0}; // end block number of the last requested range, inclusive
+ uint32_t sync_next_expected_num{0}; // the next block number we need from peer
+ connection_ptr sync_source; // connection we are currently syncing from

const uint32_t sync_req_span{0};
const uint32_t sync_peer_limit{0};
@@ -242,13 +242,8 @@ namespace eosio {
void sync_reassign_fetch( const connection_ptr& c, go_away_reason reason );
void rejected_block( const connection_ptr& c, uint32_t blk_num );
void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied );
- void sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied );
void recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency );
void sync_recv_notice( const connection_ptr& c, const notice_message& msg );
- inline void reset_last_requested_num() {
- std::lock_guard g(sync_mtx);
- sync_last_requested_num = 0;
- }
};

class dispatch_manager {
@@ -824,6 +819,8 @@ namespace eosio {
uint32_t peer_lib_num = 0;

std::atomic<uint32_t> sync_ordinal{0};
+ // when syncing from a peer, the last block expected of the current range
+ uint32_t sync_last_requested_block{0};

alignas(hardware_destructive_interference_size)
std::atomic<uint32_t> trx_in_progress_size{0};
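The per-connection sync_last_requested_block added above is maintained at three points later in this diff: set to the range end in request_sync_blocks, and zeroed when the connection closes or sync is cancelled; sync_recv_block compares received block numbers against it to decide between cancel_wait() and sync_wait(). A condensed lifecycle sketch (illustrative only; the real code enqueues a sync_request_message and arms a timer):

#include <cstdint>

// Hypothetical model of how this diff uses connection::sync_last_requested_block.
struct connection_model {
   uint32_t sync_last_requested_block = 0;

   void request_sync_blocks(uint32_t start, uint32_t end) {
      sync_last_requested_block = end; // remember the end of the in-flight range
      (void)start; // real code: enqueue(sync_request_message{start, end}); sync_wait();
   }
   void on_close_or_cancel() { sync_last_requested_block = 0; }
   // In sync_recv_block: range complete => cancel_wait(), else keep sync_wait().
   bool range_complete(uint32_t blk_num) const {
      return blk_num >= sync_last_requested_block;
   }
};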
@@ -1205,7 +1202,7 @@ namespace eosio {
return;
if (s == connection_state::connected && curr != connection_state::connecting)
return;
peer_dlog(this, "old connection state ${os} becoming ${ns}", ("os", state_str(curr))("ns", state_str(s)));
fc_dlog(logger, "old connection ${id} state ${os} becoming ${ns}", ("id", connection_id)("os", state_str(curr))("ns", state_str(s)));

conn_state = s;
}
@@ -1317,6 +1314,7 @@ namespace eosio {
if( !shutdown) my_impl->sync_master->sync_reset_lib_num( self->shared_from_this(), true );
peer_ilog( self, "closing" );
self->cancel_wait();
+ self->sync_last_requested_block = 0;
self->set_state(connection_state::closed);

if( reconnect && !shutdown ) {
@@ -1447,11 +1445,13 @@ namespace eosio {
}
return;
}
- const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count());
- if ( current_time > latest_blk_time + timeout ) {
- peer_dlog(this, "half heartbeat timed out, sending handshake");
- send_handshake();
- return;
+ if (!my_impl->sync_master->syncing_from_peer()) {
+ const tstamp timeout = std::max(hb_timeout / 2, 2 * std::chrono::milliseconds(config::block_interval_ms).count());
+ if (current_time > latest_blk_time + timeout) {
+ peer_wlog(this, "half heartbeat timed out, sending handshake");
+ send_handshake();
+ return;
}
+ }

}
@@ -1545,6 +1545,7 @@ namespace eosio {
peer_dlog( this, "cancel sync reason = ${m}, write queue size ${o} bytes",
("m", reason_str( reason ))("o", buffer_queue.write_queue_size()) );
cancel_wait();
+ sync_last_requested_block = 0;
flush_queues();
switch (reason) {
case validation :
@@ -1776,6 +1777,7 @@ namespace eosio {

// called from connection strand
void connection::request_sync_blocks(uint32_t start, uint32_t end) {
+ sync_last_requested_block = end;
sync_request_message srm = {start,end};
enqueue( net_message(srm) );
sync_wait();
@@ -1860,12 +1862,12 @@ namespace eosio {
} );
sync_known_lib_num = highest_lib_num;

- // if closing the connection we are currently syncing from or not syncing, then reset our last requested and next expected.
+ // if closing the connection we are currently syncing from or not syncing, then request from a diff peer
if( !sync_source || c == sync_source ) {
sync_last_requested_num = 0;
// if starting to sync need to always start from lib as we might be on our own fork
uint32_t lib_num = my_impl->get_chain_lib_num();
- sync_next_expected_num = lib_num + 1;
+ sync_next_expected_num = std::max( lib_num + 1, sync_next_expected_num );
request_next_chunk( std::move(g) );
}
}
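As I read this hunk, the std::max is the point: when the current sync source drops, blocks past lib may already have been received and be queued for application, and resetting the expectation to lib + 1 (the old behavior, motivated by the possible-fork comment) would re-request blocks already in flight from the next peer. Hypothetical values for illustration:

#include <algorithm>
#include <cstdint>

uint32_t lib_num                = 1000;  // local last irreversible block
uint32_t sync_next_expected_num = 1042;  // blocks 1001..1041 already received and being applied
// Old: sync_next_expected_num = lib_num + 1;                                    // regresses to 1001
// New: sync_next_expected_num = std::max(lib_num + 1, sync_next_expected_num);  // stays at 1042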
@@ -1922,20 +1924,20 @@ namespace eosio {
void sync_manager::request_next_chunk( std::unique_lock<std::mutex> g_sync, const connection_ptr& conn ) {
auto chain_info = my_impl->get_chain_info();

fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}",
("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) );
fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, head: ${h}",
("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span)("h", chain_info.head_num) );

- if( chain_info.head_num < sync_last_requested_num && sync_source && sync_source->current() ) {
- fc_dlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}",
+ if( chain_info.head_num + sync_req_span < sync_last_requested_num && sync_source && sync_source->current() ) {
+ fc_wlog( logger, "ignoring request, head is ${h} last req = ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, source connection ${c}",
("h", chain_info.head_num)("r", sync_last_requested_num)("e", sync_next_expected_num)
("k", sync_known_lib_num)("s", sync_req_span)("c", sync_source->connection_id) );
return;
}

if (conn) {
// p2p_high_latency_test.py test depends on this exact log statement.
peer_ilog(conn, "Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}",
("cc", sync_last_requested_num)("t", sync_known_lib_num)("n", sync_next_expected_num));
peer_ilog(conn, "Catching up with chain, our last req is ${cc}, theirs is ${t}, next expected ${n}, head ${h}",
("cc", sync_last_requested_num)("t", sync_known_lib_num)("n", sync_next_expected_num)("h", chain_info.head_num));
}

/* ----------
@@ -1968,15 +1970,15 @@ namespace eosio {
sync_source = new_sync_source;
g_sync.unlock();
request_sent = true;
- new_sync_source->strand.post( [new_sync_source, start, end]() {
- peer_ilog( new_sync_source, "requesting range ${s} to ${e}", ("s", start)("e", end) );
+ new_sync_source->strand.post( [new_sync_source, start, end, head_num=chain_info.head_num]() {
+ peer_ilog( new_sync_source, "requesting range ${s} to ${e}, head ${h}", ("s", start)("e", end)("h", head_num) );
new_sync_source->request_sync_blocks( start, end );
} );
}
}
if( !request_sent ) {
g_sync.unlock();
fc_dlog(logger, "Unable to request range, sending handshakes to everyone");
fc_wlog(logger, "Unable to request range, sending handshakes to everyone");
send_handshakes();
}
}
@@ -1996,7 +1998,7 @@ namespace eosio {
("head", fork_head_block_num ) );

return( sync_last_requested_num < sync_known_lib_num ||
- fork_head_block_num < sync_last_requested_num );
+ sync_next_expected_num < sync_last_requested_num );
}

// called from c's connection strand
@@ -2223,7 +2225,9 @@ namespace eosio {
c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
}
sync_reset_lib_num(c, false);
- start_sync(c, msg.known_trx.pending);
+ if (is_in_sync()) {
+ start_sync(c, msg.known_trx.pending);
+ }
}
}

@@ -2244,35 +2248,19 @@ namespace eosio {
}
}

- // called from connection strand
- void sync_manager::sync_update_expected( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied ) {
- std::unique_lock<std::mutex> g_sync( sync_mtx );
- if( blk_num <= sync_last_requested_num ) {
- peer_dlog( c, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}",
- ("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span) );
- if (blk_num != sync_next_expected_num && !blk_applied) {
- auto sync_next_expected = sync_next_expected_num;
- g_sync.unlock();
- peer_dlog( c, "expected block ${ne} but got ${bn}", ("ne", sync_next_expected)("bn", blk_num) );
- return;
- }
- sync_next_expected_num = blk_num + 1;
- }
- }

// called from c's connection strand
void sync_manager::sync_recv_block(const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied) {
peer_dlog( c, "got block ${bn}", ("bn", blk_num) );
peer_dlog( c, "${d} block ${bn}", ("d", blk_applied ? "applied" : "got")("bn", blk_num) );
if( app().is_quiting() ) {
c->close( false, true );
return;
}
c->latest_blk_time = c->get_time();
c->block_status_monitor_.accepted();
- sync_update_expected( c, blk_id, blk_num, blk_applied );
- std::unique_lock<std::mutex> g_sync( sync_mtx );
stages state = sync_state;
peer_dlog( c, "state ${s}", ("s", stage_str( state )) );
if( state == head_catchup ) {
+ std::unique_lock g_sync( sync_mtx );
peer_dlog( c, "sync_manager in head_catchup state" );
sync_source.reset();
g_sync.unlock();
@@ -2306,17 +2294,34 @@ namespace eosio {
send_handshakes();
}
} else if( state == lib_catchup ) {
- if( blk_num >= sync_known_lib_num ) {
+ std::unique_lock g_sync( sync_mtx );
+ if( blk_applied && blk_num >= sync_known_lib_num ) {
peer_dlog( c, "All caught up with last known last irreversible block resending handshake" );
set_state( in_sync );
g_sync.unlock();
send_handshakes();
- } else if( blk_num >= sync_last_requested_num ) {
- request_next_chunk( std::move( g_sync) );
- } else {
- g_sync.unlock();
- peer_dlog( c, "calling sync_wait" );
- c->sync_wait();
+ }
+ if (!blk_applied) {
+ if (blk_num >= c->sync_last_requested_block) {
+ peer_dlog(c, "calling cancel_wait, block ${b}", ("b", blk_num));
+ c->cancel_wait();
+ } else {
+ peer_dlog(c, "calling sync_wait, block ${b}", ("b", blk_num));
+ c->sync_wait();
+ }
+
+ sync_next_expected_num = blk_num + 1;
+ }
+
+ uint32_t head = my_impl->get_chain_head_num();
+ if (head + sync_req_span > sync_last_requested_num) { // don't allow to get too far head (one sync_req_span)
+ if (sync_next_expected_num > sync_last_requested_num && sync_last_requested_num < sync_known_lib_num) {
+ fc_dlog(logger, "Requesting range ahead, head: ${h} blk_num: ${bn} sync_next_expected_num ${nen} sync_last_requested_num: ${lrn}",
+ ("h", head)("bn", blk_num)("nen", sync_next_expected_num)("lrn", sync_last_requested_num));
+ request_next_chunk(std::move(g_sync));
+ }
+ }

}
}
}
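To make the one-sync_req_span cap in the hunk above concrete, a worked example with hypothetical numbers:

#include <cstdint>

uint32_t sync_req_span           = 100;
uint32_t head                    = 1000; // last applied block
uint32_t sync_last_requested_num = 1100; // requested range already ends here
uint32_t sync_next_expected_num  = 1101; // the whole range has been received
uint32_t sync_known_lib_num      = 5000;

// 1000 + 100 = 1100 is not > 1100: exactly one span ahead, so no new request yet.
// Once one more block applies (head = 1001), 1101 > 1100 and range 1101..1200 is
// requested while blocks 1001..1100 are still being applied.
bool request_more = head + sync_req_span > sync_last_requested_num
                 && sync_next_expected_num > sync_last_requested_num
                 && sync_last_requested_num < sync_known_lib_num;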
@@ -2880,8 +2885,7 @@ namespace eosio {
("num", bh.block_num())("id", blk_id.str().substr(8,16))
("latency", (fc::time_point::now() - bh.timestamp).count()/1000)
("h", my_impl->get_chain_head_num()));
- if( !my_impl->sync_master
- ->syncing_from_peer() ) { // guard against peer thinking it needs to send us old blocks
+ if( !my_impl->sync_master->syncing_from_peer() ) { // guard against peer thinking it needs to send us old blocks
uint32_t lib_num = my_impl->get_chain_lib_num();
if( blk_num < lib_num ) {
std::unique_lock<std::mutex> g( conn_mtx );
Expand All @@ -2890,14 +2894,15 @@ namespace eosio {
peer_ilog( this, "received block ${n} less than ${which}lib ${lib}",
("n", blk_num)("which", blk_num < last_sent_lib ? "sent " : "")
("lib", blk_num < last_sent_lib ? last_sent_lib : lib_num) );
- my_impl->sync_master->reset_last_requested_num();
enqueue( (sync_request_message) {0, 0} );
send_handshake();
cancel_wait();

pending_message_buffer.advance_read_ptr( message_length );
return true;
}
- }
+ } else {
+ my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, false);
+ }

auto ds = pending_message_buffer.create_datastream();
@@ -3542,7 +3547,7 @@ namespace eosio {
c->strand.post( [sync_master = my_impl->sync_master.get(),
dispatcher = my_impl->dispatcher.get(), c, blk_id, blk_num]() {
dispatcher->add_peer_block( blk_id, c->connection_id );
- sync_master->sync_recv_block( c, blk_id, blk_num, false );
+ sync_master->sync_recv_block( c, blk_id, blk_num, true );
});
return;
}
