From ebadb713e9a0c275550b610ddbf4dcd699ddec7b Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 24 Mar 2023 14:17:13 -0500 Subject: [PATCH 1/3] GH-891 Use block_num for interrupt of start_block to handle case of receiving block before beginning of start_block --- plugins/net_plugin/net_plugin.cpp | 6 +-- .../eosio/producer_plugin/producer_plugin.hpp | 2 +- plugins/producer_plugin/producer_plugin.cpp | 38 +++++++++++-------- 3 files changed, 26 insertions(+), 20 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 61b61fdd35..85c56feb37 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3166,13 +3166,13 @@ namespace eosio { return; } - bool signal_producer = !!bsp; // ready to process immediately, so signal producer to interrupt start_block + uint32_t block_num = bsp ? bsp->block_num : 0; app().post(priority::medium, [ptr{std::move(ptr)}, bsp{std::move(bsp)}, id, c = shared_from_this()]() mutable { c->process_signed_block( id, std::move(ptr), std::move(bsp) ); }); - if( signal_producer ) - my_impl->producer_plug->received_block(); + if( block_num != 0 ) // ready to process immediately, so signal producer to interrupt start_block + my_impl->producer_plug->received_block(block_num); } // called from application thread diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp index bf54bc514d..31566ff1ea 100644 --- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp +++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp @@ -145,7 +145,7 @@ class producer_plugin : public appbase::plugin { void log_failed_transaction(const transaction_id_type& trx_id, const chain::packed_transaction_ptr& packed_trx_ptr, const char* reason) const; // thread-safe, called when a new block is received - void received_block(); + void received_block(uint32_t block_num); private: std::shared_ptr my; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index f5486160d9..8069bdca1a 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -333,7 +333,7 @@ class producer_plugin_impl : public std::enable_shared_from_this _thread_pool; std::atomic _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool - std::atomic _received_block{false}; // modified by net_plugin thread pool and app thread + std::atomic _received_block{false}; // modified by net_plugin thread pool fc::microseconds _max_irreversible_block_age_us; int32_t _produce_time_offset_us = 0; int32_t _last_block_time_offset_us = 0; @@ -693,7 +693,7 @@ class producer_plugin_impl : public std::enable_shared_from_this= pending_block_num); } producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { @@ -1631,6 +1631,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { const fc::time_point now = fc::time_point::now(); const fc::time_point block_time = calculate_pending_block_time(); + const uint32_t pending_block_num = hbs->block_num + 1; const fc::time_point preprocess_deadline = calculate_block_deadline(block_time); const pending_block_mode previous_pending_mode = _pending_block_mode; @@ -1696,7 +1697,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if (_pending_block_mode == pending_block_mode::producing) { const auto start_block_time = block_time - fc::microseconds( config::block_interval_us ); if( now < start_block_time ) { - fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", hbs->block_num + 1)("bt", block_time) ); + fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", pending_block_num)("bt", block_time) ); // start_block_time instead of block_time because schedule_delayed_production_loop calculates next block time from given time schedule_delayed_production_loop(weak_from_this(), calculate_producer_wake_up_time(start_block_time)); return start_block_result::waiting_for_production; @@ -1710,7 +1711,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { } fc_dlog(_log, "Starting block #${n} at ${time} producer ${p}", - ("n", hbs->block_num + 1)("time", now)("p", scheduled_producer.producer_name)); + ("n", pending_block_num)("time", now)("p", scheduled_producer.producer_name)); try { uint16_t blocks_to_confirm = 0; @@ -1774,7 +1775,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { std::swap( features_to_activate, protocol_features_to_activate ); _protocol_features_signaled = true; ilog( "signaling activation of the following protocol features in block ${num}: ${features_to_activate}", - ("num", hbs->block_num + 1)("features_to_activate", features_to_activate) ); + ("num", pending_block_num)("features_to_activate", features_to_activate) ); } } @@ -1798,7 +1799,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if( !remove_expired_blacklisted_trxs( preprocess_deadline ) ) return start_block_result::exhausted; if( !_subjective_billing.remove_expired( _log, chain.pending_block_time(), fc::time_point::now(), - [&](){ return should_interrupt_start_block( preprocess_deadline ); } ) ) { + [&](){ return should_interrupt_start_block( preprocess_deadline, pending_block_num ); } ) ) { return start_block_result::exhausted; } @@ -1822,7 +1823,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() { if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit() return start_block_result::failed; - if ( should_interrupt_start_block( preprocess_deadline ) || block_is_exhausted() ) { + if ( should_interrupt_start_block( preprocess_deadline, pending_block_num ) || block_is_exhausted() ) { return start_block_result::exhausted; } @@ -1849,12 +1850,13 @@ bool producer_plugin_impl::remove_expired_trxs( const fc::time_point& deadline ) { chain::controller& chain = chain_plug->chain(); auto pending_block_time = chain.pending_block_time(); + auto pending_block_num = chain.pending_block_num(); // remove all expired transactions size_t num_expired_persistent = 0; size_t num_expired_other = 0; size_t orig_count = _unapplied_transactions.size(); - bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline); }, + bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline, pending_block_num); }, [&num_expired_persistent, &num_expired_other]( const packed_transaction_ptr& packed_trx_ptr, trx_enum_type trx_type ) { // expired exception is logged as part of next() call if( trx_type == trx_enum_type::persisted ) { @@ -1885,12 +1887,13 @@ bool producer_plugin_impl::remove_expired_blacklisted_trxs( const fc::time_point if(!blacklist_by_expiry.empty()) { const chain::controller& chain = chain_plug->chain(); const auto lib_time = chain.last_irreversible_block_time(); + const auto pending_block_num = chain.pending_block_num(); int num_expired = 0; int orig_count = _blacklisted_transactions.size(); while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= lib_time) { - if ( should_interrupt_start_block( deadline ) ) { + if ( should_interrupt_start_block( deadline, pending_block_num ) ) { exhausted = true; break; } @@ -2124,6 +2127,8 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin bool exhausted = false; if( !_unapplied_transactions.empty() ) { if( _pending_block_mode != pending_block_mode::producing && _disable_persist_until_expired ) return !exhausted; + const chain::controller& chain = chain_plug->chain(); + const auto pending_block_num = chain.pending_block_num(); int num_applied = 0, num_failed = 0, num_processed = 0; auto unapplied_trxs_size = _unapplied_transactions.size(); // unapplied and persisted do not have a next method to call @@ -2132,7 +2137,7 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin auto end_itr = (_pending_block_mode == pending_block_mode::producing) ? _unapplied_transactions.unapplied_end() : _unapplied_transactions.persisted_end(); while( itr != end_itr ) { - if( should_interrupt_start_block( deadline ) ) { + if( should_interrupt_start_block( deadline, pending_block_num ) ) { exhausted = true; break; } @@ -2313,8 +2318,10 @@ bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline if( itr != end ) { size_t processed = 0; fc_dlog( _log, "Processing ${n} pending transactions", ("n", _unapplied_transactions.incoming_size()) ); + const chain::controller& chain = chain_plug->chain(); + const auto pending_block_num = chain.pending_block_num(); while( itr != end ) { - if ( should_interrupt_start_block( deadline ) ) { + if ( should_interrupt_start_block( deadline, pending_block_num ) ) { exhausted = true; break; } @@ -2360,7 +2367,6 @@ bool producer_plugin_impl::block_is_exhausted() const { // -> Idle // --> Start block B (block time y.000) at time x.500 void producer_plugin_impl::schedule_production_loop() { - _received_block = false; _timer.cancel(); auto result = start_block(); @@ -2565,8 +2571,8 @@ void producer_plugin_impl::produce_block() { ("confs", new_bs->header.confirmed)); } -void producer_plugin::received_block() { - my->_received_block = true; +void producer_plugin::received_block(uint32_t block_num) { + my->_received_block = block_num; } void producer_plugin::log_failed_transaction(const transaction_id_type& trx_id, const packed_transaction_ptr& packed_trx_ptr, const char* reason) const { From d73bf1b0a5399f467f78f90b66ac5d890c06da53 Mon Sep 17 00:00:00 2001 From: Lin Huang Date: Mon, 27 Mar 2023 10:52:24 -0400 Subject: [PATCH 2/3] Use a single timer for read and write windows --- plugins/producer_plugin/producer_plugin.cpp | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index 2e0a734b53..cbd538fc43 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -313,8 +313,7 @@ class producer_plugin_impl : public std::enable_shared_from_this()) - ,_ro_write_window_timer(io) - ,_ro_read_window_timer(io) + ,_ro_timer(io) { } @@ -521,8 +520,7 @@ class producer_plugin_impl : public std::enable_shared_from_this _ro_all_threads_exec_time_us; // total time spent by all threads executing transactions. use atomic for simplicity and performance fc::time_point _ro_read_window_start_time; - boost::asio::deadline_timer _ro_write_window_timer; - boost::asio::deadline_timer _ro_read_window_timer; + boost::asio::deadline_timer _ro_timer; fc::microseconds _ro_max_trx_time_us{ 0 }; // calculated during option initialization ro_trx_queue_t _ro_trx_queue; std::atomic _ro_num_active_trx_exec_tasks{ 0 }; @@ -2878,8 +2876,7 @@ void producer_plugin_impl::switch_to_write_window() { } EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "no read-only tasks should be running before switching to write window"); - _ro_read_window_timer.cancel(); - _ro_write_window_timer.cancel(); + _ro_timer.cancel(); start_write_window(); } @@ -2893,8 +2890,8 @@ void producer_plugin_impl::start_write_window() { _idle_trx_time = fc::time_point::now(); auto expire_time = boost::posix_time::microseconds(_ro_write_window_time_us.count()); - _ro_write_window_timer.expires_from_now( expire_time ); - _ro_write_window_timer.async_wait( app().executor().wrap( // stay on app thread + _ro_timer.expires_from_now( expire_time ); + _ro_timer.async_wait( app().executor().wrap( // stay on app thread priority::high, exec_queue::read_only_trx_safe, // placed in read_only_trx_safe queue so it is ensured to be executed in either window [weak_this = weak_from_this()]( const boost::system::error_code& ec ) { @@ -2910,8 +2907,7 @@ void producer_plugin_impl::switch_to_read_window() { EOS_ASSERT(app().executor().is_write_window(), producer_exception, "expected to be in write window"); EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "_ro_trx_exec_tasks_fut expected to be empty" ); - _ro_write_window_timer.cancel(); - _ro_read_window_timer.cancel(); + _ro_timer.cancel(); _time_tracker.add_idle_time( fc::time_point::now() - _idle_trx_time ); // we are in write window, so no read-only trx threads are processing transactions. @@ -2938,8 +2934,8 @@ void producer_plugin_impl::switch_to_read_window() { } auto expire_time = boost::posix_time::microseconds(_ro_read_window_time_us.count()); - _ro_read_window_timer.expires_from_now( expire_time ); - _ro_read_window_timer.async_wait( app().executor().wrap( // stay on app thread + _ro_timer.expires_from_now( expire_time ); + _ro_timer.async_wait( app().executor().wrap( // stay on app thread priority::high, exec_queue::read_only_trx_safe, [weak_this = weak_from_this()]( const boost::system::error_code& ec ) { From 9e79e82dea51d7c803db889f79c88abac2ed8b1d Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 27 Mar 2023 10:51:11 -0500 Subject: [PATCH 3/3] GH-891 Integrate new received block into ro_trx_queue --- plugins/producer_plugin/producer_plugin.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index e614eb1cd9..9d062732cf 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -370,7 +370,7 @@ class producer_plugin_impl : public std::enable_shared_from_this _thread_pool; std::atomic _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool - std::atomic _received_block{false}; // modified by net_plugin thread pool + std::atomic _received_block{0}; // modified by net_plugin thread pool fc::microseconds _max_irreversible_block_age_us; int32_t _produce_time_offset_us = 0; int32_t _last_block_time_offset_us = 0; @@ -486,20 +486,20 @@ class producer_plugin_impl : public std::enable_shared_from_this* received_block, fc::time_point deadline) { + void set_exit_criteria(uint32_t num_tasks, std::atomic* received_block, uint32_t block_num, fc::time_point deadline) { std::lock_guard g( mtx ); // not strictly necessary with current usage from single thread assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr); - assert(received_block && *received_block == false); max_waiting = num_tasks; num_waiting = 0; received_block_ptr = received_block; + pending_block_num = block_num; read_window_deadline = deadline; exiting_read_window = false; } private: bool should_exit() { - return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr; + return exiting_read_window || fc::time_point::now() >= read_window_deadline || (*received_block_ptr >= pending_block_num); } mutable std::mutex mtx; @@ -508,7 +508,8 @@ class producer_plugin_impl : public std::enable_shared_from_this* received_block_ptr{nullptr}; + std::atomic* received_block_ptr{nullptr}; + uint32_t pending_block_num{0}; fc::time_point read_window_deadline; }; @@ -2927,16 +2928,17 @@ void producer_plugin_impl::switch_to_read_window() { return; } + auto& chain = chain_plug->chain(); + uint32_t pending_block_num = chain.head_block_num() + 1; app().executor().set_to_read_window(); - chain_plug->chain().set_db_read_only_mode(); - _received_block = false; + chain.set_db_read_only_mode(); _ro_read_window_start_time = fc::time_point::now(); _ro_all_threads_exec_time_us = 0; // start a read-only transaction execution task in each thread in the thread pool _ro_num_active_trx_exec_tasks = _ro_thread_pool_size; auto start_time = fc::time_point::now(); - _ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us); + _ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, pending_block_num, start_time + _ro_read_window_effective_time_us); for (auto i = 0; i < _ro_thread_pool_size; ++i ) { _ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () { return read_only_trx_execution_task(start_time);