Skip to content

Commit

Permalink
Merge pull request #494 from eosnetworkfoundation/GH#440-ship-live-bl…
Browse files Browse the repository at this point in the history
…ocks

Fix issue with current block not sent to SHiP clients - main
  • Loading branch information
heifner authored Jun 22, 2022
2 parents a4ffdf5 + 1ab3676 commit d7f4e96
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 17 deletions.
26 changes: 14 additions & 12 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
result = state_history::zlib_decompress(compressed);
}

void get_block(uint32_t block_num, std::optional<bytes>& result) {
void get_block(uint32_t block_num, const block_state_ptr& block_state, std::optional<bytes>& result) {
chain::signed_block_ptr p;
try {
p = chain_plug->chain().fetch_block_by_number(block_num);
if( block_state && block_num == block_state->block_num ) {
p = block_state->block;
} else {
p = chain_plug->chain().fetch_block_by_number( block_num );
}
} catch (...) {
return;
}
Expand All @@ -88,11 +92,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
if (chain_state_log && block_num >= chain_state_log->begin_block() && block_num < chain_state_log->end_block())
return chain_state_log->get_block_id(block_num);
try {
auto block = chain_plug->chain().fetch_block_by_number(block_num);
if (block)
return block->calculate_id();
} catch (...) {
}
return chain_plug->chain().get_block_id_for_num(block_num);
} catch (...) {}
return {};
}

Expand Down Expand Up @@ -207,7 +208,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
send_update();
}

void send_update(get_blocks_result_v0 result) {
void send_update(get_blocks_result_v0 result, const block_state_ptr& block_state) {
need_to_send_update = true;
if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
return;
Expand All @@ -223,8 +224,9 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
auto prev_block_id = plugin->get_block_id(current_request->start_block_num - 1);
if (prev_block_id)
result.prev_block = block_position{current_request->start_block_num - 1, *prev_block_id};
if (current_request->fetch_block)
plugin->get_block(current_request->start_block_num, result.block);
if (current_request->fetch_block) {
plugin->get_block( current_request->start_block_num, block_state, result.block );
}
if (current_request->fetch_traces && plugin->trace_log)
plugin->get_log_entry(*plugin->trace_log, current_request->start_block_num, result.traces);
if (current_request->fetch_deltas && plugin->chain_state_log)
Expand All @@ -244,7 +246,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
return;
get_blocks_result_v0 result;
result.head = {block_state->block_num, block_state->id};
send_update(std::move(result));
send_update(std::move(result), block_state);
}

void send_update(bool changed = false) {
Expand All @@ -256,7 +258,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
auto& chain = plugin->chain_plug->chain();
get_blocks_result_v0 result;
result.head = {chain.head_block_num(), chain.head_block_id()};
send_update(std::move(result));
send_update(std::move(result), {});
}

template <typename F>
Expand Down
19 changes: 16 additions & 3 deletions scripts/pinned_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@

echo "Mandel Pinned Build"

if [[ $NAME != "Ubuntu" ]]
then
echo "Currently only supporting Ubuntu based builds. Proceed at your own risk."
if [[ "$(uname)" == "Linux" ]]; then
if [[ -e /etc/os-release ]]; then
# obtain NAME and other information
. /etc/os-release
if [[ ${NAME} != "Ubuntu" ]]; then
echo "Currently only supporting Ubuntu based builds. Proceed at your own risk."
fi
else
echo "Currently only supporting Ubuntu based builds. /etc/os-release not found. Your Linux distribution is not supported. Proceed at your own risk."
fi
else
echo "Currently only supporting Ubuntu based builds. Your architecture is not supported. Proceed at your own risk."
fi

if [ $# -eq 0 ] || [ -z "$1" ]
Expand All @@ -25,6 +34,7 @@ LLVM_VER=7.1.0
LIBPQXX_VER=7.2.1
ARCH=`uname -m`
SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]:-$0}"; )" &> /dev/null && pwd 2> /dev/null; )";
START_DIR="$(pwd)"


