Skip to content

Commit

Permalink
Make chain && state RPCs async (paritytech#3480)
Browse files Browse the repository at this point in the history
* chain+state RPCs are async now

* wrapped too long lines

* create full/light RPC impls from service

* use ordering

* post-merge fix
  • Loading branch information
svyatonik authored and jimpo committed Sep 2, 2019
1 parent c2a9c24 commit e4dacbe
Show file tree
Hide file tree
Showing 26 changed files with 1,838 additions and 700 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions core/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ fnv = { version = "1.0", optional = true }
log = { version = "0.4", optional = true }
parking_lot = { version = "0.9.0", optional = true }
hex = { package = "hex-literal", version = "0.2", optional = true }
futures-preview = { version = "=0.3.0-alpha.17", optional = true }
futures = { version = "0.1", optional = true }
futures03 = { package = "futures-preview", version = "=0.3.0-alpha.17", features = ["compat"], optional = true }
consensus = { package = "substrate-consensus-common", path = "../consensus/common", optional = true }
executor = { package = "substrate-executor", path = "../executor", optional = true }
state-machine = { package = "substrate-state-machine", path = "../state-machine", optional = true }
Expand Down Expand Up @@ -49,7 +50,8 @@ std = [
"fnv",
"log",
"hex",
"futures-preview",
"futures",
"futures03",
"executor",
"state-machine",
"keyring",
Expand Down
5 changes: 5 additions & 0 deletions core/client/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

//! Substrate Client data backend
use std::sync::Arc;
use std::collections::HashMap;
use crate::error;
use crate::light::blockchain::RemoteBlockchain;
use primitives::ChangesTrieConfiguration;
use sr_primitives::{generic::BlockId, Justification, StorageOverlay, ChildrenStorageOverlay};
use sr_primitives::traits::{Block as BlockT, NumberFor};
Expand Down Expand Up @@ -303,4 +305,7 @@ where
{
/// Returns true if the state for given block is available locally.
fn is_local_state_available(&self, block: &BlockId<Block>) -> bool;
/// Returns reference to blockchain backend that either resolves blockchain data
/// locally, or prepares request to fetch that data from remote node.
fn remote_blockchain(&self) -> Arc<dyn RemoteBlockchain<Block>>;
}
2 changes: 1 addition & 1 deletion core/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
panic::UnwindSafe, result, cell::RefCell, rc::Rc,
};
use log::{info, trace, warn};
use futures::channel::mpsc;
use futures03::channel::mpsc;
use parking_lot::{Mutex, RwLock};
use codec::{Encode, Decode};
use hash_db::{Hasher, Prefix};
Expand Down
4 changes: 4 additions & 0 deletions core/client/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,10 @@ where
.map(|num| num.is_zero())
.unwrap_or(false)
}

fn remote_blockchain(&self) -> Arc<dyn crate::light::blockchain::RemoteBlockchain<Block>> {
unimplemented!()
}
}

/// Prunable in-memory changes trie storage.
Expand Down
10 changes: 7 additions & 3 deletions core/client/src/light/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ impl<S, F, Block, H> ClientBackend<Block, H> for Backend<S, F, H> where
impl<S, F, Block, H> RemoteBackend<Block, H> for Backend<S, F, H>
where
Block: BlockT,
S: BlockchainStorage<Block>,
F: Fetcher<Block>,
S: BlockchainStorage<Block> + 'static,
F: Fetcher<Block> + 'static,
H: Hasher<Out=Block::Hash>,
H::Out: Ord,
{
Expand All @@ -242,6 +242,10 @@ where
.map(|num| num.is_zero())
.unwrap_or(false)
}

fn remote_blockchain(&self) -> Arc<dyn crate::light::blockchain::RemoteBlockchain<Block>> {
self.blockchain.clone()
}
}

