Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementation of collector http client with pure hyper #853

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"opentelemetry-zpages",
"examples/actix-http",
"examples/actix-http-tracing",
"examples/actix-hyper",
"examples/actix-udp",
"examples/async",
"examples/aws-xray",
Expand Down
13 changes: 13 additions & 0 deletions examples/actix-hyper/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "actix-hyper-example"
version = "0.1.0"
edition = "2018"
publish = false

[dependencies]
opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio"] }
opentelemetry-jaeger = { path = "../../opentelemetry-jaeger", features = ["hyper_collector_client", "rt-tokio-current-thread"] }
actix-web = "4.1.0"
actix-service = "2.0.0"
env_logger = "0.9.0"
tokio = { version = "1", features = ["full"] }
24 changes: 24 additions & 0 deletions examples/actix-hyper/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Actix-web - Jaeger example with HTTP collector and batch exporter

This example shows how to export spans from an actix-web application and ship them
to the collector directly via HTTP.
It uses the batch exporter to avoid excessive network roundtrips to Jaeger.

## Usage

Launch the application:
```shell
# Run jaeger in background
$ docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest

# Start the actix-web server
$ cargo run

# View spans
$ firefox http://localhost:16686/
```

Fire a request:
```bash
curl http://localhost:8088
```
56 changes: 56 additions & 0 deletions examples/actix-hyper/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://127.0.0.1:14268/api/traces")
.with_service_name("trace-http-demo")
.with_hyper()
.install_batch(opentelemetry::runtime::TokioCurrentThread)
}

async fn index() -> &'static str {
let tracer = global::tracer("request");
tracer.in_span("index", |ctx| {
ctx.span().set_attribute(Key::new("parameter").i64(10));
"Index"
})
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let tracer = init_tracer().expect("Failed to initialise tracer.");

HttpServer::new(move || {
let tracer = tracer.clone();
App::new()
.wrap(Logger::default())
.wrap_fn(move |req, srv| {
tracer.in_span("middleware", move |cx| {
cx.span()
.set_attribute(Key::new("path").string(req.path().to_string()));
srv.call(req).with_context(cx)
})
})
.route("/", web::get().to(index))
})
.bind("127.0.0.1:8088")
.unwrap()
.run()
.await?;

// wait until all pending spans get exported.
shutdown_tracer_provider();

Ok(())
}
4 changes: 2 additions & 2 deletions opentelemetry-dynatrace/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Dynatrace Metric Exporter.
//!
//! Defines an [Exporter] to send metric data to Dynatrace using the [Dynatrace Metrics ingestion protocol].
//! Defines an `Exporter` to send metric data to Dynatrace using the [Dynatrace Metrics ingestion protocol].
//!
//! [Metrics ingestion protocol]: https://www.dynatrace.com/support/help/how-to-use-dynatrace/metrics/metric-ingestion/metric-ingestion-protocol/
#![allow(unused_attributes)]
Expand Down Expand Up @@ -236,7 +236,7 @@ where

/// Set the timestamp to all metric data.
/// If disabled, the ingestion time of the Dynatrace server will be used automatically.
/// Adding timestamps should be disabled in environments, where the system time is unrelible.
/// Adding timestamps should be disabled in environments, where the system time is unreliable.
pub fn with_timestamp(self, value: bool) -> Self {
DynatraceMetricsPipeline {
timestamp: value,
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ edition = "2018"
async-trait = "0.1"
bytes = "1"
http = "0.2"
hyper = { version = "0.14", default-features = false, features = ["http2", "client", "tcp"], optional = true }
isahc = { version = "1.4", default-features = false, optional = true }
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api" }
reqwest = { version = "0.11", default-features = false, features = ["blocking"], optional = true }
surf = { version = "2.0", default-features = false, optional = true }
tokio = { version = "1.0", default-features = false, features = ["time"], optional = true }
78 changes: 77 additions & 1 deletion opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,25 @@ mod reqwest {
}

#[cfg(feature = "surf")]
mod surf {
pub mod surf {
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};

#[derive(Debug)]
pub struct BasicAuthMiddleware(pub surf::http::auth::BasicAuth);

#[async_trait]
impl surf::middleware::Middleware for BasicAuthMiddleware {
async fn handle(
&self,
mut req: surf::Request,
client: surf::Client,
next: surf::middleware::Next<'_>,
) -> surf::Result<surf::Response> {
req.insert_header(self.0.name(), self.0.value());
next.run(req, client).await
}
}

#[async_trait]
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
Expand Down Expand Up @@ -132,6 +148,66 @@ mod isahc {
}
}

#[cfg(any(feature = "hyper", feature = "hyper_tls"))]
pub mod hyper {
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
use hyper::client::connect::Connect;
use hyper::Client;
use std::fmt::Debug;
use std::time::Duration;
use tokio::time;

#[derive(Debug, Clone)]
pub struct HyperClient<C> {
inner: Client<C>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C>, timeout: Duration) -> Self {
Self {
inner,
timeout,
authorization: None,
}
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C>,
timeout: Duration,
authorization: HeaderValue,
) -> Self {
Self {
inner,
timeout,
authorization: Some(authorization),
}
}
}

#[async_trait]
impl<C> HttpClient for HyperClient<C>
where
C: Connect + Send + Sync + Clone + Debug + 'static,
{
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, body.into());
if let Some(ref authorization) = self.authorization {
request
.headers_mut()
.insert(http::header::AUTHORIZATION, authorization.clone());
}
let response = time::timeout(self.timeout, self.inner.request(request)).await??;
Ok(Response::builder()
.status(response.status())
.body(hyper::body::to_bytes(response.into_body()).await?)?)
}
}
}

/// Methods to make working with responses from the [`HttpClient`] trait easier.
pub trait ResponseExt: Sized {
/// Turn a response into an error if the HTTP status does not indicate success (200 - 299).
Expand Down
9 changes: 8 additions & 1 deletion opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ futures-util = { version = "0.3", default-features = false, features = ["std"],
futures-executor = "0.3"
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "0.14", default-features = false, features = ["client"], optional = true }
hyper-tls = { version = "0.5.0", default-features = false, optional = true }
isahc = { version = "1.4", default-features = false, optional = true }
js-sys = { version = "0.3", optional = true }
once_cell = "1.12"
Expand Down Expand Up @@ -68,9 +70,12 @@ optional = true
[features]
full = [
"collector_client",
"hyper_collector_client",
"hyper_tls_collector_client",
"isahc_collector_client",
"reqwest_collector_client",
"reqwest_blocking_collector_client",
"reqwest_rustls_collector_client",
"surf_collector_client",
"wasm_collector_client",
"rt-tokio",
Expand All @@ -80,6 +85,8 @@ full = [
]
default = []
collector_client = ["http", "opentelemetry-http"]
hyper_collector_client = ["collector_client", "headers", "http", "hyper", "opentelemetry-http/tokio", "opentelemetry-http/hyper"]
hyper_tls_collector_client = ["hyper_collector_client", "hyper-tls"]
isahc_collector_client = ["isahc", "opentelemetry-http/isahc"]
reqwest_blocking_collector_client = ["reqwest/blocking", "collector_client", "headers", "opentelemetry-http/reqwest"]
reqwest_collector_client = ["reqwest", "collector_client", "headers", "opentelemetry-http/reqwest"]
Expand All @@ -98,4 +105,4 @@ wasm_collector_client = [
rt-tokio = ["tokio", "opentelemetry/rt-tokio"]
rt-tokio-current-thread = ["tokio", "opentelemetry/rt-tokio-current-thread"]
rt-async-std = ["async-std", "opentelemetry/rt-async-std"]
integration_test = ["tonic", "prost", "prost-types", "rt-tokio", "collector_client", "reqwest_collector_client", "surf_collector_client", "isahc_collector_client"]
integration_test = ["tonic", "prost", "prost-types", "rt-tokio", "collector_client", "hyper_collector_client", "hyper_tls_collector_client", "reqwest_collector_client", "surf_collector_client", "isahc_collector_client"]
1 change: 1 addition & 0 deletions opentelemetry-jaeger/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint:

```rust
// Note that this requires one of the following features enabled so that there is a default http client implementation
// * hyper_collector_client
// * surf_collector_client
// * reqwest_collector_client
// * reqwest_blocking_collector_client
Expand Down
Loading