From bae241daa2e8ed913298df7a2a75014e2d9e82e7 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 27 Jul 2018 12:57:01 -0500 Subject: [PATCH 01/16] Only set irreversible when true to avoid overwriting an irreversibe=true with a irreversible=false --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index dcc9a7705bf..7a3d24195bf 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -594,8 +594,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti }; if( start_block_reached ) { - trans_doc.append( kvp( "trx_id", trx_id_str ), - kvp( "irreversible", b_bool{false} )); + trans_doc.append( kvp( "trx_id", trx_id_str ) ); string signing_keys_json; if( t->signing_keys.valid()) { @@ -821,9 +820,8 @@ void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr auto blocks = mongo_conn[db_name][blocks_col]; auto block_doc = bsoncxx::builder::basic::document{}; - block_doc.append(kvp( "block_num", b_int32{static_cast(block_num)} ), - kvp( "block_id", block_id_str ), - kvp( "irreversible", b_bool{false} )); + block_doc.append( kvp( "block_num", b_int32{static_cast(block_num)} ), + kvp( "block_id", block_id_str ) ); auto v = to_variant_with_abi( *bs->block ); json = fc::json::to_string( v ); From b13ff27ff02bf87acd655bed9729de592a925333 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 27 Jul 2018 15:50:43 -0500 Subject: [PATCH 02/16] Upsert transactions and actions --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index 7a3d24195bf..d8ba2f148f9 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -563,8 +563,8 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti auto process_action = [&](const std::string& trx_id_str, const chain::action& act, bbb::array& act_array, bool cfa) -> auto { auto act_doc = bsoncxx::builder::basic::document(); if( start_block_reached ) { - act_doc.append( kvp( "action_num", b_int32{act_num} ), - kvp( "trx_id", trx_id_str )); + act_doc.append( kvp( "trx_id", trx_id_str ), + kvp( "action_num", b_int32{act_num} ) ); act_doc.append( kvp( "cfa", b_bool{cfa} )); act_doc.append( kvp( "account", act.account.to_string())); act_doc.append( kvp( "name", act.name.to_string())); @@ -585,8 +585,11 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti if( start_block_reached ) { add_data( act_doc, act ); act_array.append( act_doc ); - mongocxx::model::insert_one insert_op{act_doc.view()}; - bulk_actions.append( insert_op ); + mongocxx::model::update_one update_op{make_document( kvp( "trx_id", trx_id_str ), + kvp( "action_num", b_int32{act_num} ) ), + make_document( kvp( "$set", act_doc.view() ) )}; + update_op.upsert( true ); + bulk_actions.append( update_op ); actions_to_write = true; } ++act_num; @@ -642,7 +645,6 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti } if( start_block_reached ) { - act_num = 0; if( !trx.context_free_actions.empty()) { bsoncxx::builder::basic::array action_array; for( const auto& cfa : trx.context_free_actions ) { @@ -708,7 +710,10 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti trans_doc.append( kvp( "createdAt", b_date{now} )); try { - if( !trans.insert_one( trans_doc.view())) { + mongocxx::options::update update_opts{}; + update_opts.upsert( true ); + if( !trans.update_one( make_document( kvp( "trx_id", trx_id_str ) ), + make_document( kvp( "$set", trans_doc.view() ) ), update_opts ) ) { EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert trans ${id}", ("id", trx_id)); } } catch(...) { @@ -1287,7 +1292,7 @@ void mongo_db_plugin_impl::init() { // actions indexes auto actions = mongo_conn[db_name][actions_col]; - actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); + actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1, "action_num" : 1 })xxx" )); // pub_keys indexes auto pub_keys = mongo_conn[db_name][pub_keys_col]; From cb62b0c3d23c382d53996d4fc548055c74a82095 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 3 Aug 2018 14:03:13 -0500 Subject: [PATCH 03/16] Add action_traces collection --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 107 +++++++++++++++++--- 1 file changed, 94 insertions(+), 13 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index d8ba2f148f9..edfc7755c03 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -78,6 +78,9 @@ class mongo_db_plugin_impl { void purge_abi_cache(); + void add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, + const std::chrono::milliseconds& now ); + void add_data(bsoncxx::builder::basic::document& act_doc, const chain::action& act); void update_account(const chain::action& act); @@ -97,7 +100,7 @@ class mongo_db_plugin_impl { bool configured{false}; bool wipe_database_on_startup{false}; uint32_t start_block_num = 0; - bool start_block_reached = false; + std::atomic_bool start_block_reached{false}; std::string db_name; mongocxx::instance mongo_inst; @@ -118,8 +121,8 @@ class mongo_db_plugin_impl { boost::mutex mtx; boost::condition_variable condition; boost::thread consume_thread; - boost::atomic done{false}; - boost::atomic startup{true}; + std::atomic_bool done{false}; + std::atomic_bool startup{true}; fc::optional chain_id; fc::microseconds abi_serializer_max_time; @@ -153,6 +156,7 @@ class mongo_db_plugin_impl { static const std::string trans_col; static const std::string trans_traces_col; static const std::string actions_col; + static const std::string action_traces_col; static const std::string accounts_col; static const std::string pub_keys_col; static const std::string account_controls_col; @@ -170,6 +174,7 @@ const std::string mongo_db_plugin_impl::blocks_col = "blocks"; const std::string mongo_db_plugin_impl::trans_col = "transactions"; const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces"; const std::string mongo_db_plugin_impl::actions_col = "actions"; +const std::string mongo_db_plugin_impl::action_traces_col = "action_traces"; const std::string mongo_db_plugin_impl::accounts_col = "accounts"; const std::string mongo_db_plugin_impl::pub_keys_col = "pub_keys"; const std::string mongo_db_plugin_impl::account_controls_col = "account_controls"; @@ -233,6 +238,11 @@ void mongo_db_plugin_impl::applied_irreversible_block( const chain::block_state_ void mongo_db_plugin_impl::accepted_block( const chain::block_state_ptr& bs ) { try { + if( !start_block_reached ) { + if( bs->block_num >= start_block_num ) { + start_block_reached = true; + } + } queue( block_state_queue, bs ); } catch (fc::exception& e) { elog("FC Exception while accepted_block ${e}", ("e", e.to_string())); @@ -476,8 +486,9 @@ fc::variant mongo_db_plugin_impl::to_variant_with_abi( const T& obj ) { void mongo_db_plugin_impl::process_accepted_transaction( const chain::transaction_metadata_ptr& t ) { try { - // always call since we need to capture setabi on accounts even if not storing transactions - _process_accepted_transaction(t); + if( start_block_reached ) { + _process_accepted_transaction( t ); + } } catch (fc::exception& e) { elog("FC Exception while processing accepted transaction metadata: ${e}", ("e", e.to_detail_string())); } catch (std::exception& e) { @@ -489,9 +500,8 @@ void mongo_db_plugin_impl::process_accepted_transaction( const chain::transactio void mongo_db_plugin_impl::process_applied_transaction( const chain::transaction_trace_ptr& t ) { try { - if( start_block_reached ) { - _process_applied_transaction( t ); - } + // always call since we need to capture setabi on accounts even if not storing transaction traces + _process_applied_transaction( t ); } catch (fc::exception& e) { elog("FC Exception while processing applied transaction trace: ${e}", ("e", e.to_detail_string())); } catch (std::exception& e) { @@ -517,11 +527,6 @@ void mongo_db_plugin_impl::process_irreversible_block(const chain::block_state_p void mongo_db_plugin_impl::process_accepted_block( const chain::block_state_ptr& bs ) { try { - if( !start_block_reached ) { - if( bs->block_num >= start_block_num ) { - start_block_reached = true; - } - } if( start_block_reached ) { _process_accepted_block( bs ); } @@ -732,16 +737,88 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti } } +void +mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, + const std::chrono::milliseconds& now ) +{ + using namespace bsoncxx::types; + using bsoncxx::builder::basic::kvp; + + if( atrace.receipt.receiver == chain::config::system_account_name ) { + update_account( atrace.act ); + } + + if( start_block_reached ) { + auto action_traces_doc = bsoncxx::builder::basic::document{}; + const auto& base = static_cast(atrace); // without inline action traces + + auto v = to_variant_with_abi( base ); + string json = fc::json::to_string( v ); + try { + const auto& value = bsoncxx::from_json( json ); + action_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); + } catch( bsoncxx::exception& ) { + try { + json = fc::prune_invalid_utf8( json ); + const auto& value = bsoncxx::from_json( json ); + action_traces_doc.append( bsoncxx::builder::concatenate_doc{value.view()} ); + action_traces_doc.append( kvp( "non-utf8-purged", b_bool{true} ) ); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert action trace JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", json) ); + } + } + action_traces_doc.append( kvp( "createdAt", b_date{now} ) ); + + mongocxx::model::insert_one insert_op{action_traces_doc.view()}; + bulk_action_traces.append( insert_op ); + } + + for( const auto& iline_atrace : atrace.inline_traces ) { + add_action_trace( bulk_action_traces, iline_atrace, now ); + } +} + + void mongo_db_plugin_impl::_process_applied_transaction( const chain::transaction_trace_ptr& t ) { using namespace bsoncxx::types; using bsoncxx::builder::basic::kvp; auto trans_traces = mongo_conn[db_name][trans_traces_col]; + auto action_traces = mongo_conn[db_name][action_traces_col]; auto trans_traces_doc = bsoncxx::builder::basic::document{}; auto now = std::chrono::duration_cast( std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + mongocxx::options::bulk_write bulk_opts; + bulk_opts.ordered(false); + mongocxx::bulk_write bulk_action_traces = action_traces.create_bulk_write(bulk_opts); + bool write_atraces = false; + + for( const auto& atrace : t->action_traces ) { + try { + add_action_trace( bulk_action_traces, atrace, now ); + write_atraces = true; + } catch(...) { + handle_mongo_exception("add action traces", __LINE__); + } + } + + if( write_atraces && start_block_reached ) { + try { + if( !bulk_action_traces.execute() ) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk action traces insert failed for transaction trace: ${id}", ("id", t->id)); + } + } catch(...) { + handle_mongo_exception("action traces insert", __LINE__); + } + } + + if( !start_block_reached ) return; + + // transaction trace insert + auto v = to_variant_with_abi( *t ); string json = fc::json::to_string( v ); try { @@ -1294,6 +1371,10 @@ void mongo_db_plugin_impl::init() { auto actions = mongo_conn[db_name][actions_col]; actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1, "action_num" : 1 })xxx" )); + // action traces indexes + auto action_traces = mongo_conn[db_name][action_traces_col]; + action_traces.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1, "receipt.global_sequence" : 1 })xxx" )); + // pub_keys indexes auto pub_keys = mongo_conn[db_name][pub_keys_col]; pub_keys.create_index( bsoncxx::from_json( R"xxx({ "account" : 1, "permission" : 1 })xxx" )); From 90c9c9e7a01923da00ac933e9fd73367583ac35f Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 3 Aug 2018 14:29:54 -0500 Subject: [PATCH 04/16] Remove actions collection in lieu of action_traces collection --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 271 +++++++++----------- 1 file changed, 118 insertions(+), 153 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index edfc7755c03..827bc34469b 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -155,7 +155,6 @@ class mongo_db_plugin_impl { static const std::string blocks_col; static const std::string trans_col; static const std::string trans_traces_col; - static const std::string actions_col; static const std::string action_traces_col; static const std::string accounts_col; static const std::string pub_keys_col; @@ -173,7 +172,6 @@ const std::string mongo_db_plugin_impl::block_states_col = "block_states"; const std::string mongo_db_plugin_impl::blocks_col = "blocks"; const std::string mongo_db_plugin_impl::trans_col = "transactions"; const std::string mongo_db_plugin_impl::trans_traces_col = "transaction_traces"; -const std::string mongo_db_plugin_impl::actions_col = "actions"; const std::string mongo_db_plugin_impl::action_traces_col = "action_traces"; const std::string mongo_db_plugin_impl::accounts_col = "accounts"; const std::string mongo_db_plugin_impl::pub_keys_col = "pub_keys"; @@ -547,194 +545,164 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti namespace bbb = bsoncxx::builder::basic; auto trans = mongo_conn[db_name][trans_col]; - auto actions = mongo_conn[db_name][actions_col]; accounts = mongo_conn[db_name][accounts_col]; auto trans_doc = bsoncxx::builder::basic::document{}; auto now = std::chrono::duration_cast( - std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); + std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} ); const auto trx_id = t->id; const auto trx_id_str = trx_id.str(); const auto& trx = t->trx; const chain::transaction_header& trx_header = trx; - bool actions_to_write = false; - mongocxx::options::bulk_write bulk_opts; - bulk_opts.ordered(false); - mongocxx::bulk_write bulk_actions = actions.create_bulk_write(bulk_opts); - int32_t act_num = 0; - auto process_action = [&](const std::string& trx_id_str, const chain::action& act, bbb::array& act_array, bool cfa) -> auto { + auto process_action = [&]( const std::string& trx_id_str, const chain::action& act, bbb::array& act_array, + bool cfa ) -> auto { auto act_doc = bsoncxx::builder::basic::document(); - if( start_block_reached ) { - act_doc.append( kvp( "trx_id", trx_id_str ), - kvp( "action_num", b_int32{act_num} ) ); - act_doc.append( kvp( "cfa", b_bool{cfa} )); - act_doc.append( kvp( "account", act.account.to_string())); - act_doc.append( kvp( "name", act.name.to_string())); - act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) { - for( const auto& auth : act.authorization ) { - subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) { - subdoc.append( kvp( "actor", auth.actor.to_string()), - kvp( "permission", auth.permission.to_string())); - } ); - } - } )); - } - try { - update_account( act ); - } catch (...) { - handle_mongo_exception( "update_account", __LINE__ ); - } - if( start_block_reached ) { - add_data( act_doc, act ); - act_array.append( act_doc ); - mongocxx::model::update_one update_op{make_document( kvp( "trx_id", trx_id_str ), - kvp( "action_num", b_int32{act_num} ) ), - make_document( kvp( "$set", act_doc.view() ) )}; - update_op.upsert( true ); - bulk_actions.append( update_op ); - actions_to_write = true; - } + act_doc.append( kvp( "trx_id", trx_id_str ), + kvp( "action_num", b_int32{act_num} ) ); + act_doc.append( kvp( "cfa", b_bool{cfa} ) ); + act_doc.append( kvp( "account", act.account.to_string() ) ); + act_doc.append( kvp( "name", act.name.to_string() ) ); + act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) { + for( const auto& auth : act.authorization ) { + subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) { + subdoc.append( kvp( "actor", auth.actor.to_string() ), + kvp( "permission", auth.permission.to_string() ) ); + } ); + } + } ) ); + add_data( act_doc, act ); + act_array.append( act_doc ); ++act_num; return act_num; }; - if( start_block_reached ) { - trans_doc.append( kvp( "trx_id", trx_id_str ) ); + trans_doc.append( kvp( "trx_id", trx_id_str ) ); - string signing_keys_json; - if( t->signing_keys.valid()) { - signing_keys_json = fc::json::to_string( t->signing_keys->second ); - } else { - auto signing_keys = trx.get_signature_keys( *chain_id, false, false ); - if( !signing_keys.empty()) { - signing_keys_json = fc::json::to_string( signing_keys ); - } + string signing_keys_json; + if( t->signing_keys.valid() ) { + signing_keys_json = fc::json::to_string( t->signing_keys->second ); + } else { + auto signing_keys = trx.get_signature_keys( *chain_id, false, false ); + if( !signing_keys.empty() ) { + signing_keys_json = fc::json::to_string( signing_keys ); } - string trx_header_json = fc::json::to_string( trx_header ); + } + string trx_header_json = fc::json::to_string( trx_header ); + try { + const auto& trx_header_value = bsoncxx::from_json( trx_header_json ); + trans_doc.append( kvp( "transaction_header", trx_header_value ) ); + } catch( bsoncxx::exception& ) { try { + trx_header_json = fc::prune_invalid_utf8( trx_header_json ); const auto& trx_header_value = bsoncxx::from_json( trx_header_json ); - trans_doc.append( kvp( "transaction_header", trx_header_value )); - } catch( bsoncxx::exception& ) { - try { - trx_header_json = fc::prune_invalid_utf8( trx_header_json ); - const auto& trx_header_value = bsoncxx::from_json( trx_header_json ); - trans_doc.append( kvp( "transaction_header", trx_header_value )); - trans_doc.append( kvp( "non-utf8-purged", b_bool{true})); - } catch( bsoncxx::exception& e ) { - elog( "Unable to convert transaction header JSON to MongoDB JSON: ${e}", ("e", e.what())); - elog( " JSON: ${j}", ("j", trx_header_json)); - } + trans_doc.append( kvp( "transaction_header", trx_header_value ) ); + trans_doc.append( kvp( "non-utf8-purged", b_bool{true} ) ); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert transaction header JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", trx_header_json) ); } - if( !signing_keys_json.empty()) { - try { - const auto& keys_value = bsoncxx::from_json( signing_keys_json ); - trans_doc.append( kvp( "signing_keys", keys_value )); - } catch( bsoncxx::exception& e ) { - // should never fail, so don't attempt to remove invalid utf8 - elog( "Unable to convert signing keys JSON to MongoDB JSON: ${e}", ("e", e.what())); - elog( " JSON: ${j}", ("j", signing_keys_json)); - } + } + if( !signing_keys_json.empty() ) { + try { + const auto& keys_value = bsoncxx::from_json( signing_keys_json ); + trans_doc.append( kvp( "signing_keys", keys_value ) ); + } catch( bsoncxx::exception& e ) { + // should never fail, so don't attempt to remove invalid utf8 + elog( "Unable to convert signing keys JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", signing_keys_json) ); } } - if( !trx.actions.empty()) { + if( !trx.actions.empty() ) { bsoncxx::builder::basic::array action_array; for( const auto& act : trx.actions ) { process_action( trx_id_str, act, action_array, false ); } - trans_doc.append( kvp( "actions", action_array )); + trans_doc.append( kvp( "actions", action_array ) ); } - if( start_block_reached ) { - if( !trx.context_free_actions.empty()) { - bsoncxx::builder::basic::array action_array; - for( const auto& cfa : trx.context_free_actions ) { - process_action( trx_id_str, cfa, action_array, true ); - } - trans_doc.append( kvp( "context_free_actions", action_array )); + if( !trx.context_free_actions.empty() ) { + bsoncxx::builder::basic::array action_array; + for( const auto& cfa : trx.context_free_actions ) { + process_action( trx_id_str, cfa, action_array, true ); } + trans_doc.append( kvp( "context_free_actions", action_array ) ); + } - string trx_extensions_json = fc::json::to_string( trx.transaction_extensions ); - string trx_signatures_json = fc::json::to_string( trx.signatures ); - string trx_context_free_data_json = fc::json::to_string( trx.context_free_data ); - - try { - if( !trx_extensions_json.empty()) { - try { - const auto& trx_extensions_value = bsoncxx::from_json( trx_extensions_json ); - trans_doc.append( kvp( "transaction_extensions", trx_extensions_value )); - } catch( bsoncxx::exception& ) { - static_assert( sizeof(std::remove_pointer::type) == sizeof(std::string::value_type), "string type not storable as b_binary" ); - trans_doc.append( kvp( "transaction_extensions", - b_binary{bsoncxx::binary_sub_type::k_binary, - static_cast(trx_extensions_json.size()), - reinterpret_cast(trx_extensions_json.data())} )); - } - } else { - trans_doc.append( kvp( "transaction_extensions", make_array())); - } - - if( !trx_signatures_json.empty()) { - // signatures contain only utf8 - const auto& trx_signatures_value = bsoncxx::from_json( trx_signatures_json ); - trans_doc.append( kvp( "signatures", trx_signatures_value )); - } else { - trans_doc.append( kvp( "signatures", make_array())); - } + string trx_extensions_json = fc::json::to_string( trx.transaction_extensions ); + string trx_signatures_json = fc::json::to_string( trx.signatures ); + string trx_context_free_data_json = fc::json::to_string( trx.context_free_data ); - if( !trx_context_free_data_json.empty()) { - try { - const auto& trx_context_free_data_value = bsoncxx::from_json( trx_context_free_data_json ); - trans_doc.append( kvp( "context_free_data", trx_context_free_data_value )); - } catch( bsoncxx::exception& ) { - static_assert( sizeof(std::remove_pointer::type) == - sizeof(std::remove_pointer::type), "context_free_data not storable as b_binary" ); - bsoncxx::builder::basic::array data_array; - for (auto& cfd : trx.context_free_data) { - data_array.append( - b_binary{bsoncxx::binary_sub_type::k_binary, - static_cast(cfd.size()), - reinterpret_cast(cfd.data())} ); - } - trans_doc.append( kvp( "context_free_data", data_array.view() )); - } - } else { - trans_doc.append( kvp( "context_free_data", make_array())); + try { + if( !trx_extensions_json.empty() ) { + try { + const auto& trx_extensions_value = bsoncxx::from_json( trx_extensions_json ); + trans_doc.append( kvp( "transaction_extensions", trx_extensions_value ) ); + } catch( bsoncxx::exception& ) { + static_assert( + sizeof( std::remove_pointer::type ) == sizeof( std::string::value_type ), + "string type not storable as b_binary" ); + trans_doc.append( kvp( "transaction_extensions", + b_binary{bsoncxx::binary_sub_type::k_binary, + static_cast(trx_extensions_json.size()), + reinterpret_cast(trx_extensions_json.data())} ) ); } - } catch( std::exception& e ) { - elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what())); - elog( " JSON: ${j}", ("j", trx_extensions_json)); - elog( " JSON: ${j}", ("j", trx_signatures_json)); - elog( " JSON: ${j}", ("j", trx_context_free_data_json)); + } else { + trans_doc.append( kvp( "transaction_extensions", make_array() ) ); } - trans_doc.append( kvp( "createdAt", b_date{now} )); - - try { - mongocxx::options::update update_opts{}; - update_opts.upsert( true ); - if( !trans.update_one( make_document( kvp( "trx_id", trx_id_str ) ), - make_document( kvp( "$set", trans_doc.view() ) ), update_opts ) ) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert trans ${id}", ("id", trx_id)); - } - } catch(...) { - handle_mongo_exception("trans insert", __LINE__); + if( !trx_signatures_json.empty() ) { + // signatures contain only utf8 + const auto& trx_signatures_value = bsoncxx::from_json( trx_signatures_json ); + trans_doc.append( kvp( "signatures", trx_signatures_value ) ); + } else { + trans_doc.append( kvp( "signatures", make_array() ) ); } - if (actions_to_write) { + if( !trx_context_free_data_json.empty() ) { try { - if( !bulk_actions.execute() ) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk actions insert failed for transaction: ${id}", ("id", trx_id_str)); + const auto& trx_context_free_data_value = bsoncxx::from_json( trx_context_free_data_json ); + trans_doc.append( kvp( "context_free_data", trx_context_free_data_value ) ); + } catch( bsoncxx::exception& ) { + static_assert( sizeof( std::remove_pointer::type ) == + sizeof( std::remove_pointer::type ), + "context_free_data not storable as b_binary" ); + bsoncxx::builder::basic::array data_array; + for( auto& cfd : trx.context_free_data ) { + data_array.append( + b_binary{bsoncxx::binary_sub_type::k_binary, + static_cast(cfd.size()), + reinterpret_cast(cfd.data())} ); } - } catch(...) { - handle_mongo_exception("actions insert", __LINE__); + trans_doc.append( kvp( "context_free_data", data_array.view() ) ); } + } else { + trans_doc.append( kvp( "context_free_data", make_array() ) ); } + } catch( std::exception& e ) { + elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", trx_extensions_json) ); + elog( " JSON: ${j}", ("j", trx_signatures_json) ); + elog( " JSON: ${j}", ("j", trx_context_free_data_json) ); } + + trans_doc.append( kvp( "createdAt", b_date{now} ) ); + + try { + mongocxx::options::update update_opts{}; + update_opts.upsert( true ); + if( !trans.update_one( make_document( kvp( "trx_id", trx_id_str ) ), + make_document( kvp( "$set", trans_doc.view() ) ), update_opts ) ) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert trans ${id}", ("id", trx_id) ); + } + } catch( ... ) { + handle_mongo_exception( "trans insert", __LINE__ ); + } + } void @@ -1036,6 +1004,7 @@ void mongo_db_plugin_impl::add_data( bsoncxx::builder::basic::document& act_doc, const auto& value = bsoncxx::from_json( json ); act_doc.append( kvp( "data", value )); + act_doc.append( kvp( "hex_data", fc::variant( act.data ).as_string())); return; } catch( bsoncxx::exception& e ) { ilog( "Unable to convert EOS JSON to MongoDB JSON: ${e}", ("e", e.what())); @@ -1308,7 +1277,7 @@ void mongo_db_plugin_impl::wipe_database() { auto blocks = mongo_conn[db_name][blocks_col]; auto trans = mongo_conn[db_name][trans_col]; auto trans_traces = mongo_conn[db_name][trans_traces_col]; - auto actions = mongo_conn[db_name][actions_col]; + auto action_traces = mongo_conn[db_name][action_traces_col]; accounts = mongo_conn[db_name][accounts_col]; auto pub_keys = mongo_conn[db_name][pub_keys_col]; auto account_controls = mongo_conn[db_name][account_controls_col]; @@ -1317,7 +1286,7 @@ void mongo_db_plugin_impl::wipe_database() { blocks.drop(); trans.drop(); trans_traces.drop(); - actions.drop(); + action_traces.drop(); accounts.drop(); pub_keys.drop(); account_controls.drop(); @@ -1367,13 +1336,9 @@ void mongo_db_plugin_impl::init() { auto trans_trace = mongo_conn[db_name][trans_traces_col]; trans_trace.create_index( bsoncxx::from_json( R"xxx({ "id" : 1 })xxx" )); - // actions indexes - auto actions = mongo_conn[db_name][actions_col]; - actions.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1, "action_num" : 1 })xxx" )); - // action traces indexes auto action_traces = mongo_conn[db_name][action_traces_col]; - action_traces.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1, "receipt.global_sequence" : 1 })xxx" )); + action_traces.create_index( bsoncxx::from_json( R"xxx({ "trx_id" : 1 })xxx" )); // pub_keys indexes auto pub_keys = mongo_conn[db_name][pub_keys_col]; From c02c2052a29262c328bcf58a8dc6a5c3582f8f06 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 3 Aug 2018 14:45:09 -0500 Subject: [PATCH 05/16] Process transaction traces first now since abi is captured from them --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 24 ++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index 827bc34469b..7139e5551c4 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -293,28 +293,28 @@ void mongo_db_plugin_impl::consume_blocks() { // process transactions auto start_time = fc::time_point::now(); - auto size = transaction_metadata_process_queue.size(); - while (!transaction_metadata_process_queue.empty()) { - const auto& t = transaction_metadata_process_queue.front(); - process_accepted_transaction(t); - transaction_metadata_process_queue.pop_front(); + auto size = transaction_trace_process_queue.size(); + while (!transaction_trace_process_queue.empty()) { + const auto& t = transaction_trace_process_queue.front(); + process_applied_transaction(t); + transaction_trace_process_queue.pop_front(); } auto time = fc::time_point::now() - start_time; auto per = size > 0 ? time.count()/size : 0; if( time > fc::microseconds(500000) ) // reduce logging, .5 secs - ilog( "process_accepted_transaction, time per: ${p}, size: ${s}, time: ${t}", ("s", size)( "t", time )( "p", per )); + ilog( "process_applied_transaction, time per: ${p}, size: ${s}, time: ${t}", ("s", size)("t", time)("p", per) ); start_time = fc::time_point::now(); - size = transaction_trace_process_queue.size(); - while (!transaction_trace_process_queue.empty()) { - const auto& t = transaction_trace_process_queue.front(); - process_applied_transaction(t); - transaction_trace_process_queue.pop_front(); + size = transaction_metadata_process_queue.size(); + while (!transaction_metadata_process_queue.empty()) { + const auto& t = transaction_metadata_process_queue.front(); + process_accepted_transaction(t); + transaction_metadata_process_queue.pop_front(); } time = fc::time_point::now() - start_time; per = size > 0 ? time.count()/size : 0; if( time > fc::microseconds(500000) ) // reduce logging, .5 secs - ilog( "process_applied_transaction, time per: ${p}, size: ${s}, time: ${t}", ("s", size)("t", time)("p", per) ); + ilog( "process_accepted_transaction, time per: ${p}, size: ${s}, time: ${t}", ("s", size)( "t", time )( "p", per )); // process blocks start_time = fc::time_point::now(); From d0f9a3706a79d6a539742659dae827cda27c8b88 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 3 Aug 2018 15:36:12 -0500 Subject: [PATCH 06/16] Add base_action_trace as a known type so action data is expanded when used --- libraries/chain/include/eosio/chain/abi_serializer.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/chain/include/eosio/chain/abi_serializer.hpp b/libraries/chain/include/eosio/chain/abi_serializer.hpp index 3d0c05caea2..3c53f903968 100644 --- a/libraries/chain/include/eosio/chain/abi_serializer.hpp +++ b/libraries/chain/include/eosio/chain/abi_serializer.hpp @@ -134,6 +134,7 @@ namespace impl { std::is_same::value || std::is_same::value || std::is_same::value || + std::is_same::value || std::is_same::value || std::is_same::value || std::is_same::value || From bdfd88a498360de1e1e366945c6cf317e2e661ef Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 4 Aug 2018 08:48:22 -0500 Subject: [PATCH 07/16] Only update account for executed actions --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index 7139e5551c4..a1c44e8273d 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -79,7 +79,7 @@ class mongo_db_plugin_impl { void purge_abi_cache(); void add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, - const std::chrono::milliseconds& now ); + bool executed, const std::chrono::milliseconds& now ); void add_data(bsoncxx::builder::basic::document& act_doc, const chain::action& act); void update_account(const chain::action& act); @@ -707,12 +707,12 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti void mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, - const std::chrono::milliseconds& now ) + bool executed, const std::chrono::milliseconds& now ) { using namespace bsoncxx::types; using bsoncxx::builder::basic::kvp; - if( atrace.receipt.receiver == chain::config::system_account_name ) { + if( executed && atrace.receipt.receiver == chain::config::system_account_name ) { update_account( atrace.act ); } @@ -743,7 +743,7 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces } for( const auto& iline_atrace : atrace.inline_traces ) { - add_action_trace( bulk_action_traces, iline_atrace, now ); + add_action_trace( bulk_action_traces, iline_atrace, executed, now ); } } @@ -763,10 +763,11 @@ void mongo_db_plugin_impl::_process_applied_transaction( const chain::transactio bulk_opts.ordered(false); mongocxx::bulk_write bulk_action_traces = action_traces.create_bulk_write(bulk_opts); bool write_atraces = false; + bool executed = t->receipt.valid() && t->receipt->status == chain::transaction_receipt_header::executed; for( const auto& atrace : t->action_traces ) { try { - add_action_trace( bulk_action_traces, atrace, now ); + add_action_trace( bulk_action_traces, atrace, executed, now ); write_atraces = true; } catch(...) { handle_mongo_exception("add action traces", __LINE__); From 38d70e604384e5c7c67613b5250167efee149d4f Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 4 Aug 2018 15:16:59 -0500 Subject: [PATCH 08/16] Add ability to override type pack/unpack --- libraries/chain/abi_serializer.cpp | 5 +++++ libraries/chain/include/eosio/chain/abi_serializer.hpp | 6 ++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/libraries/chain/abi_serializer.cpp b/libraries/chain/abi_serializer.cpp index 967796d9afe..60303a5268f 100644 --- a/libraries/chain/abi_serializer.cpp +++ b/libraries/chain/abi_serializer.cpp @@ -55,6 +55,11 @@ namespace eosio { namespace chain { set_abi(abi, max_serialization_time); } + void abi_serializer::add_specialized_unpack_pack( const string& name, + std::pair unpack_pack ) { + built_in_types[name] = std::move( unpack_pack ); + } + void abi_serializer::configure_built_in_types() { built_in_types.emplace("bool", pack_unpack()); diff --git a/libraries/chain/include/eosio/chain/abi_serializer.hpp b/libraries/chain/include/eosio/chain/abi_serializer.hpp index 3c53f903968..1e5bca6b5b3 100644 --- a/libraries/chain/include/eosio/chain/abi_serializer.hpp +++ b/libraries/chain/include/eosio/chain/abi_serializer.hpp @@ -86,11 +86,13 @@ struct abi_serializer { return false; } - static const size_t max_recursion_depth = 32; // arbitrary depth to prevent infinite recursion - typedef std::function&, bool, bool)> unpack_function; typedef std::function&, bool, bool)> pack_function; + void add_specialized_unpack_pack( const string& name, std::pair unpack_pack ); + + static const size_t max_recursion_depth = 32; // arbitrary depth to prevent infinite recursion + private: map typedefs; From 4584e9b5091103b1c07971ded2487a51beeed90f Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 4 Aug 2018 15:18:02 -0500 Subject: [PATCH 09/16] Override abi serialization of eosio::setabi.abi to store abi as abi_def instead of bytes in mongodb. --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 46 +++++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index a1c44e8273d..b4ac1090848 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -463,7 +463,37 @@ optional mongo_db_plugin_impl::get_abi_serializer( account_name abi_cache entry; entry.account = n; entry.last_accessed = fc::time_point::now(); - entry.serializer.emplace( abi, abi_serializer_max_time ); + abi_serializer abis; + if( n == chain::config::system_account_name ) { + // redefine eosio setabi.abi from bytes to abi_def + // Done so that abi is stored as abi_def in mongo instead of as bytes + auto itr = std::find_if( abi.structs.begin(), abi.structs.end(), + []( const auto& s ) { return s.name == "setabi"; } ); + if( itr != abi.structs.end() ) { + auto itr2 = std::find_if( itr->fields.begin(), itr->fields.end(), + []( const auto& f ) { return f.name == "abi"; } ); + if( itr2 != itr->fields.end() ) { + if( itr2->type == "bytes" ) { + itr2->type = "abi_def"; + // unpack setabi.abi as abi_def instead of as bytes + abis.add_specialized_unpack_pack( "abi_def", + std::make_pair( + []( fc::datastream& stream, bool is_array, bool is_optional ) -> fc::variant { + EOS_ASSERT( !is_array && !is_optional, chain::mongo_db_exception, "unexpected abi_def"); + chain::bytes temp; + fc::raw::unpack( stream, temp ); + return fc::variant( fc::raw::unpack( temp ) ); + }, + []( const fc::variant& var, fc::datastream& ds, bool is_array, bool is_optional ) { + EOS_ASSERT( false, chain::mongo_db_exception, "never called" ); + } + ) ); + } + } + } + } + abis.set_abi( abi, abi_serializer_max_time ); + entry.serializer.emplace( std::move( abis ) ); abi_cache_index.insert( entry ); return entry.serializer; } @@ -718,7 +748,7 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces if( start_block_reached ) { auto action_traces_doc = bsoncxx::builder::basic::document{}; - const auto& base = static_cast(atrace); // without inline action traces + const chain::base_action_trace& base = atrace; // without inline action traces auto v = to_variant_with_abi( base ); string json = fc::json::to_string( v ); @@ -1218,13 +1248,13 @@ void mongo_db_plugin_impl::update_account(const chain::action& act) abi_cache_index.erase( setabi.account ); - auto from_account = find_account( accounts, setabi.account ); - if( !from_account ) { + auto account = find_account( accounts, setabi.account ); + if( !account ) { create_account( accounts, setabi.account, now ); - from_account = find_account( accounts, setabi.account ); + account = find_account( accounts, setabi.account ); } - if( from_account ) { - const abi_def& abi_def = fc::raw::unpack( setabi.abi ); + if( account ) { + abi_def abi_def = fc::raw::unpack( setabi.abi ); const string json_str = fc::json::to_string( abi_def ); try{ @@ -1233,7 +1263,7 @@ void mongo_db_plugin_impl::update_account(const chain::action& act) kvp( "updatedAt", b_date{now} )))); try { - if( !accounts.update_one( make_document( kvp( "_id", from_account->view()["_id"].get_oid())), + if( !accounts.update_one( make_document( kvp( "_id", account->view()["_id"].get_oid())), update_from.view())) { EOS_ASSERT( false, chain::mongo_db_update_fail, "Failed to udpdate account ${n}", ("n", setabi.account)); } From 82d9f97ac209280f76025b833ec6e1bcc5e67abe Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 4 Aug 2018 16:15:51 -0500 Subject: [PATCH 10/16] Simplified transaction serialization --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 193 +++----------------- 1 file changed, 22 insertions(+), 171 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index b4ac1090848..23ded113701 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -81,7 +81,6 @@ class mongo_db_plugin_impl { void add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, bool executed, const std::chrono::milliseconds& now ); - void add_data(bsoncxx::builder::basic::document& act_doc, const chain::action& act); void update_account(const chain::action& act); void add_pub_keys( const vector& keys, const account_name& name, @@ -575,42 +574,35 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti namespace bbb = bsoncxx::builder::basic; auto trans = mongo_conn[db_name][trans_col]; - accounts = mongo_conn[db_name][accounts_col]; auto trans_doc = bsoncxx::builder::basic::document{}; auto now = std::chrono::duration_cast( std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()} ); - const auto trx_id = t->id; + const auto& trx_id = t->id; const auto trx_id_str = trx_id.str(); const auto& trx = t->trx; - const chain::transaction_header& trx_header = trx; - - int32_t act_num = 0; - auto process_action = [&]( const std::string& trx_id_str, const chain::action& act, bbb::array& act_array, - bool cfa ) -> auto { - auto act_doc = bsoncxx::builder::basic::document(); - act_doc.append( kvp( "trx_id", trx_id_str ), - kvp( "action_num", b_int32{act_num} ) ); - act_doc.append( kvp( "cfa", b_bool{cfa} ) ); - act_doc.append( kvp( "account", act.account.to_string() ) ); - act_doc.append( kvp( "name", act.name.to_string() ) ); - act_doc.append( kvp( "authorization", [&act]( bsoncxx::builder::basic::sub_array subarr ) { - for( const auto& auth : act.authorization ) { - subarr.append( [&auth]( bsoncxx::builder::basic::sub_document subdoc ) { - subdoc.append( kvp( "actor", auth.actor.to_string() ), - kvp( "permission", auth.permission.to_string() ) ); - } ); - } - } ) ); - add_data( act_doc, act ); - act_array.append( act_doc ); - ++act_num; - return act_num; - }; trans_doc.append( kvp( "trx_id", trx_id_str ) ); + auto v = to_variant_with_abi( trx ); + string trx_json = fc::json::to_string( v ); + + try { + const auto& trx_value = bsoncxx::from_json( trx_json ); + trans_doc.append( bsoncxx::builder::concatenate_doc{trx_value.view()} ); + } catch( bsoncxx::exception& ) { + try { + trx_json = fc::prune_invalid_utf8( trx_json ); + const auto& trx_value = bsoncxx::from_json( trx_json ); + trans_doc.append( bsoncxx::builder::concatenate_doc{trx_value.view()} ); + trans_doc.append( kvp( "non-utf8-purged", b_bool{true} ) ); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", trx_json) ); + } + } + string signing_keys_json; if( t->signing_keys.valid() ) { signing_keys_json = fc::json::to_string( t->signing_keys->second ); @@ -620,22 +612,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti signing_keys_json = fc::json::to_string( signing_keys ); } } - string trx_header_json = fc::json::to_string( trx_header ); - try { - const auto& trx_header_value = bsoncxx::from_json( trx_header_json ); - trans_doc.append( kvp( "transaction_header", trx_header_value ) ); - } catch( bsoncxx::exception& ) { - try { - trx_header_json = fc::prune_invalid_utf8( trx_header_json ); - const auto& trx_header_value = bsoncxx::from_json( trx_header_json ); - trans_doc.append( kvp( "transaction_header", trx_header_value ) ); - trans_doc.append( kvp( "non-utf8-purged", b_bool{true} ) ); - } catch( bsoncxx::exception& e ) { - elog( "Unable to convert transaction header JSON to MongoDB JSON: ${e}", ("e", e.what()) ); - elog( " JSON: ${j}", ("j", trx_header_json) ); - } - } if( !signing_keys_json.empty() ) { try { const auto& keys_value = bsoncxx::from_json( signing_keys_json ); @@ -647,78 +624,9 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti } } - if( !trx.actions.empty() ) { - bsoncxx::builder::basic::array action_array; - for( const auto& act : trx.actions ) { - process_action( trx_id_str, act, action_array, false ); - } - trans_doc.append( kvp( "actions", action_array ) ); - } - - if( !trx.context_free_actions.empty() ) { - bsoncxx::builder::basic::array action_array; - for( const auto& cfa : trx.context_free_actions ) { - process_action( trx_id_str, cfa, action_array, true ); - } - trans_doc.append( kvp( "context_free_actions", action_array ) ); - } - - string trx_extensions_json = fc::json::to_string( trx.transaction_extensions ); - string trx_signatures_json = fc::json::to_string( trx.signatures ); - string trx_context_free_data_json = fc::json::to_string( trx.context_free_data ); - - try { - if( !trx_extensions_json.empty() ) { - try { - const auto& trx_extensions_value = bsoncxx::from_json( trx_extensions_json ); - trans_doc.append( kvp( "transaction_extensions", trx_extensions_value ) ); - } catch( bsoncxx::exception& ) { - static_assert( - sizeof( std::remove_pointer::type ) == sizeof( std::string::value_type ), - "string type not storable as b_binary" ); - trans_doc.append( kvp( "transaction_extensions", - b_binary{bsoncxx::binary_sub_type::k_binary, - static_cast(trx_extensions_json.size()), - reinterpret_cast(trx_extensions_json.data())} ) ); - } - } else { - trans_doc.append( kvp( "transaction_extensions", make_array() ) ); - } - - if( !trx_signatures_json.empty() ) { - // signatures contain only utf8 - const auto& trx_signatures_value = bsoncxx::from_json( trx_signatures_json ); - trans_doc.append( kvp( "signatures", trx_signatures_value ) ); - } else { - trans_doc.append( kvp( "signatures", make_array() ) ); - } - - if( !trx_context_free_data_json.empty() ) { - try { - const auto& trx_context_free_data_value = bsoncxx::from_json( trx_context_free_data_json ); - trans_doc.append( kvp( "context_free_data", trx_context_free_data_value ) ); - } catch( bsoncxx::exception& ) { - static_assert( sizeof( std::remove_pointer::type ) == - sizeof( std::remove_pointer::type ), - "context_free_data not storable as b_binary" ); - bsoncxx::builder::basic::array data_array; - for( auto& cfd : trx.context_free_data ) { - data_array.append( - b_binary{bsoncxx::binary_sub_type::k_binary, - static_cast(cfd.size()), - reinterpret_cast(cfd.data())} ); - } - trans_doc.append( kvp( "context_free_data", data_array.view() ) ); - } - } else { - trans_doc.append( kvp( "context_free_data", make_array() ) ); - } - } catch( std::exception& e ) { - elog( "Unable to convert transaction JSON to MongoDB JSON: ${e}", ("e", e.what()) ); - elog( " JSON: ${j}", ("j", trx_extensions_json) ); - elog( " JSON: ${j}", ("j", trx_signatures_json) ); - elog( " JSON: ${j}", ("j", trx_context_free_data_json) ); - } + trans_doc.append( kvp( "accepted", b_bool{t->accepted} ) ); + trans_doc.append( kvp( "implicit", b_bool{t->implicit} ) ); + trans_doc.append( kvp( "scheduled", b_bool{t->scheduled} ) ); trans_doc.append( kvp( "createdAt", b_date{now} ) ); @@ -1003,63 +911,6 @@ void mongo_db_plugin_impl::_process_irreversible_block(const chain::block_state_ } } -void mongo_db_plugin_impl::add_data( bsoncxx::builder::basic::document& act_doc, const chain::action& act ) -{ - using bsoncxx::builder::basic::kvp; - using bsoncxx::builder::basic::make_document; - try { - if( act.account == chain::config::system_account_name ) { - if( act.name == mongo_db_plugin_impl::setabi ) { - auto setabi = act.data_as(); - try { - const abi_def& abi_def = fc::raw::unpack( setabi.abi ); - const string json_str = fc::json::to_string( abi_def ); - - act_doc.append( - kvp( "data", make_document( kvp( "account", setabi.account.to_string()), - kvp( "abi_def", bsoncxx::from_json( json_str ))))); - return; - } catch( bsoncxx::exception& ) { - // better error handling below - } catch( fc::exception& e ) { - ilog( "Unable to convert action abi_def to json for ${n}", ("n", setabi.account.to_string())); - } - } - } - auto serializer = get_abi_serializer( act.account ); - if( serializer.valid() ) { - string json; - try { - auto v = serializer->binary_to_variant( serializer->get_action_type( act.name ), act.data, abi_serializer_max_time ); - json = fc::json::to_string( v ); - - const auto& value = bsoncxx::from_json( json ); - act_doc.append( kvp( "data", value )); - act_doc.append( kvp( "hex_data", fc::variant( act.data ).as_string())); - return; - } catch( bsoncxx::exception& e ) { - ilog( "Unable to convert EOS JSON to MongoDB JSON: ${e}", ("e", e.what())); - ilog( " EOS JSON: ${j}", ("j", json)); - ilog( " Storing data has hex." ); - } - } - } catch( std::exception& e ) { - ilog( "Unable to convert action.data to ABI: ${s}::${n}, std what: ${e}", - ("s", act.account)( "n", act.name )( "e", e.what())); - } catch (fc::exception& e) { - if (act.name != "onblock") { // eosio::onblock not in original eosio.system abi - ilog( "Unable to convert action.data to ABI: ${s}::${n}, fc exception: ${e}", - ("s", act.account)( "n", act.name )( "e", e.to_detail_string())); - } - } catch( ... ) { - ilog( "Unable to convert action.data to ABI: ${s}::${n}, unknown exception", - ("s", act.account)( "n", act.name )); - } - // if anything went wrong just store raw hex_data - act_doc.append( kvp( "hex_data", fc::variant( act.data ).as_string())); -} - - void mongo_db_plugin_impl::add_pub_keys( const vector& keys, const account_name& name, const permission_name& permission, const std::chrono::milliseconds& now ) { From efbda1015779fe1aa40d25d75bcb7e29864a6dc9 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 6 Aug 2018 13:37:26 -0500 Subject: [PATCH 11/16] Update mongo tests for new action_traces collection and new transaction layout --- tests/Node.py | 10 +++++----- tests/nodeos_run_test.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/Node.py b/tests/Node.py index 8f0b0deba60..89cbba5df60 100644 --- a/tests/Node.py +++ b/tests/Node.py @@ -334,8 +334,8 @@ def getBlockIdByTransId(self, transId, delayedRetry=True): key="[trx][trx][ref_block_num]" refBlockNum=trans["trx"]["trx"]["ref_block_num"] else: - key="[transaction_header][ref_block_num]" - refBlockNum=trans["transaction_header"]["ref_block_num"] + key="[ref_block_num]" + refBlockNum=trans["ref_block_num"] refBlockNum=int(refBlockNum)+1 except (TypeError, ValueError, KeyError) as _: Utils.Print("transaction%s not found. Transaction: %s" % (key, trans)) @@ -366,10 +366,10 @@ def getBlockIdByTransIdMdb(self, transId): refBlockNum=None try: - refBlockNum=trans["transaction_header"]["ref_block_num"] + refBlockNum=trans["ref_block_num"] refBlockNum=int(refBlockNum)+1 except (TypeError, ValueError, KeyError) as _: - Utils.Print("transaction[transaction_header][ref_block_num] not found. Transaction: %s" % (trans)) + Utils.Print("transaction[ref_block_num] not found. Transaction: %s" % (trans)) return None headBlockNum=self.getHeadBlockNum() @@ -688,7 +688,7 @@ def getActionsMdb(self, account, pos=-1, offset=-1, exitOnError=False): assert(isinstance(offset, int)) cmd="%s %s" % (Utils.MongoPath, self.mongoEndpointArgs) - subcommand='db.actions.find({$or: [{"data.from":"%s"},{"data.to":"%s"}]}).sort({"_id":%d}).limit(%d)' % (account.name, account.name, pos, abs(offset)) + subcommand='db.action_traces.find({$or: [{"act.data.from":"%s"},{"act.data.to":"%s"}]}).sort({"_id":%d}).limit(%d)' % (account.name, account.name, pos, abs(offset)) if Utils.Debug: Utils.Print("cmd: echo '%s' | %s" % (subcommand, cmd)) try: actions=Node.runMongoCmdReturnJson(cmd.split(), subcommand, exitOnError=exitOnError) diff --git a/tests/nodeos_run_test.py b/tests/nodeos_run_test.py index 5353fba7ae7..f5cc8676a59 100755 --- a/tests/nodeos_run_test.py +++ b/tests/nodeos_run_test.py @@ -275,7 +275,7 @@ if not enableMongo: assert(actions["actions"][0]["action_trace"]["act"]["name"] == "transfer") else: - assert(actions["name"] == "transfer") + assert(actions["act"]["name"] == "transfer") except (AssertionError, TypeError, KeyError) as _: Print("Action validation failed. Actions: %s" % (actions)) raise From 5dd4647b1f611cedec6be23d954673083728cf76 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 6 Aug 2018 16:00:19 -0500 Subject: [PATCH 12/16] -Add mongodb-filter-on and mongodb-filter-out similar to history plugin. Thanks to Scott Sallinen scotts@ece.ubc.ca for implementation. --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 103 ++++++++++++++++++-- 1 file changed, 96 insertions(+), 7 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index 23ded113701..01f558da46c 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -48,6 +49,18 @@ using chain::packed_transaction; static appbase::abstract_plugin& _mongo_db_plugin = app().register_plugin(); +struct filter_entry { + name receiver; + name action; + name actor; + std::tuple key() const { + return std::make_tuple(receiver, action, actor); + } + friend bool operator<( const filter_entry& a, const filter_entry& b ) { + return a.key() < b.key(); + } +}; + class mongo_db_plugin_impl { public: mongo_db_plugin_impl(); @@ -78,7 +91,7 @@ class mongo_db_plugin_impl { void purge_abi_cache(); - void add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, + bool add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, bool executed, const std::chrono::milliseconds& now ); void update_account(const chain::action& act); @@ -91,6 +104,9 @@ class mongo_db_plugin_impl { const std::chrono::milliseconds& now ); void remove_account_control( const account_name& name, const permission_name& permission ); + /// @return true if act should be added to mongodb, false to skip it + bool filter_include( const chain::action& act ) const; + void init(); void wipe_database(); @@ -101,6 +117,10 @@ class mongo_db_plugin_impl { uint32_t start_block_num = 0; std::atomic_bool start_block_reached{false}; + bool filter_on_star = true; + std::set filter_on; + std::set filter_out; + std::string db_name; mongocxx::instance mongo_inst; mongocxx::client mongo_conn; @@ -176,6 +196,36 @@ const std::string mongo_db_plugin_impl::accounts_col = "accounts"; const std::string mongo_db_plugin_impl::pub_keys_col = "pub_keys"; const std::string mongo_db_plugin_impl::account_controls_col = "account_controls"; +bool mongo_db_plugin_impl::filter_include( const chain::action& act ) const { + bool include = false; + if( filter_on_star ) { + include = true; + } + if( filter_on.find( {act.account, act.name, 0} ) != filter_on.end() ) { + include = true; + } + for( const auto& a : act.authorization ) { + if( filter_on.find( {act.account, act.name, a.actor} ) != filter_on.end() ) { + include = true; + } + } + + if( !include ) { return false; } + + if( filter_out.find( {act.account, 0, 0} ) != filter_out.end() ) { + return false; + } + if( filter_out.find( {act.account, act.name, 0} ) != filter_out.end() ) { + return false; + } + for( const auto& a : act.authorization ) { + if( filter_out.find( {act.account, act.name, a.actor} ) != filter_out.end() ) { + return false; + } + } + return true; +} + template void mongo_db_plugin_impl::queue( Queue& queue, const Entry& e ) { boost::mutex::scoped_lock lock( mtx ); @@ -643,7 +693,7 @@ void mongo_db_plugin_impl::_process_accepted_transaction( const chain::transacti } -void +bool mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces, const chain::action_trace& atrace, bool executed, const std::chrono::milliseconds& now ) { @@ -654,7 +704,8 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces update_account( atrace.act ); } - if( start_block_reached ) { + bool added = false; + if( start_block_reached && filter_include( atrace.act ) ) { auto action_traces_doc = bsoncxx::builder::basic::document{}; const chain::base_action_trace& base = atrace; // without inline action traces @@ -678,11 +729,14 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces mongocxx::model::insert_one insert_op{action_traces_doc.view()}; bulk_action_traces.append( insert_op ); + added = true; } for( const auto& iline_atrace : atrace.inline_traces ) { - add_action_trace( bulk_action_traces, iline_atrace, executed, now ); + added |= add_action_trace( bulk_action_traces, iline_atrace, executed, now ); } + + return added; } @@ -705,14 +759,13 @@ void mongo_db_plugin_impl::_process_applied_transaction( const chain::transactio for( const auto& atrace : t->action_traces ) { try { - add_action_trace( bulk_action_traces, atrace, executed, now ); - write_atraces = true; + write_atraces |= add_action_trace( bulk_action_traces, atrace, executed, now ); } catch(...) { handle_mongo_exception("add action traces", __LINE__); } } - if( write_atraces && start_block_reached ) { + if( write_atraces ) { try { if( !bulk_action_traces.execute() ) { EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk action traces insert failed for transaction trace: ${id}", ("id", t->id)); @@ -1273,6 +1326,11 @@ void mongo_db_plugin::set_program_options(options_description& cli, options_desc "MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/." " If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI." " Example: mongodb://127.0.0.1:27017/EOS") + ("mongodb-filter-on", bpo::value>()->composing(), + "Mongodb: Track actions which match receiver:action:actor. Actor may be blank to include all. Receiver and Action may not be blank. Default is * include everything.") + ("mongodb-filter-out", bpo::value>()->composing(), + "Mongodb: Do not track actions which match receiver:action:actor. Action and Actor both blank excludes all from reciever. Actor blank excludes all from reciever:action. Receiver may not be blank.") + ; } @@ -1308,6 +1366,37 @@ void mongo_db_plugin::plugin_initialize(const variables_map& options) if( options.count( "mongodb-block-start" )) { my->start_block_num = options.at( "mongodb-block-start" ).as(); } + if( options.count( "mongodb-filter-on" )) { + auto fo = options.at( "mongodb-filter-on" ).as>(); + for( auto& s : fo ) { + if( s == "*" ) { + my->filter_on_star = true; + break; + } + std::vector v; + boost::split( v, s, boost::is_any_of( ":" )); + EOS_ASSERT( v.size() == 3, fc::invalid_arg_exception, "Invalid value ${s} for --mongodb-filter-on", ("s", s)); + filter_entry fe{v[0], v[1], v[2]}; + EOS_ASSERT( fe.receiver.value && fe.action.value, fc::invalid_arg_exception, + "Invalid value ${s} for --mongodb-filter-on", ("s", s)); + my->filter_on.insert( fe ); + } + } else { + my->filter_on_star = true; + } + if( options.count( "mongodb-filter-out" )) { + auto fo = options.at( "mongodb-filter-out" ).as>(); + for( auto& s : fo ) { + std::vector v; + boost::split( v, s, boost::is_any_of( ":" )); + EOS_ASSERT( v.size() == 3, fc::invalid_arg_exception, "Invalid value ${s} for --mongodb-filter-out", ("s", s)); + filter_entry fe{v[0], v[1], v[2]}; + EOS_ASSERT( fe.receiver.value, fc::invalid_arg_exception, + "Invalid value ${s} for --mongodb-filter-out", ("s", s)); + my->filter_out.insert( fe ); + } + } + if( my->start_block_num == 0 ) { my->start_block_reached = true; } From d24e006aa7957834e7a7212513ad93e35b64906a Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Mon, 6 Aug 2018 16:45:36 -0500 Subject: [PATCH 13/16] Add options to not store blocks, block-states, transactions, transaction-traces, and action-traces --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 245 ++++++++++++-------- 1 file changed, 144 insertions(+), 101 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index 01f558da46c..5fe9ac61df6 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -120,6 +120,11 @@ class mongo_db_plugin_impl { bool filter_on_star = true; std::set filter_on; std::set filter_out; + bool store_blocks = true; + bool store_block_states = true; + bool store_transactions = true; + bool store_transaction_traces = true; + bool store_action_traces = true; std::string db_name; mongocxx::instance mongo_inst; @@ -249,7 +254,9 @@ void mongo_db_plugin_impl::queue( Queue& queue, const Entry& e ) { void mongo_db_plugin_impl::accepted_transaction( const chain::transaction_metadata_ptr& t ) { try { - queue( transaction_metadata_queue, t ); + if( store_transactions ) { + queue( transaction_metadata_queue, t ); + } } catch (fc::exception& e) { elog("FC Exception while accepted_transaction ${e}", ("e", e.to_string())); } catch (std::exception& e) { @@ -261,6 +268,7 @@ void mongo_db_plugin_impl::accepted_transaction( const chain::transaction_metada void mongo_db_plugin_impl::applied_transaction( const chain::transaction_trace_ptr& t ) { try { + // always queue since account information always gathered queue( transaction_trace_queue, t ); } catch (fc::exception& e) { elog("FC Exception while applied_transaction ${e}", ("e", e.to_string())); @@ -273,7 +281,9 @@ void mongo_db_plugin_impl::applied_transaction( const chain::transaction_trace_p void mongo_db_plugin_impl::applied_irreversible_block( const chain::block_state_ptr& bs ) { try { - queue( irreversible_block_state_queue, bs ); + if( store_blocks || store_transactions ) { + queue( irreversible_block_state_queue, bs ); + } } catch (fc::exception& e) { elog("FC Exception while applied_irreversible_block ${e}", ("e", e.to_string())); } catch (std::exception& e) { @@ -290,7 +300,9 @@ void mongo_db_plugin_impl::accepted_block( const chain::block_state_ptr& bs ) { start_block_reached = true; } } - queue( block_state_queue, bs ); + if( store_blocks || store_block_states ) { + queue( block_state_queue, bs ); + } } catch (fc::exception& e) { elog("FC Exception while accepted_block ${e}", ("e", e.to_string())); } catch (std::exception& e) { @@ -705,7 +717,7 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces } bool added = false; - if( start_block_reached && filter_include( atrace.act ) ) { + if( start_block_reached && store_action_traces && filter_include( atrace.act ) ) { auto action_traces_doc = bsoncxx::builder::basic::document{}; const chain::base_action_trace& base = atrace; // without inline action traces @@ -775,7 +787,7 @@ void mongo_db_plugin_impl::_process_applied_transaction( const chain::transactio } } - if( !start_block_reached ) return; + if( !start_block_reached || !store_transaction_traces ) return; // transaction trace insert @@ -818,78 +830,81 @@ void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr auto block_num = bs->block_num; if( block_num % 1000 == 0 ) ilog( "block_num: ${b}", ("b", block_num) ); - const auto block_id = bs->id; + const auto& block_id = bs->id; const auto block_id_str = block_id.str(); - const auto prev_block_id_str = bs->block->previous.str(); auto now = std::chrono::duration_cast( std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - const chain::block_header_state& bhs = *bs; + if( store_block_states ) { + auto block_states = mongo_conn[db_name][block_states_col]; + auto block_state_doc = bsoncxx::builder::basic::document{}; + block_state_doc.append( kvp( "block_num", b_int32{static_cast(block_num)} ), + kvp( "block_id", block_id_str ), + kvp( "validated", b_bool{bs->validated} ), + kvp( "in_current_chain", b_bool{bs->in_current_chain} ) ); - auto block_states = mongo_conn[db_name][block_states_col]; - auto block_state_doc = bsoncxx::builder::basic::document{}; - block_state_doc.append(kvp( "block_num", b_int32{static_cast(block_num)} ), - kvp( "block_id", block_id_str ), - kvp( "validated", b_bool{bs->validated} ), - kvp( "in_current_chain", b_bool{bs->in_current_chain} )); + const chain::block_header_state& bhs = *bs; - auto json = fc::json::to_string( bhs ); - try { - const auto& value = bsoncxx::from_json( json ); - block_state_doc.append( kvp( "block_header_state", value )); - } catch( bsoncxx::exception& ) { + auto json = fc::json::to_string( bhs ); try { - json = fc::prune_invalid_utf8(json); const auto& value = bsoncxx::from_json( json ); - block_state_doc.append( kvp( "block_header_state", value )); - block_state_doc.append( kvp( "non-utf8-purged", b_bool{true})); - } catch( bsoncxx::exception& e ) { - elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what())); - elog( " JSON: ${j}", ("j", json)); + block_state_doc.append( kvp( "block_header_state", value ) ); + } catch( bsoncxx::exception& ) { + try { + json = fc::prune_invalid_utf8( json ); + const auto& value = bsoncxx::from_json( json ); + block_state_doc.append( kvp( "block_header_state", value ) ); + block_state_doc.append( kvp( "non-utf8-purged", b_bool{true} ) ); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", json) ); + } } - } - block_state_doc.append(kvp( "createdAt", b_date{now} )); + block_state_doc.append( kvp( "createdAt", b_date{now} ) ); - try { - if( !block_states.update_one( make_document( kvp( "block_id", block_id_str )), - make_document( kvp( "$set", block_state_doc.view())), update_opts )) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id)); + try { + if( !block_states.update_one( make_document( kvp( "block_id", block_id_str ) ), + make_document( kvp( "$set", block_state_doc.view() ) ), update_opts ) ) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id) ); + } + } catch( ... ) { + handle_mongo_exception( "block_states insert: " + json, __LINE__ ); } - } catch(...) { - handle_mongo_exception("block_states insert: " + json, __LINE__); } - auto blocks = mongo_conn[db_name][blocks_col]; - auto block_doc = bsoncxx::builder::basic::document{}; - block_doc.append( kvp( "block_num", b_int32{static_cast(block_num)} ), - kvp( "block_id", block_id_str ) ); + if( store_blocks ) { + auto blocks = mongo_conn[db_name][blocks_col]; + auto block_doc = bsoncxx::builder::basic::document{}; + block_doc.append( kvp( "block_num", b_int32{static_cast(block_num)} ), + kvp( "block_id", block_id_str ) ); - auto v = to_variant_with_abi( *bs->block ); - json = fc::json::to_string( v ); - try { - const auto& value = bsoncxx::from_json( json ); - block_doc.append( kvp( "block", value )); - } catch( bsoncxx::exception& ) { + auto v = to_variant_with_abi( *bs->block ); + auto json = fc::json::to_string( v ); try { - json = fc::prune_invalid_utf8(json); const auto& value = bsoncxx::from_json( json ); - block_doc.append( kvp( "block", value )); - block_doc.append( kvp( "non-utf8-purged", b_bool{true})); - } catch( bsoncxx::exception& e ) { - elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what())); - elog( " JSON: ${j}", ("j", json)); + block_doc.append( kvp( "block", value ) ); + } catch( bsoncxx::exception& ) { + try { + json = fc::prune_invalid_utf8( json ); + const auto& value = bsoncxx::from_json( json ); + block_doc.append( kvp( "block", value ) ); + block_doc.append( kvp( "non-utf8-purged", b_bool{true} ) ); + } catch( bsoncxx::exception& e ) { + elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what()) ); + elog( " JSON: ${j}", ("j", json) ); + } } - } - block_doc.append(kvp( "createdAt", b_date{now} )); + block_doc.append( kvp( "createdAt", b_date{now} ) ); - try { - if( !blocks.update_one( make_document( kvp( "block_id", block_id_str )), - make_document( kvp( "$set", block_doc.view())), update_opts )) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id)); + try { + if( !blocks.update_one( make_document( kvp( "block_id", block_id_str ) ), + make_document( kvp( "$set", block_doc.view() ) ), update_opts ) ) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id) ); + } + } catch( ... ) { + handle_mongo_exception( "blocks insert: " + json, __LINE__ ); } - } catch(...) { - handle_mongo_exception("blocks insert: " + json, __LINE__); } } @@ -910,56 +925,60 @@ void mongo_db_plugin_impl::_process_irreversible_block(const chain::block_state_ auto now = std::chrono::duration_cast( std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()}); - auto ir_block = find_block(blocks, block_id_str); - if( !ir_block ) { - _process_accepted_block( bs ); - ir_block = find_block( blocks, block_id_str ); - if( !ir_block ) return; // should never happen + if( store_blocks ) { + auto ir_block = find_block( blocks, block_id_str ); + if( !ir_block ) { + _process_accepted_block( bs ); + ir_block = find_block( blocks, block_id_str ); + if( !ir_block ) return; // should never happen + } + + auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), + kvp( "validated", b_bool{bs->validated} ), + kvp( "in_current_chain", b_bool{bs->in_current_chain} ), + kvp( "updatedAt", b_date{now} ) ) ) ); + + blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid() ) ), update_doc.view() ); } - auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), - kvp( "validated", b_bool{bs->validated} ), - kvp( "in_current_chain", b_bool{bs->in_current_chain} ), - kvp( "updatedAt", b_date{now})))); + if( store_transactions ) { + bool transactions_in_block = false; + mongocxx::options::bulk_write bulk_opts; + bulk_opts.ordered( false ); + auto bulk = trans.create_bulk_write( bulk_opts ); + + for( const auto& receipt : bs->block->transactions ) { + string trx_id_str; + if( receipt.trx.contains() ) { + const auto& pt = receipt.trx.get(); + // get id via get_raw_transaction() as packed_transaction.id() mutates internal transaction state + const auto& raw = pt.get_raw_transaction(); + const auto& id = fc::raw::unpack( raw ).id(); + trx_id_str = id.str(); + } else { + const auto& id = receipt.trx.get(); + trx_id_str = id.str(); + } - blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid())), update_doc.view()); + auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), + kvp( "block_id", block_id_str ), + kvp( "block_num", b_int32{static_cast(block_num)} ), + kvp( "updatedAt", b_date{now} ) ) ) ); - bool transactions_in_block = false; - mongocxx::options::bulk_write bulk_opts; - bulk_opts.ordered(false); - auto bulk = trans.create_bulk_write(bulk_opts); - - for (const auto& receipt : bs->block->transactions) { - string trx_id_str; - if( receipt.trx.contains()) { - const auto& pt = receipt.trx.get(); - // get id via get_raw_transaction() as packed_transaction.id() mutates internal transaction state - const auto& raw = pt.get_raw_transaction(); - const auto& id = fc::raw::unpack(raw).id(); - trx_id_str = id.str(); - } else { - const auto& id = receipt.trx.get(); - trx_id_str = id.str(); + mongocxx::model::update_one update_op{make_document( kvp( "trx_id", trx_id_str ) ), update_doc.view()}; + update_op.upsert( true ); + bulk.append( update_op ); + transactions_in_block = true; } - auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ), - kvp( "block_id", block_id_str ), - kvp( "block_num", b_int32{static_cast(block_num)} ), - kvp( "updatedAt", b_date{now} )))); - - mongocxx::model::update_one update_op{make_document( kvp( "trx_id", trx_id_str )), update_doc.view()}; - update_op.upsert( true ); - bulk.append( update_op ); - transactions_in_block = true; - } - - if( transactions_in_block ) { - try { - if( !bulk.execute()) { - EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk transaction insert failed for block: ${bid}", ("bid", block_id)); + if( transactions_in_block ) { + try { + if( !bulk.execute() ) { + EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk transaction insert failed for block: ${bid}", ("bid", block_id) ); + } + } catch( ... ) { + handle_mongo_exception( "bulk transaction insert", __LINE__ ); } - } catch(...) { - handle_mongo_exception("bulk transaction insert", __LINE__); } } } @@ -1326,11 +1345,20 @@ void mongo_db_plugin::set_program_options(options_description& cli, options_desc "MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/." " If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI." " Example: mongodb://127.0.0.1:27017/EOS") + ("mongodb-store-blocks", bpo::bool_switch()->default_value(true), + "Enables storing blocks in mongodb.") + ("mongodb-store-block-states", bpo::bool_switch()->default_value(true), + "Enables storing block state in mongodb.") + ("mongodb-store-transactions", bpo::bool_switch()->default_value(true), + "Enables storing transactions in mongodb.") + ("mongodb-store-transaction-traces", bpo::bool_switch()->default_value(true), + "Enables storing transaction traces in mongodb.") + ("mongodb-store-action-traces", bpo::bool_switch()->default_value(true), + "Enables storing action traces in mongodb.") ("mongodb-filter-on", bpo::value>()->composing(), "Mongodb: Track actions which match receiver:action:actor. Actor may be blank to include all. Receiver and Action may not be blank. Default is * include everything.") ("mongodb-filter-out", bpo::value>()->composing(), "Mongodb: Do not track actions which match receiver:action:actor. Action and Actor both blank excludes all from reciever. Actor blank excludes all from reciever:action. Receiver may not be blank.") - ; } @@ -1366,6 +1394,21 @@ void mongo_db_plugin::plugin_initialize(const variables_map& options) if( options.count( "mongodb-block-start" )) { my->start_block_num = options.at( "mongodb-block-start" ).as(); } + if( options.count( "mongodb-store-blocks" )) { + my->store_blocks = options.at( "mongodb-store-blocks" ).as(); + } + if( options.count( "mongodb-store-block-states" )) { + my->store_block_states = options.at( "mongodb-store-block-states" ).as(); + } + if( options.count( "mongodb-store-transactions" )) { + my->store_transactions = options.at( "mongodb-store-transactions" ).as(); + } + if( options.count( "mongodb-store-transaction-traces" )) { + my->store_transaction_traces = options.at( "mongodb-store-transaction-traces" ).as(); + } + if( options.count( "mongodb-store-action-traces" )) { + my->store_action_traces = options.at( "mongodb-store-action-traces" ).as(); + } if( options.count( "mongodb-filter-on" )) { auto fo = options.at( "mongodb-filter-on" ).as>(); for( auto& s : fo ) { From 854c72a1c04e823e89de299627969aebc00ecf77 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 9 Aug 2018 09:30:12 -0500 Subject: [PATCH 14/16] Optimize filter_include --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index 5fe9ac61df6..a55049b4fc8 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -203,15 +203,14 @@ const std::string mongo_db_plugin_impl::account_controls_col = "account_controls bool mongo_db_plugin_impl::filter_include( const chain::action& act ) const { bool include = false; - if( filter_on_star ) { + if( filter_on_star || filter_on.find( {act.account, act.name, 0} ) != filter_on.end() ) { include = true; - } - if( filter_on.find( {act.account, act.name, 0} ) != filter_on.end() ) { - include = true; - } - for( const auto& a : act.authorization ) { - if( filter_on.find( {act.account, act.name, a.actor} ) != filter_on.end() ) { - include = true; + } else { + for( const auto& a : act.authorization ) { + if( filter_on.find( {act.account, act.name, a.actor} ) != filter_on.end() ) { + include = true; + break; + } } } From ba483ebdaf3e7e17b97ebd9398c0e7e23119d879 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 9 Aug 2018 09:30:55 -0500 Subject: [PATCH 15/16] Cut down on output when mongodb fails --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index a55049b4fc8..d9d374be83e 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -451,8 +451,7 @@ void handle_mongo_exception( const std::string& desc, int line_num ) { elog( "mongo exception, ${desc}, line ${line}, code ${code}, ${details}", ("desc", desc)( "line", line_num )( "code", e.code().value() )( "details", e.code().message() )); if (e.raw_server_error()) { - elog( "mongo exception, ${desc}, line ${line}, ${details}", - ("desc", desc)( "line", line_num )( "details", bsoncxx::to_json(e.raw_server_error()->view()))); + elog( " raw_server_error: ${e}", ( "e", bsoncxx::to_json(e.raw_server_error()->view()))); } } catch( mongocxx::exception& e) { elog( "mongo exception, ${desc}, line ${line}, code ${code}, ${what}", From cbd3a3c11cf88d49473c6079e89f1f5f74bac522 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 9 Aug 2018 10:01:39 -0500 Subject: [PATCH 16/16] Use boost value instead of bool_switch since bool_switch defaults to false --- plugins/mongo_db_plugin/mongo_db_plugin.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/mongo_db_plugin/mongo_db_plugin.cpp b/plugins/mongo_db_plugin/mongo_db_plugin.cpp index d9d374be83e..4aaea9b3298 100644 --- a/plugins/mongo_db_plugin/mongo_db_plugin.cpp +++ b/plugins/mongo_db_plugin/mongo_db_plugin.cpp @@ -1343,15 +1343,15 @@ void mongo_db_plugin::set_program_options(options_description& cli, options_desc "MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/." " If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI." " Example: mongodb://127.0.0.1:27017/EOS") - ("mongodb-store-blocks", bpo::bool_switch()->default_value(true), + ("mongodb-store-blocks", bpo::value()->default_value(true), "Enables storing blocks in mongodb.") - ("mongodb-store-block-states", bpo::bool_switch()->default_value(true), + ("mongodb-store-block-states", bpo::value()->default_value(true), "Enables storing block state in mongodb.") - ("mongodb-store-transactions", bpo::bool_switch()->default_value(true), + ("mongodb-store-transactions", bpo::value()->default_value(true), "Enables storing transactions in mongodb.") - ("mongodb-store-transaction-traces", bpo::bool_switch()->default_value(true), + ("mongodb-store-transaction-traces", bpo::value()->default_value(true), "Enables storing transaction traces in mongodb.") - ("mongodb-store-action-traces", bpo::bool_switch()->default_value(true), + ("mongodb-store-action-traces", bpo::value()->default_value(true), "Enables storing action traces in mongodb.") ("mongodb-filter-on", bpo::value>()->composing(), "Mongodb: Track actions which match receiver:action:actor. Actor may be blank to include all. Receiver and Action may not be blank. Default is * include everything.")