diff --git a/libraries/state_history/create_deltas.cpp b/libraries/state_history/create_deltas.cpp index 4344ade76d..47fd55af41 100644 --- a/libraries/state_history/create_deltas.cpp +++ b/libraries/state_history/create_deltas.cpp @@ -48,6 +48,8 @@ bool include_delta(const chain::account_metadata_object& old, const chain::accou } bool include_delta(const chain::code_object& old, const chain::code_object& curr) { // + // code_object data that is exported by SHiP is never modified they are only deleted or created, + // see serialization of history_serial_wrapper return false; } diff --git a/libraries/state_history/include/eosio/state_history/log.hpp b/libraries/state_history/include/eosio/state_history/log.hpp index 8f7ac99e9c..2b5c3a4dfb 100644 --- a/libraries/state_history/include/eosio/state_history/log.hpp +++ b/libraries/state_history/include/eosio/state_history/log.hpp @@ -4,11 +4,15 @@ #include #include +#include +#include + #include #include #include #include #include +#include //set_thread_name namespace eosio { @@ -39,7 +43,7 @@ inline uint64_t ship_magic(uint16_t version, uint16_t features = 0) { using namespace eosio::chain::literals; return "ship"_n.to_uint64_t() | version | features<<16; } -inline bool is_ship(uint64_t magic) { +inline bool is_ship(uint64_t magic) { using namespace eosio::chain::literals; return (magic & 0xffff'ffff'0000'0000) == "ship"_n.to_uint64_t(); } @@ -78,6 +82,15 @@ class state_history_log { uint32_t _end_block = 0; chain::block_id_type last_block_id; + std::thread thr; + std::atomic write_thread_has_exception = false; + std::exception_ptr eptr; + boost::asio::io_context ctx; + boost::asio::io_context::strand work_strand{ctx}; + boost::asio::executor_work_guard work_guard = + boost::asio::make_work_guard(ctx); + std::recursive_mutex mx; + public: state_history_log(const char* const name, std::string log_filename, std::string index_filename, std::optional prune_conf = std::optional()) @@ -87,7 +100,7 @@ class state_history_log { , prune_config(prune_conf) { open_log(); open_index(); - + if(prune_config) { EOS_ASSERT(prune_config->prune_blocks, chain::plugin_exception, "state history log prune configuration requires at least one block"); EOS_ASSERT(__builtin_popcount(prune_config->prune_threshold) == 1, chain::plugin_exception, "state history prune threshold must be power of 2"); @@ -119,9 +132,37 @@ class state_history_log { vacuum(); } } + + thr = std::thread([this] { + try { + fc::set_os_thread_name(this->name); + this->ctx.run(); + } catch (...) { + elog("catched exception from ${name} write thread", ("name", this->name)); + eptr = std::current_exception(); + write_thread_has_exception = true; + } + elog("${name} thread ended", ("name", this->name)); + }); + } + + void stop() { + if (thr.joinable()) { + work_guard.reset(); + thr.join(); + } } ~state_history_log() { + // complete execution before possible vacuuming + if (thr.joinable()) { + try { + work_guard.reset(); + thr.join(); + } + catch (const boost::thread_interrupted&) {/* suppressed */} + } + //nothing to do if log is empty or we aren't pruning if(_begin_block == _end_block) return; @@ -158,6 +199,12 @@ class state_history_log { template void write_entry(state_history_log_header header, const chain::block_id_type& prev_id, F write_payload) { + if (write_thread_has_exception) { + std::rethrow_exception(eptr); + } + + std::unique_lock lock(mx); + auto block_num = chain::block_header::num_from_id(header.block_id); EOS_ASSERT(_begin_block == _end_block || block_num <= _end_block, chain::plugin_exception, "missed a block in ${name}.log", ("name", name)); @@ -186,8 +233,10 @@ class state_history_log { header.magic = ship_magic(get_ship_version(header.magic), ship_feature_pruned_log); uint64_t pos = log.tellp(); + write_header(header); write_payload(log); + EOS_ASSERT(log.tellp() == pos + state_history_log_header_serial_size + header.payload_size, chain::plugin_exception, "wrote payload with incorrect size to ${name}.log", ("name", name)); fc::raw::pack(log, pos); @@ -211,12 +260,14 @@ class state_history_log { fc::cfile& get_entry(uint32_t block_num, state_history_log_header& header) { EOS_ASSERT(block_num >= _begin_block && block_num < _end_block, chain::plugin_exception, "read non-existing block in ${name}.log", ("name", name)); + std::lock_guard lock(mx); log.seek(get_pos(block_num)); read_header(header); return log; } chain::block_id_type get_block_id(uint32_t block_num) { + std::lock_guard lock(mx); state_history_log_header header; get_entry(block_num, header); return header.block_id; @@ -224,6 +275,7 @@ class state_history_log { private: //file position must be at start of last block's suffix (back pointer) + //called from open_log / ctor bool get_last_block() { state_history_log_header header; uint64_t suffix; @@ -305,6 +357,7 @@ class state_history_log { EOS_ASSERT(get_last_block(), chain::plugin_exception, "recover ${name}.log failed", ("name", name)); } + // only called from constructor void open_log() { log.set_file_path(log_filename); log.open("a+b"); @@ -351,6 +404,7 @@ class state_history_log { } } + // only called from constructor void open_index() { index.set_file_path(index_filename); index.open("a+b"); diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index e760720586..e1113ed1b5 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -1,7 +1,7 @@ #include +#include #include #include -#include #include #include #include @@ -10,16 +10,38 @@ #include #include #include +#include #include #include #include #include using tcp = boost::asio::ip::tcp; +using unixs = boost::asio::local::stream_protocol; namespace ws = boost::beast::websocket; extern const char* const state_history_plugin_abi; +/* Prior to boost 1.70, if socket type is not boost::asio::ip::tcp::socket nor boost::asio::ssl::stream beast requires + an overload of async_teardown. This has been improved in 1.70+ to support any basic_stream_socket<> out of the box + which includes unix sockets. */ +#if BOOST_VERSION < 107000 +namespace boost::beast::websocket { +template +void async_teardown(role_type, unixs::socket& sock, TeardownHandler&& handler) { + boost::system::error_code ec; + sock.close(ec); + boost::asio::post(boost::asio::get_associated_executor(handler, sock.get_executor()), [h=std::move(handler),ec]() mutable { + h(ec); + }); +} +} +#endif + +// overload pattern for variant visitation +template struct overload : Ts... { using Ts::operator()...; }; +template overload(Ts...) -> overload; + namespace eosio { using namespace chain; using namespace state_history; @@ -48,15 +70,24 @@ struct state_history_plugin_impl : std::enable_shared_from_this trace_log; std::optional chain_state_log; bool trace_debug_mode = false; - bool stopping = false; + std::atomic stopping = false; std::optional applied_transaction_connection; std::optional block_start_connection; std::optional accepted_block_connection; - string endpoint_address = "0.0.0.0"; - uint16_t endpoint_port = 8080; - std::unique_ptr acceptor; + string endpoint_address; + uint16_t endpoint_port = 8080; + string unix_path; state_history::trace_converter trace_converter; + using acceptor_type = std::variant, std::unique_ptr>; + std::set acceptor; + + std::thread thr; + boost::asio::io_context ctx; + boost::asio::io_context::strand work_strand{ctx}; + boost::asio::executor_work_guard work_guard = + boost::asio::make_work_guard(ctx); + void get_log_entry(state_history_log& log, uint32_t block_num, std::optional& result) { if (block_num < log.begin_block() || block_num >= log.end_block()) return; @@ -100,71 +131,73 @@ struct state_history_plugin_impl : std::enable_shared_from_this { - std::shared_ptr plugin; - std::unique_ptr> socket_stream; + struct session_base { + virtual void send_update(const block_state_ptr& block_state) = 0; + virtual void close() = 0; + virtual ~session_base() = default; + std::optional current_request; + }; + + + template + struct session : session_base, std::enable_shared_from_this> { + std::shared_ptr plugin; + ws::stream socket_stream; bool sending = false; bool sent_abi = false; std::vector> send_queue; - std::optional current_request; bool need_to_send_update = false; - session(std::shared_ptr plugin) - : plugin(std::move(plugin)) {} + session(std::shared_ptr plugin, SocketType socket) + : plugin(std::move(plugin)), socket_stream(std::move(socket)) {} - void start(tcp::socket socket) { + void start() { fc_ilog(_log, "incoming connection"); - socket_stream = std::make_unique>(std::move(socket)); - socket_stream->binary(true); - socket_stream->next_layer().set_option(boost::asio::ip::tcp::no_delay(true)); - socket_stream->next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024)); - socket_stream->next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024)); - socket_stream->async_accept([self = shared_from_this()](boost::system::error_code ec) { + socket_stream.auto_fragment(false); + socket_stream.binary(true); + if constexpr (std::is_same_v) { + socket_stream.next_layer().set_option(boost::asio::ip::tcp::no_delay(true)); + } + socket_stream.next_layer().set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024)); + socket_stream.next_layer().set_option(boost::asio::socket_base::receive_buffer_size(1024 * 1024)); + + socket_stream.async_accept([self = this->shared_from_this()](boost::system::error_code ec) { self->callback(ec, "async_accept", [self] { self->start_read(); self->send(state_history_plugin_abi); }); }); } + void start_read() { auto in_buffer = std::make_shared(); - socket_stream->async_read( - *in_buffer, [self = shared_from_this(), in_buffer](boost::system::error_code ec, size_t) { + socket_stream.async_read( + *in_buffer, [self = this->shared_from_this(), in_buffer](boost::system::error_code ec, size_t) { self->callback(ec, "async_read", [self, in_buffer] { auto d = boost::asio::buffer_cast(boost::beast::buffers_front(in_buffer->data())); auto s = boost::asio::buffer_size(in_buffer->data()); fc::datastream ds(d, s); state_request req; fc::raw::unpack(ds, req); - std::visit(*self, req); + app().post(priority::medium, [self, req = std::move(req)]() mutable { std::visit(*self, req); }); self->start_read(); }); }); } - void send(const char* s) { - send_queue.push_back({s, s + strlen(s)}); - send(); - } - - template - void send(T obj) { - send_queue.push_back(fc::raw::pack(state_result{std::move(obj)})); - send(); - } - + void send() { if (sending) return; if (send_queue.empty()) return send_update(); sending = true; - socket_stream->binary(sent_abi); + socket_stream.binary(sent_abi); sent_abi = true; - socket_stream->async_write( // + socket_stream.async_write( // boost::asio::buffer(send_queue[0]), // - [self = shared_from_this()](boost::system::error_code ec, size_t) { + [self = this->shared_from_this()](boost::system::error_code ec, size_t) { self->callback(ec, "async_write", [self] { self->send_queue.erase(self->send_queue.begin()); self->sending = false; @@ -173,6 +206,21 @@ struct state_history_plugin_impl : std::enable_shared_from_thisplugin->work_strand, [self = this->shared_from_this(), str = s ]() { + self->send_queue.push_back({str, str + strlen(str)}); + self->send(); + }); + } + + template + void send(T obj) { + boost::asio::post(this->plugin->work_strand, [self = this->shared_from_this(), obj = std::move(obj) ]() { + self->send_queue.emplace_back(fc::raw::pack(state_result{std::move(obj)})); + self->send(); + }); + } + using result_type = void; void operator()(get_status_request_v0&) { fc_ilog(_log, "got get_status_request_v0"); @@ -228,13 +276,16 @@ struct state_history_plugin_impl : std::enable_shared_from_thismax_messages_in_flight) return; + auto& chain = plugin->chain_plug->chain(); result.last_irreversible = {chain.last_irreversible_block_num(), chain.last_irreversible_block_id()}; uint32_t current = current_request->irreversible_only ? result.last_irreversible.block_num : result.head.block_num; + if (current_request->start_block_num <= current && current_request->start_block_num < current_request->end_block_num) { auto block_id = plugin->get_block_id(current_request->start_block_num); + if (block_id) { result.this_block = block_position{current_request->start_block_num, *block_id}; auto prev_block_id = plugin->get_block_id(current_request->start_block_num - 1); @@ -330,9 +381,9 @@ struct state_history_plugin_impl : std::enable_shared_from_thisnext_layer().close(); - plugin->sessions.erase(this); + boost::system::error_code ec; + socket_stream.next_layer().close(ec); + if (ec) { + fc_elog(_log, "close: ${m}", ("m", ec.message())); + } + plugin->sessions.remove(this->shared_from_this()); } }; - std::map> sessions; + + class session_manager_t { + std::mutex mx; + boost::container::flat_set> session_set; + + public: + template + void add(std::shared_ptr plugin, std::shared_ptr socket) { + auto s = std::make_shared>(plugin, std::move(*socket)); + s->start(); + std::lock_guard lock(mx); + session_set.insert(std::move(s)); + } + + void remove(std::shared_ptr s) { + std::lock_guard lock(mx); + session_set.erase(s); + } + + template + void for_each(F&& f) { + std::lock_guard lock(mx); + for (auto& s : session_set) { + f(s); + } + } + } sessions; void listen() { boost::system::error_code ec; - auto address = boost::asio::ip::make_address(endpoint_address); - auto endpoint = tcp::endpoint{address, endpoint_port}; - acceptor = std::make_unique(app().get_io_service()); - auto check_ec = [&](const char* what) { if (!ec) return; - fc_elog(_log,"${w}: ${m}", ("w", what)("m", ec.message())); + fc_elog(_log, "${w}: ${m}", ("w", what)("m", ec.message())); EOS_ASSERT(false, plugin_exception, "unable to open listen socket"); }; - acceptor->open(endpoint.protocol(), ec); - check_ec("open"); - acceptor->set_option(boost::asio::socket_base::reuse_address(true)); - acceptor->bind(endpoint, ec); - check_ec("bind"); - acceptor->listen(boost::asio::socket_base::max_listen_connections, ec); - check_ec("listen"); - do_accept(); + auto init_tcp_acceptor = [&]() { acceptor.insert(std::make_unique(app().get_io_service())); }; + auto init_unix_acceptor = [&]() { + // take a sniff and see if anything is already listening at the given socket path, or if the socket path exists + // but nothing is listening + { + boost::system::error_code test_ec; + unixs::socket test_socket(app().get_io_service()); + test_socket.connect(unix_path.c_str(), test_ec); + + // looks like a service is already running on that socket, don't touch it... fail out + if (test_ec == boost::system::errc::success) + ec = boost::system::errc::make_error_code(boost::system::errc::address_in_use); + // socket exists but no one home, go ahead and remove it and continue on + else if (test_ec == boost::system::errc::connection_refused) + ::unlink(unix_path.c_str()); + else if (test_ec != boost::system::errc::no_such_file_or_directory) + ec = test_ec; + } + check_ec("open"); + acceptor.insert(std::make_unique(this->ctx)); + }; + + // create and configure acceptors, can be both + if (endpoint_address.size()) init_tcp_acceptor(); + if (unix_path.size()) init_unix_acceptor(); + + // start it + std::for_each(acceptor.begin(), acceptor.end(), [&](const acceptor_type& acc) { + std::visit(overload{[&](const std::unique_ptr& tcp_acc) { + auto address = boost::asio::ip::make_address(endpoint_address); + auto endpoint = tcp::endpoint{address, endpoint_port}; + tcp_acc->open(endpoint.protocol(), ec); + check_ec("open"); + tcp_acc->set_option(boost::asio::socket_base::reuse_address(true)); + tcp_acc->bind(endpoint, ec); + check_ec("bind"); + tcp_acc->listen(boost::asio::socket_base::max_listen_connections, ec); + check_ec("listen"); + do_accept(*tcp_acc); + }, + [&](const std::unique_ptr& unx_acc) { + unx_acc->open(unixs::acceptor::protocol_type(), ec); + check_ec("open"); + unx_acc->bind(unix_path.c_str(), ec); + check_ec("bind"); + unx_acc->listen(boost::asio::socket_base::max_listen_connections, ec); + check_ec("listen"); + do_accept(*unx_acc); + }}, + acc); + }); } - void do_accept() { - auto socket = std::make_shared(app().get_io_service()); - acceptor->async_accept(*socket, [self = shared_from_this(), socket, this](const boost::system::error_code& ec) { + template + void do_accept(Acceptor& acceptor) { + auto socket = std::make_shared(this->ctx); + acceptor.async_accept(*socket, [self = shared_from_this(), this, socket, &acceptor](const boost::system::error_code& ec) { if (stopping) return; if (ec) { if (ec == boost::system::errc::too_many_files_open) - catch_and_log([&] { do_accept(); }); + catch_and_log([&] { do_accept(acceptor); }); return; } catch_and_log([&] { - auto s = std::make_shared(self); - sessions[s.get()] = s; - s->start(std::move(*socket)); + sessions.add(self, socket); }); - catch_and_log([&] { do_accept(); }); + catch_and_log([&] { do_accept(acceptor); }); }); } @@ -400,7 +519,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thiscurrent_request && block_state->block_num < p->current_request->start_block_num) p->current_request->start_block_num = block_state->block_num; p->send_update(block_state); } - } + }); } void on_block_start(uint32_t block_num) { clear_caches(); } @@ -452,7 +570,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thisbegin_block() == chain_state_log->end_block(); if (fresh) - ilog("Placing initial state in block ${n}", ("n", block_state->block->block_num())); + fc_ilog(_log, "Placing initial state in block ${n}", ("n", block_state->block->block_num())); std::vector deltas = state_history::create_deltas(chain_plug->chain().db(), fresh); auto deltas_bin = state_history::zlib_compress_bytes(fc::raw::pack(deltas)); @@ -489,6 +607,8 @@ void state_history_plugin::set_program_options(options_description& cli, options options("state-history-endpoint", bpo::value()->default_value("127.0.0.1:8080"), "the endpoint upon which to listen for incoming connections. Caution: only expose this port to " "your internal network."); + options("state-history-unix-socket-path", bpo::value(), + "the path (relative to data-dir) to create a unix socket upon which to listen for incoming connections."); options("trace-history-debug-mode", bpo::bool_switch()->default_value(false), "enable debug mode for trace history"); if(cfile::supports_hole_punching()) @@ -503,8 +623,8 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { my->chain_plug = app().find_plugin(); EOS_ASSERT(my->chain_plug, chain::missing_chain_plugin_exception, ""); auto& chain = my->chain_plug->chain(); - my->applied_transaction_connection.emplace( - chain.applied_transaction.connect([&](std::tuple t) { + my->applied_transaction_connection.emplace(chain.applied_transaction.connect( + [&](std::tuple t) { my->on_applied_transaction(std::get<0>(t), std::get<1>(t)); })); my->accepted_block_connection.emplace( @@ -521,12 +641,24 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { if (auto resmon_plugin = app().find_plugin()) resmon_plugin->monitor_directory(state_history_dir); - auto ip_port = options.at("state-history-endpoint").as(); - auto port = ip_port.substr(ip_port.find(':') + 1, ip_port.size()); - auto host = ip_port.substr(0, ip_port.find(':')); - my->endpoint_address = host; - my->endpoint_port = std::stoi(port); - idump((ip_port)(host)(port)); + auto ip_port = options.at("state-history-endpoint").as(); + + if (ip_port.size()) { + auto port = ip_port.substr(ip_port.find(':') + 1, ip_port.size()); + auto host = ip_port.substr(0, ip_port.find(':')); + my->endpoint_address = host; + my->endpoint_port = std::stoi(port); + + fc_dlog(_log, "PLUGIN_INITIALIZE ${ip_port} ${host} ${port}", + ("ip_port", ip_port)("host", host)("port", port)); + } + + if (options.count("state-history-unix-socket-path")) { + boost::filesystem::path sock_path = options.at("state-history-unix-socket-path").as(); + if (sock_path.is_relative()) + sock_path = app().data_dir() / sock_path; + my->unix_path = sock_path.generic_string(); + } if (options.at("delete-state-history").as()) { fc_ilog(_log, "Deleting state history"); @@ -557,22 +689,32 @@ void state_history_plugin::plugin_initialize(const variables_map& options) { FC_LOG_AND_RETHROW() } // state_history_plugin::plugin_initialize -void state_history_plugin::plugin_startup() { +void state_history_plugin::plugin_startup() { handle_sighup(); // setup logging - my->listen(); + + try { + my->thr = std::thread([ptr = my.get()] { ptr->ctx.run(); }); + my->listen(); + } catch (std::exception& ex) { + appbase::app().quit(); + } } void state_history_plugin::plugin_shutdown() { my->applied_transaction_connection.reset(); my->accepted_block_connection.reset(); my->block_start_connection.reset(); - while (!my->sessions.empty()) - my->sessions.begin()->second->close(); + my->sessions.for_each([](auto& s) { s->close(); }); my->stopping = true; + my->trace_log->stop(); + my->chain_state_log->stop(); + if (my->thr.joinable()) { + my->work_guard.reset(); + my->ctx.stop(); + my->thr.join(); + } } -void state_history_plugin::handle_sighup() { - fc::logger::update( logger_name, _log ); -} +void state_history_plugin::handle_sighup() { fc::logger::update(logger_name, _log); } } // namespace eosio diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9dc8a542d1..f78245c9c0 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -100,6 +100,8 @@ target_link_libraries(ship_streamer abieos Boost::program_options Boost::system add_test(NAME ship_test COMMAND tests/ship_test.py -v --num-clients 1 --num-requests 5000 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST ship_test PROPERTY LABELS nonparallelizable_tests) +add_test(NAME ship_test_unix COMMAND tests/ship_test.py -v --num-clients 1 --num-requests 5000 --clean-run --dump-error-detail --unix-socket WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_property(TEST ship_test_unix PROPERTY LABELS nonparallelizable_tests) add_test(NAME ship_streamer_test COMMAND tests/ship_streamer_test.py -v --num-clients 1 --num-blocks 50 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST ship_streamer_test PROPERTY LABELS nonparallelizable_tests) diff --git a/tests/ship_client.cpp b/tests/ship_client.cpp index fcb27bf9d6..ece4e2e657 100644 --- a/tests/ship_client.cpp +++ b/tests/ship_client.cpp @@ -1,26 +1,35 @@ #include -#include #include +#include #include -#include #include +#include +#include #include +#include #include #include #include #include +using tcp = boost::asio::ip::tcp; +using unixs = boost::asio::local::stream_protocol; +namespace ws = boost::beast::websocket; + namespace bpo = boost::program_options; int main(int argc, char* argv[]) { boost::asio::io_context ctx; boost::asio::ip::tcp::resolver resolver(ctx); - boost::beast::websocket::stream stream(ctx); + ws::stream tcp_stream(ctx); eosio::abi abi; + unixs::socket unix_socket(ctx); + ws::stream unix_stream(ctx); + bpo::options_description cli("ship_client command line options"); bool help = false; std::string socket_address = "127.0.0.1:8080"; @@ -39,106 +48,140 @@ int main(int argc, char* argv[]) { return 0; } - std::string::size_type colon = socket_address.find(':'); - eosio::check(colon != std::string::npos, "Missing ':' seperator in Websocket address and port"); - std::string statehistory_server = socket_address.substr(0, colon); - std::string statehistory_port = socket_address.substr(colon+1); + std::string statehistory_server, statehistory_port; + + // unix socket + if(boost::algorithm::starts_with(socket_address, "ws+unix://") || + boost::algorithm::starts_with(socket_address, "unix://")) { + statehistory_port = ""; + statehistory_server = socket_address.substr(socket_address.find("unix://") + strlen("unix://") + 1); + } else { + std::string::size_type colon = socket_address.find(':'); + eosio::check(colon != std::string::npos, "Missing ':' seperator in Websocket address and port"); + statehistory_server = socket_address.substr(0, colon); + statehistory_port = socket_address.substr(colon + 1); + } std::cerr << "[\n{\n \"status\": \"construct\",\n \"time\": " << time(NULL) << "\n},\n"; try { - boost::asio::connect(stream.next_layer(), resolver.resolve(statehistory_server, statehistory_port)); - stream.handshake(statehistory_server, "/"); - - { - boost::beast::flat_buffer abi_buffer; - stream.read(abi_buffer); - std::string abi_string((const char*)abi_buffer.data().data(), abi_buffer.data().size()); - eosio::json_token_stream token_stream(abi_string.data()); - eosio::abi_def abidef = eosio::from_json(token_stream); - eosio::convert(abidef, abi); - } - - std::cerr << "{\n \"status\": \"set_abi\",\n \"time\": " << time(NULL) << "\n},\n"; - - const eosio::abi_type& request_type = abi.abi_types.at("request"); - const eosio::abi_type& result_type = abi.abi_types.at("result"); - - bool is_first = true; - uint32_t first_block_num = 0; - uint32_t last_block_num = 0; - - while(num_requests--) { - rapidjson::StringBuffer request_sb; - rapidjson::PrettyWriter request_writer(request_sb); - - request_writer.StartArray(); - request_writer.String("get_status_request_v0"); - request_writer.StartObject(); - request_writer.EndObject(); - request_writer.EndArray(); - - stream.write(boost::asio::buffer(request_type.json_to_bin(request_sb.GetString(), [](){}))); - - boost::beast::flat_buffer buffer; - stream.read(buffer); - - eosio::input_stream is((const char*)buffer.data().data(), buffer.data().size()); - rapidjson::Document result_doucment; - result_doucment.Parse(result_type.bin_to_json(is).c_str()); - - eosio::check(!result_doucment.HasParseError(), "Failed to parse result JSON from abieos"); - eosio::check(result_doucment.IsArray(), "result should have been an array (variant) but it's not"); - eosio::check(result_doucment.Size() == 2, "result was an array but did not contain 2 items like a variant should"); - eosio::check(std::string(result_doucment[0].GetString()) == "get_status_result_v0", "result type doesn't look like get_status_result_v0"); - eosio::check(result_doucment[1].IsObject(), "second item in result array is not an object"); - eosio::check(result_doucment[1].HasMember("head"), "cannot find 'head' in result"); - eosio::check(result_doucment[1]["head"].IsObject(), "'head' is not an object"); - eosio::check(result_doucment[1]["head"].HasMember("block_num"), "'head' does not contain 'block_num'"); - eosio::check(result_doucment[1]["head"]["block_num"].IsUint(), "'head.block_num' isn't a number"); - - uint32_t this_block_num = result_doucment[1]["head"]["block_num"].GetUint(); - - if(is_first) { - std::cout << "[" << std::endl; - first_block_num = this_block_num; - is_first = false; + auto run = [&](SocketStream& stream) { + { + boost::beast::flat_buffer abi_buffer; + stream.read(abi_buffer); + std::string abi_string((const char*)abi_buffer.data().data(), + abi_buffer.data().size()); + eosio::json_token_stream token_stream(abi_string.data()); + eosio::abi_def abidef = + eosio::from_json(token_stream); + eosio::convert(abidef, abi); } - else { - std::cout << "," << std::endl; - } - std::cout << "{ \"get_status_result_v0\":" << std::endl; - rapidjson::StringBuffer result_sb; - rapidjson::PrettyWriter result_writer(result_sb); - result_doucment[1].Accept(result_writer); - std::cout << result_sb.GetString() << std::endl << "}" << std::endl; + std::cerr << "{\n \"status\": \"set_abi\",\n \"time\": " << time(NULL) << "\n},\n"; + + const eosio::abi_type& request_type = abi.abi_types.at("request"); + const eosio::abi_type& result_type = abi.abi_types.at("result"); + + bool is_first = true; + uint32_t first_block_num = 0; + uint32_t last_block_num = 0; + + while(num_requests--) { + rapidjson::StringBuffer request_sb; + rapidjson::PrettyWriter request_writer(request_sb); + + request_writer.StartArray(); + request_writer.String("get_status_request_v0"); + request_writer.StartObject(); + request_writer.EndObject(); + request_writer.EndArray(); + + stream.write(boost::asio::buffer(request_type.json_to_bin(request_sb.GetString(), [](){}))); + + boost::beast::flat_buffer buffer; + stream.read(buffer); + + eosio::input_stream is((const char*)buffer.data().data(), buffer.data().size()); + rapidjson::Document result_doucment; + result_doucment.Parse(result_type.bin_to_json(is).c_str()); + + eosio::check(!result_doucment.HasParseError(), "Failed to parse result JSON from abieos"); + eosio::check(result_doucment.IsArray(), "result should have been an array (variant) but it's not"); + eosio::check(result_doucment.Size() == 2, "result was an array but did not contain 2 items like a variant should"); + eosio::check(std::string(result_doucment[0].GetString()) == "get_status_result_v0", "result type doesn't look like get_status_result_v0"); + eosio::check(result_doucment[1].IsObject(), "second item in result array is not an object"); + eosio::check(result_doucment[1].HasMember("head"), "cannot find 'head' in result"); + eosio::check(result_doucment[1]["head"].IsObject(), "'head' is not an object"); + eosio::check(result_doucment[1]["head"].HasMember("block_num"), "'head' does not contain 'block_num'"); + eosio::check(result_doucment[1]["head"]["block_num"].IsUint(), "'head.block_num' isn't a number"); + + uint32_t this_block_num = result_doucment[1]["head"]["block_num"].GetUint(); + + if(is_first) { + std::cout << "[" << std::endl; + first_block_num = this_block_num; + is_first = false; + } + else { + std::cout << "," << std::endl; + } + std::cout << "{ \"get_status_result_v0\":" << std::endl; + + rapidjson::StringBuffer result_sb; + rapidjson::PrettyWriter result_writer(result_sb); + result_doucment[1].Accept(result_writer); + std::cout << result_sb.GetString() << std::endl << "}" << std::endl; + + last_block_num = this_block_num; + } - last_block_num = this_block_num; + std::cout << "]" << std::endl; + + rapidjson::StringBuffer done_sb; + rapidjson::PrettyWriter done_writer(done_sb); + + done_writer.StartObject(); + done_writer.Key("status"); + done_writer.String("done"); + done_writer.Key("time"); + done_writer.Uint(time(NULL)); + done_writer.Key("first_block_num"); + done_writer.Uint(first_block_num); + done_writer.Key("last_block_num"); + done_writer.Uint(last_block_num); + done_writer.EndObject(); + + std::cerr << done_sb.GetString() << std::endl << "]" << std::endl; + }; + + // unix socket + if(statehistory_port.empty()) { + boost::system::error_code ec; + auto check_ec = [&](const char* what) { + if(!ec) + return; + std::cerr << "{\n \"status\": socket error - " << ec.message() << ",\n \"time\": " << time(NULL) << "\n},\n"; + }; + + unix_stream.next_layer().connect(unixs::endpoint(statehistory_server), + ec); + if(ec == boost::system::errc::success) { + std::cerr << "{\n \"status\": \"successfully connected to unix socket\",\n \"time\": " << time(NULL) << "\n},\n"; + } else { + check_ec("connect"); + } + unix_stream.handshake("", "/"); + run(unix_stream); } - - std::cout << "]" << std::endl; - - rapidjson::StringBuffer done_sb; - rapidjson::PrettyWriter done_writer(done_sb); - - done_writer.StartObject(); - done_writer.Key("status"); - done_writer.String("done"); - done_writer.Key("time"); - done_writer.Uint(time(NULL)); - done_writer.Key("first_block_num"); - done_writer.Uint(first_block_num); - done_writer.Key("last_block_num"); - done_writer.Uint(last_block_num); - done_writer.EndObject(); - - std::cerr << done_sb.GetString() << std::endl << "]" << std::endl; - } - catch(std::exception& e) { + // tcp socket + else { + boost::asio::connect(tcp_stream.next_layer(), resolver.resolve(statehistory_server, statehistory_port)); + tcp_stream.handshake(statehistory_server, "/"); + run(tcp_stream); + } + } catch (std::exception& e) { std::cerr << "Caught exception: " << e.what() << std::endl; return 1; } - return 0; } diff --git a/tests/ship_test.py b/tests/ship_test.py index 7db527e9b5..fbeb4c855f 100755 --- a/tests/ship_test.py +++ b/tests/ship_test.py @@ -32,6 +32,7 @@ appArgs = AppArgs() extraArgs = appArgs.add(flag="--num-requests", type=int, help="How many requests that each ship_client requests", default=1) extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_clients should be started", default=1) +extraArgs = appArgs.add_bool(flag="--unix-socket", help="Run ship over unix socket") args = TestHelper.parse_args({"-p", "-n","--dump-error-details","--keep-logs","-v","--leave-running","--clean-run"}, applicationSpecificArgs=appArgs) Utils.Debug=args.v @@ -69,6 +70,9 @@ specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --sync-fetch-span 200 --plugin eosio::net_api_plugin " traceNodeosArgs=" --plugin eosio::trace_api_plugin --trace-no-abis " + if args.unix_socket: + specificExtraNodeosArgs[shipNodeNum] += "--state-history-unix-socket-path ship.sock" + if cluster.launch(pnodes=totalProducerNodes, totalNodes=totalNodes, totalProducers=totalProducers, useBiosBootFile=False, specificExtraNodeosArgs=specificExtraNodeosArgs, extraNodeosArgs=traceNodeosArgs) is False: @@ -86,6 +90,8 @@ shipClient = "tests/ship_client" cmd = "%s --num-requests %d" % (shipClient, args.num_requests) + if args.unix_socket: + cmd += " -a ws+unix:///%s/%s" % (os.getcwd(), Utils.getNodeDataDir(shipNodeNum, "ship.sock")) if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) clients = [] files = []