Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add options to not store blocks, block-states, transactions, transact…
Browse files Browse the repository at this point in the history
…ion-traces, and action-traces
  • Loading branch information
heifner committed Aug 8, 2018
1 parent 5dd4647 commit d24e006
Showing 1 changed file with 144 additions and 101 deletions.
245 changes: 144 additions & 101 deletions plugins/mongo_db_plugin/mongo_db_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ class mongo_db_plugin_impl {
bool filter_on_star = true;
std::set<filter_entry> filter_on;
std::set<filter_entry> filter_out;
bool store_blocks = true;
bool store_block_states = true;
bool store_transactions = true;
bool store_transaction_traces = true;
bool store_action_traces = true;

std::string db_name;
mongocxx::instance mongo_inst;
Expand Down Expand Up @@ -249,7 +254,9 @@ void mongo_db_plugin_impl::queue( Queue& queue, const Entry& e ) {

void mongo_db_plugin_impl::accepted_transaction( const chain::transaction_metadata_ptr& t ) {
try {
queue( transaction_metadata_queue, t );
if( store_transactions ) {
queue( transaction_metadata_queue, t );
}
} catch (fc::exception& e) {
elog("FC Exception while accepted_transaction ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
Expand All @@ -261,6 +268,7 @@ void mongo_db_plugin_impl::accepted_transaction( const chain::transaction_metada

void mongo_db_plugin_impl::applied_transaction( const chain::transaction_trace_ptr& t ) {
try {
// always queue since account information always gathered
queue( transaction_trace_queue, t );
} catch (fc::exception& e) {
elog("FC Exception while applied_transaction ${e}", ("e", e.to_string()));
Expand All @@ -273,7 +281,9 @@ void mongo_db_plugin_impl::applied_transaction( const chain::transaction_trace_p

void mongo_db_plugin_impl::applied_irreversible_block( const chain::block_state_ptr& bs ) {
try {
queue( irreversible_block_state_queue, bs );
if( store_blocks || store_transactions ) {
queue( irreversible_block_state_queue, bs );
}
} catch (fc::exception& e) {
elog("FC Exception while applied_irreversible_block ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
Expand All @@ -290,7 +300,9 @@ void mongo_db_plugin_impl::accepted_block( const chain::block_state_ptr& bs ) {
start_block_reached = true;
}
}
queue( block_state_queue, bs );
if( store_blocks || store_block_states ) {
queue( block_state_queue, bs );
}
} catch (fc::exception& e) {
elog("FC Exception while accepted_block ${e}", ("e", e.to_string()));
} catch (std::exception& e) {
Expand Down Expand Up @@ -705,7 +717,7 @@ mongo_db_plugin_impl::add_action_trace( mongocxx::bulk_write& bulk_action_traces
}

bool added = false;
if( start_block_reached && filter_include( atrace.act ) ) {
if( start_block_reached && store_action_traces && filter_include( atrace.act ) ) {
auto action_traces_doc = bsoncxx::builder::basic::document{};
const chain::base_action_trace& base = atrace; // without inline action traces

Expand Down Expand Up @@ -775,7 +787,7 @@ void mongo_db_plugin_impl::_process_applied_transaction( const chain::transactio
}
}

if( !start_block_reached ) return;
if( !start_block_reached || !store_transaction_traces ) return;

// transaction trace insert

Expand Down Expand Up @@ -818,78 +830,81 @@ void mongo_db_plugin_impl::_process_accepted_block( const chain::block_state_ptr
auto block_num = bs->block_num;
if( block_num % 1000 == 0 )
ilog( "block_num: ${b}", ("b", block_num) );
const auto block_id = bs->id;
const auto& block_id = bs->id;
const auto block_id_str = block_id.str();
const auto prev_block_id_str = bs->block->previous.str();

auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});

const chain::block_header_state& bhs = *bs;
if( store_block_states ) {
auto block_states = mongo_conn[db_name][block_states_col];
auto block_state_doc = bsoncxx::builder::basic::document{};
block_state_doc.append( kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ) );

auto block_states = mongo_conn[db_name][block_states_col];
auto block_state_doc = bsoncxx::builder::basic::document{};
block_state_doc.append(kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ));
const chain::block_header_state& bhs = *bs;

auto json = fc::json::to_string( bhs );
try {
const auto& value = bsoncxx::from_json( json );
block_state_doc.append( kvp( "block_header_state", value ));
} catch( bsoncxx::exception& ) {
auto json = fc::json::to_string( bhs );
try {
json = fc::prune_invalid_utf8(json);
const auto& value = bsoncxx::from_json( json );
block_state_doc.append( kvp( "block_header_state", value ));
block_state_doc.append( kvp( "non-utf8-purged", b_bool{true}));
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog( " JSON: ${j}", ("j", json));
block_state_doc.append( kvp( "block_header_state", value ) );
} catch( bsoncxx::exception& ) {
try {
json = fc::prune_invalid_utf8( json );
const auto& value = bsoncxx::from_json( json );
block_state_doc.append( kvp( "block_header_state", value ) );
block_state_doc.append( kvp( "non-utf8-purged", b_bool{true} ) );
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block_header_state JSON to MongoDB JSON: ${e}", ("e", e.what()) );
elog( " JSON: ${j}", ("j", json) );
}
}
}
block_state_doc.append(kvp( "createdAt", b_date{now} ));
block_state_doc.append( kvp( "createdAt", b_date{now} ) );

try {
if( !block_states.update_one( make_document( kvp( "block_id", block_id_str )),
make_document( kvp( "$set", block_state_doc.view())), update_opts )) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id));
try {
if( !block_states.update_one( make_document( kvp( "block_id", block_id_str ) ),
make_document( kvp( "$set", block_state_doc.view() ) ), update_opts ) ) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block_state ${bid}", ("bid", block_id) );
}
} catch( ... ) {
handle_mongo_exception( "block_states insert: " + json, __LINE__ );
}
} catch(...) {
handle_mongo_exception("block_states insert: " + json, __LINE__);
}

auto blocks = mongo_conn[db_name][blocks_col];
auto block_doc = bsoncxx::builder::basic::document{};
block_doc.append( kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ) );
if( store_blocks ) {
auto blocks = mongo_conn[db_name][blocks_col];
auto block_doc = bsoncxx::builder::basic::document{};
block_doc.append( kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "block_id", block_id_str ) );

auto v = to_variant_with_abi( *bs->block );
json = fc::json::to_string( v );
try {
const auto& value = bsoncxx::from_json( json );
block_doc.append( kvp( "block", value ));
} catch( bsoncxx::exception& ) {
auto v = to_variant_with_abi( *bs->block );
auto json = fc::json::to_string( v );
try {
json = fc::prune_invalid_utf8(json);
const auto& value = bsoncxx::from_json( json );
block_doc.append( kvp( "block", value ));
block_doc.append( kvp( "non-utf8-purged", b_bool{true}));
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what()));
elog( " JSON: ${j}", ("j", json));
block_doc.append( kvp( "block", value ) );
} catch( bsoncxx::exception& ) {
try {
json = fc::prune_invalid_utf8( json );
const auto& value = bsoncxx::from_json( json );
block_doc.append( kvp( "block", value ) );
block_doc.append( kvp( "non-utf8-purged", b_bool{true} ) );
} catch( bsoncxx::exception& e ) {
elog( "Unable to convert block JSON to MongoDB JSON: ${e}", ("e", e.what()) );
elog( " JSON: ${j}", ("j", json) );
}
}
}
block_doc.append(kvp( "createdAt", b_date{now} ));
block_doc.append( kvp( "createdAt", b_date{now} ) );

try {
if( !blocks.update_one( make_document( kvp( "block_id", block_id_str )),
make_document( kvp( "$set", block_doc.view())), update_opts )) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id));
try {
if( !blocks.update_one( make_document( kvp( "block_id", block_id_str ) ),
make_document( kvp( "$set", block_doc.view() ) ), update_opts ) ) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Failed to insert block ${bid}", ("bid", block_id) );
}
} catch( ... ) {
handle_mongo_exception( "blocks insert: " + json, __LINE__ );
}
} catch(...) {
handle_mongo_exception("blocks insert: " + json, __LINE__);
}
}

