Skip to content

Commit

Permalink
Merge pull request #946 from AntelopeIO/merge_read_only_trx_improve_t…
Browse files Browse the repository at this point in the history
…o_main

[4.0 -> main] Small improvements for parallelizing read-only transactions and tasks
  • Loading branch information
linh2931 authored Apr 2, 2023
2 parents 6f1e67b + 15e73d9 commit 4099678
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,11 +455,11 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

private:
mutable std::mutex mtx;
std::deque<ro_trx_t> queue;
deque<ro_trx_t> queue; // boost deque which is faster than std::deque
};

uint16_t _ro_thread_pool_size{ 0 };
static constexpr uint16_t _ro_max_eos_vm_oc_threads_allowed{ 8 }; // Due to uncertainty to get total virtual memory size on a 5-level paging system, set a hard limit
uint32_t _ro_thread_pool_size{ 0 };
static constexpr uint32_t _ro_max_eos_vm_oc_threads_allowed{ 8 }; // Due to uncertainty to get total virtual memory size on a 5-level paging system, set a hard limit
named_thread_pool<struct read> _ro_thread_pool;
fc::microseconds _ro_write_window_time_us{ 200000 };
fc::microseconds _ro_read_window_time_us{ 60000 };
Expand All @@ -468,7 +468,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
std::atomic<int64_t> _ro_all_threads_exec_time_us; // total time spent by all threads executing transactions. use atomic for simplicity and performance
fc::time_point _ro_read_window_start_time;
fc::time_point _ro_window_deadline; // only modified on app thread, read-window deadline or write-window deadline
boost::asio::deadline_timer _ro_timer;
boost::asio::deadline_timer _ro_timer; // only accessible from the main thread
fc::microseconds _ro_max_trx_time_us{ 0 }; // calculated during option initialization
ro_trx_queue_t _ro_exhausted_trx_queue;
std::atomic<uint32_t> _ro_num_active_exec_tasks{ 0 };
Expand Down Expand Up @@ -928,12 +928,12 @@ void producer_plugin::set_program_options(
"Number of worker threads in producer thread pool")
("snapshots-dir", bpo::value<bfs::path>()->default_value("snapshots"),
"the location of the snapshots directory (absolute path or relative to application data dir)")
("read-only-threads", bpo::value<uint16_t>(),
("read-only-threads", bpo::value<uint32_t>(),
"Number of worker threads in read-only execution thread pool")
("read-only-write-window-time-us", bpo::value<uint32_t>()->default_value(my->_ro_write_window_time_us.count()),
"time in microseconds the write window lasts")
"Time in microseconds the write window lasts.")
("read-only-read-window-time-us", bpo::value<uint32_t>()->default_value(my->_ro_read_window_time_us.count()),
"time in microseconds the read window lasts")
"Time in microseconds the read window lasts.")
;
config_file_options.add(producer_options);
}
Expand Down Expand Up @@ -1110,7 +1110,7 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
}

if ( options.count( "read-only-threads" ) ) {
my->_ro_thread_pool_size = options.at( "read-only-threads" ).as<uint16_t>();
my->_ro_thread_pool_size = options.at( "read-only-threads" ).as<uint32_t>();
} else if ( my->_producers.empty() ) {
if( options.count( "plugin" ) ) {
const auto& v = options.at( "plugin" ).as<std::vector<std::string>>();
Expand Down Expand Up @@ -1145,15 +1145,14 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
}

EOS_ASSERT( vm_total_kb > 0, plugin_config_exception, "Unable to get system virtual memory size (not a Linux?), therefore cannot determine if the system has enough virtual memory for multi-threaded read-only transactions on EOS VM OC");
EOS_ASSERT( vm_total_kb > vm_used_kb, plugin_config_exception, "vm total (${t}) must be greater than vm used (${u})", ("t", vm_total_kb)("u", vm_used_kb));
uint32_t num_threads_supported = (vm_total_kb - vm_used_kb) / 4200000000;
// reserve 1 for the app thread, 1 for anything else which might use VM
int num_threads_supported = (vm_total_kb - vm_used_kb) / 4200000000 - 2;
ilog("vm total in kb: ${total}, vm used in kb: ${used}, number of EOS VM OC threads supported ((vm total - vm used)/4.2 TB - 2): ${supp}", ("total", vm_total_kb) ("used", vm_used_kb) ("supp", num_threads_supported));
EOS_ASSERT( num_threads_supported >= my->_ro_thread_pool_size, plugin_config_exception, "--read-only-threads (${th}) greater than number of threads supported for EOS VM OC (${supp}) by the system virtual memory size", ("th", my->_ro_thread_pool_size) ("supp", num_threads_supported) );

if ( my->_ro_thread_pool_size > my->_ro_max_eos_vm_oc_threads_allowed ) {
wlog("--read-only-threads (${th}) greater than maximum number of threads allowed (${allowed}) for EOS Vm OC. Set it to ${allowed}", ("th", my->_ro_thread_pool_size) ("allowed", my->_ro_max_eos_vm_oc_threads_allowed));
my->_ro_thread_pool_size = my->_ro_max_eos_vm_oc_threads_allowed;
}
EOS_ASSERT( num_threads_supported > 2, plugin_config_exception, "With the EOS VM OC configured, there is not enough system virtual memory to support the required minimum of 3 threads (1 for main thread, 1 for read-only, and 1 for anything else), vm total: ${t}, vm used: ${u}", ("t", vm_total_kb)("u", vm_used_kb));
num_threads_supported -= 2;
auto actual_threads_allowed = std::min(my->_ro_max_eos_vm_oc_threads_allowed, num_threads_supported);
ilog("vm total in kb: ${total}, vm used in kb: ${used}, number of EOS VM OC threads supported ((vm total - vm used)/4.2 TB - 2): ${supp}, max allowed: ${max}, actual allowed: ${actual}", ("total", vm_total_kb) ("used", vm_used_kb) ("supp", num_threads_supported) ("max", my->_ro_max_eos_vm_oc_threads_allowed)("actual", actual_threads_allowed));
EOS_ASSERT( my->_ro_thread_pool_size <= actual_threads_allowed, plugin_config_exception, "--read-only-threads (${th}) greater than number of threads allowed for EOS VM OC (${allowed})", ("th", my->_ro_thread_pool_size) ("allowed", actual_threads_allowed) );
}
#endif
}
Expand Down Expand Up @@ -2890,7 +2889,7 @@ void producer_plugin_impl::switch_to_read_window() {
// start a read-only execution task in each thread in the thread pool
_ro_num_active_exec_tasks = _ro_thread_pool_size;
_ro_exec_tasks_fut.resize(0);
for (auto i = 0; i < _ro_thread_pool_size; ++i ) {
for (uint32_t i = 0; i < _ro_thread_pool_size; ++i ) {
_ro_exec_tasks_fut.emplace_back( post_async_task( _ro_thread_pool.get_executor(), [self = this, pending_block_num] () {
return self->read_only_execution_task(pending_block_num);
}) );
Expand Down

0 comments on commit 4099678

Please sign in to comment.