diff --git a/Cargo.lock b/Cargo.lock index 9294c90408ea..f246530285a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3309,6 +3309,7 @@ dependencies = [ "kv-log-macro", "log 0.4.8", "polkadot-primitives", + "sc-client-api", "streamunordered", ] diff --git a/overseer/Cargo.toml b/overseer/Cargo.toml index 0bef4442be4f..4bd63ad13619 100644 --- a/overseer/Cargo.toml +++ b/overseer/Cargo.toml @@ -10,6 +10,7 @@ log = "0.4.8" futures-timer = "3.0.2" streamunordered = "0.5.1" polkadot-primitives = { path = "../primitives" } +client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } [dev-dependencies] futures = { version = "0.3.5", features = ["thread-pool"] } diff --git a/overseer/src/lib.rs b/overseer/src/lib.rs index 7f0849fcb592..2265c89a62bb 100644 --- a/overseer/src/lib.rs +++ b/overseer/src/lib.rs @@ -71,7 +71,8 @@ use futures::{ use futures_timer::Delay; use streamunordered::{StreamYield, StreamUnordered}; -use polkadot_primitives::{BlockNumber, Hash}; +use polkadot_primitives::{Block, BlockNumber, Hash}; +use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; /// An error type that describes faults that may happen /// @@ -154,6 +155,26 @@ pub struct BlockInfo { pub number: BlockNumber, } +impl From> for BlockInfo { + fn from(n: BlockImportNotification) -> Self { + BlockInfo { + hash: n.hash, + parent_hash: n.header.parent_hash, + number: n.header.number, + } + } +} + +impl From> for BlockInfo { + fn from(n: FinalityNotification) -> Self { + BlockInfo { + hash: n.hash, + parent_hash: n.header.parent_hash, + number: n.header.number, + } + } +} + /// Some event from outer world. enum Event { BlockImported(BlockInfo), @@ -172,6 +193,7 @@ pub enum OutboundMessage { /// A handler used to communicate with the [`Overseer`]. /// /// [`Overseer`]: struct.Overseer.html +#[derive(Clone)] pub struct OverseerHandler { events_tx: mpsc::Sender, } @@ -206,6 +228,43 @@ impl OverseerHandler { } } +/// Glues together the [`Overseer`] and `BlockchainEvents` by forwarding +/// import and finality notifications into the [`OverseerHandler`]. +/// +/// [`Overseer`]: struct.Overseer.html +/// [`OverseerHandler`]: struct.OverseerHandler.html +pub async fn forward_events>( + client: P, + mut handler: OverseerHandler, +) -> SubsystemResult<()> { + let mut finality = client.finality_notification_stream(); + let mut imports = client.import_notification_stream(); + + loop { + select! { + f = finality.next() => { + match f { + Some(block) => { + handler.block_finalized(block.into()).await?; + } + None => break, + } + }, + i = imports.next() => { + match i { + Some(block) => { + handler.block_imported(block.into()).await?; + } + None => break, + } + }, + complete => break, + } + } + + Ok(()) +} + impl Debug for ToOverseer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self {