pushdir() {
Expand Down Expand Up @@ -113,6 +123,9 @@ install_clang ${DEP_DIR}/clang-${CLANG_VER}
install_llvm ${DEP_DIR}/llvm-${LLVM_VER}
install_boost ${DEP_DIR}/boost_${BOOST_VER//\./_}

# go back to the directory where the script starts
popdir ${START_DIR}

pushdir ${MANDEL_DIR}

# build Mandel
Expand Down
7 changes: 7 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ include_directories( "${CMAKE_SOURCE_DIR}/plugins/wallet_plugin/include" )

file(GLOB UNIT_TESTS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.cpp")
list(REMOVE_ITEM UNIT_TESTS ship_client.cpp)
list(REMOVE_ITEM UNIT_TESTS ship_streamer.cpp)

add_executable( plugin_test ${UNIT_TESTS} )
target_link_libraries( plugin_test eosio_testing eosio_chain chainbase chain_plugin wallet_plugin fc state_history ${PLATFORM_SPECIFIC_LIBS} )
Expand Down Expand Up @@ -49,6 +50,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/full-version-label.sh ${CMAKE_CURRENT
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/nodeos_producer_watermark_test.py ${CMAKE_CURRENT_BINARY_DIR}/nodeos_producer_watermark_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DIR}/cli_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/large-lib-test.py ${CMAKE_CURRENT_BINARY_DIR}/large-lib-test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_high_latency_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_high_latency_test.py COPYONLY)
Expand Down Expand Up @@ -87,10 +89,15 @@ endif()
find_package(Threads)
add_executable(ship_client ship_client.cpp)
target_link_libraries(ship_client abieos Boost::program_options Threads::Threads)
add_executable(ship_streamer ship_streamer.cpp)
target_link_libraries(ship_streamer abieos Boost::program_options Threads::Threads)

add_test(NAME ship_test COMMAND tests/ship_test.py -v --num-clients 1 --num-requests 5000 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME ship_streamer_test COMMAND tests/ship_streamer_test.py -v --num-clients 1 --num-blocks 50 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_streamer_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests)

Expand Down
163 changes: 163 additions & 0 deletions tests/ship_streamer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#include <eosio/abi.hpp>
#include <eosio/from_json.hpp>
#include <eosio/convert.hpp>

#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/prettywriter.h>

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/program_options.hpp>

#include <iostream>
#include <string>

namespace bpo = boost::program_options;

int main(int argc, char* argv[]) {
boost::asio::io_context ctx;
boost::asio::ip::tcp::resolver resolver(ctx);
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> stream(ctx);
eosio::abi abi;

bpo::options_description cli("ship_streamer command line options");
bool help = false;
std::string socket_address = "127.0.0.1:8080";
uint32_t start_block_num = 1;
uint32_t end_block_num = std::numeric_limits<u_int32_t>::max()-1;
bool irreversible_only = false;
bool fetch_block = false;
bool fetch_traces = false;
bool fetch_deltas = false;

cli.add_options()
("help,h", bpo::bool_switch(&help)->default_value(false), "Print this help message and exit.")
("socket-address,a", bpo::value<std::string>(&socket_address)->default_value(socket_address), "Websocket address and port.")
("start-block-num", bpo::value<uint32_t>(&start_block_num)->default_value(start_block_num), "Block to start streaming from")
("end-block-num", bpo::value<uint32_t>(&end_block_num)->default_value(end_block_num), "Block to stop streaming")
("irreversible-only", bpo::bool_switch(&irreversible_only)->default_value(irreversible_only), "Irreversible blocks only")
("fetch-block", bpo::bool_switch(&fetch_block)->default_value(fetch_block), "Fetch blocks")
("fetch-traces", bpo::bool_switch(&fetch_traces)->default_value(fetch_traces), "Fetch traces")
("fetch-deltas", bpo::bool_switch(&fetch_deltas)->default_value(fetch_deltas), "Fetch deltas")
;
bpo::variables_map varmap;
bpo::store(bpo::parse_command_line(argc, argv, cli), varmap);
bpo::notify(varmap);

if(help) {
cli.print(std::cout);
return 0;
}

std::string::size_type colon = socket_address.find(':');
eosio::check(colon != std::string::npos, "Missing ':' seperator in Websocket address and port");
std::string statehistory_server = socket_address.substr(0, colon);
std::string statehistory_port = socket_address.substr(colon+1);

try {
boost::asio::connect(stream.next_layer(), resolver.resolve(statehistory_server, statehistory_port));
stream.handshake(statehistory_server, "/");

{
boost::beast::flat_buffer abi_buffer;
stream.read(abi_buffer);
std::string abi_string((const char*)abi_buffer.data().data(), abi_buffer.data().size());
eosio::json_token_stream token_stream(abi_string.data());
eosio::abi_def abidef = eosio::from_json<eosio::abi_def>(token_stream);
eosio::convert(abidef, abi);
}

const eosio::abi_type& request_type = abi.abi_types.at("request");
const eosio::abi_type& result_type = abi.abi_types.at("result");

rapidjson::StringBuffer request_sb;
rapidjson::PrettyWriter<rapidjson::StringBuffer> request_writer(request_sb);

//struct get_blocks_request_v0 {
// uint32_t start_block_num = 0;
// uint32_t end_block_num = 0;
// uint32_t max_messages_in_flight = 0;
// std::vector<block_position> have_positions = {};
// bool irreversible_only = false;
// bool fetch_block = false;
// bool fetch_traces = false;
// bool fetch_deltas = false;
//};
request_writer.StartArray();
request_writer.String("get_blocks_request_v0");
request_writer.StartObject();
request_writer.Key("start_block_num");
request_writer.Uint(start_block_num);
request_writer.Key("end_block_num");
request_writer.String(std::to_string(end_block_num + 1).c_str()); // SHiP is (start-end] exclusive
request_writer.Key("max_messages_in_flight");
request_writer.String(std::to_string(std::numeric_limits<u_int32_t>::max()).c_str());
request_writer.Key("have_positions");
request_writer.StartArray();
request_writer.EndArray();
request_writer.Key("irreversible_only");
request_writer.Bool(irreversible_only);
request_writer.Key("fetch_block");
request_writer.Bool(fetch_block);
request_writer.Key("fetch_traces");
request_writer.Bool(fetch_traces);
request_writer.Key("fetch_deltas");
request_writer.Bool(fetch_deltas);
request_writer.EndObject();
request_writer.EndArray();

stream.binary(true);
stream.write(boost::asio::buffer(request_type.json_to_bin(request_sb.GetString(), [](){})));

bool is_first = true;
for(;;) {
boost::beast::flat_buffer buffer;
stream.read(buffer);

eosio::input_stream is((const char*)buffer.data().data(), buffer.data().size());
rapidjson::Document result_doucment;
result_doucment.Parse(result_type.bin_to_json(is).c_str());

eosio::check(!result_doucment.HasParseError(), "Failed to parse result JSON from abieos");
eosio::check(result_doucment.IsArray(), "result should have been an array (variant) but it's not");
eosio::check(result_doucment.Size() == 2, "result was an array but did not contain 2 items like a variant should");
eosio::check(std::string(result_doucment[0].GetString()) == "get_blocks_result_v0", "result type doesn't look like get_blocks_result_v0");
eosio::check(result_doucment[1].IsObject(), "second item in result array is not an object");
eosio::check(result_doucment[1].HasMember("head"), "cannot find 'head' in result");
eosio::check(result_doucment[1]["head"].IsObject(), "'head' is not an object");
eosio::check(result_doucment[1]["head"].HasMember("block_num"), "'head' does not contain 'block_num'");
eosio::check(result_doucment[1]["head"]["block_num"].IsUint(), "'head.block_num' isn't a number");

uint32_t this_block_num = 0;
if( result_doucment[1].HasMember("this_block") && result_doucment[1]["this_block"].IsObject() ) {
if( result_doucment[1]["this_block"].HasMember("block_num") && result_doucment[1]["this_block"]["block_num"].IsUint() ) {
this_block_num = result_doucment[1]["this_block"]["block_num"].GetUint();
}
}

if(is_first) {
std::cout << "[" << std::endl;
is_first = false;
} else {
std::cout << "," << std::endl;
}
std::cout << "{ \"get_blocks_result_v0\":" << std::endl;

rapidjson::StringBuffer result_sb;
rapidjson::PrettyWriter<rapidjson::StringBuffer> result_writer(result_sb);
result_doucment[1].Accept(result_writer);
std::cout << result_sb.GetString() << std::endl << "}" << std::endl;

if( this_block_num == end_block_num ) break;
}

std::cout << "]" << std::endl;
}
catch(std::exception& e) {
std::cerr << "Caught exception: " << e.what() << std::endl;
return 1;
}

return 0;
}
Loading

0 comments on commit d7f4e96

Please sign in to comment.