From 55d372c0fd125aa6d996b7bacfdaf8461423f59a Mon Sep 17 00:00:00 2001 From: Guanghua Guo Date: Wed, 12 Sep 2018 20:38:31 +0800 Subject: [PATCH] Feature/telemetry (#28) * Add telemetry thread * Run telemetry * Format code --- Cargo.lock | 33 +++++++++++ Cargo.toml | 4 ++ src/cli.rs | 16 +++++- src/client.rs | 12 ++-- src/genesis_config.rs | 42 +++++++++----- src/main.rs | 28 +++++++-- src/rpc.rs | 2 - src/telemetry.rs | 128 ++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 236 insertions(+), 29 deletions(-) create mode 100644 src/telemetry.rs diff --git a/Cargo.lock b/Cargo.lock index 4b4949f6a884e..decad89f981f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,6 +25,11 @@ name = "ansi_term" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "ansi_term" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "ansi_term" version = "0.11.0" @@ -210,6 +215,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "chainx" version = "0.1.0" dependencies = [ + "ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "chainx-api 0.1.0", "chainx-consensus 0.1.0", "chainx-executor 0.1.0", @@ -233,6 +239,7 @@ dependencies = [ "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.75 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.75 (registry+https://github.com/rust-lang/crates.io-index)", + "slog 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "substrate-bft 0.1.0 (git+https://github.com/chainx-org/substrate)", "substrate-client 0.1.0 (git+https://github.com/chainx-org/substrate)", "substrate-client-db 0.1.0 (git+https://github.com/chainx-org/substrate)", @@ -246,6 +253,8 @@ dependencies = [ "substrate-runtime-primitives 0.1.0 (git+https://github.com/chainx-org/substrate)", "substrate-state-db 0.1.0 (git+https://github.com/chainx-org/substrate)", "substrate-state-machine 0.1.0 (git+https://github.com/chainx-org/substrate)", + "substrate-telemetry 0.3.0 (git+https://github.com/chainx-org/substrate)", + "sysinfo 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2026,6 +2035,16 @@ dependencies = [ "rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rayon" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-deque 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "either 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rayon-core" version = "1.4.1" @@ -2987,6 +3006,17 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "sysinfo" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "take" version = "0.1.0" @@ -3661,6 +3691,7 @@ dependencies = [ [metadata] "checksum aho-corasick 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "68f56c7353e5a9547cbd76ed90f7bb5ffc3ba09d4ea9bd1d8c06c8b1142eeb5a" "checksum aio-limited 0.1.0 (git+https://github.com/paritytech/aio-limited.git)" = "" +"checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455" "checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" "checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6" "checksum arrayref 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "0d382e583f07208808f6b1249e60848879ba3543f57c32277bf52d69c2f0f0ee" @@ -3852,6 +3883,7 @@ dependencies = [ "checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c" "checksum rand_core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "edecf0f94da5551fc9b492093e30b041a891657db7940ee221f9d2f66e82eef2" "checksum rayon 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b614fe08b6665cb9a231d07ac1364b0ef3cb3698f1239ee0c4c3a88a524f54c8" +"checksum rayon 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "df7a791f788cb4c516f0e091301a29c2b71ef680db5e644a7d68835c8ae6dbfa" "checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356" "checksum redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)" = "c214e91d3ecf43e9a4e41e578973adeb14b474f2bee858742d127af75a0112b1" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" @@ -3935,6 +3967,7 @@ dependencies = [ "checksum substrate-state-machine 0.1.0 (git+https://github.com/chainx-org/substrate)" = "" "checksum substrate-telemetry 0.3.0 (git+https://github.com/chainx-org/substrate)" = "" "checksum syn 0.14.9 (registry+https://github.com/rust-lang/crates.io-index)" = "261ae9ecaa397c42b960649561949d69311f08eeaea86a65696e6e46517cf741" +"checksum sysinfo 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)" = "394abcf30852ac00878ab01642b13668db48d166d945f250c7bdbb9e12d75ad0" "checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum take_mut 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" diff --git a/Cargo.toml b/Cargo.toml index 988015a941698..d3b0a848f0d24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ substrate-client-db = { git = "https://github.com/chainx-org/substrate" } substrate-keyring = { git = "https://github.com/chainx-org/substrate" } substrate-state-db = { git = "https://github.com/chainx-org/substrate" } substrate-state-machine = { git = "https://github.com/chainx-org/substrate" } +substrate-telemetry = { git = "https://github.com/chainx-org/substrate" } substrate-codec = { git = "https://github.com/chainx-org/substrate", default_features = false } substrate-bft = { git = "https://github.com/chainx-org/substrate", default_features = false } substrate-rpc-servers = { git = "https://github.com/chainx-org/substrate" } @@ -38,8 +39,11 @@ rhododendron = "0.3" hex-literal = "0.1" exit-future = "0.1" futures = "0.1.17" +ansi_term = "0.10" +sysinfo = "0.5.7" tokio = "0.1.7" clap = "2.30.0" +slog = "^2" log = "0.3" [workspace] diff --git a/src/cli.rs b/src/cli.rs index 981872673db19..661483d9d9e65 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -28,6 +28,20 @@ pub fn build_cli() -> App<'static, 'static> { .help("Specify p2p protocol TCP port") .takes_value(true), ) + .arg( + Arg::with_name("telemetry-url") + .long("telemetry-url") + .value_name("TELEMETRY_URL") + .help("The URL of the telemetry server. Implies --telemetry") + .takes_value(true), + ) + .arg( + Arg::with_name("telemetry") + .long("telemetry") + .value_name("TELEMETRY") + .help("Should connect telemetry") + .takes_value(false), + ) .arg( Arg::with_name("bootnodes") .long("bootnodes") @@ -73,7 +87,7 @@ pub fn parse_address( default: &str, port_param: &str, matches: &ArgMatches, - ) -> Result { +) -> Result { let mut address: SocketAddr = default.parse().ok().ok_or_else(|| { format!("Invalid address specified for --{}.", port_param) })?; diff --git a/src/client.rs b/src/client.rs index 0fb9396623ca1..5a5731e3a6596 100644 --- a/src/client.rs +++ b/src/client.rs @@ -23,12 +23,10 @@ pub fn build_client(db_path: &str, chainspec: ChainSpec) -> Arc { pruning: state_db::PruningMode::default(), }, FINALIZATION_WINDOW, - ).unwrap(), - ); + ).unwrap(), + ); - let executor = substrate_client::LocalCallExecutor::new( - backend.clone(), - NativeExecutor::new()); + let executor = substrate_client::LocalCallExecutor::new(backend.clone(), NativeExecutor::new()); let genesis_config = super::genesis_config::testnet_genesis(chainspec); @@ -39,6 +37,6 @@ pub fn build_client(db_path: &str, chainspec: ChainSpec) -> Arc { executor, genesis_config, ExecutionStrategy::NativeWhenPossible, - ).unwrap(), - ) + ).unwrap(), + ) } diff --git a/src/genesis_config.rs b/src/genesis_config.rs index a1e6ccc492791..74369b81e9899 100644 --- a/src/genesis_config.rs +++ b/src/genesis_config.rs @@ -1,27 +1,36 @@ // Copyright 2018 chainpool -use chainx_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig,SessionConfig, StakingConfig, TimestampConfig,BalancesConfig}; +use chainx_runtime::{GenesisConfig, ConsensusConfig, CouncilConfig, DemocracyConfig, + SessionConfig, StakingConfig, TimestampConfig, BalancesConfig}; use super::cli::ChainSpec; use keyring::Keyring; use ed25519; pub fn testnet_genesis(chainspec: ChainSpec) -> GenesisConfig { - let auth1 = ed25519::Pair::from_seed(b"Alice ").public().into(); - let auth2 = ed25519::Pair::from_seed(b"Bob ").public().into(); - let auth3 = ed25519::Pair::from_seed(b"Gavin ").public().into(); - let auth4 = ed25519::Pair::from_seed(b"Satoshi ").public().into(); + let auth1 = ed25519::Pair::from_seed(b"Alice ") + .public() + .into(); + let auth2 = ed25519::Pair::from_seed(b"Bob ") + .public() + .into(); + let auth3 = ed25519::Pair::from_seed(b"Gavin ") + .public() + .into(); + let auth4 = ed25519::Pair::from_seed(b"Satoshi ") + .public() + .into(); let initial_authorities = match chainspec { - ChainSpec::Dev => vec![auth1,], - ChainSpec::Local => vec![auth1, auth2,], + ChainSpec::Dev => vec![auth1], + ChainSpec::Local => vec![auth1, auth2], ChainSpec::Multi => vec![auth1, auth2, auth3, auth4], }; GenesisConfig { consensus: Some(ConsensusConfig { code: include_bytes!( - "../runtime/wasm/target/wasm32-unknown-unknown/release/chainx_runtime.compact.wasm" - ).to_vec(), - authorities: initial_authorities.clone(), + "../runtime/wasm/target/wasm32-unknown-unknown/release/chainx_runtime.compact.wasm" + ).to_vec(), + authorities: initial_authorities.clone(), }), system: None, balances: Some(BalancesConfig { @@ -32,13 +41,18 @@ pub fn testnet_genesis(chainspec: ChainSpec) -> GenesisConfig { creation_fee: 0, reclaim_rebate: 0, balances: vec![ - (Keyring::Alice.to_raw_public().into(),10000), - (Keyring::Bob.to_raw_public().into(),10000), - (Keyring::Charlie.to_raw_public().into(),10000)], + (Keyring::Alice.to_raw_public().into(), 10000), + (Keyring::Bob.to_raw_public().into(), 10000), + (Keyring::Charlie.to_raw_public().into(), 10000), + ], }), session: Some(SessionConfig { - validators: initial_authorities.iter().cloned().map(Into::into).collect(), + validators: initial_authorities + .iter() + .cloned() + .map(Into::into) + .collect(), session_length: 720, // that's 1 hour per session. }), staking: Some(StakingConfig { diff --git a/src/main.rs b/src/main.rs index e04dd0ee79d8f..b1e78d61a67fc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,8 @@ extern crate substrate_network; extern crate substrate_network_libp2p; extern crate substrate_primitives; extern crate substrate_rpc_servers as rpc_server; +#[macro_use] +extern crate substrate_telemetry as tel; extern crate substrate_runtime_primitives; extern crate substrate_state_db as state_db; extern crate substrate_state_machine as state_machine; @@ -31,15 +33,20 @@ extern crate hex_literal; extern crate jsonrpc_http_server; extern crate jsonrpc_ws_server; extern crate rhododendron; +extern crate ansi_term; +extern crate sysinfo; extern crate tokio; #[macro_use] +extern crate slog; +#[macro_use] extern crate log; -mod cli; -mod client; mod genesis_config; +mod telemetry; mod network; +mod client; mod rpc; +mod cli; use substrate_client::BlockchainEvents; @@ -74,7 +81,7 @@ fn main() { let port = match matches.value_of("port") { Some(port) => port .parse() - .map_err(|_| "Invalid p2p port value specified.") + .map_err(|_| "invalid p2p port value specified.") .unwrap(), None => 20222, }; @@ -147,7 +154,7 @@ fn main() { } }; - let consensus_net = ConsensusNetwork::new(network, client.clone()); + let consensus_net = ConsensusNetwork::new(network.clone(), client.clone()); Some(consensus::Service::new( client.clone(), client.clone(), @@ -162,6 +169,17 @@ fn main() { let (_rpc_http, _rpc_ws) = rpc::start(&client, &task_executor, &matches, &extrinsic_pool); - let _ = runtime.block_on(exit); + if matches.is_present("telemetry") { + let telemetry_url = match matches.value_of("telemetry_url") { + Some(url) => Some(url.to_owned()), + None => Some("http://aws.chainx.org:8888".to_owned()), + }; + let _telemetry = telemetry::build_telemetry(telemetry_url, validator_mode); + telemetry::run_telemetry(network, client, extrinsic_pool, task_executor); + let _ = runtime.block_on(exit.clone()); + } else { + let _ = runtime.block_on(exit); + } + exit_send.fire(); } diff --git a/src/rpc.rs b/src/rpc.rs index 725133881d6d7..30e6acac2b840 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -2,8 +2,6 @@ use jsonrpc_http_server::Server as HttpServer; use jsonrpc_ws_server::Server as WsServer; -use chainx_rpc::chainext::ChainExt; -use rpc_server::apis::chain::Chain; use chainx_pool::TransactionPool; use tokio::runtime::TaskExecutor; use chainx_api::ChainXApi; diff --git a/src/telemetry.rs b/src/telemetry.rs new file mode 100644 index 0000000000000..4fdc8c64b178a --- /dev/null +++ b/src/telemetry.rs @@ -0,0 +1,128 @@ +// Copyright 2018 chainpool + +use substrate_network::{TransactionPool, SyncState, SyncProvider}; +use substrate_runtime_primitives::traits::{Header, As}; +use substrate_client::BlockchainEvents; +use tel; + +use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; +use std::time::{Duration, Instant}; +use tokio::runtime::TaskExecutor; +use tokio::prelude::{Future, Stream}; +use tokio::timer::Interval; +use ansi_term::Colour; + +const TIMER_INTERVAL_MS: u64 = 5000; + +pub fn build_telemetry( + telemetry_url: Option, + is_authority: bool, +) -> Option { + let telemetry = match telemetry_url { + Some(url) => { + Some(tel::init_telemetry(tel::TelemetryConfig { + url: url, + on_connect: Box::new(move || { + telemetry!("system.connected"; + "name" => "chainx", + "implementation" => "chainx", + "version" => "0.1", + "config" => "", + "chain" => "chainx", + "authority" => is_authority + ); + }), + })) + } + None => None, + }; + telemetry +} + +pub fn run_telemetry( + network: ::Arc<::chainx_network::NetworkService>, + client: ::Arc<::client::TClient>, + _txpool: ::Arc>, + handle: TaskExecutor, +) { + let interval = Interval::new(Instant::now(), Duration::from_millis(TIMER_INTERVAL_MS)); + + let mut last_number = None; + let mut sys = System::new(); + let self_pid = get_current_pid(); + let client1 = client.clone(); + let display_notifications = interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { + let sync_status = network.status(); + if let Ok(best_block) = client1.best_block_header() { + let hash = best_block.hash(); + let num_peers = sync_status.num_peers; + let best_number: u64 = best_block.number().as_(); + let speed = move || speed(best_number, last_number); + let (status, target) = + match (sync_status.sync.state, sync_status.sync.best_seen_block) { + (SyncState::Idle, _) => ("Idle".into(), "".into()), + (SyncState::Downloading, None) => (format!("Syncing{}", speed()), "".into()), + (SyncState::Downloading, Some(n)) => + (format!("Syncing{}", speed()), format!(", target=#{}", n)), + }; + last_number = Some(best_number); + info!( + target: "substrate", + "{}{} ({} peers), best: #{} ({})", + Colour::White.bold().paint(&status), + target, + Colour::White.bold().paint(format!("{}", sync_status.num_peers)), + Colour::White.paint(format!("{}", best_number)), + hash + ); + + // get cpu usage and memory usage of this process + let (cpu_usage, memory) = if sys.refresh_process(self_pid) { + let proc = sys.get_process(self_pid).expect("Above refresh_process succeeds, this should be Some(), qed"); + (proc.cpu_usage(), proc.memory()) + } else { (0.0, 0) }; + telemetry!( + "system.interval"; + "status" => format!("{}{}", status, target), + "peers" => num_peers, + "height" => best_number, + "best" => ?hash, + "cpu" => cpu_usage, + "memory" => memory + ); + } else { + warn!("Error getting best block information"); + } + Ok(()) + }); + + let display_block_import = client.import_notification_stream().for_each(|n| { + info!(target: "substrate", "Imported #{} ({})", n.header.number(), n.hash); + Ok(()) + }); + + /*let display_txpool_import = txpool.import_notification_stream().for_each(move |_| { + let status = txpool.light_status(); + telemetry!("txpool.import"; + "mem_usage" => status.mem_usage, + "count" => status.transaction_count, + "sender" => status.senders); + Ok(()) + });*/ + + let informant_work = display_notifications.join(display_block_import); + handle.spawn(informant_work.map(|_| ())); +} + +fn speed(best_number: u64, last_number: Option) -> String { + let speed = match last_number { + Some(num) => (best_number.saturating_sub(num) * 10_000 / TIMER_INTERVAL_MS) as f64, + None => 0.0, + }; + + if speed < 1.0 { + "".into() + } else { + format!(" {:4.1} bps", speed / 10.0) + } +}