diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 613a7fb16b..23f60eef3d 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -946,7 +947,9 @@ struct controller_impl { signal accepted_block; signal irreversible_block; signal)> applied_transaction; - signal voted_block; + signal voted_block; + + vote_processor_t vote_processor{fork_db, voted_block}; int64_t set_proposed_producers( vector producers ); int64_t set_proposed_producers_legacy( vector producers ); @@ -1195,6 +1198,7 @@ struct controller_impl { elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); if( shutdown ) shutdown(); } ); + vote_processor.start(4); set_activation_handler(); set_activation_handler(); @@ -1214,6 +1218,7 @@ struct controller_impl { irreversible_block.connect([this](const block_signal_params& t) { const auto& [ block, id] = t; wasmif.current_lib(block->block_num()); + vote_processor.notify_lib(block->block_num()); }); @@ -3552,19 +3557,8 @@ struct controller_impl { // called from net threads and controller's thread pool - vote_status process_vote_message( const vote_message& vote ) { - // only aggregate votes on proper if blocks - auto aggregate_vote = [&vote](auto& forkdb) -> vote_status { - auto bsp = forkdb.get_block(vote.block_id); - if (bsp && bsp->block->is_proper_svnn_block()) { - return bsp->aggregate_vote(vote); - } - return vote_status::unknown_block; - }; - auto aggregate_vote_legacy = [](auto&) -> vote_status { - return vote_status::unknown_block; - }; - return fork_db.apply(aggregate_vote_legacy, aggregate_vote); + void process_vote_message( uint32_t connection_id, const vote_message& vote ) { + vote_processor.process_vote_message(connection_id, vote); } bool node_has_voted_if_finalizer(const block_id_type& id) const { @@ -3593,11 +3587,10 @@ struct controller_impl { my_finalizers.maybe_vote( *bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message& vote) { // net plugin subscribed to this signal. it will broadcast the vote message on receiving the signal - emit(voted_block, vote); + emit(voted_block, std::tuple{uint32_t{0}, vote_status::success, std::cref(vote)}); // also aggregate our own vote into the pending_qc for this block. - boost::asio::post(thread_pool.get_executor(), - [control = this, vote]() { control->process_vote_message(vote); }); + process_vote_message(0, vote); }); } @@ -5254,8 +5247,8 @@ void controller::set_proposed_finalizers( finalizer_policy&& fin_pol ) { } // called from net threads -vote_status controller::process_vote_message( const vote_message& vote ) { - return my->process_vote_message( vote ); +void controller::process_vote_message( uint32_t connection_id, const vote_message& vote ) { + my->process_vote_message( connection_id, vote ); }; bool controller::node_has_voted_if_finalizer(const block_id_type& id) const { @@ -5538,7 +5531,7 @@ signal& controller::accepted_block_header() { signal& controller::accepted_block() { return my->accepted_block; } signal& controller::irreversible_block() { return my->irreversible_block; } signal)>& controller::applied_transaction() { return my->applied_transaction; } -signal& controller::voted_block() { return my->voted_block; } +signal& controller::voted_block() { return my->voted_block; } chain_id_type controller::extract_chain_id(snapshot_reader& snapshot) { chain_snapshot_header header; diff --git a/libraries/chain/include/eosio/chain/block_header_state.hpp b/libraries/chain/include/eosio/chain/block_header_state.hpp index 02291c3547..0e9566dd3e 100644 --- a/libraries/chain/include/eosio/chain/block_header_state.hpp +++ b/libraries/chain/include/eosio/chain/block_header_state.hpp @@ -78,7 +78,7 @@ struct block_header_state { digest_type compute_finality_digest() const; // Returns true if the block is a Proper Savanna Block - bool is_proper_svnn_block() const; + bool is_proper_svnn_block() const { return header.is_proper_svnn_block(); } // block descending from this need the provided qc in the block extension bool is_needed(const qc_claim_t& qc_claim) const { diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index d2ac81dc32..14433df84d 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -58,6 +58,7 @@ namespace eosio::chain { using trx_meta_cache_lookup = std::function; using block_signal_params = std::tuple; + using vote_signal_params = std::tuple; enum class db_read_mode { HEAD, @@ -326,7 +327,7 @@ namespace eosio::chain { // called by host function set_finalizers void set_proposed_finalizers( finalizer_policy&& fin_pol ); // called from net threads - vote_status process_vote_message( const vote_message& msg ); + void process_vote_message( uint32_t connection_id, const vote_message& msg ); // thread safe, for testing bool node_has_voted_if_finalizer(const block_id_type& id) const; @@ -373,9 +374,8 @@ namespace eosio::chain { signal& accepted_block(); signal& irreversible_block(); signal)>& applied_transaction(); - - // Unlike other signals, voted_block can be signaled from other threads than the main thread. - signal& voted_block(); + // Unlike other signals, voted_block is signaled from other threads than the main thread. + signal& voted_block(); const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const; wasm_interface& get_wasm_interface(); diff --git a/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp b/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp index b54f8d7416..f6e2d4297f 100644 --- a/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp +++ b/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp @@ -25,14 +25,18 @@ namespace eosio::chain { bool strong{false}; bls_public_key finalizer_key; bls_signature sig; + + auto operator<=>(const vote_message&) const = default; + bool operator==(const vote_message&) const = default; }; enum class vote_status { success, - duplicate, - unknown_public_key, - invalid_signature, - unknown_block + duplicate, // duplicate vote, expected as votes arrive on multiple connections + unknown_public_key, // public key is invalid, indicates invalid vote + invalid_signature, // signature is invalid, indicates invalid vote + unknown_block, // block not available, possibly less than LIB, or too far in the future + max_exceeded // received too many votes for a connection }; using bls_public_key = fc::crypto::blslib::bls_public_key; @@ -159,7 +163,7 @@ namespace eosio::chain { FC_REFLECT(eosio::chain::vote_message, (block_id)(strong)(finalizer_key)(sig)); -FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block)) +FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block)(max_exceeded)) FC_REFLECT(eosio::chain::valid_quorum_certificate, (_strong_votes)(_weak_votes)(_sig)); FC_REFLECT(eosio::chain::pending_quorum_certificate, (_valid_qc)(_quorum)(_max_weak_sum_before_weak_final)(_state)(_strong_sum)(_weak_sum)(_weak_votes)(_strong_votes)); FC_REFLECT_ENUM(eosio::chain::pending_quorum_certificate::state_t, (unrestricted)(restricted)(weak_achieved)(weak_final)(strong)); diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index e1231bedcb..7c73856773 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -2,7 +2,6 @@ #include #include -#include #include #include diff --git a/libraries/chain/include/eosio/chain/vote_processor.hpp b/libraries/chain/include/eosio/chain/vote_processor.hpp new file mode 100644 index 0000000000..a2492fbc38 --- /dev/null +++ b/libraries/chain/include/eosio/chain/vote_processor.hpp @@ -0,0 +1,216 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include + +namespace eosio { namespace chain { + +/** + * Process votes in a dedicated thread pool. + */ +class vote_processor_t { + static constexpr size_t max_votes_per_connection = 2500; // 3000 is less than 1MB per connection + static constexpr std::chrono::milliseconds block_wait_time{10}; + + struct by_block_num; + struct by_connection; + struct by_vote; + + struct vote { + uint32_t connection_id; + vote_message msg; + + const block_id_type& id() const { return msg.block_id; } + block_num_type block_num() const { return block_header::num_from_id(msg.block_id); } + }; + + using vote_ptr = std::shared_ptr; + using vote_signal_type = decltype(controller({},chain_id_type::empty_chain_id()).voted_block()); + + typedef multi_index_container< vote_ptr, + indexed_by< + ordered_non_unique, + composite_key, + const_mem_fun + >, composite_key_compare< std::greater<>, sha256_less > // greater for block_num + >, + ordered_non_unique< tag, member >, + ordered_unique< tag, member > + > + > vote_index_type; + + fork_database& fork_db; + std::mutex mtx; + std::condition_variable cv; + vote_index_type index; + // connection, count of messages + std::map num_messages; + std::atomic lib{0}; + std::atomic stopped{false}; + vote_signal_type& vote_signal; + named_thread_pool thread_pool; + +private: + template + void emit( const Signal& s, Arg&& a ) { + try { + s(std::forward(a)); + } catch (std::bad_alloc& e) { + wlog( "std::bad_alloc: ${w}", ("w", e.what()) ); + throw e; + } catch (boost::interprocess::bad_alloc& e) { + wlog( "boost::interprocess::bad alloc: ${w}", ("w", e.what()) ); + throw e; + } catch ( controller_emit_signal_exception& e ) { + wlog( "controller_emit_signal_exception: ${details}", ("details", e.to_detail_string()) ); + throw e; + } catch ( fc::exception& e ) { + wlog( "fc::exception: ${details}", ("details", e.to_detail_string()) ); + } catch ( std::exception& e ) { + wlog( "std::exception: ${details}", ("details", e.what()) ); + } catch ( ... ) { + wlog( "signal handler threw exception" ); + } + } + + void emit(uint32_t connection_id, vote_status status, const vote_message& msg) { + if (connection_id != 0) { // this nodes vote was already signaled + emit( vote_signal, std::tuple{connection_id, status, std::cref(msg)} ); + } + } + + void remove_connection(uint32_t connection_id) { + auto& idx = index.get(); + idx.erase(idx.lower_bound(connection_id), idx.upper_bound(connection_id)); + } + + void remove_before_lib() { + auto& idx = index.get(); + idx.erase(idx.lower_bound(lib.load()), idx.end()); // descending + } + + bool remove_all_for_block(auto& idx, auto& it, const block_id_type& id) { + while (it != idx.end() && (*it)->id() == id) { + it = idx.erase(it); + } + return it == idx.end(); + } + + bool skip_all_for_block(auto& idx, auto& it, const block_id_type& id) { + while (it != idx.end() && (*it)->id() == id) { + ++it; + } + return it == idx.end(); + } + +public: + explicit vote_processor_t(fork_database& forkdb, vote_signal_type& vote_signal) + : fork_db(forkdb) + , vote_signal(vote_signal) {} + + ~vote_processor_t() { + stopped = true; + std::lock_guard g(mtx); + cv.notify_one(); + } + + void start(size_t num_threads) { + assert(num_threads > 1); // need at least two as one is used for coordinatation + thread_pool.start( num_threads, []( const fc::exception& e ) { + elog( "Exception in vote processor thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + } ); + + // one coordinator thread + boost::asio::post(thread_pool.get_executor(), [&]() { + block_id_type not_in_forkdb_id{}; + while (!stopped) { + std::unique_lock g(mtx); + cv.wait(g, [&]() { + if (!index.empty() || stopped) + return true; + return false; + }); + if (stopped) + break; + remove_before_lib(); + if (index.empty()) + continue; + auto& idx = index.get(); + if (auto i = idx.begin(); i != idx.end() && not_in_forkdb_id == (*i)->id()) { // same block as last while loop + g.unlock(); + std::this_thread::sleep_for(block_wait_time); + g.lock(); + } + for (auto i = idx.begin(); i != idx.end();) { + auto& vt = *i; + block_state_ptr bsp = fork_db.apply_s([&](const auto& forkdb) { + return forkdb.get_block(vt->id()); + }); + if (bsp) { + if (!bsp->is_proper_svnn_block()) { + if (remove_all_for_block(idx, i, bsp->id())) + break; + continue; + } + auto iter_of_bsp = i; + std::vector to_process; + to_process.reserve(std::min(21u, idx.size())); // increase if we increase # of finalizers from 21 + for(; i != idx.end() && bsp->id() == (*i)->id(); ++i) { + // although it is the highest contention on block state pending mutex posting all of the same bsp, + // the highest priority is processing votes for this block state. + to_process.push_back(*i); + } + bool should_break = remove_all_for_block(idx, iter_of_bsp, bsp->id()); + g.unlock(); // do not hold lock when posting + for (auto& vptr : to_process) { + boost::asio::post(thread_pool.get_executor(), [this, bsp, vptr=std::move(vptr)]() { + vote_status s = bsp->aggregate_vote(vptr->msg); + emit(vptr->connection_id, s, vptr->msg); + }); + } + if (should_break) + break; + } else { + not_in_forkdb_id = vt->id(); + if (skip_all_for_block(idx, i, (*i)->id())) + break; + } + } + } + dlog("Exiting vote processor coordinator thread"); + }); + } + + void notify_lib(block_num_type block_num) { + lib = block_num; + } + + void process_vote_message(uint32_t connection_id, const vote_message& msg) { + vote_ptr vptr = std::make_shared(vote{.connection_id = connection_id, .msg = msg}); + boost::asio::post(thread_pool.get_executor(), [this, connection_id, msg] { + std::unique_lock g(mtx); + if (++num_messages[connection_id] > max_votes_per_connection) { + // consider the connection invalid, remove all votes + remove_connection(connection_id); + g.unlock(); + + elog("Exceeded max votes per connection for ${c}", ("c", connection_id)); + emit(connection_id, vote_status::max_exceeded, msg); + } else if (block_header::num_from_id(msg.block_id) < lib.load(std::memory_order_relaxed)) { + // ignore + } else { + index.insert(std::make_shared(vote{.connection_id = connection_id, .msg = msg})); + cv.notify_one(); + } + }); + } + +}; + +} } //eosio::chain diff --git a/libraries/libfc/include/fc/crypto/bls_signature.hpp b/libraries/libfc/include/fc/crypto/bls_signature.hpp index d8c2191d4e..ebf0390f1e 100644 --- a/libraries/libfc/include/fc/crypto/bls_signature.hpp +++ b/libraries/libfc/include/fc/crypto/bls_signature.hpp @@ -44,6 +44,13 @@ namespace fc::crypto::blslib { return _jacobian_montgomery_le.equal(sig._jacobian_montgomery_le); } + auto operator<=>(const bls_signature& rhs) const { + return _affine_non_montgomery_le <=> rhs._affine_non_montgomery_le; + } + auto operator==(const bls_signature& rhs) const { + return _affine_non_montgomery_le == rhs._affine_non_montgomery_le; + } + template friend T& operator<<(T& ds, const bls_signature& sig) { // Serialization as variable length array when it is stored as a fixed length array. This makes for easier deserialization by external tools diff --git a/libraries/testing/tester.cpp b/libraries/testing/tester.cpp index 719851a767..e9ccf4efa4 100644 --- a/libraries/testing/tester.cpp +++ b/libraries/testing/tester.cpp @@ -495,7 +495,7 @@ namespace eosio { namespace testing { // wait for this node's vote to be processed size_t retrys = 200; while (!c.node_has_voted_if_finalizer(c.head_block_id()) && --retrys) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } FC_ASSERT(retrys, "Never saw this nodes vote processed before timeout"); } diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 9d4c0f5b16..4bdf231f61 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -305,7 +305,7 @@ namespace eosio { bool have_txn( const transaction_id_type& tid ) const; void expire_txns(); - void bcast_vote_msg( const std::optional& exclude_peer, send_buffer_type msg ); + void bcast_vote_msg( uint32_t exclude_peer, send_buffer_type msg ); void add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) { std::optional rm_blk_id = unlinkable_block_cache.add_unlinkable_block(std::move(b), id); @@ -529,12 +529,12 @@ namespace eosio { void on_accepted_block_header( const signed_block_ptr& block, const block_id_type& id ); void on_accepted_block(); - void on_voted_block ( const vote_message& vote ); + void on_voted_block ( uint32_t connection_id, vote_status stauts, const vote_message& vote ); void transaction_ack(const std::pair&); void on_irreversible_block( const block_id_type& id, uint32_t block_num ); - void bcast_vote_message( const std::optional& exclude_peer, const chain::vote_message& msg ); + void bcast_vote_message( uint32_t exclude_peer, const chain::vote_message& msg ); void warn_message( uint32_t sender_peer, const chain::hs_message_warning& code ); void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); @@ -2666,10 +2666,10 @@ namespace eosio { } ); } - void dispatch_manager::bcast_vote_msg( const std::optional& exclude_peer, send_buffer_type msg ) { + void dispatch_manager::bcast_vote_msg( uint32_t exclude_peer, send_buffer_type msg ) { my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) { if( !cp->current() ) return true; - if( exclude_peer.has_value() && cp->connection_id == exclude_peer.value() ) return true; + if( cp->connection_id == exclude_peer ) return true; cp->strand.post( [cp, msg]() { if (cp->protocol_version >= proto_instant_finality) { peer_dlog(cp, "sending vote msg"); @@ -3713,24 +3713,7 @@ namespace eosio { ("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16)) ("v", msg.strong ? "strong" : "weak")("k", msg.finalizer_key.to_string().substr(8, 16))); controller& cc = my_impl->chain_plug->chain(); - - switch( cc.process_vote_message(msg) ) { - case vote_status::success: - my_impl->bcast_vote_message(connection_id, msg); - break; - case vote_status::unknown_public_key: - case vote_status::invalid_signature: // close peer immediately - close( false ); // do not reconnect after closing - break; - case vote_status::unknown_block: // track the failure - peer_dlog(this, "vote unknown block #${bn}:${id}..", ("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16))); - block_status_monitor_.rejected(); - break; - case vote_status::duplicate: // do nothing - break; - default: - assert(false); // should never happen - } + cc.process_vote_message(connection_id, msg); } size_t calc_trx_size( const packed_transaction_ptr& trx ) { @@ -3996,14 +3979,41 @@ namespace eosio { } // called from other threads including net threads - void net_plugin_impl::on_voted_block(const vote_message& msg) { - fc_dlog(logger, "on voted signal: block #${bn} ${id}.., ${t}, key ${k}..", - ("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16)) + void net_plugin_impl::on_voted_block(uint32_t connection_id, vote_status status, const vote_message& msg) { + fc_dlog(logger, "connection - ${c} on voted signal: ${s} block #${bn} ${id}.., ${t}, key ${k}..", + ("c", connection_id)("s", status)("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16)) ("t", msg.strong ? "strong" : "weak")("k", msg.finalizer_key.to_string().substr(8, 16))); - bcast_vote_message(std::nullopt, msg); + + switch( status ) { + case vote_status::success: + bcast_vote_message(connection_id, msg); + break; + case vote_status::unknown_public_key: + case vote_status::invalid_signature: + case vote_status::max_exceeded: // close peer immediately + my_impl->connections.for_each_connection([connection_id](const connection_ptr& c) { + if (c->connection_id == connection_id) { + c->close( false ); + } + }); + break; + case vote_status::unknown_block: // track the failure + fc_dlog(logger, "connection - ${c} vote unknown block #${bn}:${id}..", + ("c", connection_id)("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16))); + my_impl->connections.for_each_connection([connection_id](const connection_ptr& c) { + if (c->connection_id == connection_id) { + c->block_status_monitor_.rejected(); + } + }); + break; + case vote_status::duplicate: // do nothing + break; + default: + assert(false); // should never happen + } } - void net_plugin_impl::bcast_vote_message( const std::optional& exclude_peer, const chain::vote_message& msg ) { + void net_plugin_impl::bcast_vote_message( uint32_t exclude_peer, const chain::vote_message& msg ) { buffer_factory buff_factory; auto send_buffer = buff_factory.get_send_buffer( msg ); @@ -4420,8 +4430,8 @@ namespace eosio { my->on_irreversible_block( id, block->block_num() ); } ); - cc.voted_block().connect( [my = shared_from_this()]( const vote_message& vote ) { - my->on_voted_block(vote); + cc.voted_block().connect( [my = shared_from_this()]( const vote_signal_params& vote_signal ) { + my->on_voted_block(std::get<0>(vote_signal), std::get<1>(vote_signal), std::get<2>(vote_signal)); } ); } diff --git a/unittests/finality_test_cluster.cpp b/unittests/finality_test_cluster.cpp index 0ea13c13ed..e84340cb44 100644 --- a/unittests/finality_test_cluster.cpp +++ b/unittests/finality_test_cluster.cpp @@ -10,13 +10,20 @@ finality_test_cluster::finality_test_cluster() { produce_and_push_block(); // make setfinalizer irreversible + // node0's votes + node0.node.control->voted_block().connect( [&]( const eosio::chain::vote_signal_params& v ) { + last_vote_status = std::get<1>(v); + last_connection_vote = std::get<0>(v); + }); // collect node1's votes - node1.node.control->voted_block().connect( [&]( const eosio::chain::vote_message& vote ) { - node1.votes.emplace_back(vote); + node1.node.control->voted_block().connect( [&]( const eosio::chain::vote_signal_params& v ) { + std::lock_guard g(node1.votes_mtx); + node1.votes.emplace_back(std::get<2>(v)); }); // collect node2's votes - node2.node.control->voted_block().connect( [&]( const eosio::chain::vote_message& vote ) { - node2.votes.emplace_back(vote); + node2.node.control->voted_block().connect( [&]( const eosio::chain::vote_signal_params& v ) { + std::lock_guard g(node2.votes_mtx); + node2.votes.emplace_back(std::get<2>(v)); }); // form a 3-chain to make LIB advacing on node0 @@ -35,11 +42,24 @@ finality_test_cluster::finality_test_cluster() { // clean up processed votes for (auto& n : nodes) { + std::lock_guard g(n.votes_mtx); n.votes.clear(); n.prev_lib_num = n.node.control->if_irreversible_block_num(); } } +eosio::chain::vote_status finality_test_cluster::wait_on_vote(uint32_t connection_id) { + // wait for this node's vote to be processed + size_t retrys = 200; + while ( (last_connection_vote != connection_id) && --retrys) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + if (last_connection_vote != connection_id) { + FC_ASSERT(false, "Never received vote"); + } + return last_vote_status; +} + // node0 produces a block and pushes it to node1 and node2 void finality_test_cluster::produce_and_push_block() { auto b = node0.node.produce_block(); @@ -87,8 +107,11 @@ bool finality_test_cluster::node2_lib_advancing() { // node1_votes and node2_votes when starting. bool finality_test_cluster::produce_blocks_and_verify_lib_advancing() { // start from fresh - node1.votes.clear(); - node2.votes.clear(); + { + std::scoped_lock g(node1.votes_mtx, node2.votes_mtx); + node1.votes.clear(); + node2.votes.clear(); + } for (auto i = 0; i < 3; ++i) { produce_and_push_block(); @@ -103,6 +126,7 @@ bool finality_test_cluster::produce_blocks_and_verify_lib_advancing() { } void finality_test_cluster::node1_corrupt_vote_proposal_id() { + std::lock_guard g(node1.votes_mtx); node1_orig_vote = node1.votes[0]; if( node1.votes[0].block_id.data()[0] == 'a' ) { @@ -113,6 +137,7 @@ void finality_test_cluster::node1_corrupt_vote_proposal_id() { } void finality_test_cluster::node1_corrupt_vote_finalizer_key() { + std::lock_guard g(node1.votes_mtx); node1_orig_vote = node1.votes[0]; // corrupt the finalizer_key (manipulate so it is different) @@ -123,6 +148,7 @@ void finality_test_cluster::node1_corrupt_vote_finalizer_key() { } void finality_test_cluster::node1_corrupt_vote_signature() { + std::lock_guard g(node1.votes_mtx); node1_orig_vote = node1.votes[0]; // corrupt the signature @@ -133,6 +159,7 @@ void finality_test_cluster::node1_corrupt_vote_signature() { } void finality_test_cluster::node1_restore_to_original_vote() { + std::lock_guard g(node1.votes_mtx); node1.votes[0] = node1_orig_vote; } @@ -177,6 +204,7 @@ void finality_test_cluster::setup_node(node_info& node, eosio::chain::account_na // send a vote to node0 eosio::chain::vote_status finality_test_cluster::process_vote(node_info& node, size_t vote_index, vote_mode mode) { + std::unique_lock g(node.votes_mtx); FC_ASSERT( vote_index < node.votes.size(), "out of bound index in process_vote" ); auto& vote = node.votes[vote_index]; if( mode == vote_mode::strong ) { @@ -189,8 +217,13 @@ eosio::chain::vote_status finality_test_cluster::process_vote(node_info& node, s // convert the strong digest to weak and sign it vote.sig = node.priv_key.sign(eosio::chain::create_weak_digest(strong_digest)); } + g.unlock(); - return node0.node.control->process_vote_message( vote ); + static uint32_t connection_id = 0; + node0.node.control->process_vote_message( ++connection_id, vote ); + if (eosio::chain::block_header::num_from_id(vote.block_id) > node0.node.control->last_irreversible_block_num()) + return wait_on_vote(connection_id); + return eosio::chain::vote_status::unknown_block; } eosio::chain::vote_status finality_test_cluster::process_vote(node_info& node, vote_mode mode) { diff --git a/unittests/finality_test_cluster.hpp b/unittests/finality_test_cluster.hpp index 97ab1aa4f0..a84b86bb46 100644 --- a/unittests/finality_test_cluster.hpp +++ b/unittests/finality_test_cluster.hpp @@ -78,10 +78,14 @@ class finality_test_cluster { struct node_info { eosio::testing::tester node; uint32_t prev_lib_num{0}; + std::mutex votes_mtx; std::vector votes; fc::crypto::blslib::bls_private_key priv_key; }; + std::atomic last_connection_vote{0}; + std::atomic last_vote_status{}; + std::array nodes; node_info& node0 = nodes[0]; node_info& node1 = nodes[1]; @@ -100,4 +104,6 @@ class finality_test_cluster { // send the latest vote on "node_index" node to node0 eosio::chain::vote_status process_vote(node_info& node, vote_mode mode); + + eosio::chain::vote_status wait_on_vote(uint32_t connection_id); }; diff --git a/unittests/finality_tests.cpp b/unittests/finality_tests.cpp index 6d68774fe5..311d27b33b 100644 --- a/unittests/finality_tests.cpp +++ b/unittests/finality_tests.cpp @@ -502,8 +502,7 @@ BOOST_AUTO_TEST_CASE(unknown_proposal_votes) { try { cluster.node1_corrupt_vote_proposal_id(); // process the corrupted vote - cluster.process_node1_vote(0); - BOOST_REQUIRE(cluster.process_node1_vote(0) == eosio::chain::vote_status::unknown_block); + BOOST_REQUIRE_THROW(cluster.process_node1_vote(0), fc::exception); // throws because it times out waiting on vote cluster.produce_and_push_block(); BOOST_REQUIRE(cluster.node2_lib_advancing());