From 0f1029409264731156a7157ebbeedd3b3148c6c8 Mon Sep 17 00:00:00 2001
From: Adrian Catangiu
Date: Mon, 28 Nov 2022 13:38:24 +0200
Subject: [PATCH] client/beefy: fix on-demand justifications sync for old blocks (#12767)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* client/beefy: fix on-demand justif sync for old blocks

When receiving BEEFY justifications for old blocks, the state for those
blocks might already be pruned, in which case justification verification
fails because the BEEFY validator set cannot be retrieved from runtime
state.

Fix this by having the voter give the validator set to the
`OnDemandJustificationsEngine` as request information. On receiving a
BEEFY justification for the requested block, the provided validator set
will be used to validate the justification.

Signed-off-by: acatangiu

* Apply suggestions from code review

Co-authored-by: Bastian Köcher

* impl review suggestions

* client/beefy: fail initialization if state unavailable

* beefy: remove spammy log

Signed-off-by: acatangiu

Co-authored-by: parity-processbot <>
Co-authored-by: Bastian Köcher
---
 .../outgoing_requests_engine.rs | 112 ++++++++----
 client/beefy/src/lib.rs         |  29 +++--
 client/beefy/src/round.rs       |   4 +
 client/beefy/src/worker.rs      |  50 ++++----
 4 files changed, 94 insertions(+), 101 deletions(-)

diff --git a/client/beefy/src/communication/request_response/outgoing_requests_engine.rs b/client/beefy/src/communication/request_response/outgoing_requests_engine.rs
index c4d3c926190e6..00ee7610dd4f0 100644
--- a/client/beefy/src/communication/request_response/outgoing_requests_engine.rs
+++ b/client/beefy/src/communication/request_response/outgoing_requests_engine.rs
@@ -18,21 +18,17 @@
 
 //! Generating request logic for request/response protocol for syncing BEEFY justifications.
 
-use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet};
+use beefy_primitives::{crypto::AuthorityId, ValidatorSet};
 use codec::Encode;
 use futures::channel::{oneshot, oneshot::Canceled};
-use log::{debug, error, warn};
+use log::{debug, warn};
 use parking_lot::Mutex;
 use sc_network::{PeerId, ProtocolName};
 use sc_network_common::{
 	request_responses::{IfDisconnected, RequestFailure},
 	service::NetworkRequest,
 };
-use sp_api::ProvideRuntimeApi;
-use sp_runtime::{
-	generic::BlockId,
-	traits::{Block, NumberFor},
-};
+use sp_runtime::traits::{Block, NumberFor};
 use std::{collections::VecDeque, result::Result, sync::Arc};
 
 use crate::{
@@ -46,14 +42,19 @@ type Response = Result<Vec<u8>, RequestFailure>;
 /// Used to receive a response from the network.
 type ResponseReceiver = oneshot::Receiver<Response>;
 
+#[derive(Clone, Debug)]
+struct RequestInfo<B: Block> {
+	block: NumberFor<B>,
+	active_set: ValidatorSet<AuthorityId>,
+}
+
 enum State<B: Block> {
 	Idle,
-	AwaitingResponse(PeerId, NumberFor<B>, ResponseReceiver),
+	AwaitingResponse(PeerId, RequestInfo<B>, ResponseReceiver),
 }
 
-pub struct OnDemandJustificationsEngine<B: Block, R> {
+pub struct OnDemandJustificationsEngine<B: Block> {
 	network: Arc<dyn NetworkRequest + Send + Sync>,
-	runtime: Arc<R>,
 	protocol_name: ProtocolName,
 
 	live_peers: Arc<Mutex<KnownPeers<B>>>,
@@ -62,21 +63,14 @@ pub struct OnDemandJustificationsEngine<B: Block, R> {
 	state: State<B>,
 }
 
-impl<B, R> OnDemandJustificationsEngine<B, R>
-where
-	B: Block,
-	R: ProvideRuntimeApi<B>,
-	R::Api: BeefyApi<B>,
-{
+impl<B: Block> OnDemandJustificationsEngine<B> {
 	pub fn new(
 		network: Arc<dyn NetworkRequest + Send + Sync>,
-		runtime: Arc<R>,
 		protocol_name: ProtocolName,
 		live_peers: Arc<Mutex<KnownPeers<B>>>,
 	) -> Self {
 		Self {
 			network,
-			runtime,
 			protocol_name,
 			live_peers,
 			peers_cache: VecDeque::new(),
@@ -100,10 +94,15 @@ where
 		None
 	}
 
-	fn request_from_peer(&mut self, peer: PeerId, block: NumberFor<B>) {
-		debug!(target: "beefy::sync", "🥩 requesting justif #{:?} from peer {:?}", block, peer);
+	fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo<B>) {
+		debug!(
+			target: "beefy::sync",
+			"🥩 requesting justif #{:?} from peer {:?}",
+			req_info.block,
+			peer,
+		);
 
-		let payload = JustificationRequest::<B> { begin: block }.encode();
+		let payload = JustificationRequest::<B> { begin: req_info.block }.encode();
 
 		let (tx, rx) = oneshot::channel();
 
@@ -115,11 +114,13 @@ where
 			IfDisconnected::ImmediateError,
 		);
 
-		self.state = State::AwaitingResponse(peer, block, rx);
+		self.state = State::AwaitingResponse(peer, req_info, rx);
 	}
 
-	/// If no other request is in progress, start new justification request for `block`.
-	pub fn request(&mut self, block: NumberFor<B>) {
+	/// Start new justification request for `block`, if no other request is in progress.
+	///
+	/// `active_set` will be used to verify validity of potential responses.
+	pub fn request(&mut self, block: NumberFor<B>, active_set: ValidatorSet<AuthorityId>) {
 		// ignore new requests while there's already one pending
 		if matches!(self.state, State::AwaitingResponse(_, _, _)) {
 			return
@@ -129,7 +130,7 @@ where
 		// Start the requests engine - each unsuccessful received response will automatically
 		// trigger a new request to the next peer in the `peers_cache` until there are none left.
 		if let Some(peer) = self.try_next_peer() {
-			self.request_from_peer(peer, block);
+			self.request_from_peer(peer, RequestInfo { block, active_set });
 		} else {
 			debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block);
 		}
@@ -138,11 +139,10 @@ where
 	/// Cancel any pending request for block numbers smaller or equal to `block`.
 	pub fn cancel_requests_older_than(&mut self, block: NumberFor<B>) {
 		match &self.state {
-			State::AwaitingResponse(_, number, _) if *number <= block => {
+			State::AwaitingResponse(_, req_info, _) if req_info.block <= block => {
 				debug!(
-					target: "beefy::sync",
-					"🥩 cancel pending request for justification #{:?}",
-					number
+					target: "beefy::sync", "🥩 cancel pending request for justification #{:?}",
+					req_info.block
 				);
 				self.state = State::Idle;
 			},
@@ -153,8 +153,7 @@ where
 	fn process_response(
 		&mut self,
 		peer: PeerId,
-		block: NumberFor<B>,
-		validator_set: &ValidatorSet<AuthorityId>,
+		req_info: &RequestInfo<B>,
 		response: Result<Response, Canceled>,
 	) -> Result<BeefyVersionedFinalityProof<B>, Error> {
 		response
 			.map_err(|e| {
 				debug!(
 					target: "beefy::sync",
 					"🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}",
-					block, peer, e
+					req_info.block, peer, e
 				);
 				Error::InvalidResponse
 			})?
@@ -170,60 +169,49 @@ where
 				debug!(
 					target: "beefy::sync",
 					"🥩 for on demand justification #{:?}, peer {:?} error: {:?}",
-					block, peer, e
+					req_info.block, peer, e
 				);
 				Error::InvalidResponse
 			})
 			.and_then(|encoded| {
-				decode_and_verify_finality_proof::<B>(&encoded[..], block, &validator_set).map_err(
-					|e| {
-						debug!(
-							target: "beefy::sync",
-							"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
-							block, peer, e
-						);
-						Error::InvalidResponse
-					},
+				decode_and_verify_finality_proof::<B>(
+					&encoded[..],
+					req_info.block,
+					&req_info.active_set,
 				)
+				.map_err(|e| {
+					debug!(
+						target: "beefy::sync",
+						"🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}",
+						req_info.block, peer, e
+					);
+					Error::InvalidResponse
+				})
 			})
 	}
 
 	pub async fn next(&mut self) -> Option<BeefyVersionedFinalityProof<B>> {
-		let (peer, block, resp) = match &mut self.state {
+		let (peer, req_info, resp) = match &mut self.state {
 			State::Idle => {
 				futures::pending!();
 				// Doesn't happen as 'futures::pending!()' is an 'await' barrier that never passes.
 				return None
 			},
-			State::AwaitingResponse(peer, block, receiver) => {
+			State::AwaitingResponse(peer, req_info, receiver) => {
 				let resp = receiver.await;
-				(*peer, *block, resp)
+				(*peer, req_info.clone(), resp)
 			},
 		};
 		// We received the awaited response. Our 'receiver' will never generate any other response,
 		// meaning we're done with current state. Move the engine to `State::Idle`.
 		self.state = State::Idle;
 
-		let block_id = BlockId::number(block);
-		let validator_set = self
-			.runtime
-			.runtime_api()
-			.validator_set(&block_id)
-			.map_err(|e| {
-				error!(target: "beefy::sync", "🥩 Runtime API error {:?} in on-demand justif engine.", e);
-				e
-			})
-			.ok()?
-			.or_else(|| {
-				error!(target: "beefy::sync", "🥩 BEEFY pallet not available for block {:?}.", block);
-				None
-			})?;
-
-		self.process_response(peer, block, &validator_set, resp)
+		let block = req_info.block;
+		self.process_response(peer, &req_info, resp)
 			.map_err(|_| {
 				// No valid justification received, try next peer in our set.
 				if let Some(peer) = self.try_next_peer() {
-					self.request_from_peer(peer, block);
+					self.request_from_peer(peer, req_info);
 				} else {
 					warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block);
 				}
diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs
index 9dccd4236bef3..a057a9fdc597d 100644
--- a/client/beefy/src/lib.rs
+++ b/client/beefy/src/lib.rs
@@ -244,7 +244,6 @@ where
 	// The `GossipValidator` adds and removes known peers based on valid votes and network events.
 	let on_demand_justifications = OnDemandJustificationsEngine::new(
 		network.clone(),
-		runtime.clone(),
 		justifications_protocol_name,
 		known_peers,
 	);
@@ -295,7 +294,7 @@ where
 		persisted_state,
 	};
 
-	let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params);
+	let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params);
 
 	futures::future::join(
 		worker.run(block_import_justif, finality_notifications),
@@ -377,17 +376,8 @@ where
 			break state
 		}
 
-		// Check if we should move up the chain.
-		let parent_hash = *header.parent_hash();
-		if *header.number() == One::one() ||
-			runtime
-				.runtime_api()
-				.validator_set(&BlockId::hash(parent_hash))
-				.ok()
-				.flatten()
-				.is_none()
-		{
-			// We've reached pallet genesis, initialize voter here.
+		if *header.number() == One::one() {
+			// We've reached chain genesis, initialize voter here.
 			let genesis_num = *header.number();
 			let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash()))
 				.and_then(genesis_set_sanity_check)?;
@@ -408,6 +398,19 @@ where
 			sessions.push_front(Rounds::new(*header.number(), active));
 		}
 
+		// Check if state is still available if we move up the chain.
+		let parent_hash = *header.parent_hash();
+		runtime
+			.runtime_api()
+			.validator_set(&BlockId::hash(parent_hash))
+			.ok()
+			.flatten()
+			.ok_or_else(|| {
+				let msg = format!("{}. Could not initialize BEEFY voter.", parent_hash);
+				error!(target: "beefy", "🥩 {}", msg);
+				ClientError::Consensus(sp_consensus::Error::StateUnavailable(msg))
+			})?;
+
 		// Move up the chain.
 		header = blockchain.expect_header(BlockId::Hash(parent_hash))?;
 	};
diff --git a/client/beefy/src/round.rs b/client/beefy/src/round.rs
index 7a8cc4171a155..48d3d087299d0 100644
--- a/client/beefy/src/round.rs
+++ b/client/beefy/src/round.rs
@@ -89,6 +89,10 @@ where
 		}
 	}
 
+	pub(crate) fn validator_set(&self) -> &ValidatorSet<Public> {
+		&self.validator_set
+	}
+
 	pub(crate) fn validator_set_id(&self) -> ValidatorSetId {
 		self.validator_set.id()
 	}
diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs
index 6726fa4375387..9669939e594c1 100644
--- a/client/beefy/src/worker.rs
+++ b/client/beefy/src/worker.rs
@@ -31,8 +31,8 @@ use crate::{
 };
 use beefy_primitives::{
 	crypto::{AuthorityId, Signature},
-	BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment,
-	ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
+	Commitment, ConsensusLog, Payload, PayloadProvider, SignedCommitment, ValidatorSet,
+	VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID,
 };
 use codec::{Codec, Decode, Encode};
 use futures::{stream::Fuse, FutureExt, StreamExt};
@@ -41,10 +41,9 @@ use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, Header
 use sc_network_common::service::{NetworkEventStream, NetworkRequest};
 use sc_network_gossip::GossipEngine;
 use sc_utils::notification::NotificationReceiver;
-use sp_api::{BlockId, ProvideRuntimeApi};
+use sp_api::BlockId;
 use sp_arithmetic::traits::{AtLeast32Bit, Saturating};
 use sp_consensus::SyncOracle;
-use sp_mmr_primitives::MmrApi;
 use sp_runtime::{
 	generic::OpaqueDigestItemId,
 	traits::{Block, Header, NumberFor, Zero},
@@ -166,13 +165,13 @@ impl<B: Block> VoterOracle<B> {
 		Ok(())
 	}
 
-	/// Return current pending mandatory block, if any.
-	pub fn mandatory_pending(&self) -> Option<NumberFor<B>> {
+	/// Return current pending mandatory block, if any, plus its active validator set.
+	pub fn mandatory_pending(&self) -> Option<(NumberFor<B>, ValidatorSet<AuthorityId>)> {
 		self.sessions.front().and_then(|round| {
 			if round.mandatory_done() {
 				None
 			} else {
-				Some(round.session_start())
+				Some((round.session_start(), round.validator_set().clone()))
 			}
 		})
 	}
@@ -239,14 +238,14 @@ impl<B: Block> VoterOracle<B> {
 	}
 }
 
-pub(crate) struct WorkerParams<B: Block, BE, P, R, N> {
+pub(crate) struct WorkerParams<B: Block, BE, P, N> {
 	pub backend: Arc<BE>,
 	pub payload_provider: P,
 	pub network: N,
 	pub key_store: BeefyKeystore,
 	pub gossip_engine: GossipEngine<B>,
 	pub gossip_validator: Arc<GossipValidator<B>>,
-	pub on_demand_justifications: OnDemandJustificationsEngine<B, R>,
+	pub on_demand_justifications: OnDemandJustificationsEngine<B>,
 	pub links: BeefyVoterLinks<B>,
 	pub metrics: Option<Metrics>,
 	pub persisted_state: PersistedState<B>,
@@ -287,7 +286,7 @@ impl<B: Block> PersistedState<B> {
 }
 
 /// A BEEFY worker plays the BEEFY protocol
-pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
+pub(crate) struct BeefyWorker<B: Block, BE, P, N> {
 	// utilities
 	backend: Arc<BE>,
 	payload_provider: P,
@@ -297,7 +296,7 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
 	// communication
 	gossip_engine: GossipEngine<B>,
 	gossip_validator: Arc<GossipValidator<B>>,
-	on_demand_justifications: OnDemandJustificationsEngine<B, R>,
+	on_demand_justifications: OnDemandJustificationsEngine<B>,
 
 	// channels
 	/// Links between the block importer, the background voter and the RPC layer.
@@ -314,13 +313,11 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, R, N> {
 	persisted_state: PersistedState<B>,
 }
 
-impl<B, BE, P, R, N> BeefyWorker<B, BE, P, R, N>
+impl<B, BE, P, N> BeefyWorker<B, BE, P, N>
 where
 	B: Block + Codec,
 	BE: Backend<B>,
 	P: PayloadProvider<B>,
-	R: ProvideRuntimeApi<B>,
-	R::Api: BeefyApi<B> + MmrApi<B, MmrRootHash, NumberFor<B>>,
 	N: NetworkEventStream + NetworkRequest + SyncOracle + Send + Sync + Clone + 'static,
 {
 	/// Return a new BEEFY worker instance.
@@ -329,7 +326,7 @@ where
 	/// BEEFY pallet has been deployed on-chain.
 	///
 	/// The BEEFY pallet is needed in order to keep track of the BEEFY authority set.
-	pub(crate) fn new(worker_params: WorkerParams<B, BE, P, R, N>) -> Self {
+	pub(crate) fn new(worker_params: WorkerParams<B, BE, P, N>) -> Self {
 		let WorkerParams {
 			backend,
 			payload_provider,
@@ -551,10 +548,15 @@ where
 				// New state is persisted after finalization.
 				self.finalize(finality_proof)?;
 			} else {
-				if self_vote || self.voting_oracle().mandatory_pending() == Some(round.1) {
-					// Persist state after handling self vote to avoid double voting in case
-					// of voter restarts.
-					// Also persist state after handling mandatory block vote.
+				let mandatory_round = self
+					.voting_oracle()
+					.mandatory_pending()
+					.map(|p| p.0 == round.1)
+					.unwrap_or(false);
+				// Persist state after handling self vote to avoid double voting in case
+				// of voter restarts.
+				// Also persist state after handling mandatory block vote.
+				if self_vote || mandatory_round {
 					crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state)
 						.map_err(|e| Error::Backend(e.to_string()))?;
 				}
@@ -784,12 +786,10 @@ where
 			}
 			// If the current target is a mandatory block,
 			// make sure there's also an on-demand justification request out for it.
-			if let Some(block) = self.voting_oracle().mandatory_pending() {
+			if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
 				// This only starts new request if there isn't already an active one.
-				self.on_demand_justifications.request(block);
+				self.on_demand_justifications.request(block, active);
 			}
-		} else {
-			debug!(target: "beefy", "🥩 Skipping voting while major syncing.");
 		}
 	}
 
@@ -993,7 +993,6 @@ pub(crate) mod tests {
 		Block,
 		Backend,
 		MmrRootProvider<Block, TestApi>,
-		TestApi,
 		Arc<NetworkService<Block, H256>>,
 	> {
 		let keystore = create_beefy_keystore(*key);
@@ -1024,7 +1023,6 @@ pub(crate) mod tests {
 			GossipEngine::new(network.clone(), "/beefy/1", gossip_validator.clone(), None);
 		let on_demand_justifications = OnDemandJustificationsEngine::new(
 			network.clone(),
-			api.clone(),
 			"/beefy/justifs/1".into(),
 			known_peers,
 		);
@@ -1050,7 +1048,7 @@ pub(crate) mod tests {
 			on_demand_justifications,
 			persisted_state,
 		};
-		BeefyWorker::<_, _, _, _, _>::new(worker_params)
+		BeefyWorker::<_, _, _, _>::new(worker_params)
 	}
 
 	#[test]
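
Illustrative sketch, not part of the patch above: the snippet below models the core idea of the change with simplified stand-in types (plain u64 block numbers, a bare ValidatorSet carrying only a set id, and a trivial check in place of decode_and_verify_finality_proof). It only shows why capturing the active validator set inside the request state removes the need for a runtime-state lookup when the response arrives; the real engine in client/beefy is asynchronous and networked.

#[derive(Clone, Debug)]
struct ValidatorSet {
	// Identifier of the validator set expected to have signed the justification.
	id: u64,
}

#[derive(Clone, Debug)]
struct RequestInfo {
	block: u64,
	// Captured at request time; used later to verify the response even if the
	// state for `block` has been pruned by then.
	active_set: ValidatorSet,
}

enum State {
	Idle,
	AwaitingResponse(RequestInfo),
}

struct OnDemandEngine {
	state: State,
}

impl OnDemandEngine {
	fn new() -> Self {
		Self { state: State::Idle }
	}

	/// Start a new request, remembering the validator set to verify the response against.
	fn request(&mut self, block: u64, active_set: ValidatorSet) {
		// Ignore new requests while one is already pending (mirrors the real engine).
		if matches!(self.state, State::AwaitingResponse(_)) {
			return
		}
		self.state = State::AwaitingResponse(RequestInfo { block, active_set });
	}

	/// Handle a response: verify it against the validator set captured at request time.
	fn on_response(&mut self, signer_set_id: u64) -> Result<u64, &'static str> {
		let req_info = match std::mem::replace(&mut self.state, State::Idle) {
			State::AwaitingResponse(info) => info,
			State::Idle => return Err("no request in flight"),
		};
		// Stand-in for decode_and_verify_finality_proof(&encoded, req_info.block, &req_info.active_set).
		if signer_set_id == req_info.active_set.id {
			Ok(req_info.block)
		} else {
			Err("invalid response")
		}
	}
}

fn main() {
	let active_set = ValidatorSet { id: 7 };
	let mut engine = OnDemandEngine::new();
	engine.request(42, active_set);
	assert_eq!(engine.on_response(7), Ok(42));
}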