Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.2] Backport: Send handshake for heartbeat/2 if no block #495

Merged
merged 6 commits into from
Jun 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 35 additions & 22 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ namespace eosio {
fc::time_point window_start_; ///< The start of the recent rbw (0 implies not started)
uint32_t events_{0}; ///< The number of consecutive rbws
const uint32_t max_consecutive_rejected_windows_{13};

public:
/// ctor
///
Expand Down Expand Up @@ -670,6 +670,7 @@ namespace eosio {
// timestamp for the lastest message
tstamp latest_msg_time{0};
tstamp hb_timeout{std::chrono::milliseconds{def_keepalive_interval}.count()};
tstamp latest_blk_time{0};

bool connected();
bool current();
Expand All @@ -684,7 +685,7 @@ namespace eosio {
bool process_next_trx_message(uint32_t message_length);
public:

bool populate_handshake( handshake_message& hello, bool force );
bool populate_handshake( handshake_message& hello );

bool resolve_and_connect();
void connect( const std::shared_ptr<tcp::resolver>& resolver, tcp::resolver::results_type endpoints );
Expand All @@ -700,7 +701,7 @@ namespace eosio {
*/
bool process_next_message(uint32_t message_length);

void send_handshake( bool force = false );
void send_handshake();

/** \name Peer Timestamps
* Time message handling
Expand Down Expand Up @@ -1129,10 +1130,10 @@ namespace eosio {
syncing = false;
}

void connection::send_handshake( bool force ) {
strand.post( [force, c = shared_from_this()]() {
void connection::send_handshake() {
strand.post( [c = shared_from_this()]() {
std::unique_lock<std::mutex> g_conn( c->conn_mtx );
if( c->populate_handshake( c->last_handshake_sent, force ) ) {
if( c->populate_handshake( c->last_handshake_sent ) ) {
static_assert( std::is_same_v<decltype( c->sent_handshake_count ), int16_t>, "INT16_MAX based on int16_t" );
if( c->sent_handshake_count == INT16_MAX ) c->sent_handshake_count = 1; // do not wrap
c->last_handshake_sent.generation = ++c->sent_handshake_count;
Expand All @@ -1148,21 +1149,31 @@ namespace eosio {
}

// called from connection strand
void connection::check_heartbeat( tstamp current_time )
{
if( protocol_version >= heartbeat_interval ) {
if( latest_msg_time > 0 && current_time > latest_msg_time + hb_timeout ) {
void connection::check_heartbeat( tstamp current_time ) {
if( protocol_version >= heartbeat_interval && latest_msg_time > 0 ) {
if( current_time > latest_msg_time + hb_timeout ) {
no_retry = benign_other;
if( !peer_address().empty() ) {
peer_wlog(this, "heartbeat timed out for peer address");
close(true); // reconnect
fc_wlog(logger, "heartbeat timed out for peer address ${adr}", ("adr", peer_address()));
close(true);
} else {
peer_wlog( this, "heartbeat timed out" );
{
std::lock_guard<std::mutex> g_conn( conn_mtx );
fc_wlog(logger, "heartbeat timed out from ${p} ${ag}",
("p", last_handshake_recv.p2p_address)("ag", last_handshake_recv.agent));
}
close(false);
}
return;
} else {
const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count());
if ( current_time > latest_blk_time + timeout ) {
send_handshake();
return;
}
}
}

send_time();
}

Expand Down Expand Up @@ -1410,12 +1421,14 @@ namespace eosio {
enqueue_buffer( send_buffer, close_after_send );
}

// called from connection strand
void connection::enqueue_block( const signed_block_ptr& b, bool to_sync_queue) {
peer_dlog( this, "enqueue block ${num}", ("num", b->block_num()) );
verify_strand_in_this_thread( strand, __func__, __LINE__ );

block_buffer_factory buff_factory;
auto sb = buff_factory.get_send_buffer( b );
latest_blk_time = get_time();
enqueue_buffer( sb, no_reason, to_sync_queue);
}

Expand Down Expand Up @@ -1723,6 +1736,7 @@ namespace eosio {
peer_dlog( c, "We are already caught up, my irr = ${b}, head = ${h}, target = ${t}",
("b", lib_num)( "h", fork_head_block_num )( "t", target ) );
c->send_handshake();
return;
}

if( sync_state == in_sync ) {
Expand Down Expand Up @@ -1807,6 +1821,9 @@ namespace eosio {
peer_ilog( c, "handshake lib ${lib}, head ${head}, head id ${id}.. sync 1",
("lib", msg.last_irreversible_block_num)("head", msg.head_num)("id", msg.head_id.str().substr(8,16)) );
c->syncing = false;
if (c->sent_handshake_count > 0) {
c->send_handshake();
}
return;
}
if (lib_num > msg.head_num + nblk_combined_latency) {
Expand Down Expand Up @@ -1951,7 +1968,7 @@ namespace eosio {
c->close();
} else {
g.unlock();
c->send_handshake(true);
c->send_handshake();
}
}

Expand Down Expand Up @@ -2123,6 +2140,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( b );

cp->strand.post( [this, cp, id, bnum, sb{std::move(sb)}]() {
cp->latest_blk_time = cp->get_time();
std::unique_lock<std::mutex> g_conn( cp->conn_mtx );
bool has_block = cp->last_handshake_recv.last_irreversible_block_num >= bnum;
g_conn.unlock();
Expand Down Expand Up @@ -2549,6 +2567,7 @@ namespace eosio {
unsigned_int which{};
fc::raw::unpack( peek_ds, which );
if( which == signed_block_which ) {
latest_blk_time = get_time();
return process_next_block_message( message_length );

} else if( which == packed_transaction_which ) {
Expand Down Expand Up @@ -2884,7 +2903,7 @@ namespace eosio {
if( protocol_version >= proto_pruned_types && protocol_version < mandel_initial ) {
sent_handshake_count = 0;
net_version = proto_explicit_sync;
send_handshake(true);
send_handshake();
return;
}

Expand Down Expand Up @@ -3448,19 +3467,13 @@ namespace eosio {
}

// call from connection strand
bool connection::populate_handshake( handshake_message& hello, bool force ) {
bool connection::populate_handshake( handshake_message& hello ) {
namespace sc = std::chrono;
bool send = force;
hello.network_version = net_version_base + net_version;
const auto prev_head_id = hello.head_id;
uint32_t lib, head;
std::tie( lib, std::ignore, head,
hello.last_irreversible_block_id, std::ignore, hello.head_id ) = my_impl->get_chain_info();
// only send handshake if state has changed since last handshake
send |= lib != hello.last_irreversible_block_num;
send |= head != hello.head_num;
send |= prev_head_id != hello.head_id;
if( !send ) return false;
hello.last_irreversible_block_num = lib;
hello.head_num = head;
hello.chain_id = my_impl->chain_id;
Expand Down