Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Chain head subscription (#126)
Browse files Browse the repository at this point in the history
* Start WebSockets server.

* Expose non-working subscription.

* Dummy subscription for testing.

* Proper implementation with event loop.

* Finalized pubsub.

* Bump clap.

* Fix yml.

* Disable WS logs.

* Remove stale TransactionHash mention

* Fix build from nightly API change.

* Don't panic on invalid port.

* Bind server to random port.

* Send only best blocks.
  • Loading branch information
tomusdrw authored and gavofyork committed Apr 17, 2018
1 parent d153954 commit 98d2777
Show file tree
Hide file tree
Showing 18 changed files with 462 additions and 105 deletions.
137 changes: 96 additions & 41 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#!/bin/sh

# NOTE `cargo install wasm-gc` before running this script.

set -e
export CARGO_INCREMENTAL=0

cd demo/runtime/wasm && ./build.sh && cd ../../..
cd substrate/executor/wasm && ./build.sh && cd ../../..
cd substrate/test-runtime/wasm && ./build.sh && cd ../../..
Expand Down
7 changes: 5 additions & 2 deletions demo/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ description = "Substrate Demo node implementation in Rust."

[dependencies]
clap = { version = "2.27", features = ["yaml"] }
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
ed25519 = { path = "../../substrate/ed25519" }
env_logger = "0.4"
futures = "0.1.17"
error-chain = "0.11"
log = "0.3"
hex-literal = "0.1"
log = "0.3"
tokio-core = "0.1.12"
triehash = "0.1"
ed25519 = { path = "../../substrate/ed25519" }
substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec" }
substrate-runtime-io = { path = "../../substrate/runtime-io" }
Expand Down
45 changes: 32 additions & 13 deletions demo/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
#![warn(missing_docs)]

extern crate env_logger;
extern crate ctrlc;
extern crate ed25519;
extern crate env_logger;
extern crate futures;
extern crate tokio_core;
extern crate triehash;
extern crate substrate_codec as codec;
extern crate substrate_runtime_io as runtime_io;
extern crate substrate_state_machine as state_machine;
extern crate substrate_client as client;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate substrate_rpc;
extern crate substrate_rpc_servers as rpc;
extern crate demo_primitives;
extern crate substrate_runtime_io as runtime_io;
extern crate substrate_state_machine as state_machine;
extern crate demo_executor;
extern crate demo_primitives;
extern crate demo_runtime;

#[macro_use]
Expand All @@ -44,11 +47,12 @@ extern crate log;
pub mod error;

use std::sync::Arc;
use client::genesis;
use codec::Slicable;
use runtime_io::with_externalities;
use demo_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig,
SessionConfig, StakingConfig, BuildExternalities};
use client::genesis;
use futures::{Future, Sink, Stream};


struct DummyPool;
impl substrate_rpc::author::AuthorApi for DummyPool {
Expand Down Expand Up @@ -128,15 +132,30 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
};
let client = Arc::new(client::new_in_mem(executor, prepare_genesis)?);

let address = "127.0.0.1:9933".parse().unwrap();
let handler = rpc::rpc_handler(client.clone(), DummyPool, client);
let server = rpc::start_http(&address, handler)?;
let mut core = ::tokio_core::reactor::Core::new().expect("Unable to spawn event loop.");

let _rpc_servers = {
let handler = || {
let chain = rpc::apis::chain::Chain::new(client.clone(), core.remote());
rpc::rpc_handler(client.clone(), chain, DummyPool)
};
let http_address = "127.0.0.1:9933".parse().unwrap();
let ws_address = "127.0.0.1:9944".parse().unwrap();

(
rpc::start_http(&http_address, handler())?,
rpc::start_ws(&ws_address, handler())?
)
};

if let Some(_) = matches.subcommand_matches("validator") {
info!("Starting validator.");
server.wait();
return Ok(());
let (exit_send, exit) = futures::sync::mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});
core.run(exit.into_future()).expect("Error running informant event loop");
return Ok(())
}

