Skip to content

Commit

Permalink
Move state init/load logic to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
serban300 committed Jan 25, 2024
1 parent f6427e4 commit ba24c52
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 166 deletions.
160 changes: 4 additions & 156 deletions substrate/client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ use crate::{
error::Error,
import::BeefyBlockImport,
metrics::register_metrics,
round::Rounds,
worker::PersistedState,
};
use futures::{stream::Fuse, StreamExt};
use log::{debug, error, info, warn};
Expand All @@ -48,17 +46,11 @@ use sp_blockchain::{
use sp_consensus::{Error as ConsensusError, SyncOracle};
use sp_consensus_beefy::{
ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet,
BEEFY_ENGINE_ID,
};
use sp_keystore::KeystorePtr;
use sp_mmr_primitives::MmrApi;
use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
use std::{
collections::{BTreeMap, VecDeque},
marker::PhantomData,
sync::Arc,
time::Duration,
};
use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration};

mod aux_schema;
mod error;
Expand Down Expand Up @@ -318,14 +310,9 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
_phantom: Default::default(),
};

let persisted_state = match load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
.await
let persisted_state = match worker_base
.load_or_init_state(beefy_genesis, best_grandpa, min_block_delta)
.await
{
Ok(state) => state,
Err(e) => {
Expand Down Expand Up @@ -374,43 +361,6 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
}
}

async fn load_or_init_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> Result<PersistedState<B>, Error>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
// Initialize voter state from AUX DB if compatible.
if let Some(mut state) = crate::aux_schema::load_persistent(backend)?
// Verify state pallet genesis matches runtime.
.filter(|state| state.pallet_genesis() == beefy_genesis)
{
// Overwrite persisted state with current best GRANDPA block.
state.set_best_grandpa(best_grandpa.clone());
// Overwrite persisted data with newly provided `min_block_delta`.
state.set_min_block_delta(min_block_delta);
info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);

// Make sure that all the headers that we need have been synced.
let mut header = best_grandpa.clone();
while *header.number() > state.best_beefy() {
header =
wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?;
}
return Ok(state)
}

// No valid voter-state persisted, re-initialize from pallet genesis.
initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta).await
}

/// Waits until the parent header of `current` is available and returns it.
///
/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
Expand Down Expand Up @@ -450,108 +400,6 @@ where
}
}

// If no persisted state present, walk back the chain from first GRANDPA notification to either:
// - latest BEEFY finalized block, or if none found on the way,
// - BEEFY pallet genesis;
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
async fn initialize_voter_state<B, BE, R>(
backend: &BE,
runtime: &R,
beefy_genesis: NumberFor<B>,
best_grandpa: <B as Block>::Header,
min_block_delta: u32,
) -> Result<PersistedState<B>, Error>
where
B: Block,
BE: Backend<B>,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId>,
{
let blockchain = backend.blockchain();

let beefy_genesis = runtime
.runtime_api()
.beefy_genesis(best_grandpa.hash())
.ok()
.flatten()
.filter(|genesis| *genesis == beefy_genesis)
.ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?;
// Walk back the imported blocks and initialize voter either, at the last block with
// a BEEFY justification, or at pallet genesis block; voter will resume from there.
let mut sessions = VecDeque::new();
let mut header = best_grandpa.clone();
let state = loop {
if let Some(true) = blockchain
.justifications(header.hash())
.ok()
.flatten()
.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
{
info!(
target: LOG_TARGET,
"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
*header.number()
);
let best_beefy = *header.number();
// If no session boundaries detected so far, just initialize new rounds here.
if sessions.is_empty() {
let active_set = expect_validator_set(runtime, backend, &header).await?;
let mut rounds = Rounds::new(best_beefy, active_set);
// Mark the round as already finalized.
rounds.conclude(best_beefy);
sessions.push_front(rounds);
}
let state = PersistedState::checked_new(
best_grandpa,
best_beefy,
sessions,
min_block_delta,
beefy_genesis,
)
.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?;
break state
}

if *header.number() == beefy_genesis {
// We've reached BEEFY genesis, initialize voter here.
let genesis_set = expect_validator_set(runtime, backend, &header).await?;
info!(
target: LOG_TARGET,
"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
Starting voting rounds at block {:?}, genesis validator set {:?}.",
beefy_genesis,
genesis_set,
);

sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
break PersistedState::checked_new(
best_grandpa,
Zero::zero(),
sessions,
min_block_delta,
beefy_genesis,
)
.ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?
}

if let Some(active) = worker::find_authorities_change::<B>(&header) {
info!(
target: LOG_TARGET,
"🥩 Marking block {:?} as BEEFY Mandatory.",
*header.number()
);
sessions.push_front(Rounds::new(*header.number(), active));
}

// Move up the chain.
header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?;
};

aux_schema::write_current_version(backend)?;
aux_schema::write_voter_state(backend, &state)?;
Ok(state)
}

/// Wait for BEEFY runtime pallet to be available, return active validator set.
/// Should be called only once during worker initialization.
async fn wait_for_runtime_pallet<B, R>(
Expand Down
37 changes: 28 additions & 9 deletions substrate/client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ use crate::{
error::Error,
gossip_protocol_name,
justification::*,
load_or_init_voter_state, wait_for_runtime_pallet, BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
PersistedState,
wait_for_runtime_pallet,
worker::{BeefyWorkerBase, PersistedState},
BeefyRPCLinks, BeefyVoterLinks, KnownPeers,
};
use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use parking_lot::Mutex;
Expand Down Expand Up @@ -379,7 +380,14 @@ async fn voter_init_setup(
);
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap();
load_or_init_voter_state(&*backend, api, beefy_genesis, best_grandpa, 1).await
let worker_base = BeefyWorkerBase {
backend,
runtime: Arc::new(api.clone()),
key_store: None.into(),
metrics: None,
_phantom: Default::default(),
};
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await
}

// Spawns beefy voters. Returns a future to spawn on the runtime.
Expand Down Expand Up @@ -1073,9 +1081,15 @@ async fn should_initialize_voter_at_custom_genesis() {
);
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let persisted_state = load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1)
.await
.unwrap();
let worker_base = BeefyWorkerBase {
backend: backend.clone(),
runtime: Arc::new(api),
key_store: None.into(),
metrics: None,
_phantom: Default::default(),
};
let persisted_state =
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
Expand Down Expand Up @@ -1108,10 +1122,15 @@ async fn should_initialize_voter_at_custom_genesis() {
// the network state persists and uses the old `GossipEngine` initialized for `peer(0)`
let (beefy_genesis, best_grandpa) =
wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap();
let worker_base = BeefyWorkerBase {
backend: backend.clone(),
runtime: Arc::new(api),
key_store: None.into(),
metrics: None,
_phantom: Default::default(),
};
let new_persisted_state =
load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1)
.await
.unwrap();
worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap();

// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
let sessions = new_persisted_state.voting_oracle().sessions();
Expand Down
Loading

0 comments on commit ba24c52

Please sign in to comment.