Skip to content

Commit

Permalink
feat: implementation of collector http client with pure hyper (#853)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomkarw authored Aug 8, 2022
1 parent c15b2f1 commit 399404d
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"opentelemetry-zpages",
"examples/actix-http",
"examples/actix-http-tracing",
"examples/actix-hyper",
"examples/actix-udp",
"examples/async",
"examples/aws-xray",
Expand Down
13 changes: 13 additions & 0 deletions examples/actix-hyper/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "actix-hyper-example"
version = "0.1.0"
edition = "2018"
publish = false

[dependencies]
opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio"] }
opentelemetry-jaeger = { path = "../../opentelemetry-jaeger", features = ["hyper_collector_client", "rt-tokio-current-thread"] }
actix-web = "4.1.0"
actix-service = "2.0.0"
env_logger = "0.9.0"
tokio = { version = "1", features = ["full"] }
24 changes: 24 additions & 0 deletions examples/actix-hyper/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Actix-web - Jaeger example with HTTP collector and batch exporter

This example shows how to export spans from an actix-web application and ship them
to the collector directly via HTTP.
It uses the batch exporter to avoid excessive network roundtrips to Jaeger.

## Usage

Launch the application:
```shell
# Run jaeger in background
$ docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest

# Start the actix-web server
$ cargo run

# View spans
$ firefox http://localhost:16686/
```

Fire a request:
```bash
curl http://localhost:8088
```
56 changes: 56 additions & 0 deletions examples/actix-hyper/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://127.0.0.1:14268/api/traces")
.with_service_name("trace-http-demo")
.with_hyper()
.install_batch(opentelemetry::runtime::TokioCurrentThread)
}

async fn index() -> &'static str {
let tracer = global::tracer("request");
tracer.in_span("index", |ctx| {
ctx.span().set_attribute(Key::new("parameter").i64(10));
"Index"
})
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let tracer = init_tracer().expect("Failed to initialise tracer.");

HttpServer::new(move || {
let tracer = tracer.clone();
App::new()
.wrap(Logger::default())
.wrap_fn(move |req, srv| {
tracer.in_span("middleware", move |cx| {
cx.span()
.set_attribute(Key::new("path").string(req.path().to_string()));
srv.call(req).with_context(cx)
})
})
.route("/", web::get().to(index))
})
.bind("127.0.0.1:8088")
.unwrap()
.run()
.await?;

// wait until all pending spans get exported.
shutdown_tracer_provider();

Ok(())
}
4 changes: 2 additions & 2 deletions opentelemetry-dynatrace/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Dynatrace Metric Exporter.
//!
//! Defines an [Exporter] to send metric data to Dynatrace using the [Dynatrace Metrics ingestion protocol].
//! Defines an `Exporter` to send metric data to Dynatrace using the [Dynatrace Metrics ingestion protocol].
//!
//! [Metrics ingestion protocol]: https://www.dynatrace.com/support/help/how-to-use-dynatrace/metrics/metric-ingestion/metric-ingestion-protocol/
#![allow(unused_attributes)]
Expand Down Expand Up @@ -178,7 +178,7 @@ where

