Skip to content

Commit

Permalink
GH-1072 Add any_of_block_connections and change for_each_connection a…
Browse files Browse the repository at this point in the history
…nd for_each_block_connection to not short circuit
  • Loading branch information
heifner committed May 22, 2023
1 parent 7470c68 commit 6a5bf21
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,13 @@ namespace eosio {
template <typename UnaryPredicate>
bool any_of_connections(UnaryPredicate&& p) const {
std::shared_lock g(connections_mtx);
return std::find_if(connections.cbegin(), connections.cend(), std::forward<UnaryPredicate>(p)) != connections.cend();
return std::any_of(connections.cbegin(), connections.cend(), std::forward<UnaryPredicate>(p));
}

template <typename UnaryPredicate>
bool any_of_block_connections(UnaryPredicate&& p) const {
std::shared_lock g(connections_mtx);
return std::any_of(connections.cbegin(), connections.cend(), std::forward<UnaryPredicate>(p));
}
};

Expand Down Expand Up @@ -1047,17 +1053,15 @@ namespace eosio {
template<typename Function>
void connections_manager::for_each_connection( Function&& f ) const {
std::shared_lock g( connections_mtx );
for( auto& c :connections ) {
if( !std::forward<Function>(f)( c ) ) return;
}
std::for_each(connections.begin(), connections.end(), std::forward<Function>(f));
}

template<typename Function>
void connections_manager::for_each_block_connection( Function&& f ) const {
std::shared_lock g( connections_mtx );
for( auto& c : connections ) {
if( c->is_transactions_only_connection() ) continue;
if( !std::forward<Function>(f)( c ) ) return;
f( c );
}
}

Expand Down Expand Up @@ -1749,7 +1753,6 @@ namespace eosio {
if( cc->current() && cc->last_handshake_recv.last_irreversible_block_num > highest_lib_num ) {
highest_lib_num = cc->last_handshake_recv.last_irreversible_block_num;
}
return true;
} );
sync_known_lib_num = highest_lib_num;

Expand Down Expand Up @@ -1830,7 +1833,6 @@ namespace eosio {
if( ci->current() ) {
ci->send_handshake();
}
return true;
} );
}

Expand Down Expand Up @@ -1998,14 +2000,17 @@ namespace eosio {
bool sync_manager::verify_catchup(const connection_ptr& c, uint32_t num, const block_id_type& id) {
request_message req;
req.req_blocks.mode = catch_up;
my_impl->connections.for_each_block_connection( [num, &id, &req]( const auto& cc ) {
auto is_fork_head_greater = [num, &id, &req]( const auto& cc ) {
std::lock_guard<std::mutex> g_conn( cc->conn_mtx );
if( cc->fork_head_num > num || cc->fork_head == id ) {
req.req_blocks.mode = none;
return false;
return true;
}
return true;
} );
return false;
};
if (my_impl->connections.any_of_block_connections(is_fork_head_greater)) {
req.req_blocks.mode = none;
}
if( req.req_blocks.mode == catch_up ) {
{
std::lock_guard<std::mutex> g( sync_mtx );
Expand Down Expand Up @@ -2132,7 +2137,6 @@ namespace eosio {
} else {
set_state_to_head_catchup = true;
}
return true;
} );

if( set_state_to_head_catchup ) {
Expand Down Expand Up @@ -2254,11 +2258,11 @@ namespace eosio {
my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) {
fc_dlog( logger, "socket_is_open ${s}, connecting ${c}, syncing ${ss}, connection ${cid}",
("s", cp->socket_is_open())("c", cp->connecting.load())("ss", cp->syncing.load())("cid", cp->connection_id) );
if( !cp->current() ) return true;
if( !cp->current() ) return;

if( !add_peer_block( id, cp->connection_id ) ) {
fc_dlog( logger, "not bcast block ${b} to connection ${cid}", ("b", bnum)("cid", cp->connection_id) );
return true;
return;
}

send_buffer_type sb = buff_factory.get_send_buffer( b );
Expand All @@ -2271,7 +2275,6 @@ namespace eosio {
cp->enqueue_buffer( sb, no_reason );
}
});
return true;
} );
}

Expand Down Expand Up @@ -2302,18 +2305,17 @@ namespace eosio {
const fc::time_point_sec now{fc::time_point::now()};
my_impl->connections.for_each_connection( [this, &trx, &now, &buff_factory]( auto& cp ) {
if( cp->is_blocks_only_connection() || !cp->current() ) {
return true;
return;
}
if( !add_peer_txn(trx->id(), trx->expiration(), cp->connection_id, now) ) {
return true;
return;
}

send_buffer_type sb = buff_factory.get_send_buffer( trx );
fc_dlog( logger, "sending trx: ${id}, to connection ${cid}", ("id", trx->id())("cid", cp->connection_id) );
cp->strand.post( [cp, sb{std::move(sb)}]() {
cp->enqueue_buffer( sb, no_reason );
} );
return true;
} );
}

Expand Down Expand Up @@ -2363,14 +2365,14 @@ namespace eosio {
}
last_req = *c->last_req;
}
my_impl->connections.for_each_block_connection( [this, &c, &last_req, &bid]( auto& conn ) {
auto request_from_peer = [this, &c, &last_req, &bid]( auto& conn ) {
if( conn == c )
return true;
return false;

{
std::lock_guard<std::mutex> guard( conn->conn_mtx );
if( conn->last_req ) {
return true;
return false;
}
}

Expand All @@ -2382,16 +2384,18 @@ namespace eosio {
std::lock_guard<std::mutex> g_conn_conn( conn->conn_mtx );
conn->last_req = last_req;
} );
return false;
return true;
}
return true;
} );
return false;
};

// at this point no other peer has it, re-request or do nothing?
peer_wlog( c, "no peer has last_req" );
if( c->connected() ) {
c->enqueue( last_req );
c->fetch_wait();
if (!my_impl->connections.any_of_block_connections(request_from_peer)) {
// at this point no other peer has it, re-request or do nothing?
peer_wlog(c, "no peer has last_req");
if (c->connected()) {
c->enqueue(last_req);
c->fetch_wait();
}
}
}

Expand Down Expand Up @@ -2501,7 +2505,6 @@ namespace eosio {
}
}
}
return true;
} );
const uint32_t max_client_count = connections.get_max_client_count();
if( from_addr < max_nodes_per_host && (auto_bp_peering_enabled() || max_client_count == 0 || visitors < max_client_count)) {
Expand Down Expand Up @@ -3479,7 +3482,6 @@ namespace eosio {
c->check_heartbeat(current_time);
} );
}
return true;
} );
} );
}
Expand Down

0 comments on commit 6a5bf21

Please sign in to comment.