Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.2] Implement JSON Snapshot Reader #732

Closed
wants to merge 7 commits into from
1 change: 0 additions & 1 deletion libraries/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ add_subdirectory( chain )
add_subdirectory( testing )
add_subdirectory( version )
add_subdirectory( state_history )
add_subdirectory( rapidjson )

set(USE_EXISTING_SOFTFLOAT ON CACHE BOOL "use pre-exisiting softfloat lib")
set(ENABLE_TOOLS OFF CACHE BOOL "Build tools")
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ target_link_libraries( eosio_chain PUBLIC fc chainbase Logging IR WAST WASM Runt
target_include_directories( eosio_chain
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include"
"${CMAKE_CURRENT_SOURCE_DIR}/../wasm-jit/Include"
"${CMAKE_CURRENT_SOURCE_DIR}/../rapidjson/include"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This really needs to be an INTERFACE library

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you provide an example of what that would look like. I didn't see an example in our code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like (I'm just winging this so may not be fully correct)

add_library(rapidjson INTERFACE)
target_include_directories(rapidjson INTERFACE rapidjson/include)
target_compile_definitions(rapidjson INTERFACE -DRAPIDJSON_NAMESPACE=eosio_rapidjson)

then wherever you want to use it just

target_link_libraries(eosio_chain rapidjson)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if making the RAPIDJSON_NAMESPACE thing part of the interface is a good idea or not though.

)

add_library(eosio_chain_wrap INTERFACE )
Expand Down
8 changes: 3 additions & 5 deletions libraries/chain/include/eosio/chain/snapshot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include <fc/variant_object.hpp>
#include <boost/core/demangle.hpp>
#include <ostream>
#include "../rapidjson/include/rapidjson/document.h"
#include <memory>

