Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

P2P use round trip of time_message to calculate latency #1235

Closed
wants to merge 8 commits into from
123 changes: 77 additions & 46 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -861,11 +861,11 @@ namespace eosio {
* Time message handling
* @{
*/
// Members set from network data
tstamp org{0}; //!< originate timestamp
tstamp rec{0}; //!< receive timestamp
tstamp dst{0}; //!< destination timestamp
tstamp xmt{0}; //!< transmit timestamp
// See NTP protocol. https://datatracker.ietf.org/doc/rfc5905/
tstamp org{0}; //!< origin timestamp. Time at the client when the request departed for the server.
tstamp rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client.
tstamp dst{0}; //!< destination timestamp. Time at the client when the reply arrived from the server.
tstamp xmt{0}; //!< transmit timestamp. Time at the server when the response left for the client.
/** @} */
// timestamp for the latest message
tstamp latest_msg_time{0};
Expand Down Expand Up @@ -926,7 +926,7 @@ namespace eosio {
* day routine and converts to a (at least) 64 bit integer.
*/
// Returns the current std::chrono::system_clock time since its epoch as a tstamp tick count.
static tstamp get_time() {
// Unconverted count: expressed in system_clock's native tick period, which the C++ standard
// leaves implementation-defined, so the unit is not guaranteed to be nanoseconds.
return std::chrono::system_clock::now().time_since_epoch().count();
// Explicit conversion to nanoseconds, independent of the clock's native tick period.
// NOTE(review): this listing appears to interleave both sides of a diff; presumably only one
// of the two returns exists in the real file. As written here, this second return is unreachable.
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
/** @} */

Expand Down Expand Up @@ -985,7 +985,7 @@ namespace eosio {
void handle_message( packed_transaction_ptr trx );

// returns calculated number of blocks combined latency
uint32_t update_latency(const handshake_message& msg);
uint32_t calc_block_latency();

void process_signed_block( const block_id_type& id, signed_block_ptr block, block_state_ptr bsp );

Expand Down Expand Up @@ -1466,6 +1466,7 @@ namespace eosio {
xpkt.rec = dst;
xpkt.xmt = get_time();
org = xpkt.xmt;
peer_dlog( this, "send init time_message: ${t}", ("t", xpkt) );
enqueue(xpkt);
}

Expand All @@ -1475,6 +1476,7 @@ namespace eosio {
xpkt.org = msg.xmt;
xpkt.rec = msg.dst;
xpkt.xmt = get_time();
peer_dlog( this, "send time_message: ${t}", ("t", xpkt) );
enqueue(xpkt);
}

Expand Down Expand Up @@ -2225,9 +2227,7 @@ namespace eosio {
c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
}
sync_reset_lib_num(c, false);
if (is_in_sync()) {
start_sync(c, msg.known_trx.pending);
}
start_sync(c, msg.known_trx.pending);
}
}

Expand Down Expand Up @@ -2628,6 +2628,7 @@ namespace eosio {
if( !err && socket->is_open() && socket == c->socket ) {
if( c->start_session() ) {
c->send_handshake();
c->send_time();
}
} else {
fc_elog( logger, "connection failed to ${a}, ${error}", ("a", c->peer_address())( "error", err.message()));
Expand Down Expand Up @@ -3230,28 +3231,22 @@ namespace eosio {
}
}

uint32_t nblk_combined_latency = update_latency(msg);
uint32_t nblk_combined_latency = calc_block_latency();
my_impl->sync_master->recv_handshake( shared_from_this(), msg, nblk_combined_latency );
}

// called from connection strand
uint32_t connection::update_latency(const handshake_message& msg) {
auto current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
int64_t network_latency_ns = current_time_ns - msg.time; // net latency in nanoseconds
if( network_latency_ns < 0 ) {
peer_wlog(this, "Peer sent a handshake with a timestamp skewed by at least ${t}ms", ("t", network_latency_ns/1000000));
network_latency_ns = -network_latency_ns; // use absolute value because it might be this node with the skew
}
// number of blocks syncing node is behind from a peer node, round up
uint32_t nblk_behind_by_net_latency = std::lround( static_cast<double>(network_latency_ns) / static_cast<double>(block_interval_ns) );
// 2x for time it takes for message to reach back to peer node
uint32_t nblk_combined_latency = 2 * nblk_behind_by_net_latency;
// message in the log below is used in p2p_high_latency_test.py test
peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
("lat", network_latency_ns/1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));

net_latency_ns = network_latency_ns;

uint32_t connection::calc_block_latency() {
uint32_t nblk_combined_latency = 0;
if (net_latency_ns != std::numeric_limits<uint64_t>::max()) {
// number of blocks syncing node is behind from a peer node, round up
uint32_t nblk_behind_by_net_latency = std::lround(static_cast<double>(net_latency_ns.load()) / static_cast<double>(block_interval_ns));
// 2x for time it takes for message to reach back to peer node
nblk_combined_latency = 2 * nblk_behind_by_net_latency;
// message in the log below is used in p2p_high_latency_test.py test
peer_dlog(this, "Network latency is ${lat}ms, ${num} blocks discrepancy by network latency, ${tot_num} blocks discrepancy expected once message received",
("lat", net_latency_ns / 1000000)("num", nblk_behind_by_net_latency)("tot_num", nblk_combined_latency));
}
return nblk_combined_latency;
}

Expand All @@ -3277,36 +3272,66 @@ namespace eosio {
close( retry ); // reconnect if wrong_version
}

// some clients before leap 5.0 provided microsecond epoch instead of nanosecond epoch
// Guesses the unit of an epoch timestamp from its number of decimal digits and scales it
// to nanoseconds.
//
// @param x epoch timestamp expressed in nanoseconds, microseconds, milliseconds or seconds
// @return x converted to nanoseconds; 0 when x is zero, has too few digits to classify,
//         or cannot be scaled to nanoseconds without overflowing tstamp
tstamp normalize_epoch_to_ns(tstamp x) {
   int digits = 1;
   for (tstamp v = x / 10; v != 0; v /= 10)
      ++digits;

   //   168 1685 9763 7880 7944 >= 19 is nanoseconds
   //       1685 9763 7880 7944 >= 16 is microseconds
   //          1 6859 7637 8807 >= 13 is milliseconds
   //              16 8597 6378 >= 10 is seconds
   tstamp scale = 0;
   if (digits >= 19)
      scale = 1;
   else if (digits >= 16)
      scale = 1000;
   else if (digits >= 13)
      scale = 1000 * 1000;
   else if (digits >= 10)
      scale = 1000 * 1000 * 1000;
   else
      return 0; // unknown or is zero

   // The input may be a timestamp taken from a peer's time_message. A value at the upper end
   // of a digit bucket (e.g. a 12-digit "seconds" value) does not fit once scaled, and signed
   // overflow is undefined behavior, so treat such values as unknown instead of multiplying.
   using limits = std::numeric_limits<tstamp>;
   if (x > limits::max() / scale || x < limits::min() / scale)
      return 0;
   return x * scale;
}

void connection::handle_message( const time_message& msg ) {
peer_ilog( this, "received time_message" );
peer_dlog( this, "received time_message: ${t}", ("t", msg) );

/* We've already lost however many microseconds it took to dispatch
* the message, but it can't be helped.
*/
// We've already lost however many microseconds it took to dispatch the message, but it can't be helped.
msg.dst = get_time();

// If the transmit timestamp is zero, the peer is horribly broken.
if(msg.xmt == 0)
return; /* invalid timestamp */
return; // invalid timestamp

auto msg_xmt = normalize_epoch_to_ns(msg.xmt);
auto msg_org = normalize_epoch_to_ns(msg.org);

if(msg.xmt == xmt)
return; /* duplicate packet */
if (msg_org != 0 && msg_org == normalize_epoch_to_ns(org)) {
auto latency = msg.dst - msg_org;
peer_dlog(this, "send_time latency ${l}us", ("l", latency/2/1000));
net_latency_ns = latency/2;
}

if (msg_xmt == xmt)
return; // duplicate packet

xmt = msg.xmt;
rec = msg.rec;
dst = msg.dst;
xmt = msg_xmt;
rec = normalize_epoch_to_ns(msg.rec);
dst = msg.dst; // already normalized

if( msg.org == 0 ) {
send_time( msg );
return; // We don't have enough data to perform the calculation yet.
}

double offset = (double(rec - org) + double(msg.xmt - dst)) / 2;
double NsecPerUsec{1000};
if (org != 0) {
int64_t offset = (double(rec - org) + double(msg_xmt - dst)) / 2.0;

if( logger.is_enabled( fc::log_level::all ) )
logger.log( FC_LOG_MESSAGE( all, "Clock offset is ${o}ns (${us}us)",
("o", offset)( "us", offset / NsecPerUsec ) ) );
if (std::abs(offset) > block_interval_ns) {
peer_wlog(this, "Clock offset is ${of}us, calculation: (rec ${r} - org ${o} + xmt ${x} - dst ${d})/2",
("of", offset / 1000)("r", rec)("o", org)("x", msg_xmt)("d", dst));
}
}
org = 0;
rec = 0;

Expand All @@ -3315,6 +3340,11 @@ namespace eosio {
g_conn.unlock();
send_handshake();
}

// make sure we also get the latency we need
if (net_latency_ns == std::numeric_limits<uint64_t>::max()) {
send_time();
}
}

void connection::handle_message( const notice_message& msg ) {
Expand Down Expand Up @@ -3683,9 +3713,10 @@ namespace eosio {
// called from application thread
void net_plugin_impl::on_accepted_block_header(const block_state_ptr& bs) {
update_chain_info();
dispatcher->strand.post( [bs]() {
fc_dlog( logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id) );
my_impl->dispatcher->bcast_block( bs->block, bs->id );

dispatcher->strand.post([bs]() {
fc_dlog(logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", bs->block_num)("id", bs->id));
my_impl->dispatcher->bcast_block(bs->block, bs->id);
});
}

Expand Down
17 changes: 9 additions & 8 deletions tests/nodeos_forked_chain_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,14 +559,15 @@ def getBlock(self, blockNum):
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)

if not testSuccessful:
Print(Utils.FileDivider)
Print("Compare Blocklog")
cluster.compareBlockLogs()
Print(Utils.FileDivider)
Print("Print Blocklog")
cluster.printBlockLog()
Print(Utils.FileDivider)
# Too much output for ci/cd
# if not testSuccessful:
# Print(Utils.FileDivider)
# Print("Compare Blocklog")
# cluster.compareBlockLogs()
# Print(Utils.FileDivider)
# Print("Print Blocklog")
# cluster.printBlockLog()
# Print(Utils.FileDivider)

exitCode = 0 if testSuccessful else 1
exit(exitCode)
18 changes: 10 additions & 8 deletions tests/nodeos_high_transaction_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,14 +375,16 @@ def findTransInBlock(transId, transToBlock, node):
testSuccessful = not delayedReportError
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
if not testSuccessful:
Print(Utils.FileDivider)
Print("Compare Blocklog")
cluster.compareBlockLogs()
Print(Utils.FileDivider)
Print("Print Blocklog")
cluster.printBlockLog()
Print(Utils.FileDivider)

# Too much output for ci/cd
# if not testSuccessful:
# Print(Utils.FileDivider)
# Print("Compare Blocklog")
# cluster.compareBlockLogs()
# Print(Utils.FileDivider)
# Print("Print Blocklog")
# cluster.printBlockLog()
# Print(Utils.FileDivider)

errorCode = 0 if testSuccessful else 1
exit(errorCode)
17 changes: 9 additions & 8 deletions tests/nodeos_short_fork_take_over_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,15 @@ def getMinHeadAndLib(prodNodes):
finally:
TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)

if not testSuccessful:
Print(Utils.FileDivider)
Print("Compare Blocklog")
cluster.compareBlockLogs()
Print(Utils.FileDivider)
Print("Compare Blocklog")
cluster.printBlockLog()
Print(Utils.FileDivider)
# Too much output for ci/cd
# if not testSuccessful:
# Print(Utils.FileDivider)
# Print("Compare Blocklog")
# cluster.compareBlockLogs()
# Print(Utils.FileDivider)
# Print("Print Blocklog")
# cluster.printBlockLog()
# Print(Utils.FileDivider)

exitCode = 0 if testSuccessful else 1
exit(exitCode)