Merge pull request #962 from AntelopeIO/merge_oc_main_thread_fix
[4.0 -> main] Use is_write_window in eos-vm-oc main to determine if it is safe to update code cache
linh2931 authored Apr 4, 2023
2 parents 6f7a948 + b31889d commit 553b834
Showing 7 changed files with 56 additions and 30 deletions.
27 changes: 27 additions & 0 deletions libraries/chain/controller.cpp
@@ -210,6 +210,12 @@ struct pending_state {
};

struct controller_impl {
enum class app_window_type {
write, // Only main thread is running; read-only threads are not running.
// All read-write and read-only tasks are sequentially executed.
read // Main thread and read-only threads are running read-only tasks in parallel.
// Read-write tasks are not being executed.
};

// LLVM sets the new handler, we need to reset this to throw a bad_alloc exception so we can possibly exit cleanly
// and not just abort.
@@ -251,6 +257,7 @@ struct controller_impl {
// which overwrites the same static wasmif, is used for eosvmoc too.
wasm_interface wasmif; // used by main thread and all threads for EOSVMOC
thread_local static std::unique_ptr<wasm_interface> wasmif_thread_local; // a copy for each read-only thread, used by eos-vm and eos-vm-jit
app_window_type app_window = app_window_type::write;

typedef pair<scope_name,action_name> handler_key;
map< account_name, map<handler_key, apply_handler> > apply_handlers;
@@ -2694,6 +2701,16 @@ struct controller_impl {

bool is_on_main_thread() { return main_thread_id == std::this_thread::get_id(); };

void set_to_write_window() {
app_window = app_window_type::write;
}
void set_to_read_window() {
app_window = app_window_type::read;
}
bool is_write_window() const {
return app_window == app_window_type::write;
}

wasm_interface& get_wasm_interface() {
if ( is_on_main_thread()
#ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED
@@ -3687,6 +3704,16 @@ void controller::init_thread_local_data() {
my->init_thread_local_data();
}

void controller::set_to_write_window() {
my->set_to_write_window();
}
void controller::set_to_read_window() {
my->set_to_read_window();
}
bool controller::is_write_window() const {
return my->is_write_window();
}

/// Protocol feature activation handlers:

template<>
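
In short, the controller now owns a small window state machine instead of inferring safety from thread identity. Below is a minimal, self-contained sketch of that state and its intended transitions; controller_stub is a hypothetical stand-in for chain::controller and is not part of the diff.

#include <cassert>

enum class app_window_type { write, read };

struct controller_stub {                              // hypothetical stand-in for chain::controller
    app_window_type app_window = app_window_type::write;
    void set_to_write_window() { app_window = app_window_type::write; }
    void set_to_read_window()  { app_window = app_window_type::read; }
    bool is_write_window() const { return app_window == app_window_type::write; }
};

int main() {
    controller_stub chain;
    assert(chain.is_write_window());   // starts in the write window: only the main thread runs

    chain.set_to_read_window();        // read-only threads may now run in parallel
    assert(!chain.is_write_window());  // consumers must not mutate shared state such as the code cache

    chain.set_to_write_window();       // back to sequential, single-threaded execution
    assert(chain.is_write_window());
    return 0;
}
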
3 changes: 3 additions & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
@@ -373,6 +373,9 @@ namespace eosio { namespace chain {
void set_db_read_only_mode();
void unset_db_read_only_mode();
void init_thread_local_data();
void set_to_write_window();
void set_to_read_window();
bool is_write_window() const;

private:
friend class apply_context;
7 changes: 2 additions & 5 deletions libraries/chain/include/eosio/chain/webassembly/eos-vm-oc/code_cache.hpp
@@ -85,9 +85,6 @@ class code_cache_base {

template <typename T>
void serialize_cache_index(fc::datastream<T>& ds);

std::thread::id _main_thread_id;
bool is_main_thread() const;
};

class code_cache_async : public code_cache_base {
@@ -98,7 +95,7 @@ class code_cache_async : public code_cache_base {
//If code is in cache: returns pointer & bumps to front of MRU list
//If code is not in cache, and not blacklisted, and not currently compiling: return nullptr and kick off compile
//otherwise: return nullptr
const code_descriptor* const get_descriptor_for_code(const digest_type& code_id, const uint8_t& vm_version);
const code_descriptor* const get_descriptor_for_code(const digest_type& code_id, const uint8_t& vm_version, bool is_write_window);

private:
std::thread _monitor_reply_thread;
@@ -115,7 +112,7 @@ class code_cache_sync : public code_cache_base {
~code_cache_sync();

//Can still fail and return nullptr if, for example, there is an expected instantiation failure
const code_descriptor* const get_descriptor_for_code_sync(const digest_type& code_id, const uint8_t& vm_version);
const code_descriptor* const get_descriptor_for_code_sync(const digest_type& code_id, const uint8_t& vm_version, bool is_write_window);
};

}}}
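
For context, a hypothetical caller of the async lookup declared above might look like the sketch below; it is generic over the cache and digest types so it does not assume exact headers, and the helper name is illustrative only.

#include <cstdint>

// Hypothetical helper, not part of this PR. In the write window the call may
// bump the MRU entry or kick off a compile; in the read window it is a plain
// lookup. A nullptr result means "execute with a baseline runtime
// (eos-vm / eos-vm-jit) for now".
template <typename CodeCache, typename Digest>
auto lookup_or_fallback(CodeCache& cc, const Digest& code_hash, uint8_t vm_version,
                        bool is_write_window) {
    return cc.get_descriptor_for_code(code_hash, vm_version, is_write_window);
}
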
2 changes: 1 addition & 1 deletion libraries/chain/wasm_interface.cpp
@@ -93,7 +93,7 @@ namespace eosio { namespace chain {
if(my->eosvmoc) {
const chain::eosvmoc::code_descriptor* cd = nullptr;
try {
cd = my->eosvmoc->cc.get_descriptor_for_code(code_hash, vm_version);
cd = my->eosvmoc->cc.get_descriptor_for_code(code_hash, vm_version, context.control.is_write_window());
}
catch(...) {
//swallow errors here, if EOS VM OC has gone in to the weeds we shouldn't bail: continue to try and run baseline
2 changes: 1 addition & 1 deletion libraries/chain/webassembly/runtimes/eos-vm-oc.cpp
@@ -28,7 +28,7 @@ class eosvmoc_instantiated_module : public wasm_instantiated_module_interface {
bool is_main_thread() { return _main_thread_id == std::this_thread::get_id(); };

void apply(apply_context& context) override {
const code_descriptor* const cd = _eosvmoc_runtime.cc.get_descriptor_for_code_sync(_code_hash, _vm_version);
const code_descriptor* const cd = _eosvmoc_runtime.cc.get_descriptor_for_code_sync(_code_hash, _vm_version, context.control.is_write_window());
EOS_ASSERT(cd, wasm_execution_error, "EOS VM OC instantiation failed");

if ( is_main_thread() )
24 changes: 10 additions & 14 deletions libraries/chain/webassembly/runtimes/eos-vm-oc/code_cache.cpp
@@ -107,10 +107,11 @@ std::tuple<size_t, size_t> code_cache_async::consume_compile_thread_queue() {
}


const code_descriptor* const code_cache_async::get_descriptor_for_code(const digest_type& code_id, const uint8_t& vm_version) {
const code_descriptor* const code_cache_async::get_descriptor_for_code(const digest_type& code_id, const uint8_t& vm_version, bool is_write_window) {
//if there are any outstanding compiles, process the result queue now
//do this only on main thread (which is in single threaded write window)
if(is_main_thread() && _outstanding_compiles_and_poison.size()) {
//When app is in write window, all tasks are running sequentially and read-only threads
//are not running. Safe to update cache entries.
if(is_write_window && _outstanding_compiles_and_poison.size()) {
auto [count_processed, bytes_remaining] = consume_compile_thread_queue();

if(count_processed)
@@ -136,11 +137,11 @@ const code_descriptor* const code_cache_async::get_descriptor_for_code(const dig
//check for entry in cache
code_cache_index::index<by_hash>::type::iterator it = _cache_index.get<by_hash>().find(boost::make_tuple(code_id, vm_version));
if(it != _cache_index.get<by_hash>().end()) {
if (is_main_thread())
if (is_write_window)
_cache_index.relocate(_cache_index.begin(), _cache_index.project<0>(it));
return &*it;
}
if(!is_main_thread()) // on read-only thread
if(!is_write_window)
return nullptr;

const code_tuple ct = code_tuple{code_id, vm_version};
@@ -179,15 +180,15 @@ code_cache_sync::~code_cache_sync() {
elog("unexpected response from EOS VM OC compile monitor during shutdown");
}

const code_descriptor* const code_cache_sync::get_descriptor_for_code_sync(const digest_type& code_id, const uint8_t& vm_version) {
const code_descriptor* const code_cache_sync::get_descriptor_for_code_sync(const digest_type& code_id, const uint8_t& vm_version, bool is_write_window) {
//check for entry in cache
code_cache_index::index<by_hash>::type::iterator it = _cache_index.get<by_hash>().find(boost::make_tuple(code_id, vm_version));
if(it != _cache_index.get<by_hash>().end()) {
if (is_main_thread())
if (is_write_window)
_cache_index.relocate(_cache_index.begin(), _cache_index.project<0>(it));
return &*it;
}
if(!is_main_thread())
if(!is_write_window)
return nullptr;

const code_object* const codeobject = _db.find<code_object,by_code_hash>(boost::make_tuple(code_id, 0, vm_version));
@@ -212,8 +213,7 @@ const code_descriptor* const code_cache_sync::get_descriptor_for_code_sync(const

code_cache_base::code_cache_base(const boost::filesystem::path data_dir, const eosvmoc::config& eosvmoc_config, const chainbase::database& db) :
_db(db),
_cache_file_path(data_dir/"code_cache.bin"),
_main_thread_id(std::this_thread::get_id())
_cache_file_path(data_dir/"code_cache.bin")
{
static_assert(sizeof(allocator_t) <= header_offset, "header offset intersects with allocator");

@@ -390,8 +390,4 @@ void code_cache_base::check_eviction_threshold(size_t free_bytes) {
if(free_bytes < _free_bytes_eviction_threshold)
run_eviction_round();
}

bool code_cache_base::is_main_thread() const {
return _main_thread_id == std::this_thread::get_id();
}
}}}
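
The guard change above matters because the main thread is no longer exclusive: during the read window it executes read-only transactions alongside the read-only thread pool, so is_main_thread() no longer implies it is safe to mutate the cache, while the is_write_window flag passed down from the controller does. A condensed sketch of the pattern both lookups now follow, with a plain std::list standing in for the boost multi_index MRU container used in code_cache.cpp:

#include <list>
#include <string>

struct entry { std::string code_id; int descriptor; };   // simplified stand-ins

const int* find_descriptor(std::list<entry>& mru, const std::string& code_id, bool is_write_window) {
    for (auto it = mru.begin(); it != mru.end(); ++it) {
        if (it->code_id != code_id)
            continue;
        if (is_write_window)                          // mutating MRU order is only safe while
            mru.splice(mru.begin(), mru, it);         // read-only threads are parked
        return &it->descriptor;                       // cache hit either way
    }
    if (!is_write_window)                             // read window: report a miss, never mutate
        return nullptr;
    // write window only: safe to insert a new entry / kick off a compile here
    return nullptr;
}
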
21 changes: 12 additions & 9 deletions plugins/producer_plugin/producer_plugin.cpp
@@ -2309,7 +2309,7 @@ producer_plugin_impl::handle_push_result( const transaction_metadata_ptr& trx,
// Dry-run trxs only run in write window. Read-only trxs can run in
// both write and read windows; time spent in read window is counted
// by read window summary.
if ( app().executor().is_write_window() ) {
if ( chain.is_write_window() ) {
auto dur = end - start;
_time_tracker.add_fail_time(dur, trx->is_transient());
}
@@ -2356,7 +2356,7 @@ producer_plugin_impl::handle_push_result( const transaction_metadata_ptr& trx,
// Dry-run trxs only run in write window. Read-only trxs can run in
// both write and read windows; time spent in read window is counted
// by read window summary.
if ( app().executor().is_write_window() ) {
if ( chain.is_write_window() ) {
auto dur = end - start;
_time_tracker.add_success_time(dur, trx->is_transient());
}
@@ -2833,9 +2833,11 @@ void producer_plugin_impl::switch_to_write_window() {
("t", _ro_all_threads_exec_time_us.load()));
}

chain::controller& chain = chain_plug->chain();

// this method can be called from multiple places. it is possible
// we are already in write window.
if ( app().executor().is_write_window() ) {
if ( chain.is_write_window() ) {
return;
}

@@ -2850,6 +2852,7 @@ void producer_plugin_impl::start_write_window() {
chain::controller& chain = chain_plug->chain();

app().executor().set_to_write_window();
chain.set_to_write_window();
chain.unset_db_read_only_mode();
_idle_trx_time = _ro_window_deadline = fc::time_point::now();

@@ -2869,7 +2872,8 @@

// Called only from app thread
void producer_plugin_impl::switch_to_read_window() {
EOS_ASSERT(app().executor().is_write_window(), producer_exception, "expected to be in write window");
chain::controller& chain = chain_plug->chain();
EOS_ASSERT(chain.is_write_window(), producer_exception, "expected to be in write window");
EOS_ASSERT( _ro_num_active_exec_tasks.load() == 0 && _ro_exec_tasks_fut.empty(), producer_exception, "_ro_exec_tasks_fut expected to be empty" );

_time_tracker.add_idle_time( fc::time_point::now() - _idle_trx_time );
@@ -2880,15 +2884,14 @@ void producer_plugin_impl::switch_to_read_window() {
return;
}


auto& chain = chain_plug->chain();
uint32_t pending_block_num = chain.head_block_num() + 1;
_ro_read_window_start_time = fc::time_point::now();
_ro_window_deadline = _ro_read_window_start_time + _ro_read_window_effective_time_us;
app().executor().set_to_read_window(_ro_thread_pool_size,
[received_block=&_received_block, pending_block_num, ro_window_deadline=_ro_window_deadline]() {
return fc::time_point::now() >= ro_window_deadline || (received_block->load() >= pending_block_num); // should_exit()
});
chain.set_to_read_window();
chain.set_db_read_only_mode();
_ro_all_threads_exec_time_us = 0;

@@ -2988,11 +2991,11 @@ bool producer_plugin_impl::push_read_only_transaction(transaction_metadata_ptr t
// When executing a read-only trx on the main thread while in the write window,
// need to switch db mode to read only.
auto db_read_only_mode_guard = fc::make_scoped_exit([&]{
if( app().executor().is_write_window() )
if( chain.is_write_window() )
chain.unset_db_read_only_mode();
});

if ( app().executor().is_write_window() ) {
if ( chain.is_write_window() ) {
chain.set_db_read_only_mode();
auto idle_time = fc::time_point::now() - _idle_trx_time;
_time_tracker.add_idle_time( idle_time );
@@ -3012,7 +3015,7 @@ bool producer_plugin_impl::push_read_only_transaction(transaction_metadata_ptr t
_ro_exhausted_trx_queue.push_front( {std::move(trx), std::move(next)} );
}

if ( app().executor().is_write_window() ) {
if ( chain.is_write_window() ) {
_idle_trx_time = fc::time_point::now();
}
} catch ( const guard_exception& e ) {
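
Putting the producer changes together, each window switch now updates three things in step: the appbase executor, the controller's window flag, and the database read-only mode. A rough ordering sketch with stand-in types rather than the real appbase and chain classes; the real executor::set_to_read_window also takes a thread count and a should_exit predicate, omitted here.

struct executor_t {                     // stand-in for appbase's executor
    void set_to_write_window() {}
    void set_to_read_window() {}        // real signature takes thread count + should_exit
};
struct chain_t {                        // stand-in for chain::controller
    void set_to_write_window() {}
    void set_to_read_window() {}
    void set_db_read_only_mode() {}
    void unset_db_read_only_mode() {}
};

void start_write_window(executor_t& executor, chain_t& chain) {
    executor.set_to_write_window();     // stop dispatching read-only tasks to the pool
    chain.set_to_write_window();        // controller: code cache updates are safe again
    chain.unset_db_read_only_mode();    // state writes allowed
}

void switch_to_read_window(executor_t& executor, chain_t& chain) {
    executor.set_to_read_window();      // start the parallel read-only window
    chain.set_to_read_window();         // controller: treat shared caches as read-only
    chain.set_db_read_only_mode();      // forbid state writes while threads run in parallel
}
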
