Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Make Proposer instantiation potentially async. (#4630)
Browse files Browse the repository at this point in the history
* Make Proposer instantiation potentially async.

* fix node-service test

* fix basic-authority doc-test

* only block once on futures in test

* use async/await
  • Loading branch information
rphmeier authored Jan 15, 2020
1 parent af1bc8c commit 1dafa60
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 43 deletions.
16 changes: 9 additions & 7 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,13 +535,15 @@ mod tests {

digest.push(<DigestItem as CompatibleDigestItem>::babe_pre_digest(babe_pre_digest));

let mut proposer = proposer_factory.init(&parent_header).unwrap();
let new_block = futures::executor::block_on(proposer.propose(
inherent_data,
digest,
std::time::Duration::from_secs(1),
RecordProof::Yes,
)).expect("Error making test block").block;
let new_block = futures::executor::block_on(async move {
let proposer = proposer_factory.init(&parent_header).await;
proposer.unwrap().propose(
inherent_data,
digest,
std::time::Duration::from_secs(1),
RecordProof::Yes,
).await
}).expect("Error making test block").block;

let (new_header, new_body) = new_block.deconstruct();
let pre_hash = new_header.hash();
Expand Down
14 changes: 8 additions & 6 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use sp_transaction_pool::{TransactionPool, InPoolTransaction};
use sc_telemetry::{telemetry, CONSENSUS_INFO};
use sc_block_builder::BlockBuilderApi;
use sp_api::{ProvideRuntimeApi, ApiExt};
use futures::prelude::*;

/// Proposer factory.
pub struct ProposerFactory<C, A> where A: TransactionPool {
Expand All @@ -59,7 +60,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
&mut self,
parent_header: &<Block as BlockT>::Header,
now: Box<dyn Fn() -> time::Instant + Send + Sync>,
) -> Result<Proposer<Block, SubstrateClient<B, E, Block, RA>, A>, sp_blockchain::Error> {
) -> Proposer<Block, SubstrateClient<B, E, Block, RA>, A> {
let parent_hash = parent_header.hash();

let id = BlockId::hash(parent_hash);
Expand All @@ -77,7 +78,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
}),
};

Ok(proposer)
proposer
}
}

Expand All @@ -94,14 +95,15 @@ impl<B, E, Block, RA, A> sp_consensus::Environment<Block> for
BlockBuilderApi<Block, Error = sp_blockchain::Error> +
ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>,
{
type CreateProposer = future::Ready<Result<Self::Proposer, Self::Error>>;
type Proposer = Proposer<Block, SubstrateClient<B, E, Block, RA>, A>;
type Error = sp_blockchain::Error;

fn init(
&mut self,
parent_header: &<Block as BlockT>::Header,
) -> Result<Self::Proposer, sp_blockchain::Error> {
self.init_with_now(parent_header, Box::new(time::Instant::now))
) -> Self::CreateProposer {
future::ready(Ok(self.init_with_now(parent_header, Box::new(time::Instant::now))))
}
}

Expand Down Expand Up @@ -324,7 +326,7 @@ mod tests {
*value = new;
old
})
).unwrap();
);

// when
let deadline = time::Duration::from_secs(3);
Expand Down Expand Up @@ -359,7 +361,7 @@ mod tests {
let mut proposer = proposer_factory.init_with_now(
&client.header(&block_id).unwrap().unwrap(),
Box::new(move || time::Instant::now()),
).unwrap();
);

let deadline = time::Duration::from_secs(9);
let proposal = futures::executor::block_on(
Expand Down
7 changes: 5 additions & 2 deletions client/basic-authorship/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
//! };
//!
//! // From this factory, we create a `Proposer`.
//! let mut proposer = proposer_factory.init(
//! let proposer = proposer_factory.init(
//! &client.header(&BlockId::number(0)).unwrap().unwrap(),
//! ).unwrap();
//! );
//!
//! // The proposer is created asynchronously.
//! let mut proposer = futures::executor::block_on(proposer).unwrap();
//!
//! // This `Proposer` allows us to create a block proposition.
//! // The proposer will grab transactions from the transaction pool, and put them into the block.
Expand Down
14 changes: 9 additions & 5 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
{
type BlockImport = I;
type SyncOracle = SO;
type CreateProposer = Pin<Box<
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
>>;
type Proposer = E::Proposer;
type Claim = P;
type EpochData = Vec<AuthorityId<P>>;
Expand Down Expand Up @@ -302,10 +305,10 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
&mut self.sync_oracle
}

fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
self.env.init(block).map_err(|e| {
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
Box::pin(self.env.init(block).map_err(|e| {
sp_consensus::Error::ClientImport(format!("{:?}", e)).into()
})
}))
}

