Skip to content

Commit

Permalink
GH-525 Only interrupt start_block for validated block_header.
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Dec 15, 2022
1 parent b3391af commit 9b1401a
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 62 deletions.
83 changes: 52 additions & 31 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2056,43 +2056,59 @@ struct controller_impl {
}
} FC_CAPTURE_AND_RETHROW() } /// apply_block


block_state_ptr create_block_state( const block_id_type& id, const signed_block_ptr& b, const block_header_state_ptr& prev ) {
auto trx_mroot = calculate_trx_merkle( b->transactions );
EOS_ASSERT( b->transaction_mroot == trx_mroot, block_validate_exception,
"invalid block transaction merkle root ${b} != ${c}", ("b", b->transaction_mroot)("c", trx_mroot) );

const bool skip_validate_signee = false;
auto bsp = std::make_shared<block_state>(
*prev,
b,
protocol_features.get_protocol_feature_set(),
[this]( block_timestamp_type timestamp,
const flat_set<digest_type>& cur_features,
const vector<digest_type>& new_features )
{ check_protocol_features( timestamp, cur_features, new_features ); },
skip_validate_signee
);

EOS_ASSERT( id == bsp->id, block_validate_exception,
"provided id ${id} does not match block id ${bid}", ("id", id)("bid", bsp->id) );
return bsp;
}

std::future<block_state_ptr> create_block_state_future( const block_id_type& id, const signed_block_ptr& b ) {
EOS_ASSERT( b, block_validate_exception, "null block" );

// no reason for a block_state if fork_db already knows about block
auto existing = fork_db.get_block( id );
EOS_ASSERT( !existing, fork_database_exception, "we already know about this block: ${id}", ("id", id) );
return async_thread_pool( thread_pool.get_executor(), [b, id, control=this]() {
// no reason for a block_state if fork_db already knows about block
auto existing = control->fork_db.get_block( id );
EOS_ASSERT( !existing, fork_database_exception, "we already know about this block: ${id}", ("id", id) );

auto prev = fork_db.get_block_header( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception,
"unlinkable block ${id}", ("id", id)("previous", b->previous) );
auto prev = control->fork_db.get_block_header( b->previous );
EOS_ASSERT( prev, unlinkable_block_exception,
"unlinkable block ${id}", ("id", id)("previous", b->previous) );

return async_thread_pool( thread_pool.get_executor(), [b, prev, id, control=this]() {
const bool skip_validate_signee = false;
return control->create_block_state( id, b, prev );
} );
}

auto trx_mroot = calculate_trx_merkle( b->transactions );
EOS_ASSERT( b->transaction_mroot == trx_mroot, block_validate_exception,
"invalid block transaction merkle root ${b} != ${c}", ("b", b->transaction_mroot)("c", trx_mroot) );
block_state_ptr create_block_state( const block_id_type& id, const signed_block_ptr& b ) {
EOS_ASSERT( b, block_validate_exception, "null block" );

auto bsp = std::make_shared<block_state>(
*prev,
move( b ),
control->protocol_features.get_protocol_feature_set(),
[control]( block_timestamp_type timestamp,
const flat_set<digest_type>& cur_features,
const vector<digest_type>& new_features )
{ control->check_protocol_features( timestamp, cur_features, new_features ); },
skip_validate_signee
);
block_state_ptr bsp;
// previous not found could mean that previous block not applied yet
auto prev = fork_db.get_block_header( b->previous );
if( !prev ) return bsp;

EOS_ASSERT( id == bsp->id, block_validate_exception,
"provided id ${id} does not match block id ${bid}", ("id", id)("bid", bsp->id) );
return bsp;
} );
return create_block_state( id, b, prev );
}

