diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index cbb0ab2a9c..c07a6dda86 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3225,11 +3225,12 @@ namespace eosio { return; } - bool valid_block_header = !!bsp; - if( valid_block_header ) { + uint32_t block_num = bsp ? bsp->block_num : 0; + + if( block_num != 0 ) { fc_dlog( logger, "validated block header, broadcasting immediately, connection ${cid}, blk num = ${num}, id = ${id}", - ("cid", cid)("num", bsp->block_num)("id", bsp->id) ); + ("cid", cid)("num", block_num)("id", bsp->id) ); my_impl->dispatcher->add_peer_block( bsp->id, cid ); // no need to send back to sender my_impl->dispatcher->bcast_block( bsp->block, bsp->id ); } @@ -3238,9 +3239,9 @@ namespace eosio { c->process_signed_block( id, std::move(ptr), std::move(bsp) ); }); - if( valid_block_header ) { + if( block_num != 0 ) { // ready to process immediately, so signal producer to interrupt start_block - my_impl->producer_plug->received_block(); + my_impl->producer_plug->received_block(block_num); } }); } diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp index e3f7d940cf..37f0ec45be 100644 --- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp +++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp @@ -202,7 +202,7 @@ class producer_plugin : public appbase::plugin { void register_metrics_listener(metrics_listener listener); // thread-safe, called when a new block is received - void received_block(); + void received_block(uint32_t block_num); const std::set& producer_accounts() const; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 2e0a734b53..5d593df496 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -313,8 +313,7 @@ class producer_plugin_impl : public std::enable_shared_from_this()) - ,_ro_write_window_timer(io) - ,_ro_read_window_timer(io) + ,_ro_timer(io) { } @@ -370,7 +369,7 @@ class producer_plugin_impl : public std::enable_shared_from_this _thread_pool; std::atomic _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool - std::atomic _received_block{false}; // modified by net_plugin thread pool and app thread + std::atomic _received_block{0}; // modified by net_plugin thread pool fc::microseconds _max_irreversible_block_age_us; int32_t _produce_time_offset_us = 0; int32_t _last_block_time_offset_us = 0; @@ -486,20 +485,20 @@ class producer_plugin_impl : public std::enable_shared_from_this* received_block, fc::time_point deadline) { + void set_exit_criteria(uint32_t num_tasks, std::atomic* received_block, uint32_t block_num, fc::time_point deadline) { std::lock_guard g( mtx ); // not strictly necessary with current usage from single thread assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr); - assert(received_block && *received_block == false); max_waiting = num_tasks; num_waiting = 0; received_block_ptr = received_block; + pending_block_num = block_num; read_window_deadline = deadline; exiting_read_window = false; } private: bool should_exit() { - return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr; + return exiting_read_window || fc::time_point::now() >= read_window_deadline || (*received_block_ptr >= pending_block_num); } mutable std::mutex mtx; @@ -508,7 +507,8 @@ class producer_plugin_impl : public std::enable_shared_from_this* received_block_ptr{nullptr}; + std::atomic* received_block_ptr{nullptr}; + uint32_t pending_block_num{0}; fc::time_point read_window_deadline; }; @@ -521,8 +521,7 @@ class producer_plugin_impl : public std::enable_shared_from_this _ro_all_threads_exec_time_us; // total time spent by all threads executing transactions. use atomic for simplicity and performance fc::time_point _ro_read_window_start_time; - boost::asio::deadline_timer _ro_write_window_timer; - boost::asio::deadline_timer _ro_read_window_timer; + boost::asio::deadline_timer _ro_timer; fc::microseconds _ro_max_trx_time_us{ 0 }; // calculated during option initialization ro_trx_queue_t _ro_trx_queue; std::atomic _ro_num_active_trx_exec_tasks{ 0 }; @@ -881,7 +880,7 @@ class producer_plugin_impl : public std::enable_shared_from_this= pending_block_num); } producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { @@ -1895,6 +1894,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { const fc::time_point now = fc::time_point::now(); const fc::time_point block_time = calculate_pending_block_time(); + const uint32_t pending_block_num = hbs->block_num + 1; const fc::time_point preprocess_deadline = calculate_block_deadline(block_time); const pending_block_mode previous_pending_mode = _pending_block_mode; @@ -1960,7 +1960,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if (_pending_block_mode == pending_block_mode::producing) { const auto start_block_time = block_time - fc::microseconds( config::block_interval_us ); if( now < start_block_time ) { - fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", hbs->block_num + 1)("bt", block_time) ); + fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", pending_block_num)("bt", block_time) ); // start_block_time instead of block_time because schedule_delayed_production_loop calculates next block time from given time schedule_delayed_production_loop(weak_from_this(), calculate_producer_wake_up_time(start_block_time)); return start_block_result::waiting_for_production; @@ -1974,7 +1974,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { } fc_dlog(_log, "Starting block #${n} at ${time} producer ${p}", - ("n", hbs->block_num + 1)("time", now)("p", scheduled_producer.producer_name)); + ("n", pending_block_num)("time", now)("p", scheduled_producer.producer_name)); try { uint16_t blocks_to_confirm = 0; @@ -2038,7 +2038,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { std::swap( features_to_activate, protocol_features_to_activate ); _protocol_features_signaled = true; ilog( "signaling activation of the following protocol features in block ${num}: ${features_to_activate}", - ("num", hbs->block_num + 1)("features_to_activate", features_to_activate) ); + ("num", pending_block_num)("features_to_activate", features_to_activate) ); } } @@ -2064,7 +2064,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if( !remove_expired_blacklisted_trxs( preprocess_deadline ) ) return start_block_result::exhausted; if( !_subjective_billing.remove_expired( _log, chain.pending_block_time(), fc::time_point::now(), - [&](){ return should_interrupt_start_block( preprocess_deadline ); } ) ) { + [&](){ return should_interrupt_start_block( preprocess_deadline, pending_block_num ); } ) ) { return start_block_result::exhausted; } @@ -2089,7 +2089,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit() return start_block_result::failed; - if ( should_interrupt_start_block( preprocess_deadline ) || block_is_exhausted() ) { + if ( should_interrupt_start_block( preprocess_deadline, pending_block_num ) || block_is_exhausted() ) { return start_block_result::exhausted; } @@ -2116,11 +2116,12 @@ bool producer_plugin_impl::remove_expired_trxs( const fc::time_point& deadline ) { chain::controller& chain = chain_plug->chain(); auto pending_block_time = chain.pending_block_time(); + auto pending_block_num = chain.pending_block_num(); // remove all expired transactions size_t num_expired = 0; size_t orig_count = _unapplied_transactions.size(); - bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline); }, + bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline, pending_block_num); }, [&num_expired]( const packed_transaction_ptr& packed_trx_ptr, trx_enum_type trx_type ) { // expired exception is logged as part of next() call ++num_expired; @@ -2144,12 +2145,13 @@ bool producer_plugin_impl::remove_expired_blacklisted_trxs( const fc::time_point if(!blacklist_by_expiry.empty()) { const chain::controller& chain = chain_plug->chain(); const auto lib_time = chain.last_irreversible_block_time(); + const auto pending_block_num = chain.pending_block_num(); int num_expired = 0; int orig_count = _blacklisted_transactions.size(); while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= lib_time) { - if ( should_interrupt_start_block( deadline ) ) { + if ( should_interrupt_start_block( deadline, pending_block_num ) ) { exhausted = true; break; } @@ -2416,12 +2418,14 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin { bool exhausted = false; if( !_unapplied_transactions.empty() ) { + const chain::controller& chain = chain_plug->chain(); + const auto pending_block_num = chain.pending_block_num(); int num_applied = 0, num_failed = 0, num_processed = 0; auto unapplied_trxs_size = _unapplied_transactions.size(); auto itr = _unapplied_transactions.unapplied_begin(); auto end_itr = _unapplied_transactions.unapplied_end(); while( itr != end_itr ) { - if( should_interrupt_start_block( deadline ) ) { + if( should_interrupt_start_block( deadline, pending_block_num ) ) { exhausted = true; break; } @@ -2596,8 +2600,10 @@ bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline if( itr != end ) { size_t processed = 0; fc_dlog( _log, "Processing ${n} pending transactions", ("n", _unapplied_transactions.incoming_size()) ); + const chain::controller& chain = chain_plug->chain(); + const auto pending_block_num = chain.pending_block_num(); while( itr != end ) { - if ( should_interrupt_start_block( deadline ) ) { + if ( should_interrupt_start_block( deadline, pending_block_num ) ) { exhausted = true; break; } @@ -2640,7 +2646,6 @@ bool producer_plugin_impl::block_is_exhausted() const { // -> Idle // --> Start block B (block time y.000) at time x.500 void producer_plugin_impl::schedule_production_loop() { - _received_block = false; _timer.cancel(); auto result = start_block(); @@ -2848,8 +2853,8 @@ void producer_plugin_impl::produce_block() { ("confs", new_bs->header.confirmed)); } -void producer_plugin::received_block() { - my->_received_block = true; +void producer_plugin::received_block(uint32_t block_num) { + my->_received_block = block_num; } void producer_plugin::log_failed_transaction(const transaction_id_type& trx_id, const packed_transaction_ptr& packed_trx_ptr, const char* reason) const { @@ -2878,8 +2883,7 @@ void producer_plugin_impl::switch_to_write_window() { } EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "no read-only tasks should be running before switching to write window"); - _ro_read_window_timer.cancel(); - _ro_write_window_timer.cancel(); + _ro_timer.cancel(); start_write_window(); } @@ -2893,8 +2897,8 @@ void producer_plugin_impl::start_write_window() { _idle_trx_time = fc::time_point::now(); auto expire_time = boost::posix_time::microseconds(_ro_write_window_time_us.count()); - _ro_write_window_timer.expires_from_now( expire_time ); - _ro_write_window_timer.async_wait( app().executor().wrap( // stay on app thread + _ro_timer.expires_from_now( expire_time ); + _ro_timer.async_wait( app().executor().wrap( // stay on app thread priority::high, exec_queue::read_only_trx_safe, // placed in read_only_trx_safe queue so it is ensured to be executed in either window [weak_this = weak_from_this()]( const boost::system::error_code& ec ) { @@ -2910,8 +2914,7 @@ void producer_plugin_impl::switch_to_read_window() { EOS_ASSERT(app().executor().is_write_window(), producer_exception, "expected to be in write window"); EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "_ro_trx_exec_tasks_fut expected to be empty" ); - _ro_write_window_timer.cancel(); - _ro_read_window_timer.cancel(); + _ro_timer.cancel(); _time_tracker.add_idle_time( fc::time_point::now() - _idle_trx_time ); // we are in write window, so no read-only trx threads are processing transactions. @@ -2921,16 +2924,17 @@ void producer_plugin_impl::switch_to_read_window() { return; } + auto& chain = chain_plug->chain(); + uint32_t pending_block_num = chain.head_block_num() + 1; app().executor().set_to_read_window(); - chain_plug->chain().set_db_read_only_mode(); - _received_block = false; + chain.set_db_read_only_mode(); _ro_read_window_start_time = fc::time_point::now(); _ro_all_threads_exec_time_us = 0; // start a read-only transaction execution task in each thread in the thread pool _ro_num_active_trx_exec_tasks = _ro_thread_pool_size; auto start_time = fc::time_point::now(); - _ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us); + _ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, pending_block_num, start_time + _ro_read_window_effective_time_us); for (auto i = 0; i < _ro_thread_pool_size; ++i ) { _ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () { return read_only_trx_execution_task(start_time); @@ -2938,8 +2942,8 @@ void producer_plugin_impl::switch_to_read_window() { } auto expire_time = boost::posix_time::microseconds(_ro_read_window_time_us.count()); - _ro_read_window_timer.expires_from_now( expire_time ); - _ro_read_window_timer.async_wait( app().executor().wrap( // stay on app thread + _ro_timer.expires_from_now( expire_time ); + _ro_timer.async_wait( app().executor().wrap( // stay on app thread priority::high, exec_queue::read_only_trx_safe, [weak_this = weak_from_this()]( const boost::system::error_code& ec ) {