/// Set the timestamp to all metric data.
/// If disabled, the ingestion time of the Dynatrace server will be used automatically.
/// Adding timestamps should be disabled in environments, where the system time is unrelible.
/// Adding timestamps should be disabled in environments, where the system time is unreliable.
pub fn with_timestamp(self, value: bool) -> Self {
DynatraceMetricsPipeline {
timestamp: value,
Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ edition = "2018"
async-trait = "0.1"
bytes = "1"
http = "0.2"
hyper = { version = "0.14", default-features = false, features = ["http2", "client", "tcp"], optional = true }
isahc = { version = "1.4", default-features = false, optional = true }
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api" }
reqwest = { version = "0.11", default-features = false, features = ["blocking"], optional = true }
surf = { version = "2.0", default-features = false, optional = true }
tokio = { version = "1.0", default-features = false, features = ["time"], optional = true }
78 changes: 77 additions & 1 deletion opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,25 @@ mod reqwest {
}

#[cfg(feature = "surf")]
mod surf {
pub mod surf {
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};

#[derive(Debug)]
pub struct BasicAuthMiddleware(pub surf::http::auth::BasicAuth);

#[async_trait]
impl surf::middleware::Middleware for BasicAuthMiddleware {
async fn handle(
&self,
mut req: surf::Request,
client: surf::Client,
next: surf::middleware::Next<'_>,
) -> surf::Result<surf::Response> {
req.insert_header(self.0.name(), self.0.value());
next.run(req, client).await
}
}

#[async_trait]
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
Expand Down Expand Up @@ -132,6 +148,66 @@ mod isahc {
}
}

#[cfg(any(feature = "hyper", feature = "hyper_tls"))]
pub mod hyper {
use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
use hyper::client::connect::Connect;
use hyper::Client;
use std::fmt::Debug;
use std::time::Duration;
use tokio::time;

#[derive(Debug, Clone)]
pub struct HyperClient<C> {
inner: Client<C>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C>, timeout: Duration) -> Self {
Self {
inner,
timeout,
authorization: None,
}
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C>,
timeout: Duration,
authorization: HeaderValue,
) -> Self {
Self {
inner,
timeout,
authorization: Some(authorization),
}
}
}

#[async_trait]
impl<C> HttpClient for HyperClient<C>
where
C: Connect + Send + Sync + Clone + Debug + 'static,
{
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, body.into());
if let Some(ref authorization) = self.authorization {
request
.headers_mut()
.insert(http::header::AUTHORIZATION, authorization.clone());
}
let response = time::timeout(self.timeout, self.inner.request(request)).await??;
Ok(Response::builder()
.status(response.status())
.body(hyper::body::to_bytes(response.into_body()).await?)?)
}
}
}

/// Methods to make working with responses from the [`HttpClient`] trait easier.
pub trait ResponseExt: Sized {
/// Turn a response into an error if the HTTP status does not indicate success (200 - 299).
Expand Down
9 changes: 8 additions & 1 deletion opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ futures-util = { version = "0.3", default-features = false, features = ["std"],
futures-executor = "0.3"
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "0.14", default-features = false, features = ["client"], optional = true }
hyper-tls = { version = "0.5.0", default-features = false, optional = true }
isahc = { version = "1.4", default-features = false, optional = true }
js-sys = { version = "0.3", optional = true }
once_cell = "1.12"
Expand Down Expand Up @@ -68,9 +70,12 @@ optional = true
[features]
full = [
"collector_client",
"hyper_collector_client",
"hyper_tls_collector_client",
"isahc_collector_client",
"reqwest_collector_client",
"reqwest_blocking_collector_client",
"reqwest_rustls_collector_client",
"surf_collector_client",
"wasm_collector_client",
"rt-tokio",
Expand All @@ -80,6 +85,8 @@ full = [
]
default = []
collector_client = ["http", "opentelemetry-http"]
hyper_collector_client = ["collector_client", "headers", "http", "hyper", "opentelemetry-http/tokio", "opentelemetry-http/hyper"]
hyper_tls_collector_client = ["hyper_collector_client", "hyper-tls"]
isahc_collector_client = ["isahc", "opentelemetry-http/isahc"]
reqwest_blocking_collector_client = ["reqwest/blocking", "collector_client", "headers", "opentelemetry-http/reqwest"]
reqwest_collector_client = ["reqwest", "collector_client", "headers", "opentelemetry-http/reqwest"]
Expand All @@ -98,4 +105,4 @@ wasm_collector_client = [
rt-tokio = ["tokio", "opentelemetry/rt-tokio"]
rt-tokio-current-thread = ["tokio", "opentelemetry/rt-tokio-current-thread"]
rt-async-std = ["async-std", "opentelemetry/rt-async-std"]
integration_test = ["tonic", "prost", "prost-types", "rt-tokio", "collector_client", "reqwest_collector_client", "surf_collector_client", "isahc_collector_client"]
integration_test = ["tonic", "prost", "prost-types", "rt-tokio", "collector_client", "hyper_collector_client", "hyper_tls_collector_client", "reqwest_collector_client", "surf_collector_client", "isahc_collector_client"]
1 change: 1 addition & 0 deletions opentelemetry-jaeger/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint:

```rust
// Note that this requires one of the following features enabled so that there is a default http client implementation
// * hyper_collector_client
// * surf_collector_client
// * reqwest_collector_client
// * reqwest_blocking_collector_client
Expand Down
Loading

0 comments on commit 399404d

Please sign in to comment.