Skip to content

Commit

Permalink
chore(deps): upgrade prometheous server to hyper v1 (#4898)
Browse files Browse the repository at this point in the history
Partly fixes
#4890 (comment)

Still the offchain API needs to be updated to hyper v1.0 and I opened an
issue for it, it's using low-level http body features that have been
removed
  • Loading branch information
niklasad1 authored Jun 27, 2024
1 parent dee1824 commit de41ae8
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 77 deletions.
30 changes: 17 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -779,10 +779,14 @@ hex-literal = { version = "0.4.1", default-features = false }
hkdf = { version = "0.12.0" }
hmac = { version = "0.12.1" }
honggfuzz = { version = "0.5.55" }
http = { version = "1.1.0" }
http-body-util = { version = "0.1.2" }
hyper = { version = "0.14.27", default-features = false }
hyper-rustls = { version = "0.24.0" }
http = { version = "1.1" }
http-body = { version = "1", default-features = false }
http-body-util = { version = "0.1.2", default-features = false }
hyper = { version = "1.3.1", default-features = false }
hyper-rustls = { version = "0.24.2" }
hyper-util = { version = "0.1.5", default-features = false }
# TODO: remove hyper v0.14 https://github.com/paritytech/polkadot-sdk/issues/4896
hyperv14 = { package = "hyper", version = "0.14.29", default-features = false }
impl-serde = { version = "0.4.0", default-features = false }
impl-trait-for-tuples = { version = "0.2.2" }
indexmap = { version = "2.0.0" }
Expand Down
4 changes: 3 additions & 1 deletion polkadot/node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ log = { workspace = true, default-features = true }
[dev-dependencies]
assert_cmd = { workspace = true }
tempfile = { workspace = true }
hyper = { features = ["http1", "tcp"], workspace = true }
hyper-util = { features = ["client-legacy", "tokio"], workspace = true }
hyper = { workspace = true }
http-body-util = { workspace = true }
tokio = { workspace = true, default-features = true }
polkadot-test-service = { features = ["runtime-metrics"], workspace = true }
substrate-test-utils = { workspace = true }
Expand Down
14 changes: 11 additions & 3 deletions polkadot/node/metrics/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

//! Polkadot runtime metrics integration test.
use hyper::{Client, Uri};
use http_body_util::BodyExt;
use hyper::Uri;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use polkadot_primitives::metric_definitions::PARACHAIN_INHERENT_DATA_BITFIELDS_PROCESSED;
use polkadot_test_service::{node_config, run_validator_node, test_prometheus_config};
use sp_keyring::AccountKeyring::*;
Expand Down Expand Up @@ -66,14 +68,20 @@ async fn runtime_can_publish_metrics() {
}

async fn scrape_prometheus_metrics(metrics_uri: &str) -> HashMap<String, u64> {
let res = Client::new()
let res = Client::builder(TokioExecutor::new())
.build_http::<http_body_util::Full<hyper::body::Bytes>>()
.get(Uri::try_from(metrics_uri).expect("bad URI"))
.await
.expect("GET request failed");

// Retrieve the `HTTP` response body.
let body = String::from_utf8(
hyper::body::to_bytes(res).await.expect("can't get body as bytes").to_vec(),
res.into_body()
.collect()
.await
.expect("can't get body as bytes")
.to_bytes()
.to_vec(),
)
.expect("body is not an UTF8 string");

Expand Down
2 changes: 1 addition & 1 deletion substrate/client/offchain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ codec = { features = ["derive"], workspace = true, default-features = true }
fnv = { workspace = true }
futures = { workspace = true }
futures-timer = { workspace = true }
hyper = { features = ["http2", "stream"], workspace = true, default-features = true }
hyperv14 = { features = ["http2", "stream"], workspace = true, default-features = true }
hyper-rustls = { features = ["http2"], workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/offchain/src/api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
//! (i.e.: the socket should continue being processed) in the background even if the runtime isn't
//! actively calling any function.
use hyperv14 as hyper;

use crate::api::timestamp;
use bytes::buf::{Buf, Reader};
use fnv::FnvHashMap;
Expand Down
8 changes: 5 additions & 3 deletions substrate/utils/prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
hyper = { features = ["http1", "server", "tcp"], workspace = true }
http-body-util = { workspace = true }
hyper = { features = ["http1", "server"], workspace = true }
hyper-util = { features = ["server-auto", "tokio"], workspace = true }
log = { workspace = true, default-features = true }
prometheus = { workspace = true }
thiserror = { workspace = true }
tokio = { features = ["parking_lot"], workspace = true, default-features = true }

[dev-dependencies]
hyper = { features = ["client"], workspace = true, default-features = true }
tokio = { features = ["rt-multi-thread"], workspace = true, default-features = true }
hyper-util = { features = ["client-legacy", "tokio"], workspace = true, default-features = true }
tokio = { features = ["macros", "rt-multi-thread"], workspace = true, default-features = true }
107 changes: 55 additions & 52 deletions substrate/utils/prometheus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use hyper::{
http::StatusCode,
server::Server,
service::{make_service_fn, service_fn},
Body, Request, Response,
};
mod sourced;

use hyper::{http::StatusCode, Request, Response};
use prometheus::{core::Collector, Encoder, TextEncoder};
use std::net::SocketAddr;

pub use prometheus::{
self,
core::{
Expand All @@ -30,13 +30,10 @@ pub use prometheus::{
exponential_buckets, Error as PrometheusError, Histogram, HistogramOpts, HistogramVec, Opts,
Registry,
};
use prometheus::{core::Collector, Encoder, TextEncoder};
use std::net::SocketAddr;

mod sourced;

pub use sourced::{MetricSource, SourcedCounter, SourcedGauge, SourcedMetric};

type Body = http_body_util::Full<hyper::body::Bytes>;

pub fn register<T: Clone + Collector + 'static>(
metric: T,
registry: &Registry,
Expand All @@ -63,7 +60,10 @@ pub enum Error {
PortInUse(SocketAddr),
}

async fn request_metrics(req: Request<Body>, registry: Registry) -> Result<Response<Body>, Error> {
async fn request_metrics(
req: Request<hyper::body::Incoming>,
registry: Registry,
) -> Result<Response<Body>, Error> {
if req.uri().path() == "/metrics" {
let metric_families = registry.gather();
let mut buffer = vec![];
Expand Down Expand Up @@ -98,46 +98,49 @@ async fn init_prometheus_with_listener(
listener: tokio::net::TcpListener,
registry: Registry,
) -> Result<(), Error> {
let listener = hyper::server::conn::AddrIncoming::from_listener(listener)?;
log::info!("〽️ Prometheus exporter started at {}", listener.local_addr());

let service = make_service_fn(move |_| {
let registry = registry.clone();

async move {
Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
request_metrics(req, registry.clone())
}))
}
});
log::info!(target: "prometheus", "〽️ Prometheus exporter started at {}", listener.local_addr()?);

let (signal, on_exit) = tokio::sync::oneshot::channel::<()>();
let server = Server::builder(listener).serve(service).with_graceful_shutdown(async {
let _ = on_exit.await;
});
let server = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new());

let result = server.await.map_err(Into::into);
loop {
let io = match listener.accept().await {
Ok((sock, _)) => hyper_util::rt::TokioIo::new(sock),
Err(e) => {
log::debug!(target: "prometheus", "Error accepting connection: {:?}", e);
continue;
},
};

// Gracefully shutdown server, otherwise the server does not stop if it has open connections
let _ = signal.send(());
let registry = registry.clone();

result
let conn = server
.serve_connection_with_upgrades(
io,
hyper::service::service_fn(move |req| request_metrics(req, registry.clone())),
)
.into_owned();

tokio::spawn(async move {
if let Err(err) = conn.await {
log::debug!(target: "prometheus", "connection error: {:?}", err);
}
});
}
}

#[cfg(test)]
mod tests {
use super::*;
use hyper::{Client, Uri};

#[test]
fn prometheus_works() {
const METRIC_NAME: &str = "test_test_metric_name_test_test";
use http_body_util::BodyExt;
use hyper::Uri;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};

let runtime = tokio::runtime::Runtime::new().expect("Creates the runtime");
const METRIC_NAME: &str = "test_test_metric_name_test_test";

let listener = runtime
.block_on(tokio::net::TcpListener::bind("127.0.0.1:0"))
.expect("Creates listener");
#[tokio::test]
async fn prometheus_works() {
let listener =
tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("Creates listener");

let local_addr = listener.local_addr().expect("Returns the local addr");

Expand All @@ -148,20 +151,20 @@ mod tests {
)
.expect("Registers the test metric");

runtime.spawn(init_prometheus_with_listener(listener, registry));
tokio::spawn(init_prometheus_with_listener(listener, registry));

runtime.block_on(async {
let client = Client::new();
let client = Client::builder(TokioExecutor::new()).build_http::<Body>();

let res = client
.get(Uri::try_from(&format!("http://{}/metrics", local_addr)).expect("Parses URI"))
.await
.expect("Requests metrics");
let res = client
.get(Uri::try_from(&format!("http://{}/metrics", local_addr)).expect("Parses URI"))
.await
.expect("Requests metrics");

let buf = hyper::body::to_bytes(res).await.expect("Converts body to bytes");
assert!(res.status().is_success());

let body = String::from_utf8(buf.to_vec()).expect("Converts body to String");
assert!(body.contains(&format!("{} 0", METRIC_NAME)));
});
let buf = res.into_body().collect().await.expect("Failed to read HTTP body").to_bytes();
let body = String::from_utf8(buf.to_vec()).expect("Converts body to String");

assert!(body.contains(&format!("{} 0", METRIC_NAME)));
}
}

0 comments on commit de41ae8

Please sign in to comment.