Skip to content

Commit

Permalink
Merge pull request #340 from eosnetworkfoundation/state-history-impro…
Browse files Browse the repository at this point in the history
…vements

[3.2] Backporting state history plugin improvements and fixes
  • Loading branch information
vladtr authored Jul 22, 2022
2 parents b5fff95 + 8854dad commit 10b6513
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 176 deletions.
2 changes: 2 additions & 0 deletions libraries/state_history/create_deltas.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ bool include_delta(const chain::account_metadata_object& old, const chain::accou
}

bool include_delta(const chain::code_object& old, const chain::code_object& curr) { //
// code_object data that is exported by SHiP is never modified they are only deleted or created,
// see serialization of history_serial_wrapper<eosio::chain::code_object>
return false;
}

Expand Down
58 changes: 56 additions & 2 deletions libraries/state_history/include/eosio/state_history/log.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
#include <fstream>
#include <stdint.h>

#include <boost/asio.hpp>
#include <boost/thread.hpp>

#include <eosio/chain/block_header.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/types.hpp>
#include <fc/io/cfile.hpp>
#include <fc/log/logger.hpp>
#include <fc/log/logger_config.hpp> //set_thread_name

namespace eosio {

Expand Down Expand Up @@ -39,7 +43,7 @@ inline uint64_t ship_magic(uint16_t version, uint16_t features = 0) {
using namespace eosio::chain::literals;
return "ship"_n.to_uint64_t() | version | features<<16;
}
inline bool is_ship(uint64_t magic) {
inline bool is_ship(uint64_t magic) {
using namespace eosio::chain::literals;
return (magic & 0xffff'ffff'0000'0000) == "ship"_n.to_uint64_t();
}
Expand Down Expand Up @@ -78,6 +82,15 @@ class state_history_log {
uint32_t _end_block = 0;
chain::block_id_type last_block_id;

std::thread thr;
std::atomic<bool> write_thread_has_exception = false;
std::exception_ptr eptr;
boost::asio::io_context ctx;
boost::asio::io_context::strand work_strand{ctx};
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard =
boost::asio::make_work_guard(ctx);
std::recursive_mutex mx;

public:
state_history_log(const char* const name, std::string log_filename, std::string index_filename,
std::optional<state_history_log_prune_config> prune_conf = std::optional<state_history_log_prune_config>())
Expand All @@ -87,7 +100,7 @@ class state_history_log {
, prune_config(prune_conf) {
open_log();
open_index();

if(prune_config) {
EOS_ASSERT(prune_config->prune_blocks, chain::plugin_exception, "state history log prune configuration requires at least one block");
EOS_ASSERT(__builtin_popcount(prune_config->prune_threshold) == 1, chain::plugin_exception, "state history prune threshold must be power of 2");
Expand Down Expand Up @@ -119,9 +132,37 @@ class state_history_log {
vacuum();
}
}

thr = std::thread([this] {
try {
fc::set_os_thread_name(this->name);
this->ctx.run();
} catch (...) {
elog("catched exception from ${name} write thread", ("name", this->name));
eptr = std::current_exception();
write_thread_has_exception = true;
}
elog("${name} thread ended", ("name", this->name));
});
}

void stop() {
if (thr.joinable()) {
work_guard.reset();
thr.join();
}
}

~state_history_log() {
// complete execution before possible vacuuming
if (thr.joinable()) {
try {
work_guard.reset();
thr.join();
}
catch (const boost::thread_interrupted&) {/* suppressed */}
}

//nothing to do if log is empty or we aren't pruning
if(_begin_block == _end_block)
return;
Expand Down Expand Up @@ -158,6 +199,12 @@ class state_history_log {

template <typename F>
void write_entry(state_history_log_header header, const chain::block_id_type& prev_id, F write_payload) {
if (write_thread_has_exception) {
std::rethrow_exception(eptr);
}

std::unique_lock<std::recursive_mutex> lock(mx);

auto block_num = chain::block_header::num_from_id(header.block_id);
EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception,
"missed a block in ${name}.log", ("name", name));
Expand Down Expand Up @@ -186,8 +233,10 @@ class state_history_log {
header.magic = ship_magic(get_ship_version(header.magic), ship_feature_pruned_log);

uint64_t pos = log.tellp();

write_header(header);
write_payload(log);

EOS_ASSERT(log.tellp() == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception,
"wrote payload with incorrect size to ${name}.log", ("name", name));
fc::raw::pack(log, pos);
Expand All @@ -211,19 +260,22 @@ class state_history_log {
fc::cfile& get_entry(uint32_t block_num, state_history_log_header& header) {
EOS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception,
"read non-existing block in ${name}.log", ("name", name));
std::lock_guard lock(mx);
log.seek(get_pos(block_num));
read_header(header);
return log;
}

chain::block_id_type get_block_id(uint32_t block_num) {
std::lock_guard lock(mx);
state_history_log_header header;
get_entry(block_num, header);
return header.block_id;
}

private:
//file position must be at start of last block's suffix (back pointer)
//called from open_log / ctor
bool get_last_block() {
state_history_log_header header;
uint64_t suffix;
Expand Down Expand Up @@ -305,6 +357,7 @@ class state_history_log {
EOS_ASSERT(get_last_block(), chain::plugin_exception, "recover ${name}.log failed", ("name", name));
}

// only called from constructor
void open_log() {
log.set_file_path(log_filename);
log.open("a+b");
Expand Down Expand Up @@ -351,6 +404,7 @@ class state_history_log {
}
}

// only called from constructor
void open_index() {
index.set_file_path(index_filename);
index.open("a+b");
Expand Down
Loading

0 comments on commit 10b6513

Please sign in to comment.