Skip to content

Commit

Permalink
GH-1072 Use round trip time_message for calculation of latency.
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Jun 6, 2023
1 parent c776ac9 commit 703f8eb
Showing 1 changed file with 51 additions and 24 deletions.
75 changes: 51 additions & 24 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ namespace eosio {
* day routine and converts to a (at least) 64 bit integer.
*/
static tstamp get_time() {
return std::chrono::system_clock::now().time_since_epoch().count();
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
/** @} */

Expand Down Expand Up @@ -985,7 +985,7 @@ namespace eosio {
void handle_message( packed_transaction_ptr trx );

// returns calculated number of blocks combined latency
uint32_t update_latency(const handshake_message& msg);
uint32_t calc_block_latency();

void process_signed_block( const block_id_type& id, signed_block_ptr block, block_state_ptr bsp );

Expand Down Expand Up @@ -1466,6 +1466,7 @@ namespace eosio {
xpkt.rec = dst;
xpkt.xmt = get_time();
org = xpkt.xmt;
peer_dlog( this, "send init time_message: ${t}", ("t", xpkt) );
enqueue(xpkt);
}

Expand All @@ -1475,6 +1476,7 @@ namespace eosio {
xpkt.org = msg.xmt;
xpkt.rec = msg.dst;
xpkt.xmt = get_time();
peer_dlog( this, "send time_message: ${t}", ("t", xpkt) );
enqueue(xpkt);
}

Expand Down Expand Up @@ -2628,6 +2630,7 @@ namespace eosio {
if( !err && socket->is_open() && socket == c->socket ) {
if( c->start_session() ) {
c->send_handshake();
c->send_time();
}
} else {
fc_elog( logger, "connection failed to ${a}, ${error}", ("a", c->peer_address())( "error", err.message()));
Expand Down Expand Up @@ -3230,27 +3233,19 @@ namespace eosio {
}
}

uint32_t nblk_combined_latency = update_latency(msg);
uint32_t nblk_combined_latency = calc_block_latency();
my_impl->sync_master->recv_handshake( shared_from_this(), msg, nblk_combined_latency );
}

// called from connection strand
uint32_t connection::update_latency(const handshake_message& msg) {
auto current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds
if( network_latency_ns < 0 ) {
peer_wlog(this, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000));
network_latency_ns = -network_latency_ns; // use absolute value because it might be this node with the skew
}
uint32_t connection::calc_block_latency() {
// number of blocks syncing node is behind from a peer node, round up
uint32_t nblk_behind_by_net_latency = std::lround( static_cast<double>(network_latency_ns) / static_cast<double>(block_interval_ns) );
uint32_t nblk_behind_by_net_latency = std::lround( static_cast<double>(net_latency_ns.load()) / static_cast<double>(block_interval_ns) );
// 2x for time it takes for message to reach back to peer node
uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency;
// message in the log below is used in p2p_high_latency_test.py test
peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));

net_latency_ns = network_latency_ns;
("lat", net_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));

return nblk_combined_latency;
}
Expand All @@ -3277,8 +3272,29 @@ namespace eosio {
close( retry ); // reconnect if wrong_version
}

// some clients before leap 5.0 provided microsecond epoch instead of nanosecond epoch
tstamp normalize_epoch_to_ns(tstamp x) {
int digits = 1;
tstamp v = x;
while (v/=10)
++digits;
// 168 1685 9763 7880 7944 >= 19 is nanoseconds
// 1685 9763 7880 7944 >= 16 is microseconds
// 1 6859 7637 8807 >= 13 is milliseconds
// 16 8597 6378 >= 10 is seconds
if (digits >= 19)
return x;
if (digits >= 16)
return x*1000;
if (digits >= 13)
return x*1000*1000;
if (digits >= 10)
return x*1000*1000*1000;
return 0; // unknown or is zero
}

void connection::handle_message( const time_message& msg ) {
peer_ilog( this, "received time_message" );
peer_dlog( this, "received time_message: ${t}", ("t", msg) );

/* We've already lost however many microseconds it took to dispatch
* the message, but it can't be helped.
Expand All @@ -3289,24 +3305,35 @@ namespace eosio {
if(msg.xmt == 0)
return; /* invalid timestamp */

if(msg.xmt == xmt)
auto msg_xmt = normalize_epoch_to_ns(msg.xmt);
auto msg_org = normalize_epoch_to_ns(msg.org);

if (msg_org == normalize_epoch_to_ns(org)) {
auto latency = msg.dst - msg_org;
peer_dlog(this, "send_time latency ${l}us", ("l", latency/2/1000));
net_latency_ns = latency/2;
}

if (msg_xmt == xmt)
return; /* duplicate packet */

xmt = msg.xmt;
rec = msg.rec;
dst = msg.dst;
xmt = msg_xmt;
rec = normalize_epoch_to_ns(msg.rec);
dst = msg.dst; // already normalized

if( msg.org == 0 ) {
send_time( msg );
return; // We don't have enough data to perform the calculation yet.
}

double offset = (double(rec - org) + double(msg.xmt - dst)) / 2;
double NsecPerUsec{1000};
if (org != 0) {
int64_t offset = (double(rec - org) + double(msg_xmt - dst)) / 2.0;

if( logger.is_enabled( fc::log_level::all ) )
logger.log( FC_LOG_MESSAGE( all, "Clock offset is ${o}ns (${us}us)",
("o", offset)( "us", offset / NsecPerUsec ) ) );
if (std::abs(offset) > block_interval_ns) {
peer_wlog(this, "Clock offset is ${of}us, calculation: (rec ${r} - org ${o} + xmt ${x} - dst ${d})/2",
("of", offset / 1000)("r", rec)("o", org)("x", msg_xmt)("d", dst));
}
}
org = 0;
rec = 0;

Expand Down

0 comments on commit 703f8eb

Please sign in to comment.