impl<S, F, Block, H> BlockImportOperation<Block, H> for ImportOperation<Block, S, F, H>
Expand Down Expand Up @@ -358,7 +362,7 @@ where
*self.cached_header.write() = Some(cached_header);
}

futures::executor::block_on(
futures03::executor::block_on(
self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_read(RemoteReadRequest {
block: self.block,
Expand Down
112 changes: 84 additions & 28 deletions core/client/src/light/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
//! Light client blockchain backend. Only stores headers and justifications of recent
//! blocks. CHT roots are stored for headers of ancient blocks.
use std::future::Future;
use std::{sync::{Weak, Arc}, collections::HashMap};
use parking_lot::Mutex;

Expand Down Expand Up @@ -72,6 +73,27 @@ pub trait Storage<Block: BlockT>: AuxStore + BlockchainHeaderBackend<Block> {
fn cache(&self) -> Option<Arc<dyn BlockchainCache<Block>>>;
}

/// Remote header.
#[derive(Debug)]
pub enum LocalOrRemote<Data, Request> {
/// When data is available locally, it is returned.
Local(Data),
/// When data is unavailable locally, the request to fetch it from remote node is returned.
Remote(Request),
/// When data is unknown.
Unknown,
}

/// Futures-based blockchain backend that either resolves blockchain data
/// locally, or fetches required data from remote node.
pub trait RemoteBlockchain<Block: BlockT>: Send + Sync {
/// Get block header.
fn header(&self, id: BlockId<Block>) -> ClientResult<LocalOrRemote<
Block::Header,
RemoteHeaderRequest<Block::Header>,
>>;
}

/// Light client blockchain.
pub struct Blockchain<S, F> {
fetcher: Mutex<Weak<F>>,
Expand Down Expand Up @@ -105,32 +127,10 @@ impl<S, F> Blockchain<S, F> {

impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
fn header(&self, id: BlockId<Block>) -> ClientResult<Option<Block::Header>> {
match self.storage.header(id)? {
Some(header) => Ok(Some(header)),
None => {
let number = match id {
BlockId::Hash(hash) => match self.storage.number(hash)? {
Some(number) => number,
None => return Ok(None),
},
BlockId::Number(number) => number,
};

// if the header is from future or genesis (we never prune genesis) => return
if number.is_zero() || self.storage.status(BlockId::Number(number))? == BlockStatus::Unknown {
return Ok(None);
}

futures::executor::block_on(
self.fetcher().upgrade()
.ok_or(ClientError::NotAvailableOnLightClient)?
.remote_header(RemoteHeaderRequest {
cht_root: self.storage.header_cht_root(cht::size(), number)?,
block: number,
retry_count: None,
})
).map(Some)
}
match RemoteBlockchain::header(self, id)? {
LocalOrRemote::Local(header) => Ok(Some(header)),
LocalOrRemote::Remote(_) => Err(ClientError::NotAvailableOnLightClient),
LocalOrRemote::Unknown => Ok(None),
}
}

Expand All @@ -153,12 +153,12 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc

impl<S, F, Block> BlockchainBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
let header = match self.header(id)? {
let header = match BlockchainHeaderBackend::header(self, id)? {
Some(header) => header,
None => return Ok(None),
};

futures::executor::block_on(
futures03::executor::block_on(
self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
.remote_body(RemoteBodyRequest {
header,
Expand Down Expand Up @@ -194,6 +194,62 @@ impl<S: Storage<Block>, F, Block: BlockT> ProvideCache<Block> for Blockchain<S,
}
}

impl<S, F, Block: BlockT> RemoteBlockchain<Block> for Blockchain<S, F>
where
S: Storage<Block>,
F: Fetcher<Block> + Send + Sync,
{
fn header(&self, id: BlockId<Block>) -> ClientResult<LocalOrRemote<
Block::Header,
RemoteHeaderRequest<Block::Header>,
>> {
// first, try to read header from local storage
if let Some(local_header) = self.storage.header(id)? {
return Ok(LocalOrRemote::Local(local_header));
}

// we need to know block number to check if it's a part of CHT
let number = match id {
BlockId::Hash(hash) => match self.storage.number(hash)? {
Some(number) => number,
None => return Ok(LocalOrRemote::Unknown),
},
BlockId::Number(number) => number,
};

// if the header is genesis (never pruned), non-canonical, or from future => return
if number.is_zero() || self.storage.status(BlockId::Number(number))? == BlockStatus::Unknown {
return Ok(LocalOrRemote::Unknown);
}

Ok(LocalOrRemote::Remote(RemoteHeaderRequest {
cht_root: self.storage.header_cht_root(cht::size(), number)?,
block: number,
retry_count: None,
}))
}
}

/// Returns future that resolves header either locally, or remotely.
pub fn future_header<Block: BlockT, F: Fetcher<Block>>(
blockchain: &dyn RemoteBlockchain<Block>,
fetcher: &F,
id: BlockId<Block>,
) -> impl Future<Output = Result<Option<Block::Header>, ClientError>> {
use futures03::future::{ready, Either, FutureExt};

match blockchain.header(id) {
Ok(LocalOrRemote::Remote(request)) => Either::Left(
fetcher
.remote_header(request)
.then(|header| ready(header.map(Some)))
),
Ok(LocalOrRemote::Unknown) => Either::Right(ready(Ok(None))),
Ok(LocalOrRemote::Local(local_header)) => Either::Right(ready(Ok(Some(local_header)))),
Err(err) => Either::Right(ready(Err(err))),
}
}

#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion core/client/src/light/call_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
let block_hash = self.blockchain.expect_block_hash_from_id(id)?;
let block_header = self.blockchain.expect_header(id.clone())?;

futures::executor::block_on(self.fetcher.remote_call(RemoteCallRequest {
futures03::executor::block_on(self.fetcher.remote_call(RemoteCallRequest {
block: block_hash,
header: block_header,
method: method.into(),
Expand Down
16 changes: 8 additions & 8 deletions core/client/src/light/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ pub struct RemoteBodyRequest<Header: HeaderT> {
/// is correct (see FetchedDataChecker) and return already checked data.
pub trait Fetcher<Block: BlockT>: Send + Sync {
/// Remote header future.
type RemoteHeaderResult: Future<Output = Result<Block::Header, ClientError>>;
type RemoteHeaderResult: Future<Output = Result<Block::Header, ClientError>> + Send + 'static;
/// Remote storage read future.
type RemoteReadResult: Future<Output = Result<Option<Vec<u8>>, ClientError>>;
type RemoteReadResult: Future<Output = Result<Option<Vec<u8>>, ClientError>> + Send + 'static;
/// Remote call result future.
type RemoteCallResult: Future<Output = Result<Vec<u8>, ClientError>>;
type RemoteCallResult: Future<Output = Result<Vec<u8>, ClientError>> + Send + 'static;
/// Remote changes result future.
type RemoteChangesResult: Future<Output = Result<Vec<(NumberFor<Block>, u32)>, ClientError>>;
type RemoteChangesResult: Future<Output = Result<Vec<(NumberFor<Block>, u32)>, ClientError>> + Send + 'static;
/// Remote block body result future.
type RemoteBodyResult: Future<Output = Result<Vec<Block::Extrinsic>, ClientError>>;
type RemoteBodyResult: Future<Output = Result<Vec<Block::Extrinsic>, ClientError>> + Send + 'static;

/// Fetch remote header.
fn remote_header(&self, request: RemoteHeaderRequest<Block::Header>) -> Self::RemoteHeaderResult;
Expand Down Expand Up @@ -493,7 +493,7 @@ impl<'a, H, Number, Hash> ChangesTrieRootsStorage<H, Number> for RootsStorage<'a

#[cfg(test)]
pub mod tests {
use futures::future::Ready;
use futures03::future::Ready;
use parking_lot::Mutex;
use codec::Decode;
use crate::client::tests::prepare_client_with_key_changes;
Expand Down Expand Up @@ -521,7 +521,7 @@ pub mod tests {
where
E: std::convert::From<&'static str>,
{
futures::future::ready(Err("Not implemented on test node".into()))
futures03::future::ready(Err("Not implemented on test node".into()))
}

impl Fetcher<Block> for OkCallFetcher {
Expand All @@ -544,7 +544,7 @@ pub mod tests {
}

fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
futures::future::ready(Ok((*self.lock()).clone()))
futures03::future::ready(Ok((*self.lock()).clone()))
}

fn remote_changes(&self, _request: RemoteChangesRequest<Header>) -> Self::RemoteChangesResult {
Expand Down
4 changes: 2 additions & 2 deletions core/client/src/light/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ pub fn new_light<B, S, F, GS, RA, E>(
>, B, RA>>
where
B: BlockT<Hash=H256>,
S: BlockchainStorage<B>,
F: Fetcher<B>,
S: BlockchainStorage<B> + 'static,
F: Fetcher<B> + 'static,
GS: BuildStorage,
E: CodeExecutor<Blake2Hasher> + RuntimeInfo,
{
Expand Down
20 changes: 10 additions & 10 deletions core/client/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
};

use fnv::{FnvHashSet, FnvHashMap};
use futures::channel::mpsc;
use futures03::channel::mpsc;
use primitives::storage::{StorageKey, StorageData};
use sr_primitives::traits::Block as BlockT;

Expand Down Expand Up @@ -347,7 +347,7 @@ mod tests {
// given
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), None)];
let mut recv = futures::executor::block_on_stream(
let mut recv = futures03::executor::block_on_stream(
notifications.listen(None, Some(&child_filter[..]))
);

Expand Down Expand Up @@ -382,13 +382,13 @@ mod tests {
// given
let mut notifications = StorageNotifications::<Block>::default();
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let mut recv1 = futures::executor::block_on_stream(
let mut recv1 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let mut recv2 = futures::executor::block_on_stream(
let mut recv2 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let mut recv3 = futures::executor::block_on_stream(
let mut recv3 = futures03::executor::block_on_stream(
notifications.listen(Some(&[]), Some(&child_filter))
);

Expand Down Expand Up @@ -429,16 +429,16 @@ mod tests {
let mut notifications = StorageNotifications::<Block>::default();
{
let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))];
let _recv1 = futures::executor::block_on_stream(
let _recv1 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![1])]), None)
);
let _recv2 = futures::executor::block_on_stream(
let _recv2 = futures03::executor::block_on_stream(
notifications.listen(Some(&[StorageKey(vec![2])]), None)
);
let _recv3 = futures::executor::block_on_stream(
let _recv3 = futures03::executor::block_on_stream(
notifications.listen(None, None)
);
let _recv4 = futures::executor::block_on_stream(
let _recv4 = futures03::executor::block_on_stream(
notifications.listen(None, Some(&child_filter))
);
assert_eq!(notifications.listeners.len(), 2);
Expand All @@ -465,7 +465,7 @@ mod tests {
// given
let mut recv = {
let mut notifications = StorageNotifications::<Block>::default();
let recv = futures::executor::block_on_stream(notifications.listen(None, None));
let recv = futures03::executor::block_on_stream(notifications.listen(None, None));

// when
let changeset = vec![];
Expand Down
1 change: 1 addition & 0 deletions core/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ state_machine = { package = "substrate-state-machine", path = "../state-machine"
substrate-executor = { path = "../executor" }
substrate-keystore = { path = "../keystore" }
transaction_pool = { package = "substrate-transaction-pool", path = "../transaction-pool" }
hash-db = { version = "0.15.0", default-features = false }

[dev-dependencies]
assert_matches = "1.1"
Expand Down
Loading

0 comments on commit e4dacbe

Please sign in to comment.