From d5d769260ce36156889c2ee65a4421f6b93a82c5 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Wed, 13 Apr 2022 10:43:08 -0700 Subject: [PATCH 01/11] Add support for concurrent exports Applications generating significant span volume can end up dropping data due to the synchronous export step. According to the opentelemetry spec, This function will never be called concurrently for the same exporter instance. It can be called again only after the current call returns. However, it does not place a restriction on concurrent I/O or anything of that nature. There is an [ongoing discussion] about tweaking the language to make this more clear. With that in mind, this commit makes the exporters return a future that can be spawned concurrently. Unfortunately, this means that the `export()` method can no longer be async while taking &mut self. The latter is desirable to enforce the no concurrent calls line of the spec, so the choice is made here to return a future instead with the lifetime decoupled from self. This resulted in a bit of additional verbosity, but for the most part the async code can still be shoved into an async fn for the ergonomics. The main exception to this is the `jaeger` exporter which internally requires a bunch of mutable references. I plan to discuss with the opentelemetry team the overall goal of this PR and get buy-in before making more invasive changes to support this in the jaeger exporter. [ongoing discussion]: https://github.com/open-telemetry/opentelemetry-specification/issues/2434 --- Cargo.toml | 18 +-- opentelemetry-datadog/Cargo.toml | 1 + opentelemetry-datadog/src/exporter/mod.rs | 74 ++++++---- opentelemetry-otlp/src/exporter/http.rs | 11 +- opentelemetry-otlp/src/span.rs | 126 ++++++++++++------ opentelemetry-sdk/Cargo.toml | 1 + opentelemetry-sdk/src/export/trace/mod.rs | 5 +- opentelemetry-sdk/src/export/trace/stdout.rs | 23 +++- opentelemetry-sdk/src/runtime.rs | 10 +- opentelemetry-sdk/src/testing/trace.rs | 35 +++-- opentelemetry-sdk/src/trace/span_processor.rs | 38 ++++-- opentelemetry-stackdriver/src/lib.rs | 8 +- opentelemetry-zipkin/Cargo.toml | 1 + opentelemetry-zipkin/src/exporter/mod.rs | 39 ++++-- opentelemetry-zipkin/src/exporter/uploader.rs | 9 +- scripts/test.sh | 4 +- 16 files changed, 255 insertions(+), 148 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7eaef93f6..a595369927 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "opentelemetry-datadog", "opentelemetry-dynatrace", "opentelemetry-http", - "opentelemetry-jaeger", +# "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", "opentelemetry-proto", @@ -16,24 +16,24 @@ members = [ "opentelemetry-stackdriver", "opentelemetry-zipkin", "opentelemetry-zpages", - "examples/actix-http", - "examples/actix-http-tracing", - "examples/actix-udp", - "examples/async", +# "examples/actix-http", +# "examples/actix-http-tracing", +# "examples/actix-udp", +# "examples/async", "examples/aws-xray", - "examples/basic", +# "examples/basic", "examples/basic-otlp", "examples/basic-otlp-with-selector", "examples/basic-otlp-http", "examples/datadog", "examples/dynatrace", "examples/external-otlp-tonic-tokio", - "examples/grpc", +# "examples/grpc", "examples/http", "examples/hyper-prometheus", - "examples/tracing-grpc", +# "examples/tracing-grpc", "examples/zipkin", - "examples/multiple-span-processors", +# "examples/multiple-span-processors", "examples/zpages" ] exclude = ["examples/external-otlp-grpcio-async-std"] diff --git a/opentelemetry-datadog/Cargo.toml 
b/opentelemetry-datadog/Cargo.toml index a2a7af0afd..0669c82c2b 100644 --- a/opentelemetry-datadog/Cargo.toml +++ b/opentelemetry-datadog/Cargo.toml @@ -36,6 +36,7 @@ thiserror = "1.0" itertools = "0.10" http = "0.2" lazy_static = "1.4" +futures = "0.3" [dev-dependencies] base64 = "0.13" diff --git a/opentelemetry-datadog/src/exporter/mod.rs b/opentelemetry-datadog/src/exporter/mod.rs index e928c1c396..efb80d7150 100644 --- a/opentelemetry-datadog/src/exporter/mod.rs +++ b/opentelemetry-datadog/src/exporter/mod.rs @@ -9,7 +9,7 @@ use std::borrow::Cow; use std::fmt::{Debug, Formatter}; use crate::exporter::model::FieldMapping; -use async_trait::async_trait; +use futures::future::BoxFuture; use http::{Method, Request, Uri}; use itertools::Itertools; use opentelemetry::sdk::export::trace; @@ -34,7 +34,7 @@ const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count"; /// Datadog span exporter pub struct DatadogExporter { - client: Box, + client: Arc, request_url: Uri, model_config: ModelConfig, version: ApiVersion, @@ -49,7 +49,7 @@ impl DatadogExporter { model_config: ModelConfig, request_url: Uri, version: ApiVersion, - client: Box, + client: Arc, resource_mapping: Option, name_mapping: Option, service_name_mapping: Option, @@ -64,6 +64,27 @@ impl DatadogExporter { service_name_mapping, } } + + fn build_request(&self, batch: Vec) -> Result>, TraceError> { + let traces: Vec> = group_into_traces(batch); + let trace_count = traces.len(); + let data = self.version.encode( + &self.model_config, + traces, + self.service_name_mapping.clone(), + self.name_mapping.clone(), + self.resource_mapping.clone(), + )?; + let req = Request::builder() + .method(Method::POST) + .uri(self.request_url.clone()) + .header(http::header::CONTENT_TYPE, self.version.content_type()) + .header(DATADOG_TRACE_COUNT_HEADER, trace_count) + .body(data) + .map_err::(Into::into)?; + + Ok(req) + } } impl Debug for DatadogExporter { @@ -94,8 +115,7 @@ pub struct DatadogPipelineBuilder { agent_endpoint: String, trace_config: Option, version: ApiVersion, - client: Option>, - + client: Option>, resource_mapping: Option, name_mapping: Option, service_name_mapping: Option, @@ -122,15 +142,15 @@ impl Default for DatadogPipelineBuilder { not(feature = "reqwest-blocking-client"), feature = "surf-client" ))] - client: Some(Box::new(surf::Client::new())), + client: Some(Arc::new(surf::Client::new())), #[cfg(all( not(feature = "surf-client"), not(feature = "reqwest-blocking-client"), feature = "reqwest-client" ))] - client: Some(Box::new(reqwest::Client::new())), + client: Some(Arc::new(reqwest::Client::new())), #[cfg(feature = "reqwest-blocking-client")] - client: Some(Box::new(reqwest::blocking::Client::new())), + client: Some(Arc::new(reqwest::blocking::Client::new())), } } } @@ -296,7 +316,7 @@ impl DatadogPipelineBuilder { /// Choose the http client used by uploader pub fn with_http_client( mut self, - client: Box, + client: Arc, ) -> Self { self.client = Some(client); self @@ -354,28 +374,24 @@ fn group_into_traces(spans: Vec) -> Vec> { .collect() } -#[async_trait] +async fn send_request( + client: Arc, + request: http::Request>, +) -> trace::ExportResult { + let _ = client.send(request).await?.error_for_status()?; + Ok(()) +} + impl trace::SpanExporter for DatadogExporter { /// Export spans to datadog-agent - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let traces: Vec> = group_into_traces(batch); - let trace_count = traces.len(); - let data = self.version.encode( - &self.model_config, - traces, - 
self.service_name_mapping.clone(), - self.name_mapping.clone(), - self.resource_mapping.clone(), - )?; - let req = Request::builder() - .method(Method::POST) - .uri(self.request_url.clone()) - .header(http::header::CONTENT_TYPE, self.version.content_type()) - .header(DATADOG_TRACE_COUNT_HEADER, trace_count) - .body(data) - .map_err::(Into::into)?; - let _ = self.client.send(req).await?.error_for_status()?; - Ok(()) + fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { + let request = match self.build_request(batch) { + Ok(req) => req, + Err(err) => return Box::pin(std::future::ready(Err(err))), + }; + + let client = self.client.clone(); + Box::pin(send_request(client, request)) } } diff --git a/opentelemetry-otlp/src/exporter/http.rs b/opentelemetry-otlp/src/exporter/http.rs index d04b2a64e9..7cb20c47d3 100644 --- a/opentelemetry-otlp/src/exporter/http.rs +++ b/opentelemetry-otlp/src/exporter/http.rs @@ -1,6 +1,7 @@ use crate::{ExportConfig, Protocol}; use opentelemetry_http::HttpClient; use std::collections::HashMap; +use std::sync::Arc; /// Configuration of the http transport #[cfg(feature = "http-proto")] @@ -15,7 +16,7 @@ use std::collections::HashMap; )] pub struct HttpConfig { /// Select the HTTP client - pub client: Option>, + pub client: Option>, /// Additional headers to send to the collector. pub headers: Option>, @@ -30,19 +31,19 @@ impl Default for HttpConfig { fn default() -> Self { HttpConfig { #[cfg(feature = "reqwest-blocking-client")] - client: Some(Box::new(reqwest::blocking::Client::new())), + client: Some(Arc::new(reqwest::blocking::Client::new())), #[cfg(all( not(feature = "reqwest-blocking-client"), not(feature = "surf-client"), feature = "reqwest-client" ))] - client: Some(Box::new(reqwest::Client::new())), + client: Some(Arc::new(reqwest::Client::new())), #[cfg(all( not(feature = "reqwest-client"), not(feature = "reqwest-blocking-client"), feature = "surf-client" ))] - client: Some(Box::new(surf::Client::new())), + client: Some(Arc::new(surf::Client::new())), #[cfg(all( not(feature = "reqwest-client"), not(feature = "surf-client"), @@ -78,7 +79,7 @@ impl Default for HttpExporterBuilder { impl HttpExporterBuilder { /// Assign client implementation pub fn with_http_client(mut self, client: T) -> Self { - self.http_config.client = Some(Box::new(client)); + self.http_config.client = Some(Arc::new(client)); self } diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index c1cd803de5..1524ebf85a 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -44,6 +44,7 @@ use { opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest as ProstRequest, prost::Message, std::convert::TryFrom, + std::sync::Arc, }; #[cfg(any(feature = "grpc-sys", feature = "http-proto"))] @@ -269,7 +270,7 @@ pub enum SpanExporter { /// The Collector URL collector_endpoint: Uri, /// The HTTP trace exporter - trace_exporter: Option>, + trace_exporter: Option>, }, } @@ -397,9 +398,74 @@ impl SpanExporter { } } +#[cfg(feature = "grpc-sys")] +async fn grpcio_send_request( + trace_exporter: GrpcioTraceServiceClient, + request: GrpcRequest, + call_options: CallOption, +) -> ExportResult { + let receiver = trace_exporter + .export_async_opt(&request, call_options) + .map_err::(Into::into)?; + receiver.await.map_err::(Into::into)?; + Ok(()) +} + +#[cfg(feature = "tonic")] +async fn tonic_send_request( + trace_exporter: TonicTraceServiceClient, + request: Request, +) -> ExportResult { + trace_exporter + .to_owned() 
+ .export(request) + .await + .map_err::(Into::into)?; + + Ok(()) +} + +#[cfg(feature = "http-proto")] +async fn http_send_request( + batch: Vec, + client: std::sync::Arc, + headers: Option>, + collector_endpoint: Uri, +) -> ExportResult { + let req = ProstRequest { + resource_spans: batch.into_iter().map(Into::into).collect(), + }; + + let mut buf = vec![]; + req.encode(&mut buf) + .map_err::(Into::into)?; + + let mut request = http::Request::builder() + .method(Method::POST) + .uri(collector_endpoint) + .header(CONTENT_TYPE, "application/x-protobuf") + .body(buf) + .map_err::(Into::into)?; + + if let Some(headers) = headers { + for (k, val) in headers { + let value = + HeaderValue::from_str(val.as_ref()).map_err::(Into::into)?; + let key = HeaderName::try_from(&k).map_err::(Into::into)?; + request.headers_mut().insert(key, value); + } + } + + client.send(request).await?; + Ok(()) +} + #[async_trait] impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter { - async fn export(&mut self, batch: Vec) -> ExportResult { + fn export( + &mut self, + batch: Vec, + ) -> futures::future::BoxFuture<'static, ExportResult> { match self { #[cfg(feature = "grpc-sys")] SpanExporter::Grpcio { @@ -427,11 +493,11 @@ impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter { call_options = call_options.headers(metadata_builder.build()); } - let receiver = trace_exporter - .export_async_opt(&request, call_options) - .map_err::(Into::into)?; - receiver.await.map_err::(Into::into)?; - Ok(()) + Box::pin(grpcio_send_request( + trace_exporter.clone(), + request, + call_options, + )) } #[cfg(feature = "grpc-tonic")] @@ -457,13 +523,7 @@ impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter { } } - trace_exporter - .to_owned() - .export(request) - .await - .map_err::(Into::into)?; - - Ok(()) + Box::pin(tonic_send_request(trace_exporter.to_owned(), request)) } #[cfg(feature = "http-proto")] @@ -473,36 +533,16 @@ impl opentelemetry::sdk::export::trace::SpanExporter for SpanExporter { headers, .. 
} => { - let req = ProstRequest { - resource_spans: batch.into_iter().map(Into::into).collect(), - }; - - let mut buf = vec![]; - req.encode(&mut buf) - .map_err::(Into::into)?; - - let mut request = http::Request::builder() - .method(Method::POST) - .uri(collector_endpoint.clone()) - .header(CONTENT_TYPE, "application/x-protobuf") - .body(buf) - .map_err::(Into::into)?; - - if let Some(headers) = headers.clone() { - for (k, val) in headers { - let value = HeaderValue::from_str(val.as_ref()) - .map_err::(Into::into)?; - let key = - HeaderName::try_from(&k).map_err::(Into::into)?; - request.headers_mut().insert(key, value); - } - } - - if let Some(client) = trace_exporter { - client.send(request).await?; - Ok(()) + if let Some(ref client) = trace_exporter { + let client = Arc::clone(client); + Box::pin(http_send_request( + batch, + client, + headers.clone(), + collector_endpoint.clone(), + )) } else { - Err(crate::Error::NoHttpClient.into()) + Box::pin(std::future::ready(Err(crate::Error::NoHttpClient.into()))) } } } diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index 09073ff893..bbc5074ee5 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -9,6 +9,7 @@ async-trait = { version = "0.1", optional = true } crossbeam-channel = { version = "0.5", optional = true } dashmap = { version = "4.0.1", optional = true } fnv = { version = "1.0", optional = true } +futures = "0.3" futures-channel = "0.3" futures-executor = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] } diff --git a/opentelemetry-sdk/src/export/trace/mod.rs b/opentelemetry-sdk/src/export/trace/mod.rs index daab93371e..ef7a8b5e83 100644 --- a/opentelemetry-sdk/src/export/trace/mod.rs +++ b/opentelemetry-sdk/src/export/trace/mod.rs @@ -1,6 +1,6 @@ //! Trace exporters use crate::Resource; -use async_trait::async_trait; +use futures::future::BoxFuture; use opentelemetry_api::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status, TraceError}; use std::borrow::Cow; use std::fmt::Debug; @@ -18,7 +18,6 @@ pub type ExportResult = Result<(), TraceError>; /// The goal of the interface is to minimize burden of implementation for /// protocol-dependent telemetry exporters. The protocol exporter is expected to /// be primarily a simple telemetry data encoder and transmitter. -#[async_trait] pub trait SpanExporter: Send + Debug { /// Exports a batch of readable spans. Protocol exporters that will /// implement this function are typically expected to serialize and transmit @@ -32,7 +31,7 @@ pub trait SpanExporter: Send + Debug { /// /// Any retry logic that is required by the exporter is the responsibility /// of the exporter. - async fn export(&mut self, batch: Vec) -> ExportResult; + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult>; /// Shuts down the exporter. Called when SDK is shut down. This is an /// opportunity for exporter to do any cleanup required. 
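Note: under the new trait signature above, an exporter moves whatever state it needs into the returned future instead of borrowing `self` across an `.await`. A minimal sketch of that pattern (hypothetical `MyClient`/`MyExporter` names, not code from this patch; the datadog changes above and the zipkin changes below follow the same shape):

    use futures::future::BoxFuture;
    use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
    use std::sync::Arc;

    // Hypothetical transport client; only its shape matters for the sketch.
    #[derive(Debug)]
    struct MyClient;

    impl MyClient {
        async fn send_batch(&self, _batch: Vec<SpanData>) -> ExportResult {
            Ok(())
        }
    }

    #[derive(Debug)]
    struct MyExporter {
        client: Arc<MyClient>,
    }

    impl SpanExporter for MyExporter {
        fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
            // Clone the cheap Arc handle; the returned future owns it, so it is
            // 'static and can be polled concurrently with later export() calls.
            let client = Arc::clone(&self.client);
            Box::pin(async move { client.send_batch(batch).await })
        }
    }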
diff --git a/opentelemetry-sdk/src/export/trace/stdout.rs b/opentelemetry-sdk/src/export/trace/stdout.rs index c88c897ee7..417dae6356 100644 --- a/opentelemetry-sdk/src/export/trace/stdout.rs +++ b/opentelemetry-sdk/src/export/trace/stdout.rs @@ -133,20 +133,31 @@ where W: Write + Debug + Send + 'static, { /// Export spans to stdout - async fn export(&mut self, batch: Vec) -> ExportResult { + fn export( + &mut self, + batch: Vec, + ) -> futures::future::BoxFuture<'static, ExportResult> { for span in batch { if self.pretty_print { - self.writer + if let Err(err) = self + .writer .write_all(format!("{:#?}\n", span).as_bytes()) - .map_err(|err| TraceError::ExportFailed(Box::new(Error::from(err))))?; + .map_err(|err| TraceError::ExportFailed(Box::new(Error::from(err)))) + { + return Box::pin(std::future::ready(Err(Into::into(err)))); + } } else { - self.writer + if let Err(err) = self + .writer .write_all(format!("{:?}\n", span).as_bytes()) - .map_err(|err| TraceError::ExportFailed(Box::new(Error::from(err))))?; + .map_err(|err| TraceError::ExportFailed(Box::new(Error::from(err)))) + { + return Box::pin(std::future::ready(Err(Into::into(err)))); + } } } - Ok(()) + Box::pin(std::future::ready(Ok(()))) } } diff --git a/opentelemetry-sdk/src/runtime.rs b/opentelemetry-sdk/src/runtime.rs index c958d19245..9f1e3b71b1 100644 --- a/opentelemetry-sdk/src/runtime.rs +++ b/opentelemetry-sdk/src/runtime.rs @@ -21,7 +21,7 @@ pub trait Runtime: Clone + Send + Sync + 'static { /// A future, which resolves after a previously specified amount of time. The output type is /// not important. - type Delay: Future + Send; + type Delay: Future + Send + Unpin; /// Create a [Stream][futures_util::stream::Stream], which returns a new item every /// [Duration][std::time::Duration]. 
@@ -52,7 +52,7 @@ pub struct Tokio; #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio")))] impl Runtime for Tokio { type Interval = tokio_stream::wrappers::IntervalStream; - type Delay = tokio::time::Sleep; + type Delay = ::std::pin::Pin>; fn interval(&self, duration: Duration) -> Self::Interval { crate::util::tokio_interval_stream(duration) @@ -63,7 +63,7 @@ impl Runtime for Tokio { } fn delay(&self, duration: Duration) -> Self::Delay { - tokio::time::sleep(duration) + Box::pin(tokio::time::sleep(duration)) } } @@ -77,7 +77,7 @@ pub struct TokioCurrentThread; #[cfg_attr(docsrs, doc(cfg(feature = "rt-tokio-current-thread")))] impl Runtime for TokioCurrentThread { type Interval = tokio_stream::wrappers::IntervalStream; - type Delay = tokio::time::Sleep; + type Delay = ::std::pin::Pin>; fn interval(&self, duration: Duration) -> Self::Interval { crate::util::tokio_interval_stream(duration) @@ -100,7 +100,7 @@ impl Runtime for TokioCurrentThread { } fn delay(&self, duration: Duration) -> Self::Delay { - tokio::time::sleep(duration) + Box::pin(tokio::time::sleep(duration)) } } diff --git a/opentelemetry-sdk/src/testing/trace.rs b/opentelemetry-sdk/src/testing/trace.rs index 03781fe66d..b60df5ea14 100644 --- a/opentelemetry-sdk/src/testing/trace.rs +++ b/opentelemetry-sdk/src/testing/trace.rs @@ -38,13 +38,20 @@ pub struct TestSpanExporter { #[async_trait] impl SpanExporter for TestSpanExporter { - async fn export(&mut self, batch: Vec) -> ExportResult { + fn export( + &mut self, + batch: Vec, + ) -> futures::future::BoxFuture<'static, ExportResult> { for span_data in batch { - self.tx_export + if let Err(err) = self + .tx_export .send(span_data) - .map_err::(Into::into)?; + .map_err::(Into::into) + { + return Box::pin(std::future::ready(Err(Into::into(err)))); + } } - Ok(()) + Box::pin(std::future::ready(Ok(()))) } fn shutdown(&mut self) { @@ -68,15 +75,21 @@ pub struct TokioSpanExporter { tx_shutdown: tokio::sync::mpsc::UnboundedSender<()>, } -#[async_trait] impl SpanExporter for TokioSpanExporter { - async fn export(&mut self, batch: Vec) -> ExportResult { + fn export( + &mut self, + batch: Vec, + ) -> futures::future::BoxFuture<'static, ExportResult> { for span_data in batch { - self.tx_export + if let Err(err) = self + .tx_export .send(span_data) - .map_err::(Into::into)?; + .map_err::(Into::into) + { + return Box::pin(std::future::ready(Err(Into::into(err)))); + } } - Ok(()) + Box::pin(std::future::ready(Ok(()))) } fn shutdown(&mut self) { @@ -144,7 +157,7 @@ impl NoopSpanExporter { #[async_trait::async_trait] impl SpanExporter for NoopSpanExporter { - async fn export(&mut self, _batch: Vec) -> ExportResult { - Ok(()) + fn export(&mut self, _: Vec) -> futures::future::BoxFuture<'static, ExportResult> { + Box::pin(std::future::ready(Ok(()))) } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 001dc763f9..c06b4b4661 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -302,17 +302,24 @@ impl BatchSpanProcessor { spans.push(span); if spans.len() == config.max_export_batch_size { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - spans.split_off(0), - ) - .await; - - if let Err(err) = result { - global::handle_error(err); - } + let export = exporter.export(spans.split_off(0)); + let timeout = timeout_runtime.delay(config.max_export_timeout); + let time_out = config.max_export_timeout; + + 
timeout_runtime.spawn(Box::pin(async move { + // pin_mut!(export); + // pin_mut!(timeout); + let result = match futures::future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => { + ExportResult::Err(TraceError::ExportTimedOut(time_out)) + } + }; + + if let Err(err) = result { + global::handle_error(err); + } + })); } } // Span batch interval time reached or a force flush has been invoked, export current spans. @@ -652,9 +659,12 @@ mod tests { D: Fn(Duration) -> DS + 'static + Send + Sync, DS: Future + Send + Sync + 'static, { - async fn export(&mut self, _batch: Vec) -> ExportResult { - (self.delay_fn)(self.delay_for).await; - Ok(()) + fn export( + &mut self, + _batch: Vec, + ) -> futures::future::BoxFuture<'static, ExportResult> { + use futures::FutureExt; + Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(()))) } } diff --git a/opentelemetry-stackdriver/src/lib.rs b/opentelemetry-stackdriver/src/lib.rs index 22c219672b..6ac7591daf 100644 --- a/opentelemetry-stackdriver/src/lib.rs +++ b/opentelemetry-stackdriver/src/lib.rs @@ -9,6 +9,7 @@ rustdoc::invalid_rust_codeblocks )] +use futures::future::BoxFuture; use std::{ collections::HashMap, fmt, @@ -79,14 +80,13 @@ impl StackDriverExporter { } } -#[async_trait] impl SpanExporter for StackDriverExporter { - async fn export(&mut self, batch: Vec) -> ExportResult { + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { match self.tx.try_send(batch) { - Err(e) => Err(e.into()), + Err(e) => Box::pin(std::future::ready(Err(e.into()))), Ok(()) => { self.pending_count.fetch_add(1, Ordering::Relaxed); - Ok(()) + Box::pin(std::future::ready(Ok(()))) } } } diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index b6f35241bc..0ba0e2505d 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -38,6 +38,7 @@ http = "0.2" reqwest = { version = "0.11", optional = true, default-features = false } surf = { version = "2.0", optional = true, default-features = false } thiserror = { version = "1.0"} +futures = "0.3" [dev-dependencies] bytes = "1" diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index 45d5a6f30d..3ef6617f46 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -3,6 +3,7 @@ mod model; mod uploader; use async_trait::async_trait; +use futures::future::BoxFuture; use http::Uri; use model::endpoint::Endpoint; use opentelemetry::sdk::resource::ResourceDetector; @@ -36,7 +37,7 @@ pub struct Exporter { } impl Exporter { - fn new(local_endpoint: Endpoint, client: Box, collector_endpoint: Uri) -> Self { + fn new(local_endpoint: Endpoint, client: Arc, collector_endpoint: Uri) -> Self { Exporter { local_endpoint, uploader: uploader::Uploader::new(client, collector_endpoint), @@ -56,7 +57,7 @@ pub struct ZipkinPipelineBuilder { service_addr: Option, collector_endpoint: String, trace_config: Option, - client: Option>, + client: Option>, } impl Default for ZipkinPipelineBuilder { @@ -64,7 +65,7 @@ impl Default for ZipkinPipelineBuilder { let timeout = env::get_timeout(); ZipkinPipelineBuilder { #[cfg(feature = "reqwest-blocking-client")] - client: Some(Box::new( + client: Some(Arc::new( reqwest::blocking::Client::builder() .timeout(timeout) .build() @@ -75,7 +76,7 @@ impl Default for ZipkinPipelineBuilder { not(feature = "surf-client"), feature = "reqwest-client" ))] - client: Some(Box::new( + client: Some(Arc::new( 
reqwest::Client::builder() .timeout(timeout) .build() @@ -86,7 +87,7 @@ impl Default for ZipkinPipelineBuilder { not(feature = "reqwest-blocking-client"), feature = "surf-client" ))] - client: Some(Box::new( + client: Some(Arc::new( surf::Client::try_from(surf::Config::new().set_timeout(Some(timeout))) .unwrap_or_else(|_| surf::Client::new()), )), @@ -211,7 +212,7 @@ impl ZipkinPipelineBuilder { /// Assign client implementation pub fn with_http_client(mut self, client: T) -> Self { - self.client = Some(Box::new(client)); + self.client = Some(Arc::new(client)); self } @@ -234,16 +235,28 @@ impl ZipkinPipelineBuilder { } } +async fn zipkin_export( + batch: Vec, + uploader: uploader::Uploader, + local_endpoint: Endpoint, +) -> trace::ExportResult { + let zipkin_spans = batch + .into_iter() + .map(|span| model::into_zipkin_span(local_endpoint.clone(), span)) + .collect(); + + uploader.upload(zipkin_spans).await +} + #[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Zipkin collector. - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let zipkin_spans = batch - .into_iter() - .map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span)) - .collect(); - - self.uploader.upload(zipkin_spans).await + fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { + Box::pin(zipkin_export( + batch, + self.uploader.clone(), + self.local_endpoint.clone(), + )) } } diff --git a/opentelemetry-zipkin/src/exporter/uploader.rs b/opentelemetry-zipkin/src/exporter/uploader.rs index cd790e47b0..9ab843034b 100644 --- a/opentelemetry-zipkin/src/exporter/uploader.rs +++ b/opentelemetry-zipkin/src/exporter/uploader.rs @@ -5,15 +5,16 @@ use http::{header::CONTENT_TYPE, Method, Request, Uri}; use opentelemetry::sdk::export::trace::ExportResult; use opentelemetry_http::{HttpClient, ResponseExt}; use std::fmt::Debug; +use std::sync::Arc; -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) enum Uploader { Http(JsonV2Client), } impl Uploader { /// Create a new http uploader - pub(crate) fn new(client: Box, collector_endpoint: Uri) -> Self { + pub(crate) fn new(client: Arc, collector_endpoint: Uri) -> Self { Uploader::Http(JsonV2Client { client, collector_endpoint, @@ -28,9 +29,9 @@ impl Uploader { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct JsonV2Client { - client: Box, + client: Arc, collector_endpoint: Uri, } diff --git a/scripts/test.sh b/scripts/test.sh index a7a3b9d399..cf6e3b363b 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -12,7 +12,7 @@ cargo test --manifest-path=opentelemetry/Cargo.toml --all-features -- --ignored cargo test --manifest-path=opentelemetry/Cargo.toml --all-features cargo test --manifest-path=opentelemetry-contrib/Cargo.toml --all-features -cargo test --manifest-path=opentelemetry-dynatrace/Cargo.toml --all-features -cargo test --manifest-path=opentelemetry-jaeger/Cargo.toml --all-features +# cargo test --manifest-path=opentelemetry-dynatrace/Cargo.toml --all-features +# cargo test --manifest-path=opentelemetry-jaeger/Cargo.toml --all-features cargo test --manifest-path=opentelemetry-otlp/Cargo.toml --features "trace,grpc-sys" --no-default-features cargo test --manifest-path=opentelemetry-zipkin/Cargo.toml --all-features From 293563db38a313a739451cf6b7686ab802eecf09 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Wed, 20 Apr 2022 17:33:03 -0700 Subject: [PATCH 02/11] SpanProcessor directly manages concurrent exports Prior, export tasks were run in "fire and forget" mode with runtime::spawn. 
SpanProcessor now manages tasks directly using FuturesUnordered. This enables limiting overall concurrency (and thus memory footprint). Additionally, flush and shutdown logic now spawn an additional task for any unexported spans and wait on _all_ outstanding tasks to complete before returning. --- opentelemetry-sdk/src/trace/span_processor.rs | 280 +++++++++++------- 1 file changed, 173 insertions(+), 107 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index c06b4b4661..f97db13637 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -37,9 +37,12 @@ use crate::export::trace::{ExportResult, SpanData, SpanExporter}; use crate::trace::runtime::{TraceRuntime, TrySend}; use crate::trace::Span; +use futures::select; +use futures::stream::FusedStream; +use futures::Stream; use futures_channel::oneshot; -use futures_util::future::{self, Either}; -use futures_util::{pin_mut, stream, StreamExt as _}; +use futures_util::future::Either; +use futures_util::{stream, StreamExt as _}; use opentelemetry_api::global; use opentelemetry_api::{ trace::{TraceError, TraceResult}, @@ -277,12 +280,157 @@ pub enum BatchMessage { Shutdown(oneshot::Sender), } +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; + +struct BatchSpanProcessorInternal { + spans: Vec, + export_tasks: FuturesUnordered>, + runtime: R, + exporter: Box, + config: BatchConfig, +} + +impl BatchSpanProcessorInternal { + async fn flush(&mut self, res_channel: Option>) { + let export_task = self.export(); + let task = Box::pin(async move { + let result = export_task.await; + + if let Some(channel) = res_channel { + if let Err(result) = channel.send(result) { + global::handle_error(TraceError::from(format!( + "failed to send flush result: {:?}", + result + ))); + } + } else if let Err(err) = result { + global::handle_error(err); + } + + Ok(()) + }); + + if self.config.max_concurrent_exports == 1 { + let _ = task.await; + } else { + self.export_tasks.push(task); + while let Some(_) = self.export_tasks.next().await {} + } + } + + /// Process a single message + /// + /// A return value of false indicates shutdown + async fn process_message(&mut self, message: BatchMessage) -> bool { + match message { + // Span has finished, add to buffer of pending spans. + BatchMessage::ExportSpan(span) => { + self.spans.push(span); + + if self.spans.len() == self.config.max_export_batch_size { + // If concurrent exports are saturated, wait for one to complete. + // TODO jwilm: ignore max_concurrent_exports == 0 + if self.export_tasks.len() == self.config.max_concurrent_exports { + self.export_tasks.next().await; + } + + let task = self.export(); + + // Special case when not using concurrent exports + if self.config.max_concurrent_exports == 1 { + if let Err(err) = task.await { + global::handle_error(err); + } + } else { + self.export_tasks.push(Box::pin(async move { + if let Err(err) = task.await { + global::handle_error(err); + } + + Ok(()) + })); + } + } + } + // Span batch interval time reached or a force flush has been invoked, export + // current spans. + // + // This is a hint to ensure that any tasks associated with Spans for which the + // SpanProcessor had already received events prior to the call to ForceFlush + // SHOULD be completed as soon as possible, preferably before returning from + // this method. 
+ // + // In particular, if any SpanProcessor has any associated exporter, it SHOULD + // try to call the exporter's Export with all spans for which this was not + // already done and then invoke ForceFlush on it. The built-in SpanProcessors + // MUST do so. If a timeout is specified (see below), the SpanProcessor MUST + // prioritize honoring the timeout over finishing all calls. It MAY skip or + // abort some or all Export or ForceFlush calls it has made to achieve this + // goal. + // + // NB: `force_flush` is not currently implemented on exporters; the equivalent + // would be waiting for exporter tasks to complete. In the case of + // channel-coupled exporters, they will need a `force_flush` implementation to + // properly block. + BatchMessage::Flush(res_channel) => { + self.flush(res_channel).await; + } + // Stream has terminated or processor is shutdown, return to finish execution. + BatchMessage::Shutdown(ch) => { + self.flush(Some(ch)).await; + self.exporter.shutdown(); + return false; + } + } + + true + } + + fn export(&mut self) -> BoxFuture<'static, ExportResult> { + // Batch size check for flush / shutdown. Those methods may be called + // when there's no work to do. + if self.spans.len() == 0 { + return Box::pin(futures::future::ready(Ok(()))); + } + + let export = self.exporter.export(self.spans.split_off(0)); + let timeout = self.runtime.delay(self.config.max_export_timeout); + let time_out = self.config.max_export_timeout; + + Box::pin(async move { + match futures::future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)), + } + }) + } + + async fn run(mut self, mut messages: impl Stream + Unpin + FusedStream) { + loop { + select! { + // FuturesUnordered implements Fuse intelligently such that it + // will become eligible again once new tasks are added to it. + _ = self.export_tasks.next() => { + // An export task completed; do we need to do anything with it? + }, + message = messages.next() => { + match message { + Some(message) => { + if !self.process_message(message).await { + break; + } + }, + None => break, + } + }, + } + } + } +} + impl BatchSpanProcessor { - pub(crate) fn new( - mut exporter: Box, - config: BatchConfig, - runtime: R, - ) -> Self { + pub(crate) fn new(exporter: Box, config: BatchConfig, runtime: R) -> Self { let (message_sender, message_receiver) = runtime.batch_message_channel(config.max_queue_size); let ticker = runtime @@ -290,83 +438,17 @@ impl BatchSpanProcessor { .map(|_| BatchMessage::Flush(None)); let timeout_runtime = runtime.clone(); - // Spawn worker process via user-defined spawn function. - runtime.spawn(Box::pin(async move { - let mut spans = Vec::new(); - let mut messages = Box::pin(stream::select(message_receiver, ticker)); - - while let Some(message) = messages.next().await { - match message { - // Span has finished, add to buffer of pending spans. 
- BatchMessage::ExportSpan(span) => { - spans.push(span); - - if spans.len() == config.max_export_batch_size { - let export = exporter.export(spans.split_off(0)); - let timeout = timeout_runtime.delay(config.max_export_timeout); - let time_out = config.max_export_timeout; - - timeout_runtime.spawn(Box::pin(async move { - // pin_mut!(export); - // pin_mut!(timeout); - let result = match futures::future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => { - ExportResult::Err(TraceError::ExportTimedOut(time_out)) - } - }; - - if let Err(err) = result { - global::handle_error(err); - } - })); - } - } - // Span batch interval time reached or a force flush has been invoked, export current spans. - BatchMessage::Flush(res_channel) => { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - spans.split_off(0), - ) - .await; - - if let Some(channel) = res_channel { - if let Err(result) = channel.send(result) { - global::handle_error(TraceError::from(format!( - "failed to send flush result: {:?}", - result - ))); - } - } else if let Err(err) = result { - global::handle_error(err); - } - } - // Stream has terminated or processor is shutdown, return to finish execution. - BatchMessage::Shutdown(ch) => { - let result = export_with_timeout( - config.max_export_timeout, - exporter.as_mut(), - &timeout_runtime, - spans.split_off(0), - ) - .await; - - exporter.shutdown(); - - if let Err(result) = ch.send(result) { - global::handle_error(TraceError::from(format!( - "failed to send batch processor shutdown result: {:?}", - result - ))); - } + let messages = Box::pin(stream::select(message_receiver, ticker)); + let processor = BatchSpanProcessorInternal { + spans: Vec::new(), + export_tasks: FuturesUnordered::new(), + runtime: timeout_runtime, + config, + exporter, + }; - break; - } - } - } - })); + // Spawn worker process via user-defined spawn function. + runtime.spawn(Box::pin(processor.run(messages))); // Return batch processor with link to worker BatchSpanProcessor { message_sender } @@ -385,30 +467,6 @@ impl BatchSpanProcessor { } } -async fn export_with_timeout( - time_out: Duration, - exporter: &mut E, - runtime: &R, - batch: Vec, -) -> ExportResult -where - R: TraceRuntime, - E: SpanExporter + ?Sized, -{ - if batch.is_empty() { - return Ok(()); - } - - let export = exporter.export(batch); - let timeout = runtime.delay(time_out); - pin_mut!(export); - pin_mut!(timeout); - match future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)), - } -} - /// Batch span processor configuration #[derive(Debug)] pub struct BatchConfig { @@ -428,6 +486,13 @@ pub struct BatchConfig { /// The maximum duration to export a batch of data. max_export_timeout: Duration, + + /// Maximum number of concurrent exports + /// + /// Limits the number of spawned tasks for exports and thus memory consumed + /// by an exporter. A value of 1 will cause exports to be performed + /// synchronously on the BatchSpanProcessor task. 
+ max_concurrent_exports: usize, } impl Default for BatchConfig { @@ -437,6 +502,7 @@ impl Default for BatchConfig { scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT), max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT), + max_concurrent_exports: 16, // TODO jwilm }; if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE) From 56db3bb6f655276436215fd6e045a2e1633b87c5 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Wed, 20 Apr 2022 17:53:40 -0700 Subject: [PATCH 03/11] Add configuration for BSP max_concurrent_exports Users may desire to control the level of export concurrency in the batch span processor. There are two special values: max_concurrent_exports = 0: no bound on concurrency max_concurrent_exports = 1: no concurrency, makes everything synchronous on the messaging task. --- opentelemetry-sdk/src/trace/span_processor.rs | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index f97db13637..094cfadf48 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -66,6 +66,10 @@ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; /// Default maximum allowed time to export data. const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; +/// Default max concurrent exports for BSP +// TODO jwilm: I omitted the OTEL_ prefix here as this is a non-standard config +// value. Should it be kept this way or prefix with OTEL_ for consistency? +const BSP_MAX_CONCURRENT_EXPORTS: usize = 0; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -330,26 +334,25 @@ impl BatchSpanProcessorInternal { if self.spans.len() == self.config.max_export_batch_size { // If concurrent exports are saturated, wait for one to complete. 
- // TODO jwilm: ignore max_concurrent_exports == 0 - if self.export_tasks.len() == self.config.max_concurrent_exports { + if self.export_tasks.len() > 0 + && self.export_tasks.len() == self.config.max_concurrent_exports + { self.export_tasks.next().await; } - let task = self.export(); + let export_task = self.export(); + let task = async move { + if let Err(err) = export_task.await { + global::handle_error(err); + } + Ok(()) + }; // Special case when not using concurrent exports if self.config.max_concurrent_exports == 1 { - if let Err(err) = task.await { - global::handle_error(err); - } + let _ = task.await; } else { - self.export_tasks.push(Box::pin(async move { - if let Err(err) = task.await { - global::handle_error(err); - } - - Ok(()) - })); + self.export_tasks.push(Box::pin(task)); } } } @@ -502,7 +505,7 @@ impl Default for BatchConfig { scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT), max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT), - max_concurrent_exports: 16, // TODO jwilm + max_concurrent_exports: BSP_MAX_CONCURRENT_EXPORTS, }; if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE) @@ -597,6 +600,16 @@ where BatchSpanProcessorBuilder { config, ..self } } + /// Set the maximum number of concurrent exports + /// + /// This setting may be useful for limiting network throughput or memory + /// consumption. + pub fn with_max_concurrent_exports(self, max: usize) -> Self { + let mut config = self.config; + config.max_concurrent_exports = max; + BatchSpanProcessorBuilder { config, ..self } + } + /// Build a batch processor pub fn build(self) -> BatchSpanProcessor { BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime) From a8d1e46be2907ab58bb9a4be37d446a349a6cfed Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Thu, 21 Apr 2022 10:49:30 -0700 Subject: [PATCH 04/11] Implement new SpanExporter API for Jaeger Key points - decouple exporter from uploaders via channel and spawned task - some uploaders are a shared I/O resource and cannot be multiplexed - necessitates a task queue - eg, HttpClient will spawn many I/O tasks internally, AgentUploader is a single I/O resource. Different level of abstraction. - Synchronous API not supported without a Runtime argument. I updated the API to thread one through, but maybe this is undesirable. I'm also exploiting the fact in the Actix examples that it uses Tokio under the hood to pass through the Tokio runtime token. - Tests pass save for a couple of flakey environment ones which is likely a race condition. 
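As a reading aid for the key points above: the jaeger diff below decouples the exporter from its uploader with a bounded channel plus a spawned task that owns the single I/O resource. Condensed into a sketch (simplified names, not the literal patch code), the shape is:

    use futures::channel::{mpsc, oneshot};
    use futures::future::BoxFuture;
    use futures::StreamExt;
    use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
    use opentelemetry::trace::TraceError;

    type Message = (Vec<SpanData>, oneshot::Sender<ExportResult>);

    #[derive(Debug)]
    struct ChannelExporter {
        tx: mpsc::Sender<Message>,
    }

    impl SpanExporter for ChannelExporter {
        fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
            let (result_tx, result_rx) = oneshot::channel();
            // Hand the batch to the uploader task; a full channel fails fast here
            // instead of blocking the caller.
            if let Err(err) = self.tx.try_send((batch, result_tx)) {
                return Box::pin(futures::future::ready(Err(TraceError::from(format!(
                    "failed to queue batch: {:?}",
                    err
                )))));
            }
            // Resolve once the uploader task reports the outcome for this batch.
            Box::pin(async move {
                result_rx.await.unwrap_or_else(|_| {
                    Err(TraceError::from("exporter task dropped".to_string()))
                })
            })
        }
    }

    // Stand-in for a single shared I/O resource (e.g. an agent UDP socket).
    struct FakeUploader;
    impl FakeUploader {
        async fn upload(&mut self, _batch: Vec<SpanData>) -> ExportResult {
            Ok(())
        }
    }

    // Spawned once on the runtime; owning the uploader here serializes access to
    // the shared I/O resource while export() itself stays concurrent.
    async fn run_uploader(mut rx: mpsc::Receiver<Message>, mut uploader: FakeUploader) {
        while let Some((batch, result_tx)) = rx.next().await {
            let _ = result_tx.send(uploader.upload(batch).await);
        }
    }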
--- Cargo.toml | 18 ++-- examples/actix-udp/src/main.rs | 8 +- examples/grpc/src/client.rs | 8 +- examples/grpc/src/server.rs | 8 +- opentelemetry-jaeger/Cargo.toml | 15 ++++ .../src/exporter/config/agent.rs | 43 ++++++---- .../src/exporter/config/collector/mod.rs | 3 +- .../src/exporter/config/mod.rs | 8 +- opentelemetry-jaeger/src/exporter/mod.rs | 84 ++++++++++++++----- opentelemetry-jaeger/src/lib.rs | 6 +- opentelemetry-otlp/src/span.rs | 4 +- opentelemetry-sdk/benches/trace.rs | 8 +- opentelemetry-sdk/src/export/trace/stdout.rs | 14 ++-- opentelemetry-sdk/src/trace/span_processor.rs | 6 +- scripts/lint.sh | 9 +- scripts/test.sh | 6 +- 16 files changed, 160 insertions(+), 88 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a595369927..b7eaef93f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "opentelemetry-datadog", "opentelemetry-dynatrace", "opentelemetry-http", -# "opentelemetry-jaeger", + "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", "opentelemetry-proto", @@ -16,24 +16,24 @@ members = [ "opentelemetry-stackdriver", "opentelemetry-zipkin", "opentelemetry-zpages", -# "examples/actix-http", -# "examples/actix-http-tracing", -# "examples/actix-udp", -# "examples/async", + "examples/actix-http", + "examples/actix-http-tracing", + "examples/actix-udp", + "examples/async", "examples/aws-xray", -# "examples/basic", + "examples/basic", "examples/basic-otlp", "examples/basic-otlp-with-selector", "examples/basic-otlp-http", "examples/datadog", "examples/dynatrace", "examples/external-otlp-tonic-tokio", -# "examples/grpc", + "examples/grpc", "examples/http", "examples/hyper-prometheus", -# "examples/tracing-grpc", + "examples/tracing-grpc", "examples/zipkin", -# "examples/multiple-span-processors", + "examples/multiple-span-processors", "examples/zpages" ] exclude = ["examples/external-otlp-grpcio-async-std"] diff --git a/examples/actix-udp/src/main.rs b/examples/actix-udp/src/main.rs index 2a741b92a1..cac9801fb0 100644 --- a/examples/actix-udp/src/main.rs +++ b/examples/actix-udp/src/main.rs @@ -1,14 +1,16 @@ use actix_service::Service; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk::trace as sdktrace}; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, Key, }; +use opentelemetry_jaeger::JaegerTraceRuntime; -fn init_tracer() -> Result { +fn init_tracer(runtime: R) -> Result { opentelemetry_jaeger::new_agent_pipeline() .with_endpoint("localhost:6831") .with_service_name("trace-udp-demo") @@ -19,7 +21,7 @@ fn init_tracer() -> Result { opentelemetry::KeyValue::new("exporter", "jaeger"), ]), )) - .install_simple() + .install_simple(runtime) } async fn index() -> &'static str { @@ -34,7 +36,7 @@ async fn index() -> &'static str { async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); - let _tracer = init_tracer().expect("Failed to initialise tracer."); + let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer."); HttpServer::new(|| { App::new() diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs index df2e65ab14..51668cd9b8 100644 --- a/examples/grpc/src/client.rs +++ b/examples/grpc/src/client.rs @@ -3,6 +3,7 @@ use hello_world::HelloRequest; use opentelemetry::global; use opentelemetry::global::shutdown_tracer_provider; use opentelemetry::sdk::propagation::TraceContextPropagator; +use 
opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceResult; use opentelemetry::{ propagation::Injector, @@ -10,6 +11,7 @@ use opentelemetry::{ trace::{TraceContextExt, Tracer as _}, Context, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); @@ -28,16 +30,16 @@ pub mod hello_world { tonic::include_proto!("helloworld"); } -fn tracing_init() -> TraceResult { +fn tracing_init(runtime: R) -> TraceResult { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-client") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let tracer = tracing_init()?; + let tracer = tracing_init(Tokio)?; let mut client = GreeterClient::connect("http://[::1]:50051").await?; let span = tracer.start("client-request"); let cx = Context::current_with_span(span); diff --git a/examples/grpc/src/server.rs b/examples/grpc/src/server.rs index b4f02b5e2e..a454e5f7d8 100644 --- a/examples/grpc/src/server.rs +++ b/examples/grpc/src/server.rs @@ -4,12 +4,14 @@ use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; use opentelemetry::global; use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{ propagation::Extractor, trace::{Span, Tracer}, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; use std::error::Error; pub mod hello_world { @@ -59,16 +61,16 @@ impl Greeter for MyGreeter { } } -fn tracing_init() -> Result { +fn tracing_init(runtime: R) -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-server") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let _tracer = tracing_init()?; + let _tracer = tracing_init(Tokio)?; let addr = "[::1]:50051".parse()?; let greeter = MyGreeter::default(); diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index f020cb87ca..6770eb86ab 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"] async-std = { version = "1.6", optional = true } async-trait = "0.1" base64 = { version = "0.13", optional = true } +futures = "0.3" +futures-channel = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true } headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } @@ -45,6 +47,7 @@ prost = { version = "0.9.0", optional = true } prost-types = { version = "0.9.0", optional = true } [dev-dependencies] +tokio = { version = "1.0", features = ["net", "sync"] } bytes = "1" futures-executor = "0.3" opentelemetry = { default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } @@ -63,6 +66,18 @@ features = [ optional = true [features] +full = [ + "collector_client", + "isahc_collector_client", + "reqwest_collector_client", + "reqwest_blocking_collector_client", + "surf_collector_client", + "wasm_collector_client", + "rt-tokio", + "rt-tokio-current-thread", + "rt-async-std", + "integration_test" +] default = [] collector_client = ["http", "opentelemetry-http"] isahc_collector_client = ["isahc", "opentelemetry-http/isahc"] diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs 
b/opentelemetry-jaeger/src/exporter/config/agent.rs index 28a3f72d96..f637de02ef 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -228,19 +228,23 @@ impl AgentPipeline { /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline. /// /// The exporter will send each span to the agent upon the span ends. - pub fn build_simple(mut self) -> Result { + pub fn build_simple( + mut self, + runtime: R, + ) -> Result { let mut builder = sdk::trace::TracerProvider::builder(); let (config, process) = build_config_and_process( self.trace_config.take(), self.transformation_config.service_name.take(), ); - let exporter = Exporter::new( + let (exporter, task) = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, ); + runtime.spawn(Box::pin(task)); builder = builder.with_simple_exporter(exporter); builder = builder.with_config(config); @@ -273,7 +277,9 @@ impl AgentPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_async_agent_uploader(runtime.clone())?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -285,8 +291,11 @@ impl AgentPipeline { /// tracer provider. /// /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. - pub fn install_simple(self) -> Result { - let tracer_provider = self.build_simple()?; + pub fn install_simple( + self, + runtime: R, + ) -> Result { + let tracer_provider = self.build_simple(runtime)?; install_tracer_provider_and_get_tracer(tracer_provider) } @@ -317,25 +326,30 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let uploader = self.build_async_agent_uploader(runtime)?; - Ok(Exporter::new( - process.into(), - export_instrument_library, - uploader, - )) + let uploader = self.build_async_agent_uploader(runtime.clone())?; + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); + Ok(exporter) } /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime. 
- pub fn build_sync_agent_exporter(mut self) -> Result { + pub fn build_sync_agent_exporter( + mut self, + runtime: R, + ) -> Result { let (_, process) = build_config_and_process( self.trace_config.take(), self.transformation_config.service_name.take(), ); - Ok(Exporter::new( + let (exporter, task) = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, - )) + ); + + runtime.spawn(Box::pin(task)); + Ok(exporter) } fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> @@ -352,6 +366,7 @@ impl AgentPipeline { Ok(Box::new(AsyncUploader::Agent(agent))) } + #[allow(dead_code)] // TODO jwilm fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( self.agent_endpoint?.as_slice(), diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 8736c575e1..18871642fe 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -411,8 +411,9 @@ impl CollectorPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_uploader::()?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs index 8a576d83d9..ec26427ade 100644 --- a/opentelemetry-jaeger/src/exporter/config/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/mod.rs @@ -104,12 +104,14 @@ mod tests { assert_eq!(process.tags.len(), 2); } - #[test] - fn test_read_from_env() { + #[tokio::test] + async fn test_read_from_env() { // OTEL_SERVICE_NAME env var also works env::set_var("OTEL_SERVICE_NAME", "test service"); let builder = new_agent_pipeline(); - let exporter = builder.build_sync_agent_exporter().unwrap(); + let exporter = builder + .build_sync_agent_exporter(opentelemetry::runtime::Tokio) + .unwrap(); assert_eq!(exporter.process.service_name, "test service"); env::set_var("OTEL_SERVICE_NAME", "") } diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 0d64a739ea..458c9b1790 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -18,8 +18,11 @@ use std::convert::TryFrom; use self::runtime::JaegerTraceRuntime; use self::thrift::jaeger; -use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; +use futures::StreamExt; use std::convert::TryInto; +use std::future::Future; #[cfg(feature = "isahc_collector_client")] #[allow(unused_imports)] // this is actually used to configure authentication @@ -45,10 +48,11 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { + tx: mpsc::Sender<(Vec, oneshot::Sender)>, + // TODO jwilm: this is used in an existing test to check that the service + // name is properly read from the environment. + #[allow(dead_code)] process: jaeger::Process, - /// Whether or not to export instrumentation information. 
- export_instrumentation_lib: bool, - uploader: Box, } impl Exporter { @@ -56,11 +60,54 @@ impl Exporter { process: jaeger::Process, export_instrumentation_lib: bool, uploader: Box, - ) -> Exporter { - Exporter { - process, - export_instrumentation_lib, - uploader, + ) -> (Exporter, impl Future) { + let (tx, rx) = futures::channel::mpsc::channel(64); + ( + Exporter { + tx, + process: process.clone(), + }, + ExporterTask { + rx, + export_instrumentation_lib, + uploader, + process, + } + .run(), + ) + } +} + +struct ExporterTask { + rx: mpsc::Receiver<(Vec, oneshot::Sender)>, + process: jaeger::Process, + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: Box, +} + +impl ExporterTask { + async fn run(mut self) { + // TODO jwilm: this might benefit from a ExporterMessage so that we can + // send Shutdown and break the loop. + while let Some((batch, tx)) = self.rx.next().await { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let process = self.process.clone(); + + for span in batch.into_iter() { + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + let res = self + .uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await; + + // TODO jwilm: is ignoring the err (fail to send) correct here? + let _ = tx.send(res); } } } @@ -74,23 +121,16 @@ pub struct Process { pub tags: Vec, } -#[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Jaeger - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let process = self.process.clone(); - - for span in batch.into_iter() { - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); + fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { + let (tx, rx) = oneshot::channel(); + + if let Err(err) = self.tx.try_send((batch, tx)) { + return Box::pin(futures::future::ready(Err(Into::into(err)))); } - self.uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await + Box::pin(async move { rx.await? }) } } diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index c921e0672f..e1ffecb902 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -24,10 +24,12 @@ //! ```no_run //! use opentelemetry::trace::Tracer; //! use opentelemetry::global; +//! use opentelemetry::runtime::Tokio; //! -//! fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! #[tokio::main] +//! async fn main() -> Result<(), opentelemetry::trace::TraceError> { //! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); -//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?; +//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple(Tokio)?; //! //! tracer.in_span("doing_work", |cx| { //! // Traced app logic here... 
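On the `TODO jwilm` note in `ExporterTask::run` above: one possible shape for the suggested `ExporterMessage` (purely an assumption, not something this patch adds) is an enum that carries either a batch or an explicit shutdown signal, letting `run()` break its loop without waiting for the sender side to be dropped:

    use futures::channel::oneshot;
    use opentelemetry::sdk::export::trace;

    // Hypothetical message type for the jaeger ExporterTask channel.
    enum ExporterMessage {
        Export(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>),
        Shutdown,
    }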
diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 1524ebf85a..bd10626e35 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -30,7 +30,6 @@ use { trace_service::ExportTraceServiceRequest as GrpcRequest, trace_service_grpc::TraceServiceClient as GrpcioTraceServiceClient, }, - std::sync::Arc, }; #[cfg(feature = "http-proto")] @@ -44,11 +43,10 @@ use { opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest as ProstRequest, prost::Message, std::convert::TryFrom, - std::sync::Arc, }; #[cfg(any(feature = "grpc-sys", feature = "http-proto"))] -use std::collections::HashMap; +use {std::collections::HashMap, std::sync::Arc}; use crate::exporter::ExportConfig; use crate::OtlpPipeline; diff --git a/opentelemetry-sdk/benches/trace.rs b/opentelemetry-sdk/benches/trace.rs index 70fbc6edd0..e8dceab942 100644 --- a/opentelemetry-sdk/benches/trace.rs +++ b/opentelemetry-sdk/benches/trace.rs @@ -100,10 +100,12 @@ fn insert_keys(mut map: sdktrace::EvictedHashMap, n: usize) { #[derive(Debug)] struct VoidExporter; -#[async_trait::async_trait] impl SpanExporter for VoidExporter { - async fn export(&mut self, _spans: Vec) -> ExportResult { - Ok(()) + fn export( + &mut self, + _spans: Vec, + ) -> futures::future::BoxFuture<'static, ExportResult> { + Box::pin(futures::future::ready(Ok(()))) } } diff --git a/opentelemetry-sdk/src/export/trace/stdout.rs b/opentelemetry-sdk/src/export/trace/stdout.rs index 417dae6356..2038692038 100644 --- a/opentelemetry-sdk/src/export/trace/stdout.rs +++ b/opentelemetry-sdk/src/export/trace/stdout.rs @@ -146,14 +146,12 @@ where { return Box::pin(std::future::ready(Err(Into::into(err)))); } - } else { - if let Err(err) = self - .writer - .write_all(format!("{:?}\n", span).as_bytes()) - .map_err(|err| TraceError::ExportFailed(Box::new(Error::from(err)))) - { - return Box::pin(std::future::ready(Err(Into::into(err)))); - } + } else if let Err(err) = self + .writer + .write_all(format!("{:?}\n", span).as_bytes()) + .map_err(|err| TraceError::ExportFailed(Box::new(Error::from(err)))) + { + return Box::pin(std::future::ready(Err(Into::into(err)))); } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 094cfadf48..c59690c59e 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -319,7 +319,7 @@ impl BatchSpanProcessorInternal { let _ = task.await; } else { self.export_tasks.push(task); - while let Some(_) = self.export_tasks.next().await {} + while self.export_tasks.next().await.is_some() {} } } @@ -334,7 +334,7 @@ impl BatchSpanProcessorInternal { if self.spans.len() == self.config.max_export_batch_size { // If concurrent exports are saturated, wait for one to complete. - if self.export_tasks.len() > 0 + if !self.export_tasks.is_empty() && self.export_tasks.len() == self.config.max_concurrent_exports { self.export_tasks.next().await; @@ -393,7 +393,7 @@ impl BatchSpanProcessorInternal { fn export(&mut self) -> BoxFuture<'static, ExportResult> { // Batch size check for flush / shutdown. Those methods may be called // when there's no work to do. 
- if self.spans.len() == 0 { + if self.spans.is_empty() { return Box::pin(futures::future::ready(Ok(()))); } diff --git a/scripts/lint.sh b/scripts/lint.sh index 43ed731cec..40d5ccfd43 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -29,14 +29,7 @@ if rustup component add clippy; then cargo_feature opentelemetry-otlp "http-proto, surf-client, surf/curl-client" cargo_feature opentelemetry-otlp "metrics" - cargo_feature opentelemetry-jaeger "surf_collector_client, surf/curl-client" - cargo_feature opentelemetry-jaeger "isahc_collector_client" - cargo_feature opentelemetry-jaeger "reqwest_blocking_collector_client" - cargo_feature opentelemetry-jaeger "reqwest_collector_client" - cargo_feature opentelemetry-jaeger "collector_client" - cargo_feature opentelemetry-jaeger "wasm_collector_client" - cargo_feature opentelemetry-jaeger "collector_client, wasm_collector_client" - cargo_feature opentelemetry-jaeger "default" + cargo_feature opentelemetry-jaeger "full" cargo_feature opentelemetry-dynatrace "default" cargo_feature opentelemetry-dynatrace "metrics,rt-tokio,reqwest-client" diff --git a/scripts/test.sh b/scripts/test.sh index cf6e3b363b..3e3b6559ea 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -2,7 +2,7 @@ set -eu -cargo test --all "$@" +cargo test --all "$@" -- --test-threads=1 # See https://github.com/rust-lang/cargo/issues/5364 cargo test --manifest-path=opentelemetry/Cargo.toml --no-default-features @@ -12,7 +12,7 @@ cargo test --manifest-path=opentelemetry/Cargo.toml --all-features -- --ignored cargo test --manifest-path=opentelemetry/Cargo.toml --all-features cargo test --manifest-path=opentelemetry-contrib/Cargo.toml --all-features -# cargo test --manifest-path=opentelemetry-dynatrace/Cargo.toml --all-features -# cargo test --manifest-path=opentelemetry-jaeger/Cargo.toml --all-features +cargo test --manifest-path=opentelemetry-dynatrace/Cargo.toml --all-features +cargo test --manifest-path=opentelemetry-jaeger/Cargo.toml --all-features cargo test --manifest-path=opentelemetry-otlp/Cargo.toml --features "trace,grpc-sys" --no-default-features cargo test --manifest-path=opentelemetry-zipkin/Cargo.toml --all-features From 1571e21cd290bed0d484c6daf2f71fbc909b0b20 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Thu, 5 May 2022 13:30:23 -0700 Subject: [PATCH 05/11] Reduce dependencies on futures The minimal necessary futures library (core, util, futures proper) is now used in all packages touched by the concurrent exporters work. 
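As a rough illustration of the narrowed dependency surface (this snippet is
not part of the patch, and `NoopExporter` is a made-up name): an exporter now
only needs the `BoxFuture` alias from `futures-core` for its `export()`
signature plus a couple of helpers from `futures-util`/`futures-executor`,
instead of the whole `futures` facade.

    use futures_core::future::BoxFuture;
    use futures_util::future;

    struct NoopExporter;

    impl NoopExporter {
        // Mirrors the shape of the new SpanExporter::export without the SDK
        // types (the batch parameter is omitted here for brevity).
        fn export(&mut self) -> BoxFuture<'static, Result<(), ()>> {
            Box::pin(future::ready(Ok(())))
        }
    }

    fn main() {
        let mut exporter = NoopExporter;
        // futures-executor stands in for whatever runtime a caller would use.
        assert!(futures_executor::block_on(exporter.export()).is_ok());
    }

The sketch assumes futures-core, futures-util and futures-executor 0.3 as the
only futures-related dependencies.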
--- opentelemetry-datadog/Cargo.toml | 2 +- opentelemetry-datadog/src/exporter/mod.rs | 2 +- opentelemetry-jaeger/Cargo.toml | 1 - opentelemetry-sdk/Cargo.toml | 3 +-- opentelemetry-sdk/benches/trace.rs | 8 +++---- opentelemetry-sdk/src/export/trace/mod.rs | 2 +- opentelemetry-sdk/src/export/trace/stdout.rs | 6 ++--- opentelemetry-sdk/src/testing/trace.rs | 13 ++++------- opentelemetry-sdk/src/trace/span_processor.rs | 22 +++++++++---------- opentelemetry-stackdriver/src/lib.rs | 3 +-- opentelemetry-zipkin/Cargo.toml | 2 +- opentelemetry-zipkin/src/exporter/mod.rs | 2 +- 12 files changed, 26 insertions(+), 40 deletions(-) diff --git a/opentelemetry-datadog/Cargo.toml b/opentelemetry-datadog/Cargo.toml index 0669c82c2b..840e5e0130 100644 --- a/opentelemetry-datadog/Cargo.toml +++ b/opentelemetry-datadog/Cargo.toml @@ -36,7 +36,7 @@ thiserror = "1.0" itertools = "0.10" http = "0.2" lazy_static = "1.4" -futures = "0.3" +futures-core = "0.3" [dev-dependencies] base64 = "0.13" diff --git a/opentelemetry-datadog/src/exporter/mod.rs b/opentelemetry-datadog/src/exporter/mod.rs index efb80d7150..c9ad748205 100644 --- a/opentelemetry-datadog/src/exporter/mod.rs +++ b/opentelemetry-datadog/src/exporter/mod.rs @@ -9,7 +9,7 @@ use std::borrow::Cow; use std::fmt::{Debug, Formatter}; use crate::exporter::model::FieldMapping; -use futures::future::BoxFuture; +use futures_core::future::BoxFuture; use http::{Method, Request, Uri}; use itertools::Itertools; use opentelemetry::sdk::export::trace; diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index 6770eb86ab..8c0263ad97 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -23,7 +23,6 @@ async-std = { version = "1.6", optional = true } async-trait = "0.1" base64 = { version = "0.13", optional = true } futures = "0.3" -futures-channel = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true } headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } diff --git a/opentelemetry-sdk/Cargo.toml b/opentelemetry-sdk/Cargo.toml index bbc5074ee5..7d810cbdfc 100644 --- a/opentelemetry-sdk/Cargo.toml +++ b/opentelemetry-sdk/Cargo.toml @@ -9,10 +9,9 @@ async-trait = { version = "0.1", optional = true } crossbeam-channel = { version = "0.5", optional = true } dashmap = { version = "4.0.1", optional = true } fnv = { version = "1.0", optional = true } -futures = "0.3" futures-channel = "0.3" futures-executor = "0.3" -futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] } +futures-util = { version = "0.3", default-features = false, features = ["std", "sink", "async-await-macro"] } lazy_static = "1.4" once_cell = "1.10" opentelemetry-api = { version = "0.1", path = "../opentelemetry-api/" } diff --git a/opentelemetry-sdk/benches/trace.rs b/opentelemetry-sdk/benches/trace.rs index e8dceab942..4d75e7493c 100644 --- a/opentelemetry-sdk/benches/trace.rs +++ b/opentelemetry-sdk/benches/trace.rs @@ -1,4 +1,5 @@ use criterion::{criterion_group, criterion_main, Criterion}; +use futures_util::future::BoxFuture; use opentelemetry_api::{ trace::{Span, Tracer, TracerProvider}, Key, KeyValue, @@ -101,11 +102,8 @@ fn insert_keys(mut map: sdktrace::EvictedHashMap, n: usize) { struct VoidExporter; impl SpanExporter for VoidExporter { - fn export( - &mut self, - _spans: Vec, - ) -> futures::future::BoxFuture<'static, ExportResult> { - Box::pin(futures::future::ready(Ok(()))) + fn export(&mut self, _spans: Vec) -> 
BoxFuture<'static, ExportResult> { + Box::pin(futures_util::future::ready(Ok(()))) } } diff --git a/opentelemetry-sdk/src/export/trace/mod.rs b/opentelemetry-sdk/src/export/trace/mod.rs index ef7a8b5e83..25c09c32d7 100644 --- a/opentelemetry-sdk/src/export/trace/mod.rs +++ b/opentelemetry-sdk/src/export/trace/mod.rs @@ -1,6 +1,6 @@ //! Trace exporters use crate::Resource; -use futures::future::BoxFuture; +use futures_util::future::BoxFuture; use opentelemetry_api::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status, TraceError}; use std::borrow::Cow; use std::fmt::Debug; diff --git a/opentelemetry-sdk/src/export/trace/stdout.rs b/opentelemetry-sdk/src/export/trace/stdout.rs index 2038692038..68bd0b172c 100644 --- a/opentelemetry-sdk/src/export/trace/stdout.rs +++ b/opentelemetry-sdk/src/export/trace/stdout.rs @@ -32,6 +32,7 @@ use crate::export::{ ExportError, }; use async_trait::async_trait; +use futures_util::future::BoxFuture; use opentelemetry_api::{global, trace::TracerProvider}; use std::fmt::Debug; use std::io::{stdout, Stdout, Write}; @@ -133,10 +134,7 @@ where W: Write + Debug + Send + 'static, { /// Export spans to stdout - fn export( - &mut self, - batch: Vec, - ) -> futures::future::BoxFuture<'static, ExportResult> { + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { for span in batch { if self.pretty_print { if let Err(err) = self diff --git a/opentelemetry-sdk/src/testing/trace.rs b/opentelemetry-sdk/src/testing/trace.rs index b60df5ea14..5d5b338a9b 100644 --- a/opentelemetry-sdk/src/testing/trace.rs +++ b/opentelemetry-sdk/src/testing/trace.rs @@ -7,6 +7,7 @@ use crate::{ InstrumentationLibrary, }; use async_trait::async_trait; +use futures_util::future::BoxFuture; pub use opentelemetry_api::testing::trace::TestSpan; use opentelemetry_api::trace::{SpanContext, SpanId, SpanKind, Status}; use std::fmt::{Display, Formatter}; @@ -38,10 +39,7 @@ pub struct TestSpanExporter { #[async_trait] impl SpanExporter for TestSpanExporter { - fn export( - &mut self, - batch: Vec, - ) -> futures::future::BoxFuture<'static, ExportResult> { + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { for span_data in batch { if let Err(err) = self .tx_export @@ -76,10 +74,7 @@ pub struct TokioSpanExporter { } impl SpanExporter for TokioSpanExporter { - fn export( - &mut self, - batch: Vec, - ) -> futures::future::BoxFuture<'static, ExportResult> { + fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { for span_data in batch { if let Err(err) = self .tx_export @@ -157,7 +152,7 @@ impl NoopSpanExporter { #[async_trait::async_trait] impl SpanExporter for NoopSpanExporter { - fn export(&mut self, _: Vec) -> futures::future::BoxFuture<'static, ExportResult> { + fn export(&mut self, _: Vec) -> BoxFuture<'static, ExportResult> { Box::pin(std::future::ready(Ok(()))) } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index c59690c59e..f044472c44 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -37,12 +37,13 @@ use crate::export::trace::{ExportResult, SpanData, SpanExporter}; use crate::trace::runtime::{TraceRuntime, TrySend}; use crate::trace::Span; -use futures::select; -use futures::stream::FusedStream; -use futures::Stream; use futures_channel::oneshot; -use futures_util::future::Either; -use futures_util::{stream, StreamExt as _}; +use futures_util::{ + future::{self, BoxFuture, Either}, + select, + stream::{self, 
FusedStream, FuturesUnordered}, + Stream, StreamExt as _, +}; use opentelemetry_api::global; use opentelemetry_api::{ trace::{TraceError, TraceResult}, @@ -284,9 +285,6 @@ pub enum BatchMessage { Shutdown(oneshot::Sender), } -use futures::future::BoxFuture; -use futures::stream::FuturesUnordered; - struct BatchSpanProcessorInternal { spans: Vec, export_tasks: FuturesUnordered>, @@ -394,7 +392,7 @@ impl BatchSpanProcessorInternal { // Batch size check for flush / shutdown. Those methods may be called // when there's no work to do. if self.spans.is_empty() { - return Box::pin(futures::future::ready(Ok(()))); + return Box::pin(future::ready(Ok(()))); } let export = self.exporter.export(self.spans.split_off(0)); @@ -402,7 +400,7 @@ impl BatchSpanProcessorInternal { let time_out = self.config.max_export_timeout; Box::pin(async move { - match futures::future::select(export, timeout).await { + match future::select(export, timeout).await { Either::Left((export_res, _)) => export_res, Either::Right((_, _)) => ExportResult::Err(TraceError::ExportTimedOut(time_out)), } @@ -741,8 +739,8 @@ mod tests { fn export( &mut self, _batch: Vec, - ) -> futures::future::BoxFuture<'static, ExportResult> { - use futures::FutureExt; + ) -> futures_util::future::BoxFuture<'static, ExportResult> { + use futures_util::FutureExt; Box::pin((self.delay_fn)(self.delay_for).map(|_| Ok(()))) } } diff --git a/opentelemetry-stackdriver/src/lib.rs b/opentelemetry-stackdriver/src/lib.rs index 6ac7591daf..0f1cfab4e5 100644 --- a/opentelemetry-stackdriver/src/lib.rs +++ b/opentelemetry-stackdriver/src/lib.rs @@ -9,7 +9,6 @@ rustdoc::invalid_rust_codeblocks )] -use futures::future::BoxFuture; use std::{ collections::HashMap, fmt, @@ -22,7 +21,7 @@ use std::{ }; use async_trait::async_trait; -use futures::stream::StreamExt; +use futures::{future::BoxFuture, stream::StreamExt}; use opentelemetry::{ global::handle_error, sdk::{ diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index 0ba0e2505d..784372804d 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -38,7 +38,7 @@ http = "0.2" reqwest = { version = "0.11", optional = true, default-features = false } surf = { version = "2.0", optional = true, default-features = false } thiserror = { version = "1.0"} -futures = "0.3" +futures-core = "0.3" [dev-dependencies] bytes = "1" diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index 3ef6617f46..c51e27ee9f 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -3,7 +3,7 @@ mod model; mod uploader; use async_trait::async_trait; -use futures::future::BoxFuture; +use futures_core::future::BoxFuture; use http::Uri; use model::endpoint::Endpoint; use opentelemetry::sdk::resource::ResourceDetector; From 33555dfd0932db4a8c0e7510df5bd3353f94a392 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Thu, 5 May 2022 14:21:51 -0700 Subject: [PATCH 06/11] Remove runtime from Jaeger's install_simple To keep the API _actually_ simple, we now leverage a thread to run the jaeger exporter internals. 
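A minimal sketch of that pattern, with illustrative names rather than the
crate's actual API: work is handed to a bounded channel, and a dedicated OS
thread drains the channel by driving a small future with
`futures_executor::block_on`, so callers of `install_simple()` no longer have
to supply an async runtime.

    use futures::channel::mpsc;
    use futures::StreamExt;
    use std::thread;

    fn spawn_worker() -> (mpsc::Sender<String>, thread::JoinHandle<()>) {
        let (tx, mut rx) = mpsc::channel::<String>(64);
        let handle = thread::spawn(move || {
            // Drive the receive loop to completion on a plain OS thread.
            futures_executor::block_on(async move {
                while let Some(batch) = rx.next().await {
                    // A real exporter would upload the span batch here.
                    println!("exporting: {}", batch);
                }
            });
        });
        (tx, handle)
    }

    fn main() {
        let (mut tx, handle) = spawn_worker();
        let _ = tx.try_send("batch-1".to_string());
        drop(tx); // closing the channel lets the worker loop end
        let _ = handle.join();
    }

Only the simple path switches to this thread-backed approach; the batch
pipelines (`build_batch`/`install_batch`) keep taking a runtime as before.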
--- examples/actix-udp/src/main.rs | 8 ++-- examples/grpc/src/client.rs | 8 ++-- examples/grpc/src/server.rs | 8 ++-- opentelemetry-jaeger/Cargo.toml | 1 + .../src/exporter/config/agent.rs | 43 ++++++------------- .../src/exporter/config/collector/mod.rs | 3 +- .../src/exporter/config/mod.rs | 4 +- opentelemetry-jaeger/src/exporter/mod.rs | 28 ++++++------ opentelemetry-jaeger/src/lib.rs | 3 +- 9 files changed, 40 insertions(+), 66 deletions(-) diff --git a/examples/actix-udp/src/main.rs b/examples/actix-udp/src/main.rs index cac9801fb0..2a741b92a1 100644 --- a/examples/actix-udp/src/main.rs +++ b/examples/actix-udp/src/main.rs @@ -1,16 +1,14 @@ use actix_service::Service; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; -use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk::trace as sdktrace}; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, Key, }; -use opentelemetry_jaeger::JaegerTraceRuntime; -fn init_tracer(runtime: R) -> Result { +fn init_tracer() -> Result { opentelemetry_jaeger::new_agent_pipeline() .with_endpoint("localhost:6831") .with_service_name("trace-udp-demo") @@ -21,7 +19,7 @@ fn init_tracer(runtime: R) -> Result &'static str { @@ -36,7 +34,7 @@ async fn index() -> &'static str { async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); - let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer."); + let _tracer = init_tracer().expect("Failed to initialise tracer."); HttpServer::new(|| { App::new() diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs index 51668cd9b8..df2e65ab14 100644 --- a/examples/grpc/src/client.rs +++ b/examples/grpc/src/client.rs @@ -3,7 +3,6 @@ use hello_world::HelloRequest; use opentelemetry::global; use opentelemetry::global::shutdown_tracer_provider; use opentelemetry::sdk::propagation::TraceContextPropagator; -use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceResult; use opentelemetry::{ propagation::Injector, @@ -11,7 +10,6 @@ use opentelemetry::{ trace::{TraceContextExt, Tracer as _}, Context, KeyValue, }; -use opentelemetry_jaeger::JaegerTraceRuntime; struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); @@ -30,16 +28,16 @@ pub mod hello_world { tonic::include_proto!("helloworld"); } -fn tracing_init(runtime: R) -> TraceResult { +fn tracing_init() -> TraceResult { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-client") - .install_simple(runtime) + .install_simple() } #[tokio::main] async fn main() -> Result<(), Box> { - let tracer = tracing_init(Tokio)?; + let tracer = tracing_init()?; let mut client = GreeterClient::connect("http://[::1]:50051").await?; let span = tracer.start("client-request"); let cx = Context::current_with_span(span); diff --git a/examples/grpc/src/server.rs b/examples/grpc/src/server.rs index a454e5f7d8..b4f02b5e2e 100644 --- a/examples/grpc/src/server.rs +++ b/examples/grpc/src/server.rs @@ -4,14 +4,12 @@ use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; use opentelemetry::global; use opentelemetry::sdk::propagation::TraceContextPropagator; -use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{ propagation::Extractor, trace::{Span, Tracer}, KeyValue, }; -use opentelemetry_jaeger::JaegerTraceRuntime; use std::error::Error; pub mod hello_world 
{ @@ -61,16 +59,16 @@ impl Greeter for MyGreeter { } } -fn tracing_init(runtime: R) -> Result { +fn tracing_init() -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-server") - .install_simple(runtime) + .install_simple() } #[tokio::main] async fn main() -> Result<(), Box> { - let _tracer = tracing_init(Tokio)?; + let _tracer = tracing_init()?; let addr = "[::1]:50051".parse()?; let greeter = MyGreeter::default(); diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index 8c0263ad97..e679a08f31 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -24,6 +24,7 @@ async-trait = "0.1" base64 = { version = "0.13", optional = true } futures = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true } +futures-executor = "0.3" headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } isahc = { version = "1.4", default-features = false, optional = true } diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs index f637de02ef..28a3f72d96 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -228,23 +228,19 @@ impl AgentPipeline { /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline. /// /// The exporter will send each span to the agent upon the span ends. - pub fn build_simple( - mut self, - runtime: R, - ) -> Result { + pub fn build_simple(mut self) -> Result { let mut builder = sdk::trace::TracerProvider::builder(); let (config, process) = build_config_and_process( self.trace_config.take(), self.transformation_config.service_name.take(), ); - let (exporter, task) = Exporter::new( + let exporter = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, ); - runtime.spawn(Box::pin(task)); builder = builder.with_simple_exporter(exporter); builder = builder.with_config(config); @@ -277,9 +273,7 @@ impl AgentPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_async_agent_uploader(runtime.clone())?; - let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); - - runtime.spawn(Box::pin(task)); + let exporter = Exporter::new(process.into(), export_instrument_library, uploader); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -291,11 +285,8 @@ impl AgentPipeline { /// tracer provider. /// /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. 
- pub fn install_simple( - self, - runtime: R, - ) -> Result { - let tracer_provider = self.build_simple(runtime)?; + pub fn install_simple(self) -> Result { + let tracer_provider = self.build_simple()?; install_tracer_provider_and_get_tracer(tracer_provider) } @@ -326,30 +317,25 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let uploader = self.build_async_agent_uploader(runtime.clone())?; - let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); - - runtime.spawn(Box::pin(task)); - Ok(exporter) + let uploader = self.build_async_agent_uploader(runtime)?; + Ok(Exporter::new( + process.into(), + export_instrument_library, + uploader, + )) } /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime. - pub fn build_sync_agent_exporter( - mut self, - runtime: R, - ) -> Result { + pub fn build_sync_agent_exporter(mut self) -> Result { let (_, process) = build_config_and_process( self.trace_config.take(), self.transformation_config.service_name.take(), ); - let (exporter, task) = Exporter::new( + Ok(Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, - ); - - runtime.spawn(Box::pin(task)); - Ok(exporter) + )) } fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> @@ -366,7 +352,6 @@ impl AgentPipeline { Ok(Box::new(AsyncUploader::Agent(agent))) } - #[allow(dead_code)] // TODO jwilm fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( self.agent_endpoint?.as_slice(), diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 18871642fe..8736c575e1 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -411,9 +411,8 @@ impl CollectorPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_uploader::()?; - let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + let exporter = Exporter::new(process.into(), export_instrument_library, uploader); - runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs index ec26427ade..7b314c2ec8 100644 --- a/opentelemetry-jaeger/src/exporter/config/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/mod.rs @@ -109,9 +109,7 @@ mod tests { // OTEL_SERVICE_NAME env var also works env::set_var("OTEL_SERVICE_NAME", "test service"); let builder = new_agent_pipeline(); - let exporter = builder - .build_sync_agent_exporter(opentelemetry::runtime::Tokio) - .unwrap(); + let exporter = builder.build_sync_agent_exporter().unwrap(); assert_eq!(exporter.process.service_name, "test service"); env::set_var("OTEL_SERVICE_NAME", "") } diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 458c9b1790..54a95fee2e 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -22,7 +22,6 @@ use futures::channel::{mpsc, oneshot}; use futures::future::BoxFuture; use futures::StreamExt; use std::convert::TryInto; -use std::future::Future; #[cfg(feature = "isahc_collector_client")] #[allow(unused_imports)] // this is actually used to configure 
authentication @@ -60,21 +59,20 @@ impl Exporter { process: jaeger::Process, export_instrumentation_lib: bool, uploader: Box, - ) -> (Exporter, impl Future) { + ) -> Exporter { let (tx, rx) = futures::channel::mpsc::channel(64); - ( - Exporter { - tx, - process: process.clone(), - }, - ExporterTask { - rx, - export_instrumentation_lib, - uploader, - process, - } - .run(), - ) + + let exporter_task = ExporterTask { + rx, + export_instrumentation_lib, + uploader, + process: process.clone(), + }; + std::thread::spawn(move || { + futures_executor::block_on(exporter_task.run()); + }); + + Exporter { tx, process } } } diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index e1ffecb902..6c96fa8772 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -24,12 +24,11 @@ //! ```no_run //! use opentelemetry::trace::Tracer; //! use opentelemetry::global; -//! use opentelemetry::runtime::Tokio; //! //! #[tokio::main] //! async fn main() -> Result<(), opentelemetry::trace::TraceError> { //! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); -//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple(Tokio)?; +//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?; //! //! tracer.in_span("doing_work", |cx| { //! // Traced app logic here... From cd8e0545986d19bd7add4f9fe93803ece511bfef Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Thu, 5 May 2022 15:04:30 -0700 Subject: [PATCH 07/11] Add Arc lost in a rebase --- opentelemetry-zipkin/src/exporter/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/opentelemetry-zipkin/src/exporter/mod.rs b/opentelemetry-zipkin/src/exporter/mod.rs index c51e27ee9f..02e6f60503 100644 --- a/opentelemetry-zipkin/src/exporter/mod.rs +++ b/opentelemetry-zipkin/src/exporter/mod.rs @@ -27,6 +27,7 @@ use std::borrow::Cow; ))] use std::convert::TryFrom; use std::net::SocketAddr; +use std::sync::Arc; use std::time::Duration; /// Zipkin span exporter From 790609ebecc5c49d4d29e9d766900729006fc1a8 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Fri, 6 May 2022 09:00:41 -0700 Subject: [PATCH 08/11] Fix OTEL_BSP_MAX_CONCURRENT_EXPORTS name and value Per PR feedback, the default should match the previous behavior of 1 batch at a time. --- opentelemetry-sdk/src/trace/span_processor.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index f044472c44..d1283de56d 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -68,9 +68,7 @@ const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; /// Default maximum allowed time to export data. const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; /// Default max concurrent exports for BSP -// TODO jwilm: I omitted the OTEL_ prefix here as this is a non-standard config -// value. Should it be kept this way or prefix with OTEL_ for consistency? -const BSP_MAX_CONCURRENT_EXPORTS: usize = 0; +const OTEL_BSP_MAX_CONCURRENT_EXPORTS: usize = 1; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. 
The span processors are invoked only when is_recording @@ -503,7 +501,7 @@ impl Default for BatchConfig { scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT), max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT), - max_concurrent_exports: BSP_MAX_CONCURRENT_EXPORTS, + max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS, }; if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE) From c98822990525e70a42c17c6af0f3ea3a64403c9b Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Fri, 6 May 2022 09:24:52 -0700 Subject: [PATCH 09/11] Fix remaining TODOs This finishes the remaining TODOs on the concurrent-exports branch. The major change included here adds shutdown functionality to the jaeger exporter which ensures the exporter has finished its tasks before exiting. --- opentelemetry-jaeger/src/exporter/mod.rs | 85 ++++++++++++++++-------- 1 file changed, 58 insertions(+), 27 deletions(-) diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 54a95fee2e..2dd7c009d4 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -44,14 +44,27 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; /// Instrument Library version MUST be reported in Jaeger Span tags with the following key const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; +#[derive(Debug)] +enum ExportMessage { + Export { + batch: Vec, + tx: oneshot::Sender, + }, + Shutdown, +} + /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { - tx: mpsc::Sender<(Vec, oneshot::Sender)>, - // TODO jwilm: this is used in an existing test to check that the service - // name is properly read from the environment. + tx: mpsc::Sender, + + // In the switch to concurrent exports, the non-test code which used this + // value was moved into the ExporterTask implementation. However, there's + // still a test that relies on this value being here, thus the + // allow(dead_code). #[allow(dead_code)] process: jaeger::Process, + join_handle: Option>, } impl Exporter { @@ -68,16 +81,21 @@ impl Exporter { uploader, process: process.clone(), }; - std::thread::spawn(move || { + + let join_handle = Some(std::thread::spawn(move || { futures_executor::block_on(exporter_task.run()); - }); + })); - Exporter { tx, process } + Exporter { + tx, + process, + join_handle, + } } } struct ExporterTask { - rx: mpsc::Receiver<(Vec, oneshot::Sender)>, + rx: mpsc::Receiver, process: jaeger::Process, /// Whether or not to export instrumentation information. export_instrumentation_lib: bool, @@ -86,26 +104,30 @@ struct ExporterTask { impl ExporterTask { async fn run(mut self) { - // TODO jwilm: this might benefit from a ExporterMessage so that we can - // send Shutdown and break the loop. 
- while let Some((batch, tx)) = self.rx.next().await { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let process = self.process.clone(); - - for span in batch.into_iter() { - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); + while let Some(message) = self.rx.next().await { + match message { + ExportMessage::Export { batch, tx } => { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let process = self.process.clone(); + + for span in batch.into_iter() { + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + let res = self + .uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await; + + // Errors here might be completely expected if the receiver didn't + // care about the result. + let _ = tx.send(res); + } + ExportMessage::Shutdown => break, } - - let res = self - .uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await; - - // TODO jwilm: is ignoring the err (fail to send) correct here? - let _ = tx.send(res); } } } @@ -124,12 +146,21 @@ impl trace::SpanExporter for Exporter { fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { let (tx, rx) = oneshot::channel(); - if let Err(err) = self.tx.try_send((batch, tx)) { + if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) { return Box::pin(futures::future::ready(Err(Into::into(err)))); } Box::pin(async move { rx.await? }) } + + fn shutdown(&mut self) { + let _ = self.tx.try_send(ExportMessage::Shutdown); + + // This has the potential to block indefinitely, but as long as all of + // the tasks processed by ExportTask have a timeout, this should join + // eventually. + self.join_handle.take().map(|handle| handle.join()); + } } fn links_to_references(links: sdk::trace::EvictedQueue) -> Option> { From fec6c9f86ce2e57f2ae4fc8d494decb994d4e7a2 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Tue, 10 May 2022 18:12:58 -0700 Subject: [PATCH 10/11] Restore lint.sh script This was erroneously committed. --- scripts/lint.sh | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/scripts/lint.sh b/scripts/lint.sh index 40d5ccfd43..43ed731cec 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -29,7 +29,14 @@ if rustup component add clippy; then cargo_feature opentelemetry-otlp "http-proto, surf-client, surf/curl-client" cargo_feature opentelemetry-otlp "metrics" - cargo_feature opentelemetry-jaeger "full" + cargo_feature opentelemetry-jaeger "surf_collector_client, surf/curl-client" + cargo_feature opentelemetry-jaeger "isahc_collector_client" + cargo_feature opentelemetry-jaeger "reqwest_blocking_collector_client" + cargo_feature opentelemetry-jaeger "reqwest_collector_client" + cargo_feature opentelemetry-jaeger "collector_client" + cargo_feature opentelemetry-jaeger "wasm_collector_client" + cargo_feature opentelemetry-jaeger "collector_client, wasm_collector_client" + cargo_feature opentelemetry-jaeger "default" cargo_feature opentelemetry-dynatrace "default" cargo_feature opentelemetry-dynatrace "metrics,rt-tokio,reqwest-client" From b6470162dfa5d57c35a5b40403deea2590997349 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Wed, 11 May 2022 08:21:02 -0700 Subject: [PATCH 11/11] Make max concurrent exports env configurable OTEL_BSP_MAX_CONCURRENT_EXPORTS may now be specified in the environment to configure the number of max concurrent exports. This configurable now has parity with the other options of the span_processor. 
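For illustration only (this mirrors, rather than reproduces, the SDK change
below): the variable is read from the environment, parsed as a `usize`, and
otherwise falls back to the default of 1.

    use std::env;
    use std::str::FromStr;

    const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
    const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;

    fn max_concurrent_exports() -> usize {
        env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS)
            .ok()
            .and_then(|raw| usize::from_str(&raw).ok())
            .unwrap_or(OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT)
    }

    fn main() {
        // e.g. OTEL_BSP_MAX_CONCURRENT_EXPORTS=4 permits four in-flight batches.
        println!("max concurrent exports: {}", max_concurrent_exports());
    }

Unset or unparseable values fall back to the default, matching how the other
OTEL_BSP_* options in this file are handled.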
--- opentelemetry-sdk/src/trace/span_processor.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index d1283de56d..8436181d04 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -67,8 +67,11 @@ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT"; /// Default maximum allowed time to export data. const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000; +/// Environment variable to configure max concurrent exports for batch span +/// processor. +const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS"; /// Default max concurrent exports for BSP -const OTEL_BSP_MAX_CONCURRENT_EXPORTS: usize = 1; +const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -501,9 +504,16 @@ impl Default for BatchConfig { scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT), max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT), - max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS, + max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT, }; + if let Some(max_concurrent_exports) = env::var(OTEL_BSP_MAX_CONCURRENT_EXPORTS) + .ok() + .and_then(|max_concurrent_exports| usize::from_str(&max_concurrent_exports).ok()) + { + config.max_concurrent_exports = max_concurrent_exports; + } + if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE) .ok() .and_then(|queue_size| usize::from_str(&queue_size).ok())