From 29cbb7d5056ae00d137e97218ba5d0457b12d755 Mon Sep 17 00:00:00 2001 From: Julian Tescher Date: Fri, 24 Jun 2022 19:05:19 -0700 Subject: [PATCH] opentelemetry: Update otel to 0.18.0 ## Motivation Support the latest OpenTelemetry specification. ## Solution Update `opentelemetry` to the latest `0.18.x` release. Breaking changes in the metrics spec have removed value recorders and added histograms so the metrics layer's `value.` prefix has been changed to `histogram.` and behaves accordingly. Additionally the `PushController` configuration for the metrics layer has been simplified to accept a `BasicController` that can act in either push or pull modes. Finally trace sampling in the sdk's `PreSampledTracer` impl has been updated to match the sampling logic in https://github.com/open-telemetry/opentelemetry-rust/pull/839. --- tracing-opentelemetry/Cargo.toml | 10 +- tracing-opentelemetry/src/metrics.rs | 93 ++++--- tracing-opentelemetry/src/subscriber.rs | 109 ++++---- tracing-opentelemetry/src/tracer.rs | 26 +- .../tests/metrics_publishing.rs | 248 +++++++++--------- .../tests/trace_state_propagation.rs | 21 +- 6 files changed, 260 insertions(+), 247 deletions(-) diff --git a/tracing-opentelemetry/Cargo.toml b/tracing-opentelemetry/Cargo.toml index ec7d4a8606..36e51e5a96 100644 --- a/tracing-opentelemetry/Cargo.toml +++ b/tracing-opentelemetry/Cargo.toml @@ -25,20 +25,16 @@ default = ["tracing-log", "metrics"] metrics = ["opentelemetry/metrics"] [dependencies] -opentelemetry = { version = "0.17.0", default-features = false, features = ["trace"] } +opentelemetry = { version = "0.18.0", default-features = false, features = ["trace"] } tracing = { path = "../tracing", version = "0.2", default-features = false, features = ["std"] } tracing-core = { path = "../tracing-core", version = "0.2" } tracing-subscriber = { path = "../tracing-subscriber", version = "0.3", default-features = false, features = ["registry", "std"] } tracing-log = { path = "../tracing-log", version = "0.2", default-features = false, optional = true } once_cell = "1.13.0" -# Fix minimal-versions; opentelemetry specifies async-trait = "0.1" which breaks -async-trait = "0.1.20" - [dev-dependencies] -async-trait = "0.1.56" criterion = { version = "0.3.6", default_features = false } -opentelemetry-jaeger = "0.16.0" +opentelemetry-jaeger = "0.17.0" futures-util = { version = "0.3.21", default-features = false } tokio = { version = "1.20.0", features = ["full"] } tokio-stream = "0.1.9" @@ -52,4 +48,4 @@ harness = false [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] \ No newline at end of file +rustdoc-args = ["--cfg", "docsrs"] diff --git a/tracing-opentelemetry/src/metrics.rs b/tracing-opentelemetry/src/metrics.rs index 76c0ed2d37..147a9f8833 100644 --- a/tracing-opentelemetry/src/metrics.rs +++ b/tracing-opentelemetry/src/metrics.rs @@ -3,8 +3,9 @@ use tracing::{field::Visit, Collect}; use tracing_core::Field; use opentelemetry::{ - metrics::{Counter, Meter, MeterProvider, UpDownCounter, ValueRecorder}, - sdk::metrics::PushController, + metrics::{Counter, Histogram, Meter, MeterProvider, UpDownCounter}, + sdk::metrics::controllers::BasicController, + Context as OtelContext, }; use tracing_subscriber::{registry::LookupSpan, subscribe::Context, Subscribe}; @@ -13,7 +14,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; const METRIC_PREFIX_MONOTONIC_COUNTER: &str = "monotonic_counter."; const METRIC_PREFIX_COUNTER: &str = "counter."; -const METRIC_PREFIX_VALUE: &str = "value."; +const METRIC_PREFIX_HISTOGRAM: &str = "histogram."; const I64_MAX: u64 = i64::MAX as u64; #[derive(Default)] @@ -22,9 +23,9 @@ pub(crate) struct Instruments { f64_counter: MetricsMap>, i64_up_down_counter: MetricsMap>, f64_up_down_counter: MetricsMap>, - u64_value_recorder: MetricsMap>, - i64_value_recorder: MetricsMap>, - f64_value_recorder: MetricsMap>, + u64_histogram: MetricsMap>, + i64_histogram: MetricsMap>, + f64_histogram: MetricsMap>, } type MetricsMap = RwLock>; @@ -35,14 +36,15 @@ pub(crate) enum InstrumentType { CounterF64(f64), UpDownCounterI64(i64), UpDownCounterF64(f64), - ValueRecorderU64(u64), - ValueRecorderI64(i64), - ValueRecorderF64(f64), + HistogramU64(u64), + HistogramI64(i64), + HistogramF64(f64), } impl Instruments { pub(crate) fn update_metric( &self, + cx: &OtelContext, meter: &Meter, instrument_type: InstrumentType, metric_name: &'static str, @@ -76,7 +78,7 @@ impl Instruments { &self.u64_counter, metric_name, || meter.u64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::CounterF64(value) => { @@ -84,7 +86,7 @@ impl Instruments { &self.f64_counter, metric_name, || meter.f64_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterI64(value) => { @@ -92,7 +94,7 @@ impl Instruments { &self.i64_up_down_counter, metric_name, || meter.i64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } InstrumentType::UpDownCounterF64(value) => { @@ -100,31 +102,31 @@ impl Instruments { &self.f64_up_down_counter, metric_name, || meter.f64_up_down_counter(metric_name).init(), - |ctr| ctr.add(value, &[]), + |ctr| ctr.add(cx, value, &[]), ); } - InstrumentType::ValueRecorderU64(value) => { + InstrumentType::HistogramU64(value) => { update_or_insert( - &self.u64_value_recorder, + &self.u64_histogram, metric_name, - || meter.u64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.u64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderI64(value) => { + InstrumentType::HistogramI64(value) => { update_or_insert( - &self.i64_value_recorder, + &self.i64_histogram, metric_name, - || meter.i64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.i64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } - InstrumentType::ValueRecorderF64(value) => { + InstrumentType::HistogramF64(value) => { update_or_insert( - &self.f64_value_recorder, + &self.f64_histogram, metric_name, - || meter.f64_value_recorder(metric_name).init(), - |rec| rec.record(value, &[]), + || meter.f64_histogram(metric_name).init(), + |rec| rec.record(cx, value, &[]), ); } }; @@ -142,8 +144,10 @@ impl<'a> Visit for MetricVisitor<'a> { } fn record_u64(&mut self, field: &Field, value: u64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value), metric_name, @@ -151,6 +155,7 @@ impl<'a> Visit for MetricVisitor<'a> { } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { if value <= I64_MAX { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value as i64), metric_name, @@ -163,54 +168,63 @@ impl<'a> Visit for MetricVisitor<'a> { value ); } - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderU64(value), + InstrumentType::HistogramU64(value), metric_name, ); } } fn record_f64(&mut self, field: &Field, value: f64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterF64(value), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterF64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderF64(value), + InstrumentType::HistogramF64(value), metric_name, ); } } fn record_i64(&mut self, field: &Field, value: i64) { + let cx = OtelContext::current(); if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_MONOTONIC_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::CounterU64(value as u64), metric_name, ); } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_COUNTER) { self.instruments.update_metric( + &cx, self.meter, InstrumentType::UpDownCounterI64(value), metric_name, ); - } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_VALUE) { + } else if let Some(metric_name) = field.name().strip_prefix(METRIC_PREFIX_HISTOGRAM) { self.instruments.update_metric( + &cx, self.meter, - InstrumentType::ValueRecorderI64(value), + InstrumentType::HistogramI64(value), metric_name, ); } @@ -232,14 +246,14 @@ impl<'a> Visit for MetricVisitor<'a> { /// use tracing_opentelemetry::MetricsSubscriber; /// use tracing_subscriber::subscribe::CollectExt; /// use tracing_subscriber::Registry; -/// # use opentelemetry::sdk::metrics::PushController; +/// # use opentelemetry::sdk::metrics::controllers::BasicController; /// -/// // Constructing a PushController is out-of-scope for the docs here, but there +/// // Constructing a BasicController is out-of-scope for the docs here, but there /// // are examples in the opentelemetry repository. See: -/// // https://github.com/open-telemetry/opentelemetry-rust/blob/c13a11e62a68eacd8c41a0742a0d097808e28fbd/examples/basic-otlp/src/main.rs#L39-L53 -/// # let push_controller: PushController = unimplemented!(); +/// // https://github.com/open-telemetry/opentelemetry-rust/blob/d4b9befea04bcc7fc19319a6ebf5b5070131c486/examples/basic-otlp/src/main.rs#L35-L52 +/// # let controller: BasicController = unimplemented!(); /// -/// let opentelemetry_metrics = MetricsSubscriber::new(push_controller); +/// let opentelemetry_metrics = MetricsSubscriber::new(controller); /// let collector = Registry::default().with(opentelemetry_metrics); /// tracing::collect::set_global_default(collector).unwrap(); /// ``` @@ -328,10 +342,9 @@ pub struct MetricsSubscriber { impl MetricsSubscriber { /// Create a new instance of MetricsSubscriber. - pub fn new(push_controller: PushController) -> Self { - let meter = push_controller - .provider() - .meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION)); + pub fn new(controller: BasicController) -> Self { + let meter = + controller.versioned_meter(INSTRUMENTATION_LIBRARY_NAME, Some(CARGO_PKG_VERSION), None); MetricsSubscriber { meter, instruments: Default::default(), diff --git a/tracing-opentelemetry/src/subscriber.rs b/tracing-opentelemetry/src/subscriber.rs index f0936271e8..6450b7f7bd 100644 --- a/tracing-opentelemetry/src/subscriber.rs +++ b/tracing-opentelemetry/src/subscriber.rs @@ -1,7 +1,7 @@ use crate::{OtelData, PreSampledTracer}; use once_cell::unsync; use opentelemetry::{ - trace::{self as otel, noop, TraceContextExt}, + trace::{self as otel, noop, OrderMap, TraceContextExt}, Context as OtelContext, Key, KeyValue, Value, }; use std::fmt; @@ -100,12 +100,11 @@ fn str_to_span_kind(s: &str) -> Option { } } -fn str_to_status_code(s: &str) -> Option { +fn str_to_status(s: &str) -> otel::Status { match s { - s if s.eq_ignore_ascii_case("unset") => Some(otel::StatusCode::Unset), - s if s.eq_ignore_ascii_case("ok") => Some(otel::StatusCode::Ok), - s if s.eq_ignore_ascii_case("error") => Some(otel::StatusCode::Error), - _ => None, + s if s.eq_ignore_ascii_case("ok") => otel::Status::Ok, + s if s.eq_ignore_ascii_case("error") => otel::Status::error(""), + _ => otel::Status::Unset, } } @@ -199,7 +198,7 @@ impl<'a> SpanAttributeVisitor<'a> { fn record(&mut self, attribute: KeyValue) { debug_assert!(self.0.attributes.is_some()); if let Some(v) = self.0.attributes.as_mut() { - v.push(attribute); + v.insert(attribute.key, attribute.value); } } } @@ -233,8 +232,8 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.0.name = value.to_string().into(), SPAN_KIND_FIELD => self.0.span_kind = str_to_span_kind(value), - SPAN_STATUS_CODE_FIELD => self.0.status_code = str_to_status_code(value), - SPAN_STATUS_MESSAGE_FIELD => self.0.status_message = Some(value.to_owned().into()), + SPAN_STATUS_CODE_FIELD => self.0.status = str_to_status(value), + SPAN_STATUS_MESSAGE_FIELD => self.0.status = otel::Status::error(value.to_string()), _ => self.record(KeyValue::new(field.name(), value.to_string())), } } @@ -247,11 +246,9 @@ impl<'a> field::Visit for SpanAttributeVisitor<'a> { match field.name() { SPAN_NAME_FIELD => self.0.name = format!("{:?}", value).into(), SPAN_KIND_FIELD => self.0.span_kind = str_to_span_kind(&format!("{:?}", value)), - SPAN_STATUS_CODE_FIELD => { - self.0.status_code = str_to_status_code(&format!("{:?}", value)) - } + SPAN_STATUS_CODE_FIELD => self.0.status = str_to_status(&format!("{:?}", value)), SPAN_STATUS_MESSAGE_FIELD => { - self.0.status_message = Some(format!("{:?}", value).into()) + self.0.status = otel::Status::error(format!("{:?}", value)) } _ => self.record(Key::new(field.name()).string(format!("{:?}", value))), } @@ -277,7 +274,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -314,7 +311,7 @@ where /// use tracing_subscriber::Registry; /// /// // Create a jaeger exporter pipeline for a `trace_demo` service. - /// let tracer = opentelemetry_jaeger::new_pipeline() + /// let tracer = opentelemetry_jaeger::new_agent_pipeline() /// .with_service_name("trace_demo") /// .install_simple() /// .expect("Error initializing Jaeger exporter"); @@ -506,7 +503,7 @@ where builder.trace_id = Some(self.tracer.new_trace_id()); } - let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity( + let builder_attrs = builder.attributes.get_or_insert(OrderMap::with_capacity( attrs.fields().len() + self.extra_span_attrs(), )); @@ -514,26 +511,26 @@ where let meta = attrs.metadata(); if let Some(filename) = meta.file() { - builder_attrs.push(KeyValue::new("code.filepath", filename)); + builder_attrs.insert("code.filepath".into(), filename.into()); } if let Some(module) = meta.module_path() { - builder_attrs.push(KeyValue::new("code.namespace", module)); + builder_attrs.insert("code.namespace".into(), module.into()); } if let Some(line) = meta.line() { - builder_attrs.push(KeyValue::new("code.lineno", line as i64)); + builder_attrs.insert("code.lineno".into(), (line as i64).into()); } } if self.with_threads { - THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64))); + THREAD_ID.with(|id| builder_attrs.insert("thread.id".into(), (**id as i64).into())); if let Some(name) = std::thread::current().name() { // TODO(eliza): it's a bummer that we have to allocate here, but // we can't easily get the string as a `static`. it would be // nice if `opentelemetry` could also take `Arc`s as // `String` values... - builder_attrs.push(KeyValue::new("thread.name", name.to_owned())); + builder_attrs.insert("thread.name".into(), name.to_owned().into()); } } @@ -653,8 +650,10 @@ where let mut extensions = span.extensions_mut(); if let Some(OtelData { builder, .. }) = extensions.get_mut::() { - if builder.status_code.is_none() && *meta.level() == tracing_core::Level::ERROR { - builder.status_code = Some(otel::StatusCode::Error); + if builder.status == otel::Status::Unset + && *meta.level() == tracing_core::Level::ERROR + { + builder.status = otel::Status::error("") } if self.location { @@ -712,15 +711,14 @@ where if self.tracked_inactivity { // Append busy/idle timings when enabled. if let Some(timings) = extensions.get_mut::() { - let busy_ns = KeyValue::new("busy_ns", timings.busy); - let idle_ns = KeyValue::new("idle_ns", timings.idle); - - if let Some(ref mut attributes) = builder.attributes { - attributes.push(busy_ns); - attributes.push(idle_ns); - } else { - builder.attributes = Some(vec![busy_ns, idle_ns]); - } + let busy_ns = Key::new("busy_ns"); + let idle_ns = Key::new("idle_ns"); + + let attributes = builder + .attributes + .get_or_insert_with(|| OrderMap::with_capacity(2)); + attributes.insert(busy_ns, timings.busy.into()); + attributes.insert(idle_ns, timings.idle.into()); } } @@ -773,7 +771,7 @@ fn thread_id_integer(id: thread::ThreadId) -> u64 { mod tests { use super::*; use crate::OtelData; - use opentelemetry::trace::{noop, SpanKind, TraceFlags}; + use opentelemetry::trace::{noop, TraceFlags}; use std::{ borrow::Cow, collections::HashMap, @@ -849,7 +847,7 @@ mod tests { false } fn set_attribute(&mut self, _attribute: KeyValue) {} - fn set_status(&mut self, _code: otel::StatusCode, _message: String) {} + fn set_status(&mut self, _status: otel::Status) {} fn update_name>>(&mut self, _new_name: T) {} fn end_with_timestamp(&mut self, _timestamp: SystemTime) {} } @@ -881,7 +879,7 @@ mod tests { tracing_subscriber::registry().with(subscriber().with_tracer(tracer.clone())); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone()); @@ -895,11 +893,19 @@ mod tests { tracing_subscriber::registry().with(subscriber().with_tracer(tracer.clone())); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok); + tracing::debug_span!("request", otel.status_code = ?otel::Status::Ok); }); + let recorded_status = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); - let recorded_status_code = tracer.with_data(|data| data.builder.status_code); - assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok)) + assert_eq!(recorded_status, otel::Status::Ok) } #[test] @@ -914,8 +920,17 @@ mod tests { tracing::debug_span!("request", otel.status_message = message); }); - let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone()); - assert_eq!(recorded_status_message, Some(message.into())) + let recorded_status_message = tracer + .0 + .lock() + .unwrap() + .as_ref() + .unwrap() + .builder + .status + .clone(); + + assert_eq!(recorded_status_message, otel::Status::error(message)) } #[test] @@ -934,7 +949,7 @@ mod tests { let _g = existing_cx.attach(); tracing::collect::with_default(subscriber, || { - tracing::debug_span!("request", otel.kind = %SpanKind::Server); + tracing::debug_span!("request", otel.kind = "server"); }); let recorded_trace_id = @@ -958,7 +973,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"idle_ns")); assert!(keys.contains(&"busy_ns")); @@ -977,7 +992,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(keys.contains(&"code.filepath")); assert!(keys.contains(&"code.namespace")); @@ -1000,7 +1015,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"code.filepath")); assert!(!keys.contains(&"code.namespace")); @@ -1012,7 +1027,7 @@ mod tests { let thread = thread::current(); let expected_name = thread .name() - .map(|name| Value::String(Cow::Owned(name.to_owned()))); + .map(|name| Value::String(name.to_owned().into())); let expected_id = Value::I64(thread_id_integer(thread.id()) as i64); let tracer = TestTracer(Arc::new(Mutex::new(None))); @@ -1026,7 +1041,7 @@ mod tests { let attributes = tracer .with_data(|data| data.builder.attributes.as_ref().unwrap().clone()) .drain(..) - .map(|keyval| (keyval.key.as_str().to_string(), keyval.value)) + .map(|(key, value)| (key.as_str().to_string(), value)) .collect::>(); assert_eq!(attributes.get("thread.name"), expected_name.as_ref()); assert_eq!(attributes.get("thread.id"), Some(&expected_id)); @@ -1045,7 +1060,7 @@ mod tests { let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone()); let keys = attributes .iter() - .map(|attr| attr.key.as_str()) + .map(|(key, _)| key.as_str()) .collect::>(); assert!(!keys.contains(&"thread.name")); assert!(!keys.contains(&"thread.id")); diff --git a/tracing-opentelemetry/src/tracer.rs b/tracing-opentelemetry/src/tracer.rs index 23e574450d..52513c36ad 100644 --- a/tracing-opentelemetry/src/tracer.rs +++ b/tracing-opentelemetry/src/tracer.rs @@ -1,9 +1,10 @@ -use opentelemetry::sdk::trace::{SamplingDecision, SamplingResult, Tracer, TracerProvider}; +use opentelemetry::sdk::trace::{Tracer, TracerProvider}; +use opentelemetry::trace::OrderMap; use opentelemetry::{ trace as otel, trace::{ - noop, SpanBuilder, SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, - TraceState, + noop, SamplingDecision, SamplingResult, SpanBuilder, SpanContext, SpanId, SpanKind, + TraceContextExt, TraceFlags, TraceId, TraceState, }, Context as OtelContext, }; @@ -73,19 +74,18 @@ impl PreSampledTracer for Tracer { let builder = &mut data.builder; // Gather trace state - let (no_parent, trace_id, remote_parent, parent_trace_flags) = - current_trace_state(builder, parent_cx, &provider); + let (trace_id, parent_trace_flags) = current_trace_state(builder, parent_cx, &provider); // Sample or defer to existing sampling decisions let (flags, trace_state) = if let Some(result) = &builder.sampling_result { process_sampling_result(result, parent_trace_flags) - } else if no_parent || remote_parent { + } else { builder.sampling_result = Some(provider.config().sampler.should_sample( Some(parent_cx), trace_id, &builder.name, builder.span_kind.as_ref().unwrap_or(&SpanKind::Internal), - builder.attributes.as_deref().unwrap_or(&[]), + builder.attributes.as_ref().unwrap_or(&OrderMap::default()), builder.links.as_deref().unwrap_or(&[]), self.instrumentation_library(), )); @@ -94,12 +94,6 @@ impl PreSampledTracer for Tracer { builder.sampling_result.as_ref().unwrap(), parent_trace_flags, ) - } else { - // has parent that is local - Some(( - parent_trace_flags, - parent_cx.span().span_context().trace_state().clone(), - )) } .unwrap_or_default(); @@ -125,18 +119,16 @@ fn current_trace_state( builder: &SpanBuilder, parent_cx: &OtelContext, provider: &TracerProvider, -) -> (bool, TraceId, bool, TraceFlags) { +) -> (TraceId, TraceFlags) { if parent_cx.has_active_span() { let span = parent_cx.span(); let sc = span.span_context(); - (false, sc.trace_id(), sc.is_remote(), sc.trace_flags()) + (sc.trace_id(), sc.trace_flags()) } else { ( - true, builder .trace_id .unwrap_or_else(|| provider.config().id_generator.new_trace_id()), - false, Default::default(), ) } diff --git a/tracing-opentelemetry/tests/metrics_publishing.rs b/tracing-opentelemetry/tests/metrics_publishing.rs index 9b8d8a6bab..c3a143987b 100644 --- a/tracing-opentelemetry/tests/metrics_publishing.rs +++ b/tracing-opentelemetry/tests/metrics_publishing.rs @@ -1,26 +1,21 @@ -#![cfg(feature = "metrics")] -use async_trait::async_trait; -use futures_util::{Stream, StreamExt as _}; use opentelemetry::{ - metrics::{Descriptor, InstrumentKind}, - metrics::{Number, NumberKind}, + metrics::MetricsError, sdk::{ - export::{ - metrics::{ - CheckpointSet, ExportKind, ExportKindFor, ExportKindSelector, - Exporter as MetricsExporter, Points, Sum, - }, - trace::{SpanData, SpanExporter}, + export::metrics::{ + aggregation::{self, Histogram, Sum, TemporalitySelector}, + InstrumentationLibraryReader, }, metrics::{ - aggregators::{ArrayAggregator, SumAggregator}, - selectors::simple::Selector, + aggregators::{HistogramAggregator, SumAggregator}, + controllers::BasicController, + processors, + sdk_api::{Descriptor, InstrumentKind, Number, NumberKind}, + selectors, }, }, - Key, Value, + Context, }; use std::cmp::Ordering; -use std::time::Duration; use tracing::Collect; use tracing_opentelemetry::MetricsSubscriber; use tracing_subscriber::prelude::*; @@ -30,7 +25,7 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "tracing/tracing-opentelemetry"; #[tokio::test] async fn u64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -40,11 +35,13 @@ async fn u64_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world = 1_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn u64_counter_is_exported_i64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "hello_world2".to_string(), InstrumentKind::Counter, NumberKind::U64, @@ -54,11 +51,13 @@ async fn u64_counter_is_exported_i64_at_instrumentation_point() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.hello_world2 = 1_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "float_hello_world".to_string(), InstrumentKind::Counter, NumberKind::F64, @@ -68,11 +67,13 @@ async fn f64_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(monotonic_counter.float_hello_world = 1.000000123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -82,11 +83,13 @@ async fn i64_up_down_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak = -5_i64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak2".to_string(), InstrumentKind::UpDownCounter, NumberKind::I64, @@ -96,11 +99,13 @@ async fn i64_up_down_counter_is_exported_u64_at_instrumentation_point() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak2 = 5_u64); }); + + exporter.export().unwrap(); } #[tokio::test] async fn f64_up_down_counter_is_exported() { - let subscriber = init_subscriber( + let (subscriber, exporter) = init_subscriber( "pebcak_blah".to_string(), InstrumentKind::UpDownCounter, NumberKind::F64, @@ -110,13 +115,15 @@ async fn f64_up_down_counter_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(counter.pebcak_blah = 99.123_f64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn u64_value_is_exported() { - let subscriber = init_subscriber( +async fn u64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::U64, Number::from(9_u64), ); @@ -124,13 +131,15 @@ async fn u64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg = 9_u64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn i64_value_is_exported() { - let subscriber = init_subscriber( +async fn i64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_auenatsou".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::I64, Number::from(-19_i64), ); @@ -138,13 +147,15 @@ async fn i64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg_auenatsou = -19_i64); }); + + exporter.export().unwrap(); } #[tokio::test] -async fn f64_value_is_exported() { - let subscriber = init_subscriber( +async fn f64_histogram_is_exported() { + let (subscriber, exporter) = init_subscriber( "abcdefg_racecar".to_string(), - InstrumentKind::ValueRecorder, + InstrumentKind::Histogram, NumberKind::F64, Number::from(777.0012_f64), ); @@ -152,6 +163,8 @@ async fn f64_value_is_exported() { tracing::collect::with_default(subscriber, || { tracing::info!(value.abcdefg_racecar = 777.0012_f64); }); + + exporter.export().unwrap(); } fn init_subscriber( @@ -159,24 +172,25 @@ fn init_subscriber( expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, -) -> impl Collect + 'static { +) -> (impl Collect + 'static, TestExporter) { + let controller = opentelemetry::sdk::metrics::controllers::basic(processors::factory( + selectors::simple::histogram(vec![-10.0, 100.0]), + aggregation::cumulative_temporality_selector(), + )) + .build(); + let exporter = TestExporter { expected_metric_name, expected_instrument_kind, expected_number_kind, expected_value, + controller: controller.clone(), }; - let push_controller = opentelemetry::sdk::metrics::controllers::push( - Selector::Exact, - ExportKindSelector::Stateless, + ( + tracing_subscriber::registry().with(MetricsSubscriber::new(controller)), exporter, - tokio::spawn, - delayed_interval, ) - .build(); - - tracing_subscriber::registry().with(MetricsSubscriber::new(push_controller)) } #[derive(Clone, Debug)] @@ -185,100 +199,84 @@ struct TestExporter { expected_instrument_kind: InstrumentKind, expected_number_kind: NumberKind, expected_value: Number, + controller: BasicController, } -#[async_trait] -impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut _batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - Ok(()) - } -} +impl TestExporter { + fn export(&self) -> Result<(), MetricsError> { + self.controller.collect(&Context::current())?; + self.controller.try_for_each(&mut |library, reader| { + reader.try_for_each(self, &mut |record| { + assert_eq!(self.expected_metric_name, record.descriptor().name()); + assert_eq!( + self.expected_instrument_kind, + *record.descriptor().instrument_kind() + ); + assert_eq!( + self.expected_number_kind, + *record.descriptor().number_kind() + ); + match self.expected_instrument_kind { + InstrumentKind::Counter | InstrumentKind::UpDownCounter => { + let number = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .sum() + .unwrap(); + + assert_eq!( + Ordering::Equal, + number + .partial_cmp(&NumberKind::U64, &self.expected_value) + .unwrap() + ); + } + InstrumentKind::Histogram => { + let histogram = record + .aggregator() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .histogram() + .unwrap(); + + let counts = histogram.counts(); + if dbg!(self.expected_value.to_i64(&self.expected_number_kind)) > 100 { + assert_eq!(counts, &[0.0, 0.0, 1.0]); + } else if self.expected_value.to_i64(&self.expected_number_kind) > 0 { + assert_eq!(counts, &[0.0, 1.0, 0.0]); + } else { + assert_eq!(counts, &[1.0, 0.0, 0.0]); + } + } + _ => panic!( + "InstrumentKind {:?} not currently supported!", + self.expected_instrument_kind + ), + }; -impl MetricsExporter for TestExporter { - fn export(&self, checkpoint_set: &mut dyn CheckpointSet) -> opentelemetry::metrics::Result<()> { - checkpoint_set.try_for_each(self, &mut |record| { - assert_eq!(self.expected_metric_name, record.descriptor().name()); - assert_eq!( - self.expected_instrument_kind, - *record.descriptor().instrument_kind() - ); - assert_eq!( - self.expected_number_kind, - *record.descriptor().number_kind() - ); - let number = match self.expected_instrument_kind { - InstrumentKind::Counter | InstrumentKind::UpDownCounter => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .sum() - .unwrap(), - InstrumentKind::ValueRecorder => record - .aggregator() - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .points() - .unwrap()[0] - .clone(), - _ => panic!( - "InstrumentKind {:?} not currently supported!", - self.expected_instrument_kind - ), - }; - assert_eq!( - Ordering::Equal, - number - .partial_cmp(&NumberKind::U64, &self.expected_value) - .unwrap() - ); - - // The following are the same regardless of the individual metric. - assert_eq!( - INSTRUMENTATION_LIBRARY_NAME, - record.descriptor().instrumentation_library().name - ); - assert_eq!( - CARGO_PKG_VERSION, - record.descriptor().instrumentation_version().unwrap() - ); - assert_eq!( - Value::String("unknown_service".into()), - record - .resource() - .get(Key::new("service.name".to_string())) - .unwrap() - ); - - opentelemetry::metrics::Result::Ok(()) + // The following are the same regardless of the individual metric. + assert_eq!(INSTRUMENTATION_LIBRARY_NAME, library.name); + assert_eq!(CARGO_PKG_VERSION, library.version.as_ref().unwrap()); + + Ok(()) + }) }) } } -impl ExportKindFor for TestExporter { - fn export_kind_for(&self, _descriptor: &Descriptor) -> ExportKind { +impl TemporalitySelector for TestExporter { + fn temporality_for( + &self, + _descriptor: &Descriptor, + _kind: &aggregation::AggregationKind, + ) -> aggregation::Temporality { // I don't think the value here makes a difference since // we are just testing a single metric. - ExportKind::Cumulative + aggregation::Temporality::Cumulative } } - -// From opentelemetry::sdk::util:: -// For some reason I can't pull it in from the other crate, it gives -// could not find `util` in `sdk` -/// Helper which wraps `tokio::time::interval` and makes it return a stream -fn tokio_interval_stream(period: std::time::Duration) -> tokio_stream::wrappers::IntervalStream { - tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(period)) -} - -// https://github.com/open-telemetry/opentelemetry-rust/blob/2585d109bf90d53d57c91e19c758dca8c36f5512/examples/basic-otlp/src/main.rs#L34-L37 -// Skip first immediate tick from tokio, not needed for async_std. -fn delayed_interval(duration: Duration) -> impl Stream { - tokio_interval_stream(duration).skip(0) -} diff --git a/tracing-opentelemetry/tests/trace_state_propagation.rs b/tracing-opentelemetry/tests/trace_state_propagation.rs index 5382411b50..bc80e1364b 100644 --- a/tracing-opentelemetry/tests/trace_state_propagation.rs +++ b/tracing-opentelemetry/tests/trace_state_propagation.rs @@ -1,8 +1,8 @@ -use async_trait::async_trait; +use futures_util::future::BoxFuture; use opentelemetry::{ propagation::TextMapPropagator, sdk::{ - export::trace::{SpanData, SpanExporter}, + export::trace::{ExportResult, SpanData, SpanExporter}, propagation::{BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator}, trace::{Tracer, TracerProvider}, }, @@ -158,15 +158,14 @@ fn build_sampled_context() -> (Context, impl Collect, TestExporter, TracerProvid #[derive(Clone, Default, Debug)] struct TestExporter(Arc>>); -#[async_trait] impl SpanExporter for TestExporter { - async fn export( - &mut self, - mut batch: Vec, - ) -> opentelemetry::sdk::export::trace::ExportResult { - if let Ok(mut inner) = self.0.lock() { - inner.append(&mut batch); - } - Ok(()) + fn export(&mut self, mut batch: Vec) -> BoxFuture<'static, ExportResult> { + let spans = self.0.clone(); + Box::pin(async move { + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + }) } }