Skip to content

Commit

Permalink
Merge pull request #999 from AntelopeIO/gh-698-part2
Browse files Browse the repository at this point in the history
Move abi serialization of transaction trace off main thread
  • Loading branch information
greg7mdp authored Apr 13, 2023
2 parents be11e4f + e0edbcd commit 3b2f665
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 299 deletions.
104 changes: 87 additions & 17 deletions libraries/chain/include/eosio/chain/abi_serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <fc/variant_object.hpp>
#include <fc/scoped_exit.hpp>

namespace eosio { namespace chain {
namespace eosio::chain {

using std::map;
using std::string;
Expand Down Expand Up @@ -485,14 +485,15 @@ namespace impl {
};

try {
auto abi = resolver(act.account);
if (abi) {
auto type = abi->get_action_type(act.name);
auto abi_optional = resolver(act.account);
if (abi_optional) {
abi_serializer& abi = *abi_optional;
auto type = abi.get_action_type(act.name);
if (!type.empty()) {
try {
binary_to_variant_context _ctx(*abi, ctx, type);
binary_to_variant_context _ctx(abi, ctx, type);
_ctx.short_path = true; // Just to be safe while avoiding the complexity of threading an override boolean all over the place
mvo( "data", abi->_binary_to_variant( type, act.data, _ctx ));
mvo( "data", abi._binary_to_variant( type, act.data, _ctx ));
} catch(...) {
// any failure to serialize data, then leave as not serailzed
set_hex_data(mvo, "data", act.data);
Expand Down Expand Up @@ -546,13 +547,14 @@ namespace impl {
mvo("return_value_hex_data", act_trace.return_value);
auto act = act_trace.act;
try {
auto abi = resolver(act.account);
if (abi) {
auto type = abi->get_action_result_type(act.name);
auto abi_optional = resolver(act.account);
if (abi_optional) {
abi_serializer& abi = *abi_optional;
auto type = abi.get_action_result_type(act.name);
if (!type.empty()) {
binary_to_variant_context _ctx(*abi, ctx, type);
binary_to_variant_context _ctx(abi, ctx, type);
_ctx.short_path = true; // Just to be safe while avoiding the complexity of threading an override boolean all over the place
mvo( "return_value_data", abi->_binary_to_variant( type, act_trace.return_value, _ctx ));
mvo( "return_value_data", abi._binary_to_variant( type, act_trace.return_value, _ctx ));
}
}
} catch(...) {}
Expand Down Expand Up @@ -678,13 +680,13 @@ namespace impl {
* this will degrade to the common fc::to_variant as soon as the type no longer contains
* ABI related info
*
* @tparam Reslover - callable with the signature (const name& code_account) -> std::optional<abi_def>
* @tparam Resolver - callable with the signature (const name& code_account) -> std::optional<abi_def>
*/
template<typename T, typename Resolver>
class abi_to_variant_visitor
{
public:
abi_to_variant_visitor( mutable_variant_object& _mvo, const T& _val, Resolver _resolver, abi_traverse_context& _ctx )
abi_to_variant_visitor( mutable_variant_object& _mvo, const T& _val, Resolver& _resolver, abi_traverse_context& _ctx )
:_vo(_mvo)
,_val(_val)
,_resolver(_resolver)
Expand All @@ -707,7 +709,7 @@ namespace impl {
private:
mutable_variant_object& _vo;
const T& _val;
Resolver _resolver;
Resolver& _resolver;
abi_traverse_context& _ctx;
};

Expand Down Expand Up @@ -890,7 +892,7 @@ namespace impl {
class abi_from_variant_visitor : public reflector_init_visitor<T>
{
public:
abi_from_variant_visitor( const variant_object& _vo, T& v, Resolver _resolver, abi_traverse_context& _ctx )
abi_from_variant_visitor( const variant_object& _vo, T& v, Resolver& _resolver, abi_traverse_context& _ctx )
: reflector_init_visitor<T>(v)
,_vo(_vo)
,_resolver(_resolver)
Expand All @@ -914,7 +916,7 @@ namespace impl {

private:
const variant_object& _vo;
Resolver _resolver;
Resolver& _resolver;
abi_traverse_context& _ctx;
};

Expand Down Expand Up @@ -970,5 +972,73 @@ void abi_serializer::from_variant( const fc::variant& v, T& o, Resolver resolver
from_variant( v, o, resolver, create_yield_function(max_serialization_time) );
}

using abi_serializer_cache_t = std::unordered_map<account_name, std::optional<abi_serializer>>;

class abi_resolver {
public:
abi_resolver(abi_serializer_cache_t&& abi_serializers) :
abi_serializers(std::move(abi_serializers))
{}

std::optional<std::reference_wrapper<abi_serializer>> operator()(const account_name& account) {
auto it = abi_serializers.find(account);
if (it != abi_serializers.end() && it->second)
return std::reference_wrapper<abi_serializer>(*it->second);
return {};
};

private:
abi_serializer_cache_t abi_serializers;
};

class abi_serializer_cache_builder {
public:
abi_serializer_cache_builder(std::function<std::optional<abi_serializer>(const account_name& name)> resolver) :
resolver_(std::move(resolver))
{
}

abi_serializer_cache_builder(const abi_serializer_cache_builder&) = delete;

abi_serializer_cache_builder&& add_serializers(const chain::signed_block_ptr& block) && {
for( const auto& receipt: block->transactions ) {
if( std::holds_alternative<chain::packed_transaction>( receipt.trx ) ) {
const auto& pt = std::get<chain::packed_transaction>( receipt.trx );
const auto& t = pt.get_transaction();
for( const auto& a: t.actions )
add_to_cache( a );
for( const auto& a: t.context_free_actions )
add_to_cache( a );
}
}
return std::move(*this);
}

abi_serializer_cache_builder&& add_serializers(const transaction_trace_ptr& trace_ptr) && {
for( const auto& trace: trace_ptr->action_traces ) {
add_to_cache(trace.act);
}
return std::move(*this);
}

abi_serializer_cache_t&& get() && {
return std::move(abi_serializers);
}

private:
void add_to_cache(const chain::action& a) {
auto it = abi_serializers.find( a.account );
if( it == abi_serializers.end() ) {
try {
abi_serializers.emplace_hint( it, a.account, resolver_( a.account ) );
} catch( ... ) {
// keep behavior of not throwing on invalid abi, will result in hex data
}
}
}

std::function<std::optional<abi_serializer>(const account_name& name)> resolver_;
abi_serializer_cache_t abi_serializers;
};

} } // eosio::chain
} // eosio::chain
20 changes: 20 additions & 0 deletions libraries/chain/include/eosio/chain/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,26 @@
NEXT(e.dynamic_copy_exception());\
}

/**
* Capture all exceptions and return return_type which is constructible from a fc::exception_ptr
*/
#define CATCH_AND_RETURN(return_type)\
catch ( const fc::exception& err ) {\
return return_type(err.dynamic_copy_exception());\
} catch ( const std::exception& e ) {\
fc::exception fce( \
FC_LOG_MESSAGE( warn, "rethrow ${what}: ", ("what",e.what())),\
fc::std_exception_code,\
BOOST_CORE_TYPEID(e).name(),\
e.what() ) ;\
return return_type(fce.dynamic_copy_exception());\
} catch( ... ) {\
fc::unhandled_exception e(\
FC_LOG_MESSAGE(warn, "rethrow"),\
std::current_exception());\
return return_type(e.dynamic_copy_exception());\
}

#define EOS_RECODE_EXC( cause_type, effect_type ) \
catch( const cause_type& e ) \
{ throw( effect_type( e.what(), e.get_log() ) ); }
Expand Down
5 changes: 4 additions & 1 deletion libraries/chain/include/eosio/chain/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,10 @@ namespace eosio::chain {
// http_plugin thread pool) and which completes the API processing and returns the result T.
// -------------------------------------------------------------------------------------------------------
template<typename T>
using next_function_variant = std::variant<fc::exception_ptr, T, std::function<T()>>;
using t_or_exception = std::variant<T, fc::exception_ptr>;

template<typename T>
using next_function_variant = std::variant<fc::exception_ptr, T, std::function<t_or_exception<T>()>>;

template<typename T>
using next_function = std::function<void(const next_function_variant<T>&)>;
Expand Down
53 changes: 2 additions & 51 deletions plugins/chain_api_plugin/chain_api_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,37 +57,12 @@ parse_params<chain_apis::read_only::get_transaction_status_params, http_params_t

#define CHAIN_RO_CALL(call_name, http_response_code, params_type) CALL_WITH_400(chain, ro_api, chain_apis::read_only, call_name, http_response_code, params_type)
#define CHAIN_RW_CALL(call_name, http_response_code, params_type) CALL_WITH_400(chain, rw_api, chain_apis::read_write, call_name, http_response_code, params_type)
#define CHAIN_RO_CALL_POST(call_name, http_response_code, params_type) CALL_WITH_400_POST(chain, ro_api, chain_apis::read_only, call_name, http_response_code, params_type)
#define CHAIN_RO_CALL_ASYNC(call_name, call_result, http_response_code, params_type) CALL_ASYNC_WITH_400(chain, ro_api, chain_apis::read_only, call_name, call_result, http_response_code, params_type)
#define CHAIN_RW_CALL_ASYNC(call_name, call_result, http_response_code, params_type) CALL_ASYNC_WITH_400(chain, rw_api, chain_apis::read_write, call_name, call_result, http_response_code, params_type)

#define CHAIN_RO_CALL_WITH_400(call_name, http_response_code, params_type) CALL_WITH_400(chain, ro_api, chain_apis::read_only, call_name, http_response_code, params_type)

template<class API, class PARAMS_PARSER, class HANDLER>
static api_entry make_api_entry(http_plugin& _http_plugin, API& api, const char* api_name,
const char* call_name, PARAMS_PARSER params_parser, HANDLER handler) {
return api_entry(
std::string("/v1/") + api_name + "/" + call_name,
[&_http_plugin, api, api_name, call_name,
params_parser = std::move(params_parser), handler = std::move(handler)](string&&, string&& body, url_response_callback&& cb) {
auto deadline = api.start();
try {
auto start = fc::time_point::now();
auto params = params_parser(body);
FC_CHECK_DEADLINE(deadline);

// call first handler on main thread (likely because it accesses non thread-safe data)
// returns a thread-safe lambda that can be enqueued on the http thread pool to complete the request
auto completion_handler = handler(api, start, deadline, params, cb);
FC_CHECK_DEADLINE(deadline); // make sure remaining time is > 0

// execute thread-safe http_handler on _http_plugin's thread pool
_http_plugin.post_http_thread_pool(std::move(completion_handler));
} catch (...) {
http_plugin::handle_exception(api_name, call_name, body, cb);
}
});
}

void chain_api_plugin::plugin_startup() {
ilog( "starting chain_api_plugin" );
my.reset(new chain_api_plugin_impl(app().get_plugin<chain_plugin>().chain()));
Expand All @@ -104,6 +79,7 @@ void chain_api_plugin::plugin_startup() {
CHAIN_RO_CALL(get_info, 200, http_params_types::no_params)}, appbase::exec_queue::read_only, appbase::priority::medium_high);
_http_plugin.add_api({
CHAIN_RO_CALL(get_activated_protocol_features, 200, http_params_types::possible_no_params),
CHAIN_RO_CALL_POST(get_block, 200, http_params_types::params_required), // _POST because get_block() returns a lambda to be executed on the http thread pool
CHAIN_RO_CALL(get_block_info, 200, http_params_types::params_required),
CHAIN_RO_CALL(get_block_header_state, 200, http_params_types::params_required),
CHAIN_RO_CALL(get_account, 200, http_params_types::params_required),
Expand Down Expand Up @@ -153,31 +129,6 @@ void chain_api_plugin::plugin_startup() {
}, appbase::exec_queue::read_only);
}

_http_plugin.add_api({
make_api_entry(
_http_plugin, ro_api, "chain", "get_block",
[](const string& body) {
return parse_params<chain_apis::read_only::get_raw_block_params, http_params_types::params_required>(body);
},
[max_response_time, &chain](auto& api, fc::time_point start, fc::time_point deadline, auto &params, const url_response_callback &cb) {

chain::signed_block_ptr block = api.get_raw_block(params, deadline);
auto max_time = std::min(chain.get_abi_serializer_max_time(), max_response_time);
auto abi_cache = api.get_block_serializers(block, max_time);
auto post_time = fc::time_point::now();
auto remaining_time = max_time - (post_time - start);

return [api, cb, deadline, post_time, remaining_time, abi_cache=std::move(abi_cache), block=std::move(block)]() mutable {
try {
auto new_deadline = deadline + (fc::time_point::now() - post_time);
fc::variant result = api.convert_block(block, std::move(abi_cache), remaining_time);
cb(200, new_deadline, std::move(result));
} catch( ... ) {
http_plugin::handle_exception("chain", "get_block", "", cb);
}
};
})},
appbase::exec_queue::read_only);
}

void chain_api_plugin::plugin_shutdown() {}
Expand Down
Loading

0 comments on commit 3b2f665

Please sign in to comment.