From 241162b82016e378107188e62e1eb9e938ff448c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 15 Jan 2020 15:11:20 +0100 Subject: [PATCH] Make Proposer instantiation potentially async. --- .../basic-authorship/src/basic_authorship.rs | 14 ++++++---- client/consensus/aura/src/lib.rs | 14 ++++++---- client/consensus/babe/src/lib.rs | 9 ++++-- client/consensus/babe/src/tests.rs | 9 +++--- client/consensus/slots/src/lib.rs | 28 ++++++++++--------- primitives/consensus/common/src/lib.rs | 7 +++-- 6 files changed, 48 insertions(+), 33 deletions(-) diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 543428b073478..576009ef27487 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -34,6 +34,7 @@ use sp_transaction_pool::{TransactionPool, InPoolTransaction}; use sc_telemetry::{telemetry, CONSENSUS_INFO}; use sc_block_builder::BlockBuilderApi; use sp_api::{ProvideRuntimeApi, ApiExt}; +use futures::prelude::*; /// Proposer factory. pub struct ProposerFactory where A: TransactionPool { @@ -59,7 +60,7 @@ impl ProposerFactory, A> &mut self, parent_header: &::Header, now: Box time::Instant + Send + Sync>, - ) -> Result, A>, sp_blockchain::Error> { + ) -> Proposer, A> { let parent_hash = parent_header.hash(); let id = BlockId::hash(parent_hash); @@ -77,7 +78,7 @@ impl ProposerFactory, A> }), }; - Ok(proposer) + proposer } } @@ -94,14 +95,15 @@ impl sp_consensus::Environment for BlockBuilderApi + ApiExt>, { + type CreateProposer = future::Ready>; type Proposer = Proposer, A>; type Error = sp_blockchain::Error; fn init( &mut self, parent_header: &::Header, - ) -> Result { - self.init_with_now(parent_header, Box::new(time::Instant::now)) + ) -> Self::CreateProposer { + future::ready(Ok(self.init_with_now(parent_header, Box::new(time::Instant::now)))) } } @@ -324,7 +326,7 @@ mod tests { *value = new; old }) - ).unwrap(); + ); // when let deadline = time::Duration::from_secs(3); @@ -359,7 +361,7 @@ mod tests { let mut proposer = proposer_factory.init_with_now( &client.header(&block_id).unwrap().unwrap(), Box::new(move || time::Instant::now()), - ).unwrap(); + ); let deadline = time::Duration::from_secs(9); let proposal = futures::executor::block_on( diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 13a4c5a777144..79ec8a52d7022 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -217,6 +217,9 @@ impl sc_consensus_slots::SimpleSlotWorker for AuraW { type BlockImport = I; type SyncOracle = SO; + type CreateProposer = Pin> + Send + 'static + >>; type Proposer = E::Proposer; type Claim = P; type EpochData = Vec>; @@ -302,10 +305,10 @@ impl sc_consensus_slots::SimpleSlotWorker for AuraW &mut self.sync_oracle } - fn proposer(&mut self, block: &B::Header) -> Result { - self.env.init(block).map_err(|e| { + fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer { + Box::pin(self.env.init(block).map_err(|e| { sp_consensus::Error::ClientImport(format!("{:?}", e)).into() - }) + })) } fn proposing_remaining_duration( @@ -874,12 +877,13 @@ mod tests { impl Environment for DummyFactory { type Proposer = DummyProposer; + type CreateProposer = futures::future::Ready>; type Error = Error; fn init(&mut self, parent_header: &::Header) - -> Result + -> Self::CreateProposer { - Ok(DummyProposer(parent_header.number + 1, self.0.clone())) + futures::future::ready(Ok(DummyProposer(parent_header.number + 1, self.0.clone()))) } } diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index bbf19b706601f..cb7e964ed8fbc 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -365,6 +365,9 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeWork type EpochData = Epoch; type Claim = (BabePreDigest, AuthorityPair); type SyncOracle = SO; + type CreateProposer = Pin> + Send + 'static + >>; type Proposer = E::Proposer; type BlockImport = I; @@ -468,10 +471,10 @@ impl sc_consensus_slots::SimpleSlotWorker for BabeWork &mut self.sync_oracle } - fn proposer(&mut self, block: &B::Header) -> Result { - self.env.init(block).map_err(|e| { + fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer { + Box::pin(self.env.init(block).map_err(|e| { sp_consensus::Error::ClientImport(format!("{:?}", e)) - }) + })) } fn proposing_remaining_duration( diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs index 2ddb67fe4796e..aca4d236142e8 100644 --- a/client/consensus/babe/src/tests.rs +++ b/client/consensus/babe/src/tests.rs @@ -72,23 +72,24 @@ struct DummyProposer { } impl Environment for DummyFactory { + type CreateProposer = future::Ready>; type Proposer = DummyProposer; type Error = Error; fn init(&mut self, parent_header: &::Header) - -> Result + -> Self::CreateProposer { let parent_slot = crate::find_pre_digest::(parent_header) .expect("parent header has a pre-digest") .slot_number(); - Ok(DummyProposer { + future::ready(Ok(DummyProposer { factory: self.clone(), parent_hash: parent_header.hash(), parent_number: *parent_header.number(), parent_slot, - }) + })) } } @@ -547,7 +548,7 @@ fn propose_and_import_block( proposer_factory: &mut DummyFactory, block_import: &mut BoxBlockImport, ) -> sp_core::H256 { - let mut proposer = proposer_factory.init(parent).unwrap(); + let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap(); let slot_number = slot_number.unwrap_or_else(|| { let parent_pre_digest = find_pre_digest::(parent).unwrap(); diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index a69c710a7e944..3aa243af72b06 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -70,6 +70,10 @@ pub trait SimpleSlotWorker { /// A handle to a `SyncOracle`. type SyncOracle: SyncOracle; + /// The type of future resolving to the proposer. + type CreateProposer: Future> + + Send + Unpin + 'static; + /// The type of proposer to use to build blocks. type Proposer: Proposer; @@ -129,7 +133,7 @@ pub trait SimpleSlotWorker { fn sync_oracle(&mut self) -> &mut Self::SyncOracle; /// Returns a `Proposer` to author on top of the given block. - fn proposer(&mut self, block: &B::Header) -> Result; + fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer; /// Remaining duration of the slot. fn slot_remaining_duration(&self, slot_info: &SlotInfo) -> Duration { @@ -216,32 +220,30 @@ pub trait SimpleSlotWorker { "timestamp" => timestamp, ); - let mut proposer = match self.proposer(&chain_head) { - Ok(proposer) => proposer, - Err(err) => { - warn!("Unable to author block in slot {:?}: {:?}", slot_number, err); + let awaiting_proposer = self.proposer(&chain_head).map_err(move |err| { + warn!("Unable to author block in slot {:?}: {:?}", slot_number, err); - telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block"; - "slot" => slot_number, "err" => ?err - ); + telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block"; + "slot" => slot_number, "err" => ?err + ); - return Box::pin(future::ready(Ok(()))); - }, - }; + err + }); let slot_remaining_duration = self.slot_remaining_duration(&slot_info); let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info); let logs = self.pre_digest_data(slot_number, &claim); // deadline our production to approx. the end of the slot - let proposing = proposer.propose( + let proposing = awaiting_proposer.and_then(move |mut proposer| proposer.propose( slot_info.inherent_data, sp_runtime::generic::Digest { logs, }, slot_remaining_duration, RecordProof::No, - ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))); + ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)))); + let delay: Box + Unpin + Send> = match proposing_remaining_duration { Some(r) => Box::new(Delay::new(r)), None => Box::new(future::pending()), diff --git a/primitives/consensus/common/src/lib.rs b/primitives/consensus/common/src/lib.rs index 1b98ede376857..4927faede06f7 100644 --- a/primitives/consensus/common/src/lib.rs +++ b/primitives/consensus/common/src/lib.rs @@ -74,13 +74,16 @@ pub enum BlockStatus { /// Environment producer for a Consensus instance. Creates proposer instance and communication streams. pub trait Environment { /// The proposer type this creates. - type Proposer: Proposer + 'static; + type Proposer: Proposer + Send + 'static; + /// A future that resolves to the proposer. + type CreateProposer: Future> + + Send + Unpin + 'static; /// Error which can occur upon creation. type Error: From + std::fmt::Debug + 'static; /// Initialize the proposal logic on top of a specific header. Provide /// the authorities at that header. - fn init(&mut self, parent_header: &B::Header) -> Result; + fn init(&mut self, parent_header: &B::Header) -> Self::CreateProposer; } /// A proposal that is created by a [`Proposer`].