diff --git a/Docker/builder/Dockerfile b/Docker/builder/Dockerfile index cf255c44123..3eec3d9f601 100644 --- a/Docker/builder/Dockerfile +++ b/Docker/builder/Dockerfile @@ -34,11 +34,14 @@ RUN wget https://dl.bintray.com/boostorg/release/1.67.0/source/boost_1_67_0.tar. --with-signals --with-serialization --with-chrono --with-test --with-context --with-locale --with-coroutine --with-iostreams toolset=clang link=static install \ && cd .. && rm -rf boost_1_67_0 -RUN wget https://github.com/mongodb/mongo-c-driver/releases/download/1.9.3/mongo-c-driver-1.9.3.tar.gz -O - | tar -xz \ - && cd mongo-c-driver-1.9.3 \ - && ./configure --enable-static --with-libbson=bundled --enable-ssl=openssl --disable-automatic-init-and-cleanup --prefix=/usr/local \ - && make -j$(nproc) install \ - && cd .. && rm -rf mongo-c-driver-1.9.3 +RUN wget https://github.com/mongodb/mongo-c-driver/releases/download/1.10.2/mongo-c-driver-1.10.2.tar.gz -O - | tar -xz \ + && cd mongo-c-driver-1.10.2 \ + && mkdir cmake-build && cd cmake-build \ + && cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local -DENABLE_BSON=ON \ + -DENABLE_SSL=OPENSSL -DENABLE_AUTOMATIC_INIT_AND_CLEANUP=OFF -DENABLE_STATIC=ON .. \ + && make -j$(nproc) \ + && make install \ + && cd ../../ && rm -rf mongo-c-driver-1.10.2 RUN git clone --depth 1 --single-branch --branch release_40 https://github.com/llvm-mirror/llvm.git \ && git clone --depth 1 --single-branch --branch release_40 https://github.com/llvm-mirror/clang.git llvm/tools/clang \ @@ -60,11 +63,12 @@ RUN git clone --depth 1 https://github.com/cryptonomex/secp256k1-zkp \ && make -j$(nproc) install \ && cd .. && rm -rf secp256k1-zkp -RUN git clone --depth 1 -b releases/v3.2 https://github.com/mongodb/mongo-cxx-driver \ - && cd mongo-cxx-driver \ - && cmake -H. -Bbuild -G Ninja -DBUILD_SHARED_LIBS=OFF -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local\ - && cmake --build build --target install \ - && cd .. 
&& rm -rf mongo-cxx-driver +RUN git clone --depth 1 -b releases/v3.3 https://github.com/mongodb/mongo-cxx-driver \ + && cd mongo-cxx-driver/build \ + && cmake -DBUILD_SHARED_LIBS=OFF -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/usr/local .. \ + && make -j$(nproc) \ + && make install \ + && cd ../../ && rm -rf mongo-cxx-driver RUN git clone --depth 1 --single-branch --branch master https://github.com/ucb-bar/berkeley-softfloat-3.git \ && cd berkeley-softfloat-3/build/Linux-x86_64-GCC \ diff --git a/libraries/chain/apply_context.cpp b/libraries/chain/apply_context.cpp index a88906ed4f1..ec219d6924b 100644 --- a/libraries/chain/apply_context.cpp +++ b/libraries/chain/apply_context.cpp @@ -35,31 +35,30 @@ action_trace apply_context::exec_one() const auto& cfg = control.get_global_properties().configuration; try { - const auto &a = control.get_account(receiver); + const auto& a = control.get_account( receiver ); privileged = a.privileged; - auto native = control.find_apply_handler(receiver, act.account, act.name); + auto native = control.find_apply_handler( receiver, act.account, act.name ); if( native ) { - if( trx_context.can_subjectively_fail && control.is_producing_block() ) { + if( trx_context.can_subjectively_fail && control.is_producing_block()) { control.check_contract_list( receiver ); control.check_action_list( act.account, act.name ); } - (*native)(*this); + (*native)( *this ); } if( a.code.size() > 0 - && !(act.account == config::system_account_name && act.name == N(setcode) && receiver == config::system_account_name) ) - { - if( trx_context.can_subjectively_fail && control.is_producing_block() ) { + && !(act.account == config::system_account_name && act.name == N( setcode ) && + receiver == config::system_account_name)) { + if( trx_context.can_subjectively_fail && control.is_producing_block()) { control.check_contract_list( receiver ); control.check_action_list( act.account, act.name ); } try { - control.get_wasm_interface().apply(a.code_version, 
a.code, *this); - } catch ( const wasm_exit& ){} + control.get_wasm_interface().apply( a.code_version, a.code, *this ); + } catch( const wasm_exit& ) {} } - - } FC_CAPTURE_AND_RETHROW((_pending_console_output.str())); + } FC_RETHROW_EXCEPTIONS(warn, "pending console output: ${console}", ("console", _pending_console_output.str())) action_receipt r; r.receiver = receiver; diff --git a/libraries/chain/include/eosio/chain/transaction.hpp b/libraries/chain/include/eosio/chain/transaction.hpp index 55d65a0bd46..1d325352d0d 100644 --- a/libraries/chain/include/eosio/chain/transaction.hpp +++ b/libraries/chain/include/eosio/chain/transaction.hpp @@ -61,7 +61,8 @@ namespace eosio { namespace chain { flat_set get_signature_keys( const vector& signatures, const chain_id_type& chain_id, const vector& cfd = vector(), - bool allow_duplicate_keys = false )const; + bool allow_duplicate_keys = false, + bool use_cache = true )const; uint32_t total_actions()const { return context_free_actions.size() + actions.size(); } account_name first_authorizor()const { @@ -91,7 +92,7 @@ namespace eosio { namespace chain { const signature_type& sign(const private_key_type& key, const chain_id_type& chain_id); signature_type sign(const private_key_type& key, const chain_id_type& chain_id)const; - flat_set get_signature_keys( const chain_id_type& chain_id, bool allow_duplicate_keys = false )const; + flat_set get_signature_keys( const chain_id_type& chain_id, bool allow_duplicate_keys = false, bool use_cache = true )const; }; struct packed_transaction { diff --git a/libraries/chain/transaction.cpp b/libraries/chain/transaction.cpp index 07c3b94b85e..54915f4385b 100644 --- a/libraries/chain/transaction.cpp +++ b/libraries/chain/transaction.cpp @@ -80,7 +80,8 @@ digest_type transaction::sig_digest( const chain_id_type& chain_id, const vector return enc.result(); } -flat_set transaction::get_signature_keys( const vector& signatures, const chain_id_type& chain_id, const vector& cfd, bool 
allow_duplicate_keys )const +flat_set transaction::get_signature_keys( const vector& signatures, + const chain_id_type& chain_id, const vector& cfd, bool allow_duplicate_keys, bool use_cache )const { try { using boost::adaptors::transformed; @@ -90,14 +91,17 @@ flat_set transaction::get_signature_keys( const vector recovered_pub_keys; for(const signature_type& sig : signatures) { - recovery_cache_type::index::type::iterator it = recovery_cache.get().find(sig); - public_key_type recov; - if(it == recovery_cache.get().end() || it->trx_id != id()) { - recov = public_key_type(sig, digest); - recovery_cache.emplace_back( cached_pub_key{id(), recov, sig} ); //could fail on dup signatures; not a problem + if( use_cache ) { + recovery_cache_type::index::type::iterator it = recovery_cache.get().find( sig ); + if( it == recovery_cache.get().end() || it->trx_id != id()) { + recov = public_key_type( sig, digest ); + recovery_cache.emplace_back(cached_pub_key{id(), recov, sig} ); //could fail on dup signatures; not a problem + } else { + recov = it->pub_key; + } } else { - recov = it->pub_key; + recov = public_key_type( sig, digest ); } bool successful_insertion = false; std::tie(std::ignore, successful_insertion) = recovered_pub_keys.insert(recov); @@ -107,8 +111,10 @@ flat_set transaction::get_signature_keys( const vector recovery_cache_size) - recovery_cache.erase(recovery_cache.begin()); + if( use_cache ) { + while ( recovery_cache.size() > recovery_cache_size ) + recovery_cache.erase( recovery_cache.begin() ); + } return recovered_pub_keys; } FC_CAPTURE_AND_RETHROW() } @@ -123,9 +129,9 @@ signature_type signed_transaction::sign(const private_key_type& key, const chain return key.sign(sig_digest(chain_id, context_free_data)); } -flat_set signed_transaction::get_signature_keys( const chain_id_type& chain_id, bool allow_duplicate_keys )const +flat_set signed_transaction::get_signature_keys( const chain_id_type& chain_id, bool allow_duplicate_keys, bool use_cache )const { - 
return transaction::get_signature_keys(signatures, chain_id, context_free_data, allow_duplicate_keys); + return transaction::get_signature_keys(signatures, chain_id, context_free_data, allow_duplicate_keys, use_cache); } uint32_t packed_transaction::get_unprunable_size()const { diff --git a/libraries/chain/wasm_interface.cpp b/libraries/chain/wasm_interface.cpp index 18cf81362b7..f18cda8293f 100644 --- a/libraries/chain/wasm_interface.cpp +++ b/libraries/chain/wasm_interface.cpp @@ -1323,7 +1323,7 @@ class transaction_api : public context_aware_api { transaction trx; fc::raw::unpack(data, data_len, trx); context.schedule_deferred_transaction(sender_id, payer, std::move(trx), replace_existing); - } FC_CAPTURE_AND_RETHROW((fc::to_hex(data, data_len))); + } FC_RETHROW_EXCEPTIONS(warn, "data as hex: ${data}", ("data", fc::to_hex(data, data_len))) } bool cancel_deferred( const unsigned __int128& val ) { diff --git a/libraries/testing/tester.cpp b/libraries/testing/tester.cpp index cf41224b26d..d707be67323 100644 --- a/libraries/testing/tester.cpp +++ b/libraries/testing/tester.cpp @@ -286,7 +286,7 @@ namespace eosio { namespace testing { if( r->except_ptr ) std::rethrow_exception( r->except_ptr ); if( r->except ) throw *r->except; return r; - } FC_CAPTURE_AND_RETHROW( (transaction_header(trx.get_transaction())) ) } + } FC_RETHROW_EXCEPTIONS( warn, "transaction_header: ${header}", ("header", transaction_header(trx.get_transaction()) )) } transaction_trace_ptr base_tester::push_transaction( signed_transaction& trx, fc::time_point deadline, @@ -305,7 +305,9 @@ namespace eosio { namespace testing { if( r->except_ptr ) std::rethrow_exception( r->except_ptr ); if( r->except) throw *r->except; return r; - } FC_CAPTURE_AND_RETHROW( (transaction_header(trx))(billed_cpu_time_us) ) } + } FC_RETHROW_EXCEPTIONS( warn, "transaction_header: ${header}, billed_cpu_time_us: ${billed}", + ("header", transaction_header(trx) ) ("billed", billed_cpu_time_us)) + } typename 
base_tester::action_result base_tester::push_action(action&& act, uint64_t authorizer) { signed_transaction trx; diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index adaafacff5d..154a147956e 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -16,7 +16,7 @@ add_subdirectory(wallet_api_plugin) add_subdirectory(txn_test_gen_plugin) add_subdirectory(db_size_api_plugin) #add_subdirectory(faucet_testnet_plugin) -#add_subdirectory(mongo_db_plugin) +add_subdirectory(mongo_db_plugin) #add_subdirectory(sql_db_plugin) add_subdirectory(login_plugin) diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index b03d088ba73..c730edbb71a 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -1193,7 +1193,8 @@ read_only::abi_json_to_bin_result read_only::abi_json_to_bin( const read_only::a EOS_ASSERT(false, abi_not_found_exception, "No ABI found for ${contract}", ("contract", params.code)); } return result; -} FC_CAPTURE_AND_RETHROW( (params.code)(params.action)(params.args) ) +} FC_RETHROW_EXCEPTIONS( warn, "code: ${code}, action: ${action}, args: ${args}", + ("code", params.code)( "action", params.action )( "args", params.args )) read_only::abi_bin_to_json_result read_only::abi_bin_to_json( const read_only::abi_bin_to_json_params& params )const { abi_bin_to_json_result result; diff --git a/plugins/mongo_db_plugin/include/eosio/mongo_db_plugin/mongo_db_plugin.hpp b/plugins/mongo_db_plugin/include/eosio/mongo_db_plugin/mongo_db_plugin.hpp index 87fe0f49f69..6ee85f07bc8 100644 --- a/plugins/mongo_db_plugin/include/eosio/mongo_db_plugin/mongo_db_plugin.hpp +++ b/plugins/mongo_db_plugin/include/eosio/mongo_db_plugin/mongo_db_plugin.hpp @@ -14,20 +14,16 @@ using mongo_db_plugin_impl_ptr = std::shared_ptr; /** * Provides persistence to MongoDB for: - * Blocks - * Transactions - * Actions - * Accounts + * accounts + * actions + * block_states + * blocks + * transaction_traces + 
* transactions * * See data dictionary (DB Schema Definition - EOS API) for description of MongoDB schema. * - * The goal ultimately is for all chainbase data to be mirrored in MongoDB via a delayed node processing - * irreversible blocks. Currently, only Blocks, Transactions, Messages, and Account balance it mirrored. - * Chainbase is being rewritten to be multi-threaded. Once chainbase is stable, integration directly with - * a mirror database approach can be followed removing the need for the direct processing of Blocks employed - * with this implementation. - * - * If MongoDB env not available (#ifndef MONGODB) this plugin is a no-op. + * If cmake -DBUILD_MONGO_DB_PLUGIN=true is not specified then this plugin is not compiled/included. */ class mongo_db_plugin : public plugin { public: diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index 2ebe4720ee4..739ab2b479c 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -10,8 +10,11 @@ #include #include +#include #include +#include +#include #include #include #include @@ -20,11 +23,13 @@ #include #include -#include +#include #include #include #include +#include +#include namespace fc { class variant; } @@ -39,6 +44,7 @@ using chain::signed_transaction; using chain::signed_block; using chain::block_trace; using chain::transaction_id_type; +using chain::packed_transaction; static appbase::abstract_plugin& _mongo_db_plugin = app().register_plugin(); @@ -47,20 +53,33 @@ class mongo_db_plugin_impl { public: mongo_db_plugin_impl(); ~mongo_db_plugin_impl(); - void applied_block(const block_trace&); - void applied_irreversible_block(const signed_block&); - void process_block(const block_trace&, const signed_block&); - void _process_block(const block_trace&, const signed_block&); - void process_irreversible_block(const signed_block&); - void _process_irreversible_block(const signed_block&); + fc::optional 
accepted_block_connection; + fc::optional irreversible_block_connection; + fc::optional accepted_transaction_connection; + fc::optional applied_transaction_connection; + + void consume_blocks(); + + void accepted_block( const chain::block_state_ptr& ); + void applied_irreversible_block(const chain::block_state_ptr&); + void accepted_transaction(const chain::transaction_metadata_ptr&); + void applied_transaction(const chain::transaction_trace_ptr&); + void process_accepted_transaction(const chain::transaction_metadata_ptr&); + void _process_accepted_transaction(const chain::transaction_metadata_ptr&); + void process_applied_transaction(const chain::transaction_trace_ptr&); + void _process_applied_transaction(const chain::transaction_trace_ptr&); + void process_accepted_block( const chain::block_state_ptr& ); + void _process_accepted_block( const chain::block_state_ptr& ); + void process_irreversible_block(const chain::block_state_ptr&); + void _process_irreversible_block(const chain::block_state_ptr&); void init(); void wipe_database(); - static abi_def eos_abi; // cached for common use - bool configured{false}; bool wipe_database_on_startup{false}; + uint32_t start_block_num = 0; + bool start_block_reached = false; std::string db_name; mongocxx::instance mongo_inst; @@ -68,56 +87,96 @@ class mongo_db_plugin_impl { mongocxx::collection accounts; size_t queue_size = 0; - size_t processed = 0; - std::deque signed_block_queue; - std::deque signed_block_process_queue; - std::deque> block_trace_queue; - std::deque> block_trace_process_queue; - // transaction.id -> actions - std::map> reversible_actions; + std::deque transaction_metadata_queue; + std::deque transaction_metadata_process_queue; + std::deque transaction_trace_queue; + std::deque transaction_trace_process_queue; + std::deque block_state_queue; + std::deque block_state_process_queue; + std::deque irreversible_block_state_queue; + std::deque irreversible_block_state_process_queue; boost::mutex mtx; 
boost::condition_variable condition; boost::thread consume_thread; boost::atomic done{false}; boost::atomic startup{true}; - - void consume_blocks(); - - void update_account(const chain::action& msg); + fc::optional chain_id; static const account_name newaccount; - static const account_name transfer; static const account_name setabi; + static const std::string block_states_col; static const std::string blocks_col; static const std::string trans_col; + static const std::string trans_traces_col; static const std::string actions_col; - static const std::string action_traces_col; static const std::string accounts_col; }; const account_name mongo_db_plugin_impl::newaccount = "newaccount"; -const account_name mongo_db_plugin_impl::transfer = "transfer"; const account_name mongo_db_plugin_impl::setabi = "setabi"; -const std::string mongo_db_plugin_impl::blocks_col = "Blocks"; -const std::string mongo_db_plugin_impl::trans_col = "Transactions"; -const std::string mongo_db_plugin_impl::actions_col = "Actions"; -const std::string mongo_db_plugin_impl::action_traces_col = "ActionTraces"; -const std::string mongo_db_plugin_impl::accounts_col = "Accounts"; +const std::string mongo_db_plugin_impl::block_states_col = "block_states"; +const std::string mongo_db_plugin_impl::blocks_col = "blocks"; +const std::string mongo_db_plugin_impl::trans_col = "transactions"; +const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces"; +const std::string mongo_db_plugin_impl::actions_col = "actions"; +const std::string mongo_db_plugin_impl::accounts_col = "accounts"; +namespace { -void mongo_db_plugin_impl::applied_irreversible_block(const signed_block& block) { - try { - if (startup) { - // on startup we don't want to queue, instead push back on caller - process_irreversible_block(block); +template +void queue(boost::mutex& mtx, boost::condition_variable& condition, Queue& queue, const Entry& e, size_t queue_size) { + int sleep_time = 100; + size_t last_queue_size = 0; + 
boost::mutex::scoped_lock lock(mtx); + if (queue.size() > queue_size) { + lock.unlock(); + condition.notify_one(); + if (last_queue_size < queue.size()) { + sleep_time += 100; } else { - boost::mutex::scoped_lock lock(mtx); - signed_block_queue.push_back(block); - lock.unlock(); - condition.notify_one(); + sleep_time -= 100; + if (sleep_time < 0) sleep_time = 100; } + last_queue_size = queue.size(); + boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time)); + lock.lock(); + } + queue.emplace_back(e); + lock.unlock(); + condition.notify_one(); +} + +} + +void mongo_db_plugin_impl::accepted_transaction( const chain::transaction_metadata_ptr& t ) { + try { + queue( mtx, condition, transaction_metadata_queue, t, queue_size ); + } catch (fc::exception& e) { + elog("FC Exception while accepted_transaction ${e}", ("e", e.to_string())); + } catch (std::exception& e) { + elog("STD Exception while accepted_transaction ${e}", ("e", e.what())); + } catch (...) { + elog("Unknown exception while accepted_transaction"); + } +} + +void mongo_db_plugin_impl::applied_transaction( const chain::transaction_trace_ptr& t ) { + try { + queue( mtx, condition, transaction_trace_queue, t, queue_size ); + } catch (fc::exception& e) { + elog("FC Exception while applied_transaction ${e}", ("e", e.to_string())); + } catch (std::exception& e) { + elog("STD Exception while applied_transaction ${e}", ("e", e.what())); + } catch (...) 
{ + elog("Unknown exception while applied_transaction"); + } +} + +void mongo_db_plugin_impl::applied_irreversible_block( const chain::block_state_ptr& bs ) { + try { + queue( mtx, condition, irreversible_block_state_queue, bs, queue_size ); } catch (fc::exception& e) { elog("FC Exception while applied_irreversible_block ${e}", ("e", e.to_string())); } catch (std::exception& e) { @@ -127,23 +186,15 @@ void mongo_db_plugin_impl::applied_irreversible_block(const signed_block& block) } } -void mongo_db_plugin_impl::applied_block(const block_trace& bt) { +void mongo_db_plugin_impl::accepted_block( const chain::block_state_ptr& bs ) { try { - if (startup) { - // on startup we don't want to queue, instead push back on caller - process_block(bt, bt.block); - } else { - boost::mutex::scoped_lock lock(mtx); - block_trace_queue.emplace_back(std::make_pair(bt, bt.block)); - lock.unlock(); - condition.notify_one(); - } + queue( mtx, condition, block_state_queue, bs, queue_size ); } catch (fc::exception& e) { - elog("FC Exception while applied_block ${e}", ("e", e.to_string())); + elog("FC Exception while accepted_block ${e}", ("e", e.to_string())); } catch (std::exception& e) { - elog("STD Exception while applied_block ${e}", ("e", e.what())); + elog("STD Exception while accepted_block ${e}", ("e", e.what())); } catch (...) 
{ - elog("Unknown exception while applied_block"); + elog("Unknown exception while accepted_block"); } } @@ -151,45 +202,82 @@ void mongo_db_plugin_impl::consume_blocks() { try { while (true) { boost::mutex::scoped_lock lock(mtx); - while (signed_block_queue.empty() && block_trace_queue.empty() && !done) { + while ( transaction_metadata_queue.empty() && + transaction_trace_queue.empty() && + block_state_queue.empty() && + irreversible_block_state_queue.empty() && + !done ) { condition.wait(lock); } - // capture blocks for processing - size_t block_trace_size = block_trace_queue.size(); - if (block_trace_size > 0) { - block_trace_process_queue = move(block_trace_queue); - block_trace_queue.clear(); + + // capture for processing + size_t transaction_metadata_size = transaction_metadata_queue.size(); + if (transaction_metadata_size > 0) { + transaction_metadata_process_queue = move(transaction_metadata_queue); + transaction_metadata_queue.clear(); + } + size_t transaction_trace_size = transaction_trace_queue.size(); + if (transaction_trace_size > 0) { + transaction_trace_process_queue = move(transaction_trace_queue); + transaction_trace_queue.clear(); + } + size_t block_state_size = block_state_queue.size(); + if (block_state_size > 0) { + block_state_process_queue = move(block_state_queue); + block_state_queue.clear(); } - size_t signed_block_size = signed_block_queue.size(); - if (signed_block_size > 0) { - signed_block_process_queue = move(signed_block_queue); - signed_block_queue.clear(); + size_t irreversible_block_size = irreversible_block_state_queue.size(); + if (irreversible_block_size > 0) { + irreversible_block_state_process_queue = move(irreversible_block_state_queue); + irreversible_block_state_queue.clear(); } lock.unlock(); // warn if queue size greater than 75% - if (signed_block_size > (queue_size * 0.75) || block_trace_size > (queue_size * 0.75)) { - wlog("queue size: ${q}", ("q", signed_block_size + block_trace_size + 1)); + if( 
transaction_metadata_size > (queue_size * 0.75) || + transaction_trace_size > (queue_size * 0.75) || + block_state_size > (queue_size * 0.75) || + irreversible_block_size > (queue_size * 0.75)) { + wlog("queue size: ${q}", ("q", transaction_metadata_size + transaction_trace_size + block_state_size + irreversible_block_size)); } else if (done) { - ilog("draining queue, size: ${q}", ("q", signed_block_size + block_trace_size + 1)); + ilog("draining queue, size: ${q}", ("q", transaction_metadata_size + transaction_trace_size + block_state_size + irreversible_block_size)); } - // process block traces - while (!block_trace_process_queue.empty()) { - const auto& bt_pair = block_trace_process_queue.front(); - process_block(bt_pair.first, bt_pair.second); - block_trace_process_queue.pop_front(); + // process transactions + while (!transaction_metadata_process_queue.empty()) { + const auto& t = transaction_metadata_process_queue.front(); + process_accepted_transaction(t); + transaction_metadata_process_queue.pop_front(); + } + + while (!transaction_trace_process_queue.empty()) { + const auto& t = transaction_trace_process_queue.front(); + process_applied_transaction(t); + transaction_trace_process_queue.pop_front(); } // process blocks - while (!signed_block_process_queue.empty()) { - const signed_block& block = signed_block_process_queue.front(); - process_irreversible_block(block); - signed_block_process_queue.pop_front(); + while (!block_state_process_queue.empty()) { + const auto& bs = block_state_process_queue.front(); + process_accepted_block( bs ); + block_state_process_queue.pop_front(); + } + + // process irreversible blocks + while (!irreversible_block_state_process_queue.empty()) { + const auto& bs = irreversible_block_state_process_queue.front(); + process_irreversible_block(bs); + irreversible_block_state_process_queue.pop_front(); } - if (signed_block_size == 0 && block_trace_size == 0 && done) break; + if( transaction_metadata_size == 0 && + 
transaction_trace_size == 0 && + block_state_size == 0 && + irreversible_block_size == 0 && + done ) { + break; + } } ilog("mongo_db_plugin consume thread shutdown gracefully"); } catch (fc::exception& e) { @@ -204,495 +292,664 @@ void mongo_db_plugin_impl::consume_blocks() { namespace { auto find_account(mongocxx::collection& accounts, const account_name& name) { - using bsoncxx::builder::stream::document; - document find_acc{}; - find_acc << "name" << name.to_string(); - auto account = accounts.find_one(find_acc.view()); - if (!account) { - FC_THROW("Unable to find account ${n}", ("n", name)); + using bsoncxx::builder::basic::make_document; + using bsoncxx::builder::basic::kvp; + return accounts.find_one( make_document( kvp( "name", name.to_string()))); + } + + auto find_transaction(mongocxx::collection& trans, const string& id) { + using bsoncxx::builder::basic::make_document; + using bsoncxx::builder::basic::kvp; + return trans.find_one( make_document( kvp( "trx_id", id ))); + } + + auto find_block(mongocxx::collection& blocks, const string& id) { + using bsoncxx::builder::basic::make_document; + using bsoncxx::builder::basic::kvp; + return blocks.find_one( make_document( kvp( "block_id", id ))); + } + + optional get_abi_serializer( account_name n, mongocxx::collection& accounts ) { + using bsoncxx::builder::basic::kvp; + using bsoncxx::builder::basic::make_document; + if( n.good()) { + try { + auto account = accounts.find_one( make_document( kvp("name", n.to_string())) ); + if(account) { + auto view = account->view(); + abi_def abi; + if( view.find( "abi" ) != view.end()) { + try { + abi = fc::json::from_string( bsoncxx::to_json( view["abi"].get_document())).as(); + } catch (...) 
{ + ilog( "Unable to convert account abi to abi_def for ${n}", ( "n", n )); + return optional(); + } + return abi_serializer( abi ); + } + } + } FC_CAPTURE_AND_LOG((n)) } - return *account; + return optional(); + } + + template + fc::variant to_variant_with_abi( const T& obj, mongocxx::collection& accounts ) { + fc::variant pretty_output; + abi_serializer::to_variant( obj, pretty_output, [&]( account_name n ) { return get_abi_serializer( n, accounts ); } ); + return pretty_output; } - auto find_transaction(mongocxx::collection& transactions, const string& id) { - using bsoncxx::builder::stream::document; - document find_trans{}; - find_trans << "transaction_id" << id; - auto transaction = transactions.find_one(find_trans.view()); - if (!transaction) { - FC_THROW("Unable to find transaction ${id}", ("id", id)); +void handle_mongo_exception( const std::string& desc, int line_num ) { + bool shutdown = true; + try { + try { + throw; + } catch( mongocxx::logic_error& e) { + // logic_error on invalid key, do not shutdown + wlog( "mongo logic error, ${desc}, line ${line}, code ${code}, ${what}", + ("desc", desc)( "line", line_num )( "code", e.code().value() )( "what", e.what() )); + shutdown = false; + } catch( mongocxx::operation_exception& e) { + elog( "mongo exception, ${desc}, line ${line}, code ${code}, ${details}", + ("desc", desc)( "line", line_num )( "code", e.code().value() )( "details", e.code().message() )); + if (e.raw_server_error()) { + elog( "mongo exception, ${desc}, line ${line}, ${details}", + ("desc", desc)( "line", line_num )( "details", bsoncxx::to_json(e.raw_server_error()->view()))); + } + } catch( mongocxx::exception& e) { + elog( "mongo exception, ${desc}, line ${line}, code ${code}, ${what}", + ("desc", desc)( "line", line_num )( "code", e.code().value() )( "what", e.what() )); + } catch( bsoncxx::exception& e) { + elog( "bsoncxx exception, ${desc}, line ${line}, code ${code}, ${what}", + ("desc", desc)( "line", line_num )( "code", 
e.code().value() )( "what", e.what() )); + } catch( fc::exception& er ) { + elog( "mongo fc exception, ${desc}, line ${line}, ${details}", + ("desc", desc)( "line", line_num )( "details", er.to_detail_string())); + } catch( const std::exception& e ) { + elog( "mongo std exception, ${desc}, line ${line}, ${what}", + ("desc", desc)( "line", line_num )( "what", e.what())); + } catch( ... ) { + elog( "mongo unknown exception, ${desc}, line ${line_nun}", ("desc", desc)( "line_num", line_num )); } - return *transaction; + } catch (...) { + std::cerr << "Exception attempting to handle exception for " << desc << " " << line_num << std::endl; } - auto find_block(mongocxx::collection& blocks, const string& id) { - using bsoncxx::builder::stream::document; - document find_block{}; - find_block << "block_id" << id; - auto block = blocks.find_one(find_block.view()); - if (!block) { - FC_THROW("Unable to find block ${id}", ("id", id)); - } - return *block; - } - - void add_data(bsoncxx::builder::basic::document& msg_doc, - mongocxx::collection& accounts, - const chain::action& msg) - { - using bsoncxx::builder::basic::kvp; - try { - auto from_account = find_account(accounts, msg.account); - abi_def abi; - if (from_account.view().find("abi") != from_account.view().end()) { - abi = fc::json::from_string(bsoncxx::to_json(from_account.view()["abi"].get_document())).as(); - } - abi_serializer abis; - if (msg.account == chain::config::system_account_name) { - abi = chain::eosio_contract_abi(abi); - } - abis.set_abi(abi); - auto v = abis.binary_to_variant(abis.get_action_type(msg.name), msg.data); - auto json = fc::json::to_string(v); - try { - const auto& value = bsoncxx::from_json(json); - msg_doc.append(kvp("data", value)); - return; - } catch (std::exception& e) { - elog("Unable to convert EOS JSON to MongoDB JSON: ${e}", ("e", e.what())); - elog(" EOS JSON: ${j}", ("j", json)); - } - } catch (fc::exception& e) { - elog("Unable to convert action.data to ABI: ${s} :: ${n}, what: 
${e}", ("s", msg.account)("n", msg.name)("e", e.to_string())); - } catch (std::exception& e) { - elog("Unable to convert action.data to ABI: ${s} :: ${n}, std what: ${e}", ("s", msg.account)("n", msg.name)("e", e.what())); - } catch (...) { - elog("Unable to convert action.data to ABI: ${s} :: ${n}, unknown exception", ("s", msg.account)("n", msg.name)); - } - // if anything went wrong just store raw hex_data - msg_doc.append(kvp("hex_data", fc::variant(msg.data).as_string())); - } + if( shutdown ) { + // shutdown if mongo failed to provide opportunity to fix issue and restart + app().quit(); + } +} - void verify_last_block(mongocxx::collection& blocks, const std::string& prev_block_id) { - mongocxx::options::find opts; - opts.sort(bsoncxx::from_json(R"xxx({ "_id" : -1 })xxx")); - auto last_block = blocks.find_one({}, opts); - if (!last_block) { - FC_THROW("No blocks found in database"); - } - const auto id = last_block->view()["block_id"].get_utf8().value.to_string(); - if (id != prev_block_id) { - FC_THROW("Did not find expected block ${pid}, instead found ${id}", ("pid", prev_block_id)("id", id)); - } - } + void update_account(mongocxx::collection& accounts, const chain::action& act) { + using bsoncxx::builder::basic::kvp; + using bsoncxx::builder::basic::make_document; + using namespace bsoncxx::types; + + if (act.account != chain::config::system_account_name) + return; - void verify_no_blocks(mongocxx::collection& blocks) { - if (blocks.count(bsoncxx::from_json("{}")) > 0) { - FC_THROW("Existing blocks found in database"); + try { + if( act.name == mongo_db_plugin_impl::newaccount ) { + auto now = std::chrono::duration_cast( + std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} ); + auto newaccount = act.data_as(); + + // create new account + if( !accounts.insert_one( make_document( kvp( "name", newaccount.name.to_string()), + kvp( "createdAt", b_date{now} )))) { + elog( "Failed to insert account ${n}", ("n", newaccount.name)); + } + + 
} else if( act.name == mongo_db_plugin_impl::setabi ) { + auto now = std::chrono::duration_cast( + std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} ); + auto setabi = act.data_as(); + auto from_account = find_account( accounts, setabi.account ); + if( !from_account ) { + if( !accounts.insert_one( make_document( kvp( "name", setabi.account.to_string()), + kvp( "createdAt", b_date{now} )))) { + elog( "Failed to insert account ${n}", ("n", setabi.account)); + } + from_account = find_account( accounts, setabi.account ); + } + if( from_account ) { + const abi_def& abi_def = fc::raw::unpack( setabi.abi ); + const string json_str = fc::json::to_string( abi_def ); + + try{ + auto update_from = make_document( + kvp( "$set", make_document( kvp( "abi", bsoncxx::from_json( json_str )), + kvp( "updatedAt", b_date{now} )))); + + try { + if( !accounts.update_one( make_document( kvp( "_id", from_account->view()["_id"].get_oid())), + update_from.view())) { + FC_ASSERT( false, "Failed to udpdate account ${n}", ("n", setabi.account)); + } + } catch( ... 
) { + handle_mongo_exception( "account update", __LINE__ ); + } + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert abi JSON to MongoDB JSON: ${e}", ("e", e.what())); + elog( " JSON: ${j}", ("j", json_str)); + } + } + } + } catch( fc::exception& e ) { + // if unable to unpack native type, skip account creation } } + +void add_data( bsoncxx::builder::basic::document& act_doc, mongocxx::collection& accounts, const chain::action& act ) { + using bsoncxx::builder::basic::kvp; + using bsoncxx::builder::basic::make_document; + try { + if( act.account == chain::config::system_account_name ) { + if( act.name == mongo_db_plugin_impl::setabi ) { + auto setabi = act.data_as(); + try { + const abi_def& abi_def = fc::raw::unpack( setabi.abi ); + const string json_str = fc::json::to_string( abi_def ); + + act_doc.append( + kvp( "data", make_document( kvp( "account", setabi.account.to_string()), + kvp( "abi_def", bsoncxx::from_json( json_str ))))); + return; + } catch( bsoncxx::exception& ) { + // better error handling below + } catch( fc::exception& e ) { + ilog( "Unable to convert action abi_def to json for ${n}", ("n", setabi.account.to_string())); + } + } + } + auto account = find_account( accounts, act.account ); + if( account ) { + auto from_account = *account; + abi_def abi; + if( from_account.view().find( "abi" ) != from_account.view().end()) { + try { + abi = fc::json::from_string( bsoncxx::to_json( from_account.view()["abi"].get_document())).as(); + } catch( ... 
) { + ilog( "Unable to convert account abi to abi_def for ${s}::${n}", ("s", act.account)( "n", act.name )); + } + } + string json; + try { + abi_serializer abis; + abis.set_abi( abi ); + auto v = abis.binary_to_variant( abis.get_action_type( act.name ), act.data ); + json = fc::json::to_string( v ); + + const auto& value = bsoncxx::from_json( json ); + act_doc.append( kvp( "data", value )); + return; + } catch( bsoncxx::exception& e ) { + ilog( "Unable to convert EOS JSON to MongoDB JSON: ${e}", ("e", e.what())); + ilog( " EOS JSON: ${j}", ("j", json)); + ilog( " Storing data has hex." ); + } + } + } catch( std::exception& e ) { + ilog( "Unable to convert action.data to ABI: ${s}::${n}, std what: ${e}", + ("s", act.account)( "n", act.name )( "e", e.what())); + } catch( ... ) { + ilog( "Unable to convert action.data to ABI: ${s}::${n}, unknown exception", + ("s", act.account)( "n", act.name )); + } + // if anything went wrong just store raw hex_data + act_doc.append( kvp( "hex_data", fc::variant( act.data ).as_string())); +} + } -void mongo_db_plugin_impl::process_irreversible_block(const signed_block& block) { +void mongo_db_plugin_impl::process_accepted_transaction( const chain::transaction_metadata_ptr& t ) { + try { + // always call since we need to capture setabi on accounts even if not storing transactions + _process_accepted_transaction(t); + } catch (fc::exception& e) { + elog("FC Exception while processing accepted transaction metadata: ${e}", ("e", e.to_detail_string())); + } catch (std::exception& e) { + elog("STD Exception while processing accepted tranasction metadata: ${e}", ("e", e.what())); + } catch (...) 
{ + elog("Unknown exception while processing accepted transaction metadata"); + } +} + +void mongo_db_plugin_impl::process_applied_transaction( const chain::transaction_trace_ptr& t ) { + try { + if( start_block_reached ) { + _process_applied_transaction( t ); + } + } catch (fc::exception& e) { + elog("FC Exception while processing applied transaction trace: ${e}", ("e", e.to_detail_string())); + } catch (std::exception& e) { + elog("STD Exception while processing applied transaction trace: ${e}", ("e", e.what())); + } catch (...) { + elog("Unknown exception while processing applied transaction trace"); + } +} + +void mongo_db_plugin_impl::process_irreversible_block(const chain::block_state_ptr& bs) { try { - _process_irreversible_block(block); + if( start_block_reached ) { + _process_irreversible_block( bs ); + } } catch (fc::exception& e) { - elog("FC Exception while processing block ${e}", ("e", e.to_string())); + elog("FC Exception while processing irreversible block: ${e}", ("e", e.to_detail_string())); } catch (std::exception& e) { - elog("STD Exception while processing block ${e}", ("e", e.what())); + elog("STD Exception while processing irreversible block: ${e}", ("e", e.what())); } catch (...) 
{ - elog("Unknown exception while processing block"); + elog("Unknown exception while processing irreversible block"); } } -void mongo_db_plugin_impl::process_block(const block_trace& bt, const signed_block& block) { +void mongo_db_plugin_impl::process_accepted_block( const chain::block_state_ptr& bs ) { try { - _process_block(bt, block); + if( !start_block_reached ) { + if( bs->block_num >= start_block_num ) { + start_block_reached = true; + } + } + if( start_block_reached ) { + _process_accepted_block( bs ); + } } catch (fc::exception& e) { - elog("FC Exception while processing block trace ${e}", ("e", e.to_string())); + elog("FC Exception while processing accepted block trace ${e}", ("e", e.to_string())); } catch (std::exception& e) { - elog("STD Exception while processing block trace ${e}", ("e", e.what())); + elog("STD Exception while processing accepted block trace ${e}", ("e", e.what())); } catch (...) { - elog("Unknown exception while processing trace block"); + elog("Unknown exception while processing accepted block trace"); } } -void mongo_db_plugin_impl::_process_block(const block_trace& bt, const signed_block& block) { - // note bt.block is invalid at this point since it is a reference to internal chainbase block +void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transaction_metadata_ptr& t ) { using namespace bsoncxx::types; - using namespace bsoncxx::builder; using bsoncxx::builder::basic::kvp; + using bsoncxx::builder::basic::make_document; + using bsoncxx::builder::basic::make_array; + namespace bbb = bsoncxx::builder::basic; + + auto trans = mongo_conn[db_name][trans_col]; + auto actions = mongo_conn[db_name][actions_col]; + accounts = mongo_conn[db_name][accounts_col]; + auto trans_doc = bsoncxx::builder::basic::document{}; + auto now = std::chrono::duration_cast( + std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + + const auto trx_id = t->id; + const auto trx_id_str = trx_id.str(); + const auto& 
trx = t->trx; + const chain::transaction_header& trx_header = trx; + + bool actions_to_write = false; mongocxx::options::bulk_write bulk_opts; bulk_opts.ordered(false); - mongocxx::bulk_write bulk_trans{bulk_opts}; + mongocxx::bulk_write bulk_actions = actions.create_bulk_write(bulk_opts); + + int32_t act_num = 0; + auto process_action = [&](const std::string& trx_id_str, const chain::action& act, bbb::array& act_array, bool cfa) -> auto { + auto act_doc = bsoncxx::builder::basic::document(); + if( start_block_reached ) { + act_doc.append( kvp( "action_num", b_int32{act_num} ), + kvp( "trx_id", trx_id_str )); + act_doc.append( kvp( "cfa", b_bool{cfa} )); + act_doc.append( kvp( "account", act.account.to_string())); + act_doc.append( kvp( "name", act.name.to_string())); + act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) { + for( const auto& auth : act.authorization ) { + subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) { + subdoc.append( kvp( "actor", auth.actor.to_string()), + kvp( "permission", auth.permission.to_string())); + } ); + } + } )); + } + try { + update_account( accounts, act ); + } catch (...) 
{ + ilog( "Unable to update account for ${s}::${n}", ("s", act.account)( "n", act.name )); + } + if( start_block_reached ) { + add_data( act_doc, accounts, act ); + act_array.append( act_doc ); + mongocxx::model::insert_one insert_op{act_doc.view()}; + bulk_actions.append( insert_op ); + actions_to_write = true; + } + ++act_num; + return act_num; + }; - auto blocks = mongo_conn[db_name][blocks_col]; // Blocks - auto trans = mongo_conn[db_name][trans_col]; // Transactions - auto msgs = mongo_conn[db_name][actions_col]; // Actions - auto action_traces = mongo_conn[db_name][action_traces_col]; // ActionTraces + if( start_block_reached ) { + trans_doc.append( kvp( "trx_id", trx_id_str ), + kvp( "irreversible", b_bool{false} )); - auto block_doc = bsoncxx::builder::basic::document{}; - const auto block_id = block.id(); - const auto block_id_str = block_id.str(); - const auto prev_block_id_str = block.previous.str(); - auto block_num = block.block_num(); - - if (processed == 0) { - if (wipe_database_on_startup) { - // verify on start we have no previous blocks - verify_no_blocks(blocks); - FC_ASSERT(block_num < 2, "Expected start of block, instead received block_num: ${bn}", ("bn", block_num)); + string signing_keys_json; + if( t->signing_keys.valid()) { + signing_keys_json = fc::json::to_string( t->signing_keys->second ); } else { - // verify on restart we have previous block - verify_last_block(blocks, prev_block_id_str); + auto signing_keys = trx.get_signature_keys( *chain_id, false, false ); + if( !signing_keys.empty()) { + signing_keys_json = fc::json::to_string( signing_keys ); + } + } + string trx_header_json = fc::json::to_string( trx_header ); + + try { + const auto& trx_header_value = bsoncxx::from_json( trx_header_json ); + trans_doc.append( kvp( "transaction_header", trx_header_value )); + } catch( bsoncxx::exception& ) { + try { + trx_header_json = fc::prune_invalid_utf8( trx_header_json ); + const auto& trx_header_value = bsoncxx::from_json( trx_header_json 
); + trans_doc.append( kvp( "transaction_header", trx_header_value )); + trans_doc.append( kvp( "non-utf8-purged", b_bool{true})); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert transaction header JSON to MongoDB JSON: ${e}", ("e", e.what())); + elog( " JSON: ${j}", ("j", trx_header_json)); + } + } + if( !signing_keys_json.empty()) { + try { + const auto& keys_value = bsoncxx::from_json( signing_keys_json ); + trans_doc.append( kvp( "signing_keys", keys_value )); + } catch( bsoncxx::exception& e ) { + // should never fail, so don't attempt to remove invalid utf8 + elog( "Unable to convert signing keys JSON to MongoDB JSON: ${e}", ("e", e.what())); + elog( " JSON: ${j}", ("j", signing_keys_json)); + } } } - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - - block_doc.append(kvp("block_num", b_int32{static_cast(block_num)}), - kvp("block_id", block_id_str), - kvp("prev_block_id", prev_block_id_str), - kvp("timestamp", b_date{std::chrono::milliseconds{ - std::chrono::seconds{block.timestamp.operator fc::time_point().sec_since_epoch()}}}), - kvp("transaction_merkle_root", block.transaction_mroot.str()), - kvp("producer_account_id", block.producer.to_string()), - kvp("pending", b_bool{true})); - block_doc.append(kvp("createdAt", b_date{now})); - - if (!blocks.insert_one(block_doc.view())) { - elog("Failed to insert block ${bid}", ("bid", block_id)); + if( !trx.actions.empty()) { + bsoncxx::builder::basic::array action_array; + for( const auto& act : trx.actions ) { + process_action( trx_id_str, act, action_array, false ); + } + trans_doc.append( kvp( "actions", action_array )); } - int32_t msg_num = -1; - bool actions_to_write = false; - auto process_action = [&](const std::string& trans_id_str, mongocxx::bulk_write& bulk_msgs, const chain::action& msg) -> auto { - auto msg_oid = bsoncxx::oid{}; - auto msg_doc = bsoncxx::builder::basic::document{}; - msg_doc.append(kvp("_id", 
b_oid{msg_oid}), - kvp("action_id", b_int32{msg_num}), - kvp("transaction_id", trans_id_str)); - msg_doc.append(kvp("authorization", [&msg](bsoncxx::builder::basic::sub_array subarr) { - for (const auto& auth : msg.authorization) { - subarr.append([&auth](bsoncxx::builder::basic::sub_document subdoc) { - subdoc.append(kvp("actor", auth.actor.to_string()), - kvp("permission", auth.permission.to_string())); - }); - } - })); - msg_doc.append(kvp("handler_account_name", msg.account.to_string())); - msg_doc.append(kvp("name", msg.name.to_string())); - add_data(msg_doc, accounts, msg); - msg_doc.append(kvp("createdAt", b_date{now})); - mongocxx::model::insert_one insert_msg{msg_doc.view()}; - bulk_msgs.append(insert_msg); - actions_to_write = true; - ++msg_num; - return msg_oid; - }; - - bool action_traces_to_write = false; - auto process_action_trace = [&](const std::string& trans_id_str, - mongocxx::bulk_write& bulk_acts, - const chain::action_trace& act, - const auto& msg_oid) - { - auto act_oid = bsoncxx::oid{}; - auto act_doc = bsoncxx::builder::basic::document{}; - act_doc.append(kvp("_id", b_oid{act_oid}), - kvp("transaction_id", trans_id_str), - kvp("receiver", act.receiver.to_string()), - kvp("action", b_oid{msg_oid}), - kvp("console", act.console)); - act_doc.append(kvp("data_access", [&act](bsoncxx::builder::basic::sub_array subarr) { - for (const auto& data : act.data_access) { - subarr.append([&data](bsoncxx::builder::basic::sub_document subdoc) { - subdoc.append(kvp("type", data.type == chain::data_access_info::read ? 
"read" : "write"), - kvp("code", data.code.to_string()), - kvp("scope", data.scope.to_string()), - kvp("sequence", b_int64{static_cast(data.sequence)})); - }); - } - })); - act_doc.append(kvp("createdAt", b_date{now})); - mongocxx::model::insert_one insert_act{act_doc.view()}; - bulk_acts.append(insert_act); - action_traces_to_write = true; - }; + if( start_block_reached ) { + act_num = 0; + if( !trx.context_free_actions.empty()) { + bsoncxx::builder::basic::array action_array; + for( const auto& cfa : trx.context_free_actions ) { + process_action( trx_id_str, cfa, action_array, true ); + } + trans_doc.append( kvp( "context_free_actions", action_array )); + } - int32_t trx_num = 0; - std::map trx_status_map; - bool transactions_in_block = false; + string trx_extensions_json = fc::json::to_string( trx.transaction_extensions ); + string trx_signatures_json = fc::json::to_string( trx.signatures ); + string trx_context_free_data_json = fc::json::to_string( trx.context_free_data ); + + try { + if( !trx_extensions_json.empty()) { + try { + const auto& trx_extensions_value = bsoncxx::from_json( trx_extensions_json ); + trans_doc.append( kvp( "transaction_extensions", trx_extensions_value )); + } catch( bsoncxx::exception& ) { + static_assert( sizeof(std::remove_pointer::type) == sizeof(std::string::value_type), "string type not storable as b_binary" ); + trans_doc.append( kvp( "transaction_extensions", + b_binary{bsoncxx::binary_sub_type::k_binary, + static_cast(trx_extensions_json.size()), + reinterpret_cast(trx_extensions_json.data())} )); + } + } else { + trans_doc.append( kvp( "transaction_extensions", make_array())); + } - auto process_trx = [&](const chain::transaction& trx) -> auto { - auto txn_oid = bsoncxx::oid{}; - auto doc = bsoncxx::builder::basic::document{}; - auto trx_id = trx.id(); - const auto trans_id_str = trx_id.str(); - doc.append(kvp("_id", txn_oid), - kvp("transaction_id", trans_id_str), - kvp("sequence_num", b_int32{trx_num}), - kvp("block_id", 
block_id_str), - kvp("ref_block_num", b_int32{static_cast(trx.ref_block_num)}), - kvp("ref_block_prefix", b_int32{static_cast(trx.ref_block_prefix)}), - kvp("status", trx_status_map[trx_id]), - kvp("expiration", - b_date{std::chrono::milliseconds{std::chrono::seconds{trx.expiration.sec_since_epoch()}}}), - kvp("pending", b_bool{true}) - ); - doc.append(kvp("createdAt", b_date{now})); - - if (!trx.actions.empty()) { - mongocxx::bulk_write bulk_msgs{bulk_opts}; - msg_num = 0; - for (const auto& msg : trx.actions) { - process_action(trans_id_str, bulk_msgs, msg); - } - auto result = msgs.bulk_write(bulk_msgs); - if (!result) { - elog("Bulk action insert failed for block: ${bid}, transaction: ${trx}", - ("bid", block_id)("trx", trx_id)); - } - } - transactions_in_block = true; - return doc; - }; + if( !trx_signatures_json.empty()) { + // signatures contain only utf8 + const auto& trx_signatures_value = bsoncxx::from_json( trx_signatures_json ); + trans_doc.append( kvp( "signatures", trx_signatures_value )); + } else { + trans_doc.append( kvp( "signatures", make_array())); + } - mongocxx::bulk_write bulk_msgs{bulk_opts}; - mongocxx::bulk_write bulk_acts{bulk_opts}; - trx_num = 1000000; - for (const auto& rt: bt.region_traces) { - for (const auto& ct: rt.cycle_traces) { - for (const auto& st: ct.shard_traces) { - for (const auto& trx_trace: st.transaction_traces) { - std::string trx_status = (trx_trace.status == chain::transaction_receipt::executed) ? "executed" : - (trx_trace.status == chain::transaction_receipt::soft_fail) ? "soft_fail" : - (trx_trace.status == chain::transaction_receipt::hard_fail) ? 
"hard_fail" : - "unknown"; - trx_status_map[trx_trace.id] = trx_status; - - for (const auto& req : trx_trace.deferred_transaction_requests) { - if ( req.contains() ) { - auto trx = req.get(); - auto doc = process_trx(trx); - doc.append(kvp("type", "deferred"), - kvp("sender_id", b_int64{static_cast(trx.sender_id)}), - kvp("sender", trx.sender.to_string()), - kvp("execute_after", b_date{std::chrono::milliseconds{ - std::chrono::seconds{trx.execute_after.sec_since_epoch()}}})); - mongocxx::model::insert_one insert_op{doc.view()}; - bulk_trans.append(insert_op); - ++trx_num; - } else { - auto cancel = req.get(); - auto doc = bsoncxx::builder::basic::document{}; - doc.append(kvp("type", "cancel_deferred"), - kvp("sender_id", b_int64{static_cast(cancel.sender_id)}), - kvp("sender", cancel.sender.to_string()) - ); - } - } - if (!trx_trace.action_traces.empty()) { - actions_to_write = true; - msg_num = 1000000; - for (const auto& act_trace : trx_trace.action_traces) { - const auto& msg = act_trace.act; - auto msg_oid = process_action(trx_trace.id.str(), bulk_msgs, msg); - if (trx_trace.status == chain::transaction_receipt::executed) { - if (act_trace.receiver == chain::config::system_account_name) { - reversible_actions[trx_trace.id.str()].emplace_back(msg); - } - } - process_action_trace(trx_trace.id.str(), bulk_acts, act_trace, msg_oid); - } + if( !trx_context_free_data_json.empty()) { + try { + const auto& trx_context_free_data_value = bsoncxx::from_json( trx_context_free_data_json ); + trans_doc.append( kvp( "context_free_data", trx_context_free_data_value )); + } catch( bsoncxx::exception& ) { + static_assert( sizeof(std::remove_pointer::type) == + sizeof(std::remove_pointer::type), "context_free_data not storable as b_binary" ); + bsoncxx::builder::basic::array data_array; + for (auto& cfd : trx.context_free_data) { + data_array.append( + b_binary{bsoncxx::binary_sub_type::k_binary, + static_cast(cfd.size()), + reinterpret_cast(cfd.data())} ); } + trans_doc.append( 
kvp( "context_free_data", data_array.view() )); + } + } else { + trans_doc.append( kvp( "context_free_data", make_array())); + } + } catch( std::exception& e ) { + elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what())); + elog( " JSON: ${j}", ("j", trx_extensions_json)); + elog( " JSON: ${j}", ("j", trx_signatures_json)); + elog( " JSON: ${j}", ("j", trx_context_free_data_json)); + } + + trans_doc.append( kvp( "createdAt", b_date{now} )); + + try { + if( !trans.insert_one( trans_doc.view())) { + FC_ASSERT( false, "Failed to insert trans ${id}", ("id", trx_id)); + } + } catch(...) { + handle_mongo_exception("trans insert", __LINE__); + } - // TODO: handle canceled_deferred + if (actions_to_write) { + try { + if( !bulk_actions.execute() ) { + FC_ASSERT( false, "Bulk actions insert failed for transaction: ${id}", ("id", trx_id_str)); } + } catch(...) { + handle_mongo_exception("actions insert", __LINE__); } } } +} - trx_num = 0; - for (const auto& packed_trx : block.input_transactions) { - const signed_transaction& trx = packed_trx.get_signed_transaction(); - auto doc = process_trx(trx); - doc.append(kvp("type", "input")); - doc.append(kvp("signatures", [&trx](bsoncxx::builder::basic::sub_array subarr) { - for (const auto& sig : trx.signatures) { - subarr.append(fc::variant(sig).as_string()); - } - })); - mongocxx::model::insert_one insert_op{doc.view()}; - bulk_trans.append(insert_op); - ++trx_num; +void mongo_db_plugin_impl::_process_applied_transaction( const chain::transaction_trace_ptr& t ) { + using namespace bsoncxx::types; + using bsoncxx::builder::basic::kvp; + + auto trans_traces = mongo_conn[db_name][trans_traces_col]; + auto trans_traces_doc = bsoncxx::builder::basic::document{}; + + auto now = std::chrono::duration_cast( + std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + + auto v = to_variant_with_abi( *t, accounts ); + string json = fc::json::to_string( v ); + try { + const auto& value = 
bsoncxx::from_json( json ); + trans_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); + } catch( bsoncxx::exception& ) { + try { + json = fc::prune_invalid_utf8( json ); + const auto& value = bsoncxx::from_json( json ); + trans_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); + trans_traces_doc.append( kvp( "non-utf8-purged", b_bool{true} )); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what())); + elog( " JSON: ${j}", ("j", json)); + } } + trans_traces_doc.append( kvp( "createdAt", b_date{now} )); - for (const auto& implicit_trx : bt.implicit_transactions ){ - auto doc = process_trx(implicit_trx); - doc.append(kvp("type", "implicit")); - mongocxx::model::insert_one insert_op{doc.view()}; - bulk_trans.append(insert_op); - ++trx_num; + try { + if( !trans_traces.insert_one( trans_traces_doc.view())) { + FC_ASSERT( false, "Failed to insert trans ${id}", ("id", t->id)); + } + } catch(...) 
{ + handle_mongo_exception("trans_traces insert: " + json, __LINE__); } +} + +void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr& bs ) { + using namespace bsoncxx::types; + using namespace bsoncxx::builder; + using bsoncxx::builder::basic::kvp; + + auto block_num = bs->block_num; + const auto block_id = bs->id; + const auto block_id_str = block_id.str(); + const auto prev_block_id_str = bs->block->previous.str(); + + auto now = std::chrono::duration_cast( + std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - if (actions_to_write) { - auto result = msgs.bulk_write(bulk_msgs); - if (!result) { - elog("Bulk actions insert failed for block: ${bid}", ("bid", block_id)); + const chain::block_header_state& bhs = *bs; + + auto block_states = mongo_conn[db_name][block_states_col]; + auto block_state_doc = bsoncxx::builder::basic::document{}; + block_state_doc.append(kvp( "block_num", b_int32{static_cast(block_num)} ), + kvp( "block_id", block_id_str ), + kvp( "validated", b_bool{bs->validated} ), + kvp( "in_current_chain", b_bool{bs->in_current_chain} )); + + auto json = fc::json::to_string( bhs ); + try { + const auto& value = bsoncxx::from_json( json ); + block_state_doc.append( kvp( "block_header_state", value )); + } catch( bsoncxx::exception& ) { + try { + json = fc::prune_invalid_utf8(json); + const auto& value = bsoncxx::from_json( json ); + block_state_doc.append( kvp( "block_header_state", value )); + block_state_doc.append( kvp( "non-utf8-purged", b_bool{true})); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what())); + elog( " JSON: ${j}", ("j", json)); } } - if (action_traces_to_write) { - auto result = action_traces.bulk_write(bulk_acts); - if (!result) { - elog("Bulk action traces insert failed for block: ${bid}", ("bid", block_id)); + block_state_doc.append(kvp( "createdAt", b_date{now} )); + + try { + if( 
!block_states.insert_one( block_state_doc.view())) { + FC_ASSERT( false, "Failed to insert block_state ${bid}", ("bid", block_id)); } + } catch(...) { + handle_mongo_exception("block_states insert: " + json, __LINE__); } - if (transactions_in_block) { - auto result = trans.bulk_write(bulk_trans); - if (!result) { - elog("Bulk transaction insert failed for block: ${bid}", ("bid", block_id)); + + auto blocks = mongo_conn[db_name][blocks_col]; + auto block_doc = bsoncxx::builder::basic::document{}; + block_doc.append(kvp( "block_num", b_int32{static_cast(block_num)} ), + kvp( "block_id", block_id_str ), + kvp( "irreversible", b_bool{false} )); + + auto v = to_variant_with_abi( *bs->block, accounts ); + json = fc::json::to_string( v ); + try { + const auto& value = bsoncxx::from_json( json ); + block_doc.append( kvp( "block", value )); + } catch( bsoncxx::exception& ) { + try { + json = fc::prune_invalid_utf8(json); + const auto& value = bsoncxx::from_json( json ); + block_doc.append( kvp( "block", value )); + block_doc.append( kvp( "non-utf8-purged", b_bool{true})); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what())); + elog( " JSON: ${j}", ("j", json)); } } + block_doc.append(kvp( "createdAt", b_date{now} )); - ++processed; + try { + if( !blocks.insert_one( block_doc.view())) { + FC_ASSERT( false, "Failed to insert block ${bid}", ("bid", block_id)); + } + } catch(...) 
{ + handle_mongo_exception("blocks insert: " + json, __LINE__); + } } -void mongo_db_plugin_impl::_process_irreversible_block(const signed_block& block) +void mongo_db_plugin_impl::_process_irreversible_block(const chain::block_state_ptr& bs) { using namespace bsoncxx::types; using namespace bsoncxx::builder; + using bsoncxx::builder::basic::make_document; using bsoncxx::builder::basic::kvp; - using bsoncxx::builder::stream::document; - using bsoncxx::builder::stream::open_document; - using bsoncxx::builder::stream::close_document; - using bsoncxx::builder::stream::finalize; - auto blocks = mongo_conn[db_name][blocks_col]; // Blocks auto trans = mongo_conn[db_name][trans_col]; // Transactions - const auto block_id = block.id(); + const auto block_id = bs->block->id(); const auto block_id_str = block_id.str(); + const auto block_num = bs->block->block_num(); + + // genesis block 1 is not signaled to accepted_block + if (block_num < 2) return; auto now = std::chrono::duration_cast( std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); auto ir_block = find_block(blocks, block_id_str); + if( !ir_block ) { + _process_accepted_block( bs ); + ir_block = find_block( blocks, block_id_str ); + if( !ir_block ) return; // should never happen + } - document update_block{}; - update_block << "$set" << open_document << "pending" << b_bool{false} - << "updatedAt" << b_date{now} - << close_document; - - blocks.update_one(document{} << "_id" << ir_block.view()["_id"].get_oid() << finalize, update_block.view()); - - for (const auto& r: block.regions) { - for (const auto& cs: r.cycles_summary) { - for (const auto& ss: cs) { - for (const auto& trx_receipt: ss.transactions) { - const auto trans_id_str = trx_receipt.id.str(); - auto ir_trans = find_transaction(trans, trans_id_str); - - document update_trans{}; - update_trans << "$set" << open_document << "pending" << b_bool{false} - << "updatedAt" << b_date{now} - << close_document; + auto update_doc = 
make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), + kvp( "validated", b_bool{bs->validated} ), + kvp( "in_current_chain", b_bool{bs->in_current_chain} ), + kvp( "updatedAt", b_date{now})))); - trans.update_one(document{} << "_id" << ir_trans.view()["_id"].get_oid() << finalize, - update_trans.view()); + blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid())), update_doc.view()); - // actions are irreversible, so update account document - if (ir_trans.view()["status"].get_utf8().value.to_string() == "executed") { - for (const auto& msg : reversible_actions[trans_id_str]) { - update_account(msg); - } - } - reversible_actions.erase(trans_id_str); - } - } + bool transactions_in_block = false; + mongocxx::options::bulk_write bulk_opts; + bulk_opts.ordered(false); + auto bulk = trans.create_bulk_write(bulk_opts); + + for (const auto& receipt : bs->block->transactions) { + string trx_id_str; + if( receipt.trx.contains()) { + const auto& pt = receipt.trx.get(); + // get id via get_raw_transaction() as packed_transaction.id() mutates inernal transaction state + const auto& raw = pt.get_raw_transaction(); + const auto& id = fc::raw::unpack(raw).id(); + trx_id_str = id.str(); + } else { + const auto& id = receipt.trx.get(); + trx_id_str = id.str(); } - } - -} - -// For now providing some simple account processing to maintain eos_balance -void mongo_db_plugin_impl::update_account(const chain::action& msg) { - using bsoncxx::builder::basic::kvp; - using bsoncxx::builder::stream::document; - using bsoncxx::builder::stream::open_document; - using bsoncxx::builder::stream::close_document; - using bsoncxx::builder::stream::finalize; - using namespace bsoncxx::types; - if (msg.account != chain::config::system_account_name) - return; + auto ir_trans = find_transaction(trans, trx_id_str); - if (msg.name == transfer) { - auto now = std::chrono::duration_cast( - 
std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + if (ir_trans) { + auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), + kvp( "block_id", block_id_str), + kvp( "block_num", b_int32{static_cast(block_num)} ), + kvp( "updatedAt", b_date{now})))); - abi_serializer abis; - auto eosio_account = find_account(accounts, msg.account); - auto abi = fc::json::from_string(bsoncxx::to_json(eosio_account.view()["abi"].get_document())).as(); - abis.set_abi(abi); - auto transfer = abis.binary_to_variant(abis.get_action_type(msg.name), msg.data); - auto from_name = transfer["from"].as().to_string(); - auto to_name = transfer["to"].as().to_string(); - auto from_account = find_account(accounts, from_name); - auto to_account = find_account(accounts, to_name); - - asset from_balance = asset::from_string(from_account.view()["eos_balance"].get_utf8().value.to_string()); - asset to_balance = asset::from_string(to_account.view()["eos_balance"].get_utf8().value.to_string()); - auto asset_quantity = transfer["quantity"].as(); - edump((from_balance)(to_balance)(asset_quantity)); - from_balance -= asset_quantity; - to_balance += asset_quantity; - - document update_from{}; - update_from << "$set" << open_document << "eos_balance" << from_balance.to_string() - << "updatedAt" << b_date{now} - << close_document; - document update_to{}; - update_to << "$set" << open_document << "eos_balance" << to_balance.to_string() - << "updatedAt" << b_date{now} - << close_document; - - accounts.update_one(document{} << "_id" << from_account.view()["_id"].get_oid() << finalize, update_from.view()); - accounts.update_one(document{} << "_id" << to_account.view()["_id"].get_oid() << finalize, update_to.view()); - - } else if (msg.name == newaccount) { - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - auto newaccount = msg.data_as(); + mongocxx::model::update_one 
update_op{ make_document(kvp("_id", ir_trans->view()["_id"].get_oid())), update_doc.view()}; + bulk.append(update_op); - // create new account - bsoncxx::builder::stream::document doc{}; - doc << "name" << newaccount.name.to_string() - << "eos_balance" << asset().to_string() - << "staked_balance" << asset().to_string() - << "unstaking_balance" << asset().to_string() - << "createdAt" << b_date{now} - << "updatedAt" << b_date{now}; - if (!accounts.insert_one(doc.view())) { - elog("Failed to insert account ${n}", ("n", newaccount.name)); + transactions_in_block = true; } + } - } else if (msg.name == setabi) { - auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - auto setabi = msg.data_as(); - auto from_account = find_account(accounts, setabi.account); - - document update_from{}; - update_from << "$set" << open_document - << "abi" << bsoncxx::from_json(fc::json::to_string(setabi.abi)) - << "updatedAt" << b_date{now} - << close_document; - - accounts.update_one(document{} << "_id" << from_account.view()["_id"].get_oid() << finalize, update_from.view()); + if( transactions_in_block ) { + try { + if( !bulk.execute()) { + FC_ASSERT( false, "Bulk transaction insert failed for block: ${bid}", ("bid", block_id)); + } + } catch(...) 
{ + handle_mongo_exception("bulk transaction insert", __LINE__); + } } } @@ -703,74 +960,90 @@ mongo_db_plugin_impl::mongo_db_plugin_impl() } mongo_db_plugin_impl::~mongo_db_plugin_impl() { - try { - done = true; - condition.notify_one(); + if (!startup) { + try { + ilog( "mongo_db_plugin shutdown in process please be patient this can take a few minutes" ); + done = true; + condition.notify_one(); - consume_thread.join(); - } catch (std::exception& e) { - elog("Exception on mongo_db_plugin shutdown of consume thread: ${e}", ("e", e.what())); + consume_thread.join(); + } catch( std::exception& e ) { + elog( "Exception on mongo_db_plugin shutdown of consume thread: ${e}", ("e", e.what())); + } } } void mongo_db_plugin_impl::wipe_database() { ilog("mongo db wipe_database"); - accounts = mongo_conn[db_name][accounts_col]; // Accounts - auto blocks = mongo_conn[db_name][blocks_col]; // Blocks - auto trans = mongo_conn[db_name][trans_col]; // Transactions - auto msgs = mongo_conn[db_name][actions_col]; // Actions - auto action_traces = mongo_conn[db_name][action_traces_col]; // ActionTraces + auto block_states = mongo_conn[db_name][block_states_col]; + auto blocks = mongo_conn[db_name][blocks_col]; + auto trans = mongo_conn[db_name][trans_col]; + auto trans_traces = mongo_conn[db_name][trans_traces_col]; + auto actions = mongo_conn[db_name][actions_col]; + accounts = mongo_conn[db_name][accounts_col]; + block_states.drop(); blocks.drop(); trans.drop(); + trans_traces.drop(); + actions.drop(); accounts.drop(); - msgs.drop(); - action_traces.drop(); } void mongo_db_plugin_impl::init() { using namespace bsoncxx::types; + using bsoncxx::builder::basic::make_document; + using bsoncxx::builder::basic::kvp; // Create the native contract accounts manually; sadly, we can't run their contracts to make them create themselves // See native_contract_chain_initializer::prepare_database() - accounts = mongo_conn[db_name][accounts_col]; // Accounts - bsoncxx::builder::stream::document 
doc{}; - if (accounts.count(doc.view()) == 0) { + accounts = mongo_conn[db_name][accounts_col]; + if (accounts.count(make_document()) == 0) { auto now = std::chrono::duration_cast( std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - doc << "name" << name(chain::config::system_account_name).to_string() - << "eos_balance" << asset().to_string() - << "staked_balance" << asset().to_string() - << "unstaking_balance" << asset().to_string() - << "createdAt" << b_date{now} - << "updatedAt" << b_date{now}; - if (!accounts.insert_one(doc.view())) { - elog("Failed to insert account ${n}", ("n", name(chain::config::system_account_name).to_string())); + auto doc = make_document( kvp( "name", name( chain::config::system_account_name ).to_string()), + kvp( "createdAt", b_date{now} )); + + try { + if( !accounts.insert_one( doc.view())) { + FC_ASSERT( false, "Failed to insert account ${n}", + ("n", name( chain::config::system_account_name ).to_string())); + } + } catch(...) { + handle_mongo_exception("account insert", __LINE__); } - // Blocks indexes - auto blocks = mongo_conn[db_name][blocks_col]; // Blocks - blocks.create_index(bsoncxx::from_json(R"xxx({ "block_num" : 1 })xxx")); - blocks.create_index(bsoncxx::from_json(R"xxx({ "block_id" : 1 })xxx")); + try { + // blocks indexes + auto blocks = mongo_conn[db_name][blocks_col]; // Blocks + blocks.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); + blocks.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" )); - // Accounts indexes - accounts.create_index(bsoncxx::from_json(R"xxx({ "name" : 1 })xxx")); + auto block_stats = mongo_conn[db_name][block_states_col]; + block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_num" : 1 })xxx" )); + block_stats.create_index( bsoncxx::from_json( R"xxx({ "block_id" : 1 })xxx" )); - // Transactions indexes - auto trans = mongo_conn[db_name][trans_col]; // Transactions - trans.create_index(bsoncxx::from_json(R"xxx({ 
"transaction_id" : 1 })xxx")); + // accounts indexes + accounts.create_index( bsoncxx::from_json( R"xxx({ "name" : 1 })xxx" )); - // Action indexes - auto msgs = mongo_conn[db_name][actions_col]; // Messages - msgs.create_index(bsoncxx::from_json(R"xxx({ "action_id" : 1 })xxx")); - msgs.create_index(bsoncxx::from_json(R"xxx({ "transaction_id" : 1 })xxx")); + // transactions indexes + auto trans = mongo_conn[db_name][trans_col]; // Transactions + trans.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); - // ActionTraces indexes - auto action_traces = mongo_conn[db_name][action_traces_col]; // ActionTraces - action_traces.create_index(bsoncxx::from_json(R"xxx({ "transaction_id" : 1 })xxx")); + auto actions = mongo_conn[db_name][actions_col]; + actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); + } catch(...) { + handle_mongo_exception("create indexes", __LINE__); + } } + + ilog("starting db plugin thread"); + + consume_thread = boost::thread([this] { consume_blocks(); }); + + startup = false; } //////////// @@ -789,74 +1062,103 @@ mongo_db_plugin::~mongo_db_plugin() void mongo_db_plugin::set_program_options(options_description& cli, options_description& cfg) { cfg.add_options() - ("mongodb-queue-size,q", bpo::value()->default_value(256), - "The queue size between nodeos and MongoDB plugin thread.") + ("mongodb-queue-size,q", bpo::value()->default_value(256), + "The target queue size between nodeos and MongoDB plugin thread.") + ("mongodb-wipe", bpo::bool_switch()->default_value(false), + "Required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks to wipe mongo db." + "This option required to prevent accidental wipe of mongo db.") + ("mongodb-block-start", bpo::value()->default_value(0), + "If specified then only abi data pushed to mongodb until specified block is reached.") ("mongodb-uri,m", bpo::value(), "MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/." 
- " If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI.") + " If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI." + " Example: mongodb://127.0.0.1:27017/EOS") ; } void mongo_db_plugin::plugin_initialize(const variables_map& options) { - if (options.count("mongodb-uri")) { - ilog("initializing mongo_db_plugin"); - my->configured = true; - - if (options.at("replay-blockchain").as()) { - ilog("Replay requested: wiping mongo database on startup"); - my->wipe_database_on_startup = true; - } - if (options.at("delete-all-blocks").as()) { - ilog("Resync requested: wiping mongo database on startup"); - my->wipe_database_on_startup = true; - } - - if (options.count("mongodb-queue-size")) { - auto size = options.at("mongodb-queue-size").as(); - my->queue_size = size; - } + try { + if( options.count( "mongodb-uri" )) { + ilog( "initializing mongo_db_plugin" ); + my->configured = true; + + if( options.at( "replay-blockchain" ).as() || options.at( "hard-replay-blockchain" ).as() || options.at( "delete-all-blocks" ).as() ) { + if( options.at( "mongodb-wipe" ).as()) { + ilog( "Wiping mongo database on startup" ); + my->wipe_database_on_startup = true; + } else if( options.count( "mongodb-block-start" ) == 0 ) { + FC_ASSERT( false, "--mongodb-wipe required with --replay-blockchain, --hard-replay-blockchain, or --delete-all-blocks" + " --mongodb-wipe will remove all EOS collections from mongodb." 
); + } + } - std::string uri_str = options.at("mongodb-uri").as(); - ilog("connecting to ${u}", ("u", uri_str)); - mongocxx::uri uri = mongocxx::uri{uri_str}; - my->db_name = uri.database(); - if (my->db_name.empty()) - my->db_name = "EOS"; - my->mongo_conn = mongocxx::client{uri}; + if( options.count( "abi-serializer-max-time-ms") == 0 ) { + FC_ASSERT(false, "--abi-serializer-max-time-ms required as default value not appropriate for parsing full blocks"); + } - // add callback to chain_controller config - chain_plugin* chain_plug = app().find_plugin(); - FC_ASSERT(chain_plug); - chain_plug->chain_config().applied_block_callbacks.emplace_back( - [my = my](const chain::block_trace& bt) { my->applied_block(bt); }); - chain_plug->chain_config().applied_irreversible_block_callbacks.emplace_back( - [my = my](const chain::signed_block& b) { my->applied_irreversible_block(b); }); + if( options.count( "mongodb-queue-size" )) { + my->queue_size = options.at( "mongodb-queue-size" ).as(); + } + if( options.count( "mongodb-block-start" )) { + my->start_block_num = options.at( "mongodb-block-start" ).as(); + } + if( my->start_block_num == 0 ) { + my->start_block_reached = true; + } - if (my->wipe_database_on_startup) { - my->wipe_database(); + std::string uri_str = options.at( "mongodb-uri" ).as(); + ilog( "connecting to ${u}", ("u", uri_str)); + mongocxx::uri uri = mongocxx::uri{uri_str}; + my->db_name = uri.database(); + if( my->db_name.empty()) + my->db_name = "EOS"; + my->mongo_conn = mongocxx::client{uri}; + + // hook up to signals on controller + chain_plugin* chain_plug = app().find_plugin(); + FC_ASSERT( chain_plug ); + auto& chain = chain_plug->chain(); + my->chain_id.emplace( chain.get_chain_id()); + + my->accepted_block_connection.emplace( chain.accepted_block.connect( [&]( const chain::block_state_ptr& bs ) { + my->accepted_block( bs ); + } )); + my->irreversible_block_connection.emplace( + chain.irreversible_block.connect( [&]( const chain::block_state_ptr& bs ) { 
+ my->applied_irreversible_block( bs ); + } )); + my->accepted_transaction_connection.emplace( + chain.accepted_transaction.connect( [&]( const chain::transaction_metadata_ptr& t ) { + my->accepted_transaction( t ); + } )); + my->applied_transaction_connection.emplace( + chain.applied_transaction.connect( [&]( const chain::transaction_trace_ptr& t ) { + my->applied_transaction( t ); + } )); + + if( my->wipe_database_on_startup ) { + my->wipe_database(); + } + my->init(); + } else { + wlog( "eosio::mongo_db_plugin configured, but no --mongodb-uri specified." ); + wlog( "mongo_db_plugin disabled." ); } - my->init(); - } else { - wlog("eosio::mongo_db_plugin configured, but no --mongodb-uri specified."); - wlog("mongo_db_plugin disabled."); - } + } FC_LOG_AND_RETHROW() } void mongo_db_plugin::plugin_startup() { - if (my->configured) { - ilog("starting db plugin"); - - my->consume_thread = boost::thread([this] { my->consume_blocks(); }); - - // chain_controller is created and has resynced or replayed if needed - my->startup = false; - } } void mongo_db_plugin::plugin_shutdown() { + my->accepted_block_connection.reset(); + my->irreversible_block_connection.reset(); + my->accepted_transaction_connection.reset(); + my->applied_transaction_connection.reset(); + my.reset(); } diff --git a/programs/nodeos/CMakeLists.txt b/programs/nodeos/CMakeLists.txt index 99449215900..1ab5850ef9b 100644 --- a/programs/nodeos/CMakeLists.txt +++ b/programs/nodeos/CMakeLists.txt @@ -64,10 +64,9 @@ if(TARGET sql_db_plugin) target_link_libraries( nodeos PRIVATE -Wl,${whole_archive_flag} sql_db_plugin -Wl,${no_whole_archive_flag} ) endif() -#if(BUILD_MONGO_DB_PLUGIN) -# target_link_libraries( nodeos -# PRIVATE -Wl,${whole_archive_flag} mongo_db_plugin -Wl,${no_whole_archive_flag} ) -#endif() +if(BUILD_MONGO_DB_PLUGIN) + target_link_libraries( nodeos PRIVATE -Wl,${whole_archive_flag} mongo_db_plugin -Wl,${no_whole_archive_flag} ) +endif() install( TARGETS nodeos diff --git 
a/programs/nodeos/main.cpp b/programs/nodeos/main.cpp index cf612a02ab6..2e40e8de774 100644 --- a/programs/nodeos/main.cpp +++ b/programs/nodeos/main.cpp @@ -119,7 +119,7 @@ int main(int argc, char** argv) return DATABASE_DIRTY; } } - elog("${e}", ("e",e.to_detail_string())); + elog( "${e}", ("e", e.to_detail_string())); return OTHER_FAIL; } catch( const boost::interprocess::bad_alloc& e ) { elog("bad alloc"); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 6c66bbb37b1..eb60966046d 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -46,9 +46,9 @@ add_test(NAME nodeos_sanity_test COMMAND tests/nodeos_run_test.py -v --sanity-te add_test(NAME nodeos_run_test COMMAND tests/nodeos_run_test.py -v --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) -#if(BUILD_MONGO_DB_PLUGIN) -# add_test(NAME nodeos_run_test-mongodb COMMAND tests/nodeos_run_test.py --mongodb -v --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) -#endif() +if(BUILD_MONGO_DB_PLUGIN) + add_test(NAME nodeos_run_test-mongodb COMMAND tests/nodeos_run_test.py --mongodb -v --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +endif() add_test(NAME distributed-transactions-test COMMAND tests/distributed-transactions-test.py -d 2 -p 1 -n 4 -v --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) add_test(NAME restart-scenarios-test-resync COMMAND tests/restart-scenarios-test.py -c resync -p4 -v --clean-run --dump-error-details WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/tests/Cluster.py b/tests/Cluster.py index fe2a203eb40..dcb4bad0f3f 100644 --- a/tests/Cluster.py +++ b/tests/Cluster.py @@ -129,11 +129,11 @@ def launch(self, pnodes=1, totalNodes=1, prodCount=1, topo="mesh", p2pPlugin="ne if self.staging: cmdArr.append("--nogen") - nodeosArgs="--max-transaction-time 5000 --filter-on * --p2p-max-nodes-per-host 
%d" % (totalNodes) + nodeosArgs="--max-transaction-time 5000 --abi-serializer-max-time-ms 5000 --filter-on * --p2p-max-nodes-per-host %d" % (totalNodes) if not self.walletd: nodeosArgs += " --plugin eosio::wallet_api_plugin" if self.enableMongo: - nodeosArgs += " --plugin eosio::mongo_db_plugin --delete-all-blocks --mongodb-uri %s" % self.mongoUri + nodeosArgs += " --plugin eosio::mongo_db_plugin --mongodb-wipe --delete-all-blocks --mongodb-uri %s" % self.mongoUri if nodeosArgs: cmdArr.append("--nodeos") diff --git a/tests/Node.py b/tests/Node.py index c938c97cde6..5734f1898c9 100644 --- a/tests/Node.py +++ b/tests/Node.py @@ -1,6 +1,5 @@ import decimal import subprocess -import time import os import re import datetime @@ -22,7 +21,6 @@ def __init__(self, host, port, pid=None, cmd=None, enableMongo=False, mongoHost= self.cmd=cmd self.killed=False # marks node as killed self.enableMongo=enableMongo - self.mongoSyncTime=None if Utils.mongoSyncTime < 1 else Utils.mongoSyncTime self.mongoHost=mongoHost self.mongoPort=mongoPort self.mongoDb=mongoDb @@ -46,10 +44,13 @@ def printTrans(trans): assert trans["processed"]["receipt"]["status"] == "executed", printTrans(trans) - # Passes input to stdin, executes cmd. Returns tuple with return code(int), - # stdout(byte stream) and stderr(byte stream). @staticmethod def stdinAndCheckOutput(cmd, subcommand): + """Passes input to stdin, executes cmd. 
Returns tuple with return code(int), stdout(byte stream) and stderr(byte stream).""" + assert(cmd) + assert(isinstance(cmd, list)) + assert(subcommand) + assert(isinstance(subcommand, str)) outs=None errs=None ret=0 @@ -68,12 +69,19 @@ def normalizeJsonObject(extJStr): tmpStr=extJStr tmpStr=re.sub(r'ObjectId\("(\w+)"\)', r'"ObjectId-\1"', tmpStr) tmpStr=re.sub(r'ISODate\("([\w|\-|\:|\.]+)"\)', r'"ISODate-\1"', tmpStr) + tmpStr=re.sub(r'NumberLong\("(\w+)"\)', r'"NumberLong-\1"', tmpStr) return tmpStr @staticmethod - def runMongoCmdReturnJson(cmdArr, subcommand, trace=False): - retId,outs=Node.stdinAndCheckOutput(cmdArr, subcommand) + def runMongoCmdReturnJson(cmd, subcommand, trace=False): + """Run mongodb subcommand and return response.""" + assert(cmd) + assert(isinstance(cmd, list)) + assert(subcommand) + assert(isinstance(subcommand, str)) + retId,outs,errs=Node.stdinAndCheckOutput(cmd, subcommand) if retId is not 0: + Utils.Print("ERROR: mongodb call failed. %s" % (errs)) return None outStr=Node.byteArrToStr(outs) if not outStr: @@ -85,8 +93,14 @@ def runMongoCmdReturnJson(cmdArr, subcommand, trace=False): if not jStr: return None if trace: Utils.Print ("RAW > %s"% (outStr)) - #trace and Utils.Print ("JSON> %s"% jStr) - jsonData=json.loads(jStr) + if trace: Utils.Print ("JSON> %s"% jStr) + try: + jsonData=json.loads(jStr) + except json.decoder.JSONDecodeError as _: + Utils.Print ("ERROR: JSONDecodeError") + Utils.Print ("Raw MongoDB response: > %s"% (outStr)) + Utils.Print ("Normalized MongoDB response: > %s"% (jStr)) + raise return jsonData @staticmethod @@ -117,85 +131,60 @@ def validateAccounts(self, accounts): accountInfo=self.getEosAccount(account.name) try: assert(accountInfo) - assert(accountInfo["account_name"] == account.name) + if not self.enableMongo: + assert(accountInfo["account_name"] == account.name) + else: + assert(accountInfo["name"] == account.name) except (AssertionError, TypeError, KeyError) as _: Utils.Print("account validation failed. 
account: %s" % (account.name)) raise # pylint: disable=too-many-branches - def getBlock(self, blockNum, retry=True, silentErrors=False): + def getBlock(self, blockNum, silentErrors=False): """Given a blockId will return block details.""" assert(isinstance(blockNum, int)) if not self.enableMongo: cmd="%s %s get block %d" % (Utils.EosClientPath, self.endpointArgs, blockNum) if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) try: - trans=Utils.runCmdReturnJson(cmd) - return trans + block=Utils.runCmdReturnJson(cmd) + return block except subprocess.CalledProcessError as ex: if not silentErrors: msg=ex.output.decode("utf-8") Utils.Print("ERROR: Exception during get block. %s" % (msg)) return None else: - for _ in range(2): - cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand='db.Blocks.findOne( { "block_num": %d } )' % (blockNum) - if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) - try: - trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) - if trans is not None: - return trans - except subprocess.CalledProcessError as ex: - if not silentErrors: - msg=ex.output.decode("utf-8") - Utils.Print("ERROR: Exception during get db node get block. 
%s" % (msg)) - return None - if not retry: - break - if self.mongoSyncTime is not None: - if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - time.sleep(self.mongoSyncTime) - - return None - - def getBlockById(self, blockId, retry=True, silentErrors=False): - for _ in range(2): cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand='db.Blocks.findOne( { "block_id": "%s" } )' % (blockId) + subcommand='db.blocks.findOne( { "block_num": %d } )' % (blockNum) if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) try: - trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) - if trans is not None: - return trans + block=Node.runMongoCmdReturnJson(cmd.split(), subcommand) + if block is not None: + return block except subprocess.CalledProcessError as ex: if not silentErrors: msg=ex.output.decode("utf-8") - Utils.Print("ERROR: Exception during db get block by id. %s" % (msg)) + Utils.Print("ERROR: Exception during get db node get block. %s" % (msg)) return None - if not retry: - break - if self.mongoSyncTime is not None: - if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - time.sleep(self.mongoSyncTime) return None - # def doesNodeHaveBlockNum(self, blockNum): - # """Does node have head_block_num >= blockNum""" - # assert isinstance(blockNum, int) - # assert (blockNum > 0) - - # info=self.getInfo(silentErrors=True) - # assert(info) - # head_block_num=0 - # try: - # head_block_num=int(info["head_block_num"]) - # except (TypeError, KeyError) as _: - # Utils.Print("Failure in get info parsing. 
%s" % (info)) - # raise + def getBlockById(self, blockId, silentErrors=False): + cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) + subcommand='db.blocks.findOne( { "block_id": "%s" } )' % (blockId) + if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) + try: + trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) + if trans is not None: + return trans + except subprocess.CalledProcessError as ex: + if not silentErrors: + msg=ex.output.decode("utf-8") + Utils.Print("ERROR: Exception during db get block by id. %s" % (msg)) + return None - # return True if blockNum <= head_block_num else False + return None def isBlockPresent(self, blockNum): """Does node have head_block_num >= blockNum""" @@ -238,7 +227,7 @@ def isBlockFinalized(self, blockNum): return finalized # pylint: disable=too-many-branches - def getTransaction(self, transId, retry=True, silentErrors=False): + def getTransaction(self, transId, silentErrors=False): if not self.enableMongo: cmd="%s %s get transaction %s" % (Utils.EosClientPath, self.endpointArgs, transId) if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) @@ -254,26 +243,25 @@ def getTransaction(self, transId, retry=True, silentErrors=False): Utils.Print("ERROR: Exception during transaction retrieval. %s" % (msg)) return None else: - for _ in range(2): - cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand='db.Transactions.findOne( { $and : [ { "transaction_id": "%s" }, {"pending":false} ] } )' % (transId) - if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) - try: - trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) - return trans - except subprocess.CalledProcessError as ex: - if not silentErrors: - msg=ex.output.decode("utf-8") - Utils.Print("ERROR: Exception during get db node get trans. 
%s" % (msg)) - return None - if not retry: - break - if self.mongoSyncTime is not None: - if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - time.sleep(self.mongoSyncTime) + return self.getTransactionMdb(transId, silentErrors) return None + def getTransactionMdb(self, transId, silentErrors=False): + """Get transaction from MongoDB. Since DB only contains finalized blocks, transactions can take a while to appear in DB.""" + cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) + #subcommand='db.Transactions.findOne( { $and : [ { "trx_id": "%s" }, {"irreversible":true} ] } )' % (transId) + subcommand='db.transactions.findOne( { "trx_id": "%s" } )' % (transId) + if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) + try: + trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) + return trans + except subprocess.CalledProcessError as ex: + if not silentErrors: + msg=ex.output.decode("utf-8") + Utils.Print("ERROR: Exception during get db node get trans. %s" % (msg)) + return None + def isTransInBlock(self, transId, blockId): """Check if transId is within block identified by blockId""" assert(transId) @@ -282,11 +270,18 @@ def isTransInBlock(self, transId, blockId): assert(isinstance(blockId, int)) block=self.getBlock(blockId) + assert(block) transactions=None + key="" try: - transactions=block["transactions"] + if not self.enableMongo: + key="[transactions]" + transactions=block["transactions"] + else: + key="[blocks][transactions]" + transactions=block["block"]["transactions"] except (AssertionError, TypeError, KeyError) as _: - Utils.Print("Failed to parse block. %s" % (block)) + Utils.Print("block%s not found. Block: %s" % (key,block)) raise if transactions is not None: @@ -297,7 +292,7 @@ def isTransInBlock(self, transId, blockId): if transId == myTransId: return True except (TypeError, KeyError) as _: - Utils.Print("Failed to parse block transactions. %s" % (trans)) + Utils.Print("transaction%s not found. 
Transaction: %s" % (key, trans)) return False @@ -309,11 +304,17 @@ def getBlockIdByTransId(self, transId): assert(trans) refBlockNum=None + key="" try: - refBlockNum=trans["trx"]["trx"]["ref_block_num"] + if not self.enableMongo: + key="[trx][trx][ref_block_num]" + refBlockNum=trans["trx"]["trx"]["ref_block_num"] + else: + key="[transaction_header][ref_block_num]" + refBlockNum=trans["transaction_header"]["ref_block_num"] refBlockNum=int(refBlockNum)+1 except (TypeError, ValueError, KeyError) as _: - Utils.Print("transaction parsing failed. Transaction: %s" % (trans)) + Utils.Print("transaction%s not found. Transaction: %s" % (key, trans)) return None headBlockNum=self.getHeadBlockNum() @@ -332,11 +333,42 @@ def getBlockIdByTransId(self, transId): return None + def getBlockIdByTransIdMdb(self, transId): + """Given a transaction Id (string), will return block id (int) containing the transaction. This is specific to MongoDB.""" + assert(transId) + assert(isinstance(transId, str)) + trans=self.getTransactionMdb(transId) + if not trans: return None + + refBlockNum=None + try: + refBlockNum=trans["transaction_header"]["ref_block_num"] + refBlockNum=int(refBlockNum)+1 + except (TypeError, ValueError, KeyError) as _: + Utils.Print("transaction[transaction_header][ref_block_num] not found. Transaction: %s" % (trans)) + return None + + headBlockNum=self.getHeadBlockNum() + assert(headBlockNum) + try: + headBlockNum=int(headBlockNum) + except(ValueError) as _: + Utils.Print("Info parsing failed. 
%s" % (headBlockNum)) + + for blockNum in range(refBlockNum, headBlockNum+1): + if self.isTransInBlock(str(transId), blockNum): + return blockNum + + return None + def isTransInAnyBlock(self, transId): """Check if transaction (transId) is in a block.""" assert(transId) assert(isinstance(transId, str)) + # if not self.enableMongo: blockId=self.getBlockIdByTransId(transId) + # else: + # blockId=self.getBlockIdByTransIdMdb(transId) return True if blockId else False def isTransFinalized(self, transId): @@ -350,75 +382,8 @@ def isTransFinalized(self, transId): assert(isinstance(blockId, int)) return self.isBlockFinalized(blockId) - # Disabling MongodDB funbction - # def getTransByBlockId(self, blockId, retry=True, silentErrors=False): - # for _ in range(2): - # cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - # subcommand='db.Transactions.find( { "block_id": "%s" } )' % (blockId) - # if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) - # try: - # trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand, True) - # if trans is not None: - # return trans - # except subprocess.CalledProcessError as ex: - # if not silentErrors: - # msg=ex.output.decode("utf-8") - # Utils.Print("ERROR: Exception during db get trans by blockId. 
%s" % (msg)) - # return None - # if not retry: - # break - # if self.mongoSyncTime is not None: - # if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - # time.sleep(self.mongoSyncTime) - - # return None - - def getActionFromDb(self, transId, retry=True, silentErrors=False): - for _ in range(2): - cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand='db.Actions.findOne( { "transaction_id": "%s" } )' % (transId) - if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) - try: - trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) - if trans is not None: - return trans - except subprocess.CalledProcessError as ex: - if not silentErrors: - msg=ex.output.decode("utf-8") - Utils.Print("ERROR: Exception during get db node get message. %s" % (msg)) - return None - if not retry: - break - if self.mongoSyncTime is not None: - if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - time.sleep(self.mongoSyncTime) - - return None - - def getMessageFromDb(self, transId, retry=True, silentErrors=False): - for _ in range(2): - cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand='db.Messages.findOne( { "transaction_id": "%s" } )' % (transId) - if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) - try: - trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) - if trans is not None: - return trans - except subprocess.CalledProcessError as ex: - if not silentErrors: - msg=ex.output.decode("utf-8") - Utils.Print("ERROR: Exception during get db node get message. %s" % (msg)) - return None - if not retry: - break - if self.mongoSyncTime is not None: - if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - time.sleep(self.mongoSyncTime) - - return None - - # Create & initialize account and return creation transactions. 
Return transaction json object def createInitializeAccount(self, account, creatorAccount, stakedDeposit=1000, waitForTransBlock=False): + """Create & initialize account and return creation transactions. Return transaction json object""" cmd='%s %s system newaccount -j %s %s %s %s --stake-net "100 %s" --stake-cpu "100 %s" --buy-ram "100 %s"' % ( Utils.EosClientPath, self.endpointArgs, creatorAccount.name, account.name, account.ownerPublicKey, account.activePublicKey, @@ -444,9 +409,9 @@ def createInitializeAccount(self, account, creatorAccount, stakedDeposit=1000, w return trans - # Create account and return creation transactions. Return transaction json object - # waitForTransBlock: wait on creation transaction id to appear in a block def createAccount(self, account, creatorAccount, stakedDeposit=1000, waitForTransBlock=False): + """Create account and return creation transactions. Return transaction json object. + waitForTransBlock: wait on creation transaction id to appear in a block.""" cmd="%s %s create account -j %s %s %s %s" % ( Utils.EosClientPath, self.endpointArgs, creatorAccount.name, account.name, account.ownerPublicKey, account.activePublicKey) @@ -473,19 +438,22 @@ def createAccount(self, account, creatorAccount, stakedDeposit=1000, waitForTran def getEosAccount(self, name): assert(isinstance(name, str)) - cmd="%s %s get account -j %s" % (Utils.EosClientPath, self.endpointArgs, name) - if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) - try: - trans=Utils.runCmdReturnJson(cmd) - return trans - except subprocess.CalledProcessError as ex: - msg=ex.output.decode("utf-8") - Utils.Print("ERROR: Exception during get account. 
%s" % (msg)) - return None + if not self.enableMongo: + cmd="%s %s get account -j %s" % (Utils.EosClientPath, self.endpointArgs, name) + if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) + try: + trans=Utils.runCmdReturnJson(cmd) + return trans + except subprocess.CalledProcessError as ex: + msg=ex.output.decode("utf-8") + Utils.Print("ERROR: Exception during get account. %s" % (msg)) + return None + else: + return self.getEosAccountFromDb(name) def getEosAccountFromDb(self, name): cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand='db.Accounts.findOne({"name" : "%s"})' % (name) + subcommand='db.accounts.findOne({"name" : "%s"})' % (name) if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) try: trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) @@ -515,7 +483,7 @@ def getTableAccountBalance(self, contract, scope): try: return trans["rows"][0]["balance"] except (TypeError, KeyError) as _: - print("Transaction parsing failed. Transaction: %s" % (trans)) + print("transaction[rows][0][balance] not found. Transaction: %s" % (trans)) raise def getCurrencyBalance(self, contract, account, symbol=CORE_SYMBOL): @@ -554,6 +522,7 @@ def getCurrencyStats(self, contract, symbol=CORE_SYMBOL): # Verifies account. 
Returns "get account" json return object def verifyAccount(self, account): + assert(account) if not self.enableMongo: ret=self.getEosAccount(account.name) if ret is not None: @@ -563,17 +532,17 @@ def verifyAccount(self, account): return None return ret else: - for _ in range(2): - ret=self.getEosAccountFromDb(account.name) - if ret is not None: - account_name=ret["name"] - if account_name is None: - Utils.Print("ERROR: Failed to verify account creation.", account.name) - return None - return ret - if self.mongoSyncTime is not None: - if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - time.sleep(self.mongoSyncTime) + return self.verifyAccountMdb(account) + + def verifyAccountMdb(self, account): + assert(account) + ret=self.getEosAccountFromDb(account.name) + if ret is not None: + account_name=ret["name"] + if account_name is None: + Utils.Print("ERROR: Failed to verify account creation.", account.name) + return None + return ret return None @@ -709,15 +678,35 @@ def getActions(self, account, pos=-1, offset=-1): assert(isinstance(pos, int)) assert(isinstance(offset, int)) - cmd="%s %s get actions -j %s %d %d" % (Utils.EosClientPath, self.endpointArgs, account.name, pos, offset) - if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) + if not self.enableMongo: + cmd="%s %s get actions -j %s %d %d" % (Utils.EosClientPath, self.endpointArgs, account.name, pos, offset) + if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) + try: + actions=Utils.runCmdReturnJson(cmd) + return actions + except subprocess.CalledProcessError as ex: + msg=ex.output.decode("utf-8") + Utils.Print("ERROR: Exception during actions by account retrieval. 
%s" % (msg)) + return None + else: + return self.getActionsMdb(account, pos, offset) + + def getActionsMdb(self, account, pos=-1, offset=-1): + assert(isinstance(account, Account)) + assert(isinstance(pos, int)) + assert(isinstance(offset, int)) + + cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) + subcommand='db.actions.find({$or: [{"data.from":"%s"},{"data.to":"%s"}]}).sort({"_id":%d}).limit(%d)' % (account.name, account.name, pos, abs(offset)) + if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) try: - actions=Utils.runCmdReturnJson(cmd) - return actions + actions=Node.runMongoCmdReturnJson(cmd.split(), subcommand) + if actions is not None: + return actions except subprocess.CalledProcessError as ex: msg=ex.output.decode("utf-8") - Utils.Print("ERROR: Exception during actions by account retrieval. %s" % (msg)) - return None + Utils.Print("ERROR: Exception during get db actions. %s" % (msg)) + return None # Gets accounts mapped to key. Returns array def getAccountsArrByKey(self, key): @@ -746,22 +735,10 @@ def getServantsArr(self, name): def getAccountEosBalanceStr(self, scope): """Returns SYS currency0000 account balance from cleos get table command. Returned balance is string following syntax "98.0311 SYS". 
""" assert isinstance(scope, str) - if not self.enableMongo: - amount=self.getTableAccountBalance("eosio.token", scope) - if Utils.Debug: Utils.Print("getNodeAccountEosBalance %s %s" % (scope, amount)) - assert isinstance(amount, str) - return amount - else: - if self.mongoSyncTime is not None: - if Utils.Debug: Utils.Print("cmd: sleep %d seconds" % (self.mongoSyncTime)) - time.sleep(self.mongoSyncTime) - - account=self.getEosAccountFromDb(scope) - if account is not None: - balance=account["eos_balance"] - return balance - - return None + amount=self.getTableAccountBalance("eosio.token", scope) + if Utils.Debug: Utils.Print("getNodeAccountEosBalance %s %s" % (scope, amount)) + assert isinstance(amount, str) + return amount def getAccountEosBalance(self, scope): """Returns SYS currency0000 account balance from cleos get table command. Returned balance is an integer e.g. 980311. """ @@ -895,7 +872,7 @@ def getInfo(self, silentErrors=False): def getBlockFromDb(self, idx): cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand="db.Blocks.find().sort({\"_id\":%d}).limit(1).pretty()" % (idx) + subcommand="db.blocks.find().sort({\"_id\":%d}).limit(1).pretty()" % (idx) if Utils.Debug: Utils.Print("cmd: echo \"%s\" | %s" % (subcommand, cmd)) try: trans=Node.runMongoCmdReturnJson(cmd.split(), subcommand) diff --git a/tests/nodeos_run_test.py b/tests/nodeos_run_test.py index 8ecc93f13bf..de2c8f18574 100755 --- a/tests/nodeos_run_test.py +++ b/tests/nodeos_run_test.py @@ -56,7 +56,8 @@ def cmdError(name, cmdCode=0, exitNow=False): WalletdName="keosd" ClientName="cleos" -# Utils.setMongoSyncTime(50) +timeout = .5 * 12 * 2 + 60 # time for finalization with 1 producer + 60 seconds padding +Utils.setIrreversibleTimeout(timeout) try: TestHelper.printSystemInfo("BEGIN") @@ -305,62 +306,17 @@ def cmdError(name, cmdCode=0, exitNow=False): actions=node.getActions(testeraAccount, -1, -1) assert(actions) try: - assert(actions["actions"][0]["action_trace"]["act"]["name"] == 
"transfer") + if not enableMongo: + assert(actions["actions"][0]["action_trace"]["act"]["name"] == "transfer") + else: + assert(actions["name"] == "transfer") except (AssertionError, TypeError, KeyError) as _: - Print("Last action validation failed. Actions: %s" % (actions)) + Print("Action validation failed. Actions: %s" % (actions)) raise - # This API (get accounts) is no longer supported (Issue 2876) - # expectedAccounts=[testeraAccount.name, currencyAccount.name, exchangeAccount.name] - # Print("Get accounts by key %s, Expected: %s" % (PUB_KEY3, expectedAccounts)) - # actualAccounts=node.getAccountsArrByKey(PUB_KEY3) - # if actualAccounts is None: - # cmdError("%s get accounts pub_key3" % (ClientName)) - # errorExit("Failed to retrieve accounts by key %s" % (PUB_KEY3)) - # noMatch=list(set(expectedAccounts) - set(actualAccounts)) - # if len(noMatch) > 0: - # errorExit("FAILURE - Accounts lookup by key %s. Expected: %s, Actual: %s" % ( - # PUB_KEY3, expectedAccounts, actualAccounts), raw=True) - # - # expectedAccounts=[testeraAccount.name] - # Print("Get accounts by key %s, Expected: %s" % (PUB_KEY1, expectedAccounts)) - # actualAccounts=node.getAccountsArrByKey(PUB_KEY1) - # if actualAccounts is None: - # cmdError("%s get accounts pub_key1" % (ClientName)) - # errorExit("Failed to retrieve accounts by key %s" % (PUB_KEY1)) - # noMatch=list(set(expectedAccounts) - set(actualAccounts)) - # if len(noMatch) > 0: - # errorExit("FAILURE - Accounts lookup by key %s. Expected: %s, Actual: %s" % ( - # PUB_KEY1, expectedAccounts, actualAccounts), raw=True) - - # This API (get servants) is no longer supported. 
(Issue 3160) - # expectedServants=[testeraAccount.name, currencyAccount.name] - # Print("Get %s servants, Expected: %s" % (defproduceraAccount.name, expectedServants)) - # actualServants=node.getServantsArr(defproduceraAccount.name) - # if actualServants is None: - # cmdError("%s get servants testera11111" % (ClientName)) - # errorExit("Failed to retrieve %s servants" % (defproduceraAccount.name)) - # noMatch=list(set(expectedAccounts) - set(actualAccounts)) - # if len(noMatch) > 0: - # errorExit("FAILURE - %s servants. Expected: %s, Actual: %s" % ( - # defproduceraAccount.name, expectedServants, actualServants), raw=True) - # - # Print("Get %s servants, Expected: []" % (testeraAccount.name)) - # actualServants=node.getServantsArr(testeraAccount.name) - # if actualServants is None: - # cmdError("%s get servants testera11111" % (ClientName)) - # errorExit("Failed to retrieve %s servants" % (testeraAccount.name)) - # if len(actualServants) > 0: - # errorExit("FAILURE - %s servants. Expected: [], Actual: %s" % ( - # testeraAccount.name, actualServants), raw=True) - node.waitForTransInBlock(transId) - transaction=None - if not enableMongo: - transaction=node.getTransaction(transId) - else: - transaction=node.getActionFromDb(transId) + transaction=node.getTransaction(transId) if transaction is None: cmdError("%s get transaction trans_id" % (ClientName)) errorExit("Failed to retrieve transaction details %s" % (transId)) @@ -368,17 +324,22 @@ def cmdError(name, cmdCode=0, exitNow=False): typeVal=None amountVal=None assert(transaction) + key="" try: if not enableMongo: + key="[traces][0][act][name]" typeVal= transaction["traces"][0]["act"]["name"] + key="[traces][0][act][data][quantity]" amountVal=transaction["traces"][0]["act"]["data"]["quantity"] amountVal=int(decimal.Decimal(amountVal.split()[0])*10000) else: - typeVal= transaction["name"] - amountVal=transaction["data"]["quantity"] + key="[actions][0][name]" + typeVal= transaction["actions"][0]["name"] + 
key="[actions][0][data][quantity]" + amountVal=transaction["actions"][0]["data"]["quantity"] amountVal=int(decimal.Decimal(amountVal.split()[0])*10000) except (TypeError, KeyError) as e: - Print("Transaction validation parsing failed. Transaction: %s" % (transaction)) + Print("transaction%s not found. Transaction: %s" % (key, transaction)) raise if typeVal != "transfer" or amountVal != 975311: @@ -534,7 +495,10 @@ def cmdError(name, cmdCode=0, exitNow=False): assert(block) transactions=None try: - transactions=block["transactions"] + if not enableMongo: + transactions=block["transactions"] + else: + transactions=block["block"]["transactions"] assert(transactions) except (AssertionError, TypeError, KeyError) as _: Print("FAILURE - Failed to parse block. %s" % (block)) @@ -718,38 +682,33 @@ def cmdError(name, cmdCode=0, exitNow=False): cmdError("%s get account" % (ClientName)) errorExit("Failed to get account %s" % (defproduceraAccount.name)) - # - # Proxy - # - # not implemented + Print("Unlocking wallet \"%s\"." 
% (defproduceraWallet.name)) + if not walletMgr.unlockWallet(testWallet): + cmdError("%s wallet unlock test" % (ClientName)) + errorExit("Failed to unlock wallet %s" % (testWallet.name)) + Print("Get head block num.") currentBlockNum=node.getHeadBlockNum() Print("CurrentBlockNum: %d" % (currentBlockNum)) Print("Request blocks 1-%d" % (currentBlockNum)) - for blockNum in range(1, currentBlockNum+1): - block=node.getBlock(blockNum, retry=False, silentErrors=False) + start=1 + if enableMongo: + start=2 # block 1 (genesis block) is not signaled to the plugins, so not available in DB + for blockNum in range(start, currentBlockNum+1): + block=node.getBlock(blockNum, silentErrors=False) if block is None: cmdError("%s get block" % (ClientName)) errorExit("get block by num %d" % blockNum) if enableMongo: blockId=block["block_id"] - block2=node.getBlockById(blockId, retry=False) + block2=node.getBlockById(blockId) if block2 is None: errorExit("mongo get block by id %s" % blockId) - # TBD: getTransByBlockId() needs to handle multiple returned transactions - # trans=node.getTransByBlockId(blockId, retry=False) - # if trans is not None: - # transId=Node.getTransId(trans) - # trans2=node.getMessageFromDb(transId) - # if trans2 is None: - # errorExit("mongo get messages by transaction id %s" % (transId)) - - Print("Request invalid block numbered %d. This will generate an expected error message." % (currentBlockNum+1000)) - block=node.getBlock(currentBlockNum+1000, silentErrors=True, retry=False) + block=node.getBlock(currentBlockNum+1000, silentErrors=True) if block is not None: errorExit("ERROR: Received block where not expected") else: diff --git a/tests/testUtils.py b/tests/testUtils.py index 543600f84bb..580d40a2768 100755 --- a/tests/testUtils.py +++ b/tests/testUtils.py @@ -41,14 +41,11 @@ def Print(*args, **kwargs): SigTermTag="term" systemWaitTimeout=90 - - # mongoSyncTime: nodeos mongodb plugin seems to sync with a 10-15 seconds delay. 
This will inject - # a wait period before the 2nd DB check (if first check fails) - mongoSyncTime=25 + irreversibleTimeout=60 @staticmethod - def setMongoSyncTime(syncTime): - Utils.mongoSyncTime=syncTime + def setIrreversibleTimeout(timeout): + Utils.irreversibleTimeout=timeout @staticmethod def setSystemWaitTimeout(timeout): diff --git a/tests/validate-dirty-db.py b/tests/validate-dirty-db.py index 24919926030..2ab7a57c801 100755 --- a/tests/validate-dirty-db.py +++ b/tests/validate-dirty-db.py @@ -38,16 +38,19 @@ def errorExit(msg="", errorCode=1): Utils.Debug=debug testSuccessful=False -def runNodeosAndGetOutput(myNodeId, myTimeout=3): +def runNodeosAndGetOutput(myTimeout=3): """Startup nodeos, wait for timeout (before forced shutdown) and collect output. Stdout, stderr and return code are returned in a dictionary.""" - Print("Launching nodeos process id: %d" % (myNodeId)) + Print("Launching nodeos process.") cmd="programs/nodeos/nodeos --config-dir etc/eosio/node_bios --data-dir var/lib/node_bios --verbose-http-errors" Print("cmd: %s" % (cmd)) proc=subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if debug: Print("Nodeos process launched.") output={} try: + if debug: Print("Setting nodeos process timeout.") outs,errs = proc.communicate(timeout=myTimeout) + if debug: Print("Nodeos process has exited.") output["stdout"] = outs.decode("utf-8") output["stderr"] = errs.decode("utf-8") output["returncode"] = proc.returncode @@ -56,6 +59,7 @@ def runNodeosAndGetOutput(myNodeId, myTimeout=3): proc.send_signal(signal.SIGKILL) return (False, None) + if debug: Print("Returning success.") return (True, output) random.seed(seed) # Use a fixed seed for repeatability. @@ -84,27 +88,27 @@ def runNodeosAndGetOutput(myNodeId, myTimeout=3): cluster.killall(allInstances=killAll) Print("Restart nodeos repeatedly to ensure dirty database flag sticks.") - nodeId=0 timeout=3 - for i in range(0,3): + for i in range(1,4): Print("Attempt %d." 
% (i)) - ret = runNodeosAndGetOutput(nodeId, timeout) + ret = runNodeosAndGetOutput(timeout) assert(ret) assert(isinstance(ret, tuple)) - if not ret or not ret[0]: - exit(1) - + assert(ret[0]) assert(ret[1]) assert(isinstance(ret[1], dict)) # pylint: disable=unsubscriptable-object stderr= ret[1]["stderr"] retCode=ret[1]["returncode"] - assert(retCode == 2) + assert retCode == 2, "actual return code: %s" % str(retCode) assert("database dirty flag set" in stderr) + if debug: Print("Setting test result to success.") testSuccessful=True finally: + if debug: Print("Cleanup in finally block.") TestHelper.shutdown(cluster, None, testSuccessful, killEosInstances, False, keepLogs, killAll, dumpErrorDetails) +if debug: Print("Exiting test, exit value 0.") exit(0)