diff --git a/plugins/state_history_plugin/state_history_plugin.cpp b/plugins/state_history_plugin/state_history_plugin.cpp index f3067fd0c3..0b667631e0 100644 --- a/plugins/state_history_plugin/state_history_plugin.cpp +++ b/plugins/state_history_plugin/state_history_plugin.cpp @@ -71,10 +71,14 @@ struct state_history_plugin_impl : std::enable_shared_from_this& result) { + void get_block(uint32_t block_num, const block_state_ptr& block_state, std::optional& result) { chain::signed_block_ptr p; try { - p = chain_plug->chain().fetch_block_by_number(block_num); + if( block_state && block_num == block_state->block_num ) { + p = block_state->block; + } else { + p = chain_plug->chain().fetch_block_by_number( block_num ); + } } catch (...) { return; } @@ -88,11 +92,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this= chain_state_log->begin_block() && block_num < chain_state_log->end_block()) return chain_state_log->get_block_id(block_num); try { - auto block = chain_plug->chain().fetch_block_by_number(block_num); - if (block) - return block->calculate_id(); - } catch (...) { - } + return chain_plug->chain().get_block_id_for_num(block_num); + } catch (...) {} return {}; } @@ -207,7 +208,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thismax_messages_in_flight) return; @@ -223,8 +224,9 @@ struct state_history_plugin_impl : std::enable_shared_from_thisget_block_id(current_request->start_block_num - 1); if (prev_block_id) result.prev_block = block_position{current_request->start_block_num - 1, *prev_block_id}; - if (current_request->fetch_block) - plugin->get_block(current_request->start_block_num, result.block); + if (current_request->fetch_block) { + plugin->get_block( current_request->start_block_num, block_state, result.block ); + } if (current_request->fetch_traces && plugin->trace_log) plugin->get_log_entry(*plugin->trace_log, current_request->start_block_num, result.traces); if (current_request->fetch_deltas && plugin->chain_state_log) @@ -244,7 +246,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thisblock_num, block_state->id}; - send_update(std::move(result)); + send_update(std::move(result), block_state); } void send_update(bool changed = false) { @@ -256,7 +258,7 @@ struct state_history_plugin_impl : std::enable_shared_from_thischain_plug->chain(); get_blocks_result_v0 result; result.head = {chain.head_block_num(), chain.head_block_id()}; - send_update(std::move(result)); + send_update(std::move(result), {}); } template diff --git a/scripts/pinned_build.sh b/scripts/pinned_build.sh index 0711466182..dc55ef1241 100755 --- a/scripts/pinned_build.sh +++ b/scripts/pinned_build.sh @@ -2,9 +2,18 @@ echo "Mandel Pinned Build" -if [[ $NAME != "Ubuntu" ]] - then - echo "Currently only supporting Ubuntu based builds. Proceed at your own risk." +if [[ "$(uname)" == "Linux" ]]; then + if [[ -e /etc/os-release ]]; then + # obtain NAME and other information + . /etc/os-release + if [[ ${NAME} != "Ubuntu" ]]; then + echo "Currently only supporting Ubuntu based builds. Proceed at your own risk." + fi + else + echo "Currently only supporting Ubuntu based builds. /etc/os-release not found. Your Linux distribution is not supported. Proceed at your own risk." + fi +else + echo "Currently only supporting Ubuntu based builds. Your architecture is not supported. Proceed at your own risk." fi if [ $# -eq 0 ] || [ -z "$1" ] @@ -25,6 +34,7 @@ LLVM_VER=7.1.0 LIBPQXX_VER=7.2.1 ARCH=`uname -m` SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]:-$0}"; )" &> /dev/null && pwd 2> /dev/null; )"; +START_DIR="$(pwd)" pushdir() { @@ -113,6 +123,9 @@ install_clang ${DEP_DIR}/clang-${CLANG_VER} install_llvm ${DEP_DIR}/llvm-${LLVM_VER} install_boost ${DEP_DIR}/boost_${BOOST_VER//\./_} +# go back to the directory where the script starts +popdir ${START_DIR} + pushdir ${MANDEL_DIR} # build Mandel diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f2aa542c51..56a38d7c0e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -2,6 +2,7 @@ include_directories( "${CMAKE_SOURCE_DIR}/plugins/wallet_plugin/include" ) file(GLOB UNIT_TESTS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.cpp") list(REMOVE_ITEM UNIT_TESTS ship_client.cpp) +list(REMOVE_ITEM UNIT_TESTS ship_streamer.cpp) add_executable( plugin_test ${UNIT_TESTS} ) target_link_libraries( plugin_test eosio_testing eosio_chain chainbase chain_plugin wallet_plugin fc state_history ${PLATFORM_SPECIFIC_LIBS} ) @@ -49,6 +50,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/full-version-label.sh ${CMAKE_CURRENT configure_file(${CMAKE_CURRENT_SOURCE_DIR}/nodeos_producer_watermark_test.py ${CMAKE_CURRENT_BINARY_DIR}/nodeos_producer_watermark_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DIR}/cli_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY) +configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/large-lib-test.py ${CMAKE_CURRENT_BINARY_DIR}/large-lib-test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_high_latency_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_high_latency_test.py COPYONLY) @@ -87,10 +89,15 @@ endif() find_package(Threads) add_executable(ship_client ship_client.cpp) target_link_libraries(ship_client abieos Boost::program_options Threads::Threads) +add_executable(ship_streamer ship_streamer.cpp) +target_link_libraries(ship_streamer abieos Boost::program_options Threads::Threads) add_test(NAME ship_test COMMAND tests/ship_test.py -v --num-clients 1 --num-requests 5000 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST ship_test PROPERTY LABELS nonparallelizable_tests) +add_test(NAME ship_streamer_test COMMAND tests/ship_streamer_test.py -v --num-clients 1 --num-blocks 50 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) +set_property(TEST ship_streamer_test PROPERTY LABELS nonparallelizable_tests) + add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests) diff --git a/tests/ship_streamer.cpp b/tests/ship_streamer.cpp new file mode 100644 index 0000000000..9132f136ac --- /dev/null +++ b/tests/ship_streamer.cpp @@ -0,0 +1,163 @@ +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include +#include + +namespace bpo = boost::program_options; + +int main(int argc, char* argv[]) { + boost::asio::io_context ctx; + boost::asio::ip::tcp::resolver resolver(ctx); + boost::beast::websocket::stream stream(ctx); + eosio::abi abi; + + bpo::options_description cli("ship_streamer command line options"); + bool help = false; + std::string socket_address = "127.0.0.1:8080"; + uint32_t start_block_num = 1; + uint32_t end_block_num = std::numeric_limits::max()-1; + bool irreversible_only = false; + bool fetch_block = false; + bool fetch_traces = false; + bool fetch_deltas = false; + + cli.add_options() + ("help,h", bpo::bool_switch(&help)->default_value(false), "Print this help message and exit.") + ("socket-address,a", bpo::value(&socket_address)->default_value(socket_address), "Websocket address and port.") + ("start-block-num", bpo::value(&start_block_num)->default_value(start_block_num), "Block to start streaming from") + ("end-block-num", bpo::value(&end_block_num)->default_value(end_block_num), "Block to stop streaming") + ("irreversible-only", bpo::bool_switch(&irreversible_only)->default_value(irreversible_only), "Irreversible blocks only") + ("fetch-block", bpo::bool_switch(&fetch_block)->default_value(fetch_block), "Fetch blocks") + ("fetch-traces", bpo::bool_switch(&fetch_traces)->default_value(fetch_traces), "Fetch traces") + ("fetch-deltas", bpo::bool_switch(&fetch_deltas)->default_value(fetch_deltas), "Fetch deltas") + ; + bpo::variables_map varmap; + bpo::store(bpo::parse_command_line(argc, argv, cli), varmap); + bpo::notify(varmap); + + if(help) { + cli.print(std::cout); + return 0; + } + + std::string::size_type colon = socket_address.find(':'); + eosio::check(colon != std::string::npos, "Missing ':' seperator in Websocket address and port"); + std::string statehistory_server = socket_address.substr(0, colon); + std::string statehistory_port = socket_address.substr(colon+1); + + try { + boost::asio::connect(stream.next_layer(), resolver.resolve(statehistory_server, statehistory_port)); + stream.handshake(statehistory_server, "/"); + + { + boost::beast::flat_buffer abi_buffer; + stream.read(abi_buffer); + std::string abi_string((const char*)abi_buffer.data().data(), abi_buffer.data().size()); + eosio::json_token_stream token_stream(abi_string.data()); + eosio::abi_def abidef = eosio::from_json(token_stream); + eosio::convert(abidef, abi); + } + + const eosio::abi_type& request_type = abi.abi_types.at("request"); + const eosio::abi_type& result_type = abi.abi_types.at("result"); + + rapidjson::StringBuffer request_sb; + rapidjson::PrettyWriter request_writer(request_sb); + + //struct get_blocks_request_v0 { + // uint32_t start_block_num = 0; + // uint32_t end_block_num = 0; + // uint32_t max_messages_in_flight = 0; + // std::vector have_positions = {}; + // bool irreversible_only = false; + // bool fetch_block = false; + // bool fetch_traces = false; + // bool fetch_deltas = false; + //}; + request_writer.StartArray(); + request_writer.String("get_blocks_request_v0"); + request_writer.StartObject(); + request_writer.Key("start_block_num"); + request_writer.Uint(start_block_num); + request_writer.Key("end_block_num"); + request_writer.String(std::to_string(end_block_num + 1).c_str()); // SHiP is (start-end] exclusive + request_writer.Key("max_messages_in_flight"); + request_writer.String(std::to_string(std::numeric_limits::max()).c_str()); + request_writer.Key("have_positions"); + request_writer.StartArray(); + request_writer.EndArray(); + request_writer.Key("irreversible_only"); + request_writer.Bool(irreversible_only); + request_writer.Key("fetch_block"); + request_writer.Bool(fetch_block); + request_writer.Key("fetch_traces"); + request_writer.Bool(fetch_traces); + request_writer.Key("fetch_deltas"); + request_writer.Bool(fetch_deltas); + request_writer.EndObject(); + request_writer.EndArray(); + + stream.binary(true); + stream.write(boost::asio::buffer(request_type.json_to_bin(request_sb.GetString(), [](){}))); + + bool is_first = true; + for(;;) { + boost::beast::flat_buffer buffer; + stream.read(buffer); + + eosio::input_stream is((const char*)buffer.data().data(), buffer.data().size()); + rapidjson::Document result_doucment; + result_doucment.Parse(result_type.bin_to_json(is).c_str()); + + eosio::check(!result_doucment.HasParseError(), "Failed to parse result JSON from abieos"); + eosio::check(result_doucment.IsArray(), "result should have been an array (variant) but it's not"); + eosio::check(result_doucment.Size() == 2, "result was an array but did not contain 2 items like a variant should"); + eosio::check(std::string(result_doucment[0].GetString()) == "get_blocks_result_v0", "result type doesn't look like get_blocks_result_v0"); + eosio::check(result_doucment[1].IsObject(), "second item in result array is not an object"); + eosio::check(result_doucment[1].HasMember("head"), "cannot find 'head' in result"); + eosio::check(result_doucment[1]["head"].IsObject(), "'head' is not an object"); + eosio::check(result_doucment[1]["head"].HasMember("block_num"), "'head' does not contain 'block_num'"); + eosio::check(result_doucment[1]["head"]["block_num"].IsUint(), "'head.block_num' isn't a number"); + + uint32_t this_block_num = 0; + if( result_doucment[1].HasMember("this_block") && result_doucment[1]["this_block"].IsObject() ) { + if( result_doucment[1]["this_block"].HasMember("block_num") && result_doucment[1]["this_block"]["block_num"].IsUint() ) { + this_block_num = result_doucment[1]["this_block"]["block_num"].GetUint(); + } + } + + if(is_first) { + std::cout << "[" << std::endl; + is_first = false; + } else { + std::cout << "," << std::endl; + } + std::cout << "{ \"get_blocks_result_v0\":" << std::endl; + + rapidjson::StringBuffer result_sb; + rapidjson::PrettyWriter result_writer(result_sb); + result_doucment[1].Accept(result_writer); + std::cout << result_sb.GetString() << std::endl << "}" << std::endl; + + if( this_block_num == end_block_num ) break; + } + + std::cout << "]" << std::endl; + } + catch(std::exception& e) { + std::cerr << "Caught exception: " << e.what() << std::endl; + return 1; + } + + return 0; +} diff --git a/tests/ship_streamer_test.py b/tests/ship_streamer_test.py new file mode 100755 index 0000000000..c6611190ce --- /dev/null +++ b/tests/ship_streamer_test.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 + +from testUtils import Utils +import time +from Cluster import Cluster +from WalletMgr import WalletMgr +from TestHelper import TestHelper +from TestHelper import AppArgs + +import json +import os +import shutil +import signal +import sys + +############################################################### +# ship_streamer_test +# +# This test sets up <-p> producing node(s) and <-n - -p> +# non-producing node(s). One of the non-producing nodes +# is configured with the state_history_plugin. An instance +# of node will be started with ship_streamer to exercise +# the SHiP API. +# +############################################################### + +Print=Utils.Print + +appArgs = AppArgs() +extraArgs = appArgs.add(flag="--num-blocks", type=int, help="How many blocsk to stream from ship_streamer", default=20) +extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1) +args = TestHelper.parse_args({"-p", "-n","--dump-error-details","--keep-logs","-v","--leave-running","--clean-run"}, applicationSpecificArgs=appArgs) + +Utils.Debug=args.v +totalProducerNodes=args.p +totalNodes=args.n +if totalNodes<=totalProducerNodes: + totalNodes=totalProducerNodes+1 +totalNonProducerNodes=totalNodes-totalProducerNodes +totalProducers=totalProducerNodes +cluster=Cluster(walletd=True) +dumpErrorDetails=args.dump_error_details +keepLogs=args.keep_logs +dontKill=args.leave_running +killAll=args.clean_run +walletPort=TestHelper.DEFAULT_WALLET_PORT + +walletMgr=WalletMgr(True, port=walletPort) +testSuccessful=False +killEosInstances=not dontKill +killWallet=not dontKill + +WalletdName=Utils.EosWalletName +shipTempDir=None + +try: + TestHelper.printSystemInfo("BEGIN") + + cluster.setWalletMgr(walletMgr) + cluster.killall(allInstances=killAll) + cluster.cleanup() + Print("Stand up cluster") + specificExtraNodeosArgs={} + # non-producing nodes are at the end of the cluster's nodes, so reserving the last one for state_history_plugin + shipNodeNum = totalNodes - 1 + specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --disable-replay-opts --trace-history --sync-fetch-span 200 --plugin eosio::net_api_plugin " + traceNodeosArgs=" --plugin eosio::trace_api_plugin --trace-no-abis " + + if cluster.launch(pnodes=totalProducerNodes, + totalNodes=totalNodes, totalProducers=totalProducers, + useBiosBootFile=False, specificExtraNodeosArgs=specificExtraNodeosArgs, extraNodeosArgs=traceNodeosArgs) is False: + Utils.cmdError("launcher") + Utils.errorExit("Failed to stand up eos cluster.") + + # *** identify each node (producers and non-producing node) *** + + shipNode = cluster.getNode(shipNodeNum) + prodNode = cluster.getNode(0) + + #verify nodes are in sync and advancing + cluster.waitOnClusterSync(blockAdvancing=5) + Print("Cluster in Sync") + + start_block_num = shipNode.getBlockNum() + end_block_num = start_block_num + args.num_blocks + + shipClient = "tests/ship_streamer" + cmd = "%s --start-block-num %d --end-block-num %d --fetch-block --fetch-traces --fetch-deltas" % (shipClient, start_block_num, end_block_num) + if Utils.Debug: Utils.Print("cmd: %s" % (cmd)) + clients = [] + files = [] + shipTempDir = os.path.join(Utils.DataDir, "ship") + os.makedirs(shipTempDir, exist_ok = True) + shipClientFilePrefix = os.path.join(shipTempDir, "client") + + starts = [] + for i in range(0, args.num_clients): + start = time.perf_counter() + outFile = open("%s%d.out" % (shipClientFilePrefix, i), "w") + errFile = open("%s%d.err" % (shipClientFilePrefix, i), "w") + Print("Start client %d" % (i)) + popen=Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile) + starts.append(time.perf_counter()) + clients.append((popen, cmd)) + files.append((outFile, errFile)) + Print("Client %d started, Ship node head is: %s" % (i, shipNode.getBlockNum())) + + Print("Stopping all %d clients" % (args.num_clients)) + + for index, (popen, _), (out, err), start in zip(range(len(clients)), clients, files, starts): + popen.wait() + Print("Stopped client %d. Ran for %.3f seconds." % (index, time.perf_counter() - start)) + out.close() + err.close() + outFile = open("%s%d.out" % (shipClientFilePrefix, index), "r") + data = json.load(outFile) + block_num = start_block_num + for i in data: + print(i) + assert block_num == i['get_blocks_result_v0']['this_block']['block_num'], f"{block_num} != {i['get_blocks_result_v0']['this_block']['block_num']}" + assert isinstance(i['get_blocks_result_v0']['block'], str) # verify block in result + block_num += 1 + assert block_num-1 == end_block_num, f"{block_num-1} != {end_block_num}" + + Print("Shutdown state_history_plugin nodeos") + shipNode.kill(signal.SIGTERM) + + testSuccessful = True +finally: + TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, killEosInstances=killEosInstances, killWallet=killWallet, keepLogs=keepLogs, cleanRun=killAll, dumpErrorDetails=dumpErrorDetails) + if shipTempDir is not None: + if testSuccessful and not keepLogs: + shutil.rmtree(shipTempDir, ignore_errors=True) + +errorCode = 0 if testSuccessful else 1 +exit(errorCode) diff --git a/tests/ship_test.py b/tests/ship_test.py index 3dcda7beca..7db527e9b5 100755 --- a/tests/ship_test.py +++ b/tests/ship_test.py @@ -22,7 +22,7 @@ # This test sets up <-p> producing node(s) and <-n - -p> # non-producing node(s). One of the non-producing nodes # is configured with the state_history_plugin. An instance -# of node will be started with a client javascript to exercise +# of node will be started with a client to exercise # the SHiP API. # ############################################################### @@ -54,7 +54,6 @@ killWallet=not dontKill WalletdName=Utils.EosWalletName -ClientName="cleos" shipTempDir=None try: