diff --git a/Cargo.toml b/Cargo.toml index adc3fabb7c..ed54475e0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "opentelemetry-zpages", "examples/actix-http", "examples/actix-http-tracing", + "examples/actix-hyper", "examples/actix-udp", "examples/async", "examples/aws-xray", diff --git a/examples/actix-hyper/Cargo.toml b/examples/actix-hyper/Cargo.toml new file mode 100644 index 0000000000..f988de44ed --- /dev/null +++ b/examples/actix-hyper/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "actix-hyper-example" +version = "0.1.0" +edition = "2018" +publish = false + +[dependencies] +opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio"] } +opentelemetry-jaeger = { path = "../../opentelemetry-jaeger", features = ["hyper_collector_client", "rt-tokio-current-thread"] } +actix-web = "4.1.0" +actix-service = "2.0.0" +env_logger = "0.9.0" +tokio = { version = "1", features = ["full"] } diff --git a/examples/actix-hyper/README.md b/examples/actix-hyper/README.md new file mode 100644 index 0000000000..229f1cd564 --- /dev/null +++ b/examples/actix-hyper/README.md @@ -0,0 +1,24 @@ +# Actix-web - Jaeger example with HTTP collector and batch exporter + +This example shows how to export spans from an actix-web application and ship them + to the collector directly via HTTP. + It uses the batch exporter to avoid excessive network roundtrips to Jaeger. + +## Usage + +Launch the application: +```shell +# Run jaeger in background +$ docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest + +# Start the actix-web server +$ cargo run + +# View spans +$ firefox http://localhost:16686/ +``` + +Fire a request: +```bash +curl http://localhost:8088 +``` diff --git a/examples/actix-hyper/src/main.rs b/examples/actix-hyper/src/main.rs new file mode 100644 index 0000000000..efafe38d8c --- /dev/null +++ b/examples/actix-hyper/src/main.rs @@ -0,0 +1,56 @@ +use actix_service::Service; +use actix_web::middleware::Logger; +use actix_web::{web, App, HttpServer}; +use opentelemetry::global::shutdown_tracer_provider; +use opentelemetry::trace::TraceError; +use opentelemetry::{global, sdk::trace as sdktrace}; +use opentelemetry::{ + trace::{FutureExt, TraceContextExt, Tracer}, + Key, +}; + +fn init_tracer() -> Result { + opentelemetry_jaeger::new_collector_pipeline() + .with_endpoint("http://127.0.0.1:14268/api/traces") + .with_service_name("trace-http-demo") + .with_hyper() + .install_batch(opentelemetry::runtime::TokioCurrentThread) +} + +async fn index() -> &'static str { + let tracer = global::tracer("request"); + tracer.in_span("index", |ctx| { + ctx.span().set_attribute(Key::new("parameter").i64(10)); + "Index" + }) +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + std::env::set_var("RUST_LOG", "debug"); + env_logger::init(); + let tracer = init_tracer().expect("Failed to initialise tracer."); + + HttpServer::new(move || { + let tracer = tracer.clone(); + App::new() + .wrap(Logger::default()) + .wrap_fn(move |req, srv| { + tracer.in_span("middleware", move |cx| { + cx.span() + .set_attribute(Key::new("path").string(req.path().to_string())); + srv.call(req).with_context(cx) + }) + }) + .route("/", web::get().to(index)) + }) + .bind("127.0.0.1:8088") + .unwrap() + .run() + .await?; + + // wait until all pending spans get exported. + shutdown_tracer_provider(); + + Ok(()) +} diff --git a/opentelemetry-dynatrace/src/metric.rs b/opentelemetry-dynatrace/src/metric.rs index 18c2984850..60b9dca264 100644 --- a/opentelemetry-dynatrace/src/metric.rs +++ b/opentelemetry-dynatrace/src/metric.rs @@ -1,6 +1,6 @@ //! Dynatrace Metric Exporter. //! -//! Defines an [Exporter] to send metric data to Dynatrace using the [Dynatrace Metrics ingestion protocol]. +//! Defines an `Exporter` to send metric data to Dynatrace using the [Dynatrace Metrics ingestion protocol]. //! //! [Metrics ingestion protocol]: https://www.dynatrace.com/support/help/how-to-use-dynatrace/metrics/metric-ingestion/metric-ingestion-protocol/ #![allow(unused_attributes)] @@ -178,7 +178,7 @@ where /// Set the timestamp to all metric data. /// If disabled, the ingestion time of the Dynatrace server will be used automatically. - /// Adding timestamps should be disabled in environments, where the system time is unrelible. + /// Adding timestamps should be disabled in environments, where the system time is unreliable. pub fn with_timestamp(self, value: bool) -> Self { DynatraceMetricsPipeline { timestamp: value, diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index a9012f60a1..980185c75a 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -12,7 +12,9 @@ edition = "2018" async-trait = "0.1" bytes = "1" http = "0.2" +hyper = { version = "0.14", default-features = false, features = ["http2", "client", "tcp"], optional = true } isahc = { version = "1.4", default-features = false, optional = true } opentelemetry-api = { version = "0.1", path = "../opentelemetry-api" } reqwest = { version = "0.11", default-features = false, features = ["blocking"], optional = true } surf = { version = "2.0", default-features = false, optional = true } +tokio = { version = "1.0", default-features = false, features = ["time"], optional = true } diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index 84f83b74e5..311c9d7a64 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -87,9 +87,25 @@ mod reqwest { } #[cfg(feature = "surf")] -mod surf { +pub mod surf { use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; + #[derive(Debug)] + pub struct BasicAuthMiddleware(pub surf::http::auth::BasicAuth); + + #[async_trait] + impl surf::middleware::Middleware for BasicAuthMiddleware { + async fn handle( + &self, + mut req: surf::Request, + client: surf::Client, + next: surf::middleware::Next<'_>, + ) -> surf::Result { + req.insert_header(self.0.name(), self.0.value()); + next.run(req, client).await + } + } + #[async_trait] impl HttpClient for surf::Client { async fn send(&self, request: Request>) -> Result, HttpError> { @@ -132,6 +148,66 @@ mod isahc { } } +#[cfg(any(feature = "hyper", feature = "hyper_tls"))] +pub mod hyper { + use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; + use http::HeaderValue; + use hyper::client::connect::Connect; + use hyper::Client; + use std::fmt::Debug; + use std::time::Duration; + use tokio::time; + + #[derive(Debug, Clone)] + pub struct HyperClient { + inner: Client, + timeout: Duration, + authorization: Option, + } + + impl HyperClient { + pub fn new_with_timeout(inner: Client, timeout: Duration) -> Self { + Self { + inner, + timeout, + authorization: None, + } + } + + pub fn new_with_timeout_and_authorization_header( + inner: Client, + timeout: Duration, + authorization: HeaderValue, + ) -> Self { + Self { + inner, + timeout, + authorization: Some(authorization), + } + } + } + + #[async_trait] + impl HttpClient for HyperClient + where + C: Connect + Send + Sync + Clone + Debug + 'static, + { + async fn send(&self, request: Request>) -> Result, HttpError> { + let (parts, body) = request.into_parts(); + let mut request = Request::from_parts(parts, body.into()); + if let Some(ref authorization) = self.authorization { + request + .headers_mut() + .insert(http::header::AUTHORIZATION, authorization.clone()); + } + let response = time::timeout(self.timeout, self.inner.request(request)).await??; + Ok(Response::builder() + .status(response.status()) + .body(hyper::body::to_bytes(response.into_body()).await?)?) + } + } +} + /// Methods to make working with responses from the [`HttpClient`] trait easier. pub trait ResponseExt: Sized { /// Turn a response into an error if the HTTP status does not indicate success (200 - 299). diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index 07313e19c4..cb40455579 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -27,6 +27,8 @@ futures-util = { version = "0.3", default-features = false, features = ["std"], futures-executor = "0.3" headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } +hyper = { version = "0.14", default-features = false, features = ["client"], optional = true } +hyper-tls = { version = "0.5.0", default-features = false, optional = true } isahc = { version = "1.4", default-features = false, optional = true } js-sys = { version = "0.3", optional = true } once_cell = "1.12" @@ -68,9 +70,12 @@ optional = true [features] full = [ "collector_client", + "hyper_collector_client", + "hyper_tls_collector_client", "isahc_collector_client", "reqwest_collector_client", "reqwest_blocking_collector_client", + "reqwest_rustls_collector_client", "surf_collector_client", "wasm_collector_client", "rt-tokio", @@ -80,6 +85,8 @@ full = [ ] default = [] collector_client = ["http", "opentelemetry-http"] +hyper_collector_client = ["collector_client", "headers", "http", "hyper", "opentelemetry-http/tokio", "opentelemetry-http/hyper"] +hyper_tls_collector_client = ["hyper_collector_client", "hyper-tls"] isahc_collector_client = ["isahc", "opentelemetry-http/isahc"] reqwest_blocking_collector_client = ["reqwest/blocking", "collector_client", "headers", "opentelemetry-http/reqwest"] reqwest_collector_client = ["reqwest", "collector_client", "headers", "opentelemetry-http/reqwest"] @@ -98,4 +105,4 @@ wasm_collector_client = [ rt-tokio = ["tokio", "opentelemetry/rt-tokio"] rt-tokio-current-thread = ["tokio", "opentelemetry/rt-tokio-current-thread"] rt-async-std = ["async-std", "opentelemetry/rt-async-std"] -integration_test = ["tonic", "prost", "prost-types", "rt-tokio", "collector_client", "reqwest_collector_client", "surf_collector_client", "isahc_collector_client"] +integration_test = ["tonic", "prost", "prost-types", "rt-tokio", "collector_client", "hyper_collector_client", "hyper_tls_collector_client", "reqwest_collector_client", "surf_collector_client", "isahc_collector_client"] diff --git a/opentelemetry-jaeger/README.md b/opentelemetry-jaeger/README.md index d994461773..1a9804a936 100644 --- a/opentelemetry-jaeger/README.md +++ b/opentelemetry-jaeger/README.md @@ -109,6 +109,7 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint: ```rust // Note that this requires one of the following features enabled so that there is a default http client implementation +// * hyper_collector_client // * surf_collector_client // * reqwest_collector_client // * reqwest_blocking_collector_client diff --git a/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs b/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs index a7319ccc8f..06560e2bf5 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs @@ -1,39 +1,12 @@ -#[cfg(feature = "surf_collector_client")] -use async_trait::async_trait; -#[cfg(any( - feature = "reqwest_blocking_collector_client", - feature = "reqwest_collector_client" -))] -use headers::authorization::Credentials; -#[cfg(feature = "isahc_collector_client")] -use isahc::config::Configurable; -use opentelemetry_http::HttpClient as OtelHttpClient; -#[cfg(feature = "surf_collector_client")] -use std::convert::TryInto; +use opentelemetry_http::HttpClient; use std::time::Duration; -#[derive(Debug)] -#[cfg(feature = "surf_collector_client")] -struct BasicAuthMiddleware(surf::http::auth::BasicAuth); - -#[async_trait] -#[cfg(feature = "surf_collector_client")] -impl surf::middleware::Middleware for BasicAuthMiddleware { - async fn handle( - &self, - mut req: surf::Request, - client: surf::Client, - next: surf::middleware::Next<'_>, - ) -> surf::Result { - req.insert_header(self.0.name(), self.0.value()); - next.run(req, client).await - } -} - #[derive(Debug)] pub(crate) enum CollectorHttpClient { None, - Custom(Box), + Custom(Box), + #[cfg(feature = "hyper_collector_client")] + Hyper, #[cfg(feature = "isahc_collector_client")] Isahc, #[cfg(feature = "surf_collector_client")] @@ -46,20 +19,20 @@ pub(crate) enum CollectorHttpClient { impl CollectorHttpClient { // try to build a build in http client if users chose one. If none available return NoHttpClient error - #[allow(unused_variables)] // if the user enabled no build in client features. all parameters are unsed. + #[allow(unused_variables)] // if the user enabled no build in client features. all parameters are unused. pub(crate) fn build_client( self, collector_username: Option, collector_password: Option, collector_timeout: Duration, - ) -> Result, crate::Error> { + ) -> Result, crate::Error> { match self { CollectorHttpClient::Custom(client) => Ok(client), CollectorHttpClient::None => Err(crate::Error::ConfigError { pipeline_name: "http_client", config_name: "collector", reason: - "No http client provided. Consider enable one of the `surf_collector_client`, \ + "No http client provided. Consider enable one of the `hyper_collector_client`, `surf_collector_client`, \ `reqwest_collector_client`, `reqwest_blocking_collector_client`, `isahc_collector_client` \ features to use a build in http client. Or use `with_http_client` method in pipeline to \ provide your own implementation." @@ -67,6 +40,8 @@ impl CollectorHttpClient { }), #[cfg(feature = "isahc_collector_client")] CollectorHttpClient::Isahc => { + use isahc::config::Configurable; + let mut builder = isahc::HttpClient::builder().timeout(collector_timeout); if let (Some(username), Some(password)) = (collector_username, collector_password) { @@ -84,6 +59,9 @@ impl CollectorHttpClient { } #[cfg(feature = "surf_collector_client")] CollectorHttpClient::Surf => { + use std::convert::TryInto; + use opentelemetry_http::surf::BasicAuthMiddleware; + let client: surf::Client = surf::Config::new() .set_timeout(Some(collector_timeout)) .try_into() @@ -106,6 +84,8 @@ impl CollectorHttpClient { } #[cfg(feature = "reqwest_blocking_collector_client")] CollectorHttpClient::ReqwestBlocking => { + use headers::authorization::Credentials; + let mut builder = reqwest::blocking::ClientBuilder::new().timeout(collector_timeout); if let (Some(username), Some(password)) = (collector_username, collector_password) { @@ -126,6 +106,8 @@ impl CollectorHttpClient { } #[cfg(feature = "reqwest_collector_client")] CollectorHttpClient::Reqwest => { + use headers::authorization::Credentials; + let mut builder = reqwest::ClientBuilder::new().timeout(collector_timeout); if let (Some(username), Some(password)) = (collector_username, collector_password) { let mut map = http::HeaderMap::with_capacity(1); @@ -143,6 +125,32 @@ impl CollectorHttpClient { })?; Ok(Box::new(client)) } + #[cfg(any(feature = "hyper_collector_client", feature = "hyper_tls_collector_client"))] + CollectorHttpClient::Hyper => { + use headers::authorization::Credentials; + use opentelemetry_http::hyper::HyperClient; + use hyper::{Client, Body}; + + #[cfg(feature = "hyper_tls_collector_client")] + let inner: Client<_, Body> = Client::builder().build(hyper_tls::HttpsConnector::new()); + #[cfg(feature = "hyper_collector_client")] + let inner: Client<_, Body> = Client::new(); + + let client = if let (Some(username), Some(password)) = + (collector_username, collector_password) + { + let auth_header_val = + headers::Authorization::basic(username.as_str(), password.as_str()); + HyperClient::new_with_timeout_and_authorization_header( + inner, + collector_timeout, + auth_header_val.0.encode(), + ) + } else { + HyperClient::new_with_timeout(inner, collector_timeout) + }; + Ok(Box::new(client)) + } } } } diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 56d2264a29..7460414d4c 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -59,15 +59,18 @@ const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD"; /// /// - `OTEL_EXPORTER_JAEGER_PASSWORD`: set the password. Part of the authentication for the collector. It only applies to build in http clients. /// -/// ## Build in http clients +/// ## Built-in http clients /// To help user setup the exporter, `opentelemetry-jaeger` provides the following build in http client /// implementation and relative configurations. /// +/// - [hyper], requires `hyper_collector_client` feature enabled, use [`with_hyper`][CollectorPipeline::with_hyper] function to setup. /// - [surf], requires `surf_collector_client` feature enabled, use [`with_surf`][CollectorPipeline::with_surf] function to setup. /// - [isahc], requires `isahc_collector_client` feature enabled, use [`with_isahc`][CollectorPipeline::with_isahc] function to setup. /// - [reqwest], requires `reqwest_collector_client` feature enabled, use [`with_reqwest`][CollectorPipeline::with_reqwest] function to setup. /// - [reqwest blocking client], requires `reqwest_blocking_collector_client` feature enabled, use [`with_reqwest_blocking`][CollectorPipeline::with_surf] function to setup. /// +/// Additionally you can enable https +/// /// Note that the functions to setup build in http clients override each other. That means if you have a pipeline with the following setup /// /// ```no_run @@ -82,8 +85,6 @@ const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD"; /// /// The pipeline will use [reqwest] http client. /// -/// [surf]:https://docs.rs/surf/latest/surf/ -/// [isahc]: https://docs.rs/isahc/latest/isahc/ /// [reqwest]: reqwest::Client /// [reqwest blocking client]: reqwest::blocking::Client #[derive(Debug)] @@ -93,7 +94,7 @@ pub struct CollectorPipeline { #[cfg(feature = "collector_client")] collector_timeout: Duration, - // only used by buildin http clients. + // only used by builtin http clients. collector_endpoint: Option>, collector_username: Option, collector_password: Option, @@ -272,7 +273,7 @@ impl CollectorPipeline { (&self.collector_password).clone() } - /// Custom the http client used to send spans. + /// Custom http client used to send spans. /// /// **Note** that all configuration other than the [`endpoint`][CollectorPipeline::with_endpoint] are not /// applicable to custom clients. @@ -333,6 +334,17 @@ impl CollectorPipeline { } } + /// Use hyper http client in the exporter. + #[cfg(feature = "hyper_collector_client")] + pub fn with_hyper(self) -> Self { + Self { + client_config: ClientConfig::Http { + client_type: CollectorHttpClient::Hyper, + }, + ..self + } + } + /// Set the service name of the application. It generally is the name of application. /// Critically, Jaeger backend depends on `Span.Process.ServiceName` to identify the service /// that produced the spans. diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index d4c00f3608..7aeb16dfb6 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -248,6 +248,8 @@ //! //! * `collector_client`: Export span data directly to a Jaeger collector. User MUST provide the http client. //! +//! * `hyper_collector_client`: Export span data with Jaeger collector backed by a hyper default http client. +//! //! * `surf_collector_client`: Export span data with Jaeger collector backed by a surf default http client. //! //! * `reqwest_collector_client`: Export span data with Jaeger collector backed by a reqwest http client. diff --git a/opentelemetry-jaeger/tests/integration_test.rs b/opentelemetry-jaeger/tests/integration_test.rs index d5ff2ada6f..08c1315dcf 100644 --- a/opentelemetry-jaeger/tests/integration_test.rs +++ b/opentelemetry-jaeger/tests/integration_test.rs @@ -110,6 +110,17 @@ mod tests { .expect("cannot create tracer using default configuration") }), ), + ( + "collector_hyper", + Box::new(|| { + opentelemetry_jaeger::new_collector_pipeline() + .with_endpoint(collector_endpoint) + .with_hyper() + .with_service_name(format!("{}-{}", SERVICE_NAME, "collector_hyper")) + .install_batch(opentelemetry::runtime::Tokio) + .expect("cannot create tracer using default configuration") + }), + ), ]; for (name, build_tracer) in test_cases { diff --git a/opentelemetry-proto/src/transform/metrics.rs b/opentelemetry-proto/src/transform/metrics.rs index 1c64139a2d..d95574dbfe 100644 --- a/opentelemetry-proto/src/transform/metrics.rs +++ b/opentelemetry-proto/src/transform/metrics.rs @@ -19,7 +19,7 @@ pub mod tonic { use opentelemetry::{Key, Value}; - /// Convert [`Number`](opentelemetry::metrics::NumberKind) to target type based + /// Convert [`Number`](opentelemetry::metrics::Number) to target type based /// on it's [`NumberKind`](opentelemetry::metrics::NumberKind). pub trait FromNumber { fn from_number(number: Number, number_kind: &NumberKind) -> Self; diff --git a/scripts/lint.sh b/scripts/lint.sh index 43ed731cec..c5217f6dc5 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -33,6 +33,8 @@ if rustup component add clippy; then cargo_feature opentelemetry-jaeger "isahc_collector_client" cargo_feature opentelemetry-jaeger "reqwest_blocking_collector_client" cargo_feature opentelemetry-jaeger "reqwest_collector_client" + cargo_feature opentelemetry-jaeger "hyper_collector_client" + cargo_feature opentelemetry-jaeger "hyper_tls_collector_client" cargo_feature opentelemetry-jaeger "collector_client" cargo_feature opentelemetry-jaeger "wasm_collector_client" cargo_feature opentelemetry-jaeger "collector_client, wasm_collector_client"