namespace eosio { namespace chain {
/**
Expand Down Expand Up @@ -377,6 +377,7 @@ namespace eosio { namespace chain {
class istream_json_snapshot_reader : public snapshot_reader {
public:
explicit istream_json_snapshot_reader(const fc::path& p);
~istream_json_snapshot_reader();

void validate() const override;
void set_section( const string& section_name ) override;
Expand All @@ -388,10 +389,7 @@ namespace eosio { namespace chain {
private:
bool validate_section() const;

uint64_t num_rows;
uint64_t cur_row;
rapidjson::Document doc;
std::string sec_name;
std::unique_ptr<struct istream_json_snapshot_reader_impl> impl;
};

class integrity_hash_snapshot_writer : public snapshot_writer {
Expand Down
100 changes: 58 additions & 42 deletions libraries/chain/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@
#include <eosio/chain/exceptions.hpp>
#include <fc/scoped_exit.hpp>
#include <fc/io/json.hpp>
#include "../rapidjson/include/rapidjson/filereadstream.h"
#include "../rapidjson/include/rapidjson/stringbuffer.h"
#include "../rapidjson/include/rapidjson/writer.h"

#define RAPIDJSON_NAMESPACE eosio_rapidjson
#include <rapidjson/document.h>
#include <rapidjson/filereadstream.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

using namespace eosio_rapidjson;

namespace eosio { namespace chain {

variant_snapshot_writer::variant_snapshot_writer(fc::mutable_variant_object& snapshot)
Expand Down Expand Up @@ -199,19 +205,19 @@ ostream_json_snapshot_writer::ostream_json_snapshot_writer(std::ostream& snapsho
void ostream_json_snapshot_writer::write_start_section( const std::string& section_name )
{
row_count = 0;
snapshot.inner << "," << fc::json::to_string(section_name, fc::time_point::maximum()) << ":{\n";
snapshot.inner << "," << fc::json::to_string(section_name, fc::time_point::maximum()) << ":{\n\"rows\":[\n";
}

void ostream_json_snapshot_writer::write_row( const detail::abstract_snapshot_row_writer& row_writer ) {
const auto yield = [&](size_t s) {};

if(row_count != 0) snapshot.inner << ",";
snapshot.inner << "\"row_" << row_count << "\":" << fc::json::to_string(row_writer.to_variant(), yield) << "\n";
snapshot.inner << fc::json::to_string(row_writer.to_variant(), yield) << "\n";
++row_count;
}

void ostream_json_snapshot_writer::write_end_section( ) {
snapshot.inner << "}\n";
snapshot.inner << "],\n\"num_rows\":" << row_count << "\n}\n";
row_count = 0;
}

Expand Down Expand Up @@ -339,37 +345,42 @@ void istream_snapshot_reader::return_to_header() {
clear_section();
}

struct istream_json_snapshot_reader_impl {
uint64_t num_rows;
uint64_t cur_row;
eosio_rapidjson::Document doc;
std::string sec_name;
};

istream_json_snapshot_reader::~istream_json_snapshot_reader() = default;

istream_json_snapshot_reader::istream_json_snapshot_reader(const fc::path& p)
:num_rows(0)
,cur_row(0)
: impl{new istream_json_snapshot_reader_impl{0, 0, {}, {}}}
{
FILE* fp = fopen(p.string().c_str(), "rb");
EOS_ASSERT(fp, snapshot_exception, "Failed to JSON snapshot: ${file}", ("file", p));
auto close = fc::make_scoped_exit( [&fp]() { fclose( fp ); } );
char readBuffer[65536];
rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
doc.ParseStream(is);
fclose(fp);
eosio_rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
impl->doc.ParseStream(is);
}

void istream_json_snapshot_reader::validate() const {
try {
// validate totem
auto expected_totem = ostream_json_snapshot_writer::magic_number;
decltype(expected_totem) actual_totem;
if (doc.HasMember("magic_number")) {
actual_totem = doc["magic_number"].GetInt();
EOS_ASSERT(actual_totem == expected_totem, snapshot_exception,
"JSON snapshot has unexpected magic number!");
}
EOS_ASSERT(impl->doc.HasMember("magic_number"), snapshot_exception, "magic_number section not found" );
auto actual_totem = impl->doc["magic_number"].GetInt();
EOS_ASSERT( actual_totem == expected_totem, snapshot_exception, "JSON snapshot has unexpected magic number" );

// validate version
auto expected_version = current_snapshot_version;
decltype(expected_version) actual_version;
if (doc.HasMember("version")) {
actual_version = doc["version"].GetInt();
EOS_ASSERT(actual_version == expected_version, snapshot_exception,
"JSON snapshot is an unsuppored version. Expected : ${expected}, Got: ${actual}",
("expected", expected_version)("actual", actual_version));
}
EOS_ASSERT(impl->doc.HasMember("version"), snapshot_exception, "version section not found" );
auto actual_version = impl->doc["version"].GetInt();
EOS_ASSERT( actual_version == expected_version, snapshot_exception,
"JSON snapshot is an unsupported version. Expected : ${expected}, Got: ${actual}",
("expected", expected_version)( "actual", actual_version ) );

} catch( const std::exception& e ) { \
snapshot_exception fce(FC_LOG_MESSAGE( warn, "JSON snapshot validation threw IO exception (${what})",("what",e.what())));
throw fce;
Expand All @@ -381,33 +392,38 @@ bool istream_json_snapshot_reader::validate_section() const {
}

void istream_json_snapshot_reader::set_section( const string& section_name ) {
if (doc.HasMember(section_name.c_str())) {
sec_name = section_name.c_str();
cur_row = 0;
num_rows = doc[section_name.c_str()].Size();
} else {
EOS_THROW(snapshot_exception, "JSON snapshot has no section named ${n}", ("n", section_name));
}
EOS_ASSERT( impl->doc.HasMember( section_name.c_str() ), snapshot_exception, "JSON snapshot has no section ${sec}", ("sec", section_name) );
EOS_ASSERT( impl->doc[section_name.c_str()].HasMember( "num_rows" ), snapshot_exception, "JSON snapshot ${sec} num_rows not found", ("sec", section_name) );
EOS_ASSERT( impl->doc[section_name.c_str()].HasMember( "rows" ), snapshot_exception, "JSON snapshot ${sec} rows not found", ("sec", section_name) );
EOS_ASSERT( impl->doc[section_name.c_str()]["rows"].IsArray(), snapshot_exception, "JSON snapshot ${sec} rows is not an array", ("sec_name", section_name) );

impl->sec_name = section_name;
impl->num_rows = impl->doc[section_name.c_str()]["num_rows"].GetInt();
ilog( "reading ${section_name}, num_rows: ${num_rows}", ("section_name", section_name)( "num_rows", impl->num_rows ) );
}

bool istream_json_snapshot_reader::read_row( detail::abstract_snapshot_row_reader& row_reader ) {
std::string row_name = "row_" + std::to_string(cur_row);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
doc[sec_name.c_str()][row_name.c_str()].Accept(writer);
const auto& row = fc::json::from_string(buffer.GetString());
row_reader.provide(row);
return ++cur_row < num_rows;
EOS_ASSERT( impl->cur_row < impl->num_rows, snapshot_exception, "JSON snapshot ${sect}'s cur_row ${cur_row} >= num_rows ${num_rows}",
("sect_name", impl->sec_name)( "cur_row", impl->cur_row )( "num_rows", impl->num_rows ) );

const eosio_rapidjson::Value& rows = impl->doc[impl->sec_name.c_str()]["rows"];
eosio_rapidjson::StringBuffer buffer;
eosio_rapidjson::Writer<eosio_rapidjson::StringBuffer> writer( buffer );
rows[impl->cur_row].Accept( writer );

const auto& row = fc::json::from_string( buffer.GetString() );
row_reader.provide( row );
return ++impl->cur_row < impl->num_rows;
}

bool istream_json_snapshot_reader::empty ( ) {
return num_rows == 0;
return impl->num_rows == 0;
}

void istream_json_snapshot_reader::clear_section() {
num_rows = 0;
cur_row = 0;
sec_name = "";
impl->num_rows = 0;
impl->cur_row = 0;
impl->sec_name = "";
}

void istream_json_snapshot_reader::return_to_header() {
Expand Down
76 changes: 30 additions & 46 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class chain_plugin_impl {
std::optional<vm_type> wasm_runtime;
fc::microseconds abi_serializer_max_time_us;
std::optional<bfs::path> snapshot_path;
std::optional<bfs::path> json_snapshot_path;
snapshot_reader_ptr snapshot_json_reader_ptr;


// retained references to channels for easy publication
Expand Down Expand Up @@ -377,8 +377,7 @@ void chain_plugin::set_program_options(options_description& cli, options_descrip
"stop hard replay / block log recovery at this block number (if set to non-zero number)")
("terminate-at-block", bpo::value<uint32_t>()->default_value(0),
"terminate after reaching this block number (if set to a non-zero number)")
("snapshot", bpo::value<bfs::path>(), "File to read Snapshot State from")
("json-snapshot", bpo::value<bfs::path>(), "JSON File to read Snapshot State from")
("snapshot", bpo::value<bfs::path>(), "File to read snapshot state from, .json extension for JSON input.")
;

}
Expand Down Expand Up @@ -893,6 +892,7 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
my->chain->add_indices();
my->chain->startup(shutdown, check_shutdown, reader);
infile.close();
EOS_ASSERT( !app().is_quiting(), snapshot_exception, "Loading of snapshot failed" );
app().quit(); // shutdown as we will be finished after writing the snapshot

ilog("Writing snapshot: ${s}", ("s", my->snapshot_path->generic_string() + ".json"));
Expand Down Expand Up @@ -943,62 +943,45 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
wlog( "The --truncate-at-block option can only be used with --hard-replay-blockchain." );
}

if (options.count( "snapshot" ) || options.count( "json-snapshot")) {
bfs::path snap_path;
std::string command;
bool is_json_snapshot = false;
if (options.count( "snapshot" )) {
snap_path = options.at( "snapshot" ).as<bfs::path>();
my->snapshot_path = snap_path;
command = "--snapshot";
} else {
snap_path = options.at( "json-snapshot" ).as<bfs::path>();
is_json_snapshot = true;
my->json_snapshot_path = snap_path;
command = "--json-snapshot";
}
if (options.count( "snapshot" )) {
bfs::path snap_path = options.at( "snapshot" ).as<bfs::path>();
EOS_ASSERT( fc::exists(snap_path), plugin_config_exception,
"Cannot load snapshot, ${name} does not exist", ("name", snap_path.generic_string()) );
my->snapshot_path = snap_path;

// recover genesis information from the snapshot
// used for validation code below
auto infile = std::ifstream(snap_path.generic_string(), (std::ios::in | std::ios::binary));
shared_ptr<snapshot_reader> reader_ptr;
if (is_json_snapshot) {
reader_ptr = shared_ptr<snapshot_reader>(new istream_json_snapshot_reader(snap_path));
if( my->snapshot_path->extension().generic_string().compare( ".json" ) == 0 ) {
ilog("Reading JSON snapshot, this may take a significant amount of time.");
my->snapshot_json_reader_ptr.reset( new istream_json_snapshot_reader(snap_path) );
my->snapshot_json_reader_ptr->validate();
chain_id = controller::extract_chain_id(*my->snapshot_json_reader_ptr);
} else {
reader_ptr = shared_ptr<snapshot_reader>(new istream_snapshot_reader(infile));
std::ifstream infile = std::ifstream(snap_path.generic_string(), (std::ios::in | std::ios::binary));
istream_snapshot_reader reader(infile);
reader.validate();
chain_id = controller::extract_chain_id(reader);
}
reader_ptr->validate();
chain_id = controller::extract_chain_id(*reader_ptr);
infile.close();

EOS_ASSERT( options.count( "genesis-timestamp" ) == 0,
plugin_config_exception,
command + " is incompatible with --genesis-timestamp as the snapshot contains genesis information");
EOS_ASSERT( options.count( "genesis-json" ) == 0,
plugin_config_exception,
command + " is incompatible with --genesis-json as the snapshot contains genesis information");
EOS_ASSERT( options.count( "genesis-timestamp" ) == 0, plugin_config_exception,
"--snapshot is incompatible with --genesis-timestamp as the snapshot contains genesis information");
EOS_ASSERT( options.count( "genesis-json" ) == 0, plugin_config_exception,
"--snapshot is incompatible with --genesis-json as the snapshot contains genesis information");

auto shared_mem_path = my->chain_config->state_dir / "shared_memory.bin";
EOS_ASSERT( !fc::is_regular_file(shared_mem_path),
plugin_config_exception,
EOS_ASSERT( !fc::is_regular_file(shared_mem_path), plugin_config_exception,
"A snapshot can only be used to initialize an empty database." );

if( fc::is_regular_file( my->blocks_dir / "blocks.log" )) {
auto block_log_genesis = block_log::extract_genesis_state(my->blocks_dir);
if( block_log_genesis ) {
const auto& block_log_chain_id = block_log_genesis->compute_chain_id();
EOS_ASSERT( *chain_id == block_log_chain_id,
plugin_config_exception,
EOS_ASSERT( *chain_id == block_log_chain_id, plugin_config_exception,
"snapshot chain ID (${snapshot_chain_id}) does not match the chain ID from the genesis state in the block log (${block_log_chain_id})",
("snapshot_chain_id", *chain_id)
("block_log_chain_id", block_log_chain_id)
);
} else {
const auto& block_log_chain_id = block_log::extract_chain_id(my->blocks_dir);
EOS_ASSERT( *chain_id == block_log_chain_id,
plugin_config_exception,
EOS_ASSERT( *chain_id == block_log_chain_id, plugin_config_exception,
"snapshot chain ID (${snapshot_chain_id}) does not match the chain ID (${block_log_chain_id}) in the block log",
("snapshot_chain_id", *chain_id)
("block_log_chain_id", block_log_chain_id)
Expand Down Expand Up @@ -1324,13 +1307,14 @@ void chain_plugin::plugin_startup()
auto shutdown = [](){ return app().quit(); };
auto check_shutdown = [](){ return app().is_quiting(); };
if (my->snapshot_path) {
auto infile = std::ifstream(my->snapshot_path->generic_string(), (std::ios::in | std::ios::binary));
auto reader = std::make_shared<istream_snapshot_reader>(infile);
my->chain->startup(shutdown, check_shutdown, reader);
infile.close();
} else if (my->json_snapshot_path) {
auto reader = std::make_shared<istream_json_snapshot_reader>(fc::path(*my->json_snapshot_path));
my->chain->startup(shutdown, check_shutdown, reader);
if( my->snapshot_json_reader_ptr ) {
my->chain->startup(shutdown, check_shutdown, my->snapshot_json_reader_ptr);
my->snapshot_json_reader_ptr.reset();
} else {
auto infile = std::ifstream( my->snapshot_path->generic_string(), (std::ios::in | std::ios::binary) );
auto reader = std::make_shared<istream_snapshot_reader>( infile );
my->chain->startup( shutdown, check_shutdown, reader );
}
} else if( my->genesis ) {
my->chain->startup(shutdown, check_shutdown, *my->genesis);
} else {
Expand Down