fn proposing_remaining_duration(
Expand Down Expand Up @@ -874,12 +877,13 @@ mod tests {

impl Environment<TestBlock> for DummyFactory {
type Proposer = DummyProposer;
type CreateProposer = futures::future::Ready<Result<DummyProposer, Error>>;
type Error = Error;

fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
-> Result<DummyProposer, Error>
-> Self::CreateProposer
{
Ok(DummyProposer(parent_header.number + 1, self.0.clone()))
futures::future::ready(Ok(DummyProposer(parent_header.number + 1, self.0.clone())))
}
}

Expand Down
9 changes: 6 additions & 3 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
type EpochData = Epoch;
type Claim = (BabePreDigest, AuthorityPair);
type SyncOracle = SO;
type CreateProposer = Pin<Box<
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
>>;
type Proposer = E::Proposer;
type BlockImport = I;

Expand Down Expand Up @@ -466,10 +469,10 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
&mut self.sync_oracle
}

fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
self.env.init(block).map_err(|e| {
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
Box::pin(self.env.init(block).map_err(|e| {
sp_consensus::Error::ClientImport(format!("{:?}", e))
})
}))
}

fn proposing_remaining_duration(
Expand Down
9 changes: 5 additions & 4 deletions client/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,24 @@ struct DummyProposer {
}

impl Environment<TestBlock> for DummyFactory {
type CreateProposer = future::Ready<Result<DummyProposer, Error>>;
type Proposer = DummyProposer;
type Error = Error;

fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
-> Result<DummyProposer, Error>
-> Self::CreateProposer
{

let parent_slot = crate::find_pre_digest::<TestBlock>(parent_header)
.expect("parent header has a pre-digest")
.slot_number();

Ok(DummyProposer {
future::ready(Ok(DummyProposer {
factory: self.clone(),
parent_hash: parent_header.hash(),
parent_number: *parent_header.number(),
parent_slot,
})
}))
}
}

Expand Down Expand Up @@ -547,7 +548,7 @@ fn propose_and_import_block<Transaction>(
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock, Transaction>,
) -> sp_core::H256 {
let mut proposer = proposer_factory.init(parent).unwrap();
let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap();

let slot_number = slot_number.unwrap_or_else(|| {
let parent_pre_digest = find_pre_digest::<TestBlock>(parent).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ fn mine_loop<B: BlockT, C, Algorithm, E, SO, S, CAW>(
}

let mut aux = PowAux::read(client, &best_hash)?;
let mut proposer = env.init(&best_header)
let mut proposer = futures::executor::block_on(env.init(&best_header))
.map_err(|e| Error::Environment(format!("{:?}", e)))?;

let inherent_data = inherent_data_providers
Expand Down
28 changes: 15 additions & 13 deletions client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ pub trait SimpleSlotWorker<B: BlockT> {
/// A handle to a `SyncOracle`.
type SyncOracle: SyncOracle;

/// The type of future resolving to the proposer.
type CreateProposer: Future<Output = Result<Self::Proposer, sp_consensus::Error>>
+ Send + Unpin + 'static;

/// The type of proposer to use to build blocks.
type Proposer: Proposer<B>;

Expand Down Expand Up @@ -129,7 +133,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
fn sync_oracle(&mut self) -> &mut Self::SyncOracle;

/// Returns a `Proposer` to author on top of the given block.
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error>;
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer;

/// Remaining duration of the slot.
fn slot_remaining_duration(&self, slot_info: &SlotInfo) -> Duration {
Expand Down Expand Up @@ -216,32 +220,30 @@ pub trait SimpleSlotWorker<B: BlockT> {
"timestamp" => timestamp,
);

let mut proposer = match self.proposer(&chain_head) {
Ok(proposer) => proposer,
Err(err) => {
warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);
let awaiting_proposer = self.proposer(&chain_head).map_err(move |err| {
warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);

telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
"slot" => slot_number, "err" => ?err
);
telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
"slot" => slot_number, "err" => ?err
);

return Box::pin(future::ready(Ok(())));
},
};
err
});

let slot_remaining_duration = self.slot_remaining_duration(&slot_info);
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
let logs = self.pre_digest_data(slot_number, &claim);

// deadline our production to approx. the end of the slot
let proposing = proposer.propose(
let proposing = awaiting_proposer.and_then(move |mut proposer| proposer.propose(
slot_info.inherent_data,
sp_runtime::generic::Digest {
logs,
},
slot_remaining_duration,
RecordProof::No,
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));

let delay: Box<dyn Future<Output=()> + Unpin + Send> = match proposing_remaining_duration {
Some(r) => Box::new(Delay::new(r)),
None => Box::new(future::pending()),
Expand Down
7 changes: 5 additions & 2 deletions primitives/consensus/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ pub enum BlockStatus {
/// Environment producer for a Consensus instance. Creates proposer instance and communication streams.
pub trait Environment<B: BlockT> {
/// The proposer type this creates.
type Proposer: Proposer<B> + 'static;
type Proposer: Proposer<B> + Send + 'static;
/// A future that resolves to the proposer.
type CreateProposer: Future<Output = Result<Self::Proposer, Self::Error>>
+ Send + Unpin + 'static;
/// Error which can occur upon creation.
type Error: From<Error> + std::fmt::Debug + 'static;

/// Initialize the proposal logic on top of a specific header. Provide
/// the authorities at that header.
fn init(&mut self, parent_header: &B::Header) -> Result<Self::Proposer, Self::Error>;
fn init(&mut self, parent_header: &B::Header) -> Self::CreateProposer;
}

/// A proposal that is created by a [`Proposer`].
Expand Down

0 comments on commit 1dafa60

Please sign in to comment.