From 1974f94dc9ee0bf41089d956b7314448e5d1dfac Mon Sep 17 00:00:00 2001 From: nodebreaker-carlsagan Date: Thu, 2 Jan 2020 21:23:51 +0900 Subject: [PATCH] no-std or warm compatibility issues, grapana-data -source code reference and correction,applicable --- Cargo.lock | 66 +++++++ client/cli/src/lib.rs | 11 +- client/cli/src/params.rs | 6 +- client/service/src/builder.rs | 14 +- client/service/src/config.rs | 6 +- client/service/test/src/lib.rs | 2 +- utils/prometheus/Cargo.toml | 14 +- utils/prometheus/README.md | 281 ++++++++++++++++++++--------- utils/prometheus/src/lib.rs | 169 ++++++++++++----- utils/prometheus/src/metrics.rs | 19 +- utils/prometheus/src/networking.rs | 66 +++++++ 11 files changed, 497 insertions(+), 157 deletions(-) create mode 100644 utils/prometheus/src/networking.rs diff --git a/Cargo.lock b/Cargo.lock index 8022469e81549..282df55da6540 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3328,6 +3328,11 @@ name = "nohash-hasher" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "nom" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "nom" version = "4.2.3" @@ -4435,6 +4440,32 @@ dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "procinfo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "nom 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prometheus" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", + "procinfo 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "protobuf 2.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "prost" version = "0.5.0" @@ -4870,6 +4901,14 @@ name = "rustc-hex" version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "rustc_version" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rustc_version" version = "0.2.3" @@ -5043,6 +5082,7 @@ dependencies = [ "rpassword 4.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "sc-client-api 2.0.0", "sc-network 0.8.0", + "sc-prometheus 2.0.0", "sc-service 2.0.0", "sc-telemetry 2.0.0", "sc-tracing 2.0.0", @@ -5578,6 +5618,21 @@ dependencies = [ "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "sc-prometheus" +version = "2.0.0" +dependencies = [ + "async-std 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "sp-runtime 2.0.0", + "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "sc-rpc" version = "2.0.0" @@ -5684,6 +5739,7 @@ dependencies = [ "sc-keystore 2.0.0", "sc-network 0.8.0", "sc-offchain 2.0.0", + "sc-prometheus 2.0.0", "sc-rpc 2.0.0", "sc-rpc-server 2.0.0", "sc-telemetry 2.0.0", @@ -5899,6 +5955,11 @@ dependencies = [ "core-foundation-sys 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "semver" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "semver" version = "0.6.0" @@ -8479,6 +8540,7 @@ dependencies = [ "checksum nix 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6c722bee1037d430d0f8e687bbdbf222f27cc6e4e68d5caf630857bb2b6dbdce" "checksum nodrop 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" "checksum nohash-hasher 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4e657a6ec97f9a3ba46f6f7034ea6db9fcd5b71d25ef1074b7bc03da49be0e8e" +"checksum nom 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b8c256fd9471521bcb84c3cdba98921497f1a331cbc15b8030fc63b82050ce" "checksum nom 4.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2ad2a91a8e869eeb30b9cb3119ae87773a8f4ae617f41b1eb9c154b2905f7bd6" "checksum num-bigint 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "f9c3f34cdd24f334cb265d9bf8bfa8a241920d026916785747a92f0e55541a1a" "checksum num-integer 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "b85e541ef8255f6cf42bbfe4ef361305c6c135d10919ecc26126c4e5ae94bc09" @@ -8536,6 +8598,8 @@ dependencies = [ "checksum proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum proc-macro2 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9c9e470a8dc4aeae2dee2f335e8f533e2d4b347e1434e5671afc49b054592f27" +"checksum procinfo 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f42e8578852a3306838981aedad8c5642ba794929aa12af0c9eb6c072b77af6c" +"checksum prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" "checksum prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" "checksum prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" "checksum prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e7dc378b94ac374644181a2247cebf59a6ec1c88b49ac77f3a94b86b79d0e11" @@ -8583,6 +8647,7 @@ dependencies = [ "checksum rust-argon2 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4ca4eaef519b494d1f2848fc602d18816fed808a981aedf4f1f00ceb7c9d32cf" "checksum rustc-demangle 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783" "checksum rustc-hex 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "403bb3a286107a04825a5f82e1270acc1e14028d3d554d7a1e08914549575ab8" +"checksum rustc_version 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "c5f5376ea5e30ce23c03eb77cbe4962b988deead10910c372b226388b594c084" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" "checksum rustls 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b25a18b1bf7387f0145e7f8324e700805aade3842dd3db2e74e4cdeb4677c09e" "checksum rustversion 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c48f91977f4ef3be5358c15d131d3f663f6b4d7a112555bf3bf52ad23b6659e5" @@ -8600,6 +8665,7 @@ dependencies = [ "checksum sct 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e3042af939fca8c3453b7af0f1c66e533a15a86169e39de2657310ade8f98d3c" "checksum security-framework 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8ef2429d7cefe5fd28bd1d2ed41c944547d4ff84776f5935b456da44593a16df" "checksum security-framework-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e31493fc37615debb8c5090a7aeb4a9730bc61e77ab10b9af59f1a202284f895" +"checksum semver 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "d4f410fedcf71af0345d7607d246e7ad15faaadd49d240ee3b24e5dc21a820ac" "checksum semver 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a3186ec9e65071a2095434b1f5bb24838d4e8e130f584c790f6033c79943537" "checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" "checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index e2323694f1b51..36bf19eedd0ec 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -910,6 +910,7 @@ where let rpc_interface: &str = interface_str(cli.rpc_external, cli.unsafe_rpc_external, cli.validator)?; let ws_interface: &str = interface_str(cli.ws_external, cli.unsafe_ws_external, cli.validator)?; let grafana_interface: &str = if cli.grafana_external { "0.0.0.0" } else { "127.0.0.1" }; + let prometheus_interface: &str = if cli.prometheus_external { "0.0.0.0" } else { "127.0.0.1" }; config.rpc_http = Some(parse_address(&format!("{}:{}", rpc_interface, 9933), cli.rpc_port)?); config.rpc_ws = Some(parse_address(&format!("{}:{}", ws_interface, 9944), cli.ws_port)?); @@ -943,12 +944,10 @@ where config.tracing_receiver = cli.tracing_receiver.into(); // Override prometheus - match cli.prometheus_endpoint { - None => {config.prometheus_endpoint = None;}, - Some(x) => { - config.prometheus_endpoint = Some(parse_address(&format!("{}:{}", x, 33333), cli.prometheus_port)?); - } - } + if cli.prometheus_external { + config.prometheus_port = Some( + parse_address(&format!("{}:{}", prometheus_interface, 33333), cli.prometheus_port)? + )} // Imply forced authoring on --dev config.force_authoring = cli.shared_params.dev || cli.force_authoring; diff --git a/client/cli/src/params.rs b/client/cli/src/params.rs index 79de3384e3168..3fd521b1334b9 100644 --- a/client/cli/src/params.rs +++ b/client/cli/src/params.rs @@ -443,9 +443,9 @@ pub struct RunCmd { /// Prometheus exporter TCP port. #[structopt(long = "prometheus-port", value_name = "PORT")] pub prometheus_port: Option, - /// Prometheus exporter IP addr. - #[structopt(long = "prometheus-addr", value_name = "Local IP address")] - pub prometheus_endpoint: Option, + /// Prometheus exporter on/off external". + #[structopt(long = "prometheus-external")] + pub prometheus_external: bool, /// Listen to all Websocket interfaces. /// diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 062da1c655b77..17a9429c4d0d9 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -1116,11 +1116,15 @@ ServiceBuilder< telemetry }); // prometheus init - match config.prometheus_endpoint { - None => (), - Some(x) => { - let _prometheus = sc_prometheus::init_prometheus(x); - } + if let Some(port) = config.prometheus_port { + let future = select( + sc_prometheus::init_prometheus(port).boxed() + ,exit.clone() + ).map(|either| match either { + Either::Left((result, _)) => result.map_err(|_| ()), + Either::Right(_) => Ok(()) + }).compat(); + let _ = to_spawn_tx.unbounded_send(Box::new(future)); } // Grafana data source if let Some(port) = config.grafana_port { diff --git a/client/service/src/config.rs b/client/service/src/config.rs index d2bcca7a22b25..835f0fc39d215 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -75,8 +75,8 @@ pub struct Configuration { pub rpc_cors: Option>, /// Grafana data source http port. `None` if disabled. pub grafana_port: Option, - /// Promteheus IP addr. `None` if disabled. and defult port 33333 - pub prometheus_endpoint: Option, + /// Promteheus Port. `None` if disabled. and defult port 33333 + pub prometheus_port: Option, /// Telemetry service URL. `None` if disabled. pub telemetry_endpoints: Option, /// External WASM transport for the telemetry. If `Some`, when connection to a telemetry @@ -155,7 +155,7 @@ impl Configuration where rpc_ws: None, rpc_ws_max_connections: None, rpc_cors: Some(vec![]), - prometheus_endpoint: None, + prometheus_port: None, grafana_port: None, telemetry_endpoints: None, telemetry_external_transport: None, diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index ea935100e954c..ea3e523d4ef0a 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -192,7 +192,7 @@ fn node_config ( rpc_ws: None, rpc_ws_max_connections: None, rpc_cors: None, - prometheus_endpoint: None, + prometheus_port: None, grafana_port: None, telemetry_endpoints: None, telemetry_external_transport: None, diff --git a/utils/prometheus/Cargo.toml b/utils/prometheus/Cargo.toml index 8ac6d7001833f..ea131e3952049 100644 --- a/utils/prometheus/Cargo.toml +++ b/utils/prometheus/Cargo.toml @@ -6,12 +6,14 @@ description = "prometheus utils" edition = "2018" [dependencies] -hyper = "0.12" -lazy_static = "1.0" -log = "0.4" +hyper = { version = "0.13.1", default-features = false, features = ["stream"] } +lazy_static = "1.4" +log = "0.4.8" prometheus = { version = "0.7", features = ["nightly", "process"]} -tokio = "0.1" +tokio = "0.2" +futures-util = { version = "0.3.1", default-features = false, features = ["io"] } sp-runtime = { package = "sp-runtime",path = "../../primitives/runtime" } +derive_more = "0.99" -[dev-dependencies] -reqwest = "0.9" \ No newline at end of file +[target.'cfg(not(target_os = "unknown"))'.dependencies] +async-std = { version = "1.0.1", features = ["unstable"] } \ No newline at end of file diff --git a/utils/prometheus/README.md b/utils/prometheus/README.md index 3efe80d57de94..9fec16a81ca15 100644 --- a/utils/prometheus/README.md +++ b/utils/prometheus/README.md @@ -27,66 +27,128 @@ Start Grafana Here is the entry point of prometheus core module in Parity Substrate. +In existing sources, refer to the grapana source due to the issue of the wasm. + utils/prometheus/src/lib.rs ```rust #[macro_use] extern crate lazy_static; -#[macro_use] -extern crate log; +use futures_util::{FutureExt,future::{Future}}; use hyper::http::StatusCode; -use hyper::rt::Future; -use hyper::service::service_fn_ok; -use hyper::{Body, Request, Response, Server}; +use hyper::Server; +use hyper::{Body, Request, Response, service::{service_fn, make_service_fn}}; pub use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder}; -pub use prometheus::{Histogram, IntCounter, IntGauge, Result}; +pub use prometheus::{Histogram, IntCounter, IntGauge}; pub use sp_runtime::traits::SaturatedConversion; use std::net::SocketAddr; - +#[cfg(not(target_os = "unknown"))] +mod networking; pub mod metrics; +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum Error { + /// Hyper internal error. + Hyper(hyper::Error), + /// Http request error. + Http(hyper::http::Error), + /// i/o error. + Io(std::io::Error) +} +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Error::Hyper(error) => Some(error), + Error::Http(error) => Some(error), + Error::Io(error) => Some(error) + } + } +} + +async fn request_metrics(req: Request) -> Result, Error> { + if req.uri().path() == "/metrics" { + let metric_families = prometheus::gather(); + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", encoder.format_type()) + .body(Body::from(buffer)) + .map_err(Error::Http) + //.expect("Sends OK(200) response with one or more data metrics") + } else { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found.")) + .map_err(Error::Http) + //.expect("Sends NOT_FOUND(404) message with no data metric") + } + +} + +#[derive(Clone)] +pub struct Executor; + +#[cfg(not(target_os = "unknown"))] +impl hyper::rt::Executor for Executor + where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + fn execute(&self, future: T) { + async_std::task::spawn(future); + } +} /// Initializes the metrics context, and starts an HTTP server /// to serve metrics. -pub fn init_prometheus(prometheus_addr: SocketAddr) { - let addr = prometheus_addr; - let server = Server::bind(&addr) - .serve(|| { - // This is the `Service` that will handle the connection. - // `service_fn_ok` is a helper to convert a function that - // returns a Response into a `Service`. - service_fn_ok(move |req: Request| { - if req.uri().path() == "/metrics" { - let metric_families = prometheus::gather(); - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", encoder.format_type()) - .body(Body::from(buffer)) - .expect("Sends OK(200) response with one or more data metrics") - } else { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("Not found.")) - .expect("Sends NOT_FOUND(404) message with no data metric") - } - }) - }) - .map_err(|e| error!("server error: {}", e)); - - info!("Exporting metrics at http://{}/metrics", addr); - - let mut rt = tokio::runtime::Builder::new() - .core_threads(1) // one thread is sufficient - .build() - .expect("Builds one thread of tokio runtime exporter for prometheus"); - - std::thread::spawn(move || { - rt.spawn(server); - rt.shutdown_on_idle().wait().unwrap(); - }); +#[cfg(not(target_os = "unknown"))] +pub async fn init_prometheus(mut prometheus_addr: SocketAddr) -> Result<(), Error>{ + use async_std::{net, io}; + use crate::networking::Incoming; + + let listener = loop { + let listener = net::TcpListener::bind(&prometheus_addr).await; + match listener { + Ok(listener) => { + log::info!("Prometheus server started at {}", prometheus_addr); + break listener + }, + Err(err) => match err.kind() { + io::ErrorKind::AddrInUse | io::ErrorKind::PermissionDenied if prometheus_addr.port() != 0 => { + log::warn!( + "Prometheus server to already {} port.", prometheus_addr.port() + ); + prometheus_addr.set_port(0); + continue; + }, + _ => return Err(err.into()) + } + } + }; + let service = make_service_fn(|_| { + async { + Ok::<_, Error>(service_fn(request_metrics)) + } + }); + + + let _server = Server::builder(Incoming(listener.incoming())) + .executor(Executor) + .serve(service) + .boxed(); + + + let result = _server.await.map_err(Into::into); + + result +} + +#[cfg(target_os = "unknown")] +pub async fn init_prometheus(_: SocketAddr) -> Result<(), Error> { + Ok(()) } + #[macro_export] macro_rules! prometheus_gauge( ($($metric:expr => $value:expr),*) => { @@ -120,9 +182,65 @@ macro_rules! prometheus( ); */ ``` +utuls/prometheus/src/networking.rs ( grafana-data-source Note) +```rust +use async_std::pin::Pin; +use std::task::{Poll, Context}; +use futures_util::{stream::Stream, io::{AsyncRead, AsyncWrite}}; + +pub struct Incoming<'a>(pub async_std::net::Incoming<'a>); + +impl hyper::server::accept::Accept for Incoming<'_> { + type Conn = TcpStream; + type Error = async_std::io::Error; + + fn poll_accept(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_next(cx) + .map(|opt| opt.map(|res| res.map(TcpStream))) + } +} + +pub struct TcpStream(pub async_std::net::TcpStream); + +impl tokio::io::AsyncRead for TcpStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8] + ) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_close(cx) + } +} + +``` + + Here is the dependancies of the module. -client/prometheus/Cargo.toml +utils/prometheus/Cargo.toml ```toml [package] name = "sc-prometheus" @@ -132,15 +250,17 @@ description = "prometheus utils" edition = "2018" [dependencies] -hyper = "0.12" -lazy_static = "1.0" -log = "0.4" +hyper = { version = "0.13.1", default-features = false, features = ["stream"] } +lazy_static = "1.4" +log = "0.4.8" prometheus = { version = "0.7", features = ["nightly", "process"]} -tokio = "0.1" +tokio = "0.2" +futures-util = { version = "0.3.1", default-features = false, features = ["io"] } sp-runtime = { package = "sp-runtime",path = "../../primitives/runtime" } +derive_more = "0.99" -[dev-dependencies] -reqwest = "0.9" +[target.'cfg(not(target_os = "unknown"))'.dependencies] +async-std = { version = "1.0.1", features = ["unstable"] } ``` **Abbreviation of the package in service manager of parity substrate** @@ -198,11 +318,15 @@ use sc_prometheus::prometheus_gauge; ... // prometheus init - match config.prometheus_endpoint { - None => (), - Some(x) => { - let _prometheus = sc_prometheus::init_prometheus(x); - } + if let Some(port) = config.prometheus_port { + let future = select( + sc_prometheus::init_prometheus(port).boxed() + ,exit.clone() + ).map(|either| match either { + Either::Left((result, _)) => result.map_err(|_| ()), + Either::Right(_) => Ok(()) + }).compat(); + let _ = to_spawn_tx.unbounded_send(Box::new(future)); } // Grafana data source if let Some(port) = config.grafana_port { @@ -241,28 +365,13 @@ client/cli/src/lib.rs ```rust fn crate_run_node_config{ ... - // Override telemetry - if cli.no_telemetry { - config.telemetry_endpoints = None; - } else if !cli.telemetry_endpoints.is_empty() { - config.telemetry_endpoints = Some(TelemetryEndpoints::new(cli.telemetry_endpoints)); - } - - config.tracing_targets = cli.tracing_targets.into(); - config.tracing_receiver = cli.tracing_receiver.into(); - - // Override prometheus - match cli.prometheus_endpoint { - None => {config.prometheus_endpoint = None;}, - Some(x) => { - config.prometheus_endpoint = Some(parse_address(&format!("{}:{}", x, 33333), cli.prometheus_port)?); - } - } - // Imply forced authoring on --dev - config.force_authoring = cli.shared_params.dev || cli.force_authoring; - - Ok(config) + let prometheus_interface: &str = if cli.prometheus_external { "0.0.0.0" } ... + // Override prometheus + if cli.prometheus_external { + config.prometheus_port = Some( + parse_address(&format!("{}:{}", prometheus_interface, 33333), cli.prometheus_port)? + )} } ``` @@ -270,12 +379,12 @@ client/cli/src/params.rs ```rust pub struct RunCmd{ ... -/// Prometheus exporter TCP port. + /// Prometheus exporter TCP port. #[structopt(long = "prometheus-port", value_name = "PORT")] pub prometheus_port: Option, - /// Prometheus exporter IP addr. - #[structopt(long = "prometheus-addr", value_name = "Local IP address")] - pub prometheus_endpoint: Option, + /// Prometheus exporter on/off external". + #[structopt(long = "prometheus-external")] + pub prometheus_external: bool, ... } ``` @@ -284,8 +393,8 @@ client/service/src/config.rs #[derive(Clone)] pub struct Configuration { ... - /// Promteheus IP addr. `None` if disabled. and defult port 33333 - pub prometheus_endpoint: Option, + /// Promteheus Port. `None` if disabled. and defult port 33333 + pub prometheus_port: Option, ... } impl Configuration where @@ -297,7 +406,7 @@ impl Configuration where pub fn default_with_spec(chain_spec: ChainSpec) -> Self { let mut configuration = Configuration { ... - prometheus_endpoints: None, + prometheus_prot: None, ... }; configuration.network.boot_nodes = configuration.chain_spec.boot_nodes().to_vec(); diff --git a/utils/prometheus/src/lib.rs b/utils/prometheus/src/lib.rs index 4713f47291de0..70e0ebfd830cc 100644 --- a/utils/prometheus/src/lib.rs +++ b/utils/prometheus/src/lib.rs @@ -1,61 +1,138 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + + #[macro_use] extern crate lazy_static; -#[macro_use] -extern crate log; +use futures_util::{FutureExt,future::{Future}}; use hyper::http::StatusCode; -use hyper::rt::Future; -use hyper::service::service_fn_ok; -use hyper::{Body, Request, Response, Server}; +use hyper::Server; +use hyper::{Body, Request, Response, service::{service_fn, make_service_fn}}; pub use prometheus::{Encoder, HistogramOpts, Opts, TextEncoder}; -pub use prometheus::{Histogram, IntCounter, IntGauge, Result}; +pub use prometheus::{Histogram, IntCounter, IntGauge}; pub use sp_runtime::traits::SaturatedConversion; use std::net::SocketAddr; - +#[cfg(not(target_os = "unknown"))] +mod networking; pub mod metrics; +#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum Error { + /// Hyper internal error. + Hyper(hyper::Error), + /// Http request error. + Http(hyper::http::Error), + /// i/o error. + Io(std::io::Error) +} +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Error::Hyper(error) => Some(error), + Error::Http(error) => Some(error), + Error::Io(error) => Some(error) + } + } +} + +async fn request_metrics(req: Request) -> Result, Error> { + if req.uri().path() == "/metrics" { + let metric_families = prometheus::gather(); + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + encoder.encode(&metric_families, &mut buffer).unwrap(); + Response::builder() + .status(StatusCode::OK) + .header("Content-Type", encoder.format_type()) + .body(Body::from(buffer)) + .map_err(Error::Http) + //.expect("Sends OK(200) response with one or more data metrics") + } else { + Response::builder() + .status(StatusCode::NOT_FOUND) + .body(Body::from("Not found.")) + .map_err(Error::Http) + //.expect("Sends NOT_FOUND(404) message with no data metric") + } + +} + +#[derive(Clone)] +pub struct Executor; + +#[cfg(not(target_os = "unknown"))] +impl hyper::rt::Executor for Executor + where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + fn execute(&self, future: T) { + async_std::task::spawn(future); + } +} /// Initializes the metrics context, and starts an HTTP server /// to serve metrics. -pub fn init_prometheus(prometheus_addr: SocketAddr) { - let addr = prometheus_addr; - let server = Server::bind(&addr) - .serve(|| { - // This is the `Service` that will handle the connection. - // `service_fn_ok` is a helper to convert a function that - // returns a Response into a `Service`. - service_fn_ok(move |req: Request| { - if req.uri().path() == "/metrics" { - let metric_families = prometheus::gather(); - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", encoder.format_type()) - .body(Body::from(buffer)) - .expect("Sends OK(200) response with one or more data metrics") - } else { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::from("Not found.")) - .expect("Sends NOT_FOUND(404) message with no data metric") - } - }) - }) - .map_err(|e| error!("server error: {}", e)); - - info!("Exporting metrics at http://{}/metrics", addr); - - let mut rt = tokio::runtime::Builder::new() - .core_threads(1) // one thread is sufficient - .build() - .expect("Builds one thread of tokio runtime exporter for prometheus"); - - std::thread::spawn(move || { - rt.spawn(server); - rt.shutdown_on_idle().wait().unwrap(); - }); +#[cfg(not(target_os = "unknown"))] +pub async fn init_prometheus(mut prometheus_addr: SocketAddr) -> Result<(), Error>{ + use async_std::{net, io}; + use crate::networking::Incoming; + + let listener = loop { + let listener = net::TcpListener::bind(&prometheus_addr).await; + match listener { + Ok(listener) => { + log::info!("Prometheus server started at {}", prometheus_addr); + break listener + }, + Err(err) => match err.kind() { + io::ErrorKind::AddrInUse | io::ErrorKind::PermissionDenied if prometheus_addr.port() != 0 => { + log::warn!( + "Prometheus server to already {} port.", prometheus_addr.port() + ); + prometheus_addr.set_port(0); + continue; + }, + _ => return Err(err.into()) + } + } + }; + let service = make_service_fn(|_| { + async { + Ok::<_, Error>(service_fn(request_metrics)) + } + }); + + + let _server = Server::builder(Incoming(listener.incoming())) + .executor(Executor) + .serve(service) + .boxed(); + + + let result = _server.await.map_err(Into::into); + + result } +#[cfg(target_os = "unknown")] +pub async fn init_prometheus(_: SocketAddr) -> Result<(), Error> { + Ok(()) +} + + #[macro_export] macro_rules! prometheus_gauge( ($($metric:expr => $value:expr),*) => { diff --git a/utils/prometheus/src/metrics.rs b/utils/prometheus/src/metrics.rs index dfb9c4ec57720..a653535c11092 100644 --- a/utils/prometheus/src/metrics.rs +++ b/utils/prometheus/src/metrics.rs @@ -1,5 +1,22 @@ -pub use crate::*; +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + + +pub use crate::*; +pub use prometheus::Result; /// Gauge type metrics generation function pub fn try_create_int_gauge(name: &str, help: &str) -> Result { let opts = Opts::new(name, help); diff --git a/utils/prometheus/src/networking.rs b/utils/prometheus/src/networking.rs new file mode 100644 index 0000000000000..f5bbd21d57a86 --- /dev/null +++ b/utils/prometheus/src/networking.rs @@ -0,0 +1,66 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +use async_std::pin::Pin; +use std::task::{Poll, Context}; +use futures_util::{stream::Stream, io::{AsyncRead, AsyncWrite}}; + +pub struct Incoming<'a>(pub async_std::net::Incoming<'a>); + +impl hyper::server::accept::Accept for Incoming<'_> { + type Conn = TcpStream; + type Error = async_std::io::Error; + + fn poll_accept(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_next(cx) + .map(|opt| opt.map(|res| res.map(TcpStream))) + } +} + +pub struct TcpStream(pub async_std::net::TcpStream); + +impl tokio::io::AsyncRead for TcpStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8] + ) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut Pin::into_inner(self).0) + .poll_close(cx) + } +}