Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Make Proposer instantiation potentially async. #4630

Merged
merged 5 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ mod tests {
use sc_service::AbstractService;
use crate::service::{new_full, new_light};
use sp_runtime::traits::IdentifyAccount;
use futures::prelude::*;

type AccountPublic = <Signature as Verify>::Signer;

Expand Down Expand Up @@ -535,13 +536,16 @@ mod tests {

digest.push(<DigestItem as CompatibleDigestItem>::babe_pre_digest(babe_pre_digest));

let mut proposer = proposer_factory.init(&parent_header).unwrap();
let new_block = futures::executor::block_on(proposer.propose(
inherent_data,
digest,
std::time::Duration::from_secs(1),
RecordProof::Yes,
)).expect("Error making test block").block;
let proposing = proposer_factory.init(&parent_header)
.and_then(|mut proposer| proposer.propose(
inherent_data,
digest,
std::time::Duration::from_secs(1),
RecordProof::Yes,
));

let new_block = futures::executor::block_on(proposing)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May I suggest something like this instead?

Suggested change
let new_block = futures::executor::block_on(proposing)
let new_block = futures::executor::block_on(async move {
let proposer = proposer_factory.init(&parent_header).await;
proposer.propose(
inherent_data,
digest,
std::time::Duration::from_secs(1),
RecordProof::Yes,
).await
});

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still need to get the async/await into my bloodstream :)

.expect("Error making test block").block;

let (new_header, new_body) = new_block.deconstruct();
let pre_hash = new_header.hash();
Expand Down
14 changes: 8 additions & 6 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use sp_transaction_pool::{TransactionPool, InPoolTransaction};
use sc_telemetry::{telemetry, CONSENSUS_INFO};
use sc_block_builder::BlockBuilderApi;
use sp_api::{ProvideRuntimeApi, ApiExt};
use futures::prelude::*;

/// Proposer factory.
pub struct ProposerFactory<C, A> where A: TransactionPool {
Expand All @@ -59,7 +60,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
&mut self,
parent_header: &<Block as BlockT>::Header,
now: Box<dyn Fn() -> time::Instant + Send + Sync>,
) -> Result<Proposer<Block, SubstrateClient<B, E, Block, RA>, A>, sp_blockchain::Error> {
) -> Proposer<Block, SubstrateClient<B, E, Block, RA>, A> {
let parent_hash = parent_header.hash();

let id = BlockId::hash(parent_hash);
Expand All @@ -77,7 +78,7 @@ impl<B, E, Block, RA, A> ProposerFactory<SubstrateClient<B, E, Block, RA>, A>
}),
};

Ok(proposer)
proposer
}
}

Expand All @@ -94,14 +95,15 @@ impl<B, E, Block, RA, A> sp_consensus::Environment<Block> for
BlockBuilderApi<Block, Error = sp_blockchain::Error> +
ApiExt<Block, StateBackend = backend::StateBackendFor<B, Block>>,
{
type CreateProposer = future::Ready<Result<Self::Proposer, Self::Error>>;
type Proposer = Proposer<Block, SubstrateClient<B, E, Block, RA>, A>;
type Error = sp_blockchain::Error;

fn init(
&mut self,
parent_header: &<Block as BlockT>::Header,
) -> Result<Self::Proposer, sp_blockchain::Error> {
self.init_with_now(parent_header, Box::new(time::Instant::now))
) -> Self::CreateProposer {
future::ready(Ok(self.init_with_now(parent_header, Box::new(time::Instant::now))))
}
}

Expand Down Expand Up @@ -324,7 +326,7 @@ mod tests {
*value = new;
old
})
).unwrap();
);

// when
let deadline = time::Duration::from_secs(3);
Expand Down Expand Up @@ -359,7 +361,7 @@ mod tests {
let mut proposer = proposer_factory.init_with_now(
&client.header(&block_id).unwrap().unwrap(),
Box::new(move || time::Instant::now()),
).unwrap();
);