Expand All @@ -910,56 +925,60 @@ void mongo_db_plugin_impl::_process_irreversible_block(const chain::block_state_
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::microseconds{fc::time_point::now().time_since_epoch().count()});

auto ir_block = find_block(blocks, block_id_str);
if( !ir_block ) {
_process_accepted_block( bs );
ir_block = find_block( blocks, block_id_str );
if( !ir_block ) return; // should never happen
if( store_blocks ) {
auto ir_block = find_block( blocks, block_id_str );
if( !ir_block ) {
_process_accepted_block( bs );
ir_block = find_block( blocks, block_id_str );
if( !ir_block ) return; // should never happen
}

auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ),
kvp( "updatedAt", b_date{now} ) ) ) );

blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid() ) ), update_doc.view() );
}

auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "validated", b_bool{bs->validated} ),
kvp( "in_current_chain", b_bool{bs->in_current_chain} ),
kvp( "updatedAt", b_date{now}))));
if( store_transactions ) {
bool transactions_in_block = false;
mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered( false );
auto bulk = trans.create_bulk_write( bulk_opts );

for( const auto& receipt : bs->block->transactions ) {
string trx_id_str;
if( receipt.trx.contains<packed_transaction>() ) {
const auto& pt = receipt.trx.get<packed_transaction>();
// get id via get_raw_transaction() as packed_transaction.id() mutates internal transaction state
const auto& raw = pt.get_raw_transaction();
const auto& id = fc::raw::unpack<transaction>( raw ).id();
trx_id_str = id.str();
} else {
const auto& id = receipt.trx.get<transaction_id_type>();
trx_id_str = id.str();
}

blocks.update_one( make_document( kvp( "_id", ir_block->view()["_id"].get_oid())), update_doc.view());
auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "block_id", block_id_str ),
kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "updatedAt", b_date{now} ) ) ) );

