Skip to content

Commit

Permalink
utils/prometheus: Make crate spawn onto global executor
Browse files Browse the repository at this point in the history
With 6ee1244 the Substrate green threading executor is now
configurable. The Substrate Prometheus crate can use this executor to
spawn futures per connection (scrape request).
  • Loading branch information
mxinden committed Jan 23, 2020
1 parent 32c04b4 commit ea9278a
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 94 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ ServiceBuilder<
let metrics = ServiceMetrics::register(&registry)?;

let future = select(
prometheus_exporter::init_prometheus(port, registry).boxed(),
prometheus_exporter::init_prometheus(port, registry, to_spawn_tx.clone()).boxed(),
exit.clone()
).map(drop);

Expand Down
4 changes: 2 additions & 2 deletions utils/prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ edition = "2018"

[dependencies]
log = "0.4.8"
hyper = { version = "0.13.1", default-features = false, features = ["stream"] }
hyper = { version = "0.13.1", default-features = false, features = ["tcp"] }
prometheus = { version = "0.7", features = ["nightly", "process"]}
tokio = "0.2"
futures-util = { version = "0.3.1", default-features = false, features = ["io"] }
futures = "0.3.1"
derive_more = "0.99"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
Expand Down
46 changes: 22 additions & 24 deletions utils/prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use futures_util::{FutureExt, future::Future};
use hyper::http::StatusCode;
use hyper::{Server, Body, Response, service::{service_fn, make_service_fn}};
use futures::{channel::mpsc, prelude::*};
use hyper::{Body, http::StatusCode, Response, Server, service::{service_fn, make_service_fn}};
use prometheus::{Encoder, Opts, TextEncoder, core::Atomic};
use std::net::SocketAddr;
#[cfg(not(target_os = "unknown"))]
mod networking;
use std::{net::SocketAddr, pin::Pin};

pub use prometheus::{
Registry, Error as PrometheusError,
Expand Down Expand Up @@ -70,29 +67,28 @@ async fn request_metrics(registry: Registry) -> Result<Response<Body>, Error> {
}

#[derive(Clone)]
pub struct Executor;
pub struct Executor {
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
}

#[cfg(not(target_os = "unknown"))]
impl<T> hyper::rt::Executor<T> for Executor
where
T: Future + Send + 'static,
T::Output: Send + 'static,
where
T: Future + Send + 'static,
{
fn execute(&self, future: T) {
async_std::task::spawn(future);
fn execute(&self, fut: T) {
self.to_spawn_tx.unbounded_send(Box::pin(fut.map(drop)))
.expect("sending on unbounded channel never fails; qed");
}
}
/// Initializes the metrics context, and starts an HTTP server
/// to serve metrics.
#[cfg(not(target_os = "unknown"))]
pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error>{
use networking::Incoming;
let listener = async_std::net::TcpListener::bind(&prometheus_addr)
.await
.map_err(|_| Error::PortInUse(prometheus_addr))?;

log::info!("Prometheus server started at {}", prometheus_addr);

pub async fn init_prometheus(
prometheus_addr: SocketAddr,
registry: Registry,
to_spawn_tx: mpsc::UnboundedSender<Pin<Box<dyn Future<Output = ()> + Send>>>,
) -> Result<(), Error>{
let service = make_service_fn(move |_| {
let registry = registry.clone();

Expand All @@ -103,14 +99,16 @@ pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) ->
}
});

let server = Server::builder(Incoming(listener.incoming()))
.executor(Executor)
let executor = Executor { to_spawn_tx };

let server = hyper::server::Server::try_bind(&prometheus_addr)?
.executor(executor)
.serve(service)
.boxed();

let result = server.await.map_err(Into::into);
log::info!("Prometheus metrics served at {}/metrics", prometheus_addr);

result
server.await.map_err(Into::into)
}

#[cfg(target_os = "unknown")]
Expand Down
66 changes: 0 additions & 66 deletions utils/prometheus/src/networking.rs

This file was deleted.

0 comments on commit ea9278a

Please sign in to comment.