From ea9278a50f52a628b5c1b4e2f1541ce46a6beae8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 23 Jan 2020 17:59:58 +0100 Subject: [PATCH] utils/prometheus: Make crate spawn onto global executor With 6ee1244e2 the Substrate green threading executor is now configurable. The Substrate Prometheus crate can use this executor to spawn futures per connection (scrape request). --- Cargo.lock | 6 ++- client/service/src/builder.rs | 2 +- utils/prometheus/Cargo.toml | 4 +- utils/prometheus/src/lib.rs | 46 ++++++++++----------- utils/prometheus/src/networking.rs | 66 ------------------------------ 5 files changed, 30 insertions(+), 94 deletions(-) delete mode 100644 utils/prometheus/src/networking.rs diff --git a/Cargo.lock b/Cargo.lock index da87ea8549aef..699be70286c91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1954,6 +1954,7 @@ dependencies = [ "httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4494,7 +4495,7 @@ version = "0.8.0" dependencies = [ "async-std 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "prometheus 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -7275,9 +7276,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", "num_cpus 1.11.1 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 60b0888112eec..9e6b25584a6a1 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -1029,7 +1029,7 @@ ServiceBuilder< let metrics = ServiceMetrics::register(®istry)?; let future = select( - prometheus_exporter::init_prometheus(port, registry).boxed(), + prometheus_exporter::init_prometheus(port, registry, to_spawn_tx.clone()).boxed(), exit.clone() ).map(drop); diff --git a/utils/prometheus/Cargo.toml b/utils/prometheus/Cargo.toml index 8e50b4024e3bc..37da93257f933 100644 --- a/utils/prometheus/Cargo.toml +++ b/utils/prometheus/Cargo.toml @@ -8,10 +8,10 @@ edition = "2018" [dependencies] log = "0.4.8" -hyper = { version = "0.13.1", default-features = false, features = ["stream"] } +hyper = { version = "0.13.1", default-features = false, features = ["tcp"] } prometheus = { version = "0.7", features = ["nightly", "process"]} tokio = "0.2" -futures-util = { version = "0.3.1", default-features = false, features = ["io"] } +futures = "0.3.1" derive_more = "0.99" [target.'cfg(not(target_os = "unknown"))'.dependencies] diff --git a/utils/prometheus/src/lib.rs b/utils/prometheus/src/lib.rs index 25f058acc377d..d6436fe696940 100644 --- a/utils/prometheus/src/lib.rs +++ b/utils/prometheus/src/lib.rs @@ -14,13 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use futures_util::{FutureExt, future::Future}; -use hyper::http::StatusCode; -use hyper::{Server, Body, Response, service::{service_fn, make_service_fn}}; +use futures::{channel::mpsc, prelude::*}; +use hyper::{Body, http::StatusCode, Response, Server, service::{service_fn, make_service_fn}}; use prometheus::{Encoder, Opts, TextEncoder, core::Atomic}; -use std::net::SocketAddr; -#[cfg(not(target_os = "unknown"))] -mod networking; +use std::{net::SocketAddr, pin::Pin}; pub use prometheus::{ Registry, Error as PrometheusError, @@ -70,29 +67,28 @@ async fn request_metrics(registry: Registry) -> Result, Error> { } #[derive(Clone)] -pub struct Executor; +pub struct Executor { + to_spawn_tx: mpsc::UnboundedSender + Send>>>, +} #[cfg(not(target_os = "unknown"))] impl hyper::rt::Executor for Executor - where - T: Future + Send + 'static, - T::Output: Send + 'static, +where + T: Future + Send + 'static, { - fn execute(&self, future: T) { - async_std::task::spawn(future); + fn execute(&self, fut: T) { + self.to_spawn_tx.unbounded_send(Box::pin(fut.map(drop))) + .expect("sending on unbounded channel never fails; qed"); } } /// Initializes the metrics context, and starts an HTTP server /// to serve metrics. #[cfg(not(target_os = "unknown"))] -pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error>{ - use networking::Incoming; - let listener = async_std::net::TcpListener::bind(&prometheus_addr) - .await - .map_err(|_| Error::PortInUse(prometheus_addr))?; - - log::info!("Prometheus server started at {}", prometheus_addr); - +pub async fn init_prometheus( + prometheus_addr: SocketAddr, + registry: Registry, + to_spawn_tx: mpsc::UnboundedSender + Send>>>, +) -> Result<(), Error>{ let service = make_service_fn(move |_| { let registry = registry.clone(); @@ -103,14 +99,16 @@ pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> } }); - let server = Server::builder(Incoming(listener.incoming())) - .executor(Executor) + let executor = Executor { to_spawn_tx }; + + let server = hyper::server::Server::try_bind(&prometheus_addr)? + .executor(executor) .serve(service) .boxed(); - let result = server.await.map_err(Into::into); + log::info!("Prometheus metrics served at {}/metrics", prometheus_addr); - result + server.await.map_err(Into::into) } #[cfg(target_os = "unknown")] diff --git a/utils/prometheus/src/networking.rs b/utils/prometheus/src/networking.rs deleted file mode 100644 index 5c8c036d44597..0000000000000 --- a/utils/prometheus/src/networking.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -use async_std::pin::Pin; -use std::task::{Poll, Context}; -use futures_util::{stream::Stream, io::{AsyncRead, AsyncWrite}}; - -pub struct Incoming<'a>(pub async_std::net::Incoming<'a>); - -impl hyper::server::accept::Accept for Incoming<'_> { - type Conn = TcpStream; - type Error = async_std::io::Error; - - fn poll_accept(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_next(cx) - .map(|opt| opt.map(|res| res.map(TcpStream))) - } -} - -pub struct TcpStream(pub async_std::net::TcpStream); - -impl tokio::io::AsyncRead for TcpStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut [u8] - ) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_read(cx, buf) - } -} - -impl tokio::io::AsyncWrite for TcpStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8] - ) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_flush(cx) - } - - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Pin::new(&mut Pin::into_inner(self).0) - .poll_close(cx) - } -}