[1.0.2 -> main] P2P: Allow irreversible mode syncing to continue when LIB is paused #805

Merged: 17 commits, Sep 19, 2024
4 changes: 2 additions & 2 deletions libraries/chain/controller.cpp
@@ -953,7 +953,7 @@ struct controller_impl {
const chain_id_type chain_id; // read by thread_pool threads, value will not be changed
bool replaying = false;
bool is_producer_node = false; // true if node is configured as a block producer
db_read_mode read_mode = db_read_mode::HEAD;
const db_read_mode read_mode;
bool in_trx_requiring_checks = false; ///< if true, checks that are normally skipped on replay (e.g. auth checks) cannot be skipped
std::optional<fc::microseconds> subjective_cpu_leeway;
bool trusted_producer_light_validation = false;
@@ -4216,7 +4216,7 @@ struct controller_impl {

block_state_ptr claimed_bsp = fork_db_fetch_bsp_on_branch_by_num( bsp_in->previous(), qc_ext.qc.block_num );
if( !claimed_bsp ) {
dlog("qc not found in forkdb, qc: ${qc} for block ${bn} ${id}, previous ${p}",
dlog("block state of claimed qc not found in forkdb, qc: ${qc} for block ${bn} ${id}, previous ${p}",
("qc", qc_ext.qc.to_qc_claim())("bn", bsp_in->block_num())("id", bsp_in->id())("p", bsp_in->previous()));
return;
}
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/controller.hpp
@@ -394,8 +394,10 @@ namespace eosio::chain {

chain_id_type get_chain_id()const;

// thread safe
db_read_mode get_read_mode()const;
validation_mode get_validation_mode()const;

/// @return true if terminate-at-block reached
/// not-thread-safe
bool should_terminate() const;
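The two controller changes above go together: read_mode becomes a const member fixed at construction, which is what justifies documenting get_read_mode() as thread safe. A minimal sketch of why the net plugin cares, assuming a caller on a network thread (the helper name is illustrative, not part of this PR):

    #include <eosio/chain/controller.hpp>

    // Sketch only: because read_mode is now a const member set in the controller's
    // constructor, another thread can read it without any synchronization.
    bool syncing_in_irreversible_mode(const eosio::chain::controller& cc) {
       return cc.get_read_mode() == eosio::chain::db_read_mode::IRREVERSIBLE;
    }
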
65 changes: 48 additions & 17 deletions plugins/net_plugin/net_plugin.cpp
@@ -999,7 +999,7 @@ namespace eosio {
bool connected() const;
bool closed() const; // socket is not open or is closed or closing, thread safe
bool current() const;
bool should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const;
bool should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num, uint32_t sync_fetch_span) const;

/// @param reconnect true if we should try and reconnect immediately after close
/// @param shutdown true only if plugin is shutting down
@@ -1421,15 +1421,16 @@ namespace eosio {
}

// thread safe
bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const {
bool connection::should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num, uint32_t sync_fetch_span) const {
fc_dlog(logger, "id: ${id} blocks conn: ${t} current: ${c} socket_open: ${so} syncing from us: ${s} state: ${con} peer_start_block: ${sb} peer_fhead: ${h} ping: ${p}us no_retry: ${g}",
("id", connection_id)("t", is_blocks_connection())
("c", current())("so", socket_is_open())("s", peer_syncing_from_us.load())("con", state_str(state()))
("sb", peer_start_block_num.load())("h", peer_fork_head_block_num.load())("p", get_peer_ping_time_ns()/1000)("g", reason_str(no_retry)));
if (is_blocks_connection() && current()) {
if (no_retry == go_away_reason::no_reason) {
if (peer_start_block_num <= sync_next_expected_num) { // has blocks we want
if (peer_fork_head_block_num >= sync_known_lib_num) { // is in sync
auto needed_end = std::min(sync_next_expected_num + sync_fetch_span, sync_known_lib_num);
if (peer_fork_head_block_num >= needed_end) { // has lib blocks
return true;
}
}
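The new sync_fetch_span parameter changes what qualifies as a usable sync peer: the peer no longer has to have caught up all the way to the known LIB, it only has to cover the next fetch window. A standalone restatement of the predicate, with the connection's member state passed in as plain parameters (a sketch, not the PR's exact code):

    #include <algorithm>
    #include <cstdint>

    // Sketch: can this peer serve the next window of blocks we want to request?
    bool peer_covers_next_window(uint32_t peer_start_block_num,
                                 uint32_t peer_fork_head_block_num,
                                 uint32_t sync_next_expected_num,
                                 uint32_t sync_known_lib_num,
                                 uint32_t sync_fetch_span) {
       if (peer_start_block_num > sync_next_expected_num)
          return false; // peer cannot supply the next block we need
       // require coverage only up to the end of the next span, capped at the known LIB
       uint32_t needed_end = std::min(sync_next_expected_num + sync_fetch_span, sync_known_lib_num);
       return peer_fork_head_block_num >= needed_end;
    }
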
@@ -2052,8 +2053,9 @@ namespace eosio {
deque<connection_ptr> conns;
my_impl->connections.for_each_block_connection([sync_next_expected_num = sync_next_expected_num,
sync_known_lib_num = sync_known_lib_num,
sync_fetch_span = sync_fetch_span,
&conns](const auto& c) {
if (c->should_sync_from(sync_next_expected_num, sync_known_lib_num)) {
if (c->should_sync_from(sync_next_expected_num, sync_known_lib_num, sync_fetch_span)) {
conns.push_back(c);
}
});
@@ -2174,14 +2176,39 @@ namespace eosio {
sync_next_expected_num < sync_last_requested_num );
}

// called from c's connection strand
bool sync_manager::is_sync_request_ahead_allowed(block_num_type blk_num) const REQUIRES(sync_mtx) {
if (blk_num >= sync_last_requested_num) {
// do not allow to get too far ahead (sync_fetch_span) of chain head
// use chain head instead of fork head so we do not get too far ahead of applied blocks
uint32_t head = my_impl->get_chain_head_num();
if (blk_num < head + sync_fetch_span)
uint32_t head_num = my_impl->get_chain_head_num();
block_num_type num_blocks_not_applied = blk_num > head_num ? blk_num - head_num : 0;
if (num_blocks_not_applied < sync_fetch_span) {
fc_dlog(logger, "sync ahead allowed past sync-fetch-span ${sp}, block ${bn} chain_lib ${cl}, forkdb size ${s}",
("bn", blk_num)("sp", sync_fetch_span)("cl", head_num)("s", my_impl->chain_plug->chain().fork_db_size()));
return true;
}

controller& cc = my_impl->chain_plug->chain();
if (cc.get_read_mode() == db_read_mode::IRREVERSIBLE) {
auto forkdb_head = cc.fork_db_head();
auto calculated_lib = forkdb_head.irreversible_blocknum();
auto num_blocks_that_can_be_applied = calculated_lib > head_num ? calculated_lib - head_num : 0;
if (num_blocks_that_can_be_applied < sync_fetch_span) {
if (head_num )
fc_ilog(logger, "sync ahead allowed past sync-fetch-span ${sp}, block ${bn} for paused LIB ${l}, chain_lib ${cl}, forkdb size ${s}",
("bn", blk_num)("sp", sync_fetch_span)("l", calculated_lib)("cl", head_num)("s", cc.fork_db_size()));
return true;
}
}

fc_dlog(logger, "sync ahead not allowed. block ${bn}, head ${h}, fhead ${fh}, fhead->lib ${fl}, sync-fetch-span ${sp}, forkdb size ${s}",
("bn", blk_num)("h", head_num)("fh", cc.fork_db_head().block_num())("fl", cc.fork_db_head().irreversible_blocknum())
("sp", sync_fetch_span)("s", cc.fork_db_size()));
}

fc_dlog(logger, "sync ahead not allowed. block ${bn}, sync_last_requested_num ${lrn}, sync-fetch-span ${s}",
("bn", blk_num)("lrn", sync_last_requested_num)("s", sync_fetch_span));
return false;
}
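
This new branch is the heart of the PR. In irreversible read mode the applied chain head stops at LIB, so when finality pauses the old head + sync_fetch_span limit would stall syncing even though the fork database can keep accepting blocks. A hedged restatement of the decision, with the controller and fork-db lookups replaced by plain parameters (names are illustrative):

    #include <cstdint>

    // Sketch of the ahead-allowed decision; not the PR's exact code.
    bool sync_ahead_allowed(uint32_t blk_num, uint32_t head_num, uint32_t calculated_lib,
                            uint32_t sync_fetch_span, bool irreversible_mode) {
       // normal rule: stay within sync_fetch_span of the applied chain head
       uint32_t num_not_applied = blk_num > head_num ? blk_num - head_num : 0;
       if (num_not_applied < sync_fetch_span)
          return true;
       // irreversible mode: the applied head tracks LIB, which may be paused, so keep
       // fetching while fewer than sync_fetch_span blocks are waiting to be applied
       if (irreversible_mode) {
          uint32_t num_can_be_applied = calculated_lib > head_num ? calculated_lib - head_num : 0;
          return num_can_be_applied < sync_fetch_span;
       }
       return false;
    }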

@@ -2200,7 +2227,10 @@ namespace eosio {
return;
}

if( sync_state != lib_catchup || !sync_recently_active()) {
stages current_sync_state = sync_state;
if( current_sync_state != lib_catchup || !sync_recently_active()) {
peer_dlog(c, "requesting next chuck, set to lib_catchup and request_next_chunk, sync_state ${s}, sync_next_expected_num ${nen}",
("s", stage_str(current_sync_state))("nen", sync_next_expected_num));
set_state( lib_catchup );
sync_last_requested_num = 0;
sync_next_expected_num = chain_info.lib_num + 1;
@@ -2233,14 +2263,14 @@ namespace eosio {
// called from connection strand
void sync_manager::sync_timeout(const connection_ptr& c, const boost::system::error_code& ec) {
if( !ec ) {
peer_dlog(c, "sync timeout");
peer_dlog(c, "sync timed out");
sync_reassign_fetch( c );
close(true);
} else if( ec != boost::asio::error::operation_aborted ) { // don't log on operation_aborted, called on destroy
peer_elog( c, "setting timer for sync request got error ${ec}", ("ec", ec.message()) );
}
--sync_timers_active;
peer_dlog(c, "sync timeout, active_timers ${t}", ("t", sync_timers_active.load()));
peer_dlog(c, "sync active_timers ${t}", ("t", sync_timers_active.load()));
}

// called from connection strand
@@ -2569,14 +2599,15 @@ namespace eosio {
}
} else { // blk_applied
if (blk_num >= sync_last_requested_num) {
// Did not request blocks ahead, likely because too far ahead of head
// Do not restrict sync_fetch_span as we want max-reversible-blocks to shut down the node for applied blocks
fc_dlog(logger, "Requesting blocks, head: ${h} fhead ${fh} blk_num: ${bn} sync_next_expected_num ${nen} "
"sync_last_requested_num: ${lrn}, sync_last_requested_block: ${lrb}",
("h", my_impl->get_chain_head_num())("fh", my_impl->get_fork_head_num())
("bn", blk_num)("nen", sync_next_expected_num)
("lrn", sync_last_requested_num)("lrb", c->sync_last_requested_block));
request_next_chunk();
if (is_sync_request_ahead_allowed(blk_num)) {
// Did not request blocks ahead, likely because too far ahead of head, or in irreversible mode
fc_dlog(logger, "Requesting blocks, head: ${h} fhead ${fh} blk_num: ${bn} sync_next_expected_num ${nen} "
"sync_last_requested_num: ${lrn}, sync_last_requested_block: ${lrb}",
("h", my_impl->get_chain_head_num())("fh", my_impl->get_fork_head_num())
("bn", blk_num)("nen", sync_next_expected_num)
("lrn", sync_last_requested_num)("lrb", c->sync_last_requested_block));
request_next_chunk();
}
}
}
}
12 changes: 8 additions & 4 deletions tests/nodeos_startup_catchup.py
@@ -29,7 +29,7 @@
errorExit=Utils.errorExit

appArgs=AppArgs()
extraArgs = appArgs.add(flag="--catchup-count", type=int, help="How many catchup-nodes to launch", default=12)
extraArgs = appArgs.add(flag="--catchup-count", type=int, help="How many catchup-nodes to launch", default=16)
extraArgs = appArgs.add(flag="--txn-gen-nodes", type=int, help="How many transaction generator nodes", default=2)
args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running",
"--activate-if","-p","--wallet-port","--unshared"}, applicationSpecificArgs=appArgs)
@@ -64,11 +64,15 @@
specificExtraNodeosArgs[pnodes+4] = f' --sync-fetch-span 89 '
specificExtraNodeosArgs[pnodes+5] = f' --sync-fetch-span 377 '
specificExtraNodeosArgs[pnodes+6] = f' --sync-fetch-span 1597 '
specificExtraNodeosArgs[pnodes+7] = f' --sync-fetch-span 1597 '
specificExtraNodeosArgs[pnodes+7] = f' --sync-fetch-span 2500 '
specificExtraNodeosArgs[pnodes+8] = f' --sync-fetch-span 6765 '
specificExtraNodeosArgs[pnodes+9] = f' --sync-fetch-span 28657 '
specificExtraNodeosArgs[pnodes+10] = f' --sync-fetch-span 89 --read-mode irreversible '
specificExtraNodeosArgs[pnodes+11] = f' --sync-fetch-span 377 --read-mode irreversible '
specificExtraNodeosArgs[pnodes+10] = f' ' # default
specificExtraNodeosArgs[pnodes+11] = f' --sync-fetch-span 1 --read-mode irreversible '
specificExtraNodeosArgs[pnodes+12] = f' --sync-fetch-span 5 --read-mode irreversible '
specificExtraNodeosArgs[pnodes+13] = f' --sync-fetch-span 89 --read-mode irreversible '
specificExtraNodeosArgs[pnodes+14] = f' --sync-fetch-span 200 --read-mode irreversible '
specificExtraNodeosArgs[pnodes+15] = f' --sync-fetch-span 2500 --read-mode irreversible '
if cluster.launch(prodCount=prodCount, specificExtraNodeosArgs=specificExtraNodeosArgs, activateIF=activateIF, onlyBios=False,
pnodes=pnodes, totalNodes=totalNodes, totalProducers=pnodes*prodCount, unstartedNodes=catchupCount,
loadSystemContract=True, maximumP2pPerHost=totalNodes+trxGeneratorCnt) is False:
9 changes: 8 additions & 1 deletion unittests/savanna_cluster.cpp
@@ -6,6 +6,7 @@ node_t::node_t(size_t node_idx, cluster_t& cluster, setup_policy policy /* = set
: tester(policy)
, _node_idx(node_idx)
, _last_vote({}, false)
, _cluster(cluster)
{

// since we are creating forks, finalizers may be locked on another fork and unable to vote.
@@ -57,4 +58,10 @@ node_t::node_t(size_t node_idx, cluster_t& cluster, setup_policy policy /* = set

node_t::~node_t() {}

}
void node_t::propagate_delayed_votes_to(const node_t& to) {
for (auto& vote : _delayed_votes)
if (to.is_open())
to.control->process_vote_message(++_cluster._connection_id, vote);
}

}
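
The new propagate_delayed_votes_to() helper lets a test replay the votes a node has been holding back (via its vote delay) to another node, using the cluster-wide connection id counter that the savanna_cluster.hpp diff below moves out of a function-local static. A hypothetical usage in a test (the node names A and D are illustrative):

    // After reopening D, forward the votes A delayed so D can observe them and
    // advance finality. Hypothetical test fragment, not from this PR.
    A.propagate_delayed_votes_to(D);
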
11 changes: 8 additions & 3 deletions unittests/savanna_cluster.hpp
@@ -116,6 +116,8 @@ namespace savanna_cluster {
std::function<void(const block_signal_params&)> _accepted_block_cb;
std::function<void(const vote_signal_params&)> _voted_block_cb;

cluster_t& _cluster;

public:
node_t(size_t node_idx, cluster_t& cluster, setup_policy policy = setup_policy::none);

Expand All @@ -127,6 +129,8 @@ namespace savanna_cluster {

size_t& vote_delay() { return _vote_delay; }

void propagate_delayed_votes_to(const node_t& to);

const vote_t& last_vote() const { return _last_vote; }

void set_node_finalizers(std::span<const account_name> names) {
@@ -545,7 +549,8 @@ namespace savanna_cluster {
// -------------------
void print(const char* name, const signed_block_ptr& b) const {
if (_debug_mode)
std::cout << name << " (" << b->block_num() << ") timestamp = " << b->timestamp.slot << ", id = " << b->calculate_id().str().substr(8, 16)
std::cout << name << " (" << b->block_num() << ") timestamp = " << b->timestamp.slot
<< ", id = " << b->calculate_id().str().substr(8, 16)
<< ", previous = " << b->previous.str().substr(8, 16) << '\n';
}

@@ -563,14 +568,14 @@
peers_t _peers;
size_t _num_nodes;
bool _shutting_down {false};
uint32_t _connection_id = 0;

friend node_t;

void dispatch_vote_to_peers(size_t node_idx, skip_self_t skip_self, const vote_message_ptr& msg) {
static uint32_t connection_id = 0;
for_each_peer(node_idx, skip_self, [&](node_t& n) {
if (n.is_open())
n.control->process_vote_message(++connection_id, msg);
n.control->process_vote_message(++_connection_id, msg);
});
}
