Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.0 -> main] Use block_num for interrupt of start_block & Use single timer for read and write windows #910

Merged
merged 9 commits into main
Mar 27, 2023
11 changes: 6 additions & 5 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3225,11 +3225,12 @@ namespace eosio {
return;
}

bool valid_block_header = !!bsp;

if( valid_block_header ) {
uint32_t block_num = bsp ? bsp->block_num : 0;

if( block_num != 0 ) {
fc_dlog( logger, "validated block header, broadcasting immediately, connection ${cid}, blk num = ${num}, id = ${id}",
("cid", cid)("num", bsp->block_num)("id", bsp->id) );
("cid", cid)("num", block_num)("id", bsp->id) );
my_impl->dispatcher->add_peer_block( bsp->id, cid ); // no need to send back to sender
my_impl->dispatcher->bcast_block( bsp->block, bsp->id );
}
Expand All @@ -3238,9 +3239,9 @@ namespace eosio {
c->process_signed_block( id, std::move(ptr), std::move(bsp) );
});

if( valid_block_header ) {
if( block_num != 0 ) {
// ready to process immediately, so signal producer to interrupt start_block
my_impl->producer_plug->received_block();
my_impl->producer_plug->received_block(block_num);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
void register_metrics_listener(metrics_listener listener);

// thread-safe, called when a new block is received
void received_block();
void received_block(uint32_t block_num);

const std::set<account_name>& producer_accounts() const;

Expand Down
74 changes: 39 additions & 35 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
producer_plugin_impl(boost::asio::io_service& io)
:_timer(io)
,_transaction_ack_channel(app().get_channel<compat::channels::transaction_ack>())
,_ro_write_window_timer(io)
,_ro_read_window_timer(io)
,_ro_timer(io)
{
}

Expand Down Expand Up @@ -370,7 +369,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
named_thread_pool<struct prod> _thread_pool;

std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
std::atomic<bool> _received_block{false}; // modified by net_plugin thread pool and app thread
std::atomic<uint32_t> _received_block{0}; // modified by net_plugin thread pool
fc::microseconds _max_irreversible_block_age_us;
int32_t _produce_time_offset_us = 0;
int32_t _last_block_time_offset_us = 0;
Expand Down Expand Up @@ -486,20 +485,20 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// - all threads would be idle
// - or the net_plugin received a block.
// - or we have reached the read_window_deadline
void set_exit_criteria(uint32_t num_tasks, std::atomic<bool>* received_block, fc::time_point deadline) {
void set_exit_criteria(uint32_t num_tasks, std::atomic<uint32_t>* received_block, uint32_t block_num, fc::time_point deadline) {
std::lock_guard<std::mutex> g( mtx ); // not strictly necessary with current usage from single thread
assert(num_tasks > 0 && num_waiting == 0 && received_block != nullptr);
assert(received_block && *received_block == false);
max_waiting = num_tasks;
num_waiting = 0;
received_block_ptr = received_block;
pending_block_num = block_num;
read_window_deadline = deadline;
exiting_read_window = false;
}

private:
bool should_exit() {
return exiting_read_window || fc::time_point::now() >= read_window_deadline || *received_block_ptr;
return exiting_read_window || fc::time_point::now() >= read_window_deadline || (*received_block_ptr >= pending_block_num);
}

mutable std::mutex mtx;
Expand All @@ -508,7 +507,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
uint32_t num_waiting{0};
uint32_t max_waiting{0};
bool exiting_read_window{false};
std::atomic<bool>* received_block_ptr{nullptr};
std::atomic<uint32_t>* received_block_ptr{nullptr};
uint32_t pending_block_num{0};
fc::time_point read_window_deadline;
};

Expand All @@ -521,8 +521,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
fc::microseconds _ro_read_window_effective_time_us{ 0 }; // calculated during option initialization
std::atomic<int64_t> _ro_all_threads_exec_time_us; // total time spent by all threads executing transactions. use atomic for simplicity and performance
fc::time_point _ro_read_window_start_time;
boost::asio::deadline_timer _ro_write_window_timer;
boost::asio::deadline_timer _ro_read_window_timer;
boost::asio::deadline_timer _ro_timer;
fc::microseconds _ro_max_trx_time_us{ 0 }; // calculated during option initialization
ro_trx_queue_t _ro_trx_queue;
std::atomic<uint32_t> _ro_num_active_trx_exec_tasks{ 0 };
Expand Down Expand Up @@ -881,7 +880,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
exhausted
};

inline bool should_interrupt_start_block( const fc::time_point& deadline ) const;
inline bool should_interrupt_start_block( const fc::time_point& deadline, uint32_t pending_block_num ) const;
start_block_result start_block();

fc::time_point calculate_pending_block_time() const;
Expand Down Expand Up @@ -1870,12 +1869,12 @@ fc::time_point producer_plugin_impl::calculate_block_deadline( const fc::time_po
}
}