let deadline = time::Duration::from_secs(9);
let proposal = futures::executor::block_on(
Expand Down
7 changes: 5 additions & 2 deletions client/basic-authorship/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@
//! };
//!
//! // From this factory, we create a `Proposer`.
//! let mut proposer = proposer_factory.init(
//! let proposer = proposer_factory.init(
//! &client.header(&BlockId::number(0)).unwrap().unwrap(),
//! ).unwrap();
//! );
//!
//! // The proposer is created asynchronously.
//! let mut proposer = futures::executor::block_on(proposer).unwrap();
//!
//! // This `Proposer` allows us to create a block proposition.
//! // The proposer will grab transactions from the transaction pool, and put them into the block.
Expand Down
14 changes: 9 additions & 5 deletions client/consensus/aura/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
{
type BlockImport = I;
type SyncOracle = SO;
type CreateProposer = Pin<Box<
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
>>;
type Proposer = E::Proposer;
type Claim = P;
type EpochData = Vec<AuthorityId<P>>;
Expand Down Expand Up @@ -302,10 +305,10 @@ impl<B, C, E, I, P, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for AuraW
&mut self.sync_oracle
}

fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
self.env.init(block).map_err(|e| {
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
Box::pin(self.env.init(block).map_err(|e| {
sp_consensus::Error::ClientImport(format!("{:?}", e)).into()
})
}))
}

