Skip to content

Commit

Permalink
Merge pull request #24 from AntelopeIO/GH-3-process-votes
Browse files Browse the repository at this point in the history
IF: Process votes in a dedicated thread pool
  • Loading branch information
heifner authored Apr 19, 2024
2 parents c3b5ed1 + 8d3a838 commit 5e5c176
Show file tree
Hide file tree
Showing 26 changed files with 809 additions and 187 deletions.
123 changes: 51 additions & 72 deletions libraries/chain/controller.cpp

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions libraries/chain/hotstuff/finalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ finalizer::vote_result finalizer::decide_vote(const block_state_ptr& bsp) {
}

// ----------------------------------------------------------------------------------------
std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
const block_state_ptr& bsp,
const digest_type& digest) {
vote_message_ptr finalizer::maybe_vote(const bls_public_key& pub_key,
const block_state_ptr& bsp,
const digest_type& digest) {
finalizer::vote_decision decision = decide_vote(bsp).decision;
if (decision == vote_decision::strong_vote || decision == vote_decision::weak_vote) {
bls_signature sig;
Expand All @@ -99,7 +99,7 @@ std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
} else {
sig = priv_key.sign({(uint8_t*)digest.data(), (uint8_t*)digest.data() + digest.data_size()});
}
return std::optional{vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig }};
return std::make_shared<vote_message>(bsp->id(), decision == vote_decision::strong_vote, pub_key, sig);
}
return {};
}
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/block_header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ namespace eosio::chain {
// When block header is validated in block_header_state's next(),
// it is already validate if schedule_version == proper_svnn_schedule_version,
// finality extension must exist.
bool is_proper_svnn_block() const { return ( schedule_version == proper_svnn_schedule_version ); }
bool is_proper_svnn_block() const { return ( schedule_version == proper_svnn_schedule_version ); }

header_extension_multimap validate_and_extract_header_extensions()const;
std::optional<block_header_extension> extract_header_extension(uint16_t extension_id)const;
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/block_header_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ struct block_header_state {
digest_type compute_finality_digest() const;

// Returns true if the block is a Proper Savanna Block
bool is_proper_svnn_block() const;
bool is_proper_svnn_block() const { return header.is_proper_svnn_block(); }

// block descending from this need the provided qc in the block extension
bool is_needed(const qc_claim_t& qc_claim) const {
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const static uint16_t default_max_auth_depth = 6;
const static uint32_t default_sig_cpu_bill_pct = 50 * percent_1; // billable percentage of signature recovery
const static uint32_t default_produce_block_offset_ms = 450;
const static uint16_t default_controller_thread_pool_size = 2;
const static uint16_t default_vote_thread_pool_size = 4;
const static uint32_t default_max_variable_signature_length = 16384u;
const static uint32_t default_max_action_return_value_size = 256;

Expand Down
46 changes: 40 additions & 6 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ namespace eosio::chain {
using trx_meta_cache_lookup = std::function<transaction_metadata_ptr( const transaction_id_type&)>;

using block_signal_params = std::tuple<const signed_block_ptr&, const block_id_type&>;
// connection_id, vote result status, vote_message processed
using vote_signal_params = std::tuple<uint32_t, vote_status, const vote_message_ptr&>;
using vote_signal_t = signal<void(const vote_signal_params&)>;

enum class db_read_mode {
HEAD,
Expand Down Expand Up @@ -87,7 +90,8 @@ namespace eosio::chain {
uint64_t state_size = chain::config::default_state_size;
uint64_t state_guard_size = chain::config::default_state_guard_size;
uint32_t sig_cpu_bill_pct = chain::config::default_sig_cpu_bill_pct;
uint16_t thread_pool_size = chain::config::default_controller_thread_pool_size;
uint16_t chain_thread_pool_size = chain::config::default_controller_thread_pool_size;
uint16_t vote_thread_pool_size = 0;
bool read_only = false;
bool force_all_checks = false;
bool disable_replay_opts = false;
Expand Down Expand Up @@ -326,7 +330,7 @@ namespace eosio::chain {
// called by host function set_finalizers
void set_proposed_finalizers( finalizer_policy&& fin_pol );
// called from net threads
vote_status process_vote_message( const vote_message& msg );
void process_vote_message( uint32_t connection_id, const vote_message_ptr& msg );
// thread safe, for testing
bool node_has_voted_if_finalizer(const block_id_type& id) const;

Expand Down Expand Up @@ -376,9 +380,8 @@ namespace eosio::chain {
signal<void(const block_signal_params&)>& accepted_block();
signal<void(const block_signal_params&)>& irreversible_block();
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& applied_transaction();

// Unlike other signals, voted_block can be signaled from other threads than the main thread.
signal<void(const vote_message&)>& voted_block();
// Unlike other signals, voted_block is signaled from other threads than the main thread.
vote_signal_t& voted_block();

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
wasm_interface& get_wasm_interface();
Expand Down Expand Up @@ -409,6 +412,37 @@ namespace eosio::chain {
chainbase::database& mutable_db()const;

std::unique_ptr<controller_impl> my;
};
}; // controller

/**
* Plugins / observers listening to signals emited might trigger
* errors and throw exceptions. Unless those exceptions are caught it could impact consensus and/or
* cause a node to fork.
*
* If it is ever desirable to let a signal handler bubble an exception out of this method
* a full audit of its uses needs to be undertaken.
*
*/
template<typename Signal, typename Arg>
void emit( const Signal& s, Arg&& a, const char* file, uint32_t line ) {
try {
s( std::forward<Arg>( a ));
} catch (std::bad_alloc& e) {
wlog( "${f}:${l} std::bad_alloc: ${w}", ("f", file)("l", line)("w", e.what()) );
throw e;
} catch (boost::interprocess::bad_alloc& e) {
wlog( "${f}:${l} boost::interprocess::bad alloc: ${w}", ("f", file)("l", line)("w", e.what()) );
throw e;
} catch ( controller_emit_signal_exception& e ) {
wlog( "${f}:${l} controller_emit_signal_exception: ${details}", ("f", file)("l", line)("details", e.to_detail_string()) );
throw e;
} catch ( fc::exception& e ) {
wlog( "${f}:${l} fc::exception: ${details}", ("f", file)("l", line)("details", e.to_detail_string()) );
} catch ( std::exception& e ) {
wlog( "std::exception: ${details}", ("f", file)("l", line)("details", e.what()) );
} catch ( ... ) {
wlog( "${f}:${l} signal handler threw exception", ("f", file)("l", line) );
}
}

} /// eosio::chain
9 changes: 4 additions & 5 deletions libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ namespace eosio::chain {
finalizer_safety_information fsi;

vote_result decide_vote(const block_state_ptr& bsp);
std::optional<vote_message> maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp,
const digest_type& digest);
vote_message_ptr maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp, const digest_type& digest);
};

// ----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -95,7 +94,7 @@ namespace eosio::chain {
if (finalizers.empty())
return;

std::vector<vote_message> votes;
std::vector<vote_message_ptr> votes;
votes.reserve(finalizers.size());

// Possible improvement in the future, look at locking only individual finalizers and releasing the lock for writing the file.
Expand All @@ -105,9 +104,9 @@ namespace eosio::chain {
// first accumulate all the votes
for (const auto& f : fin_pol.finalizers) {
if (auto it = finalizers.find(f.public_key); it != finalizers.end()) {
std::optional<vote_message> vote_msg = it->second.maybe_vote(it->first, bsp, digest);
vote_message_ptr vote_msg = it->second.maybe_vote(it->first, bsp, digest);
if (vote_msg)
votes.push_back(std::move(*vote_msg));
votes.push_back(std::move(vote_msg));
}
}
// then save the safety info and, if successful, gossip the votes
Expand Down
16 changes: 11 additions & 5 deletions libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ namespace eosio::chain {
bool strong{false};
bls_public_key finalizer_key;
bls_signature sig;

auto operator<=>(const vote_message&) const = default;
bool operator==(const vote_message&) const = default;
};

using vote_message_ptr = std::shared_ptr<vote_message>;

enum class vote_status {
success,
duplicate,
unknown_public_key,
invalid_signature,
unknown_block
duplicate, // duplicate vote, expected as votes arrive on multiple connections
unknown_public_key, // public key is invalid, indicates invalid vote
invalid_signature, // signature is invalid, indicates invalid vote
unknown_block, // block not available, possibly less than LIB, or too far in the future
max_exceeded // received too many votes for a connection
};

using bls_public_key = fc::crypto::blslib::bls_public_key;
Expand Down Expand Up @@ -159,7 +165,7 @@ namespace eosio::chain {


FC_REFLECT(eosio::chain::vote_message, (block_id)(strong)(finalizer_key)(sig));
FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block))
FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block)(max_exceeded))
FC_REFLECT(eosio::chain::valid_quorum_certificate, (_strong_votes)(_weak_votes)(_sig));
FC_REFLECT(eosio::chain::pending_quorum_certificate, (_valid_qc)(_quorum)(_max_weak_sum_before_weak_final)(_state)(_strong_sum)(_weak_sum)(_weak_votes)(_strong_votes));
FC_REFLECT_ENUM(eosio::chain::pending_quorum_certificate::state_t, (unrestricted)(restricted)(weak_achieved)(weak_final)(strong));
Expand Down
17 changes: 11 additions & 6 deletions libraries/chain/include/eosio/chain/thread_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ namespace eosio { namespace chain {
/// Blocks until all threads are created and completed their init function, or an exception is thrown
/// during thread startup or an init function. Exceptions thrown during these stages are rethrown from start()
/// but some threads might still have been started. Calling stop() after such a failure is safe.
/// @param num_threads is number of threads spawned
/// @param num_threads is number of threads spawned, if 0 then no threads are spawned and stop() is a no-op.
/// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread.
/// if an empty function then logs and rethrows exception on thread which will terminate. Not called
/// for exceptions during the init function (such exceptions are rethrown from start())
/// @param init is an optional function to call at startup to initialize any data.
/// @throw assert_exception if already started and not stopped.
void start( size_t num_threads, on_except_t on_except, init_t init = {} ) {
FC_ASSERT( !_ioc_work, "Thread pool already started" );
if (num_threads == 0)
return;
_ioc_work.emplace( boost::asio::make_work_guard( _ioc ) );
_ioc.restart();
_thread_pool.reserve( num_threads );
Expand All @@ -140,13 +142,16 @@ namespace eosio { namespace chain {
}

/// destroy work guard, stop io_context, join thread_pool
/// not thread safe, expected to only be called from thread that called start()
void stop() {
_ioc_work.reset();
_ioc.stop();
for( auto& t : _thread_pool ) {
t.join();
if (_thread_pool.size() > 0) {
_ioc_work.reset();
_ioc.stop();
for( auto& t : _thread_pool ) {
t.join();
}
_thread_pool.clear();
}
_thread_pool.clear();
}

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/block_state_legacy.hpp>
#include <eosio/chain/exceptions.hpp>

#include <boost/multi_index_container.hpp>
Expand Down
Loading

0 comments on commit 5e5c176

Please sign in to comment.