bool producer_plugin_impl::should_interrupt_start_block( const fc::time_point& deadline ) const {
bool producer_plugin_impl::should_interrupt_start_block( const fc::time_point& deadline, uint32_t pending_block_num ) const {
if( _pending_block_mode == pending_block_mode::producing ) {
return deadline <= fc::time_point::now();
}
// if we can produce then honor deadline so production starts on time
return (!_producers.empty() && deadline <= fc::time_point::now()) || _received_block;
return (!_producers.empty() && deadline <= fc::time_point::now()) || (_received_block >= pending_block_num);
}

producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
Expand All @@ -1895,6 +1894,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

const fc::time_point now = fc::time_point::now();
const fc::time_point block_time = calculate_pending_block_time();
const uint32_t pending_block_num = hbs->block_num + 1;
const fc::time_point preprocess_deadline = calculate_block_deadline(block_time);

const pending_block_mode previous_pending_mode = _pending_block_mode;
Expand Down Expand Up @@ -1960,7 +1960,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
if (_pending_block_mode == pending_block_mode::producing) {
const auto start_block_time = block_time - fc::microseconds( config::block_interval_us );
if( now < start_block_time ) {
fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", hbs->block_num + 1)("bt", block_time) );
fc_dlog(_log, "Not producing block waiting for production window ${n} ${bt}", ("n", pending_block_num)("bt", block_time) );
// start_block_time instead of block_time because schedule_delayed_production_loop calculates next block time from given time
schedule_delayed_production_loop(weak_from_this(), calculate_producer_wake_up_time(start_block_time));
return start_block_result::waiting_for_production;
Expand All @@ -1974,7 +1974,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
}

fc_dlog(_log, "Starting block #${n} at ${time} producer ${p}",
("n", hbs->block_num + 1)("time", now)("p", scheduled_producer.producer_name));
("n", pending_block_num)("time", now)("p", scheduled_producer.producer_name));

try {
uint16_t blocks_to_confirm = 0;
Expand Down Expand Up @@ -2038,7 +2038,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
std::swap( features_to_activate, protocol_features_to_activate );
_protocol_features_signaled = true;
ilog( "signaling activation of the following protocol features in block ${num}: ${features_to_activate}",
("num", hbs->block_num + 1)("features_to_activate", features_to_activate) );
("num", pending_block_num)("features_to_activate", features_to_activate) );
}
}

Expand All @@ -2064,7 +2064,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {
if( !remove_expired_blacklisted_trxs( preprocess_deadline ) )
return start_block_result::exhausted;
if( !_subjective_billing.remove_expired( _log, chain.pending_block_time(), fc::time_point::now(),
[&](){ return should_interrupt_start_block( preprocess_deadline ); } ) ) {
[&](){ return should_interrupt_start_block( preprocess_deadline, pending_block_num ); } ) ) {
return start_block_result::exhausted;
}

Expand All @@ -2089,7 +2089,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

