diff --git a/Cargo.lock b/Cargo.lock index da87ea8549aef..699be70286c91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1954,6 +1954,7 @@ dependencies = [ "httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4494,7 +4495,7 @@ version = "0.8.0" dependencies = [ "async-std 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -7275,9 +7276,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 60b0888112eec..9e6b25584a6a1 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -1029,7 +1029,7 @@ ServiceBuilder< let metrics = ServiceMetrics::register(®istry)?; let future = select( - prometheus_exporter::init_prometheus(port, registry).boxed(), + prometheus_exporter::init_prometheus(port, registry, to_spawn_tx.clone()).boxed(), exit.clone() ).map(drop); diff --git a/utils/prometheus/Cargo.toml b/utils/prometheus/Cargo.toml index 8e50b4024e3bc..37da93257f933 100644 --- a/utils/prometheus/Cargo.toml +++ b/utils/prometheus/Cargo.toml @@ -8,10 +8,10 @@ edition = "2018" [dependencies] log = "0.4.8" -hyper = { version = "0.13.1", default-features = false, features = ["stream"] } +hyper = { version = "0.13.1", default-features = false, features = ["tcp"] } prometheus = { version = "0.7", features = ["nightly", "process"]} tokio = "0.2" -futures-util = { version = "0.3.1", default-features = false, features = ["io"] } +futures = "0.3.1" derive_more = "0.99" [target.'cfg(not(target_os = "unknown"))'.dependencies] diff --git a/utils/prometheus/src/lib.rs b/utils/prometheus/src/lib.rs index 25f058acc377d..d6436fe696940 100644 --- a/utils/prometheus/src/lib.rs +++ b/utils/prometheus/src/lib.rs @@ -14,13 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use futures_util::{FutureExt, future::Future}; -use hyper::http::StatusCode; -use hyper::{Server, Body, Response, service::{service_fn, make_service_fn}}; +use futures::{channel::mpsc, prelude::*}; +use hyper::{Body, http::StatusCode, Response, Server, service::{service_fn, make_service_fn}}; use prometheus::{Encoder, Opts, TextEncoder, core::Atomic}; -use std::net::SocketAddr; -#[cfg(not(target_os = "unknown"))] -mod networking; +use std::{net::SocketAddr, pin::Pin}; pub use prometheus::{ Registry, Error as PrometheusError, @@ -70,29 +67,28 @@ async fn request_metrics(registry: Registry) -> Result, Error> { } #[derive(Clone)] -pub struct Executor; +pub struct Executor { + to_spawn_tx: mpsc::UnboundedSender + Send>>>, +} #[cfg(not(target_os = "unknown"))] impl hyper::rt::Executor for Executor - where - T: Future + Send + 'static, - T::Output: Send + 'static, +where + T: Future + Send + 'static, { - fn execute(&self, future: T) { - async_std::task::spawn(future); + fn execute(&self, fut: T) { + self.to_spawn_tx.unbounded_send(Box::pin(fut.map(drop))) + .expect("sending on unbounded channel never fails; qed"); } } /// Initializes the metrics context, and starts an HTTP server /// to serve metrics. #[cfg(not(target_os = "unknown"))] -pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error>{ - use networking::Incoming; - let listener = async_std::net::TcpListener::bind(&prometheus_addr) - .await - .map_err(|_| Error::PortInUse(prometheus_addr))?; - - log::info!("Prometheus server started at {}", prometheus_addr); - +pub async fn init_prometheus( + prometheus_addr: SocketAddr, + registry: Registry, + to_spawn_tx: mpsc::UnboundedSender + Send>>>, +) -> Result<(), Error>{ let service = make_service_fn(move |_| { let registry = registry.clone(); @@ -103,14 +99,16 @@ pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> } }); - let server = Server::builder(Incoming(listener.incoming())) - .executor(Executor) + let executor = Executor { to_spawn_tx }; + + let server = hyper::server::Server::try_bind(&prometheus_addr)? + .executor(executor) .serve(service) .boxed(); - let result = server.await.map_err(Into::into); + log::info!("Prometheus metrics served at {}/metrics", prometheus_addr); - result + server.await.map_err(Into::into) } #[cfg(target_os = "unknown")] diff --git a/utils/prometheus/src/networking.rs b/utils/prometheus/src/networking.rs deleted file mode 100644 index 5c8c036d44597..0000000000000 --- a/utils/prometheus/src/networking.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use async_std::pin::Pin; -use std::task::{Poll, Context}; -use futures_util::{stream::Stream, io::{AsyncRead, AsyncWrite}}; - -pub struct Incoming<'a>(pub async_std::net::Incoming<'a>); - -impl hyper::server::accept::Accept for Incoming<'_> { - type Conn = TcpStream; - type Error = async_std::io::Error; - - fn poll_accept(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_next(cx) - .map(|opt| opt.map(|res| res.map(TcpStream))) - } -} - -pub struct TcpStream(pub async_std::net::TcpStream); - -impl tokio::io::AsyncRead for TcpStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut [u8] - ) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_read(cx, buf) - } -} - -impl tokio::io::AsyncWrite for TcpStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8] - ) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_close(cx) - } -}