diff --git a/Cargo.toml b/Cargo.toml index b971c4c..74d9071 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,4 +36,4 @@ opentelemetry-http = "0.27.0" tokio = { version = "1.42", features = ["macros"] } [patch.crates-io] -opentelemetry-prometheus = { git="https://github.com/ttys3/opentelemetry-rust.git", branch="opentelemetry-prometheus-sdk-0.26" } +opentelemetry-prometheus = { git="https://github.com/ttys3/opentelemetry-rust.git", branch="opentelemetry-prometheus-sdk-0.27" } diff --git a/src/lib.rs b/src/lib.rs index d579e02..c47c5a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,17 +26,22 @@ //! ``` //! use axum_otel_metrics::HttpMetricsLayerBuilder; //! use axum::{response::Html, routing::get, Router}; +//! use prometheus::{Encoder, Registry, TextEncoder}; //! //! let metrics = HttpMetricsLayerBuilder::new() //! .with_service_name(env!("CARGO_PKG_NAME").to_string()) //! .with_service_version(env!("CARGO_PKG_VERSION").to_string()) -//! .with_prefix("axum_metrics_demo".to_string()) -//! .with_labels(vec![("env".to_string(), "testing".to_string())].into_iter().collect()) //! .build(); //! //! let app = Router::<()>::new() //! // export metrics at `/metrics` endpoint -//! .merge(metrics.routes()) +//! .route("/metrics", get(|| async { +//! let mut buffer = Vec::new(); +//! let encoder = TextEncoder::new(); +//! encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); +//! // return metrics +//! String::from_utf8(buffer).unwrap() +//! })) //! .route("/", get(handler)) //! .route("/hello", get(handler)) //! .route("/world", get(handler)) @@ -49,8 +54,7 @@ //! ``` use axum::http::Response; -use axum::{extract::MatchedPath, extract::State, http::Request, response::IntoResponse, routing::get, Router}; -use std::collections::HashMap; +use axum::{extract::MatchedPath, http::Request}; use std::env; use std::sync::Arc; use std::time::Duration; @@ -61,18 +65,13 @@ use std::task::Poll::Ready; use std::task::{Context, Poll}; use std::time::Instant; -use prometheus::{Encoder, Registry, TextEncoder}; - -use opentelemetry::{Key, KeyValue, Value}; +use opentelemetry::{Key, KeyValue}; use opentelemetry::metrics::{Counter, Histogram, UpDownCounter}; use opentelemetry::metrics::MeterProvider; -use opentelemetry_sdk::metrics::{ - reader::{DefaultTemporalitySelector}, - PeriodicReader, SdkMeterProvider, -}; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider, Temporality}; use opentelemetry_sdk::resource::{EnvResourceDetector, SdkProvidedResourceDetector, TelemetryResourceDetector}; use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_NAMESPACE, SERVICE_VERSION}; @@ -104,9 +103,6 @@ pub struct Metric { #[derive(Clone)] pub struct MetricState { - /// Prometheus Registry we used to gathering and exporting metrics in the export endpoint - registry: Option, - /// hold the metrics we used in the middleware pub metric: Metric, @@ -133,7 +129,6 @@ pub struct HttpMetrics { pub struct HttpMetricsLayer { /// the metric state, use both by the middleware handler and metrics export endpoint pub(crate) state: MetricState, - path: String, } // TODO support custom buckets @@ -161,30 +156,6 @@ const HTTP_REQ_SIZE_HISTOGRAM_BUCKETS: &[f64] = &[ 10.0 * MB, // 10 MB ]; -impl HttpMetricsLayer { - pub fn routes(&self) -> Router { - Router::new() - .route(self.path.as_str(), get(Self::exporter_handler)) - .with_state(self.state.clone()) - } - - // TODO use a static global exporter like autometrics-rs? - // https://github.com/autometrics-dev/autometrics-rs/blob/d3e7bffeede43f6c77b6a992b0443c0fca34003f/autometrics/src/prometheus_exporter.rs#L10 - pub async fn exporter_handler(state: State) -> impl IntoResponse { - // tracing::trace!("exporter_handler called"); - match state.registry { - Some(ref registry) => { - let mut buffer = Vec::new(); - let encoder = TextEncoder::new(); - encoder.encode(®istry.gather(), &mut buffer).unwrap(); - // return metrics - String::from_utf8(buffer).unwrap() - } - None => "#no prometheus registry".to_string(), - } - } -} - /// A helper that instructs the metrics layer to ignore /// certain paths. /// @@ -219,7 +190,7 @@ impl PathSkipper { /// the implementor and user of this code, have that /// responsibility. pub fn new_with_fn(skip: Arc bool + 'static + Send + Sync>) -> Self { - Self { skip: skip } + Self { skip } } } @@ -235,33 +206,27 @@ impl Default for PathSkipper { } #[derive(Clone)] -pub struct HttpMetricsLayerBuilder { +pub struct HttpMetricsLayerBuilder { service_name: Option, service_version: Option, - prefix: Option, - path: String, - labels: Option>, skipper: PathSkipper, is_tls: bool, - exporter: Option, + metric_reader: Option, } -impl Default for HttpMetricsLayerBuilder { +impl Default for HttpMetricsLayerBuilder { fn default() -> Self { Self { service_name: None, service_version: None, - prefix: None, - path: "/metrics".to_string(), - labels: None, skipper: PathSkipper::default(), is_tls: false, - exporter: Some("prometheus".to_string()), + metric_reader: None, } } } -impl HttpMetricsLayerBuilder { +impl HttpMetricsLayerBuilder { pub fn new() -> Self { HttpMetricsLayerBuilder::default() } @@ -276,28 +241,13 @@ impl HttpMetricsLayerBuilder { self } - pub fn with_prefix(mut self, prefix: String) -> Self { - self.prefix = Some(prefix); - self - } - - pub fn with_path(mut self, path: String) -> Self { - self.path = path; - self - } - - pub fn with_labels(mut self, labels: HashMap) -> Self { - self.labels = Some(labels); - self - } - pub fn with_skipper(mut self, skipper: PathSkipper) -> Self { self.skipper = skipper; self } - pub fn with_exporter(mut self, exporter: String) -> Self { - self.exporter = Some(exporter); + pub fn with_metric_reader(mut self, metric_reader: T) -> Self { + self.metric_reader = Some(metric_reader); self } @@ -344,17 +294,13 @@ impl HttpMetricsLayerBuilder { res }; - let mut registry = None; let mut builder = SdkMeterProvider::builder().with_resource(res); // exporter - - if self.exporter == Some("otlp".to_string()) { - builder = builder.with_reader(self.build_otlp()); + if let Some(metric_reader) = self.metric_reader { + builder = builder.with_reader(metric_reader); } else { - let (reg, exporter) = self.build_prometheus(); - registry = Some(reg); - builder = builder.with_reader(exporter); + builder = builder.with_reader(self.build_otlp()); } let provider = builder.build(); @@ -364,18 +310,18 @@ impl HttpMetricsLayerBuilder { // this must called after the global meter provider has ben initialized // let meter = global::meter("axum-app"); // let meter = provider.meter("axum-app"); - let meter = provider.versioned_meter( - env!("CARGO_PKG_NAME"), - Some(env!("CARGO_PKG_VERSION")), - Some("https://opentelemetry.io/schema/1.0.0"), - None, + let meter = provider.meter_with_scope( + opentelemetry::InstrumentationScope::builder(env!("CARGO_PKG_NAME")) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url("https://opentelemetry.io/schema/1.0.0") + .build(), ); // requests_total let requests_total = meter .u64_counter("requests") .with_description("How many HTTP requests processed, partitioned by status code and HTTP method.") - .init(); + .build(); // request_duration_seconds let req_duration = meter @@ -383,7 +329,7 @@ impl HttpMetricsLayerBuilder { .with_unit("s") .with_description("The HTTP request latencies in seconds.") .with_boundaries(HTTP_REQ_DURATION_HISTOGRAM_BUCKETS.to_vec()) - .init(); + .build(); // request_size_bytes let req_size = meter @@ -391,23 +337,22 @@ impl HttpMetricsLayerBuilder { .with_unit("By") .with_description("The HTTP request sizes in bytes.") .with_boundaries(HTTP_REQ_SIZE_HISTOGRAM_BUCKETS.to_vec()) - .init(); + .build(); let res_size = meter .u64_histogram("http.server.response.size") .with_unit("By") .with_description("The HTTP reponse sizes in bytes.") .with_boundaries(HTTP_REQ_SIZE_HISTOGRAM_BUCKETS.to_vec()) - .init(); + .build(); // no u64_up_down_counter because up_down_counter maybe < 0 since it allow negative values let req_active = meter .i64_up_down_counter("http.server.active_requests") .with_description("The number of active HTTP requests.") - .init(); + .build(); let meter_state = MetricState { - registry, metric: Metric { requests_total, req_duration, @@ -419,31 +364,14 @@ impl HttpMetricsLayerBuilder { is_tls: self.is_tls, }; - HttpMetricsLayer { - state: meter_state, - path: self.path, - } - } - - fn build_prometheus(&self) -> (Registry, impl opentelemetry_sdk::metrics::reader::MetricReader) { - let registry = if let Some(prefix) = self.prefix.clone() { - Registry::new_custom(Some(prefix), self.labels.clone()).expect("create prometheus registry") - } else { - Registry::new() - }; - // init prometheus exporter - let exporter = opentelemetry_prometheus::exporter() - .with_registry(registry.clone()) - .build() - .unwrap(); - (registry, exporter) + HttpMetricsLayer { state: meter_state } } /// init otlp metrics exporter /// read from env var: /// OTEL_EXPORTER_OTLP_METRICS_ENDPOINT, OTEL_EXPORTER_OTLP_METRICS_HEADERS,OTEL_EXPORTER_OTLP_METRICS_TIMEOUT /// ref https://github.com/tokio-rs/tracing-opentelemetry/blob/5e3354ec24debcfbf856bfd1eb7022459dca1e6a/examples/opentelemetry-otlp.rs#L32 - fn build_otlp(&self) -> impl opentelemetry_sdk::metrics::reader::MetricReader { + fn build_otlp(&self) -> PeriodicReader { let protocol = match env::var("OTEL_EXPORTER_OTLP_METRICS_PROTOCOL") .ok() .or(env::var("OTEL_EXPORTER_OTLP_PROTOCOL").ok()) @@ -453,25 +381,22 @@ impl HttpMetricsLayerBuilder { }; let exporter = if protocol.starts_with("http") { - opentelemetry_otlp::new_exporter() - .http() - .build_metrics_exporter( - Box::new(DefaultTemporalitySelector::new()), - ) + opentelemetry_otlp::MetricExporter::builder() + .with_http() + .with_temporality(Temporality::default()) + .build() .unwrap() } else { - opentelemetry_otlp::new_exporter() - .tonic() - .build_metrics_exporter( - Box::new(DefaultTemporalitySelector::new()), - ) + opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_temporality(Temporality::default()) + .build() .unwrap() }; - let reader = PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio) + PeriodicReader::builder(exporter, opentelemetry_sdk::runtime::Tokio) .with_interval(std::time::Duration::from_secs(30)) - .build(); - reader + .build() } } @@ -528,9 +453,9 @@ where return "https".to_string(); } if let Some(scheme) = req.headers().get("X-Url-Scheme") { - return scheme.to_str().unwrap().to_string(); + scheme.to_str().unwrap().to_string() } else { - return "http".to_string(); + "http".to_string() } })() }; @@ -627,10 +552,7 @@ where let res_size = response.body().size_hint().upper().unwrap_or(0); let labels = [ - KeyValue { - key: Key::from("http.request.method"), - value: Value::from(this.method.clone()), - }, + KeyValue::new("http.request.method", this.method.clone()), KeyValue::new("http.route", this.path.clone()), KeyValue::new("http.response.status_code", status), // server.address: Name of the local HTTP server that received the request. @@ -656,6 +578,7 @@ where #[cfg(test)] mod tests { + use crate::HttpMetricsLayer; use crate::HttpMetricsLayerBuilder; use axum::extract::State; use axum::routing::get; @@ -665,8 +588,8 @@ mod tests { use prometheus::{Encoder, Registry, TextEncoder}; use std::sync::Arc; - #[test] - fn test_prometheus_exporter() { + #[tokio::test] + async fn test_prometheus_exporter() { let _cx = Context::current(); let registry = Registry::new(); @@ -685,8 +608,8 @@ mod tests { let meter = global::meter("my-app"); // Use two instruments - let counter = meter.u64_counter("a.counter").with_description("Counts things").init(); - let recorder = meter.u64_histogram("a.histogram").with_description("Records values").init(); + let counter = meter.u64_counter("a.counter").with_description("Counts things").build(); + let recorder = meter.u64_histogram("a.histogram").with_description("Records values").build(); counter.add(100, &[KeyValue::new("key", "value")]); recorder.record(100, &[KeyValue::new("key", "value")]); @@ -699,12 +622,21 @@ mod tests { println!("{}", String::from_utf8(result).unwrap()); } - #[test] - fn test_builder() { - let metrics = HttpMetricsLayerBuilder::new().build(); - let _app = Router::new() + #[tokio::test] + async fn test_prom_exporter_builder() { + let metrics = HttpMetricsLayerBuilder::::new().build(); + let _app = Router::::new() // export metrics at `/metrics` endpoint - .merge(metrics.routes::<()>()) + .route( + "/metrics", + get(|| async { + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); + // return metrics + String::from_utf8(buffer).unwrap() + }), + ) .route("/", get(handler)) .route("/hello", get(handler)) .route("/world", get(handler)) @@ -716,15 +648,23 @@ mod tests { } } - #[test] - fn test_builder_with_state_router() { + #[tokio::test] + async fn test_builder_with_state_router() { #[derive(Clone)] struct AppState {} - let metrics = HttpMetricsLayerBuilder::new().build(); + let metrics = HttpMetricsLayerBuilder::::new().build(); let _app: Router = Router::new() - // export metrics at `/metrics` endpoint - .merge(metrics.routes::()) + .route( + "/metrics", + get(|| async { + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); + // return metrics + String::from_utf8(buffer).unwrap() + }), + ) .route("/", get(handler)) .route("/hello", get(handler)) .route("/world", get(handler)) @@ -737,17 +677,25 @@ mod tests { } } - #[test] - fn test_builder_with_arced_skipper() { + #[tokio::test] + async fn test_builder_with_arced_skipper() { #[derive(Clone)] struct AppState {} - let metrics = HttpMetricsLayerBuilder::new() + let metrics = HttpMetricsLayerBuilder::::new() .with_skipper(crate::PathSkipper::new_with_fn(Arc::new(|_: &str| true))) .build(); let _app: Router = Router::new() - // export metrics at `/metrics` endpoint - .merge(metrics.routes::()) + .route( + "/metrics", + get(|| async { + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + encoder.encode(&prometheus::gather(), &mut buffer).unwrap(); + // return metrics + String::from_utf8(buffer).unwrap() + }), + ) .route("/", get(handler)) // add the metrics middleware .layer(metrics)