Skip to content

Commit

Permalink
Merge pull request #472 from eosnetworkfoundation/GH#440-ship-live-bl…
Browse files Browse the repository at this point in the history
…ocks-3.1

Fix issue with current block not sent to SHiP clients - 3.1
  • Loading branch information
heifner authored Jun 21, 2022
2 parents 92d3547 + ba1df43 commit 5e3c31c
Show file tree
Hide file tree
Showing 5 changed files with 321 additions and 14 deletions.
26 changes: 14 additions & 12 deletions plugins/state_history_plugin/state_history_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,14 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
result = state_history::zlib_decompress(compressed);
}

void get_block(uint32_t block_num, std::optional<bytes>& result) {
void get_block(uint32_t block_num, const block_state_ptr& block_state, std::optional<bytes>& result) {
chain::signed_block_ptr p;
try {
p = chain_plug->chain().fetch_block_by_number(block_num);
if( block_state && block_num == block_state->block_num ) {
p = block_state->block;
} else {
p = chain_plug->chain().fetch_block_by_number( block_num );
}
} catch (...) {
return;
}
Expand All @@ -88,11 +92,8 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
if (chain_state_log && block_num >= chain_state_log->begin_block() && block_num < chain_state_log->end_block())
return chain_state_log->get_block_id(block_num);
try {
auto block = chain_plug->chain().fetch_block_by_number(block_num);
if (block)
return block->calculate_id();
} catch (...) {
}
return chain_plug->chain().get_block_id_for_num(block_num);
} catch (...) {}
return {};
}

Expand Down Expand Up @@ -207,7 +208,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
send_update();
}

void send_update(get_blocks_result_v0 result) {
void send_update(get_blocks_result_v0 result, const block_state_ptr& block_state) {
need_to_send_update = true;
if (!send_queue.empty() || !current_request || !current_request->max_messages_in_flight)
return;
Expand All @@ -223,8 +224,9 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
auto prev_block_id = plugin->get_block_id(current_request->start_block_num - 1);
if (prev_block_id)
result.prev_block = block_position{current_request->start_block_num - 1, *prev_block_id};
if (current_request->fetch_block)
plugin->get_block(current_request->start_block_num, result.block);
if (current_request->fetch_block) {
plugin->get_block( current_request->start_block_num, block_state, result.block );
}
if (current_request->fetch_traces && plugin->trace_log)
plugin->get_log_entry(*plugin->trace_log, current_request->start_block_num, result.traces);
if (current_request->fetch_deltas && plugin->chain_state_log)
Expand All @@ -244,7 +246,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
return;
get_blocks_result_v0 result;
result.head = {block_state->block_num, block_state->id};
send_update(std::move(result));
send_update(std::move(result), block_state);
}

void send_update(bool changed = false) {
Expand All @@ -256,7 +258,7 @@ struct state_history_plugin_impl : std::enable_shared_from_this<state_history_pl
auto& chain = plugin->chain_plug->chain();
get_blocks_result_v0 result;
result.head = {chain.head_block_num(), chain.head_block_id()};
send_update(std::move(result));
send_update(std::move(result), {});
}

template <typename F>
Expand Down
7 changes: 7 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ include_directories( "${CMAKE_SOURCE_DIR}/plugins/wallet_plugin/include" )

file(GLOB UNIT_TESTS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.cpp")
list(REMOVE_ITEM UNIT_TESTS ship_client.cpp)
list(REMOVE_ITEM UNIT_TESTS ship_streamer.cpp)

add_executable( plugin_test ${UNIT_TESTS} )
target_link_libraries( plugin_test eosio_testing eosio_chain chainbase chain_plugin wallet_plugin fc state_history ${PLATFORM_SPECIFIC_LIBS} )
Expand Down Expand Up @@ -49,6 +50,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/full-version-label.sh ${CMAKE_CURRENT
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/nodeos_producer_watermark_test.py ${CMAKE_CURRENT_BINARY_DIR}/nodeos_producer_watermark_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DIR}/cli_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/large-lib-test.py ${CMAKE_CURRENT_BINARY_DIR}/large-lib-test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/p2p_high_latency_test.py ${CMAKE_CURRENT_BINARY_DIR}/p2p_high_latency_test.py COPYONLY)
Expand Down Expand Up @@ -87,10 +89,15 @@ endif()
find_package(Threads)
add_executable(ship_client ship_client.cpp)
target_link_libraries(ship_client abieos Boost::program_options Threads::Threads)
add_executable(ship_streamer ship_streamer.cpp)
target_link_libraries(ship_streamer abieos Boost::program_options Threads::Threads)

add_test(NAME ship_test COMMAND tests/ship_test.py -v --num-clients 1 --num-requests 5000 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME ship_streamer_test COMMAND tests/ship_streamer_test.py -v --num-clients 1 --num-blocks 50 --clean-run --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST ship_streamer_test PROPERTY LABELS nonparallelizable_tests)

add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests)