if( app().is_quiting() ) // db guard exception above in LOG_AND_DROP could have called app().quit()
return start_block_result::failed;
if ( should_interrupt_start_block( preprocess_deadline ) || block_is_exhausted() ) {
if ( should_interrupt_start_block( preprocess_deadline, pending_block_num ) || block_is_exhausted() ) {
return start_block_result::exhausted;
}

Expand All @@ -2116,11 +2116,12 @@ bool producer_plugin_impl::remove_expired_trxs( const fc::time_point& deadline )
{
chain::controller& chain = chain_plug->chain();
auto pending_block_time = chain.pending_block_time();
auto pending_block_num = chain.pending_block_num();

// remove all expired transactions
size_t num_expired = 0;
size_t orig_count = _unapplied_transactions.size();
bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline); },
bool exhausted = !_unapplied_transactions.clear_expired( pending_block_time, [&](){ return should_interrupt_start_block(deadline, pending_block_num); },
[&num_expired]( const packed_transaction_ptr& packed_trx_ptr, trx_enum_type trx_type ) {
// expired exception is logged as part of next() call
++num_expired;
Expand All @@ -2144,12 +2145,13 @@ bool producer_plugin_impl::remove_expired_blacklisted_trxs( const fc::time_point
if(!blacklist_by_expiry.empty()) {
const chain::controller& chain = chain_plug->chain();
const auto lib_time = chain.last_irreversible_block_time();
const auto pending_block_num = chain.pending_block_num();

int num_expired = 0;
int orig_count = _blacklisted_transactions.size();

while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= lib_time) {
if ( should_interrupt_start_block( deadline ) ) {
if ( should_interrupt_start_block( deadline, pending_block_num ) ) {
exhausted = true;
break;
}
Expand Down Expand Up @@ -2416,12 +2418,14 @@ bool producer_plugin_impl::process_unapplied_trxs( const fc::time_point& deadlin
{
bool exhausted = false;
if( !_unapplied_transactions.empty() ) {
const chain::controller& chain = chain_plug->chain();
const auto pending_block_num = chain.pending_block_num();
int num_applied = 0, num_failed = 0, num_processed = 0;
auto unapplied_trxs_size = _unapplied_transactions.size();
auto itr = _unapplied_transactions.unapplied_begin();
auto end_itr = _unapplied_transactions.unapplied_end();
while( itr != end_itr ) {
if( should_interrupt_start_block( deadline ) ) {
if( should_interrupt_start_block( deadline, pending_block_num ) ) {
exhausted = true;
break;
}
Expand Down Expand Up @@ -2596,8 +2600,10 @@ bool producer_plugin_impl::process_incoming_trxs( const fc::time_point& deadline
if( itr != end ) {
size_t processed = 0;
fc_dlog( _log, "Processing ${n} pending transactions", ("n", _unapplied_transactions.incoming_size()) );
const chain::controller& chain = chain_plug->chain();
const auto pending_block_num = chain.pending_block_num();
while( itr != end ) {
if ( should_interrupt_start_block( deadline ) ) {
if ( should_interrupt_start_block( deadline, pending_block_num ) ) {
exhausted = true;
break;
}
Expand Down Expand Up @@ -2640,7 +2646,6 @@ bool producer_plugin_impl::block_is_exhausted() const {
// -> Idle
// --> Start block B (block time y.000) at time x.500
void producer_plugin_impl::schedule_production_loop() {
_received_block = false;
_timer.cancel();

auto result = start_block();
Expand Down Expand Up @@ -2848,8 +2853,8 @@ void producer_plugin_impl::produce_block() {
("confs", new_bs->header.confirmed));
}

void producer_plugin::received_block() {
my->_received_block = true;
void producer_plugin::received_block(uint32_t block_num) {
my->_received_block = block_num;
}

void producer_plugin::log_failed_transaction(const transaction_id_type& trx_id, const packed_transaction_ptr& packed_trx_ptr, const char* reason) const {
Expand Down Expand Up @@ -2878,8 +2883,7 @@ void producer_plugin_impl::switch_to_write_window() {
}

EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "no read-only tasks should be running before switching to write window");
_ro_read_window_timer.cancel();
_ro_write_window_timer.cancel();
_ro_timer.cancel();

start_write_window();
}
Expand All @@ -2893,8 +2897,8 @@ void producer_plugin_impl::start_write_window() {
_idle_trx_time = fc::time_point::now();

auto expire_time = boost::posix_time::microseconds(_ro_write_window_time_us.count());
_ro_write_window_timer.expires_from_now( expire_time );
_ro_write_window_timer.async_wait( app().executor().wrap( // stay on app thread
_ro_timer.expires_from_now( expire_time );
_ro_timer.async_wait( app().executor().wrap( // stay on app thread
priority::high,
exec_queue::read_only_trx_safe, // placed in read_only_trx_safe queue so it is ensured to be executed in either window
[weak_this = weak_from_this()]( const boost::system::error_code& ec ) {
Expand All @@ -2910,8 +2914,7 @@ void producer_plugin_impl::switch_to_read_window() {
EOS_ASSERT(app().executor().is_write_window(), producer_exception, "expected to be in write window");
EOS_ASSERT(_ro_num_active_trx_exec_tasks.load() == 0 && _ro_trx_exec_tasks_fut.empty(), producer_exception, "_ro_trx_exec_tasks_fut expected to be empty" );

_ro_write_window_timer.cancel();
_ro_read_window_timer.cancel();
_ro_timer.cancel();
_time_tracker.add_idle_time( fc::time_point::now() - _idle_trx_time );

// we are in write window, so no read-only trx threads are processing transactions.
Expand All @@ -2921,25 +2924,26 @@ void producer_plugin_impl::switch_to_read_window() {
return;
}

auto& chain = chain_plug->chain();
uint32_t pending_block_num = chain.head_block_num() + 1;
app().executor().set_to_read_window();
chain_plug->chain().set_db_read_only_mode();
_received_block = false;
chain.set_db_read_only_mode();
_ro_read_window_start_time = fc::time_point::now();
_ro_all_threads_exec_time_us = 0;

// start a read-only transaction execution task in each thread in the thread pool
_ro_num_active_trx_exec_tasks = _ro_thread_pool_size;
auto start_time = fc::time_point::now();
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, start_time + _ro_read_window_effective_time_us);
_ro_trx_queue.set_exit_criteria(_ro_thread_pool_size, &_received_block, pending_block_num, start_time + _ro_read_window_effective_time_us);
for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
_ro_trx_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [this, start_time] () {
return read_only_trx_execution_task(start_time);
}) );
}

auto expire_time = boost::posix_time::microseconds(_ro_read_window_time_us.count());
_ro_read_window_timer.expires_from_now( expire_time );
_ro_read_window_timer.async_wait( app().executor().wrap( // stay on app thread
_ro_timer.expires_from_now( expire_time );
_ro_timer.async_wait( app().executor().wrap( // stay on app thread
priority::high,
exec_queue::read_only_trx_safe,
[weak_this = weak_from_this()]( const boost::system::error_code& ec ) {
Expand Down