From b177c2860e6d75d189e66185954a283db000f7ef Mon Sep 17 00:00:00 2001 From: Serban Iorga Date: Thu, 1 Feb 2024 12:24:16 +0100 Subject: [PATCH] [BEEFY] Avoid missing voting sessions during node restart (#3074) Related to https://github.com/paritytech/polkadot-sdk/issues/3003 and https://github.com/paritytech/polkadot-sdk/issues/2842 --------- Co-authored-by: Adrian Catangiu --- .../client/consensus/beefy/src/aux_schema.rs | 20 +- .../incoming_requests_handler.rs | 4 +- .../outgoing_requests_engine.rs | 10 +- .../client/consensus/beefy/src/import.rs | 4 +- substrate/client/consensus/beefy/src/lib.rs | 189 ++-------- .../client/consensus/beefy/src/metrics.rs | 12 +- substrate/client/consensus/beefy/src/tests.rs | 138 ++++++- .../client/consensus/beefy/src/worker.rs | 357 ++++++++++++++---- 8 files changed, 456 insertions(+), 278 deletions(-) diff --git a/substrate/client/consensus/beefy/src/aux_schema.rs b/substrate/client/consensus/beefy/src/aux_schema.rs index 409eb30d09ab..944a00f8372f 100644 --- a/substrate/client/consensus/beefy/src/aux_schema.rs +++ b/substrate/client/consensus/beefy/src/aux_schema.rs @@ -18,11 +18,10 @@ //! Schema for BEEFY state persisted in the aux-db. -use crate::{worker::PersistedState, LOG_TARGET}; +use crate::{error::Error, worker::PersistedState, LOG_TARGET}; use codec::{Decode, Encode}; use log::{info, trace}; use sc_client_api::{backend::AuxStore, Backend}; -use sp_blockchain::{Error as ClientError, Result as ClientResult}; use sp_runtime::traits::Block as BlockT; const VERSION_KEY: &[u8] = b"beefy_auxschema_version"; @@ -30,31 +29,33 @@ const WORKER_STATE_KEY: &[u8] = b"beefy_voter_state"; const CURRENT_VERSION: u32 = 4; -pub(crate) fn write_current_version(backend: &BE) -> ClientResult<()> { +pub(crate) fn write_current_version(backend: &BE) -> Result<(), Error> { info!(target: LOG_TARGET, "🥩 write aux schema version {:?}", CURRENT_VERSION); AuxStore::insert_aux(backend, &[(VERSION_KEY, CURRENT_VERSION.encode().as_slice())], &[]) + .map_err(|e| Error::Backend(e.to_string())) } /// Write voter state. pub(crate) fn write_voter_state( backend: &BE, state: &PersistedState, -) -> ClientResult<()> { +) -> Result<(), Error> { trace!(target: LOG_TARGET, "🥩 persisting {:?}", state); AuxStore::insert_aux(backend, &[(WORKER_STATE_KEY, state.encode().as_slice())], &[]) + .map_err(|e| Error::Backend(e.to_string())) } -fn load_decode(backend: &BE, key: &[u8]) -> ClientResult> { - match backend.get_aux(key)? { +fn load_decode(backend: &BE, key: &[u8]) -> Result, Error> { + match backend.get_aux(key).map_err(|e| Error::Backend(e.to_string()))? { None => Ok(None), Some(t) => T::decode(&mut &t[..]) - .map_err(|e| ClientError::Backend(format!("BEEFY DB is corrupted: {}", e))) + .map_err(|e| Error::Backend(format!("BEEFY DB is corrupted: {}", e))) .map(Some), } } /// Load or initialize persistent data from backend. -pub(crate) fn load_persistent(backend: &BE) -> ClientResult>> +pub(crate) fn load_persistent(backend: &BE) -> Result>, Error> where B: BlockT, BE: Backend, @@ -65,8 +66,7 @@ where None => (), Some(1) | Some(2) | Some(3) => (), // versions 1, 2 & 3 are obsolete and should be ignored Some(4) => return load_decode::<_, PersistedState>(backend, WORKER_STATE_KEY), - other => - return Err(ClientError::Backend(format!("Unsupported BEEFY DB version: {:?}", other))), + other => return Err(Error::Backend(format!("Unsupported BEEFY DB version: {:?}", other))), } // No persistent state found in DB. diff --git a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs index 71c5c49b3690..d856e9748a10 100644 --- a/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs +++ b/substrate/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs @@ -201,7 +201,7 @@ where let peer = request.peer; match self.handle_request(request) { Ok(()) => { - metric_inc!(self, beefy_successful_justification_responses); + metric_inc!(self.metrics, beefy_successful_justification_responses); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 Handled BEEFY justification request from {:?}.", peer @@ -209,7 +209,7 @@ where }, Err(e) => { // peer reputation changes already applied in `self.handle_request()` - metric_inc!(self, beefy_failed_justification_responses); + metric_inc!(self.metrics, beefy_failed_justification_responses); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 Failed to handle BEEFY justification request from {:?}: {}", peer, e, diff --git a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs index 7121410ea109..992b9fa08c09 100644 --- a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs +++ b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs @@ -148,7 +148,7 @@ impl OnDemandJustificationsEngine { if let Some(peer) = self.try_next_peer() { self.request_from_peer(peer, RequestInfo { block, active_set }); } else { - metric_inc!(self, beefy_on_demand_justification_no_peer_to_request_from); + metric_inc!(self.metrics, beefy_on_demand_justification_no_peer_to_request_from); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 no good peers to request justif #{:?} from", block @@ -194,13 +194,13 @@ impl OnDemandJustificationsEngine { ); match e { RequestFailure::Refused => { - metric_inc!(self, beefy_on_demand_justification_peer_refused); + metric_inc!(self.metrics, beefy_on_demand_justification_peer_refused); let peer_report = PeerReport { who: *peer, cost_benefit: cost::REFUSAL_RESPONSE }; Error::InvalidResponse(peer_report) }, _ => { - metric_inc!(self, beefy_on_demand_justification_peer_error); + metric_inc!(self.metrics, beefy_on_demand_justification_peer_error); Error::ResponseError }, } @@ -212,7 +212,7 @@ impl OnDemandJustificationsEngine { &req_info.active_set, ) .map_err(|(err, signatures_checked)| { - metric_inc!(self, beefy_on_demand_justification_invalid_proof); + metric_inc!(self.metrics, beefy_on_demand_justification_invalid_proof); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}", @@ -261,7 +261,7 @@ impl OnDemandJustificationsEngine { } }, Ok(proof) => { - metric_inc!(self, beefy_on_demand_justification_good_proof); + metric_inc!(self.metrics, beefy_on_demand_justification_good_proof); debug!( target: BEEFY_SYNC_LOG_TARGET, "🥩 received valid on-demand justif #{:?} from {:?}", block, peer diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs index 6eced17b58ff..fc19ecc30142 100644 --- a/substrate/client/consensus/beefy/src/import.rs +++ b/substrate/client/consensus/beefy/src/import.rs @@ -165,7 +165,7 @@ where self.justification_sender .notify(|| Ok::<_, ()>(proof)) .expect("the closure always returns Ok; qed."); - metric_inc!(self, beefy_good_justification_imports); + metric_inc!(self.metrics, beefy_good_justification_imports); }, Err(err) => { debug!( @@ -174,7 +174,7 @@ where number, err, ); - metric_inc!(self, beefy_bad_justification_imports); + metric_inc!(self.metrics, beefy_bad_justification_imports); }, } }, diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index 2e2e22288e3b..1f10d8099d83 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -27,10 +27,9 @@ use crate::{ outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler, }, }, + error::Error, import::BeefyBlockImport, metrics::register_metrics, - round::Rounds, - worker::PersistedState, }; use futures::{stream::Fuse, StreamExt}; use log::{debug, error, info, warn}; @@ -47,17 +46,11 @@ use sp_blockchain::{ use sp_consensus::{Error as ConsensusError, SyncOracle}; use sp_consensus_beefy::{ ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet, - BEEFY_ENGINE_ID, }; use sp_keystore::KeystorePtr; use sp_mmr_primitives::MmrApi; use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero}; -use std::{ - collections::{BTreeMap, VecDeque}, - marker::PhantomData, - sync::Arc, - time::Duration, -}; +use std::{collections::BTreeMap, marker::PhantomData, sync::Arc, time::Duration}; mod aux_schema; mod error; @@ -309,14 +302,17 @@ pub async fn start_beefy_gadget( }, }; - let persisted_state = match load_or_init_voter_state( - &*backend, - &*runtime, - beefy_genesis, - best_grandpa, - min_block_delta, - ) - .await + let mut worker_base = worker::BeefyWorkerBase { + backend: backend.clone(), + runtime: runtime.clone(), + key_store: key_store.clone().into(), + metrics: metrics.clone(), + _phantom: Default::default(), + }; + + let persisted_state = match worker_base + .load_or_init_state(beefy_genesis, best_grandpa, min_block_delta) + .await { Ok(state) => state, Err(e) => { @@ -334,14 +330,11 @@ pub async fn start_beefy_gadget( } let worker = worker::BeefyWorker { - backend: backend.clone(), + base: worker_base, payload_provider: payload_provider.clone(), - runtime: runtime.clone(), sync: sync.clone(), - key_store: key_store.clone().into(), comms: beefy_comms, links: links.clone(), - metrics: metrics.clone(), pending_justifications: BTreeMap::new(), persisted_state, }; @@ -368,43 +361,6 @@ pub async fn start_beefy_gadget( } } -async fn load_or_init_voter_state( - backend: &BE, - runtime: &R, - beefy_genesis: NumberFor, - best_grandpa: ::Header, - min_block_delta: u32, -) -> ClientResult> -where - B: Block, - BE: Backend, - R: ProvideRuntimeApi, - R::Api: BeefyApi, -{ - // Initialize voter state from AUX DB if compatible. - if let Some(mut state) = crate::aux_schema::load_persistent(backend)? - // Verify state pallet genesis matches runtime. - .filter(|state| state.pallet_genesis() == beefy_genesis) - { - // Overwrite persisted state with current best GRANDPA block. - state.set_best_grandpa(best_grandpa.clone()); - // Overwrite persisted data with newly provided `min_block_delta`. - state.set_min_block_delta(min_block_delta); - info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state); - - // Make sure that all the headers that we need have been synced. - let mut header = best_grandpa.clone(); - while *header.number() > state.best_beefy() { - header = - wait_for_parent_header(backend.blockchain(), header, HEADER_SYNC_DELAY).await?; - } - return Ok(state) - } - - // No valid voter-state persisted, re-initialize from pallet genesis. - initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta).await -} - /// Waits until the parent header of `current` is available and returns it. /// /// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers. @@ -415,7 +371,7 @@ async fn wait_for_parent_header( blockchain: &BC, current: ::Header, delay: Duration, -) -> ClientResult<::Header> +) -> Result<::Header, Error> where B: Block, BC: BlockchainBackend, @@ -423,10 +379,13 @@ where if *current.number() == Zero::zero() { let msg = format!("header {} is Genesis, there is no parent for it", current.hash()); warn!(target: LOG_TARGET, "{}", msg); - return Err(ClientError::UnknownBlock(msg)) + return Err(Error::Backend(msg)); } loop { - match blockchain.header(*current.parent_hash())? { + match blockchain + .header(*current.parent_hash()) + .map_err(|e| Error::Backend(e.to_string()))? + { Some(parent) => return Ok(parent), None => { info!( @@ -441,108 +400,6 @@ where } } -// If no persisted state present, walk back the chain from first GRANDPA notification to either: -// - latest BEEFY finalized block, or if none found on the way, -// - BEEFY pallet genesis; -// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize. -async fn initialize_voter_state( - backend: &BE, - runtime: &R, - beefy_genesis: NumberFor, - best_grandpa: ::Header, - min_block_delta: u32, -) -> ClientResult> -where - B: Block, - BE: Backend, - R: ProvideRuntimeApi, - R::Api: BeefyApi, -{ - let blockchain = backend.blockchain(); - - let beefy_genesis = runtime - .runtime_api() - .beefy_genesis(best_grandpa.hash()) - .ok() - .flatten() - .filter(|genesis| *genesis == beefy_genesis) - .ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?; - // Walk back the imported blocks and initialize voter either, at the last block with - // a BEEFY justification, or at pallet genesis block; voter will resume from there. - let mut sessions = VecDeque::new(); - let mut header = best_grandpa.clone(); - let state = loop { - if let Some(true) = blockchain - .justifications(header.hash()) - .ok() - .flatten() - .map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some()) - { - info!( - target: LOG_TARGET, - "🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.", - *header.number() - ); - let best_beefy = *header.number(); - // If no session boundaries detected so far, just initialize new rounds here. - if sessions.is_empty() { - let active_set = expect_validator_set(runtime, backend, &header).await?; - let mut rounds = Rounds::new(best_beefy, active_set); - // Mark the round as already finalized. - rounds.conclude(best_beefy); - sessions.push_front(rounds); - } - let state = PersistedState::checked_new( - best_grandpa, - best_beefy, - sessions, - min_block_delta, - beefy_genesis, - ) - .ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?; - break state - } - - if *header.number() == beefy_genesis { - // We've reached BEEFY genesis, initialize voter here. - let genesis_set = expect_validator_set(runtime, backend, &header).await?; - info!( - target: LOG_TARGET, - "🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \ - Starting voting rounds at block {:?}, genesis validator set {:?}.", - beefy_genesis, - genesis_set, - ); - - sessions.push_front(Rounds::new(beefy_genesis, genesis_set)); - break PersistedState::checked_new( - best_grandpa, - Zero::zero(), - sessions, - min_block_delta, - beefy_genesis, - ) - .ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))? - } - - if let Some(active) = worker::find_authorities_change::(&header) { - info!( - target: LOG_TARGET, - "🥩 Marking block {:?} as BEEFY Mandatory.", - *header.number() - ); - sessions.push_front(Rounds::new(*header.number(), active)); - } - - // Move up the chain. - header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?; - }; - - aux_schema::write_current_version(backend)?; - aux_schema::write_voter_state(backend, &state)?; - Ok(state) -} - /// Wait for BEEFY runtime pallet to be available, return active validator set. /// Should be called only once during worker initialization. async fn wait_for_runtime_pallet( @@ -595,7 +452,7 @@ async fn expect_validator_set( runtime: &R, backend: &BE, at_header: &B::Header, -) -> ClientResult> +) -> Result, Error> where B: Block, BE: Backend, @@ -618,7 +475,9 @@ where // Move up the chain. Ultimately we'll get it from chain genesis state, or error out // there. None => - header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?, + header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY) + .await + .map_err(|e| Error::Backend(e.to_string()))?, } } } diff --git a/substrate/client/consensus/beefy/src/metrics.rs b/substrate/client/consensus/beefy/src/metrics.rs index 031748bdceab..ef3928d79faa 100644 --- a/substrate/client/consensus/beefy/src/metrics.rs +++ b/substrate/client/consensus/beefy/src/metrics.rs @@ -305,10 +305,10 @@ pub(crate) fn register_metrics( // if expr does not derive `Display`. #[macro_export] macro_rules! metric_set { - ($self:ident, $m:ident, $v:expr) => {{ + ($metrics:expr, $m:ident, $v:expr) => {{ let val: u64 = format!("{}", $v).parse().unwrap(); - if let Some(metrics) = $self.metrics.as_ref() { + if let Some(metrics) = $metrics.as_ref() { metrics.$m.set(val); } }}; @@ -316,8 +316,8 @@ macro_rules! metric_set { #[macro_export] macro_rules! metric_inc { - ($self:ident, $m:ident) => {{ - if let Some(metrics) = $self.metrics.as_ref() { + ($metrics:expr, $m:ident) => {{ + if let Some(metrics) = $metrics.as_ref() { metrics.$m.inc(); } }}; @@ -325,8 +325,8 @@ macro_rules! metric_inc { #[macro_export] macro_rules! metric_get { - ($self:ident, $m:ident) => {{ - $self.metrics.as_ref().map(|metrics| metrics.$m.clone()) + ($metrics:expr, $m:ident) => {{ + $metrics.as_ref().map(|metrics| metrics.$m.clone()) }}; } diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index 170654325642..7e61e877c1dd 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -28,10 +28,12 @@ use crate::{ }, request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler}, }, + error::Error, gossip_protocol_name, justification::*, - load_or_init_voter_state, wait_for_runtime_pallet, BeefyRPCLinks, BeefyVoterLinks, KnownPeers, - PersistedState, + wait_for_runtime_pallet, + worker::{BeefyWorkerBase, PersistedState}, + BeefyRPCLinks, BeefyVoterLinks, KnownPeers, }; use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use parking_lot::Mutex; @@ -363,7 +365,7 @@ async fn voter_init_setup( net: &mut BeefyTestNet, finality: &mut futures::stream::Fuse>, api: &TestApi, -) -> sp_blockchain::Result> { +) -> Result, Error> { let backend = net.peer(0).client().as_backend(); let known_peers = Arc::new(Mutex::new(KnownPeers::new())); let (gossip_validator, _) = GossipValidator::new(known_peers); @@ -378,7 +380,14 @@ async fn voter_init_setup( ); let (beefy_genesis, best_grandpa) = wait_for_runtime_pallet(api, &mut gossip_engine, finality).await.unwrap(); - load_or_init_voter_state(&*backend, api, beefy_genesis, best_grandpa, 1).await + let mut worker_base = BeefyWorkerBase { + backend, + runtime: Arc::new(api.clone()), + key_store: None.into(), + metrics: None, + _phantom: Default::default(), + }; + worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await } // Spawns beefy voters. Returns a future to spawn on the runtime. @@ -1072,9 +1081,15 @@ async fn should_initialize_voter_at_custom_genesis() { ); let (beefy_genesis, best_grandpa) = wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); - let persisted_state = load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1) - .await - .unwrap(); + let mut worker_base = BeefyWorkerBase { + backend: backend.clone(), + runtime: Arc::new(api), + key_store: None.into(), + metrics: None, + _phantom: Default::default(), + }; + let persisted_state = + worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with single session starting at block `custom_pallet_genesis` (7) @@ -1107,10 +1122,15 @@ async fn should_initialize_voter_at_custom_genesis() { // the network state persists and uses the old `GossipEngine` initialized for `peer(0)` let (beefy_genesis, best_grandpa) = wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); + let mut worker_base = BeefyWorkerBase { + backend: backend.clone(), + runtime: Arc::new(api), + key_store: None.into(), + metrics: None, + _phantom: Default::default(), + }; let new_persisted_state = - load_or_init_voter_state(&*backend, &api, beefy_genesis, best_grandpa, 1) - .await - .unwrap(); + worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); // verify voter initialized with single session starting at block `new_pallet_genesis` (10) let sessions = new_persisted_state.voting_oracle().sessions(); @@ -1285,6 +1305,104 @@ async fn should_initialize_voter_at_custom_genesis_when_state_unavailable() { assert_eq!(state, persisted_state); } +#[tokio::test] +async fn should_catch_up_when_loading_saved_voter_state() { + let keys = &[BeefyKeyring::Alice]; + let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); + let mut net = BeefyTestNet::new(1); + let backend = net.peer(0).client().as_backend(); + + // push 30 blocks with `AuthorityChange` digests every 10 blocks + let hashes = net.generate_blocks_and_sync(30, 10, &validator_set, false).await; + let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + // finalize 13 without justifications + net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); + + let api = TestApi::with_validator_set(&validator_set); + + // load persistent state - nothing in DB, should init at genesis + // + // NOTE: code from `voter_init_setup()` is moved here because the new network event system + // doesn't allow creating a new `GossipEngine` as the notification handle is consumed by the + // first `GossipEngine` + let known_peers = Arc::new(Mutex::new(KnownPeers::new())); + let (gossip_validator, _) = GossipValidator::new(known_peers); + let gossip_validator = Arc::new(gossip_validator); + let mut gossip_engine = sc_network_gossip::GossipEngine::new( + net.peer(0).network_service().clone(), + net.peer(0).sync_service().clone(), + net.peer(0).take_notification_service(&beefy_gossip_proto_name()).unwrap(), + "/beefy/whatever", + gossip_validator, + None, + ); + let (beefy_genesis, best_grandpa) = + wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); + let mut worker_base = BeefyWorkerBase { + backend: backend.clone(), + runtime: Arc::new(api.clone()), + key_store: None.into(), + metrics: None, + _phantom: Default::default(), + }; + let persisted_state = + worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + + // Test initialization at session boundary. + // verify voter initialized with two sessions starting at blocks 1 and 10 + let sessions = persisted_state.voting_oracle().sessions(); + assert_eq!(sessions.len(), 2); + assert_eq!(sessions[0].session_start(), 1); + assert_eq!(sessions[1].session_start(), 10); + let rounds = persisted_state.active_round().unwrap(); + assert_eq!(rounds.session_start(), 1); + assert_eq!(rounds.validator_set_id(), validator_set.id()); + + // verify next vote target is mandatory block 1 + assert_eq!(persisted_state.best_beefy(), 0); + assert_eq!(persisted_state.best_grandpa_number(), 13); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1)); + + // verify state also saved to db + assert!(verify_persisted_version(&*backend)); + let state = load_persistent(&*backend).unwrap().unwrap(); + assert_eq!(state, persisted_state); + + // now let's consider that the node goes offline, and then it restarts after a while + + // finalize 25 without justifications + net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap(); + // load persistent state - state preset in DB + // the network state persists and uses the old `GossipEngine` initialized for `peer(0)` + let (beefy_genesis, best_grandpa) = + wait_for_runtime_pallet(&api, &mut gossip_engine, &mut finality).await.unwrap(); + let mut worker_base = BeefyWorkerBase { + backend: backend.clone(), + runtime: Arc::new(api), + key_store: None.into(), + metrics: None, + _phantom: Default::default(), + }; + let persisted_state = + worker_base.load_or_init_state(beefy_genesis, best_grandpa, 1).await.unwrap(); + + // Verify voter initialized with old sessions plus a new one starting at block 20. + // There shouldn't be any duplicates. + let sessions = persisted_state.voting_oracle().sessions(); + assert_eq!(sessions.len(), 3); + assert_eq!(sessions[0].session_start(), 1); + assert_eq!(sessions[1].session_start(), 10); + assert_eq!(sessions[2].session_start(), 20); + let rounds = persisted_state.active_round().unwrap(); + assert_eq!(rounds.session_start(), 1); + assert_eq!(rounds.validator_set_id(), validator_set.id()); + + // verify next vote target is mandatory block 1 + assert_eq!(persisted_state.best_beefy(), 0); + assert_eq!(persisted_state.best_grandpa_number(), 25); + assert_eq!(persisted_state.voting_oracle().voting_target(), Some(1)); +} + #[tokio::test] async fn beefy_finalizing_after_pallet_genesis() { sp_tracing::try_init_simple(); diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index 26f940f05f18..e67e3e0f76ad 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -17,18 +17,20 @@ // along with this program. If not, see . use crate::{ + aux_schema, communication::{ gossip::{proofs_topic, votes_topic, GossipFilterCfg, GossipMessage, GossipValidator}, peers::PeerReport, request_response::outgoing_requests_engine::{OnDemandJustificationsEngine, ResponseInfo}, }, error::Error, + expect_validator_set, justification::BeefyVersionedFinalityProof, keystore::{BeefyKeystore, BeefySignatureHasher}, metric_inc, metric_set, metrics::VoterMetrics, round::{Rounds, VoteImportResult}, - BeefyVoterLinks, LOG_TARGET, + wait_for_parent_header, BeefyVoterLinks, HEADER_SYNC_DELAY, LOG_TARGET, }; use codec::{Codec, Decode, DecodeAll, Encode}; use futures::{stream::Fuse, FutureExt, StreamExt}; @@ -38,6 +40,7 @@ use sc_network_gossip::GossipEngine; use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver}; use sp_api::ProvideRuntimeApi; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; +use sp_blockchain::Backend as BlockchainBackend; use sp_consensus::SyncOracle; use sp_consensus_beefy::{ check_equivocation_proof, @@ -53,6 +56,7 @@ use sp_runtime::{ use std::{ collections::{BTreeMap, BTreeSet, VecDeque}, fmt::Debug, + marker::PhantomData, sync::Arc, }; @@ -176,6 +180,13 @@ impl VoterOracle { } } + // Check if an observed session can be added to the Oracle. + fn can_add_session(&self, session_start: NumberFor) -> bool { + let latest_known_session_start = + self.sessions.back().map(|session| session.session_start()); + Some(session_start) > latest_known_session_start + } + /// Add new observed session to the Oracle. pub fn add_session(&mut self, rounds: Rounds) { self.sessions.push_back(rounds); @@ -236,12 +247,10 @@ impl VoterOracle { /// Return `Some(number)` if we should be voting on block `number`, /// return `None` if there is no block we should vote on. pub fn voting_target(&self) -> Option> { - let rounds = if let Some(r) = self.sessions.front() { - r - } else { + let rounds = self.sessions.front().or_else(|| { debug!(target: LOG_TARGET, "🥩 No voting round started"); - return None - }; + None + })?; let best_grandpa = *self.best_grandpa_block_header.number(); let best_beefy = self.best_beefy_block; @@ -327,50 +336,171 @@ pub(crate) struct BeefyComms { pub on_demand_justifications: OnDemandJustificationsEngine, } -/// A BEEFY worker plays the BEEFY protocol -pub(crate) struct BeefyWorker { +pub(crate) struct BeefyWorkerBase { // utilities pub backend: Arc, - pub payload_provider: P, pub runtime: Arc, - pub sync: Arc, pub key_store: BeefyKeystore, - // communication (created once, but returned and reused if worker is restarted/reinitialized) - pub comms: BeefyComms, - - // channels - /// Links between the block importer, the background voter and the RPC layer. - pub links: BeefyVoterLinks, - - // voter state /// BEEFY client metrics. pub metrics: Option, - /// Buffer holding justifications for future processing. - pub pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, - /// Persisted voter state. - pub persisted_state: PersistedState, + + pub _phantom: PhantomData, } -impl BeefyWorker +impl BeefyWorkerBase where B: Block + Codec, BE: Backend, - P: PayloadProvider, - S: SyncOracle, R: ProvideRuntimeApi, R::Api: BeefyApi, { - fn best_grandpa_block(&self) -> NumberFor { - *self.persisted_state.voting_oracle.best_grandpa_block_header.number() - } + // If no persisted state present, walk back the chain from first GRANDPA notification to either: + // - latest BEEFY finalized block, or if none found on the way, + // - BEEFY pallet genesis; + // Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to + // finalize. + async fn init_state( + &self, + beefy_genesis: NumberFor, + best_grandpa: ::Header, + min_block_delta: u32, + ) -> Result, Error> { + let blockchain = self.backend.blockchain(); - fn voting_oracle(&self) -> &VoterOracle { - &self.persisted_state.voting_oracle + let beefy_genesis = self + .runtime + .runtime_api() + .beefy_genesis(best_grandpa.hash()) + .ok() + .flatten() + .filter(|genesis| *genesis == beefy_genesis) + .ok_or_else(|| Error::Backend("BEEFY pallet expected to be active.".into()))?; + // Walk back the imported blocks and initialize voter either, at the last block with + // a BEEFY justification, or at pallet genesis block; voter will resume from there. + let mut sessions = VecDeque::new(); + let mut header = best_grandpa.clone(); + let state = loop { + if let Some(true) = blockchain + .justifications(header.hash()) + .ok() + .flatten() + .map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some()) + { + info!( + target: LOG_TARGET, + "🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.", + *header.number() + ); + let best_beefy = *header.number(); + // If no session boundaries detected so far, just initialize new rounds here. + if sessions.is_empty() { + let active_set = + expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header) + .await?; + let mut rounds = Rounds::new(best_beefy, active_set); + // Mark the round as already finalized. + rounds.conclude(best_beefy); + sessions.push_front(rounds); + } + let state = PersistedState::checked_new( + best_grandpa, + best_beefy, + sessions, + min_block_delta, + beefy_genesis, + ) + .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))?; + break state + } + + if *header.number() == beefy_genesis { + // We've reached BEEFY genesis, initialize voter here. + let genesis_set = + expect_validator_set(self.runtime.as_ref(), self.backend.as_ref(), &header) + .await?; + info!( + target: LOG_TARGET, + "🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \ + Starting voting rounds at block {:?}, genesis validator set {:?}.", + beefy_genesis, + genesis_set, + ); + + sessions.push_front(Rounds::new(beefy_genesis, genesis_set)); + break PersistedState::checked_new( + best_grandpa, + Zero::zero(), + sessions, + min_block_delta, + beefy_genesis, + ) + .ok_or_else(|| Error::Backend("Invalid BEEFY chain".into()))? + } + + if let Some(active) = find_authorities_change::(&header) { + info!( + target: LOG_TARGET, + "🥩 Marking block {:?} as BEEFY Mandatory.", + *header.number() + ); + sessions.push_front(Rounds::new(*header.number(), active)); + } + + // Move up the chain. + header = wait_for_parent_header(blockchain, header, HEADER_SYNC_DELAY).await?; + }; + + aux_schema::write_current_version(self.backend.as_ref())?; + aux_schema::write_voter_state(self.backend.as_ref(), &state)?; + Ok(state) } - fn active_rounds(&mut self) -> Result<&Rounds, Error> { - self.persisted_state.voting_oracle.active_rounds() + pub async fn load_or_init_state( + &mut self, + beefy_genesis: NumberFor, + best_grandpa: ::Header, + min_block_delta: u32, + ) -> Result, Error> { + // Initialize voter state from AUX DB if compatible. + if let Some(mut state) = crate::aux_schema::load_persistent(self.backend.as_ref())? + // Verify state pallet genesis matches runtime. + .filter(|state| state.pallet_genesis() == beefy_genesis) + { + // Overwrite persisted state with current best GRANDPA block. + state.set_best_grandpa(best_grandpa.clone()); + // Overwrite persisted data with newly provided `min_block_delta`. + state.set_min_block_delta(min_block_delta); + info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state); + + // Make sure that all the headers that we need have been synced. + let mut new_sessions = vec![]; + let mut header = best_grandpa.clone(); + while *header.number() > state.best_beefy() { + if state.voting_oracle.can_add_session(*header.number()) { + if let Some(active) = find_authorities_change::(&header) { + new_sessions.push((active, *header.number())); + } + } + header = + wait_for_parent_header(self.backend.blockchain(), header, HEADER_SYNC_DELAY) + .await?; + } + + // Make sure we didn't miss any sessions during node restart. + for (validator_set, new_session_start) in new_sessions.drain(..).rev() { + info!( + target: LOG_TARGET, + "🥩 Handling missed BEEFY session after node restart: {:?}.", + new_session_start + ); + self.init_session_at(&mut state, validator_set, new_session_start); + } + return Ok(state) + } + + // No valid voter-state persisted, re-initialize from pallet genesis. + self.init_state(beefy_genesis, best_grandpa, min_block_delta).await } /// Verify `active` validator set for `block` against the key store @@ -394,7 +524,7 @@ where if store.intersection(&active).count() == 0 { let msg = "no authority public key found in store".to_string(); debug!(target: LOG_TARGET, "🥩 for block {:?} {}", block, msg); - metric_inc!(self, beefy_no_authority_found_in_store); + metric_inc!(self.metrics, beefy_no_authority_found_in_store); Err(Error::Keystore(msg)) } else { Ok(()) @@ -404,13 +534,14 @@ where /// Handle session changes by starting new voting round for mandatory blocks. fn init_session_at( &mut self, + persisted_state: &mut PersistedState, validator_set: ValidatorSet, new_session_start: NumberFor, ) { debug!(target: LOG_TARGET, "🥩 New active validator set: {:?}", validator_set); // BEEFY should finalize a mandatory block during each session. - if let Ok(active_session) = self.active_rounds() { + if let Ok(active_session) = persisted_state.voting_oracle.active_rounds() { if !active_session.mandatory_done() { debug!( target: LOG_TARGET, @@ -418,7 +549,7 @@ where validator_set.id(), active_session.validator_set_id(), ); - metric_inc!(self, beefy_lagging_sessions); + metric_inc!(self.metrics, beefy_lagging_sessions); } } @@ -428,10 +559,10 @@ where } let id = validator_set.id(); - self.persisted_state + persisted_state .voting_oracle .add_session(Rounds::new(new_session_start, validator_set)); - metric_set!(self, beefy_validator_set_id, id); + metric_set!(self.metrics, beefy_validator_set_id, id); info!( target: LOG_TARGET, "🥩 New Rounds for validator set id: {:?} with session_start {:?}", @@ -439,6 +570,61 @@ where new_session_start ); } +} + +/// A BEEFY worker/voter that follows the BEEFY protocol +pub(crate) struct BeefyWorker { + pub base: BeefyWorkerBase, + + // utils + pub payload_provider: P, + pub sync: Arc, + + // communication (created once, but returned and reused if worker is restarted/reinitialized) + pub comms: BeefyComms, + + // channels + /// Links between the block importer, the background voter and the RPC layer. + pub links: BeefyVoterLinks, + + // voter state + /// Buffer holding justifications for future processing. + pub pending_justifications: BTreeMap, BeefyVersionedFinalityProof>, + /// Persisted voter state. + pub persisted_state: PersistedState, +} + +impl BeefyWorker +where + B: Block + Codec, + BE: Backend, + P: PayloadProvider, + S: SyncOracle, + R: ProvideRuntimeApi, + R::Api: BeefyApi, +{ + fn best_grandpa_block(&self) -> NumberFor { + *self.persisted_state.voting_oracle.best_grandpa_block_header.number() + } + + fn voting_oracle(&self) -> &VoterOracle { + &self.persisted_state.voting_oracle + } + + #[cfg(test)] + fn active_rounds(&mut self) -> Result<&Rounds, Error> { + self.persisted_state.voting_oracle.active_rounds() + } + + /// Handle session changes by starting new voting round for mandatory blocks. + fn init_session_at( + &mut self, + validator_set: ValidatorSet, + new_session_start: NumberFor, + ) { + self.base + .init_session_at(&mut self.persisted_state, validator_set, new_session_start); + } fn handle_finality_notification( &mut self, @@ -452,7 +638,8 @@ where ); let header = ¬ification.header; - self.runtime + self.base + .runtime .runtime_api() .beefy_genesis(header.hash()) .ok() @@ -466,7 +653,7 @@ where self.persisted_state.set_best_grandpa(header.clone()); // Check all (newly) finalized blocks for new session(s). - let backend = self.backend.clone(); + let backend = self.base.backend.clone(); for header in notification .tree_route .iter() @@ -485,7 +672,7 @@ where } if new_session_added { - crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) + crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string()))?; } @@ -519,7 +706,7 @@ where true, ); }, - RoundAction::Drop => metric_inc!(self, beefy_stale_votes), + RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_votes), RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote), }; Ok(()) @@ -539,23 +726,23 @@ where match self.voting_oracle().triage_round(block_num)? { RoundAction::Process => { debug!(target: LOG_TARGET, "🥩 Process justification for round: {:?}.", block_num); - metric_inc!(self, beefy_imported_justifications); + metric_inc!(self.base.metrics, beefy_imported_justifications); self.finalize(justification)? }, RoundAction::Enqueue => { debug!(target: LOG_TARGET, "🥩 Buffer justification for round: {:?}.", block_num); if self.pending_justifications.len() < MAX_BUFFERED_JUSTIFICATIONS { self.pending_justifications.entry(block_num).or_insert(justification); - metric_inc!(self, beefy_buffered_justifications); + metric_inc!(self.base.metrics, beefy_buffered_justifications); } else { - metric_inc!(self, beefy_buffered_justifications_dropped); + metric_inc!(self.base.metrics, beefy_buffered_justifications_dropped); warn!( target: LOG_TARGET, "🥩 Buffer justification dropped for round: {:?}.", block_num ); } }, - RoundAction::Drop => metric_inc!(self, beefy_stale_justifications), + RoundAction::Drop => metric_inc!(self.base.metrics, beefy_stale_justifications), }; Ok(()) } @@ -577,7 +764,7 @@ where // We created the `finality_proof` and know to be valid. // New state is persisted after finalization. self.finalize(finality_proof.clone())?; - metric_inc!(self, beefy_good_votes_processed); + metric_inc!(self.base.metrics, beefy_good_votes_processed); return Ok(Some(finality_proof)) }, VoteImportResult::Ok => { @@ -588,17 +775,20 @@ where .map(|(mandatory_num, _)| mandatory_num == block_number) .unwrap_or(false) { - crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) - .map_err(|e| Error::Backend(e.to_string()))?; + crate::aux_schema::write_voter_state( + &*self.base.backend, + &self.persisted_state, + ) + .map_err(|e| Error::Backend(e.to_string()))?; } - metric_inc!(self, beefy_good_votes_processed); + metric_inc!(self.base.metrics, beefy_good_votes_processed); }, VoteImportResult::Equivocation(proof) => { - metric_inc!(self, beefy_equivocation_votes); + metric_inc!(self.base.metrics, beefy_equivocation_votes); self.report_equivocation(proof)?; }, - VoteImportResult::Invalid => metric_inc!(self, beefy_invalid_votes), - VoteImportResult::Stale => metric_inc!(self, beefy_stale_votes), + VoteImportResult::Invalid => metric_inc!(self.base.metrics, beefy_invalid_votes), + VoteImportResult::Stale => metric_inc!(self.base.metrics, beefy_stale_votes), }; Ok(None) } @@ -625,14 +815,15 @@ where // Set new best BEEFY block number. self.persisted_state.set_best_beefy(block_num); - crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) + crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string()))?; - metric_set!(self, beefy_best_block, block_num); + metric_set!(self.base.metrics, beefy_best_block, block_num); self.comms.on_demand_justifications.cancel_requests_older_than(block_num); if let Err(e) = self + .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(block_num)) @@ -642,7 +833,8 @@ where .notify(|| Ok::<_, ()>(hash)) .expect("forwards closure result; the closure always returns Ok; qed."); - self.backend + self.base + .backend .append_justification(hash, (BEEFY_ENGINE_ID, finality_proof.encode())) }) { debug!( @@ -679,12 +871,16 @@ where for (num, justification) in justifs_to_process.into_iter() { debug!(target: LOG_TARGET, "🥩 Handle buffered justification for: {:?}.", num); - metric_inc!(self, beefy_imported_justifications); + metric_inc!(self.base.metrics, beefy_imported_justifications); if let Err(err) = self.finalize(justification) { error!(target: LOG_TARGET, "🥩 Error finalizing block: {}", err); } } - metric_set!(self, beefy_buffered_justifications, self.pending_justifications.len()); + metric_set!( + self.base.metrics, + beefy_buffered_justifications, + self.pending_justifications.len() + ); } Ok(()) } @@ -693,7 +889,7 @@ where fn try_to_vote(&mut self) -> Result<(), Error> { // Vote if there's now a new vote target. if let Some(target) = self.voting_oracle().voting_target() { - metric_set!(self, beefy_should_vote_on, target); + metric_set!(self.base.metrics, beefy_should_vote_on, target); if target > self.persisted_state.best_voted { self.do_vote(target)?; } @@ -713,6 +909,7 @@ where self.persisted_state.voting_oracle.best_grandpa_block_header.clone() } else { let hash = self + .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(target_number)) @@ -724,7 +921,7 @@ where Error::Backend(err_msg) })?; - self.backend.blockchain().expect_header(hash).map_err(|err| { + self.base.backend.blockchain().expect_header(hash).map_err(|err| { let err_msg = format!( "Couldn't get header for block #{:?} ({:?}) (error: {:?}), skipping vote..", target_number, hash, err @@ -744,7 +941,7 @@ where let rounds = self.persisted_state.voting_oracle.active_rounds_mut()?; let (validators, validator_set_id) = (rounds.validators(), rounds.validator_set_id()); - let authority_id = if let Some(id) = self.key_store.authority_id(validators) { + let authority_id = if let Some(id) = self.base.key_store.authority_id(validators) { debug!(target: LOG_TARGET, "🥩 Local authority id: {:?}", id); id } else { @@ -758,7 +955,7 @@ where let commitment = Commitment { payload, block_number: target_number, validator_set_id }; let encoded_commitment = commitment.encode(); - let signature = match self.key_store.sign(&authority_id, &encoded_commitment) { + let signature = match self.base.key_store.sign(&authority_id, &encoded_commitment) { Ok(sig) => sig, Err(err) => { warn!(target: LOG_TARGET, "🥩 Error signing commitment: {:?}", err); @@ -783,7 +980,7 @@ where .gossip_engine .gossip_message(proofs_topic::(), encoded_proof, true); } else { - metric_inc!(self, beefy_votes_sent); + metric_inc!(self.base.metrics, beefy_votes_sent); debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote); let encoded_vote = GossipMessage::::Vote(vote).encode(); self.comms.gossip_engine.gossip_message(votes_topic::(), encoded_vote, false); @@ -791,8 +988,8 @@ where // Persist state after vote to avoid double voting in case of voter restarts. self.persisted_state.best_voted = target_number; - metric_set!(self, beefy_best_voted, target_number); - crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) + metric_set!(self.base.metrics, beefy_best_voted, target_number); + crate::aux_schema::write_voter_state(&*self.base.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string())) } @@ -966,7 +1163,7 @@ where if !check_equivocation_proof::<_, _, BeefySignatureHasher>(&proof) { debug!(target: LOG_TARGET, "🥩 Skip report for bad equivocation {:?}", proof); return Ok(()) - } else if let Some(local_id) = self.key_store.authority_id(validators) { + } else if let Some(local_id) = self.base.key_store.authority_id(validators) { if offender_id == local_id { debug!(target: LOG_TARGET, "🥩 Skip equivocation report for own equivocation"); return Ok(()) @@ -975,6 +1172,7 @@ where let number = *proof.round_number(); let hash = self + .base .backend .blockchain() .expect_block_hash_from_id(&BlockId::Number(number)) @@ -985,7 +1183,7 @@ where ); Error::Backend(err_msg) })?; - let runtime_api = self.runtime.runtime_api(); + let runtime_api = self.base.runtime.runtime_api(); // generate key ownership proof at that block let key_owner_proof = match runtime_api .generate_key_ownership_proof(hash, validator_set_id, offender_id) @@ -1002,7 +1200,7 @@ where }; // submit equivocation report at **best** block - let best_block_hash = self.backend.blockchain().info().best_hash; + let best_block_hash = self.base.backend.blockchain().info().best_hash; runtime_api .submit_report_equivocation_unsigned_extrinsic(best_block_hash, proof, key_owner_proof) .map_err(Error::RuntimeApi)?; @@ -1028,7 +1226,7 @@ where /// Calculate next block number to vote on. /// -/// Return `None` if there is no voteable target yet. +/// Return `None` if there is no votable target yet. fn vote_target(best_grandpa: N, best_beefy: N, session_start: N, min_delta: u32) -> Option where N: AtLeast32Bit + Copy + Debug, @@ -1189,14 +1387,17 @@ pub(crate) mod tests { on_demand_justifications, }; BeefyWorker { - backend, + base: BeefyWorkerBase { + backend, + runtime: api, + key_store: Some(keystore).into(), + metrics, + _phantom: Default::default(), + }, payload_provider, - runtime: api, - key_store: Some(keystore).into(), + sync: Arc::new(sync), links, comms, - metrics, - sync: Arc::new(sync), pending_justifications: BTreeMap::new(), persisted_state, } @@ -1470,19 +1671,19 @@ pub(crate) mod tests { let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); // keystore doesn't contain other keys than validators' - assert_eq!(worker.verify_validator_set(&1, &validator_set), Ok(())); + assert_eq!(worker.base.verify_validator_set(&1, &validator_set), Ok(())); // unknown `Bob` key let keys = &[Keyring::Bob]; let validator_set = ValidatorSet::new(make_beefy_ids(keys), 0).unwrap(); let err_msg = "no authority public key found in store".to_string(); let expected = Err(Error::Keystore(err_msg)); - assert_eq!(worker.verify_validator_set(&1, &validator_set), expected); + assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected); // worker has no keystore - worker.key_store = None.into(); + worker.base.key_store = None.into(); let expected_err = Err(Error::Keystore("no Keystore".into())); - assert_eq!(worker.verify_validator_set(&1, &validator_set), expected_err); + assert_eq!(worker.base.verify_validator_set(&1, &validator_set), expected_err); } #[tokio::test] @@ -1634,7 +1835,7 @@ pub(crate) mod tests { let mut net = BeefyTestNet::new(1); let mut worker = create_beefy_worker(net.peer(0), &keys[0], 1, validator_set.clone()); - worker.runtime = api_alice.clone(); + worker.base.runtime = api_alice.clone(); // let there be a block with num = 1: let _ = net.peer(0).push_blocks(1, false);