Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.2] Backport of 2.1 optimizations #634

Merged
merged 5 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <eosio/chain/block_log.hpp>
#include <eosio/chain/exceptions.hpp>
#include <fstream>
#include <fc/bitutil.hpp>
#include <fc/io/cfile.hpp>
#include <fc/io/raw.hpp>
Expand Down Expand Up @@ -50,7 +49,7 @@ namespace eosio { namespace chain {
uint32_t index_first_block_num = 0; //the first number in index & the log had it not been pruned
std::optional<block_log_prune_config> prune_config;

block_log_impl(std::optional<block_log_prune_config> prune_conf) :
explicit block_log_impl(std::optional<block_log_prune_config> prune_conf) :
prune_config(prune_conf) {
if(prune_config) {
EOS_ASSERT(prune_config->prune_blocks, block_log_exception, "block log prune configuration requires at least one block");
Expand All @@ -65,6 +64,7 @@ namespace eosio { namespace chain {
reopen();
}
}

void reopen();

//close() is called all over the place. Let's make this an explict call to ensure it only is called when
Expand Down Expand Up @@ -105,7 +105,7 @@ namespace eosio { namespace chain {

void flush();

void append(const signed_block_ptr& b);
void append(const signed_block_ptr& b, const block_id_type& id, const std::vector<char>& packed_block);

void prune();

Expand Down Expand Up @@ -355,11 +355,15 @@ namespace eosio { namespace chain {
}
}

void block_log::append(const signed_block_ptr& b) {
my->append(b);
void block_log::append(const signed_block_ptr& b, const block_id_type& id) {
my->append(b, id, fc::raw::pack(*b));
}

void block_log::append(const signed_block_ptr& b, const block_id_type& id, const std::vector<char>& packed_block) {
my->append(b, id, packed_block);
}

void detail::block_log_impl::append(const signed_block_ptr& b) {
void detail::block_log_impl::append(const signed_block_ptr& b, const block_id_type& id, const std::vector<char>& packed_block) {
try {
EOS_ASSERT( genesis_written_to_block_log, block_log_append_fail, "Cannot append to block log until the genesis is first written" );

Expand All @@ -377,13 +381,12 @@ namespace eosio { namespace chain {
"Append to index file occuring at wrong position.",
("position", (uint64_t) index_file.tellp())
("expected", (b->block_num() - index_first_block_num) * sizeof(uint64_t)));
auto data = fc::raw::pack(*b);
block_file.write(data.data(), data.size());
block_file.write(packed_block.data(), packed_block.size());
block_file.write((char*)&pos, sizeof(pos));
const uint64_t end = block_file.tellp();
index_file.write((char*)&pos, sizeof(pos));
head = b;
head_id = b->calculate_id();
head_id = id;

if(prune_config) {
if((pos&prune_config->prune_threshold) != (end&prune_config->prune_threshold))
Expand Down Expand Up @@ -566,7 +569,7 @@ namespace eosio { namespace chain {
block_file.write((char*)&totem, sizeof(totem));

if (first_block) {
append(first_block);
append(first_block, first_block->calculate_id(), fc::raw::pack(*first_block));
} else {
head.reset();
head_id = {};
Expand Down
61 changes: 45 additions & 16 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,17 @@ struct building_block {
const vector<digest_type>& new_protocol_feature_activations )
:_pending_block_header_state( prev.next( when, num_prev_blocks_to_confirm ) )
,_new_protocol_feature_activations( new_protocol_feature_activations )
,_trx_mroot_or_receipt_digests( digests_t{} )
{}

pending_block_header_state _pending_block_header_state;
std::optional<producer_authority_schedule> _new_pending_producer_schedule;
vector<digest_type> _new_protocol_feature_activations;
size_t _num_new_protocol_features_that_have_activated = 0;
deque<transaction_metadata_ptr> _pending_trx_metas;
deque<transaction_receipt> _pending_trx_receipts;
deque<digest_type> _pending_trx_receipt_digests;
deque<digest_type> _action_receipt_digests;
std::optional<checksum256_type> _transaction_mroot;
deque<transaction_receipt> _pending_trx_receipts; // boost deque in 1.71 with 1024 elements performs better
std::variant<checksum256_type, digests_t> _trx_mroot_or_receipt_digests;
digests_t _action_receipt_digests;
};

struct assembled_block {
Expand Down Expand Up @@ -403,9 +403,16 @@ struct controller_impl {
if( fork_head->dpos_irreversible_blocknum <= lib_num )
return;

const auto branch = fork_db.fetch_branch( fork_head->id, fork_head->dpos_irreversible_blocknum );
auto branch = fork_db.fetch_branch( fork_head->id, fork_head->dpos_irreversible_blocknum );
try {

std::vector<std::future<std::vector<char>>> v;
v.reserve( branch.size() );
for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
v.emplace_back( async_thread_pool( thread_pool.get_executor(), [b=(*bitr)->block]() { return fc::raw::pack(*b); } ) );
}
auto it = v.begin();

for( auto bitr = branch.rbegin(); bitr != branch.rend(); ++bitr ) {
if( read_mode == db_read_mode::IRREVERSIBLE ) {
controller::block_report br;
Expand All @@ -418,7 +425,8 @@ struct controller_impl {

// blog.append could fail due to failures like running out of space.
// Do it before commit so that in case it throws, DB can be rolled back.
blog.append( (*bitr)->block );
blog.append( (*bitr)->block, (*bitr)->id, it->get() );
++it;

db.commit( (*bitr)->block_num );
root_id = (*bitr)->id;
Expand All @@ -433,8 +441,12 @@ struct controller_impl {
//db.commit( fork_head->dpos_irreversible_blocknum ); // redundant

if( root_id != fork_db.root()->id ) {
branch.emplace_back(fork_db.root());
fork_db.advance_root( root_id );
}

// delete branch in thread pool
boost::asio::post( thread_pool.get_executor(), [branch{std::move(branch)}]() {} );
}

/**
Expand Down Expand Up @@ -1077,7 +1089,8 @@ struct controller_impl {
auto& bb = std::get<building_block>(pending->_block_stage);
auto orig_trx_receipts_size = bb._pending_trx_receipts.size();
auto orig_trx_metas_size = bb._pending_trx_metas.size();
auto orig_trx_receipt_digests_size = bb._pending_trx_receipt_digests.size();
auto orig_trx_receipt_digests_size = std::holds_alternative<digests_t>(bb._trx_mroot_or_receipt_digests) ?
std::get<digests_t>(bb._trx_mroot_or_receipt_digests).size() : 0;
auto orig_action_receipt_digests_size = bb._action_receipt_digests.size();
std::function<void()> callback = [this,
orig_trx_receipts_size,
Expand All @@ -1088,7 +1101,8 @@ struct controller_impl {
auto& bb = std::get<building_block>(pending->_block_stage);
bb._pending_trx_receipts.resize(orig_trx_receipts_size);
bb._pending_trx_metas.resize(orig_trx_metas_size);
bb._pending_trx_receipt_digests.resize(orig_trx_receipt_digests_size);
if( std::holds_alternative<digests_t>(bb._trx_mroot_or_receipt_digests) )
std::get<digests_t>(bb._trx_mroot_or_receipt_digests).resize(orig_trx_receipt_digests_size);
bb._action_receipt_digests.resize(orig_action_receipt_digests_size);
};

Expand Down Expand Up @@ -1440,7 +1454,9 @@ struct controller_impl {
r.cpu_usage_us = cpu_usage_us;
r.net_usage_words = net_usage_words;
r.status = status;
std::get<building_block>(pending->_block_stage)._pending_trx_receipt_digests.emplace_back( r.digest() );
auto& bb = std::get<building_block>(pending->_block_stage);
if( std::holds_alternative<digests_t>(bb._trx_mroot_or_receipt_digests) )
std::get<digests_t>(bb._trx_mroot_or_receipt_digests).emplace_back( r.digest() );
return r;
}

Expand Down Expand Up @@ -1764,6 +1780,21 @@ struct controller_impl {

auto& pbhs = pending->get_pending_block_header_state();

auto& bb = std::get<building_block>(pending->_block_stage);

auto action_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( bb._action_receipt_digests )}]() mutable {
return merkle( std::move( ids ) );
} );
const bool calc_trx_merkle = !std::holds_alternative<checksum256_type>(bb._trx_mroot_or_receipt_digests);
std::future<checksum256_type> trx_merkle_fut;
if( calc_trx_merkle ) {
trx_merkle_fut = async_thread_pool( thread_pool.get_executor(),
[ids{std::move( std::get<digests_t>(bb._trx_mroot_or_receipt_digests) )}]() mutable {
return merkle( std::move( ids ) );
} );
}

// Update resource limits:
resource_limits.process_account_limit_updates();
const auto& chain_config = self.get_global_properties().configuration;
Expand All @@ -1774,12 +1805,10 @@ struct controller_impl {
);
resource_limits.process_block_usage(pbhs.block_num);

auto& bb = std::get<building_block>(pending->_block_stage);

// Create (unsigned) block:
auto block_ptr = std::make_shared<signed_block>( pbhs.make_block_header(
merkle( std::move( std::get<building_block>(pending->_block_stage)._pending_trx_receipt_digests ) ),
merkle( std::move( std::get<building_block>(pending->_block_stage)._action_receipt_digests ) ),
calc_trx_merkle ? trx_merkle_fut.get() : std::get<checksum256_type>(bb._trx_mroot_or_receipt_digests),
action_merkle_fut.get(),
bb._new_pending_producer_schedule,
std::move( bb._new_protocol_feature_activations ),
protocol_features.get_protocol_feature_set()
Expand Down Expand Up @@ -1950,6 +1979,9 @@ struct controller_impl {
auto producer_block_id = bsp->id;
start_block( b->timestamp, b->confirmed, new_protocol_feature_activations, s, producer_block_id, fc::time_point::maximum() );

// validated in create_block_state_future()
std::get<building_block>(pending->_block_stage)._trx_mroot_or_receipt_digests = b->transaction_mroot;

const bool existing_trxs_metas = !bsp->trxs_metas().empty();
const bool pub_keys_recovered = bsp->is_pub_keys_recovered();
const bool skip_auth_checks = self.skip_auth_check();
Expand Down Expand Up @@ -2027,9 +2059,6 @@ struct controller_impl {
}
}

// validated in create_block_state_future()
std::get<building_block>(pending->_block_stage)._transaction_mroot = b->transaction_mroot;

finalize_block();

auto& ab = std::get<assembled_block>(pending->_block_stage);
Expand Down
4 changes: 3 additions & 1 deletion libraries/chain/include/eosio/chain/block_log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ namespace eosio { namespace chain {
block_log(block_log&& other);
~block_log();

void append(const signed_block_ptr& b);
void append(const signed_block_ptr& b, const block_id_type& id);
void append(const signed_block_ptr& b, const block_id_type& id, const std::vector<char>& packed_block);

void flush();
void reset( const genesis_state& gs, const signed_block_ptr& genesis_block );
void reset( const chain_id_type& chain_id, uint32_t first_block_num );
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ namespace eosio { namespace chain {
using int128_t = __int128;
using uint128_t = unsigned __int128;
using bytes = vector<char>;
using digests_t = deque<digest_type>;

struct sha256_less {
bool operator()( const fc::sha256& lhs, const fc::sha256& rhs ) const {
Expand Down
2 changes: 1 addition & 1 deletion tests/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct block_log_fixture {
p->previous._hash[0] = fc::endian_reverse_u32(index-1);
p->header_extensions.push_back(std::make_pair<uint16_t, std::vector<char>>(0, std::vector<char>(a)));

log->append(p);
log->append(p, p->calculate_id());

if(index + 1 > written_data.size())
written_data.resize(index + 1);
Expand Down
2 changes: 1 addition & 1 deletion unittests/block_log_extract.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct block_log_extract_fixture {
void add(uint32_t index) {
signed_block_ptr p = std::make_shared<signed_block>();
p->previous._hash[0] = fc::endian_reverse_u32(index-1);
log->append(p);
log->append(p, p->calculate_id());
}

genesis_state gs;
Expand Down