From a5caaf9010baba5f5f49138e10005576ad7f0625 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 1 Aug 2019 14:30:07 +0200 Subject: [PATCH 1/2] Switch offchain workers to new futures --- Cargo.lock | 3 +-- core/offchain/Cargo.toml | 3 +-- core/offchain/src/api.rs | 6 +++--- core/offchain/src/lib.rs | 11 ++++------- core/service/src/components.rs | 5 ++++- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index adb20893d8756..30ca90481f29a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4615,7 +4615,7 @@ name = "substrate-offchain" version = "2.0.0" dependencies = [ "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 4.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4627,7 +4627,6 @@ dependencies = [ "substrate-primitives 2.0.0", "substrate-test-runtime-client 2.0.0", "substrate-transaction-pool 2.0.0", - "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/offchain/Cargo.toml b/core/offchain/Cargo.toml index 694ce38663aec..76fbe1ca2c514 100644 --- a/core/offchain/Cargo.toml +++ b/core/offchain/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] client = { package = "substrate-client", path = "../../core/client" } -futures = "0.1.25" +futures-preview = "0.3.0-alpha.17" log = "0.4" offchain-primitives = { package = "substrate-offchain-primitives", path = "./primitives" } parity-codec = { version = "4.1.1", features = ["derive"] } @@ -22,7 +22,6 @@ network = { package = "substrate-network", path = "../../core/network" } env_logger = "0.6" client-db = { package = "substrate-client-db", path = "../../core/client/db/", default-features = true } test-client = { package = "substrate-test-runtime-client", path = "../../core/test-runtime/client" } -tokio = "0.1.7" [features] default = [] diff --git a/core/offchain/src/api.rs b/core/offchain/src/api.rs index 8af7b333f5f74..a634ebb477342 100644 --- a/core/offchain/src/api.rs +++ b/core/offchain/src/api.rs @@ -23,7 +23,7 @@ use std::{ }; use client::backend::OffchainStorage; use crate::AuthorityKeyProvider; -use futures::{Stream, Future, sync::mpsc}; +use futures::{StreamExt as _, Future, future, channel::mpsc}; use log::{info, debug, warn, error}; use parity_codec::{Encode, Decode}; use primitives::offchain::{ @@ -535,14 +535,14 @@ impl AsyncApi { } /// Run a processing task for the API - pub fn process(mut self) -> impl Future { + pub fn process(mut self) -> impl Future { let receiver = self.receiver.take().expect("Take invoked only once."); receiver.for_each(move |msg| { match msg { ExtMessage::SubmitExtrinsic(ext) => self.submit_extrinsic(ext), } - Ok(()) + future::ready(()) }) } diff --git a/core/offchain/src/lib.rs b/core/offchain/src/lib.rs index feacb535aac6a..737eb6b548af6 100644 --- a/core/offchain/src/lib.rs +++ b/core/offchain/src/lib.rs @@ -140,7 +140,7 @@ impl OffchainWorkers< number: &::Number, pool: &Arc>, network_state: Arc, - ) -> impl Future where + ) -> impl Future where A: ChainApi + 'static, { let runtime = self.client.runtime_api(); @@ -173,9 +173,9 @@ impl OffchainWorkers< log::error!("Error running offchain workers at {:?}: {:?}", at, e); } }); - futures::future::Either::A(runner.process()) + futures::future::Either::Left(runner.process()) } else { - futures::future::Either::B(futures::future::ok(())) + futures::future::Either::Right(futures::future::ready(())) } } } @@ -196,7 +196,6 @@ fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) { #[cfg(test)] mod tests { use super::*; - use futures::Future; use primitives::{ed25519, sr25519}; use network::{Multiaddr, PeerId}; @@ -246,7 +245,6 @@ mod tests { fn should_call_into_runtime_and_produce_extrinsic() { // given let _ = env_logger::try_init(); - let runtime = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let pool = Arc::new(Pool::new(Default::default(), ::transaction_pool::ChainApi::new(client.clone()))); let db = client_db::offchain::LocalStorage::new_test(); @@ -254,10 +252,9 @@ mod tests { // when let offchain = OffchainWorkers::new(client, db, TestProvider::default(), "".to_owned().into()); - runtime.executor().spawn(offchain.on_block_imported(&0u64, &pool, mock.clone())); + futures::executor::block_on(offchain.on_block_imported(&0u64, &pool, mock.clone())); // then - runtime.shutdown_on_idle().wait().unwrap(); assert_eq!(pool.status().ready, 1); assert_eq!(pool.ready().next().unwrap().is_propagateable(), false); } diff --git a/core/service/src/components.rs b/core/service/src/components.rs index dff6161f16523..4bbe25b248184 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -33,6 +33,7 @@ use crate::config::Configuration; use primitives::{Blake2Hasher, H256, Pair}; use rpc::{self, apis::system::SystemInfo}; use futures::{prelude::*, future::Executor, sync::mpsc}; +use futures03::{FutureExt as _, compat::Compat}; // Type aliases. // These exist mainly to avoid typing `::Foo` all over the code. @@ -264,7 +265,9 @@ impl OffchainWorker for C where pool: &Arc>, network_state: &Arc, ) -> error::Result + Send>> { - Ok(Box::new(offchain.on_block_imported(number, pool, network_state.clone()))) + let future = offchain.on_block_imported(number, pool, network_state.clone()) + .map(|()| Ok(())); + Ok(Box::new(Compat::new(future))) } } From 930725219d4badb98325f0c9c606b0bc34fabb8b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 7 Aug 2019 21:34:17 +0200 Subject: [PATCH 2/2] Fix tests --- core/offchain/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/offchain/src/lib.rs b/core/offchain/src/lib.rs index 57a898fb64962..075a2bd8375f6 100644 --- a/core/offchain/src/lib.rs +++ b/core/offchain/src/lib.rs @@ -152,8 +152,6 @@ fn spawn_worker(f: impl FnOnce() -> () + Send + 'static) { #[cfg(test)] mod tests { use super::*; - use primitives::{ed25519, sr25519}; - use futures::Future; use network::{Multiaddr, PeerId}; struct MockNetworkStateInfo(); @@ -179,7 +177,7 @@ mod tests { // when let offchain = OffchainWorkers::new(client, db); - futures::executor::block_on(offchain.on_block_imported(&0u64, &pool, mock.clone())); + futures::executor::block_on(offchain.on_block_imported(&0u64, &pool, network_state)); // then assert_eq!(pool.status().ready, 1);