fn proposing_remaining_duration(
Expand Down Expand Up @@ -874,12 +877,13 @@ mod tests {

impl Environment<TestBlock> for DummyFactory {
type Proposer = DummyProposer;
type CreateProposer = futures::future::Ready<Result<DummyProposer, Error>>;
type Error = Error;

fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
-> Result<DummyProposer, Error>
-> Self::CreateProposer
{
Ok(DummyProposer(parent_header.number + 1, self.0.clone()))
futures::future::ready(Ok(DummyProposer(parent_header.number + 1, self.0.clone())))
}
}

Expand Down
9 changes: 6 additions & 3 deletions client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
type EpochData = Epoch;
type Claim = (BabePreDigest, AuthorityPair);
type SyncOracle = SO;
type CreateProposer = Pin<Box<
dyn Future<Output = Result<E::Proposer, sp_consensus::Error>> + Send + 'static
>>;
type Proposer = E::Proposer;
type BlockImport = I;

Expand Down Expand Up @@ -466,10 +469,10 @@ impl<B, C, E, I, Error, SO> sc_consensus_slots::SimpleSlotWorker<B> for BabeWork
&mut self.sync_oracle
}

fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error> {
self.env.init(block).map_err(|e| {
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
Box::pin(self.env.init(block).map_err(|e| {
sp_consensus::Error::ClientImport(format!("{:?}", e))
})
}))
}

fn proposing_remaining_duration(
Expand Down
9 changes: 5 additions & 4 deletions client/consensus/babe/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,24 @@ struct DummyProposer {
}

impl Environment<TestBlock> for DummyFactory {
type CreateProposer = future::Ready<Result<DummyProposer, Error>>;
type Proposer = DummyProposer;
type Error = Error;

fn init(&mut self, parent_header: &<TestBlock as BlockT>::Header)
-> Result<DummyProposer, Error>
-> Self::CreateProposer
{

let parent_slot = crate::find_pre_digest::<TestBlock>(parent_header)
.expect("parent header has a pre-digest")
.slot_number();

Ok(DummyProposer {
future::ready(Ok(DummyProposer {
factory: self.clone(),
parent_hash: parent_header.hash(),
parent_number: *parent_header.number(),
parent_slot,
})
}))
}
}

Expand Down Expand Up @@ -547,7 +548,7 @@ fn propose_and_import_block<Transaction>(
proposer_factory: &mut DummyFactory,
block_import: &mut BoxBlockImport<TestBlock, Transaction>,
) -> sp_core::H256 {
let mut proposer = proposer_factory.init(parent).unwrap();
let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap();

let slot_number = slot_number.unwrap_or_else(|| {
let parent_pre_digest = find_pre_digest::<TestBlock>(parent).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ fn mine_loop<B: BlockT, C, Algorithm, E, SO, S, CAW>(
}

let mut aux = PowAux::read(client, &best_hash)?;
let mut proposer = env.init(&best_header)
let mut proposer = futures::executor::block_on(env.init(&best_header))
.map_err(|e| Error::Environment(format!("{:?}", e)))?;

let inherent_data = inherent_data_providers
Expand Down
28 changes: 15 additions & 13 deletions client/consensus/slots/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ pub trait SimpleSlotWorker<B: BlockT> {
/// A handle to a `SyncOracle`.
type SyncOracle: SyncOracle;

/// The type of future resolving to the proposer.
type CreateProposer: Future<Output = Result<Self::Proposer, sp_consensus::Error>>
+ Send + Unpin + 'static;

/// The type of proposer to use to build blocks.
type Proposer: Proposer<B>;

Expand Down Expand Up @@ -129,7 +133,7 @@ pub trait SimpleSlotWorker<B: BlockT> {
fn sync_oracle(&mut self) -> &mut Self::SyncOracle;

/// Returns a `Proposer` to author on top of the given block.
fn proposer(&mut self, block: &B::Header) -> Result<Self::Proposer, sp_consensus::Error>;
fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer;

/// Remaining duration of the slot.
fn slot_remaining_duration(&self, slot_info: &SlotInfo) -> Duration {
Expand Down Expand Up @@ -216,32 +220,30 @@ pub trait SimpleSlotWorker<B: BlockT> {
"timestamp" => timestamp,
);

let mut proposer = match self.proposer(&chain_head) {
Ok(proposer) => proposer,
Err(err) => {
warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);
let awaiting_proposer = self.proposer(&chain_head).map_err(move |err| {
warn!("Unable to author block in slot {:?}: {:?}", slot_number, err);

telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
"slot" => slot_number, "err" => ?err
);
telemetry!(CONSENSUS_WARN; "slots.unable_authoring_block";
"slot" => slot_number, "err" => ?err
);

return Box::pin(future::ready(Ok(())));
},
};
err
});

let slot_remaining_duration = self.slot_remaining_duration(&slot_info);
let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info);
let logs = self.pre_digest_data(slot_number, &claim);

// deadline our production to approx. the end of the slot
let proposing = proposer.propose(
let proposing = awaiting_proposer.and_then(move |mut proposer| proposer.propose(
slot_info.inherent_data,
sp_runtime::generic::Digest {
logs,
},
slot_remaining_duration,
RecordProof::No,
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)));
).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e))));

let delay: Box<dyn Future<Output=()> + Unpin + Send> = match proposing_remaining_duration {
Some(r) => Box::new(Delay::new(r)),
None => Box::new(future::pending()),
Expand Down
7 changes: 5 additions & 2 deletions primitives/consensus/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ pub enum BlockStatus {
/// Environment producer for a Consensus instance. Creates proposer instance and communication streams.
pub trait Environment<B: BlockT> {
/// The proposer type this creates.
type Proposer: Proposer<B> + 'static;
type Proposer: Proposer<B> + Send + 'static;
/// A future that resolves to the proposer.
type CreateProposer: Future<Output = Result<Self::Proposer, Self::Error>>
+ Send + Unpin + 'static;
/// Error which can occur upon creation.
type Error: From<Error> + std::fmt::Debug + 'static;

/// Initialize the proposal logic on top of a specific header. Provide
/// the authorities at that header.
fn init(&mut self, parent_header: &B::Header) -> Result<Self::Proposer, Self::Error>;
fn init(&mut self, parent_header: &B::Header) -> Self::CreateProposer;
}

/// A proposal that is created by a [`Proposer`].
Expand Down