void push_block( std::future<block_state_ptr>& block_state_future,
const forked_branch_callback& forked_branch_cb, const trx_meta_cache_lookup& trx_lookup )
void push_block( const block_state_ptr& bsp,
const forked_branch_callback& forked_branch_cb,
const trx_meta_cache_lookup& trx_lookup )
{
controller::block_status s = controller::block_status::complete;
EOS_ASSERT(!pending, block_validate_exception, "it is not valid to push a block when there is a pending block");
Expand All @@ -2101,7 +2117,7 @@ struct controller_impl {
trusted_producer_light_validation = old_value;
});
try {
block_state_ptr bsp = block_state_future.get();
EOS_ASSERT( bsp, block_validate_exception, "null block" );
const auto& b = bsp->block;

if( conf.terminate_at_block > 0 && conf.terminate_at_block < self.head_block_num()) {
Expand Down Expand Up @@ -2816,11 +2832,16 @@ std::future<block_state_ptr> controller::create_block_state_future( const block_
return my->create_block_state_future( id, b );
}

void controller::push_block( std::future<block_state_ptr>& block_state_future,
const forked_branch_callback& forked_branch_cb, const trx_meta_cache_lookup& trx_lookup )
block_state_ptr controller::create_block_state( const block_id_type& id, const signed_block_ptr& b ) const {
return my->create_block_state( id, b );
}

void controller::push_block( const block_state_ptr& bsp,
const forked_branch_callback& forked_branch_cb,
const trx_meta_cache_lookup& trx_lookup )
{
validate_db_available_size();
my->push_block( block_state_future, forked_branch_cb, trx_lookup );
my->push_block( bsp, forked_branch_cb, trx_lookup );
}

transaction_trace_ptr controller::push_transaction( const transaction_metadata_ptr& trx,
Expand Down
6 changes: 4 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,15 @@ namespace eosio { namespace chain {

// thread-safe
std::future<block_state_ptr> create_block_state_future( const block_id_type& id, const signed_block_ptr& b );
// thread-safe
block_state_ptr create_block_state( const block_id_type& id, const signed_block_ptr& b ) const;

/**
* @param block_state_future provide from call to create_block_state_future
* @param bsp block to push
* @param cb calls cb with forked applied transactions for each forked block
* @param trx_lookup user provided lookup function for externally cached transaction_metadata
*/
void push_block( std::future<block_state_ptr>& block_state_future,
void push_block( const block_state_ptr& bsp,
const forked_branch_callback& cb,
const trx_meta_cache_lookup& trx_lookup );

Expand Down
8 changes: 4 additions & 4 deletions libraries/testing/include/eosio/testing/tester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ namespace eosio { namespace testing {
signed_block_ptr produce_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
auto sb = _produce_block(skip_time, false);
auto bsf = validating_node->create_block_state_future( sb->calculate_id(), sb );
validating_node->push_block( bsf, forked_branch_callback{}, trx_meta_cache_lookup{} );
validating_node->push_block( bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{} );

return sb;
}
Expand All @@ -607,15 +607,15 @@ namespace eosio { namespace testing {
}

void validate_push_block(const signed_block_ptr& sb) {
auto bs = validating_node->create_block_state_future( sb->calculate_id(), sb );
validating_node->push_block( bs, forked_branch_callback{}, trx_meta_cache_lookup{} );
auto bsf = validating_node->create_block_state_future( sb->calculate_id(), sb );
validating_node->push_block( bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{} );
}

signed_block_ptr produce_empty_block( fc::microseconds skip_time = fc::milliseconds(config::block_interval_ms) )override {
unapplied_transactions.add_aborted( control->abort_block() );
auto sb = _produce_block(skip_time, true);
auto bsf = validating_node->create_block_state_future( sb->calculate_id(), sb );
validating_node->push_block( bsf, forked_branch_callback{}, trx_meta_cache_lookup{} );
validating_node->push_block( bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{} );

return sb;
}
Expand Down
4 changes: 2 additions & 2 deletions libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ namespace eosio { namespace testing {
void base_tester::push_block(signed_block_ptr b) {
auto bsf = control->create_block_state_future(b->calculate_id(), b);
unapplied_transactions.add_aborted( control->abort_block() );
control->push_block( bsf, [this]( const branch_type& forked_branch ) {
control->push_block( bsf.get(), [this]( const branch_type& forked_branch ) {
unapplied_transactions.add_forked( forked_branch );
}, [this]( const transaction_id_type& id ) {
return unapplied_transactions.get_trx( id );
Expand Down Expand Up @@ -1050,7 +1050,7 @@ namespace eosio { namespace testing {
if( block ) { //&& !b.control->is_known_block(block->id()) ) {
auto bsf = b.control->create_block_state_future( block->calculate_id(), block );
b.control->abort_block();
b.control->push_block(bsf, forked_branch_callback{}, trx_meta_cache_lookup{}); //, eosio::chain::validation_steps::created_block);
b.control->push_block(bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{}); //, eosio::chain::validation_steps::created_block);
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ namespace eosio { namespace chain { namespace plugin_interface {
}

namespace methods {
// synchronously push a block/trx to a single provider
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const std::optional<block_id_type>&), first_provider_policy>;
// synchronously push a block/trx to a single provider, block_state_ptr may be null
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const std::optional<block_id_type>&, const block_state_ptr&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, bool, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1331,8 +1331,8 @@ chain_apis::read_only chain_plugin::get_read_only_api() const {
}


bool chain_plugin::accept_block(const signed_block_ptr& block, const block_id_type& id ) {
return my->incoming_block_sync_method(block, id);
bool chain_plugin::accept_block(const signed_block_ptr& block, const block_id_type& id, const block_state_ptr& bsp ) {
return my->incoming_block_sync_method(block, id, bsp);
}

void chain_plugin::accept_transaction(const chain::packed_transaction_ptr& trx, next_function<chain::transaction_trace_ptr> next) {
Expand Down Expand Up @@ -2128,7 +2128,7 @@ fc::variant read_only::get_block_header_state(const get_block_header_state_param

void read_write::push_block(read_write::push_block_params&& params, next_function<read_write::push_block_results> next) {
try {
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>(std::move(params)), {});
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>( std::move(params) ), std::optional<block_id_type>{}, block_state_ptr{});
} catch ( boost::interprocess::bad_alloc& ) {
chain_plugin::handle_db_exhaustion();
} catch ( const std::bad_alloc& ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ class chain_plugin : public plugin<chain_plugin> {
chain_apis::read_write get_read_write_api();
chain_apis::read_only get_read_only_api() const;

bool accept_block( const chain::signed_block_ptr& block, const chain::block_id_type& id );
bool accept_block( const chain::signed_block_ptr& block, const chain::block_id_type& id, const chain::block_state_ptr& bsp );
void accept_transaction(const chain::packed_transaction_ptr& trx, chain::plugin_interface::next_function<chain::transaction_trace_ptr> next);

// Only call this after plugin_initialize()!
Expand Down
42 changes: 36 additions & 6 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ namespace eosio {
void handle_message( const packed_transaction& msg ) = delete; // packed_transaction_ptr overload used instead
void handle_message( packed_transaction_ptr msg );

void process_signed_block( const block_id_type& id, signed_block_ptr msg );
void process_signed_block( const block_id_type& id, signed_block_ptr msg, block_state_ptr bsp );

fc::variant_object get_logger_variant() const {
fc::mutable_variant_object mvo;
Expand Down Expand Up @@ -3143,14 +3143,44 @@ namespace eosio {
// called from connection strand
void connection::handle_message( const block_id_type& id, signed_block_ptr ptr ) {
peer_dlog( this, "received signed_block ${num}, id ${id}", ("num", ptr->block_num())("id", id) );
my_impl->producer_plug->received_block();
app().post(priority::medium, [ptr{std::move(ptr)}, id, c = shared_from_this()]() mutable {
c->process_signed_block( id, std::move( ptr ) );

controller& cc = my_impl->chain_plug->chain();
block_state_ptr bsp;
bool exception = false;
try {
if( cc.fetch_block_state_by_id( id ) ) {
my_impl->dispatcher->add_peer_block( id, connection_id );
my_impl->sync_master->sync_recv_block( shared_from_this(), id, ptr->block_num(), false );
return;
}
// this may return null if block is not immediately ready to be processed
bsp = cc.create_block_state( id, ptr );
} catch( const fc::exception& ex ) {
exception = true;
peer_elog(this, "bad block exception: #${n} ${id}...: ${m}",
("n", ptr->block_num())("id", id.str().substr(8,16))("m",ex.to_string()));
} catch( ... ) {
exception = true;
peer_elog(this, "bad block: #${n} ${id}...: unknown exception",
("n", ptr->block_num())("id", id.str().substr(8,16)));
}
if( exception ) {
my_impl->sync_master->rejected_block( shared_from_this(), ptr->block_num() );
my_impl->dispatcher->rejected_block( id );
return;
}

bool signal_producer = !!bsp; // ready to process immediately, so signal producer to interrupt start_block
app().post(priority::medium, [ptr{std::move(ptr)}, bsp{std::move(bsp)}, id, c = shared_from_this()]() mutable {
c->process_signed_block( id, std::move(ptr), std::move(bsp) );
});

if( signal_producer )
my_impl->producer_plug->received_block();
}

// called from application thread
void connection::process_signed_block( const block_id_type& blk_id, signed_block_ptr msg ) {
void connection::process_signed_block( const block_id_type& blk_id, signed_block_ptr msg, block_state_ptr bsp ) {
controller& cc = my_impl->chain_plug->chain();
uint32_t blk_num = msg->block_num();
// use c in this method instead of this to highlight that all methods called on c-> must be thread safe
Expand Down Expand Up @@ -3180,7 +3210,7 @@ namespace eosio {

go_away_reason reason = fatal_other;
try {
bool accepted = my_impl->chain_plug->accept_block(msg, blk_id);
bool accepted = my_impl->chain_plug->accept_block(msg, blk_id, bsp);
my_impl->update_chain_info();
if( !accepted ) return;
reason = no_reason;
Expand Down
14 changes: 9 additions & 5 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
_subjective_billing.abort_block();
}

bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id) {
bool on_incoming_block(const signed_block_ptr& block, const std::optional<block_id_type>& block_id, const block_state_ptr& bsp) {
auto& chain = chain_plug->chain();
if ( _pending_block_mode == pending_block_mode::producing ) {
fc_wlog( _log, "dropped incoming block #${num} id: ${id}",
Expand All @@ -457,7 +457,10 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if( existing ) { return false; }

// start processing of block
auto bsf = chain.create_block_state_future( id, block );
std::future<block_state_ptr> bsf;
if( !bsp ) {
bsf = chain.create_block_state_future( id, block );
}

// abort the pending block
abort_block();
Expand All @@ -471,7 +474,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
};

try {
chain.push_block( bsf, [this]( const branch_type& forked_branch ) {
const block_state_ptr& bspr = bsp ? bsp : bsf.get();
chain.push_block( bspr, [this]( const branch_type& forked_branch ) {
_unapplied_transactions.add_forked( forked_branch );
}, [this]( const transaction_id_type& id ) {
return _unapplied_transactions.get_trx( id );
Expand Down Expand Up @@ -1137,8 +1141,8 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
});

my->_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider(
[this](const signed_block_ptr& block, const std::optional<block_id_type>& block_id) {
return my->on_incoming_block(block, block_id);
[this](const signed_block_ptr& block, const std::optional<block_id_type>& block_id, const block_state_ptr& bsp) {
return my->on_incoming_block(block, block_id, bsp);
});

my->_incoming_transaction_async_provider = app().get_method<incoming::methods::transaction_async>().register_provider(
Expand Down
8 changes: 4 additions & 4 deletions unittests/block_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_test)

// Push block with invalid transaction to other chain
tester validator;
auto bs = validator.control->create_block_state_future( copy_b->calculate_id(), copy_b );
auto bsf = validator.control->create_block_state_future( copy_b->calculate_id(), copy_b );
validator.control->abort_block();
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_branch_callback{}, trx_meta_cache_lookup{} ), fc::exception ,
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{} ), fc::exception ,
[] (const fc::exception &e)->bool {
return e.code() == account_name_exists_exception::code_value ;
}) ;
Expand Down Expand Up @@ -82,9 +82,9 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_mroot_test)

// Push block with invalid transaction to other chain
tester validator;
auto bs = validator.control->create_block_state_future( copy_b->calculate_id(), copy_b );
auto bsf = validator.control->create_block_state_future( copy_b->calculate_id(), copy_b );
validator.control->abort_block();
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bs, forked_branch_callback{}, trx_meta_cache_lookup{} ), fc::exception ,
BOOST_REQUIRE_EXCEPTION(validator.control->push_block( bsf.get(), forked_branch_callback{}, trx_meta_cache_lookup{} ), fc::exception ,
[] (const fc::exception &e)->bool {
return e.code() == block_validate_exception::code_value &&
e.to_detail_string().find("invalid block transaction merkle root") != std::string::npos;
Expand Down
Loading

0 comments on commit 9b1401a

Please sign in to comment.