println!("No command given.\n");
Expand Down
3 changes: 1 addition & 2 deletions polkadot/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,6 @@ impl<S: state_machine::Backend> BlockBuilder for ClientBlockBuilder<S>
#[cfg(test)]
mod tests {
use super::*;
use runtime_io::with_externalities;
use keyring::Keyring;
use codec::Slicable;
use client::in_mem::Backend as InMemory;
Expand Down Expand Up @@ -388,7 +387,7 @@ mod tests {
::client::new_in_mem(
LocalDispatch::new(),
|| {
let mut storage = genesis_config.build_externalities();
let storage = genesis_config.build_externalities();
let block = ::client::genesis::construct_genesis_block(&storage);
(substrate_primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
}
Expand Down
7 changes: 6 additions & 1 deletion polkadot/cli/src/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ args:
- rpc-port:
long: rpc-port
value_name: PORT
help: Specify RPC server TCP port
help: Specify HTTP RPC server TCP port
takes_value: true
- ws-port:
long: ws-port
value_name: PORT
help: Specify WebSockets RPC server TCP port
takes_value: true
- bootnodes:
long: bootnodes
Expand Down
81 changes: 58 additions & 23 deletions polkadot/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ extern crate log;
pub mod error;
mod informant;

use std::path::{Path, PathBuf};
use std::io;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use tokio_core::reactor;
Expand Down Expand Up @@ -117,43 +118,76 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
});

config.roles = role;
config.network.boot_nodes = matches
.values_of("bootnodes")
.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
config.network.net_config_path = config.network.config_path.clone();

let port = match matches.value_of("port") {
Some(port) => port.parse().expect("Invalid p2p port value specified."),
None => 30333,
};
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.public_address = None;
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
{
config.network.boot_nodes = matches
.values_of("bootnodes")
.map_or(Default::default(), |v| v.map(|n| n.to_owned()).collect());
config.network.config_path = Some(network_path(&base_path).to_string_lossy().into());
config.network.net_config_path = config.network.config_path.clone();

let port = match matches.value_of("port") {
Some(port) => port.parse().expect("Invalid p2p port value specified."),
None => 30333,
};
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.public_address = None;
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
}

config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();

let service = service::Service::new(config)?;

let mut address: SocketAddr = "127.0.0.1:9933".parse().unwrap();
if let Some(port) = matches.value_of("rpc-port") {
let rpc_port: u16 = port.parse().expect("Invalid RPC port value specified.");
address.set_port(rpc_port);
}

let handler = rpc::rpc_handler(service.client(), service.transaction_pool(), service.client());
let _server = rpc::start_http(&address, handler)?;

informant::start(&service, core.handle());

let (exit_send, exit) = mpsc::channel(1);
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).wait().expect("Error sending exit notification");
});

let _rpc_servers = {
let http_address = parse_address("127.0.0.1:9933", "rpc-port", &matches)?;
let ws_address = parse_address("127.0.0.1:9944", "ws-port", &matches)?;

let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
rpc::rpc_handler(service.client(), chain, service.transaction_pool())
};
(
start_server(http_address, |address| rpc::start_http(address, handler())),
start_server(ws_address, |address| rpc::start_ws(address, handler())),
)
};

core.run(exit.into_future()).expect("Error running informant event loop");
Ok(())
}

fn start_server<T, F>(mut address: SocketAddr, start: F) -> Result<T, io::Error> where
F: Fn(&SocketAddr) -> Result<T, io::Error>,
{
start(&address)
.or_else(|e| match e.kind() {
io::ErrorKind::AddrInUse |
io::ErrorKind::PermissionDenied => {
warn!("Unable to bind server to {}. Trying random port.", address);
address.set_port(0);
start(&address)
},
_ => Err(e),
})
}

fn parse_address(default: &str, port_param: &str, matches: &clap::ArgMatches) -> Result<SocketAddr, String> {
let mut address: SocketAddr = default.parse().unwrap();
if let Some(port) = matches.value_of(port_param) {
let port: u16 = port.parse().ok().ok_or(format!("Invalid port for --{} specified.", port_param))?;
address.set_port(port);
}

Ok(address)
}