Expand Down
163 changes: 163 additions & 0 deletions tests/ship_streamer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#include <eosio/abi.hpp>
#include <eosio/from_json.hpp>
#include <eosio/convert.hpp>

#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/prettywriter.h>

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/program_options.hpp>

#include <iostream>
#include <string>

namespace bpo = boost::program_options;

int main(int argc, char* argv[]) {
boost::asio::io_context ctx;
boost::asio::ip::tcp::resolver resolver(ctx);
boost::beast::websocket::stream<boost::asio::ip::tcp::socket> stream(ctx);
eosio::abi abi;

bpo::options_description cli("ship_streamer command line options");
bool help = false;
std::string socket_address = "127.0.0.1:8080";
uint32_t start_block_num = 1;
uint32_t end_block_num = std::numeric_limits<u_int32_t>::max()-1;
bool irreversible_only = false;
bool fetch_block = false;
bool fetch_traces = false;
bool fetch_deltas = false;

cli.add_options()
("help,h", bpo::bool_switch(&help)->default_value(false), "Print this help message and exit.")
("socket-address,a", bpo::value<std::string>(&socket_address)->default_value(socket_address), "Websocket address and port.")
("start-block-num", bpo::value<uint32_t>(&start_block_num)->default_value(start_block_num), "Block to start streaming from")
("end-block-num", bpo::value<uint32_t>(&end_block_num)->default_value(end_block_num), "Block to stop streaming")
("irreversible-only", bpo::bool_switch(&irreversible_only)->default_value(irreversible_only), "Irreversible blocks only")
("fetch-block", bpo::bool_switch(&fetch_block)->default_value(fetch_block), "Fetch blocks")
("fetch-traces", bpo::bool_switch(&fetch_traces)->default_value(fetch_traces), "Fetch traces")
("fetch-deltas", bpo::bool_switch(&fetch_deltas)->default_value(fetch_deltas), "Fetch deltas")
;
bpo::variables_map varmap;
bpo::store(bpo::parse_command_line(argc, argv, cli), varmap);
bpo::notify(varmap);

if(help) {
cli.print(std::cout);
return 0;
}

std::string::size_type colon = socket_address.find(':');
eosio::check(colon != std::string::npos, "Missing ':' seperator in Websocket address and port");
std::string statehistory_server = socket_address.substr(0, colon);
std::string statehistory_port = socket_address.substr(colon+1);

try {
boost::asio::connect(stream.next_layer(), resolver.resolve(statehistory_server, statehistory_port));
stream.handshake(statehistory_server, "/");

{
boost::beast::flat_buffer abi_buffer;
stream.read(abi_buffer);
std::string abi_string((const char*)abi_buffer.data().data(), abi_buffer.data().size());
eosio::json_token_stream token_stream(abi_string.data());
eosio::abi_def abidef = eosio::from_json<eosio::abi_def>(token_stream);
eosio::convert(abidef, abi);
}

const eosio::abi_type& request_type = abi.abi_types.at("request");
const eosio::abi_type& result_type = abi.abi_types.at("result");

rapidjson::StringBuffer request_sb;
rapidjson::PrettyWriter<rapidjson::StringBuffer> request_writer(request_sb);

//struct get_blocks_request_v0 {
// uint32_t start_block_num = 0;
// uint32_t end_block_num = 0;
// uint32_t max_messages_in_flight = 0;
// std::vector<block_position> have_positions = {};
// bool irreversible_only = false;
// bool fetch_block = false;
// bool fetch_traces = false;
// bool fetch_deltas = false;
//};
request_writer.StartArray();
request_writer.String("get_blocks_request_v0");
request_writer.StartObject();
request_writer.Key("start_block_num");
request_writer.Uint(start_block_num);
request_writer.Key("end_block_num");
request_writer.String(std::to_string(end_block_num + 1).c_str()); // SHiP is (start-end] exclusive
request_writer.Key("max_messages_in_flight");
request_writer.String(std::to_string(std::numeric_limits<u_int32_t>::max()).c_str());
request_writer.Key("have_positions");
request_writer.StartArray();
request_writer.EndArray();
request_writer.Key("irreversible_only");
request_writer.Bool(irreversible_only);
request_writer.Key("fetch_block");
request_writer.Bool(fetch_block);
request_writer.Key("fetch_traces");
request_writer.Bool(fetch_traces);
request_writer.Key("fetch_deltas");
request_writer.Bool(fetch_deltas);
request_writer.EndObject();
request_writer.EndArray();

stream.binary(true);
stream.write(boost::asio::buffer(request_type.json_to_bin(request_sb.GetString(), [](){})));

bool is_first = true;
for(;;) {
boost::beast::flat_buffer buffer;
stream.read(buffer);

eosio::input_stream is((const char*)buffer.data().data(), buffer.data().size());
rapidjson::Document result_doucment;
result_doucment.Parse(result_type.bin_to_json(is).c_str());

eosio::check(!result_doucment.HasParseError(), "Failed to parse result JSON from abieos");
eosio::check(result_doucment.IsArray(), "result should have been an array (variant) but it's not");
eosio::check(result_doucment.Size() == 2, "result was an array but did not contain 2 items like a variant should");
eosio::check(std::string(result_doucment[0].GetString()) == "get_blocks_result_v0", "result type doesn't look like get_blocks_result_v0");
eosio::check(result_doucment[1].IsObject(), "second item in result array is not an object");
eosio::check(result_doucment[1].HasMember("head"), "cannot find 'head' in result");
eosio::check(result_doucment[1]["head"].IsObject(), "'head' is not an object");
eosio::check(result_doucment[1]["head"].HasMember("block_num"), "'head' does not contain 'block_num'");
eosio::check(result_doucment[1]["head"]["block_num"].IsUint(), "'head.block_num' isn't a number");

uint32_t this_block_num = 0;
if( result_doucment[1].HasMember("this_block") && result_doucment[1]["this_block"].IsObject() ) {
if( result_doucment[1]["this_block"].HasMember("block_num") && result_doucment[1]["this_block"]["block_num"].IsUint() ) {
this_block_num = result_doucment[1]["this_block"]["block_num"].GetUint();
}
}

if(is_first) {
std::cout << "[" << std::endl;
is_first = false;
} else {
std::cout << "," << std::endl;
}
std::cout << "{ \"get_blocks_result_v0\":" << std::endl;

rapidjson::StringBuffer result_sb;
rapidjson::PrettyWriter<rapidjson::StringBuffer> result_writer(result_sb);
result_doucment[1].Accept(result_writer);
std::cout << result_sb.GetString() << std::endl << "}" << std::endl;

if( this_block_num == end_block_num ) break;
}

std::cout << "]" << std::endl;
}
catch(std::exception& e) {
std::cerr << "Caught exception: " << e.what() << std::endl;
return 1;
}

return 0;
}
Loading

0 comments on commit 5e3c31c

Please sign in to comment.