Skip to content

Commit

Permalink
Merge pull request #495 from eosnetworkfoundation/more_freq_handshake
Browse files Browse the repository at this point in the history
Backport: Send handshake for heartbeat/2 if no block -- main
  • Loading branch information
linh2931 authored and heifner committed Jul 8, 2022
1 parent 5e9b0a2 commit b17b624
Showing 1 changed file with 35 additions and 22 deletions.
57 changes: 35 additions & 22 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ namespace eosio {
fc::time_point window_start_; ///< The start of the recent rbw (0 implies not started)
uint32_t events_{0}; ///< The number of consecutive rbws
const uint32_t max_consecutive_rejected_windows_{13};

public:
/// ctor
///
Expand Down Expand Up @@ -670,6 +670,7 @@ namespace eosio {
// timestamp for the lastest message
tstamp latest_msg_time{0};
tstamp hb_timeout{std::chrono::milliseconds{def_keepalive_interval}.count()};
tstamp latest_blk_time{0};

bool connected();
bool current();
Expand All @@ -684,7 +685,7 @@ namespace eosio {
bool process_next_trx_message(uint32_t message_length);
public:

bool populate_handshake( handshake_message& hello, bool force );
bool populate_handshake( handshake_message& hello );

bool resolve_and_connect();
void connect( const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints );
Expand All @@ -700,7 +701,7 @@ namespace eosio {
*/
bool process_next_message(uint32_t message_length);

void send_handshake( bool force = false );
void send_handshake();

/** \name Peer Timestamps
* Time message handling
Expand Down Expand Up @@ -1129,10 +1130,10 @@ namespace eosio {
syncing = false;
}

void connection::send_handshake( bool force ) {
strand.post( [force, c = shared_from_this()]() {
void connection::send_handshake() {
strand.post( [c = shared_from_this()]() {
std::unique_lock<std::mutex> g_conn( c->conn_mtx );
if( c->populate_handshake( c->last_handshake_sent, force ) ) {
if( c->populate_handshake( c->last_handshake_sent ) ) {
static_assert( std::is_same_v<decltype( c->sent_handshake_count ), int16_t>, "INT16_MAX based on int16_t" );
if( c->sent_handshake_count == INT16_MAX ) c->sent_handshake_count = 1; // do not wrap
c->last_handshake_sent.generation = ++c->sent_handshake_count;
Expand All @@ -1148,21 +1149,31 @@ namespace eosio {
}

// called from connection strand
void connection::check_heartbeat( tstamp current_time )
{
if( protocol_version >= heartbeat_interval ) {
if( latest_msg_time > 0 && current_time > latest_msg_time + hb_timeout ) {
void connection::check_heartbeat( tstamp current_time ) {
if( protocol_version >= heartbeat_interval && latest_msg_time > 0 ) {
if( current_time > latest_msg_time + hb_timeout ) {
no_retry = benign_other;
if( !peer_address().empty() ) {
peer_wlog(this, "heartbeat timed out for peer address");
close(true); // reconnect
fc_wlog(logger, "heartbeat timed out for peer address ${adr}", ("adr", peer_address()));
close(true);
} else {
peer_wlog( this, "heartbeat timed out" );
{
std::lock_guard<std::mutex> g_conn( conn_mtx );
fc_wlog(logger, "heartbeat timed out from ${p} ${ag}",
("p", last_handshake_recv.p2p_address)("ag", last_handshake_recv.agent));
}
close(false);
}
return;
} else {
const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count());
if ( current_time > latest_blk_time + timeout ) {
send_handshake();
return;
}
}
}

send_time();
}

Expand Down Expand Up @@ -1410,12 +1421,14 @@ namespace eosio {
enqueue_buffer( send_buffer, close_after_send );
}

// called from connection strand
void connection::enqueue_block( const signed_block_ptr& b, bool to_sync_queue) {
peer_dlog( this, "enqueue block ${num}", ("num", b->block_num()) );
verify_strand_in_this_thread( strand, __func__, __LINE__ );

block_buffer_factory buff_factory;
auto sb = buff_factory.get_send_buffer( b );
latest_blk_time = get_time();
enqueue_buffer( sb, no_reason, to_sync_queue);
}

Expand Down Expand Up @@ -1723,6 +1736,7 @@ namespace eosio {
peer_dlog( c, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
("b", lib_num)( "h", fork_head_block_num )( "t", target ) );
c->send_handshake();
return;
}

if( sync_state == in_sync ) {
Expand Down Expand Up @@ -1807,6 +1821,9 @@ namespace eosio {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 1",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
c->syncing = false;
if (c->sent_handshake_count > 0) {
c->send_handshake();
}
return;
}
if (lib_num > msg.head_num + nblk_combined_latency) {
Expand Down Expand Up @@ -1951,7 +1968,7 @@ namespace eosio {
c->close();
} else {
g.unlock();
c->send_handshake(true);
c->send_handshake();
}
}

Expand Down Expand Up @@ -2123,6 +2140,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( b );

cp->strand.post( [this, cp, id, bnum, sb{std::move(sb)}]() {
cp->latest_blk_time = cp->get_time();
std::unique_lock<std::mutex> g_conn( cp->conn_mtx );
bool has_block = cp->last_handshake_recv.last_irreversible_block_num >= bnum;
g_conn.unlock();
Expand Down Expand Up @@ -2549,6 +2567,7 @@ namespace eosio {
unsigned_int which{};
fc::raw::unpack( peek_ds, which );
if( which == signed_block_which ) {
latest_blk_time = get_time();
return process_next_block_message( message_length );

} else if( which == packed_transaction_which ) {
Expand Down Expand Up @@ -2884,7 +2903,7 @@ namespace eosio {
if( protocol_version >= proto_pruned_types && protocol_version < mandel_initial ) {
sent_handshake_count = 0;
net_version = proto_explicit_sync;
send_handshake(true);
send_handshake();
return;
}

Expand Down Expand Up @@ -3448,19 +3467,13 @@ namespace eosio {
}

// call from connection strand
bool connection::populate_handshake( handshake_message& hello, bool force ) {
bool connection::populate_handshake( handshake_message& hello ) {
namespace sc = std::chrono;
bool send = force;
hello.network_version = net_version_base + net_version;
const auto prev_head_id = hello.head_id;
uint32_t lib, head;
std::tie( lib, std::ignore, head,
hello.last_irreversible_block_id, std::ignore, hello.head_id ) = my_impl->get_chain_info();
// only send handshake if state has changed since last handshake
send |= lib != hello.last_irreversible_block_num;
send |= head != hello.head_num;
send |= prev_head_id != hello.head_id;
if( !send ) return false;
hello.last_irreversible_block_num = lib;
hello.head_num = head;
hello.chain_id = my_impl->chain_id;
Expand Down

0 comments on commit b17b624

Please sign in to comment.