fn keystore_path(base_path: &Path) -> PathBuf {
let mut path = base_path.to_owned();
path.push("keystore");
Expand Down Expand Up @@ -183,6 +217,7 @@ fn default_base_path() -> PathBuf {
fn init_logger(pattern: &str) {
let mut builder = env_logger::LogBuilder::new();
// Disable info logging by default for some modules:
builder.filter(Some("ws"), log::LogLevelFilter::Warn);
builder.filter(Some("hyper"), log::LogLevelFilter::Warn);
// Enable info for others.
builder.filter(None, log::LogLevelFilter::Info);
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {error, in_mem, block_builder, runtime_io, bft};
pub type BlockchainEventStream = mpsc::UnboundedReceiver<BlockImportNotification>;

/// Polkadot Client
pub struct Client<B, E> where B: backend::Backend {
pub struct Client<B, E> {
backend: B,
executor: E,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification>>>,
Expand Down
7 changes: 5 additions & 2 deletions substrate/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]

[dependencies]
jsonrpc-core = { git = "https://github.com/paritytech/jsonrpc.git" }
jsonrpc-http-server = { git = "https://github.com/paritytech/jsonrpc.git" }
jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git" }
jsonrpc-ws-server = { git = "https://github.com/paritytech/jsonrpc.git" }
log = "0.3"
substrate-rpc = { path = "../rpc", version = "0.1" }
jsonrpc-core = { git="https://github.com/paritytech/jsonrpc.git" }
jsonrpc-http-server = { git="https://github.com/paritytech/jsonrpc.git" }
43 changes: 36 additions & 7 deletions substrate/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,63 @@
#[warn(missing_docs)]

extern crate substrate_rpc as apis;
pub extern crate substrate_rpc as apis;

extern crate jsonrpc_core as rpc;
extern crate jsonrpc_http_server as http;
extern crate jsonrpc_pubsub as pubsub;
extern crate jsonrpc_ws_server as ws;

#[macro_use]
extern crate log;

use std::io;

type Metadata = apis::metadata::Metadata;
type RpcHandler = pubsub::PubSubHandler<Metadata>;

/// Construct rpc `IoHandler`
pub fn rpc_handler<S, T, C>(state: S, transaction_pool: T, chain: C) -> rpc::IoHandler where
pub fn rpc_handler<S, C, A>(
state: S,
chain: C,
author: A,
) -> RpcHandler where
S: apis::state::StateApi,
T: apis::author::AuthorApi,
C: apis::chain::ChainApi,
C: apis::chain::ChainApi<Metadata=Metadata>,
A: apis::author::AuthorApi,
{
let mut io = rpc::IoHandler::new();
let mut io = pubsub::PubSubHandler::default();
io.extend_with(state.to_delegate());
io.extend_with(transaction_pool.to_delegate());
io.extend_with(chain.to_delegate());
io.extend_with(author.to_delegate());
io
}

/// Start HTTP server listening on given address.
pub fn start_http(
addr: &std::net::SocketAddr,
io: rpc::IoHandler,
io: RpcHandler,
) -> io::Result<http::Server> {
http::ServerBuilder::new(io)
.threads(4)
.rest_api(http::RestApi::Unsecure)
.cors(http::DomainsValidation::Disabled)
.start_http(addr)
}

/// Start WS server listening on given address.
pub fn start_ws(
addr: &std::net::SocketAddr,
io: RpcHandler,
) -> io::Result<ws::Server> {
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| Metadata::new(context.sender()))
.start(addr)
.map_err(|err| match err {
ws::Error(ws::ErrorKind::Io(io), _) => io,
ws::Error(ws::ErrorKind::ConnectionClosed, _) => io::ErrorKind::BrokenPipe.into(),
ws::Error(e, _) => {
error!("{}", e);
io::ErrorKind::Other.into()
}
})
}
8 changes: 5 additions & 3 deletions substrate/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]

[dependencies]
parking_lot = "0.4"
log = "0.3"
error-chain = "0.11"
jsonrpc-core = { git="https://github.com/paritytech/jsonrpc.git" }
jsonrpc-macros = { git="https://github.com/paritytech/jsonrpc.git" }
jsonrpc-pubsub = { git="https://github.com/paritytech/jsonrpc.git" }
log = "0.3"
parking_lot = "0.4"
substrate-client = { path = "../client" }
substrate-executor = { path = "../executor" }
substrate-primitives = { path = "../primitives" }
substrate-state-machine = { path = "../state-machine" }
substrate-executor = { path = "../executor" }
tokio-core = "0.1.12"

[dev-dependencies]
assert_matches = "1.1"
Expand Down
2 changes: 2 additions & 0 deletions substrate/rpc/src/author/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Authoring RPC module errors.
use client;
use rpc;

Expand Down
3 changes: 1 addition & 2 deletions substrate/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use primitives::block;
use substrate_executor as executor;
use super::*;
use super::error::*;

Expand All @@ -38,7 +37,7 @@ impl AsyncAuthorApi for DummyTxPool {

#[test]
fn submit_transaction_should_not_cause_error() {
let mut p = Arc::new(Mutex::new(DummyTxPool::default()));
let p = Arc::new(Mutex::new(DummyTxPool::default()));

assert_matches!(
AuthorApi::submit_extrinsic(&p, block::Extrinsic(vec![])),
Expand Down
Loading

0 comments on commit 98d2777

Please sign in to comment.