Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manual seal delayed finalize time #1

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/manual-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ assert_matches = "1.3.0"
async-trait = "0.1.57"
codec = { package = "parity-scale-codec", version = "3.2.2" }
futures = "0.3.21"
futures-timer = "3.0.1"
log = "0.4.17"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0"
Expand Down
154 changes: 152 additions & 2 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@
//! This is suitable for a testing environment.

use futures::prelude::*;
use futures_timer::Delay;
use prometheus_endpoint::Registry;
use sc_client_api::backend::{Backend as ClientBackend, Finalizer};
use sc_client_api::{
backend::{Backend as ClientBackend, Finalizer},
client::BlockchainEvents,
};
use sc_consensus::{
block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
import_queue::{BasicQueue, BoxBlockImport, Verifier},
};
use sp_blockchain::HeaderBackend;
use sp_consensus::{Environment, Proposer, SelectChain};
use sp_core::traits::SpawnNamed;
use sp_inherents::CreateInherentDataProviders;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc, time::Duration};

mod error;
mod finalize_block;
Expand Down Expand Up @@ -136,6 +141,19 @@ pub struct InstantSealParams<B: BlockT, BI, E, C: ProvideRuntimeApi<B>, TP, SC,
pub create_inherent_data_providers: CIDP,
}

pub struct DelayedFinalizeParams<B: BlockT, C: ProvideRuntimeApi<B>, S: SpawnNamed> {
/// Block import instance for well. importing blocks.
pub client: Arc<C>,

pub spawn_handle: S,

/// The delay in seconds before a block is finalized.
pub delay_sec: u64,

/// phantom type to pin the Block type
pub _phantom: PhantomData<B>,
}

/// Creates the background authorship task for the manual seal engine.
pub async fn run_manual_seal<B, BI, CB, E, C, TP, SC, CS, CIDP, P>(
ManualSealParams {
Expand Down Expand Up @@ -303,6 +321,42 @@ pub async fn run_instant_seal_and_finalize<B, BI, CB, E, C, TP, SC, CIDP, P>(
.await
}

pub async fn run_delayed_finalize<B, CB, C, S>(
DelayedFinalizeParams {
client,
spawn_handle,
delay_sec,
_phantom: PhantomData,
}: DelayedFinalizeParams<B, C, S>,
) where
B: BlockT + 'static,
CB: ClientBackend<B> + 'static,
C: HeaderBackend<B> + Finalizer<B, CB> + ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
S: SpawnNamed,
{
let mut block_import_stream = client.import_notification_stream();

while let Some(notification) = block_import_stream.next().await {
let delay = Delay::new(Duration::from_secs(delay_sec));
let cloned_client = client.clone();
spawn_handle.spawn(
"delayed-finalize",
None,
Box::pin(async move {
delay.await;
finalize_block(FinalizeBlockParams {
hash: notification.hash,
sender: None,
justification: None,
finalizer: cloned_client,
_phantom: PhantomData,
})
.await
}),
);
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -428,6 +482,102 @@ mod tests {
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1)
}

#[tokio::test]
async fn instant_seal_delayed_finalize() {
let builder = TestClientBuilder::new();
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
let genesis_hash = client.info().genesis_hash;
let pool = Arc::new(BasicPool::with_revalidation_type(
Options::default(),
true.into(),
api(),
None,
RevalidationType::Full,
spawner.clone(),
0,
genesis_hash,
genesis_hash,
));
let env = ProposerFactory::new(spawner.clone(), client.clone(), pool.clone(), None, None);
// this test checks that blocks are created as soon as transactions are imported into the
// pool.
let (sender, receiver) = futures::channel::oneshot::channel();
let mut sender = Arc::new(Some(sender));
let commands_stream =
pool.pool().validated_pool().import_notification_stream().map(move |_| {
// we're only going to submit one tx so this fn will only be called once.
let mut_sender = Arc::get_mut(&mut sender).unwrap();
let sender = std::mem::take(mut_sender);
EngineCommand::SealNewBlock {
create_empty: false,
// set to `false`, expecting to be finalized by delayed finalize
finalize: false,
parent_hash: None,
sender,
}
});

let future_instant_seal = run_manual_seal(ManualSealParams {
block_import: client.clone(),
commands_stream,
env,
client: client.clone(),
pool: pool.clone(),
select_chain,
create_inherent_data_providers: |_, _| async { Ok(()) },
consensus_data_provider: None,
});
std::thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
// spawn the background authorship task
rt.block_on(future_instant_seal);
});

let delay_sec = 5;
let future_delayed_finalize = run_delayed_finalize(DelayedFinalizeParams {
client: client.clone(),
delay_sec,
spawn_handle: spawner,
_phantom: PhantomData::default(),
});
std::thread::spawn(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
// spawn the background authorship task
rt.block_on(future_delayed_finalize);
});

// submit a transaction to pool.
let result = pool.submit_one(&BlockId::Number(0), SOURCE, uxt(Alice, 0)).await;
// assert that it was successfully imported
assert!(result.is_ok());
// assert that the background task returns ok
let created_block = receiver.await.unwrap().unwrap();
assert_eq!(
created_block,
CreatedBlock {
hash: created_block.hash,
aux: ImportedAux {
header_only: false,
clear_justification_requests: false,
needs_justification: false,
bad_justification: false,
is_new_best: true,
}
}
);
// assert that there's a new block in the db.
assert!(client.header(created_block.hash).unwrap().is_some());
assert_eq!(client.header(created_block.hash).unwrap().unwrap().number, 1);

assert_eq!(client.info().finalized_hash, client.info().genesis_hash);
// ensuring run_delayed_finalize's Future is always processed before checking finalized hash
// by adding 1 sec
Delay::new(Duration::from_secs(delay_sec + 1)).await;
assert_eq!(client.info().finalized_hash, created_block.hash);
}

#[tokio::test]
async fn manual_seal_and_finalization() {
let builder = TestClientBuilder::new();
Expand Down
7 changes: 4 additions & 3 deletions client/consensus/manual-seal/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,11 @@ pub fn send_result<T: std::fmt::Debug>(
}
}
} else {
// instant seal doesn't report errors over rpc, simply log them.
// Sealing/Finalization with no RPC sender such as instant seal or delayed finalize doesn't
// report errors over rpc, simply log them.
match result {
Ok(r) => log::info!("Instant Seal success: {:?}", r),
Err(e) => log::error!("Instant Seal encountered an error: {}", e),
Ok(r) => log::info!("Consensus with no RPC sender success: {:?}", r),
Err(e) => log::error!("Consensus with no RPC sender encountered an error: {}", e),
}
}
}