diff --git a/libraries/state_history/abi.cpp b/libraries/state_history/abi.cpp index 5cffe16fbf..d90b416e36 100644 --- a/libraries/state_history/abi.cpp +++ b/libraries/state_history/abi.cpp @@ -4,6 +4,9 @@ extern const char* const state_history_plugin_abi = R"({ { "name": "get_status_request_v0", "fields": [] }, + { + "name": "get_status_request_v1", "fields": [] + }, { "name": "block_position", "fields": [ { "name": "block_num", "type": "uint32" }, @@ -21,6 +24,19 @@ extern const char* const state_history_plugin_abi = R"({ { "name": "chain_id", "type": "checksum256$" } ] }, + { + "name": "get_status_result_v1", "fields": [ + { "name": "head", "type": "block_position" }, + { "name": "last_irreversible", "type": "block_position" }, + { "name": "trace_begin_block", "type": "uint32" }, + { "name": "trace_end_block", "type": "uint32" }, + { "name": "chain_state_begin_block", "type": "uint32" }, + { "name": "chain_state_end_block", "type": "uint32" }, + { "name": "chain_id", "type": "checksum256" }, + { "name": "finality_data_begin_block", "type": "uint32" }, + { "name": "finality_data_end_block", "type": "uint32" } + ] + }, { "name": "get_blocks_request_v0", "fields": [ { "name": "start_block_num", "type": "uint32" }, @@ -586,8 +602,8 @@ extern const char* const state_history_plugin_abi = R"({ { "new_type_name": "transaction_id", "type": "checksum256" } ], "variants": [ - { "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_ack_request_v0", "get_blocks_request_v1"] }, - { "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0", "get_blocks_result_v1"] }, + { "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_ack_request_v0", "get_blocks_request_v1", "get_status_request_v1"] }, + { "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0", "get_blocks_result_v1", "get_status_result_v1"] }, { "name": "action_receipt", "types": ["action_receipt_v0"] }, { "name": "action_trace", "types": ["action_trace_v0", "action_trace_v1"] }, diff --git a/libraries/state_history/include/eosio/state_history/types.hpp b/libraries/state_history/include/eosio/state_history/types.hpp index 75957702af..ed8ba2b6e7 100644 --- a/libraries/state_history/include/eosio/state_history/types.hpp +++ b/libraries/state_history/include/eosio/state_history/types.hpp @@ -83,6 +83,8 @@ struct block_position { struct get_status_request_v0 {}; +struct get_status_request_v1 : get_status_request_v0 {}; + struct get_status_result_v0 { block_position head = {}; block_position last_irreversible = {}; @@ -93,6 +95,11 @@ struct get_status_result_v0 { fc::sha256 chain_id = {}; }; +struct get_status_result_v1 : get_status_result_v0 { + uint32_t finality_data_begin_block = 0; + uint32_t finality_data_end_block = 0; +}; + struct get_blocks_request_v0 { uint32_t start_block_num = 0; uint32_t end_block_num = 0; @@ -129,8 +136,9 @@ struct get_blocks_result_v1 : get_blocks_result_v0 { std::optional finality_data; }; -using state_request = std::variant; -using state_result = std::variant; +// remember to add new request & result messages to end so binary numbering remains fixed for clients that don't consume the given current ABI +using state_request = std::variant; +using state_result = std::variant; using get_blocks_request = std::variant; using get_blocks_result = std::variant; @@ -141,7 +149,9 @@ using get_blocks_result = std::variant #include +#include +#include + #include #include #include @@ -16,691 +19,332 @@ extern const 
char* const state_history_plugin_abi; namespace eosio { -class session_manager; - -struct send_queue_entry_base { - virtual ~send_queue_entry_base() = default; - virtual void send_entry() = 0; -}; - -struct session_base { - virtual void send_update(bool changed) = 0; - virtual void send_update(const chain::signed_block_ptr& block, const chain::block_id_type& id) = 0; - virtual ~session_base() = default; - - std::optional current_request; - bool need_to_send_update = false; -}; +using namespace state_history; -class send_update_send_queue_entry : public send_queue_entry_base { - std::shared_ptr session; - const chain::signed_block_ptr block; - const chain::block_id_type id; +class session_base { public: - send_update_send_queue_entry(std::shared_ptr s, chain::signed_block_ptr block, const chain::block_id_type& id) - : session(std::move(s)) - , block(std::move(block)) - , id(id){} - - void send_entry() override { - if( block) { - session->send_update(block, id); - } else { - session->send_update(false); - } - } -}; - -/// Coordinate sending of queued entries. Only one session can read from the ship logs at a time so coordinate -/// their execution on the ship thread. -/// accessed from ship thread -class session_manager { -private: - using entry_ptr = std::unique_ptr; - - boost::asio::io_context& ship_io_context; - std::set> session_set; - bool sending = false; - std::deque, entry_ptr>> send_queue; - -public: - explicit session_manager(boost::asio::io_context& ship_io_context) - : ship_io_context(ship_io_context) {} - - void insert(std::shared_ptr s) { - session_set.insert(std::move(s)); - } - - void remove(const std::shared_ptr& s, bool active_entry) { - session_set.erase( s ); - if (active_entry) - pop_entry(); - } - - bool is_active(const std::shared_ptr& s) { - return session_set.count(s); - } - - void add_send_queue(std::shared_ptr s, entry_ptr p) { - send_queue.emplace_back(std::move(s), std::move(p)); - send(); - } - - void send() { - if (sending) - return; - if (send_queue.empty()) { - send_updates(); - return; - } - while (!send_queue.empty() && !is_active(send_queue[0].first)) { - send_queue.erase(send_queue.begin()); - } + session_base() = default; + session_base(const session_base&) = delete; + session_base& operator=(const session_base&) = delete; - if (!send_queue.empty()) { - sending = true; - send_queue[0].second->send_entry(); - } else { - send(); - } - } - - void pop_entry(bool call_send = true) { - send_queue.erase(send_queue.begin()); - sending = false; - if (call_send || !send_queue.empty()) { - // avoid blowing the stack - boost::asio::post(ship_io_context, [this]() { - send(); - }); - } - } - - void send_updates() { - for( auto& s : session_set ) { - if (s->need_to_send_update ) { - add_send_queue(s, std::make_unique(s, nullptr, chain::block_id_type{})); - } - } - } - - void send_update(const chain::signed_block_ptr& block, const chain::block_id_type& id) { - for( auto& s : session_set ) { - add_send_queue(s, std::make_unique(s, block, id)); - } - } + virtual void block_applied(const chain::block_num_type applied_block_num) = 0; + virtual ~session_base() = default; }; -template -class status_result_send_queue_entry : public send_queue_entry_base { - std::shared_ptr session; - std::vector data; +template +requires std::is_same_v || std::is_same_v +class session final : public session_base { + using coro_throwing_stream = boost::asio::use_awaitable_t<>::as_default_on_t>; + using coro_nonthrowing_steadytimer = boost::asio::as_tuple_t>::as_default_on_t; public: + 
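   // Illustrative note on the callable template parameters: they are expected to have roughly
   // these shapes, matching how state_history_plugin_impl::create_listener() wires them up later
   // in this patch (state_history_plugin.cpp):
   //    GetBlockID -> std::optional<chain::block_id_type>(chain::block_num_type)
   //    GetBlock   -> chain::signed_block_ptr(chain::block_num_type)
   //    OnDone     -> void(session_base*)   // invoked on the session's strand once both coroutines have finished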
session(SocketType&& s, Executor&& st, chain::controller& controller, + std::optional& trace_log, std::optional& chain_state_log, std::optional& finality_data_log, + GetBlockID&& get_block_id, GetBlock&& get_block, OnDone&& on_done, fc::logger& logger) : + strand(std::move(st)), stream(std::move(s)), wake_timer(strand), controller(controller), + trace_log(trace_log), chain_state_log(chain_state_log), finality_data_log(finality_data_log), + get_block_id(get_block_id), get_block(get_block), on_done(on_done), logger(logger), remote_endpoint_string(get_remote_endpoint_string()) { + fc_ilog(logger, "incoming state history connection from ${a}", ("a", remote_endpoint_string)); - explicit status_result_send_queue_entry(std::shared_ptr s) - : session(std::move(s)) {}; - - void send_entry() override { - data = fc::raw::pack(state_history::state_result{session->get_status_result()}); - - session->socket_stream->async_write(boost::asio::buffer(data), - [s{session}](boost::system::error_code ec, size_t) { - s->callback(ec, true, "async_write", [s] { - s->session_mgr.pop_entry(); - }); - }); - } -}; - -template -class blocks_ack_request_send_queue_entry : public send_queue_entry_base { - std::shared_ptr session; - eosio::state_history::get_blocks_ack_request_v0 req; - -public: - explicit blocks_ack_request_send_queue_entry(std::shared_ptr s, state_history::get_blocks_ack_request_v0&& r) - : session(std::move(s)) - , req(std::move(r)) {} - - void send_entry() override { - assert(session->current_request); - assert(std::holds_alternative(*session->current_request) || - std::holds_alternative(*session->current_request)); - - std::visit([&](auto& request) { request.max_messages_in_flight += req.num_messages; }, - *session->current_request); - session->send_update(false); + boost::asio::co_spawn(strand, read_loop(), [&](std::exception_ptr e) {check_coros_done(e);}); } -}; - -template -class blocks_request_send_queue_entry : public send_queue_entry_base { - std::shared_ptr session; - eosio::state_history::get_blocks_request req; - -public: - blocks_request_send_queue_entry(std::shared_ptr s, state_history::get_blocks_request&& r) - : session(std::move(s)) - , req(std::move(r)) {} - void send_entry() override { - session->update_current_request(req); - session->send_update(true); + void block_applied(const chain::block_num_type applied_block_num) { + //indicates a fork being applied for already-sent blocks; rewind the cursor + if(applied_block_num < next_block_cursor) + next_block_cursor = applied_block_num; + awake_if_idle(); } -}; - -template -class blocks_result_send_queue_entry : public send_queue_entry_base, public std::enable_shared_from_this> { - std::shared_ptr session; - state_history::get_blocks_result result; - std::vector data; - std::optional stream; - - template - void async_send(bool fin, const std::vector& d, Next&& next) { - session->socket_stream->async_write_some( - fin, boost::asio::buffer(d), - [me=this->shared_from_this(), next = std::forward(next)](boost::system::error_code ec, size_t) mutable { - if( ec ) { - me->stream.reset(); - } - me->session->callback(ec, true, "async_write", [me, next = std::move(next)]() mutable { - next(); - }); - }); - } - - template - void async_send(bool fin, std::unique_ptr& strm, Next&& next) { - data.resize(session->default_frame_size); - auto size = bio::read(*strm, data.data(), session->default_frame_size); - data.resize(size); - bool eof = (strm->sgetc() == EOF); - - session->socket_stream->async_write_some( fin && eof, boost::asio::buffer(data), - 
[me=this->shared_from_this(), fin, eof, next = std::forward(next)](boost::system::error_code ec, size_t) mutable { - if( ec ) { - me->stream.reset(); - } - me->session->callback(ec, true, "async_write", [me, fin, eof, next = std::move(next)]() mutable { - if (eof) { - next(); - } else { - me->async_send_buf(fin, std::move(next)); - } - }); - }); - } - - template - void async_send_buf(bool fin, Next&& next) { - std::visit([me=this->shared_from_this(), fin, next = std::forward(next)](auto& d) mutable { - me->async_send(fin, d, std::move(next)); - }, stream->buf); - } - - template - void send_log(uint64_t entry_size, bool fin, Next&& next) { - if (entry_size) { - data.resize(16); // should be at least for 1 byte (optional) + 10 bytes (variable sized uint64_t) - fc::datastream ds(data.data(), data.size()); - fc::raw::pack(ds, true); // optional true - history_pack_varuint64(ds, entry_size); - data.resize(ds.tellp()); - } else { - data = {'\0'}; // optional false - } - async_send(fin && entry_size == 0, data, - [fin, entry_size, next = std::forward(next), me=this->shared_from_this()]() mutable { - if (entry_size) { - me->async_send_buf(fin, [me, next = std::move(next)]() { - next(); - }); - } else - next(); - }); - } - - // last to be sent if result is get_blocks_result_v1 - void send_finality_data() { - assert(std::holds_alternative(result)); - stream.reset(); - send_log(session->get_finality_data_log_entry(std::get(result), stream), true, [me=this->shared_from_this()]() { - me->stream.reset(); - me->session->session_mgr.pop_entry(); - }); - } - - // second to be sent if result is get_blocks_result_v1; - // last to be sent if result is get_blocks_result_v0 - void send_deltas() { - stream.reset(); - std::visit(chain::overloaded{ - [&](state_history::get_blocks_result_v0& r) { - send_log(session->get_delta_log_entry(r, stream), true, [me=this->shared_from_this()]() { - me->stream.reset(); - me->session->session_mgr.pop_entry();}); }, - [&](state_history::get_blocks_result_v1& r) { - send_log(session->get_delta_log_entry(r, stream), false, [me=this->shared_from_this()]() { - me->send_finality_data(); }); }}, - result); - } - - // first to be sent - void send_traces() { - stream.reset(); - send_log(session->get_trace_log_entry(result, stream), false, [me=this->shared_from_this()]() { - me->send_deltas(); - }); - } - - template - void pack_result_base(const T& result, uint32_t variant_index) { - // pack the state_result{get_blocks_result} excluding the fields `traces` and `deltas`, - // and `finality_data` if get_blocks_result_v1 - fc::datastream ss; - - fc::raw::pack(ss, fc::unsigned_int(variant_index)); // pack the variant index of state_result{result} - fc::raw::pack(ss, static_cast(result)); - data.resize(ss.tellp()); - fc::datastream ds(data.data(), data.size()); - fc::raw::pack(ds, fc::unsigned_int(variant_index)); // pack the variant index of state_result{result} - fc::raw::pack(ds, static_cast(result)); - } - -public: - blocks_result_send_queue_entry(std::shared_ptr s, state_history::get_blocks_result&& r) - : session(std::move(s)), - result(std::move(r)) {} - - void send_entry() override { - std::visit( - chain::overloaded{ - [&](state_history::get_blocks_result_v0& r) { - static_assert(std::is_same_v>); - pack_result_base(r, 1); // 1 for variant index of get_blocks_result_v0 in state_result - }, - [&](state_history::get_blocks_result_v1& r) { - static_assert(std::is_same_v>); - pack_result_base(r, 2); // 2 for variant index of get_blocks_result_v1 in state_result - } - }, - result - ); - 
- async_send(false, data, [me=this->shared_from_this()]() { - me->send_traces(); - }); - } -}; - -template -struct session : session_base, std::enable_shared_from_this> { private: - Plugin& plugin; - session_manager& session_mgr; - std::optional> socket_stream; // ship thread only after creation - std::string description; - - uint32_t to_send_block_num = 0; - std::optional::const_iterator> position_it; - - const int32_t default_frame_size; - - friend class blocks_result_send_queue_entry; - friend class status_result_send_queue_entry; - friend class blocks_ack_request_send_queue_entry; - friend class blocks_request_send_queue_entry; - -public: - - session(Plugin& plugin, SocketType socket, session_manager& sm) - : plugin(plugin) - , session_mgr(sm) - , socket_stream(std::move(socket)) - , default_frame_size(plugin.default_frame_size) { - description = to_description_string(); - } - - void start() { - fc_ilog(plugin.get_logger(), "incoming connection from ${a}", ("a", description)); - socket_stream->auto_fragment(false); - socket_stream->binary(true); - if constexpr (std::is_same_v) { - socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(true)); - } - socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024)); - socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024)); - - socket_stream->async_accept([self = this->shared_from_this()](boost::system::error_code ec) { - self->callback(ec, false, "async_accept", [self] { - self->socket_stream->binary(false); - self->socket_stream->async_write( - boost::asio::buffer(state_history_plugin_abi, strlen(state_history_plugin_abi)), - [self](boost::system::error_code ec, size_t) { - self->callback(ec, false, "async_write", [self] { - self->socket_stream->binary(true); - self->start_read(); - }); - }); - }); - }); - } - -private: - void start_read() { - auto in_buffer = std::make_shared(); - socket_stream->async_read( - *in_buffer, [self = this->shared_from_this(), in_buffer](boost::system::error_code ec, size_t) { - self->callback(ec, false, "async_read", [self, in_buffer] { - auto d = boost::asio::buffer_cast(boost::beast::buffers_front(in_buffer->data())); - auto s = boost::asio::buffer_size(in_buffer->data()); - fc::datastream ds(d, s); - state_history::state_request req; - fc::raw::unpack(ds, req); - std::visit( [self]( auto& r ) { - self->process( r ); - }, req ); - self->start_read(); - }); - }); - } - - // should only be called once per session - std::string to_description_string() const { + std::string get_remote_endpoint_string() const { try { - boost::system::error_code ec; - auto re = socket_stream->next_layer().remote_endpoint(ec); - return boost::lexical_cast(re); + if constexpr(std::is_same_v) + return boost::lexical_cast(stream.next_layer().remote_endpoint()); + return "UNIX socket"; } catch (...) 
{ - static uint32_t n = 0; - return "unknown " + std::to_string(++n); + return "(unknown)"; } } - uint64_t get_log_entry_impl(const eosio::state_history::get_blocks_result& result, - bool has_value, - std::optional& optional_log, - std::optional& buf) { - if (has_value) { - if( optional_log ) { - buf.emplace( optional_log->create_locked_decompress_stream() ); - return std::visit([&](auto& r) { return optional_log->get_unpacked_entry( r.this_block->block_num, *buf ); }, result); - } - } - return 0; - } - - uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result& result, - std::optional& buf) { - return std::visit([&](auto& r) { return get_log_entry_impl(r, r.traces.has_value(), plugin.get_trace_log(), buf); }, result); - } - - uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result& result, - std::optional& buf) { - return std::visit([&](auto& r) { return get_log_entry_impl(r, r.deltas.has_value(), plugin.get_chain_state_log(), buf); }, result); - } - - uint64_t get_finality_data_log_entry(const eosio::state_history::get_blocks_result_v1& result, - std::optional& buf) { - return get_log_entry_impl(result, result.finality_data.has_value(), plugin.get_finality_data_log(), buf); - } - - void process(state_history::get_status_request_v0&) { - fc_dlog(plugin.get_logger(), "received get_status_request_v0"); - - auto self = this->shared_from_this(); - auto entry_ptr = std::make_unique>(self); - session_mgr.add_send_queue(std::move(self), std::move(entry_ptr)); + void awake_if_idle() { + boost::asio::dispatch(strand, [this]() { + wake_timer.cancel_one(); + }); } - void process(state_history::get_blocks_request_v0& req) { - fc_dlog(plugin.get_logger(), "received get_blocks_request_v0 = ${req}", ("req", req)); - - auto self = this->shared_from_this(); - auto entry_ptr = std::make_unique>(self, std::move(req)); - session_mgr.add_send_queue(std::move(self), std::move(entry_ptr)); + void check_coros_done(std::exception_ptr e) { + //the only exception that should have bubbled out of the coros is a bad_alloc, bubble it up further. No need to bother + // with the rest of the cleanup: we'll be shutting down soon anyway due to bad_alloc + if(e) + std::rethrow_exception(e); + //coros always return on the session's strand + if(--coros_running == 0) + on_done(this); } - void process(state_history::get_blocks_request_v1& req) { - fc_dlog(plugin.get_logger(), "received get_blocks_request_v1 = ${req}", ("req", req)); - - auto self = this->shared_from_this(); - auto entry_ptr = std::make_unique>(self, std::move(req)); - session_mgr.add_send_queue(std::move(self), std::move(entry_ptr)); + template + void drop_exceptions(F&& f) { + try{ f(); } catch(...) 
{} } - void process(state_history::get_blocks_ack_request_v0& req) { - fc_dlog(plugin.get_logger(), "received get_blocks_ack_request_v0 = ${req}", ("req", req)); - if (!current_request) { - fc_dlog(plugin.get_logger(), " no current get_blocks_request_v0, discarding the get_blocks_ack_request_v0"); - return; - } + template + boost::asio::awaitable readwrite_coro_exception_wrapper(F&& f) { + coros_running++; - auto self = this->shared_from_this(); - auto entry_ptr = std::make_unique>(self, std::move(req)); - session_mgr.add_send_queue(std::move(self), std::move(entry_ptr)); - } - - state_history::get_status_result_v0 get_status_result() { - fc_dlog(plugin.get_logger(), "replying get_status_request_v0"); - state_history::get_status_result_v0 result; - result.head = plugin.get_block_head(); - result.last_irreversible = plugin.get_last_irreversible(); - result.chain_id = plugin.get_chain_id(); - auto&& trace_log = plugin.get_trace_log(); - if (trace_log) { - auto r = trace_log->block_range(); - result.trace_begin_block = r.first; - result.trace_end_block = r.second; + try { + co_await f(); } - auto&& chain_state_log = plugin.get_chain_state_log(); - if (chain_state_log) { - auto r = chain_state_log->block_range(); - result.chain_state_begin_block = r.first; - result.chain_state_end_block = r.second; + catch(std::bad_alloc&) { + throw; } - fc_dlog(plugin.get_logger(), "pushing get_status_result_v0 to send queue"); - - return result; - } - - void update_current_request_impl(state_history::get_blocks_request_v0& req) { - fc_dlog(plugin.get_logger(), "replying get_blocks_request_v0 = ${req}", ("req", req)); - to_send_block_num = std::max(req.start_block_num, plugin.get_first_available_block_num()); - for (auto& cp : req.have_positions) { - if (req.start_block_num <= cp.block_num) - continue; - auto id = plugin.get_block_id(cp.block_num); - if (!id || *id != cp.block_id) - req.start_block_num = std::min(req.start_block_num, cp.block_num); - - if (!id) { - to_send_block_num = std::min(to_send_block_num, cp.block_num); - fc_dlog(plugin.get_logger(), "block ${block_num} is not available", ("block_num", cp.block_num)); - } else if (*id != cp.block_id) { - to_send_block_num = std::min(to_send_block_num, cp.block_num); - fc_dlog(plugin.get_logger(), "the id for block ${block_num} in block request have_positions does not match the existing", - ("block_num", cp.block_num)); - } + catch(fc::exception& e) { + if(has_logged_exception.test_and_set() == false) + fc_ilog(logger, "state history connection from ${a} failed: ${w}", ("a", remote_endpoint_string)("w", e.top_message())); } - fc_dlog(plugin.get_logger(), " get_blocks_request_v0 start_block_num set to ${num}", ("num", to_send_block_num)); - - if( !req.have_positions.empty() ) { - position_it = req.have_positions.begin(); + catch(boost::system::system_error& e) { + if(has_logged_exception.test_and_set() == false) + fc_ilog(logger, "state history connection from ${a} failed: ${w}", ("a", remote_endpoint_string)("w", e.code().message())); } - } - - void update_current_request(state_history::get_blocks_request& req) { - assert(std::holds_alternative(req) || - std::holds_alternative(req)); - - std::visit( [&](auto& request) { update_current_request_impl(request); }, req ); - current_request = std::move(req); - } - - template // get_blocks_result_v0 or get_blocks_result_v1 - void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, T&& result, const chain::signed_block_ptr& block, const chain::block_id_type& id) { - 
need_to_send_update = true; - - result.last_irreversible = plugin.get_last_irreversible(); - uint32_t current = - request.irreversible_only ? result.last_irreversible.block_num : result.head.block_num; - - fc_dlog( plugin.get_logger(), "irreversible_only: ${i}, last_irreversible: ${p}, head.block_num: ${h}", ("i", request.irreversible_only)("p", result.last_irreversible.block_num)("h", result.head.block_num)); - if (to_send_block_num > current || to_send_block_num >= request.end_block_num) { - fc_dlog( plugin.get_logger(), "Not sending, to_send_block_num: ${s}, current: ${c} request.end_block_num: ${b}", - ("s", to_send_block_num)("c", current)("b", request.end_block_num) ); - session_mgr.pop_entry(false); - return; + catch(std::exception& e) { + if(has_logged_exception.test_and_set() == false) + fc_ilog(logger, "state history connection from ${a} failed: ${w}", ("a", remote_endpoint_string)("w", e.what())); } - - // not just an optimization, on accepted_block signal may not be able to find block_num in forkdb as it has not been validated - // until after the accepted_block signal - std::optional block_id = - (block && block->block_num() == to_send_block_num) ? id : plugin.get_block_id(to_send_block_num); - - if (block_id && position_it && (*position_it)->block_num == to_send_block_num) { - // This branch happens when the head block of nodeos is behind the head block of connecting client. - // In addition, the client told us the corresponding block id for block_num we are going to send. - // We can send the block when the block_id is different. - auto& itr = *position_it; - auto block_id_seen_by_client = itr->block_id; - ++itr; - if (itr == request.have_positions.end()) - position_it.reset(); - - if(block_id_seen_by_client == *block_id) { - ++to_send_block_num; - session_mgr.pop_entry(false); - return; - } + catch(...) { + if(has_logged_exception.test_and_set() == false) + fc_ilog(logger, "state history connection from ${a} failed", ("a", remote_endpoint_string)); } - if (block_id) { - result.this_block = state_history::block_position{to_send_block_num, *block_id}; - auto prev_block_id = plugin.get_block_id(to_send_block_num - 1); - if (prev_block_id) - result.prev_block = state_history::block_position{to_send_block_num - 1, *prev_block_id}; - if (request.fetch_block) { - uint32_t block_num = block ? 
block->block_num() : 0; // block can be nullptr in testing - plugin.get_block(to_send_block_num, block_num, block, result.block); + drop_exceptions([this](){ stream.next_layer().close(); }); + drop_exceptions([this](){ awake_if_idle(); }); + } + + boost::asio::awaitable read_loop() { + co_await readwrite_coro_exception_wrapper([this]() -> boost::asio::awaitable { + wake_timer.expires_at(std::chrono::steady_clock::time_point::max()); + + if constexpr(std::is_same_v) + stream.next_layer().set_option(boost::asio::ip::tcp::no_delay(true)); + stream.next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024*1024)); + stream.write_buffer_bytes(512*1024); + stream.set_option(boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::response_type& res) { + res.set(boost::beast::http::field::server, "state_history/" + app().version_string()); + })); + + co_await stream.async_accept(); + co_await stream.async_write(boost::asio::const_buffer(state_history_plugin_abi, strlen(state_history_plugin_abi))); + stream.binary(true); + boost::asio::co_spawn(strand, write_loop(), [&](std::exception_ptr e) {check_coros_done(e);}); + + while(true) { + boost::beast::flat_buffer b; + co_await stream.async_read(b); + const state_request req = fc::raw::unpack>(static_cast(b.cdata().data()), b.size()); + + auto& self = *this; //gcc10 ICE workaround wrt capturing 'this' in a coro + co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable { + /** + * This lambda executes on the main thread; upon returning, the enclosing coroutine continues execution on the connection's strand + */ + std::visit(chain::overloaded { + [&self]>>(const GetStatusRequestV0orV1&) { + self.queued_status_requests.emplace_back(std::is_same_v); + }, + [&self]>>(const GetBlocksRequestV0orV1& gbr) { + self.current_blocks_request_v1_finality.reset(); + self.current_blocks_request = gbr; + if constexpr(std::is_same_v) + self.current_blocks_request_v1_finality = gbr.fetch_finality_data; + + for(const block_position& haveit : self.current_blocks_request.have_positions) { + if(self.current_blocks_request.start_block_num <= haveit.block_num) + continue; + if(const std::optional id = self.get_block_id(haveit.block_num); !id || *id != haveit.block_id) + self.current_blocks_request.start_block_num = std::min(self.current_blocks_request.start_block_num, haveit.block_num); + } + self.current_blocks_request.have_positions.clear(); + }, + [&self](const get_blocks_ack_request_v0& gbar0) { + self.send_credits += gbar0.num_messages; + } + }, req); + co_return; + }, boost::asio::use_awaitable); + + awake_if_idle(); } - if (request.fetch_traces && plugin.get_trace_log()) - result.traces.emplace(); - if (request.fetch_deltas && plugin.get_chain_state_log()) - result.deltas.emplace(); - if constexpr (std::is_same_v) { - if (fetch_finality_data && plugin.get_finality_data_log()) { - result.finality_data.emplace(); - } - } - } - ++to_send_block_num; - - // during syncing if block is older than 5 min, log every 1000th block - bool fresh_block = fc::time_point::now() - plugin.get_head_block_timestamp() < fc::minutes(5); - if (fresh_block || (result.this_block && result.this_block->block_num % 1000 == 0)) { - fc_ilog(plugin.get_logger(), - "pushing result " - "{\"head\":{\"block_num\":${head}},\"last_irreversible\":{\"block_num\":${last_irr}},\"this_block\":{" - "\"block_num\":${this_block}, \"block_id\":${this_id}}} to send queue", - ("head", result.head.block_num)("last_irr", result.last_irreversible.block_num) 
- ("this_block", result.this_block ? result.this_block->block_num : fc::variant()) - ("this_id", result.this_block ? fc::variant{result.this_block->block_id} : fc::variant{})); - } - - --request.max_messages_in_flight; - need_to_send_update = to_send_block_num <= current && - to_send_block_num < request.end_block_num; - - std::make_shared>(this->shared_from_this(), std::move(result))->send_entry(); + }); } - bool no_request_or_not_max_messages_in_flight() { - if (!current_request) - return true; + get_status_result_v1 fill_current_status_result() { + get_status_result_v1 ret; - uint32_t max_messages_in_flight = std::visit( - [&](auto& request) -> uint32_t { return request.max_messages_in_flight; }, - *current_request); + ret.head = {controller.head_block_num(), controller.head_block_id()}; + ret.last_irreversible = {controller.last_irreversible_block_num(), controller.last_irreversible_block_id()}; + ret.chain_id = controller.get_chain_id(); + if(trace_log) + std::tie(ret.trace_begin_block, ret.trace_end_block) = trace_log->block_range(); + if(chain_state_log) + std::tie(ret.chain_state_begin_block, ret.chain_state_end_block) = chain_state_log->block_range(); + if(finality_data_log) + std::tie(ret.finality_data_begin_block, ret.finality_data_end_block) = finality_data_log->block_range(); - return !max_messages_in_flight; + return ret; } - void send_update(const state_history::block_position& head, const chain::signed_block_ptr& block, const chain::block_id_type& id) { - if (no_request_or_not_max_messages_in_flight()) { - session_mgr.pop_entry(false); - return; - } + boost::asio::awaitable write_log_entry(std::optional& log_stream, std::optional& log, chain::block_num_type block_num) { + uint64_t unpacked_size = 0; - assert(current_request); - assert(std::holds_alternative(*current_request) || - std::holds_alternative(*current_request)); - - std::visit(eosio::chain::overloaded{ - [&](eosio::state_history::get_blocks_request_v0& request) { - state_history::get_blocks_result_v0 result; - result.head = head; - send_update(request, false, std::move(result), block, id); }, - [&](eosio::state_history::get_blocks_request_v1& request) { - state_history::get_blocks_result_v1 result; - result.head = head; - send_update(request, request.fetch_finality_data, std::move(result), block, id); } }, - *current_request); - } + if(log_stream) //will be unset if either request did not ask for this log entry, or the log isn't enabled + unpacked_size = log->get_unpacked_entry(block_num, *log_stream); //will return 0 if log does not include the block num asked for - void send_update(const chain::signed_block_ptr& block, const chain::block_id_type& id) override { - if (no_request_or_not_max_messages_in_flight()) { - session_mgr.pop_entry(false); - return; - } - - auto block_num = block->block_num(); - to_send_block_num = std::min(block_num, to_send_block_num); - send_update(state_history::block_position{block_num, id}, block, id); - } + if(unpacked_size) { + char buff[1024*1024]; + fc::datastream ds(buff, sizeof(buff)); + fc::raw::pack(ds, true); + history_pack_varuint64(ds, unpacked_size); + co_await stream.async_write_some(false, boost::asio::buffer(buff, ds.tellp())); - void send_update(bool changed) override { - if (changed || need_to_send_update) { - send_update(plugin.get_block_head(), nullptr, chain::block_id_type{}); - } else { - session_mgr.pop_entry(false); + ///TODO: why is there an uncompressed option in the variant?! Shouldn't it always be compressed? was this for old unit tests? 
+ bio::filtering_istreambuf& decompression_stream = *std::get>(log_stream->buf); + std::streamsize red = 0; + while((red = bio::read(decompression_stream, buff, sizeof(buff))) != -1) { + if(red == 0) + continue; + co_await stream.async_write_some(false, boost::asio::buffer(buff, red)); + } + } + else { + co_await stream.async_write_some(false, boost::asio::buffer(fc::raw::pack(false))); } } - template - void callback(const boost::system::error_code& ec, bool active_entry, const char* what, F f) { - if( !ec ) { - try { - f(); - return; - } catch( const fc::exception& e ) { - fc_elog( plugin.get_logger(), "${e}", ("e", e.to_detail_string()) ); - } catch( const std::exception& e ) { - fc_elog( plugin.get_logger(), "${e}", ("e", e.what()) ); - } catch( ... ) { - fc_elog( plugin.get_logger(), "unknown exception" ); - } - } else { - if (ec == boost::asio::error::operation_aborted || - ec == boost::asio::error::connection_reset || - ec == boost::asio::error::eof || - ec == boost::beast::websocket::error::closed) { - fc_dlog(plugin.get_logger(), "${w}: ${m}", ("w", what)("m", ec.message())); - } else { - fc_elog(plugin.get_logger(), "${w}: ${m}", ("w", what)("m", ec.message())); - } - } + boost::asio::awaitable write_loop() { + co_await readwrite_coro_exception_wrapper([this]() -> boost::asio::awaitable { + get_status_result_v1 current_status_result; + struct block_package { + get_blocks_result_base blocks_result_base; + bool is_v1_request = false; + chain::block_num_type this_block_num = 0; //this shouldn't be needed post log de-mutexing + std::optional trace_stream; + std::optional state_stream; + std::optional finality_stream; + }; + + while(true) { + if(!stream.is_open()) + break; + + std::deque status_requests; + std::optional block_to_send; + + auto& self = *this; //gcc10 ICE workaround wrt capturing 'this' in a coro + co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable { + /** + * This lambda executes on the main thread; upon returning, the enclosing coroutine continues execution on the connection's strand + */ + status_requests = std::move(self.queued_status_requests); + + //decide what block -- if any -- to send out + const chain::block_num_type latest_to_consider = self.current_blocks_request.irreversible_only ? 
+ self.controller.last_irreversible_block_num() : self.controller.head_block_num(); + if(self.send_credits && self.next_block_cursor <= latest_to_consider && self.next_block_cursor < self.current_blocks_request.end_block_num) { + block_to_send.emplace( block_package{ + .blocks_result_base = { + .head = {self.controller.head_block_num(), self.controller.head_block_id()}, + .last_irreversible = {self.controller.last_irreversible_block_num(), self.controller.last_irreversible_block_id()} + }, + .is_v1_request = self.current_blocks_request_v1_finality.has_value(), + .this_block_num = self.next_block_cursor + }); + if(const std::optional this_block_id = self.get_block_id(self.next_block_cursor)) { + block_to_send->blocks_result_base.this_block = {self.current_blocks_request.start_block_num, *this_block_id}; + if(const std::optional last_block_id = self.get_block_id(self.next_block_cursor - 1)) + block_to_send->blocks_result_base.prev_block = {self.next_block_cursor - 1, *last_block_id}; + if(chain::signed_block_ptr sbp = get_block(self.next_block_cursor); sbp && self.current_blocks_request.fetch_block) + block_to_send->blocks_result_base.block = fc::raw::pack(*sbp); + if(self.current_blocks_request.fetch_traces && self.trace_log) + block_to_send->trace_stream.emplace(self.trace_log->create_locked_decompress_stream()); + if(self.current_blocks_request.fetch_deltas && self.chain_state_log) + block_to_send->state_stream.emplace(self.chain_state_log->create_locked_decompress_stream()); + if(block_to_send->is_v1_request && *self.current_blocks_request_v1_finality && self.finality_data_log) + block_to_send->finality_stream.emplace(self.finality_data_log->create_locked_decompress_stream()); + } + ++self.next_block_cursor; + --self.send_credits; + } + + if(status_requests.size()) + current_status_result = fill_current_status_result(); + co_return; + }, boost::asio::use_awaitable); + + //if there is nothing to send, go to sleep + if(status_requests.empty() && !block_to_send) { + co_await wake_timer.async_wait(); + continue; + } - // on exception allow session to be destroyed + //send replies to all send status requests first + for(const bool status_request_is_v1 : status_requests) { + if(status_request_is_v1 == false) //v0 status request, gets a v0 status result + co_await stream.async_write(boost::asio::buffer(fc::raw::pack(state_result((get_status_result_v0)current_status_result)))); + else + co_await stream.async_write(boost::asio::buffer(fc::raw::pack(state_result(current_status_result)))); + } - fc_ilog(plugin.get_logger(), "Closing connection from ${a}", ("a", description)); - session_mgr.remove( this->shared_from_this(), active_entry ); + //and then send the block + if(block_to_send) { + const fc::unsigned_int get_blocks_result_variant_index = block_to_send->is_v1_request ? + state_result(get_blocks_result_v1()).index() : + state_result(get_blocks_result_v0()).index(); + co_await stream.async_write_some(false, boost::asio::buffer(fc::raw::pack(get_blocks_result_variant_index))); + co_await stream.async_write_some(false, boost::asio::buffer(fc::raw::pack(block_to_send->blocks_result_base))); + + //accessing the _logs here violates the rule that those should only be accessed on the main thread. However, we're + // only calling get_unpacked_entry() on it which assumes the mutex is held by the locked_decompress_stream. 
So this is + // "safe" in some aspects but can deadlock + co_await write_log_entry(block_to_send->trace_stream, trace_log, block_to_send->this_block_num); + co_await write_log_entry(block_to_send->state_stream, chain_state_log, block_to_send->this_block_num); + if(block_to_send->is_v1_request) + co_await write_log_entry(block_to_send->finality_stream, finality_data_log, block_to_send->this_block_num); + + co_await stream.async_write_some(true, boost::asio::const_buffer()); + } + } + }); } + +private: + ///these items must only ever be touched by the session's strand + Executor strand; + coro_throwing_stream stream; + coro_nonthrowing_steadytimer wake_timer; + unsigned coros_running = 0; + std::atomic_flag has_logged_exception; //left as atomic_flag for useful test_and_set() interface + + ///these items must only ever be touched on the main thread + std::deque queued_status_requests; //false for v0, true for v1 + + get_blocks_request_v0 current_blocks_request; + std::optional current_blocks_request_v1_finality; //unset: current request is v0; set means v1; true/false is if finality requested + //current_blocks_request is modified with the current state; bind some more descriptive names to items frequently used + uint32_t& send_credits = current_blocks_request.max_messages_in_flight; + chain::block_num_type& next_block_cursor = current_blocks_request.start_block_num; + + chain::controller& controller; + std::optional& trace_log; + std::optional& chain_state_log; + std::optional& finality_data_log; + + GetBlockID get_block_id; + GetBlock get_block; + + ///these items might be used on either the strand or main thread + OnDone on_done; + fc::logger& logger; + const std::string remote_endpoint_string; }; } // namespace eosio diff --git a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp b/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp index a9070115d1..186230ab32 100644 --- a/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp +++ b/plugins/state_history_plugin/include/eosio/state_history_plugin/state_history_plugin.hpp @@ -5,14 +5,7 @@ #include #include -namespace fc { -class variant; -} - namespace eosio { -using chain::bytes; -using std::shared_ptr; -typedef shared_ptr state_history_ptr; class state_history_plugin : public plugin { public: @@ -29,12 +22,8 @@ class state_history_plugin : public plugin { void handle_sighup() override; - const state_history_log* trace_log() const; - const state_history_log* chain_state_log() const; - const state_history_log* finality_data_log() const; - private: - state_history_ptr my; + unique_ptr my; }; } // namespace eosio diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index 966fe464d9..3aff0e48ae 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -21,8 +21,6 @@ #include -namespace ws = boost::beast::websocket; - namespace eosio { using namespace chain; using namespace state_history; @@ -38,18 +36,16 @@ template auto catch_and_log(F f) { try { return f(); - } catch (const fc::exception& e) { + } catch(const fc::exception& e) { fc_elog(_log, "${e}", ("e", e.to_detail_string())); - } catch (const std::exception& e) { + } catch(const std::exception& e) { fc_elog(_log, "${e}", ("e", e.what())); - } catch (...) { + } catch(...) 
{ fc_elog(_log, "unknown exception"); } } -struct state_history_plugin_impl : std::enable_shared_from_this { - constexpr static uint64_t default_frame_size = 1024 * 1024; - +struct state_history_plugin_impl { private: chain_plugin* chain_plug = nullptr; std::optional trace_log; @@ -64,154 +60,87 @@ struct state_history_plugin_impl : std::enable_shared_from_this thread_pool; + named_thread_pool thread_pool; - session_manager session_mgr{thread_pool.get_executor()}; - - bool plugin_started = false; + struct connection_map_key_less { + using is_transparent = void; + template bool operator()(const L& lhs, const R& rhs) const { + return std::to_address(lhs) < std::to_address(rhs); + } + }; + //connections must only be touched by the main thread because on_accepted_block() will iterate over it + std::set, connection_map_key_less> connections; //gcc 11+ required for unordered_set public: void plugin_initialize(const variables_map& options); void plugin_startup(); void plugin_shutdown(); - session_manager& get_session_manager() { return session_mgr; } - - static fc::logger& get_logger() { return _log; } - - std::optional& get_trace_log() { return trace_log; } - std::optional& get_chain_state_log(){ return chain_state_log; } - std::optional& get_finality_data_log(){ return finality_data_log; } - - boost::asio::io_context& get_ship_executor() { return thread_pool.get_executor(); } - - // thread-safe - signed_block_ptr get_block(uint32_t block_num, uint32_t block_state_block_num, const signed_block_ptr& block) const { - chain::signed_block_ptr p; - try { - if (block_num == block_state_block_num) { - p = block; - } else { - p = chain_plug->chain().fetch_block_by_number(block_num); - } - } catch (...) { - } - return p; - } - - // thread safe - fc::sha256 get_chain_id() const { - return chain_plug->chain().get_chain_id(); - } - // thread-safe - void get_block(uint32_t block_num, uint32_t block_state_block_num, const signed_block_ptr& block, std::optional& result) const { - auto p = get_block(block_num, block_state_block_num, block); - if (p) - result = fc::raw::pack(*p); - } - - // thread-safe - std::optional get_block_id(uint32_t block_num) { - if( trace_log ) { - if ( auto id = trace_log->get_block_id( block_num ) ) + std::optional get_block_id(block_num_type block_num) { + if(trace_log) { + if(std::optional id = trace_log->get_block_id(block_num)) return id; } - if( chain_state_log ) { - if( auto id = chain_state_log->get_block_id( block_num ) ) + if(chain_state_log) { + if(std::optional id = chain_state_log->get_block_id(block_num)) return id; } - if( finality_data_log ) { - if( auto id = finality_data_log->get_block_id( block_num ) ) + if(finality_data_log) { + if(std::optional id = finality_data_log->get_block_id(block_num)) return id; } try { return chain_plug->chain().get_block_id_for_num(block_num); - } catch (...) { + } catch(...) 
{ } return {}; } - // thread-safe - block_position get_block_head() const { - std::lock_guard g(mtx); - return { block_header::num_from_id(head_id), head_id }; - } - - // thread-safe - block_position get_last_irreversible() const { - std::lock_guard g(mtx); - return { block_header::num_from_id(lib_id), lib_id }; - } - - // thread-safe - time_point get_head_block_timestamp() const { - std::lock_guard g(mtx); - return head_timestamp; - } - - // thread-safe - uint32_t get_first_available_block_num() const { - return first_available_block; - } - template void create_listener(const std::string& address) { const boost::posix_time::milliseconds accept_timeout(200); - using socket_type = typename Protocol::socket; - fc::create_listener( - thread_pool.get_executor(), _log, accept_timeout, address, "", [this](socket_type&& socket) { - // Create a session object and run it - catch_and_log([&, this] { - auto s = std::make_shared>(*this, std::move(socket), - session_mgr); - session_mgr.insert(s); - s->start(); - }); - }); + // connections set must only be modified by main thread; run listener on main thread to avoid needing another post() + fc::create_listener(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) { + catch_and_log([this, &socket]() { + connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(), + trace_log, chain_state_log, finality_data_log, + [this](const chain::block_num_type block_num) { + return get_block_id(block_num); + }, + [this](const chain::block_num_type block_num) { + return chain_plug->chain().fetch_block_by_number(block_num); + }, + [this](session_base* conn) { + boost::asio::post(app().get_io_service(), [conn, this]() { + connections.erase(connections.find(conn)); + }); + }, _log)); + }); + }); } void listen(){ try { - if (!endpoint_address.empty()) { + if(!endpoint_address.empty()) create_listener(endpoint_address); - } - if (!unix_path.empty()) { + if(!unix_path.empty()) create_listener(unix_path); - } - } catch (std::exception&) { + } catch(std::exception&) { FC_THROW_EXCEPTION(plugin_exception, "unable to open listen socket"); } } - // called from main thread void on_applied_transaction(const transaction_trace_ptr& p, const packed_transaction_ptr& t) { - if (trace_log) + if(trace_log) trace_converter.add_transaction(p, t); } - // called from main thread - void update_current() { - const auto& chain = chain_plug->chain(); - std::lock_guard g(mtx); - head_id = chain.head_block_id(); - lib_id = chain.last_irreversible_block_id(); - head_timestamp = chain.head_block_time(); - } - - // called from main thread void on_accepted_block(const signed_block_ptr& block, const block_id_type& id) { - update_current(); - try { store_traces(block, id); store_chain_state(id, block->previous, block->block_num()); store_finality_data(id, block->previous); - } catch (const fc::exception& e) { + } catch(const fc::exception& e) { fc_elog(_log, "fc::exception: ${details}", ("details", e.to_detail_string())); // Both app().quit() and exception throwing are required. Without app().quit(), // the exception would be caught and drop before reaching main(). 
The exception is @@ -223,32 +152,21 @@ struct state_history_plugin_impl : std::enable_shared_from_thisshared_from_this(), block, id]() { - self->get_session_manager().send_update(block, id); - }); - } - + for(const std::unique_ptr& c : connections) + c->block_applied(block->block_num()); } - // called from main thread void on_block_start(uint32_t block_num) { clear_caches(); } - // called from main thread void clear_caches() { trace_converter.cached_traces.clear(); trace_converter.onblock_trace.reset(); } - // called from main thread void store_traces(const signed_block_ptr& block, const block_id_type& id) { - if (!trace_log) + if(!trace_log) return; state_history_log_header header{.magic = ship_magic(ship_current_version, 0), @@ -259,12 +177,11 @@ struct state_history_plugin_impl : std::enable_shared_from_thisempty(); - if (fresh) + if(fresh) fc_ilog(_log, "Placing initial state in block ${n}", ("n", block_num)); state_history_log_header header{ @@ -274,13 +191,12 @@ struct state_history_plugin_impl : std::enable_shared_from_this finality_data = chain_plug->chain().head_finality_data(); - if (!finality_data.has_value()) + if(!finality_data.has_value()) return; state_history_log_header header{ @@ -290,14 +206,10 @@ struct state_history_plugin_impl : std::enable_shared_from_this()) {} + : my(new state_history_plugin_impl()) {} state_history_plugin::~state_history_plugin() = default; @@ -341,7 +253,7 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options) EOS_ASSERT(chain_plug, chain::missing_chain_plugin_exception, ""); auto& chain = chain_plug->chain(); - if (!options.at("disable-replay-opts").as() && options.at("chain-state-history").as()) { + if(!options.at("disable-replay-opts").as() && options.at("chain-state-history").as()) { ilog("Setting disable-replay-opts=true required by state_history_plugin chain-state-history=true option"); chain.set_disable_replay_opts(true); } @@ -360,29 +272,29 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options) auto dir_option = options.at("state-history-dir").as(); std::filesystem::path state_history_dir; - if (dir_option.is_relative()) + if(dir_option.is_relative()) state_history_dir = app().data_dir() / dir_option; else state_history_dir = dir_option; - if (auto resmon_plugin = app().find_plugin()) + if(auto resmon_plugin = app().find_plugin()) resmon_plugin->monitor_directory(state_history_dir); endpoint_address = options.at("state-history-endpoint").as(); - if (options.count("state-history-unix-socket-path")) { + if(options.count("state-history-unix-socket-path")) { std::filesystem::path sock_path = options.at("state-history-unix-socket-path").as(); - if (sock_path.is_relative()) + if(sock_path.is_relative()) sock_path = app().data_dir() / sock_path; unix_path = sock_path.generic_string(); } - if (options.at("delete-state-history").as()) { + if(options.at("delete-state-history").as()) { fc_ilog(_log, "Deleting state history"); std::filesystem::remove_all(state_history_dir); } std::filesystem::create_directories(state_history_dir); - if (options.at("trace-history-debug-mode").as()) { + if(options.at("trace-history-debug-mode").as()) { trace_debug_mode = true; } @@ -391,7 +303,7 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options) options.count("state-history-stride") || options.count("max-retained-history-files"); state_history_log_config ship_log_conf; - if (options.count("state-history-log-retain-blocks")) { + if(options.count("state-history-log-retain-blocks")) { 
auto& ship_log_prune_conf = ship_log_conf.emplace(); ship_log_prune_conf.prune_blocks = options.at("state-history-log-retain-blocks").as(); //the arbitrary limit of 1000 here is mainly so that there is enough buffer for newly applied forks to be delivered to clients @@ -399,23 +311,23 @@ void state_history_plugin_impl::plugin_initialize(const variables_map& options) EOS_ASSERT(ship_log_prune_conf.prune_blocks >= 1000, plugin_exception, "state-history-log-retain-blocks must be 1000 blocks or greater"); EOS_ASSERT(!has_state_history_partition_options, plugin_exception, "state-history-log-retain-blocks cannot be used together with state-history-retained-dir," " state-history-archive-dir, state-history-stride or max-retained-history-files"); - } else if (has_state_history_partition_options){ + } else if(has_state_history_partition_options){ auto& config = ship_log_conf.emplace(); - if (options.count("state-history-retained-dir")) + if(options.count("state-history-retained-dir")) config.retained_dir = options.at("state-history-retained-dir").as(); - if (options.count("state-history-archive-dir")) + if(options.count("state-history-archive-dir")) config.archive_dir = options.at("state-history-archive-dir").as(); - if (options.count("state-history-stride")) + if(options.count("state-history-stride")) config.stride = options.at("state-history-stride").as(); - if (options.count("max-retained-history-files")) + if(options.count("max-retained-history-files")) config.max_retained_files = options.at("max-retained-history-files").as(); } - if (options.at("trace-history").as()) + if(options.at("trace-history").as()) trace_log.emplace("trace_history", state_history_dir , ship_log_conf); - if (options.at("chain-state-history").as()) + if(options.at("chain-state-history").as()) chain_state_log.emplace("chain_state_history", state_history_dir, ship_log_conf); - if (options.at("finality-data-history").as()) + if(options.at("finality-data-history").as()) finality_data_log.emplace("finality_data_history", state_history_dir, ship_log_conf); } FC_LOG_AND_RETHROW() @@ -429,7 +341,7 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { void state_history_plugin_impl::plugin_startup() { try { const auto& chain = chain_plug->chain(); - update_current(); + uint32_t block_num = chain.head_block_num(); if( block_num > 0 && chain_state_log && chain_state_log->empty() ) { fc_ilog( _log, "Storing initial state on startup, this can take a considerable amount of time" ); @@ -437,30 +349,28 @@ void state_history_plugin_impl::plugin_startup() { fc_ilog( _log, "Done storing initial state on startup" ); } first_available_block = chain.earliest_available_block_num(); - if (trace_log) { + if(trace_log) { auto first_trace_block = trace_log->block_range().first; if( first_trace_block > 0 ) first_available_block = std::min( first_available_block, first_trace_block ); } - if (chain_state_log) { + if(chain_state_log) { auto first_state_block = chain_state_log->block_range().first; if( first_state_block > 0 ) first_available_block = std::min( first_available_block, first_state_block ); } - if (finality_data_log) { + if(finality_data_log) { auto first_state_block = finality_data_log->block_range().first; if( first_state_block > 0 ) first_available_block = std::min( first_available_block, first_state_block ); } fc_ilog(_log, "First available block for SHiP ${b}", ("b", first_available_block)); listen(); - // use of executor assumes only one thread - thread_pool.start( 1, [](const fc::exception& e) { + 
thread_pool.start(1, [](const fc::exception& e) { fc_elog( _log, "Exception in SHiP thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); app().quit(); }); - plugin_started = true; - } catch (std::exception& ex) { + } catch(std::exception& ex) { appbase::app().quit(); } } @@ -484,19 +394,4 @@ void state_history_plugin::handle_sighup() { fc::logger::update(logger_name, _log); } -const state_history_log* state_history_plugin::trace_log() const { - const auto& log = my->get_trace_log(); - return log ? std::addressof(*log) : nullptr; -} - -const state_history_log* state_history_plugin::chain_state_log() const { - const auto& log = my->get_chain_state_log(); - return log ? std::addressof(*log) : nullptr; -} - -const state_history_log* state_history_plugin::finality_data_log() const { - const auto& log = my->get_finality_data_log(); - return log ? std::addressof(*log) : nullptr; -} - } // namespace eosio diff --git a/plugins/state_history_plugin/tests/CMakeLists.txt b/plugins/state_history_plugin/tests/CMakeLists.txt deleted file mode 100644 index 411b11888a..0000000000 --- a/plugins/state_history_plugin/tests/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -add_executable( test_state_history main.cpp session_test.cpp plugin_config_test.cpp) -target_link_libraries(test_state_history state_history_plugin eosio_testing eosio_chain_wrap) -target_include_directories( test_state_history PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" ) - -add_test(test_state_history test_state_history) -set_property(TEST test_state_history PROPERTY LABELS nonparallelizable_tests) \ No newline at end of file diff --git a/plugins/state_history_plugin/tests/main.cpp b/plugins/state_history_plugin/tests/main.cpp deleted file mode 100644 index e618f36999..0000000000 --- a/plugins/state_history_plugin/tests/main.cpp +++ /dev/null @@ -1,2 +0,0 @@ -#define BOOST_TEST_MODULE state_history_plugin -#include \ No newline at end of file diff --git a/plugins/state_history_plugin/tests/plugin_config_test.cpp b/plugins/state_history_plugin/tests/plugin_config_test.cpp deleted file mode 100644 index f89559c24b..0000000000 --- a/plugins/state_history_plugin/tests/plugin_config_test.cpp +++ /dev/null @@ -1,40 +0,0 @@ -#include -#include -#include -#include -#include -#include - -BOOST_AUTO_TEST_CASE(state_history_plugin_default_tests) { - fc::temp_directory tmp; - appbase::scoped_app app; - - auto tmp_path = tmp.path().string(); - std::array args = {"test_state_history", "--trace-history", "--state-history-stride", "10", - "--data-dir", tmp_path.c_str()}; - - BOOST_CHECK(app->initialize(args.size(), const_cast(args.data()))); - auto& plugin = app->get_plugin(); - - BOOST_REQUIRE(plugin.trace_log()); - auto* config = std::get_if(&plugin.trace_log()->config()); - BOOST_REQUIRE(config); - BOOST_CHECK_EQUAL(config->max_retained_files, UINT32_MAX); -} - -BOOST_AUTO_TEST_CASE(state_history_plugin_retain_blocks_tests) { - fc::temp_directory tmp; - appbase::scoped_app app; - - auto tmp_path = tmp.path().string(); - std::array args = {"test_state_history", "--trace-history", "--state-history-log-retain-blocks", "4242", - "--data-dir", tmp_path.c_str()}; - - BOOST_CHECK(app->initialize(args.size(), const_cast(args.data()))); - auto& plugin = app->get_plugin(); - - BOOST_REQUIRE(plugin.trace_log()); - auto* config = std::get_if(&plugin.trace_log()->config()); - BOOST_REQUIRE(config); - BOOST_CHECK_EQUAL(config->prune_blocks, 4242u); -} diff --git a/plugins/state_history_plugin/tests/session_test.cpp 
b/plugins/state_history_plugin/tests/session_test.cpp deleted file mode 100644 index 7e7b0adbbb..0000000000 --- a/plugins/state_history_plugin/tests/session_test.cpp +++ /dev/null @@ -1,876 +0,0 @@ -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include - -namespace beast = boost::beast; // from -namespace http = beast::http; // from -namespace websocket = beast::websocket; // from -namespace net = boost::asio; // from -namespace bio = boost::iostreams; -using tcp = boost::asio::ip::tcp; // from - -using namespace eosio::state_history; - -namespace eosio::state_history { - -template -inline void unpack_varuint64(fc::datastream& ds, uint64_t& val) { - val = 0; - int shift = 0; - uint8_t b = 0; - do { - fc::raw::unpack(ds, b); - val |= uint64_t(b & 0x7f) << shift; - shift += 7; - } while (b & 0x80); -} - -template -void unpack_big_bytes(fc::datastream& ds, eosio::chain::bytes& v) { - uint64_t sz; - unpack_varuint64(ds, sz); - v.resize(sz); - if (sz) - ds.read(v.data(), v.size()); -} - -template -void unpack_big_bytes(fc::datastream& ds, std::optional& v) { - bool has_value; - fc::raw::unpack(ds, has_value); - if (has_value) { - unpack_big_bytes(ds, v.emplace()); - } else { - v.reset(); - } -} - -template -fc::datastream& operator>>(fc::datastream& ds, eosio::state_history::get_blocks_result_v0& obj) { - fc::raw::unpack(ds, obj.head); - fc::raw::unpack(ds, obj.last_irreversible); - fc::raw::unpack(ds, obj.this_block); - fc::raw::unpack(ds, obj.prev_block); - unpack_big_bytes(ds, obj.block); - unpack_big_bytes(ds, obj.traces); - unpack_big_bytes(ds, obj.deltas); - return ds; -} - -template -fc::datastream& operator>>(fc::datastream& ds, eosio::state_history::get_blocks_result_v1& obj) { - fc::raw::unpack(ds, obj.head); - fc::raw::unpack(ds, obj.last_irreversible); - fc::raw::unpack(ds, obj.this_block); - fc::raw::unpack(ds, obj.prev_block); - unpack_big_bytes(ds, obj.block); - unpack_big_bytes(ds, obj.traces); - unpack_big_bytes(ds, obj.deltas); - unpack_big_bytes(ds, obj.finality_data); - return ds; -} -} // namespace eosio::state_history - -//------------------------------------------------------------------------------ - - -// Report a failure -void fail(beast::error_code ec, char const* what) { std::cerr << what << ": " << ec.message() << "\n"; } - -struct mock_state_history_plugin { - net::io_context main_ioc; - net::io_context ship_ioc; - using ioc_work_t = boost::asio::executor_work_guard; - std::optional main_ioc_work; - std::optional ship_ioc_work; - - eosio::state_history::block_position block_head; - fc::temp_directory log_dir; - std::optional trace_log; - std::optional state_log; - std::optional finality_data_log; - std::atomic stopping = false; - eosio::session_manager session_mgr{ship_ioc}; - std::unordered_map block_ids; - - constexpr static uint32_t default_frame_size = 1024; - - std::optional& get_trace_log() { return trace_log; } - std::optional& get_chain_state_log() { return state_log; } - std::optional& get_finality_data_log() { return finality_data_log; } - fc::sha256 get_chain_id() const { return {}; } - - boost::asio::io_context& get_ship_executor() { return ship_ioc; } - - void setup_state_history_log(bool fetch_finality_data, eosio::state_history_log_config conf = {}) { - trace_log.emplace("ship_trace", log_dir.path(), conf); - state_log.emplace("ship_state", log_dir.path(), conf); - if( 
fetch_finality_data ) { - finality_data_log.emplace("ship_finality_data", log_dir.path(), conf); - } - } - - fc::logger logger = fc::logger::get(DEFAULT_LOGGER); - - fc::logger& get_logger() { return logger; } - - void get_block(uint32_t block_num, uint32_t block_state_block_num, const eosio::chain::signed_block_ptr& block, - std::optional& result) const { - result.emplace().resize(16); - } - - fc::time_point get_head_block_timestamp() const { - return fc::time_point{}; - } - - fc::sha256 block_id_for(const uint32_t bnum, const std::string& nonce = {}) { - if (auto it = block_ids.find(bnum); it != block_ids.end()) - return it->second; - fc::sha256 m = fc::sha256::hash(fc::sha256::hash(std::to_string(bnum)+nonce)); - m._hash[0] = fc::endian_reverse_u32(bnum); - block_ids[bnum] = m; - return m; - } - - std::optional get_block_id(uint32_t block_num) { - std::optional id; - if( trace_log ) { - id = trace_log->get_block_id( block_num ); - if( id ) - return id; - } - if( state_log ) { - id = state_log->get_block_id( block_num ); - if( id ) - return id; - } - if( finality_data_log ) { - id = finality_data_log->get_block_id( block_num ); - if( id ) - return id; - } - return block_id_for(block_num); - } - - eosio::state_history::block_position get_block_head() { return block_head; } - eosio::state_history::block_position get_last_irreversible() { return block_head; } - - uint32_t get_first_available_block_num() const { return 0; } - - void add_session(std::shared_ptr s) { - session_mgr.insert(std::move(s)); - } - -}; - -using session_type = eosio::session; - -struct test_server : mock_state_history_plugin { - std::vector threads; - tcp::endpoint local_address{net::ip::make_address("127.0.0.1"), 0}; - - void run() { - - main_ioc_work.emplace( boost::asio::make_work_guard( main_ioc ) ); - ship_ioc_work.emplace( boost::asio::make_work_guard( ship_ioc ) ); - - threads.emplace_back([this]{ main_ioc.run(); }); - threads.emplace_back([this]{ ship_ioc.run(); }); - - auto create_session = [this](tcp::socket&& peer_socket) { - auto s = std::make_shared(*this, std::move(peer_socket), session_mgr); - s->start(); - add_session(s); - }; - - // Create and launch a listening port - auto server = std::make_shared>( - ship_ioc, logger, boost::posix_time::milliseconds(100), "", local_address, "", create_session); - server->do_accept(); - local_address = server->acceptor().local_endpoint(); - } - - ~test_server() { - stopping = true; - ship_ioc_work.reset(); - main_ioc_work.reset(); - ship_ioc.stop(); - - for (auto& thr : threads) { - thr.join(); - } - threads.clear(); - } -}; - -std::vector zlib_compress(const char* data, size_t size) { - std::vector out; - bio::filtering_ostream comp; - comp.push(bio::zlib_compressor(bio::zlib::default_compression)); - comp.push(bio::back_inserter(out)); - bio::write(comp, data, size); - bio::close(comp); - return out; -} - -std::vector generate_data(uint32_t sz) { - std::vector data(sz); - std::mt19937 rng; - std::generate_n(data.begin(), data.size(), std::ref(rng)); - return data; -} - -struct state_history_test_fixture { - test_server server; - net::io_context ioc; - websocket::stream ws; - - std::vector> written_data; - - state_history_test_fixture() - : ws(ioc) { - - // mock uses DEFAULT_LOGGER - fc::logger::get(DEFAULT_LOGGER).set_log_level(fc::log_level::debug); - - // start the server with 2 threads - server.run(); - connect_to(server.local_address); - - // receives the ABI - beast::flat_buffer buffer; - ws.read(buffer); - std::string text((const char*)buffer.data().data(), 
buffer.data().size()); - BOOST_REQUIRE_EQUAL(text, state_history_plugin_abi); - ws.binary(true); - } - - void connect_to(tcp::endpoint addr) { - ws.next_layer().connect(addr); - // Update the host_ string. This will provide the value of the - // Host HTTP header during the WebSocket handshake. - // See https://tools.ietf.org/html/rfc7230#section-5.4 - std::string host = "http:://localhost:" + std::to_string(addr.port()); - - // Set a decorator to change the User-Agent of the handshake - ws.set_option(websocket::stream_base::decorator([](websocket::request_type& req) { - req.set(http::field::user_agent, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-client-coro"); - })); - - // Perform the websocket handshake - ws.handshake(host, "/"); - } - - void send_status_request() { send_request(eosio::state_history::get_status_request_v0{}); } - - void send_request(const eosio::state_history::state_request& request) { - auto request_bin = fc::raw::pack(request); - ws.write(net::buffer(request_bin)); - } - - void receive_result(eosio::state_history::state_result& result) { - beast::flat_buffer buffer; - ws.read(buffer); - - auto d = boost::asio::buffer_cast(boost::beast::buffers_front(buffer.data())); - auto s = boost::asio::buffer_size(buffer.data()); - fc::datastream ds(d, s); - fc::raw::unpack(ds, result); - } - - void verify_status(const eosio::state_history::get_status_result_v0& status) { - - send_status_request(); - eosio::state_history::state_result result; - receive_result(result); - - BOOST_REQUIRE(std::holds_alternative(result)); - - auto received_status = std::get(result); - - // we don't have `equal` declared in eosio::state_history::get_status_result_v0, just compare its serialized form - BOOST_CHECK(fc::raw::pack(status) == fc::raw::pack(received_status)); - } - - void add_to_log(uint32_t index, uint32_t type, std::vector&& decompressed_data, bool fetch_finality_data) { - uint64_t decompressed_byte_count = decompressed_data.size() * sizeof(int32_t); - - auto compressed = zlib_compress((const char*)decompressed_data.data(), decompressed_byte_count); - - eosio::state_history_log_header header; - header.block_id = server.block_id_for(index); - header.payload_size = compressed.size() + sizeof(type); - if (type == 1) { - header.payload_size += sizeof(uint64_t); - } - - auto write_log = [&](std::optional& log) { - assert(log); - std::unique_lock lk(log->_mx); - log->write_entry(header, server.block_id_for(index - 1), [&](auto& f) { - f.write((const char*)&type, sizeof(type)); - if (type == 1) { - f.write((const char*)&decompressed_byte_count, sizeof(decompressed_byte_count)); - } - f.write(compressed.data(), compressed.size()); - }); - lk.unlock(); - }; - - write_log(server.trace_log); - write_log(server.state_log); - if( fetch_finality_data ) { - write_log(server.finality_data_log); - } - - if (written_data.size() < index) - written_data.resize(index); - written_data[index - 1].swap(decompressed_data); - } - - ~state_history_test_fixture() { ws.close(websocket::close_code::normal); } -}; - -void store_read_test_case(test_server& server, uint64_t data_size, eosio::state_history_log_config config) { - fc::temp_directory log_dir; - eosio::state_history_log log("ship", log_dir.path(), config); - - eosio::state_history_log_header header; - header.block_id = server.block_id_for(1); - header.payload_size = 0; - auto data = generate_data(data_size); - - log.pack_and_write_entry(header, server.block_id_for(0), - [&](auto&& buf) { bio::write(buf, (const char*)data.data(), data.size() * 
sizeof(data[0])); }); - - // make sure the current file position is at the end of file - auto pos = log.get_log_file().tellp(); - log.get_log_file().seek_end(0); - BOOST_REQUIRE_EQUAL(log.get_log_file().tellp(), pos); - - eosio::locked_decompress_stream buf = log.create_locked_decompress_stream(); - log.get_unpacked_entry(1, buf); - - std::vector decompressed; - auto& strm = std::get>(buf.buf); - BOOST_CHECK(!!strm); - BOOST_CHECK(buf.lock.owns_lock()); - bio::copy(*strm, bio::back_inserter(decompressed)); - - BOOST_CHECK_EQUAL(data.size() * sizeof(data[0]), decompressed.size()); - BOOST_CHECK(std::equal(decompressed.begin(), decompressed.end(), (const char*)data.data())); -} - -BOOST_FIXTURE_TEST_CASE(store_read_entry_no_prune, state_history_test_fixture) { - store_read_test_case(server, 1024, {}); -} - -BOOST_FIXTURE_TEST_CASE(store_read_big_entry_no_prune, state_history_test_fixture) { - // test the case where the uncompressed data size exceeds 4GB - store_read_test_case(server, (1ULL<< 32) + (1ULL << 20), {}); -} - -BOOST_FIXTURE_TEST_CASE(store_read_entry_prune_enabled, state_history_test_fixture) { - store_read_test_case(server, 1024, eosio::state_history::prune_config{.prune_blocks = 100}); -} - -BOOST_FIXTURE_TEST_CASE(store_with_existing, state_history_test_fixture) { - uint64_t data_size = 512; - fc::temp_directory log_dir; - eosio::state_history_log log("ship", log_dir.path(), {}); - - eosio::state_history_log_header header; - header.block_id = server.block_id_for(1); - header.payload_size = 0; - auto data = generate_data(data_size); - - log.pack_and_write_entry(header, server.block_id_for(0), - [&](auto&& buf) { bio::write(buf, (const char*)data.data(), data.size() * sizeof(data[0])); }); - - header.block_id = server.block_id_for(2); - log.pack_and_write_entry(header, server.block_id_for(1), - [&](auto&& buf) { bio::write(buf, (const char*)data.data(), data.size() * sizeof(data[0])); }); - - // Do not allow starting from scratch for existing - header.block_id = server.block_id_for(1); - BOOST_CHECK_EXCEPTION( - log.pack_and_write_entry(header, server.block_id_for(0), [&](auto&& buf) { bio::write(buf, (const char*)data.data(), data.size() * sizeof(data[0])); }), - eosio::chain::plugin_exception, - []( const auto& e ) { - return e.to_detail_string().find( "Existing ship log" ) != std::string::npos; - } - ); -} - -void send_request(state_history_test_fixture& fixture, bool fetch_finality_data, uint32_t start_block_num, const std::vector& have_positions) { - if( fetch_finality_data ) { - get_blocks_request_v1 req { .fetch_finality_data = true }; - - req.start_block_num = start_block_num; - req.end_block_num = UINT32_MAX; - req.max_messages_in_flight = UINT32_MAX; - req.have_positions = have_positions; - req.irreversible_only = false; - req.fetch_block = true; - req.fetch_traces = true; - req.fetch_deltas = true; - - fixture.send_request(req); - } else { - fixture.send_request(eosio::state_history::get_blocks_request_v0{ - .start_block_num = start_block_num, - .end_block_num = UINT32_MAX, - .max_messages_in_flight = UINT32_MAX, - .have_positions = have_positions, - .irreversible_only = false, - .fetch_block = true, - .fetch_traces = true, - .fetch_deltas = true} - ); - } -} - -void test_session_no_prune_impl(state_history_test_fixture& fixture, bool fetch_finality_data) { - // setup block head for the server - fixture.server.setup_state_history_log(fetch_finality_data); - uint32_t head_block_num = 3; - fixture.server.block_head = {head_block_num, 
fixture.server.block_id_for(head_block_num)}; - - // generate the log data used for traces, deltas, and finality_data if required - uint32_t n = mock_state_history_plugin::default_frame_size; - fixture.add_to_log(1, n * sizeof(uint32_t), generate_data(n), fetch_finality_data); // original data format - fixture.add_to_log(2, 0, generate_data(n), fetch_finality_data); // format to accommodate the compressed size greater than 4GB - fixture.add_to_log(3, 1, generate_data(n), fetch_finality_data); // format to encode decompressed size to avoid decompress entire data upfront. - - // send a get_status_request and verify the result is what we expected - fixture.verify_status(eosio::state_history::get_status_result_v0{ - .head = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .last_irreversible = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .trace_begin_block = 1, - .trace_end_block = head_block_num + 1, - .chain_state_begin_block = 1, - .chain_state_end_block = head_block_num + 1}); - - // send a get_blocks_request to server - send_request(fixture, fetch_finality_data, 1, {}); - - eosio::state_history::state_result result; - // we should get 3 consecutive block result - for (int i = 0; i < 3; ++i) { - fixture.receive_result(result); - - if( fetch_finality_data ) { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - BOOST_REQUIRE(r.finality_data.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - - auto finality_data = r.finality_data.value(); - BOOST_REQUIRE_EQUAL(finality_data.size(), data_size); - BOOST_REQUIRE(std::equal(finality_data.begin(), finality_data.end(), (const char*)data.data())); - } else { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - } - } -} - -BOOST_FIXTURE_TEST_CASE(test_session_no_prune, state_history_test_fixture) { - try { - test_session_no_prune_impl(*this, false); - } - FC_LOG_AND_RETHROW() -} - -BOOST_FIXTURE_TEST_CASE(test_session_no_prune_fetch_finality_data, state_history_test_fixture) { - try { - test_session_no_prune_impl(*this, true); - } - FC_LOG_AND_RETHROW() -} - -void test_split_log_impl(state_history_test_fixture& fixture, bool fetch_finality_data) { - // setup block head for the server - constexpr uint32_t head = 1023; - eosio::state_history::partition_config conf; - conf.stride = 25; - fixture.server.setup_state_history_log(fetch_finality_data, conf); - 
uint32_t head_block_num = head; - fixture.server.block_head = {head_block_num, fixture.server.block_id_for(head_block_num)}; - - // generate the log data used for traces, deltas and finality_data - uint32_t n = mock_state_history_plugin::default_frame_size; - fixture.add_to_log(1, n * sizeof(uint32_t), generate_data(n), fetch_finality_data); // original data format - fixture.add_to_log(2, 0, generate_data(n), fetch_finality_data); // format to accommodate the compressed size greater than 4GB - fixture.add_to_log(3, 1, generate_data(n), fetch_finality_data); // format to encode decompressed size to avoid decompress entire data upfront. - for (size_t i = 4; i <= head; ++i) { - fixture.add_to_log(i, 1, generate_data(n), fetch_finality_data); - } - - send_request(fixture, fetch_finality_data, 1, {}); - - eosio::state_history::state_result result; - // we should get 1023 consecutive block result - eosio::chain::block_id_type prev_id; - for (uint32_t i = 0; i < head; ++i) { - fixture.receive_result(result); - - if( fetch_finality_data ) { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - if (i > 0) { - BOOST_TEST(prev_id.str() == r.prev_block->block_id.str()); - } - prev_id = r.this_block->block_id; - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - - auto finality_data = r.finality_data.value(); - BOOST_REQUIRE(r.finality_data.has_value()); - BOOST_REQUIRE_EQUAL(finality_data.size(), data_size); - BOOST_REQUIRE(std::equal(finality_data.begin(), finality_data.end(), (const char*)data.data())); - } else { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - if (i > 0) { - BOOST_TEST(prev_id.str() == r.prev_block->block_id.str()); - } - prev_id = r.this_block->block_id; - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - } - } -} - -BOOST_FIXTURE_TEST_CASE(test_split_log, state_history_test_fixture) { - try { - test_split_log_impl(*this, false); - } - FC_LOG_AND_RETHROW() -} - -BOOST_FIXTURE_TEST_CASE(test_split_log_fetch_finality_data, state_history_test_fixture) { - try { - test_split_log_impl(*this, true); - } - FC_LOG_AND_RETHROW() -} - -void test_session_with_prune_impl(state_history_test_fixture& fixture, bool fetch_finality_data) { - // setup block head for the server - fixture.server.setup_state_history_log(fetch_finality_data, - eosio::state_history::prune_config{.prune_blocks = 2, .prune_threshold = 4 * 1024}); - - uint32_t head_block_num = 3; - fixture.server.block_head = 
{head_block_num, fixture.server.block_id_for(head_block_num)}; - - // generate the log data used for traces, deltas and finality_data - uint32_t n = mock_state_history_plugin::default_frame_size; - fixture.add_to_log(1, n * sizeof(uint32_t), generate_data(n), fetch_finality_data); // original data format - fixture.add_to_log(2, 0, generate_data(n), fetch_finality_data); // format to accommodate the compressed size greater than 4GB - fixture.add_to_log(3, 1, generate_data(n), fetch_finality_data); // format to encode decompressed size to avoid decompress entire data upfront. - - // send a get_status_request and verify the result is what we expected - fixture.verify_status(eosio::state_history::get_status_result_v0{ - .head = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .last_irreversible = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .trace_begin_block = 2, - .trace_end_block = head_block_num + 1, - .chain_state_begin_block = 2, - .chain_state_end_block = head_block_num + 1}); - - // send a get_blocks_request to fixture.server - send_request(fixture, fetch_finality_data, 1, {}); - - eosio::state_history::state_result result; - // we should get 3 consecutive block result - - fixture.receive_result(result); - if( fetch_finality_data ) { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(!r.traces.has_value()); - BOOST_REQUIRE(!r.deltas.has_value()); - BOOST_REQUIRE(!r.finality_data.has_value()); - } else { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(!r.traces.has_value()); - BOOST_REQUIRE(!r.deltas.has_value()); - } - - for (int i = 1; i < 3; ++i) { - fixture.receive_result(result); - - if( fetch_finality_data ) { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - - BOOST_REQUIRE(r.finality_data.has_value()); - auto finality_data = r.finality_data.value(); - BOOST_REQUIRE_EQUAL(finality_data.size(), data_size); - BOOST_REQUIRE(std::equal(finality_data.begin(), finality_data.end(), (const char*)data.data())); - } else { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - } - } -} - 
-BOOST_FIXTURE_TEST_CASE(test_session_with_prune, state_history_test_fixture) { - try { - test_session_with_prune_impl(*this, false); - } - FC_LOG_AND_RETHROW() -} - -BOOST_FIXTURE_TEST_CASE(test_session_with_prune_fetch_finality_data, state_history_test_fixture) { - try { - test_session_with_prune_impl(*this, true); - } - FC_LOG_AND_RETHROW() -} - -void test_session_fork_impl(state_history_test_fixture& fixture, bool fetch_finality_data) { - fixture.server.setup_state_history_log(fetch_finality_data); - uint32_t head_block_num = 4; - fixture.server.block_head = {head_block_num, fixture.server.block_id_for(head_block_num)}; - - // generate the log data used for traces, deltas and finality_data - uint32_t n = mock_state_history_plugin::default_frame_size; - fixture.add_to_log(1, n * sizeof(uint32_t), generate_data(n), fetch_finality_data); // original data format - fixture.add_to_log(2, 0, generate_data(n), fetch_finality_data); // format to accommodate the compressed size greater than 4GB - fixture.add_to_log(3, 1, generate_data(n), fetch_finality_data); // format to encode decompressed size to avoid decompress entire data upfront. - fixture.add_to_log(4, 1, generate_data(n), fetch_finality_data); // format to encode decompressed size to avoid decompress entire data upfront. - - // send a get_status_request and verify the result is what we expected - fixture.verify_status(eosio::state_history::get_status_result_v0{ - .head = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .last_irreversible = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .trace_begin_block = 1, - .trace_end_block = head_block_num + 1, - .chain_state_begin_block = 1, - .chain_state_end_block = head_block_num + 1}); - - // send a get_blocks_request to fixture.server - send_request(fixture, fetch_finality_data, 1, {}); - - std::vector have_positions; - eosio::state_history::state_result result; - // we should get 4 consecutive block result - for (uint32_t i = 0; i < 4; ++i) { - fixture.receive_result(result); - - if( fetch_finality_data ) { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - BOOST_REQUIRE(r.this_block.has_value()); - BOOST_REQUIRE_EQUAL(r.this_block->block_num, i+1); - have_positions.push_back(*r.this_block); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - - BOOST_REQUIRE(r.finality_data.has_value()); - auto finality_data = r.finality_data.value(); - BOOST_REQUIRE_EQUAL(finality_data.size(), data_size); - BOOST_REQUIRE(std::equal(finality_data.begin(), finality_data.end(), (const char*)data.data())); - } else { - BOOST_REQUIRE(std::holds_alternative(result)); - auto r = std::get(result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - 
BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - BOOST_REQUIRE(r.this_block.has_value()); - BOOST_REQUIRE_EQUAL(r.this_block->block_num, i+1); - have_positions.push_back(*r.this_block); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - } - } - - // generate a fork that includes blocks 3,4 and verify new data retrieved - // setup block head for the server - fixture.server.block_ids.extract(3); fixture.server.block_id_for(3, "fork"); - fixture.server.block_ids.extract(4); fixture.server.block_id_for(4, "fork"); - fixture.server.block_head = {head_block_num, fixture.server.block_id_for(head_block_num)}; - fixture.add_to_log(3, 0, generate_data(n), fetch_finality_data); - fixture.add_to_log(4, 1, generate_data(n), fetch_finality_data); - - // send a get_status_request and verify the result is what we expected - fixture.verify_status(eosio::state_history::get_status_result_v0{ - .head = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .last_irreversible = {head_block_num, fixture.server.block_id_for(head_block_num)}, - .trace_begin_block = 1, - .trace_end_block = head_block_num + 1, - .chain_state_begin_block = 1, - .chain_state_end_block = head_block_num + 1}); - - // send a get_blocks_request to fixture.server starting at 5, will send 3,4 because of fork - send_request(fixture, fetch_finality_data, 5, std::move(have_positions)); - - eosio::state_history::state_result fork_result; - // we should now get data for fork 3,4 - for (uint32_t i = 2; i < 4; ++i) { - fixture.receive_result(fork_result); - - if( fetch_finality_data ) { - BOOST_REQUIRE(std::holds_alternative(fork_result)); - auto r = std::get(fork_result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.this_block.has_value()); - BOOST_REQUIRE_EQUAL(r.this_block->block_num, i+1); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - - BOOST_REQUIRE(r.finality_data.has_value()); - auto finality_data = r.finality_data.value(); - BOOST_REQUIRE_EQUAL(finality_data.size(), data_size); - BOOST_REQUIRE(std::equal(finality_data.begin(), finality_data.end(), (const char*)data.data())); - } else { - BOOST_REQUIRE(std::holds_alternative(fork_result)); - auto r = std::get(fork_result); - BOOST_REQUIRE_EQUAL(r.head.block_num, fixture.server.block_head.block_num); - BOOST_REQUIRE(r.this_block.has_value()); - BOOST_REQUIRE_EQUAL(r.this_block->block_num, i+1); - BOOST_REQUIRE(r.traces.has_value()); - BOOST_REQUIRE(r.deltas.has_value()); - auto traces = r.traces.value(); - auto deltas = r.deltas.value(); - auto& data = fixture.written_data[i]; - auto data_size = data.size() * sizeof(int32_t); - BOOST_REQUIRE_EQUAL(traces.size(), data_size); - BOOST_REQUIRE_EQUAL(deltas.size(), data_size); - - BOOST_REQUIRE(std::equal(traces.begin(), traces.end(), (const char*)data.data())); - BOOST_REQUIRE(std::equal(deltas.begin(), deltas.end(), (const char*)data.data())); - } - } -} - 
-BOOST_FIXTURE_TEST_CASE(test_session_fork, state_history_test_fixture) {
-   try {
-      test_session_fork_impl(*this, false);
-   }
-   FC_LOG_AND_RETHROW()
-}
-
-BOOST_FIXTURE_TEST_CASE(test_session_fork_fetch_finality_data, state_history_test_fixture) {
-   try {
-      test_session_fork_impl(*this, true);
-   }
-   FC_LOG_AND_RETHROW()
-}
diff --git a/tests/ship_client.cpp b/tests/ship_client.cpp
index 2da5284b3d..e82188d078 100644
--- a/tests/ship_client.cpp
+++ b/tests/ship_client.cpp
@@ -77,6 +77,7 @@ int main(int argc, char* argv[]) {
       eosio::from_json(token_stream);
       eosio::convert(abidef, abi);
    }
+   stream.binary(true);

   std::cerr << "{\n \"status\": \"set_abi\",\n \"time\": " << time(NULL) << "\n},\n";

@@ -87,12 +88,20 @@ int main(int argc, char* argv[]) {
   uint32_t first_block_num = 0;
   uint32_t last_block_num = 0;

+   struct {
+      std::string get_status_request;
+      std::string get_status_result;
+   } request_result_types[] = {
+      {"get_status_request_v0", "get_status_result_v0"},
+      {"get_status_request_v1", "get_status_result_v1"},
+   };
+
   while(num_requests--) {
      rapidjson::StringBuffer request_sb;
      rapidjson::PrettyWriter request_writer(request_sb);

      request_writer.StartArray();
-      request_writer.String("get_status_request_v0");
+      request_writer.String(request_result_types[num_requests%2].get_status_request.c_str());
      request_writer.StartObject();
      request_writer.EndObject();
      request_writer.EndArray();
@@ -109,7 +118,7 @@ int main(int argc, char* argv[]) {
      eosio::check(!result_document.HasParseError(), "Failed to parse result JSON from abieos");
      eosio::check(result_document.IsArray(), "result should have been an array (variant) but it's not");
      eosio::check(result_document.Size() == 2, "result was an array but did not contain 2 items like a variant should");
-      eosio::check(std::string(result_document[0].GetString()) == "get_status_result_v0", "result type doesn't look like get_status_result_v0");
+      eosio::check(std::string(result_document[0].GetString()) == request_result_types[num_requests%2].get_status_result, "result type doesn't look like expected get_status_result_vX");
      eosio::check(result_document[1].IsObject(), "second item in result array is not an object");
      eosio::check(result_document[1].HasMember("head"), "cannot find 'head' in result");
      eosio::check(result_document[1]["head"].IsObject(), "'head' is not an object");
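For reference, the sketch below is not part of the patch; it illustrates how an external binary-protocol client could decode the get_status_result_v1 message that a node returns after receiving get_status_request_v1. It assumes the usual SHiP wire conventions (ULEB128 variant index, little-endian 32-bit integers, checksum256 as 32 raw bytes) and that get_status_result_v1 is the fourth alternative (index 3) of the "result" variant, per the ABI changes in this patch. All type and function names in the sketch are hypothetical; only the field order comes from the ABI.

// Illustrative sketch only -- not part of this patch.
#include <array>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

namespace example { // hypothetical namespace

struct reader {
   const uint8_t* pos;
   const uint8_t* end;

   uint64_t read_varuint() {                       // ULEB128, used for the variant index
      uint64_t v = 0;
      int shift = 0;
      uint8_t b = 0;
      do {
         if (pos == end) throw std::runtime_error("unexpected end of message");
         b = *pos++;
         v |= uint64_t(b & 0x7f) << shift;
         shift += 7;
      } while (b & 0x80);
      return v;
   }
   uint32_t read_u32() {                           // little-endian uint32
      if (end - pos < 4) throw std::runtime_error("unexpected end of message");
      uint32_t v;
      std::memcpy(&v, pos, sizeof(v));
      pos += sizeof(v);
      return v;
   }
   std::array<uint8_t, 32> read_checksum256() {    // raw 32 bytes
      if (end - pos < 32) throw std::runtime_error("unexpected end of message");
      std::array<uint8_t, 32> v;
      std::memcpy(v.data(), pos, v.size());
      pos += v.size();
      return v;
   }
};

struct status_v1 {                                 // mirrors get_status_result_v1 field order
   uint32_t head_block_num = 0;
   std::array<uint8_t, 32> head_block_id{};
   uint32_t lib_block_num = 0;
   std::array<uint8_t, 32> lib_block_id{};
   uint32_t trace_begin_block = 0, trace_end_block = 0;
   uint32_t chain_state_begin_block = 0, chain_state_end_block = 0;
   std::array<uint8_t, 32> chain_id{};
   uint32_t finality_data_begin_block = 0, finality_data_end_block = 0;
};

inline status_v1 decode_status_v1(const std::vector<uint8_t>& msg) {
   reader r{msg.data(), msg.data() + msg.size()};
   // "get_status_result_v1" is the fourth "result" alternative (index 3) in the ABI above
   if (r.read_varuint() != 3)
      throw std::runtime_error("not a get_status_result_v1 message");
   status_v1 s;
   s.head_block_num = r.read_u32();  s.head_block_id = r.read_checksum256(); // head
   s.lib_block_num  = r.read_u32();  s.lib_block_id  = r.read_checksum256(); // last_irreversible
   s.trace_begin_block         = r.read_u32();
   s.trace_end_block           = r.read_u32();
   s.chain_state_begin_block   = r.read_u32();
   s.chain_state_end_block     = r.read_u32();
   s.chain_id                  = r.read_checksum256(); // plain checksum256 in v1, unlike the v0 extension field
   s.finality_data_begin_block = r.read_u32();         // new fields carried only by v1
   s.finality_data_end_block   = r.read_u32();
   return s;
}

} // namespace example

A client that keeps sending get_status_request_v0 continues to receive get_status_result_v0, so both message versions can coexist, which is exactly what the ship_client.cpp change above exercises by alternating between the two request types.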