bool transactions_in_block = false;
mongocxx::options::bulk_write bulk_opts;
bulk_opts.ordered(false);
auto bulk = trans.create_bulk_write(bulk_opts);

for (const auto& receipt : bs->block->transactions) {
string trx_id_str;
if( receipt.trx.contains<packed_transaction>()) {
const auto& pt = receipt.trx.get<packed_transaction>();
// get id via get_raw_transaction() as packed_transaction.id() mutates internal transaction state
const auto& raw = pt.get_raw_transaction();
const auto& id = fc::raw::unpack<transaction>(raw).id();
trx_id_str = id.str();
} else {
const auto& id = receipt.trx.get<transaction_id_type>();
trx_id_str = id.str();
mongocxx::model::update_one update_op{make_document( kvp( "trx_id", trx_id_str ) ), update_doc.view()};
update_op.upsert( true );
bulk.append( update_op );
transactions_in_block = true;
}

auto update_doc = make_document( kvp( "$set", make_document( kvp( "irreversible", b_bool{true} ),
kvp( "block_id", block_id_str ),
kvp( "block_num", b_int32{static_cast<int32_t>(block_num)} ),
kvp( "updatedAt", b_date{now} ))));

mongocxx::model::update_one update_op{make_document( kvp( "trx_id", trx_id_str )), update_doc.view()};
update_op.upsert( true );
bulk.append( update_op );
transactions_in_block = true;
}

if( transactions_in_block ) {
try {
if( !bulk.execute()) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk transaction insert failed for block: ${bid}", ("bid", block_id));
if( transactions_in_block ) {
try {
if( !bulk.execute() ) {
EOS_ASSERT( false, chain::mongo_db_insert_fail, "Bulk transaction insert failed for block: ${bid}", ("bid", block_id) );
}
} catch( ... ) {
handle_mongo_exception( "bulk transaction insert", __LINE__ );
}
} catch(...) {
handle_mongo_exception("bulk transaction insert", __LINE__);
}
}
}
Expand Down Expand Up @@ -1326,11 +1345,20 @@ void mongo_db_plugin::set_program_options(options_description& cli, options_desc
"MongoDB URI connection string, see: https://docs.mongodb.com/master/reference/connection-string/."
" If not specified then plugin is disabled. Default database 'EOS' is used if not specified in URI."
" Example: mongodb://127.0.0.1:27017/EOS")
("mongodb-store-blocks", bpo::bool_switch()->default_value(true),
"Enables storing blocks in mongodb.")
("mongodb-store-block-states", bpo::bool_switch()->default_value(true),
"Enables storing block state in mongodb.")
("mongodb-store-transactions", bpo::bool_switch()->default_value(true),
"Enables storing transactions in mongodb.")
("mongodb-store-transaction-traces", bpo::bool_switch()->default_value(true),
"Enables storing transaction traces in mongodb.")
("mongodb-store-action-traces", bpo::bool_switch()->default_value(true),
"Enables storing action traces in mongodb.")
("mongodb-filter-on", bpo::value<vector<string>>()->composing(),
"Mongodb: Track actions which match receiver:action:actor. Actor may be blank to include all. Receiver and Action may not be blank. Default is * include everything.")
("mongodb-filter-out", bpo::value<vector<string>>()->composing(),
"Mongodb: Do not track actions which match receiver:action:actor. Action and Actor both blank excludes all from reciever. Actor blank excludes all from reciever:action. Receiver may not be blank.")

;
}

Expand Down Expand Up @@ -1366,6 +1394,21 @@ void mongo_db_plugin::plugin_initialize(const variables_map& options)
if( options.count( "mongodb-block-start" )) {
my->start_block_num = options.at( "mongodb-block-start" ).as<uint32_t>();
}
if( options.count( "mongodb-store-blocks" )) {
my->store_blocks = options.at( "mongodb-store-blocks" ).as<bool>();
}
if( options.count( "mongodb-store-block-states" )) {
my->store_block_states = options.at( "mongodb-store-block-states" ).as<bool>();
}
if( options.count( "mongodb-store-transactions" )) {
my->store_transactions = options.at( "mongodb-store-transactions" ).as<bool>();
}
if( options.count( "mongodb-store-transaction-traces" )) {
my->store_transaction_traces = options.at( "mongodb-store-transaction-traces" ).as<bool>();
}
if( options.count( "mongodb-store-action-traces" )) {
my->store_action_traces = options.at( "mongodb-store-action-traces" ).as<bool>();
}
if( options.count( "mongodb-filter-on" )) {
auto fo = options.at( "mongodb-filter-on" ).as<vector<string>>();
for( auto& s : fo ) {
Expand Down

0 comments on commit d24e006

Please sign in to comment.