diff --git a/CMakeLists.txt b/CMakeLists.txt index 5f341243f0..bef15866a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,7 +109,7 @@ set(Boost_USE_MULTITHREADED ON) set( Boost_USE_STATIC_LIBS ON CACHE STRING "ON or OFF" ) # Most boost deps get implictly picked up via fc, as just about everything links to fc. In addition we pick up # the pthread dependency through fc. -find_package(Boost 1.67 REQUIRED COMPONENTS program_options unit_test_framework system) +find_package(Boost 1.70 REQUIRED COMPONENTS program_options unit_test_framework system) if( APPLE AND UNIX ) # Apple Specific Options Here diff --git a/README.md b/README.md index a04f411d71..71582a1ce4 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ You will need to build on a [supported operating system](#supported-operating-sy Requirements to build: - C++17 compiler and standard library -- boost 1.67+ +- boost 1.70+ - CMake 3.16+ - LLVM 7 - 11 - for Linux only - newer versions do not work diff --git a/docs/01_nodeos/03_plugins/chain_plugin/index.md b/docs/01_nodeos/03_plugins/chain_plugin/index.md index 8899c086ec..c6fa5dd9dc 100644 --- a/docs/01_nodeos/03_plugins/chain_plugin/index.md +++ b/docs/01_nodeos/03_plugins/chain_plugin/index.md @@ -120,7 +120,7 @@ Config Options for eosio::chain_plugin: applied to them (may specify multiple times) --read-mode arg (=head) Database read mode ("head", - "irreversible"). + "irreversible", "speculative"). In "head" mode: database contains state changes up to the head block; transactions received by the node are @@ -131,7 +131,14 @@ Config Options for eosio::chain_plugin: received via the P2P network are not relayed and transactions cannot be pushed via the chain API. - + In "speculative" mode: (DEPRECATED: + head mode recommended) database + contains state changes by transactions + in the blockchain up to the head block + as well as some transactions not yet + included in the blockchain; + transactions received by the node are + relayed if valid. 
--api-accept-transactions arg (=1) Allow API transactions to be evaluated and relayed if valid. --validation-mode arg (=full) Chain validation mode ("full" or diff --git a/docs/01_nodeos/07_concepts/05_storage-and-read-modes.md b/docs/01_nodeos/07_concepts/05_storage-and-read-modes.md index f5292e253d..b8b1f48c47 100644 --- a/docs/01_nodeos/07_concepts/05_storage-and-read-modes.md +++ b/docs/01_nodeos/07_concepts/05_storage-and-read-modes.md @@ -29,6 +29,7 @@ The `nodeos` service can be run in different "read" modes. These modes control h - `head`: this only includes the side effects of confirmed transactions, this mode processes unconfirmed transactions but does not include them. - `irreversible`: this mode also includes confirmed transactions only up to those included in the last irreversible block. +- `speculative`: this includes the side effects of confirmed and unconfirmed transactions. A transaction is considered confirmed when a `nodeos` instance has received, processed, and written it to a block on the blockchain, i.e. it is in the head block or an earlier block. @@ -44,6 +45,16 @@ When `nodeos` is configured to be in irreversible read mode, it will still track Clients such as `cleos` and the RPC API will see database state as of the current head block of the chain. It **will not** include changes made by transactions known to this node but not included in the chain, such as unconfirmed transactions. +### Speculative Mode ( Deprecated ) + +Clients such as `cleos` and the RPC API will see database state as of the current head block plus changes made by all transactions known to this node but potentially not included in the chain, unconfirmed transactions for example. + +Speculative mode is low latency but fragile: there is no guarantee that the transactions reflected in the state will be included in the chain OR that they will be reflected in the same order the state implies. + +This mode features the lowest latency, but is the least consistent. 
+ +In speculative mode `nodeos` is able to execute transactions which have TaPoS (Transaction as Proof of Stake) pointing to any valid block in a fork considered to be the best fork by this node. + ## How To Specify the Read Mode The mode in which `nodeos` is run can be specified using the `--read-mode` option from the `eosio::chain_plugin`. diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index ec0293df06..5d2947b6c7 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -270,9 +270,7 @@ struct controller_impl { prev = fork_db.root(); } - if ( read_mode == db_read_mode::HEAD ) { - EOS_ASSERT( head->block, block_validate_exception, "attempting to pop a block that was sparsely loaded from a snapshot"); - } + EOS_ASSERT( head->block, block_validate_exception, "attempting to pop a block that was sparsely loaded from a snapshot"); head = prev; @@ -1635,7 +1633,7 @@ struct controller_impl { if ( trx->is_transient() ) { // remove trx from pending block by not canceling 'restore' trx_context.undo(); // this will happen automatically in destructor, but make it more explicit - } else if ( pending->_block_status == controller::block_status::ephemeral ) { + } else if ( read_mode != db_read_mode::SPECULATIVE && pending->_block_status == controller::block_status::ephemeral ) { // An ephemeral block will never become a full block, but on a producer node the trxs should be saved // in the un-applied transaction queue for execution during block production. 
For a non-producer node // save the trxs in the un-applied transaction queue for use during block validation to skip signature diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index 49ea0e72d5..6fda34846c 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -49,7 +49,8 @@ namespace eosio { namespace chain { enum class db_read_mode { HEAD, - IRREVERSIBLE + IRREVERSIBLE, + SPECULATIVE }; enum class validation_mode { diff --git a/libraries/chain/include/eosio/chain/exceptions.hpp b/libraries/chain/include/eosio/chain/exceptions.hpp index b960f3c424..9dd47d879a 100644 --- a/libraries/chain/include/eosio/chain/exceptions.hpp +++ b/libraries/chain/include/eosio/chain/exceptions.hpp @@ -381,6 +381,10 @@ namespace eosio { namespace chain { 3080007, "Transaction exceeded the current greylisted account network usage limit" ) FC_DECLARE_DERIVED_EXCEPTION( greylist_cpu_usage_exceeded, resource_exhausted_exception, 3080008, "Transaction exceeded the current greylisted account CPU usage limit" ) + FC_DECLARE_DERIVED_EXCEPTION( ro_trx_vm_oc_compile_temporary_failure, resource_exhausted_exception, + 3080009, "Read-only transaction eos-vm-oc compile temporary failure" ) + FC_DECLARE_DERIVED_EXCEPTION( ro_trx_vm_oc_compile_permanent_failure, resource_exhausted_exception, + 3080010, "Read-only transaction eos-vm-oc compile permanent failure" ) FC_DECLARE_DERIVED_EXCEPTION( leeway_deadline_exception, deadline_exception, 3081001, "Transaction reached the deadline set due to leeway on account CPU limits" ) diff --git a/libraries/chain/include/eosio/chain/wasm_interface.hpp b/libraries/chain/include/eosio/chain/wasm_interface.hpp index 5a6f472cbc..a5660a8211 100644 --- a/libraries/chain/include/eosio/chain/wasm_interface.hpp +++ b/libraries/chain/include/eosio/chain/wasm_interface.hpp @@ -61,9 +61,6 @@ namespace eosio { namespace chain { //Calls apply 
or error on a given code void apply(const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version, apply_context& context); - //Immediately exits currently running wasm. UB is called when no wasm running - void exit(); - //Returns true if the code is cached bool is_code_cached(const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) const; diff --git a/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc.hpp b/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc.hpp index 0799a67b4e..74f7868b68 100644 --- a/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc.hpp +++ b/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc.hpp @@ -32,7 +32,6 @@ class eosvmoc_runtime : public eosio::chain::wasm_runtime_interface { std::unique_ptr instantiate_module(const char* code_bytes, size_t code_size, const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) override; - void immediately_exit_currently_running_module() override; void init_thread_local_data() override; friend eosvmoc_instantiated_module; diff --git a/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc/code_cache.hpp b/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc/code_cache.hpp index ef48ff14e2..2c9f7913b8 100644 --- a/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc/code_cache.hpp +++ b/libraries/chain/include/eosio/chain/webassembly/eos-vm-oc/code_cache.hpp @@ -37,6 +37,7 @@ using allocator_t = bip::rbtree_best_fit instantiate_module(const char* code_bytes, size_t code_size, const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) override; - void immediately_exit_currently_running_module() override; - private: // todo: managing this will get more complicated with sync calls; - // immediately_exit_currently_running_module() should probably - // move from wasm_runtime_interface to wasm_instantiated_module_interface. 
eos_vm_backend_t* _bkend = nullptr; // non owning pointer to allow for immediate exit template @@ -63,8 +59,6 @@ class eos_vm_profile_runtime : public eosio::chain::wasm_runtime_interface { eos_vm_profile_runtime(); std::unique_ptr instantiate_module(const char* code_bytes, size_t code_size, const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) override; - - void immediately_exit_currently_running_module() override; }; }}}}// eosio::chain::webassembly::eos_vm_runtime diff --git a/libraries/chain/include/eosio/chain/webassembly/runtime_interface.hpp b/libraries/chain/include/eosio/chain/webassembly/runtime_interface.hpp index 1c999f0bcd..09df1e023a 100644 --- a/libraries/chain/include/eosio/chain/webassembly/runtime_interface.hpp +++ b/libraries/chain/include/eosio/chain/webassembly/runtime_interface.hpp @@ -23,9 +23,6 @@ class wasm_runtime_interface { virtual std::unique_ptr instantiate_module(const char* code_bytes, size_t code_size, const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) = 0; - //immediately exit the currently running wasm_instantiated_module_interface. Yep, this assumes only one can possibly run at a time. 
- virtual void immediately_exit_currently_running_module() = 0; - virtual ~wasm_runtime_interface(); // eosvmoc_runtime needs this diff --git a/libraries/chain/wasm_interface.cpp b/libraries/chain/wasm_interface.cpp index 481d534de8..d3a7898c3d 100644 --- a/libraries/chain/wasm_interface.cpp +++ b/libraries/chain/wasm_interface.cpp @@ -92,8 +92,9 @@ namespace eosio { namespace chain { #ifdef EOSIO_EOS_VM_OC_RUNTIME_ENABLED if(my->eosvmoc) { const chain::eosvmoc::code_descriptor* cd = nullptr; + chain::eosvmoc::code_cache_base::get_cd_failure failure = chain::eosvmoc::code_cache_base::get_cd_failure::temporary; try { - cd = my->eosvmoc->cc.get_descriptor_for_code(code_hash, vm_version, context.control.is_write_window()); + cd = my->eosvmoc->cc.get_descriptor_for_code(code_hash, vm_version, context.control.is_write_window(), failure); } catch(...) { //swallow errors here, if EOS VM OC has gone in to the weeds we shouldn't bail: continue to try and run baseline @@ -107,15 +108,18 @@ namespace eosio { namespace chain { my->eosvmoc->exec->execute(*cd, my->eosvmoc->mem, context); return; } + else if (context.trx_context.is_read_only()) { + if (failure == chain::eosvmoc::code_cache_base::get_cd_failure::temporary) { + EOS_ASSERT(false, ro_trx_vm_oc_compile_temporary_failure, "get_descriptor_for_code failed with temporary failure"); + } else { + EOS_ASSERT(false, ro_trx_vm_oc_compile_permanent_failure, "get_descriptor_for_code failed with permanent failure"); + } + } } #endif my->get_instantiated_module(code_hash, vm_type, vm_version, context.trx_context)->apply(context); } - void wasm_interface::exit() { - my->runtime_interface->immediately_exit_currently_running_module(); - } - bool wasm_interface::is_code_cached(const digest_type& code_hash, const uint8_t& vm_type, const uint8_t& vm_version) const { return my->is_code_cached(code_hash, vm_type, vm_version); } diff --git a/libraries/chain/webassembly/cf_system.cpp b/libraries/chain/webassembly/cf_system.cpp index 
32afe93612..8497a81bfa 100644 --- a/libraries/chain/webassembly/cf_system.cpp +++ b/libraries/chain/webassembly/cf_system.cpp @@ -45,7 +45,8 @@ namespace eosio { namespace chain { namespace webassembly { } } + //be aware that EOS VM OC handles eosio_exit internally and this function will not be called by OC void interface::eosio_exit( int32_t code ) const { - context.control.get_wasm_interface().exit(); + throw wasm_exit{}; } }}} // ns eosio::chain::webassembly diff --git a/libraries/chain/webassembly/runtimes/eos-vm-oc.cpp b/libraries/chain/webassembly/runtimes/eos-vm-oc.cpp index c4ba7335ce..b65f957b00 100644 --- a/libraries/chain/webassembly/runtimes/eos-vm-oc.cpp +++ b/libraries/chain/webassembly/runtimes/eos-vm-oc.cpp @@ -55,9 +55,6 @@ std::unique_ptr eosvmoc_runtime::instantiate return std::make_unique(code_hash, vm_type, *this); } -//never called. EOS VM OC overrides eosio_exit to its own implementation -void eosvmoc_runtime::immediately_exit_currently_running_module() {} - void eosvmoc_runtime::init_thread_local_data() { exec_thread_local = std::make_unique(cc); } diff --git a/libraries/chain/webassembly/runtimes/eos-vm-oc/code_cache.cpp b/libraries/chain/webassembly/runtimes/eos-vm-oc/code_cache.cpp index dbec72c353..3746e6c6c0 100644 --- a/libraries/chain/webassembly/runtimes/eos-vm-oc/code_cache.cpp +++ b/libraries/chain/webassembly/runtimes/eos-vm-oc/code_cache.cpp @@ -106,7 +106,7 @@ std::tuple code_cache_async::consume_compile_thread_queue() { } -const code_descriptor* const code_cache_async::get_descriptor_for_code(const digest_type& code_id, const uint8_t& vm_version, bool is_write_window) { +const code_descriptor* const code_cache_async::get_descriptor_for_code(const digest_type& code_id, const uint8_t& vm_version, bool is_write_window, get_cd_failure& failure) { //if there are any outstanding compiles, process the result queue now //When app is in write window, all tasks are running sequentially and read-only threads //are not running. 
Safe to update cache entries. @@ -140,33 +140,44 @@ const code_descriptor* const code_cache_async::get_descriptor_for_code(const dig _cache_index.relocate(_cache_index.begin(), _cache_index.project<0>(it)); return &*it; } - if(!is_write_window) + if(!is_write_window) { + failure = get_cd_failure::temporary; // Compile might not be done yet return nullptr; + } const code_tuple ct = code_tuple{code_id, vm_version}; - if(_blacklist.find(ct) != _blacklist.end()) + if(_blacklist.find(ct) != _blacklist.end()) { + failure = get_cd_failure::permanent; // Compile will not start return nullptr; + } if(auto it = _outstanding_compiles_and_poison.find(ct); it != _outstanding_compiles_and_poison.end()) { + failure = get_cd_failure::temporary; // Compile might not be done yet it->second = false; return nullptr; } - if(_queued_compiles.find(ct) != _queued_compiles.end()) + if(_queued_compiles.find(ct) != _queued_compiles.end()) { + failure = get_cd_failure::temporary; // Compile might not be done yet return nullptr; + } if(_outstanding_compiles_and_poison.size() >= _threads) { _queued_compiles.emplace(ct); + failure = get_cd_failure::temporary; // Compile might not be done yet return nullptr; } const code_object* const codeobject = _db.find(boost::make_tuple(code_id, 0, vm_version)); - if(!codeobject) //should be impossible right? + if(!codeobject) { //should be impossible right? 
+ failure = get_cd_failure::permanent; // Compile will not start return nullptr; + } _outstanding_compiles_and_poison.emplace(ct, false); std::vector fds_to_pass; fds_to_pass.emplace_back(memfd_for_bytearray(codeobject->code)); write_message_with_fds(_compile_monitor_write_socket, compile_wasm_message{ ct }, fds_to_pass); + failure = get_cd_failure::temporary; // Compile might not be done yet return nullptr; } diff --git a/libraries/chain/webassembly/runtimes/eos-vm.cpp b/libraries/chain/webassembly/runtimes/eos-vm.cpp index 39e549706e..522d39e13d 100644 --- a/libraries/chain/webassembly/runtimes/eos-vm.cpp +++ b/libraries/chain/webassembly/runtimes/eos-vm.cpp @@ -233,11 +233,6 @@ class eos_vm_profiling_module : public wasm_instantiated_module_interface { template eos_vm_runtime::eos_vm_runtime() {} -template -void eos_vm_runtime::immediately_exit_currently_running_module() { - throw wasm_exit{}; -} - template std::unique_ptr eos_vm_runtime::instantiate_module(const char* code_bytes, size_t code_size, const digest_type&, const uint8_t&, const uint8_t&) { @@ -261,10 +256,6 @@ template class eos_vm_runtime; eos_vm_profile_runtime::eos_vm_profile_runtime() {} -void eos_vm_profile_runtime::immediately_exit_currently_running_module() { - throw wasm_exit{}; -} - std::unique_ptr eos_vm_profile_runtime::instantiate_module(const char* code_bytes, size_t code_size, const digest_type&, const uint8_t&, const uint8_t&) { diff --git a/libraries/libfc/include/fc/log/appender.hpp b/libraries/libfc/include/fc/log/appender.hpp index aca101e4c9..7f1b4a38a2 100644 --- a/libraries/libfc/include/fc/log/appender.hpp +++ b/libraries/libfc/include/fc/log/appender.hpp @@ -3,11 +3,7 @@ #include #include -#if BOOST_VERSION >= 106600 namespace boost { namespace asio { class io_context; typedef io_context io_service; } } -#else -namespace boost { namespace asio { class io_service; } } -#endif namespace fc { class appender; diff --git a/libraries/libfc/test/network/test_message_buffer.cpp 
b/libraries/libfc/test/network/test_message_buffer.cpp index 315116eefb..de872f43cb 100644 --- a/libraries/libfc/test/network/test_message_buffer.cpp +++ b/libraries/libfc/test/network/test_message_buffer.cpp @@ -7,19 +7,11 @@ namespace { size_t mb_size(boost::asio::mutable_buffer& mb) { -#if BOOST_VERSION >= 106600 return mb.size(); -#else - return boost::asio::detail::buffer_size_helper(mb); -#endif } void* mb_data(boost::asio::mutable_buffer& mb) { -#if BOOST_VERSION >= 106600 return mb.data(); -#else - return boost::asio::detail::buffer_cast_helper(mb); -#endif } } diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index f9c46f3f1a..ca586c2133 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -51,6 +51,8 @@ std::ostream& operator<<(std::ostream& osm, eosio::chain::db_read_mode m) { osm << "head"; } else if ( m == eosio::chain::db_read_mode::IRREVERSIBLE ) { osm << "irreversible"; + } else if ( m == eosio::chain::db_read_mode::SPECULATIVE ) { + osm << "speculative"; } return osm; @@ -70,10 +72,12 @@ void validate(boost::any& v, // one string, it's an error, and exception will be thrown. 
std::string const& s = validators::get_single_string(values); - if ( s == "head" ) { + if ( s == "head" ) { v = boost::any(eosio::chain::db_read_mode::HEAD); } else if ( s == "irreversible" ) { v = boost::any(eosio::chain::db_read_mode::IRREVERSIBLE); + } else if ( s == "speculative" ) { + v = boost::any(eosio::chain::db_read_mode::SPECULATIVE); } else { throw validation_error(validation_error::invalid_option_value); } @@ -286,10 +290,12 @@ void chain_plugin::set_program_options(options_description& cli, options_descrip ("sender-bypass-whiteblacklist", boost::program_options::value>()->composing()->multitoken(), "Deferred transactions sent by accounts in this list do not have any of the subjective whitelist/blacklist checks applied to them (may specify multiple times)") ("read-mode", boost::program_options::value()->default_value(eosio::chain::db_read_mode::HEAD), - "Database read mode (\"head\", \"irreversible\").\n" + "Database read mode (\"head\", \"irreversible\", \"speculative\").\n" "In \"head\" mode: database contains state changes up to the head block; transactions received by the node are relayed if valid.\n" "In \"irreversible\" mode: database contains state changes up to the last irreversible block; " "transactions received via the P2P network are not relayed and transactions cannot be pushed via the chain API.\n" + "In \"speculative\" mode: (DEPRECATED: head mode recommended) database contains state changes by transactions in the blockchain " + "up to the head block as well as some transactions not yet included in the blockchain; transactions received by the node are relayed if valid.\n" ) ( "api-accept-transactions", bpo::value()->default_value(true), "Allow API transactions to be evaluated and relayed if valid.") ("validation-mode", boost::program_options::value()->default_value(eosio::chain::validation_mode::FULL), diff --git a/plugins/http_plugin/http_plugin.cpp b/plugins/http_plugin/http_plugin.cpp index d72ef4c763..5313fc8a6d 100644 --- 
a/plugins/http_plugin/http_plugin.cpp +++ b/plugins/http_plugin/http_plugin.cpp @@ -28,7 +28,7 @@ namespace eosio { using std::regex; using boost::asio::ip::tcp; using std::shared_ptr; - + static http_plugin_defaults current_http_plugin_defaults; static bool verbose_http_errors = false; @@ -36,6 +36,10 @@ namespace eosio { current_http_plugin_defaults = config; } + std::string http_plugin::get_server_header() { + return current_http_plugin_defaults.server_header; + } + using http_plugin_impl_ptr = std::shared_ptr; class http_plugin_impl : public std::enable_shared_from_this { @@ -47,9 +51,9 @@ namespace eosio { http_plugin_impl& operator=(const http_plugin_impl&) = delete; http_plugin_impl& operator=(http_plugin_impl&&) = delete; - + std::optional listen_endpoint; - + std::optional unix_endpoint; shared_ptr > beast_server; @@ -272,7 +276,7 @@ namespace eosio { my->plugin_state->server_header = current_http_plugin_defaults.server_header; - + //watch out for the returns above when adding new code here } FC_LOG_AND_RETHROW() } @@ -309,7 +313,7 @@ namespace eosio { if(my->unix_endpoint) { try { my->create_beast_server(true); - + my->beast_unix_server->listen(*my->unix_endpoint); my->beast_unix_server->start_accept(); } catch ( const fc::exception& e ){ @@ -335,7 +339,7 @@ namespace eosio { } } }}, appbase::exec_queue::read_only); - + } catch (...) 
{ fc_elog(logger(), "http_plugin startup fails, shutting down"); app().quit(); @@ -457,9 +461,9 @@ namespace eosio { fc::microseconds http_plugin::get_max_response_time()const { return my->plugin_state->max_response_time; } - + size_t http_plugin::get_max_body_size()const { return my->plugin_state->max_body_size; } - + } diff --git a/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp b/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp index 05bcdd7d35..bc2669fbf7 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/beast_http_session.hpp @@ -11,20 +11,10 @@ namespace eosio { using std::chrono::steady_clock; -// Boost 1.70 introduced a breaking change that causes problems with construction of strand objects from tcp_socket -// this is suggested fix OK'd Beast author (V. Falco) to handle both versions gracefully -// see https://stackoverflow.com/questions/58453017/boost-asio-tcp-socket-1-70-not-backward-compatible -#if BOOST_VERSION < 107000 -typedef tcp::socket tcp_socket_t; -#else typedef asio::basic_stream_socket tcp_socket_t; -#endif using boost::asio::local::stream_protocol; -#if BOOST_VERSION < 107000 -using local_stream = boost::asio::basic_stream_socket; -#else #if BOOST_VERSION < 107300 using local_stream = beast::basic_stream< stream_protocol, @@ -36,7 +26,6 @@ using local_stream = beast::basic_stream< asio::any_io_executor, beast::unlimited_rate_policy>; #endif -#endif //------------------------------------------------------------------------------ // fail() @@ -54,11 +43,7 @@ bool allow_host(const http::request& req, T& session, auto is_conn_secure = session.is_secure(); auto& socket = session.socket(); -#if BOOST_VERSION < 107000 - auto& lowest_layer = beast::get_lowest_layer(socket); -#else auto& lowest_layer = beast::get_lowest_layer(socket); -#endif auto local_endpoint = lowest_layer.local_endpoint(); auto local_socket_host_port = 
local_endpoint.address().to_string() + ":" + std::to_string(local_endpoint.port()); const std::string host_str(req["host"]); diff --git a/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp b/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp index f6d3d5c44d..50c47e038b 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp @@ -79,6 +79,7 @@ namespace eosio { //must be called before initialize static void set_defaults(const http_plugin_defaults& config); + static std::string get_server_header(); APPBASE_PLUGIN_REQUIRES() void set_program_options(options_description&, options_description& cfg) override; @@ -122,7 +123,7 @@ namespace eosio { void register_metrics_listener(chain::plugin_interface::metrics_listener listener); size_t get_max_body_size()const; - + private: std::shared_ptr my; }; @@ -176,7 +177,7 @@ namespace eosio { }; /** - * @brief Used to trim whitespace from body. + * @brief Used to trim whitespace from body. * Returned string_view valid only for lifetime of body */ inline std::string_view make_trimmed_string_view(const std::string& body) { diff --git a/plugins/producer_api_plugin/producer.swagger.yaml b/plugins/producer_api_plugin/producer.swagger.yaml index de6d334b38..8b6fd5bd5a 100644 --- a/plugins/producer_api_plugin/producer.swagger.yaml +++ b/plugins/producer_api_plugin/producer.swagger.yaml @@ -346,7 +346,7 @@ paths: /producer/schedule_snapshot: post: summary: schedule_snapshot - description: Submits a request to generate a schedule for automated snapshot with given parameters. If request body is empty, triest to execute snapshot immediately. Returns error when unable to create snapshot. + description: Submits a request to automatically generate snapshots according to a schedule specified with given parameters. If request body is empty, schedules immediate snapshot generation. Returns error when unable to accept schedule. 
operationId: schedule_snapshot requestBody: content: @@ -377,6 +377,12 @@ paths: application/json: schema: type: object + required: + - snapshot_request_id + - block_spacing + - start_block_num + - end_block_num + - snapshot_description properties: snapshot_request_id: type: integer @@ -405,8 +411,8 @@ paths: /producer/get_snapshot_requests: post: summary: get_snapshot_requests - description: Returns a list of scheduled snapshots - operationId: get_snapshot_status + description: Returns a list of scheduled snapshots. + operationId: get_snapshot_requests responses: "201": description: OK @@ -417,9 +423,16 @@ paths: properties: snapshot_requests: type: array - description: An array of scheduled snapshots + description: An array of scheduled snapshot requests items: type: object + required: + - snapshot_request_id + - block_spacing + - start_block_num + - end_block_num + - snapshot_description + - pending_snapshots properties: snapshot_request_id: type: integer @@ -444,6 +457,12 @@ paths: description: List of pending snapshots items: type: object + required: + - head_block_id + - head_block_num + - head_block_time + - version + - snapshot_name properties: head_block_id: $ref: "https://docs.eosnetwork.com/openapi/v2.0/Sha256.yaml" @@ -472,7 +491,7 @@ paths: /producer/unschedule_snapshot: post: summary: unschedule_snapshot - description: Submits a request to remove identified by id recurring snapshot from schedule. Returns error when unable to create unschedule. + description: Removes snapshot request identified by id. Returns error if referenced snapshot request does not exist. 
operationId: unschedule_snapshot requestBody: required: true @@ -491,6 +510,12 @@ paths: application/json: schema: type: object + required: + - snapshot_request_id + - block_spacing + - start_block_num + - end_block_num + - snapshot_description properties: snapshot_request_id: type: integer diff --git a/plugins/producer_api_plugin/producer_api_plugin.cpp b/plugins/producer_api_plugin/producer_api_plugin.cpp index a8f84a0fa8..185655de4f 100644 --- a/plugins/producer_api_plugin/producer_api_plugin.cpp +++ b/plugins/producer_api_plugin/producer_api_plugin.cpp @@ -141,9 +141,9 @@ void producer_api_plugin::plugin_startup() { CALL_ASYNC(producer, producer, create_snapshot, producer_plugin::snapshot_information, INVOKE_R_V_ASYNC(producer, create_snapshot), 201), CALL_WITH_400(producer, producer, schedule_snapshot, - INVOKE_V_R_II(producer, schedule_snapshot, producer_plugin::snapshot_request_information), 201), + INVOKE_R_R_II(producer, schedule_snapshot, producer_plugin::snapshot_request_information), 201), CALL_WITH_400(producer, producer, unschedule_snapshot, - INVOKE_V_R(producer, unschedule_snapshot, producer_plugin::snapshot_request_id_information), 201), + INVOKE_R_R(producer, unschedule_snapshot, producer_plugin::snapshot_request_id_information), 201), CALL_WITH_400(producer, producer, get_integrity_hash, INVOKE_R_V(producer, get_integrity_hash), 201), CALL_WITH_400(producer, producer, schedule_protocol_feature_activations, diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp index 01702ea9aa..7f7a95ccc4 100644 --- a/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp +++ b/plugins/producer_plugin/include/eosio/producer_plugin/producer_plugin.hpp @@ -94,8 +94,11 @@ class producer_plugin : public appbase::plugin { uint32_t snapshot_request_id = 0; }; + struct snapshot_schedule_result : public snapshot_request_id_information, public 
snapshot_request_information { + }; + struct snapshot_schedule_information : public snapshot_request_id_information, public snapshot_request_information { - std::optional> pending_snapshots; + std::vector pending_snapshots; }; struct get_snapshot_requests_result { @@ -159,8 +162,8 @@ class producer_plugin : public appbase::plugin { integrity_hash_information get_integrity_hash() const; void create_snapshot(next_function next); - void schedule_snapshot(const snapshot_request_information& schedule); - void unschedule_snapshot(const snapshot_request_id_information& schedule); + snapshot_schedule_result schedule_snapshot(const snapshot_request_information& schedule); + snapshot_schedule_result unschedule_snapshot(const snapshot_request_id_information& schedule); get_snapshot_requests_result get_snapshot_requests() const; scheduled_protocol_feature_activations get_scheduled_protocol_feature_activations() const; @@ -209,7 +212,7 @@ class producer_plugin : public appbase::plugin { static void set_test_mode(bool m) { test_mode_ = m; } private: inline static bool test_mode_{false}; // to be moved into appbase (application_base) - + std::shared_ptr my; }; @@ -224,6 +227,7 @@ FC_REFLECT(eosio::producer_plugin::snapshot_request_information, (block_spacing) FC_REFLECT(eosio::producer_plugin::snapshot_request_id_information, (snapshot_request_id)) FC_REFLECT(eosio::producer_plugin::get_snapshot_requests_result, (snapshot_requests)) FC_REFLECT_DERIVED(eosio::producer_plugin::snapshot_schedule_information, (eosio::producer_plugin::snapshot_request_id_information)(eosio::producer_plugin::snapshot_request_information), (pending_snapshots)) +FC_REFLECT_DERIVED(eosio::producer_plugin::snapshot_schedule_result, (eosio::producer_plugin::snapshot_request_id_information)(eosio::producer_plugin::snapshot_request_information),) FC_REFLECT(eosio::producer_plugin::scheduled_protocol_feature_activations, (protocol_features_to_activate)) 
FC_REFLECT(eosio::producer_plugin::get_supported_protocol_features_params, (exclude_disabled)(exclude_unactivatable)) FC_REFLECT(eosio::producer_plugin::get_account_ram_corrections_params, (lower_bound)(upper_bound)(limit)(reverse)) diff --git a/plugins/producer_plugin/include/eosio/producer_plugin/snapshot_scheduler.hpp b/plugins/producer_plugin/include/eosio/producer_plugin/snapshot_scheduler.hpp index ad6e005f63..2dd86b85fe 100644 --- a/plugins/producer_plugin/include/eosio/producer_plugin/snapshot_scheduler.hpp +++ b/plugins/producer_plugin/include/eosio/producer_plugin/snapshot_scheduler.hpp @@ -45,7 +45,7 @@ class snapshot_scheduler { void x_serialize() { auto& vec = _snapshot_requests.get(); std::vector sr(vec.begin(), vec.end()); - _snapshot_db << sr; + _snapshot_db << sr; } public: @@ -54,7 +54,7 @@ class snapshot_scheduler { // snapshot_scheduler_listener void on_start_block(uint32_t height) { bool serialize_needed = false; - bool snapshot_executed = false; + bool snapshot_executed = false; auto execute_snapshot_with_log = [this, height, &snapshot_executed](const auto & req) { // one snapshot per height @@ -64,18 +64,18 @@ class snapshot_scheduler { ("end_block_num", req.end_block_num) ("block_spacing", req.block_spacing) ("height", height)); - + execute_snapshot(req.snapshot_request_id); snapshot_executed = true; } - }; + }; std::vector unschedule_snapshot_request_ids; for(const auto& req: _snapshot_requests.get<0>()) { // -1 since its called from start block bool recurring_snapshot = req.block_spacing && (height >= req.start_block_num + 1) && (!((height - req.start_block_num - 1) % req.block_spacing)); bool onetime_snapshot = (!req.block_spacing) && (height == req.start_block_num + 1); - + // assume "asap" for snapshot with missed/zero start, it can have spacing if(!req.start_block_num) { // update start_block_num with current height only if this is recurring @@ -84,16 +84,16 @@ class snapshot_scheduler { auto& snapshot_by_id = _snapshot_requests.get(); 
auto it = snapshot_by_id.find(req.snapshot_request_id); _snapshot_requests.modify(it, [&height](auto& p) { p.start_block_num = height - 1; }); - serialize_needed = true; + serialize_needed = true; } execute_snapshot_with_log(req); } else if(recurring_snapshot || onetime_snapshot) { execute_snapshot_with_log(req); - } + } // cleanup - remove expired (or invalid) request - if((!req.start_block_num && !req.block_spacing) || - (!req.block_spacing && height >= (req.start_block_num + 1)) || + if((!req.start_block_num && !req.block_spacing) || + (!req.block_spacing && height >= (req.start_block_num + 1)) || (req.end_block_num > 0 && height >= (req.end_block_num + 1))) { unschedule_snapshot_request_ids.push_back(req.snapshot_request_id); } @@ -108,7 +108,7 @@ class snapshot_scheduler { } // snapshot_scheduler_handler - void schedule_snapshot(const producer_plugin::snapshot_request_information& sri) { + producer_plugin::snapshot_schedule_result schedule_snapshot(const producer_plugin::snapshot_request_information& sri) { auto& snapshot_by_value = _snapshot_requests.get(); auto existing = snapshot_by_value.find(std::make_tuple(sri.block_spacing, sri.start_block_num, sri.end_block_num)); EOS_ASSERT(existing == snapshot_by_value.end(), chain::duplicate_snapshot_request, "Duplicate snapshot request"); @@ -122,19 +122,27 @@ class snapshot_scheduler { } } - _snapshot_requests.emplace(producer_plugin::snapshot_schedule_information {{_snapshot_id++},{sri.block_spacing, sri.start_block_num, sri.end_block_num, sri.snapshot_description},{}}); + _snapshot_requests.emplace(producer_plugin::snapshot_schedule_information {{_snapshot_id++}, {sri.block_spacing, sri.start_block_num, sri.end_block_num, sri.snapshot_description},{}}); x_serialize(); + + // returning snapshot_schedule_result + return producer_plugin::snapshot_schedule_result{{_snapshot_id - 1}, {sri.block_spacing, sri.start_block_num, sri.end_block_num, sri.snapshot_description}}; } - virtual void unschedule_snapshot(uint32_t 
sri) { + producer_plugin::snapshot_schedule_result unschedule_snapshot(uint32_t sri) { auto& snapshot_by_id = _snapshot_requests.get(); auto existing = snapshot_by_id.find(sri); EOS_ASSERT(existing != snapshot_by_id.end(), chain::snapshot_request_not_found, "Snapshot request not found"); + + producer_plugin::snapshot_schedule_result result{{existing->snapshot_request_id}, {existing->block_spacing, existing->start_block_num, existing->end_block_num, existing->snapshot_description}}; _snapshot_requests.erase(existing); x_serialize(); + + // returning snapshot_schedule_result + return result; } - virtual producer_plugin::get_snapshot_requests_result get_snapshot_requests() { + producer_plugin::get_snapshot_requests_result get_snapshot_requests() { producer_plugin::get_snapshot_requests_result result; auto& asvector = _snapshot_requests.get(); result.snapshot_requests.reserve(asvector.size()); @@ -160,11 +168,8 @@ class snapshot_scheduler { auto& snapshot_by_id = _snapshot_requests.get(); auto snapshot_req = snapshot_by_id.find(_inflight_sid); if (snapshot_req != snapshot_by_id.end()) { - _snapshot_requests.modify(snapshot_req, [&si](auto& p) { - if (!p.pending_snapshots) { - p.pending_snapshots = std::vector(); - } - p.pending_snapshots->emplace_back(si); + _snapshot_requests.modify(snapshot_req, [&si](auto& p) { + p.pending_snapshots.emplace_back(si); }); } } @@ -193,14 +198,12 @@ class snapshot_scheduler { auto& snapshot_by_id = _snapshot_requests.get(); auto snapshot_req = snapshot_by_id.find(srid); - if (snapshot_req != snapshot_by_id.end()) { - if (auto pending = snapshot_req->pending_snapshots; pending) { - auto it = std::remove_if(pending->begin(), pending->end(), [&snapshot_info](const producer_plugin::snapshot_information & s){ return s.head_block_num <= snapshot_info.head_block_num; }); - pending->erase(it, pending->end()); - _snapshot_requests.modify(snapshot_req, [&pending](auto& p) { - p.pending_snapshots = std::move(pending); - }); - } + if (snapshot_req 
!= snapshot_by_id.end()) { + _snapshot_requests.modify(snapshot_req, [&](auto& p) { + auto & pending = p.pending_snapshots; + auto it = std::remove_if(pending.begin(), pending.end(), [&snapshot_info](const producer_plugin::snapshot_information & s){ return s.head_block_num <= snapshot_info.head_block_num; }); + pending.erase(it, pending.end()); + }); } } }; diff --git a/plugins/producer_plugin/producer_plugin.cpp b/plugins/producer_plugin/producer_plugin.cpp index f96e07ca22..3ccf28527d 100644 --- a/plugins/producer_plugin/producer_plugin.cpp +++ b/plugins/producer_plugin/producer_plugin.cpp @@ -103,7 +103,8 @@ namespace { auto code = e.code(); return (code == block_cpu_usage_exceeded::code_value) || (code == block_net_usage_exceeded::code_value) || - (code == deadline_exception::code_value); + (code == deadline_exception::code_value) || + (code == ro_trx_vm_oc_compile_temporary_failure::code_value); } } @@ -424,7 +425,7 @@ class producer_plugin_impl : public std::enable_shared_from_this_snapshot_scheduler.schedule_snapshot(sri); + return my->_snapshot_scheduler.schedule_snapshot(sri); } -void producer_plugin::unschedule_snapshot(const snapshot_request_id_information& sri) +producer_plugin::snapshot_schedule_result producer_plugin::unschedule_snapshot(const snapshot_request_id_information& sri) { - my->_snapshot_scheduler.unschedule_snapshot(sri.snapshot_request_id); + return my->_snapshot_scheduler.unschedule_snapshot(sri.snapshot_request_id); } producer_plugin::get_snapshot_requests_result producer_plugin::get_snapshot_requests() const @@ -2332,10 +2333,10 @@ producer_plugin_impl::handle_push_result( const transaction_metadata_ptr& trx, } if( exception_is_exhausted( *trace->except ) ) { if( _pending_block_mode == pending_block_mode::producing ) { - fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ", + fc_dlog(trx->is_transient() ? 
_transient_trx_failed_trace_log : _trx_failed_trace_log, "[TRX_TRACE] Block ${block_num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ", ("block_num", chain.head_block_num() + 1)("prod", get_pending_block_producer())("txid", trx->id())); } else { - fc_dlog(_trx_failed_trace_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING", ("txid", trx->id())); + fc_dlog(trx->is_transient() ? _transient_trx_failed_trace_log : _trx_failed_trace_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING", ("txid", trx->id())); } if ( !trx->is_read_only() ) pr.block_exhausted = block_is_exhausted(); // smaller trx might fit diff --git a/plugins/producer_plugin/test/test_snapshot_scheduler.cpp b/plugins/producer_plugin/test/test_snapshot_scheduler.cpp index 388684c142..40ae2b9a30 100644 --- a/plugins/producer_plugin/test/test_snapshot_scheduler.cpp +++ b/plugins/producer_plugin/test/test_snapshot_scheduler.cpp @@ -98,8 +98,8 @@ BOOST_AUTO_TEST_CASE(snapshot_scheduler_test) { // we should have a pending snapshot for request id = 0 BOOST_REQUIRE(it != snapshot_requests.end()); auto& pending = it->pending_snapshots; - if (pending && pending->size()==1) { - BOOST_CHECK_EQUAL(9, pending->begin()->head_block_num); + if (pending.size()==1) { + BOOST_CHECK_EQUAL(9, pending.begin()->head_block_num); } } }); @@ -126,8 +126,7 @@ BOOST_AUTO_TEST_CASE(snapshot_scheduler_test) { // snapshot request with id = 0 should be found and should not have any pending snapshots BOOST_REQUIRE(it != snapshot_requests.end()); - BOOST_REQUIRE(it->pending_snapshots); - BOOST_CHECK(!it->pending_snapshots->size()); + BOOST_CHECK(!it->pending_snapshots.size()); // quit app appbase::app().quit(); diff --git a/plugins/prometheus_plugin/include/eosio/prometheus_plugin/simple_rest_server.hpp b/plugins/prometheus_plugin/include/eosio/prometheus_plugin/simple_rest_server.hpp new file mode 100644 index 0000000000..f82492df28 --- /dev/null +++ 
b/plugins/prometheus_plugin/include/eosio/prometheus_plugin/simple_rest_server.hpp @@ -0,0 +1,234 @@ +#pragma once + +#include +#include +#include + +namespace eosio { namespace rest { + + // The majority of the code here are derived from boost source + // libs/beast/example/http/server/async/http_server_async.cpp + // with minimum modification and yet reusable. + + namespace beast = boost::beast; // from + namespace http = beast::http; // from + namespace net = boost::asio; // from + using tcp = boost::asio::ip::tcp; // from + template + class simple_server { + T* self() { return static_cast(this); } + + void fail(beast::error_code ec, char const* what) { self()->log_error(what, ec.message()); } + // Return a response for the given request. + http::response handle_request(http::request&& req) { + auto server_header = self()->server_header(); + // Returns a bad request response + auto const bad_request = [&req, &server_header](std::string_view why) { + http::response res{ http::status::bad_request, req.version() }; + res.set(http::field::server, server_header); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); + res.body() = std::string(why); + res.prepare_payload(); + return res; + }; + + // Returns a not found response + auto const not_found = [&req, &server_header](std::string_view target) { + http::response res{ http::status::not_found, req.version() }; + res.set(http::field::server, server_header); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); + res.body() = "The resource '" + std::string(target) + "' was not found."; + res.prepare_payload(); + return res; + }; + + // Returns a server error response + auto const server_error = [&req, &server_header](std::string_view what) { + http::response res{ http::status::internal_server_error, req.version() }; + res.set(http::field::server, server_header); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); 
+ res.body() = "An error occurred: '" + std::string(what) + "'"; + res.prepare_payload(); + return res; + }; + + // Make sure we can handle the method + if (!self()->allow_method(req.method())) + return bad_request("Unknown HTTP-method"); + + // Request path must be absolute and not contain "..". + std::string_view target{req.target().data(), req.target().size()}; + if (target.empty() || target[0] != '/' || target.find("..") != std::string_view::npos) + return bad_request("Illegal request-target"); + + try { + auto res = self()->on_request(std::move(req)); + if (!res) + not_found(target); + return *res; + } catch (std::exception& ex) { return server_error(ex.what()); } + } + + class session : public std::enable_shared_from_this { + tcp::socket socket_; + boost::asio::io_context::strand strand_; + beast::flat_buffer buffer_; + http::request req_; + simple_server* server_; + std::shared_ptr> res_; + + public: + // Take ownership of the stream + session(net::io_context& ioc, tcp::socket&& socket, simple_server* server) + : socket_(std::move(socket)), strand_(ioc), server_(server) {} + + // Start the asynchronous operation + void run() { do_read(); } + + void do_read() { + // Make the request empty before reading, + // otherwise the operation behavior is undefined. 
+ req_ = {}; + + // Read a request + http::async_read( + socket_, buffer_, req_, + boost::asio::bind_executor(strand_, [self = this->shared_from_this()](beast::error_code ec, + std::size_t bytes_transferred) { + self->on_read(ec, bytes_transferred); + })); + } + + void on_read(beast::error_code ec, std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + + // This means they closed the connection + if (ec == http::error::end_of_stream) + return do_close(); + + if (ec) + return server_->fail(ec, "read"); + + // Send the response + send_response(server_->handle_request(std::move(req_))); + } + + void send_response(http::response&& msg) { + // The lifetime of the message has to extend + // for the duration of the async operation so + // we use a shared_ptr to manage it. + res_ = std::make_shared>(std::move(msg)); + + // Write the response + http::async_write(socket_, *res_, + boost::asio::bind_executor(socket_.get_executor(), + [self = this->shared_from_this(), close = res_->need_eof()]( + beast::error_code ec, std::size_t bytes_transferred) { + self->on_write(ec, bytes_transferred, close); + })); + } + + void on_write(boost::system::error_code ec, std::size_t bytes_transferred, bool close) { + boost::ignore_unused(bytes_transferred); + + if (ec) + return server_->fail(ec, "write"); + + if (close) { + // This means we should close the connection, usually because + // the response indicated the "Connection: close" semantic. 
+ return do_close(); + } + + // We're done with the response so delete it + res_ = nullptr; + + // Read another request + do_read(); + } + + void do_close() { + // Send a TCP shutdown + beast::error_code ec; + socket_.shutdown(tcp::socket::shutdown_send, ec); + + // At this point the connection is closed gracefully + } + }; + + //------------------------------------------------------------------------------ + + // Accepts incoming connections and launches the sessions + class listener : public std::enable_shared_from_this { + net::io_context& ioc_; + tcp::acceptor acceptor_; + tcp::socket socket_; + simple_server* server_; + + public: + listener(net::io_context& ioc, tcp::endpoint endpoint, simple_server* server) + : ioc_(ioc), acceptor_(ioc), socket_(ioc), server_(server) { + boost::system::error_code ec; + + // Open the acceptor + acceptor_.open(endpoint.protocol(), ec); + if (ec) { + server_->fail(ec, "open"); + return; + } + + // Allow address reuse + acceptor_.set_option(net::socket_base::reuse_address(true), ec); + if (ec) { + server_->fail(ec, "set_option"); + return; + } + + // Bind to the server address + acceptor_.bind(endpoint, ec); + if (ec) { + server_->fail(ec, "bind"); + return; + } + + // Start listening for connections + acceptor_.listen(net::socket_base::max_listen_connections, ec); + if (ec) { + server_->fail(ec, "listen"); + return; + } + } + + // Start accepting incoming connections + void run() { + if (!acceptor_.is_open()) + return; + do_accept(); + } + + private: + void do_accept() { + acceptor_.async_accept( + socket_, [self = this->shared_from_this()](boost::system::error_code ec) { self->on_accept(ec); }); + } + + void on_accept(boost::system::error_code ec) { + if (ec) { + server_->fail(ec, "accept"); + } else { + // Create the session and run it + std::make_shared(ioc_, std::move(socket_), server_)->run(); + } + + // Accept another connection + do_accept(); + } + }; + + public: + void run(net::io_context& ioc, tcp::endpoint endpoint) { 
std::make_shared(ioc, endpoint, this)->run(); } + }; +}} // namespace eosio::rest \ No newline at end of file diff --git a/plugins/prometheus_plugin/prometheus_plugin.cpp b/plugins/prometheus_plugin/prometheus_plugin.cpp index 4753c6491e..33cc3cbd5a 100644 --- a/plugins/prometheus_plugin/prometheus_plugin.cpp +++ b/plugins/prometheus_plugin/prometheus_plugin.cpp @@ -1,8 +1,11 @@ #include +#include + #include #include #include #include +#include #include #include @@ -14,8 +17,8 @@ #include #include - namespace eosio { + static const char* prometheus_api_name = "/v1/prometheus/metrics"; using namespace prometheus; using namespace chain::plugin_interface; @@ -110,12 +113,41 @@ namespace eosio { } }; - struct prometheus_plugin_impl { + namespace http = boost::beast::http; + struct prometheus_plugin_impl : rest::simple_server { + + std::string server_header() const { + return http_plugin::get_server_header(); + } + + void log_error(char const* what, const std::string& message) { + elog("${what}: ${message}", ("what", what)("message", message)); + } + + bool allow_method(http::verb method) const { + return method == http::verb::get; + } + + std::optional> + on_request(http::request&& req) { + if(req.target() != prometheus_api_name) + return {}; + http::response res{ http::status::ok, req.version() }; + // Respond to GET request + res.set(http::field::server, server_header()); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); + res.body() = metrics(); + res.prepare_payload(); + return res; + } + eosio::chain::named_thread_pool _prometheus_thread_pool; boost::asio::io_context::strand _prometheus_strand; prometheus_plugin_metrics _metrics; map> _plugin_metrics; + boost::asio::ip::tcp::endpoint _endpoint; prometheus_plugin_impl(): _prometheus_strand(_prometheus_thread_pool.get_executor()){ } @@ -179,32 +211,11 @@ namespace eosio { return body; } - void metrics_async(chain::plugin_interface::next_function results) { - 
_prometheus_strand.post([self=this, results=std::move(results)]() { - results(self->metrics()); - }); + void start() { + run(_prometheus_thread_pool.get_executor(), _endpoint); + _prometheus_thread_pool.start( + 1, [](const fc::exception& e) { elog("Prometheus exception ${e}", ("e", e)); }); } - - }; - - using metrics_params = fc::variant_object; - - struct prometheus_api { - prometheus_plugin_impl& _pp; - fc::microseconds _max_response_time_us; - - fc::time_point start() const { - return fc::time_point::now() + _max_response_time_us; - } - - void metrics(const metrics_params&, chain::plugin_interface::next_function results) { - _pp.metrics_async(std::move(results)); - } - - prometheus_api(prometheus_plugin_impl& plugin, const fc::microseconds& max_response_time) - : _pp(plugin) - , _max_response_time_us(max_response_time){} - }; prometheus_plugin::prometheus_plugin() @@ -214,24 +225,35 @@ namespace eosio { prometheus_plugin::~prometheus_plugin() = default; void prometheus_plugin::set_program_options(options_description&, options_description& cfg) { + cfg.add_options() + ("prometheus-exporter-address", bpo::value()->default_value("127.0.0.1:9101"), + "The local IP and port to listen for incoming prometheus metrics http request."); } void prometheus_plugin::plugin_initialize(const variables_map& options) { my->initialize_metrics(); - auto& _http_plugin = app().get_plugin(); - fc::microseconds max_response_time = _http_plugin.get_max_response_time(); + string lipstr = options.at("prometheus-exporter-address").as(); + EOS_ASSERT(lipstr.size() > 0, chain::plugin_config_exception, "prometheus-exporter-address must have a value"); + + string host = lipstr.substr(0, lipstr.find(':')); + string port = lipstr.substr(host.size() + 1, lipstr.size()); - prometheus_api handle(*my, max_response_time); - app().get_plugin().add_async_api({ - CALL_ASYNC_WITH_400(prometheus, handle, eosio, metrics, std::string, 200, http_params_types::no_params)}, http_content_type::plaintext); + 
boost::system::error_code ec; + using tcp = boost::asio::ip::tcp; + tcp::resolver resolver(app().get_io_service()); + + my->_endpoint = *resolver.resolve(tcp::v4(), host, port, ec); + if (!ec) { + fc_ilog(logger(), "configured prometheus metrics exporter to listen on ${h}", ("h", lipstr)); + } else { + fc_elog(logger(), "failed to configure prometheus metrics exporter to listen on ${h} (${m})", + ("h", lipstr)("m", ec.message())); + } } void prometheus_plugin::plugin_startup() { - my->_prometheus_thread_pool.start(1, []( const fc::exception& e ) { - elog("Prometheus excpetion ${e}:${l}", ("e", e)); - } ); - + my->start(); ilog("Prometheus plugin started."); } diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index ea2ed32d1b..ac832711f0 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -23,21 +23,6 @@ namespace ws = boost::beast::websocket; -/* Prior to boost 1.70, if socket type is not boost::asio::ip::tcp::socket nor boost::asio::ssl::stream beast requires - an overload of async_teardown. This has been improved in 1.70+ to support any basic_stream_socket<> out of the box - which includes unix sockets. 
*/ -#if BOOST_VERSION < 107000 -namespace boost::beast::websocket { -template -void async_teardown(role_type, unixs::socket& sock, TeardownHandler&& handler) { - boost::system::error_code ec; - sock.close(ec); - boost::asio::post(boost::asio::get_associated_executor(handler, sock.get_executor()), - [h = std::move(handler), ec]() mutable { h(ec); }); -} -} // namespace boost::beast::websocket -#endif - namespace eosio { using namespace chain; using namespace state_history; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b412c07fa7..810d0520df 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -138,6 +138,8 @@ set_property(TEST get_account_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME distributed-transactions-test COMMAND tests/distributed-transactions-test.py -d 2 -p 4 -n 6 -v --clean-run ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST distributed-transactions-test PROPERTY LABELS nonparallelizable_tests) +add_test(NAME distributed-transactions-speculative-test COMMAND tests/distributed-transactions-test.py -d 2 -p 4 -n 6 --speculative -v --clean-run ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_property(TEST distributed-transactions-speculative-test PROPERTY LABELS nonparallelizable_tests) add_test(NAME restart-scenarios-test-resync COMMAND tests/restart-scenarios-test.py -c resync -p4 -v --clean-run ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST restart-scenarios-test-resync PROPERTY LABELS nonparallelizable_tests) add_test(NAME restart-scenarios-test-hard_replay COMMAND tests/restart-scenarios-test.py -c hardReplay -p4 -v --clean-run ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/tests/TestHarness/queries.py b/tests/TestHarness/queries.py index ffaf1dd115..e73c3ad06b 100644 --- a/tests/TestHarness/queries.py +++ b/tests/TestHarness/queries.py @@ -596,15 +596,18 @@ def processCleosCmd(self, cmd, cmdDesc, silentErrors=True, exitOnError=False, ex return trans 
- def processUrllibRequest(self, resource, command, payload={}, silentErrors=False, exitOnError=False, exitMsg=None, returnType=ReturnType.json, endpoint=None): + def processUrllibRequest(self, resource, command, payload={}, silentErrors=False, exitOnError=False, exitMsg=None, returnType=ReturnType.json, method="POST", endpoint=None): if not endpoint: endpoint = self.endpointHttp cmd = f"{endpoint}/v1/{resource}/{command}" - req = urllib.request.Request(cmd, method="POST") - req.add_header('Content-Type', 'application/json') - data = payload - data = json.dumps(data) - data = data.encode() + req = urllib.request.Request(cmd, method=method) + if len(payload): + req.add_header('Content-Type', 'application/json') + data = payload + data = json.dumps(data) + data = data.encode() + else: + data = None if Utils.Debug: Utils.Print("cmd: %s %s" % (cmd, payload)) rtn=None start=time.perf_counter() @@ -772,4 +775,3 @@ def getLatestBlockHeaderState(self): def getActivatedProtocolFeatures(self): latestBlockHeaderState = self.getLatestBlockHeaderState() return latestBlockHeaderState["activated_protocol_features"]["protocol_features"] - diff --git a/tests/distributed-transactions-test.py b/tests/distributed-transactions-test.py index e2b828bc4e..6a0aa2c959 100755 --- a/tests/distributed-transactions-test.py +++ b/tests/distributed-transactions-test.py @@ -3,6 +3,7 @@ import random from TestHarness import Cluster, TestHelper, Utils, WalletMgr +from TestHarness.TestHelper import AppArgs ############################################################### # distributed-transactions-test @@ -19,8 +20,10 @@ Print=Utils.Print errorExit=Utils.errorExit -args=TestHelper.parse_args({"-p","-n","-d","-s","--nodes-file","--seed" - ,"--dump-error-details","-v","--leave-running","--clean-run","--keep-logs","--unshared"}) +appArgs = AppArgs() +extraArgs = appArgs.add_bool(flag="--speculative", help="Run nodes in read-mode=speculative") 
+args=TestHelper.parse_args({"-p","-n","-d","-s","--nodes-file","--seed", "--speculative" + ,"--dump-error-details","-v","--leave-running","--clean-run","--keep-logs","--unshared"}, applicationSpecificArgs=appArgs) pnodes=args.p topo=args.s @@ -34,6 +37,7 @@ dumpErrorDetails=args.dump_error_details killAll=args.clean_run keepLogs=args.keep_logs +speculative=args.speculative killWallet=not dontKill killEosInstances=not dontKill @@ -71,7 +75,11 @@ (pnodes, total_nodes-pnodes, topo, delay)) Print("Stand up cluster") - if cluster.launch(pnodes=pnodes, totalNodes=total_nodes, topo=topo, delay=delay) is False: + extraNodeosArgs = "" + if speculative: + extraNodeosArgs = " --read-mode speculative " + + if cluster.launch(pnodes=pnodes, totalNodes=total_nodes, topo=topo, delay=delay, extraNodeosArgs=extraNodeosArgs) is False: errorExit("Failed to stand up eos cluster.") Print ("Wait for Cluster stabilization") diff --git a/tests/nodeos_irreversible_mode_test.py b/tests/nodeos_irreversible_mode_test.py index 4cb0feb475..cfe0b1c92e 100755 --- a/tests/nodeos_irreversible_mode_test.py +++ b/tests/nodeos_irreversible_mode_test.py @@ -21,7 +21,7 @@ cmdError = Utils.cmdError relaunchTimeout = 30 numOfProducers = 4 -totalNodes = 10 +totalNodes = 15 # Parse command line arguments args = TestHelper.parse_args({"-v","--clean-run","--dump-error-details","--leave-running","--keep-logs","--unshared"}) @@ -32,6 +32,7 @@ killEosInstances=not dontKill killWallet=not dontKill keepLogs=args.keep_logs +speculativeReadMode="head" # Setup cluster and it's wallet manager walletMgr=WalletMgr(True) @@ -174,7 +175,12 @@ def relaunchNode(node: Node, chainArg="", addSwapFlags=None, relaunchAssertMessa 0:"--enable-stale-production", 4:"--read-mode irreversible", 6:"--read-mode irreversible", - 9:"--plugin eosio::producer_api_plugin"}) + 9:"--plugin eosio::producer_api_plugin", + 10:"--read-mode speculative", + 11:"--read-mode irreversible", + 12:"--read-mode speculative", + 13:"--read-mode 
irreversible", + 14:"--read-mode speculative --plugin eosio::producer_api_plugin"}) producingNodeId = 0 producingNode = cluster.getNode(producingNodeId) @@ -254,7 +260,7 @@ def switchSpecToIrrMode(nodeIdOfNodeToTest, nodeToTest): # Kill and relaunch in irreversible mode nodeToTest.kill(signal.SIGTERM) - relaunchNode(nodeToTest, chainArg=" --read-mode irreversible") + relaunchNode(nodeToTest, addSwapFlags={"--read-mode": "irreversible"}) # Ensure the node condition is as expected after relaunch confirmHeadLibAndForkDbHeadOfIrrMode(nodeToTest, headLibAndForkDbHeadBeforeSwitchMode) @@ -267,7 +273,7 @@ def switchIrrToSpecMode(nodeIdOfNodeToTest, nodeToTest): # Kill and relaunch in speculative mode nodeToTest.kill(signal.SIGTERM) - relaunchNode(nodeToTest, addSwapFlags={"--read-mode": "head"}) + relaunchNode(nodeToTest, addSwapFlags={"--read-mode": speculativeReadMode}) # Ensure the node condition is as expected after relaunch confirmHeadLibAndForkDbHeadOfSpecMode(nodeToTest, headLibAndForkDbHeadBeforeSwitchMode) @@ -283,7 +289,7 @@ def switchSpecToIrrModeWithConnectedToProdNode(nodeIdOfNodeToTest, nodeToTest): # Kill and relaunch in irreversible mode nodeToTest.kill(signal.SIGTERM) waitForBlksProducedAndLibAdvanced() # Wait for some blks to be produced and lib advance - relaunchNode(nodeToTest, chainArg=" --read-mode irreversible") + relaunchNode(nodeToTest, addSwapFlags={"--read-mode": "irreversible"}) # Ensure the node condition is as expected after relaunch ensureHeadLibAndForkDbHeadIsAdvancing(nodeToTest) @@ -302,7 +308,7 @@ def switchIrrToSpecModeWithConnectedToProdNode(nodeIdOfNodeToTest, nodeToTest): # Kill and relaunch in irreversible mode nodeToTest.kill(signal.SIGTERM) waitForBlksProducedAndLibAdvanced() # Wait for some blks to be produced and lib advance) - relaunchNode(nodeToTest, addSwapFlags={"--read-mode": "head"}) + relaunchNode(nodeToTest, addSwapFlags={"--read-mode": speculativeReadMode}) # Ensure the node condition is as expected after relaunch 
ensureHeadLibAndForkDbHeadIsAdvancing(nodeToTest) @@ -360,7 +366,7 @@ def switchToSpecModeWithIrrModeSnapshot(nodeIdOfNodeToTest, nodeToTest): backupBlksDir(nodeIdOfNodeToTest) # Relaunch in irreversible mode and create the snapshot - relaunchNode(nodeToTest, chainArg=" --read-mode irreversible") + relaunchNode(nodeToTest, addSwapFlags={"--read-mode": "irreversible"}) confirmHeadLibAndForkDbHeadOfIrrMode(nodeToTest) nodeToTest.createSnapshot() nodeToTest.kill(signal.SIGTERM) @@ -368,7 +374,7 @@ def switchToSpecModeWithIrrModeSnapshot(nodeIdOfNodeToTest, nodeToTest): # Start from clean data dir, recover back up blocks, and then relaunch with irreversible snapshot removeState(nodeIdOfNodeToTest) recoverBackedupBlksDir(nodeIdOfNodeToTest) # this function will delete the existing blocks dir first - relaunchNode(nodeToTest, chainArg=" --snapshot {}".format(getLatestSnapshot(nodeIdOfNodeToTest)), addSwapFlags={"--read-mode": "head"}) + relaunchNode(nodeToTest, chainArg=" --snapshot {}".format(getLatestSnapshot(nodeIdOfNodeToTest)), addSwapFlags={"--read-mode": speculativeReadMode}) confirmHeadLibAndForkDbHeadOfSpecMode(nodeToTest) # Ensure it automatically replays "reversible blocks", i.e. 
head lib and fork db should be the same headLibAndForkDbHeadAfterRelaunch = getHeadLibAndForkDbHead(nodeToTest) @@ -405,6 +411,14 @@ def switchToSpecModeWithIrrModeSnapshot(nodeIdOfNodeToTest, nodeToTest): testSuccessful = testSuccessful and executeTest(8, replayInIrrModeWithoutRevBlksAndConnectedToProdNode) testSuccessful = testSuccessful and executeTest(9, switchToSpecModeWithIrrModeSnapshot) + # retest with read-mode speculative instead of head + speculativeReadMode="speculative" + testSuccessful = testSuccessful and executeTest(10, switchSpecToIrrMode) + testSuccessful = testSuccessful and executeTest(11, switchIrrToSpecMode) + testSuccessful = testSuccessful and executeTest(12, switchSpecToIrrModeWithConnectedToProdNode) + testSuccessful = testSuccessful and executeTest(13, switchIrrToSpecModeWithConnectedToProdNode) + testSuccessful = testSuccessful and executeTest(14, switchToSpecModeWithIrrModeSnapshot) + finally: TestHelper.shutdown(cluster, walletMgr, testSuccessful, killEosInstances, killWallet, keepLogs, killAll, dumpErrorDetails) # Print test result diff --git a/tests/nodeos_read_terminate_at_block_test.py b/tests/nodeos_read_terminate_at_block_test.py index da02db1cec..222d39b2e0 100755 --- a/tests/nodeos_read_terminate_at_block_test.py +++ b/tests/nodeos_read_terminate_at_block_test.py @@ -191,7 +191,7 @@ def checkHeadOrSpeculative(head, lib): 0 : "--enable-stale-production", 1 : "--read-mode irreversible --terminate-at-block 75", 2 : "--read-mode head --terminate-at-block 100", - 3 : "--read-mode head --terminate-at-block 125" + 3 : "--read-mode speculative --terminate-at-block 125" } # Kill any existing instances and launch cluster diff --git a/tests/plugin_http_api_test.py b/tests/plugin_http_api_test.py index 9573767ae0..b020039817 100755 --- a/tests/plugin_http_api_test.py +++ b/tests/plugin_http_api_test.py @@ -1341,8 +1341,8 @@ def test_DbSizeApi(self) : def test_prometheusApi(self) : resource = "prometheus" command = "metrics" - - ret_text = 
self.nodeos.processUrllibRequest(resource, command, returnType = ReturnType.raw ).decode() + endpointPrometheus = f'http://{self.nodeos.host}:9101' + ret_text = self.nodeos.processUrllibRequest(resource, command, returnType = ReturnType.raw, method="GET", endpoint=endpointPrometheus).decode() # filter out all empty lines or lines starting with '#' data_lines = filter(lambda line: len(line) > 0 and line[0]!='#', ret_text.split('\n')) # converting each line into a key value pair and then construct a dictionay out of all the pairs diff --git a/tests/ship_client.cpp b/tests/ship_client.cpp index 3c383b4074..63309a77d9 100644 --- a/tests/ship_client.cpp +++ b/tests/ship_client.cpp @@ -21,16 +21,6 @@ namespace ws = boost::beast::websocket; namespace bpo = boost::program_options; -/* Prior to boost 1.70, if socket type is not boost::asio::ip::tcp::socket nor boost::asio::ssl::stream beast requires - an overload of async_/teardown. This has been improved in 1.70+ to support any basic_stream_socket<> out of the box - which includes unix sockets. */ -#if BOOST_VERSION < 107000 -namespace boost::beast::websocket { -void teardown(role_type, unixs::socket& sock, error_code& ec) { - sock.close(ec); -} -} -#endif int main(int argc, char* argv[]) { boost::asio::io_context ctx;