Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opentelemetry: add support for thread names/ids #2134

Merged
merged 5 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tracing-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing = { path = "../tracing", version = "0.2", default-features = false, feat
tracing-core = { path = "../tracing-core", version = "0.2" }
tracing-subscriber = { path = "../tracing-subscriber", version = "0.3", default-features = false, features = ["registry", "std"] }
tracing-log = { path = "../tracing-log", version = "0.2", default-features = false, optional = true }
once_cell = "1"

[dev-dependencies]
async-trait = "0.1"
Expand Down
230 changes: 149 additions & 81 deletions tracing-opentelemetry/src/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::{OtelData, PreSampledTracer};
use once_cell::unsync;
use opentelemetry::{
trace::{self as otel, noop, TraceContextExt},
Context as OtelContext, Key, KeyValue, Value,
};
use std::fmt;
use std::marker;
use std::thread;
use std::time::{Instant, SystemTime};
use std::{any::TypeId, ptr::NonNull};
use tracing_core::span::{self, Attributes, Id, Record};
Expand All @@ -29,6 +31,7 @@ pub struct OpenTelemetrySubscriber<C, T> {
tracer: T,
location: bool,
tracked_inactivity: bool,
with_threads: bool,
get_context: WithContext,
_registry: marker::PhantomData<C>,
}
Expand Down Expand Up @@ -292,6 +295,7 @@ where
tracer,
location: true,
tracked_inactivity: true,
with_threads: true,
get_context: WithContext(Self::get_context),
_registry: marker::PhantomData,
}
Expand Down Expand Up @@ -331,23 +335,34 @@ where
tracer,
location: self.location,
tracked_inactivity: self.tracked_inactivity,
with_threads: self.with_threads,
get_context: WithContext(OpenTelemetrySubscriber::<C, Tracer>::get_context),
_registry: self._registry,
}
}

/// Sets whether or not span and event metadata should include detailed
/// location information, such as the file, module and line number.
/// Sets whether or not span and event metadata should include OpenTelemetry
/// attributes with location information, such as the file, module and line number.
///
/// These attributes follow the [OpenTelemetry semantic conventions for
/// source locations][conv].
///
/// By default, locations are enabled.
///
/// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes
pub fn with_location(self, location: bool) -> Self {
Self { location, ..self }
}

/// Sets whether or not event span's metadata should include detailed location
/// information, such as the file, module and line number.
/// Sets whether or not span and event metadata should include OpenTelemetry
/// attributes with location information, such as the file, module and line number.
///
/// By default, event locations are enabled.
/// These attributes follow the [OpenTelemetry semantic conventions for
/// source locations][conv].
///
/// By default, locations are enabled.
///
/// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#source-code-attributes
#[deprecated(
since = "0.17.3",
note = "renamed to `OpenTelemetrySubscriber::with_location`"
Expand All @@ -369,6 +384,20 @@ where
}
}

/// Sets whether or not spans record additional attributes for the thread
/// name and thread ID of the thread they were created on, following the
/// [OpenTelemetry semantic conventions for threads][conv].
///
/// By default, thread attributes are enabled.
///
/// [conv]: https://opentelemetry.io/docs/reference/specification/trace/semantic_conventions/span-general/#general-thread-attributes
pub fn with_threads(self, threads: bool) -> Self {
Self {
with_threads: threads,
..self
}
}

/// Retrieve the parent OpenTelemetry [`Context`] from the current tracing
/// [`span`] through the [`Registry`]. This [`Context`] links spans to their
/// parent for proper hierarchical visualization.
Expand Down Expand Up @@ -421,6 +450,30 @@ where
f(builder, &subscriber.tracer);
}
}

fn extra_span_attrs(&self) -> usize {
let mut extra_attrs = 0;
if self.location {
extra_attrs += 3;
}
if self.with_threads {
extra_attrs += 2;
}
extra_attrs
}
}

thread_local! {
static THREAD_ID: unsync::Lazy<u64> = unsync::Lazy::new(|| {
// OpenTelemetry's semantic conventions require the thread ID to be
// recorded as an integer, but `std::thread::ThreadId` does not expose
// the integer value on stable, so we have to convert it to a `usize` by
// parsing it. Since this requires allocating a `String`, store it in a
// thread local so we only have to do this once.
// TODO(eliza): once `std::thread::ThreadId::as_u64` is stabilized
// (https://github.com/rust-lang/rust/issues/67939), just use that.
thread_id_integer(thread::current().id())
});
}

impl<C, T> Subscribe<C> for OpenTelemetrySubscriber<C, T>
Expand Down Expand Up @@ -453,9 +506,9 @@ where
builder.trace_id = Some(self.tracer.new_trace_id());
}

let builder_attrs = builder
.attributes
.get_or_insert(Vec::with_capacity(attrs.fields().len() + 3));
let builder_attrs = builder.attributes.get_or_insert(Vec::with_capacity(
attrs.fields().len() + self.extra_span_attrs(),
));

if self.location {
let meta = attrs.metadata();
Expand All @@ -473,6 +526,17 @@ where
}
}

if self.with_threads {
THREAD_ID.with(|id| builder_attrs.push(KeyValue::new("thread.id", **id as i64)));
if let Some(name) = std::thread::current().name() {
// TODO(eliza): it's a bummer that we have to allocate here, but
// we can't easily get the string as a `static`. it would be
// nice if `opentelemetry` could also take `Arc<str>`s as
// `String` values...
builder_attrs.push(KeyValue::new("thread.name", name.to_owned()));
}
}

attrs.record(&mut SpanAttributeVisitor(&mut builder));
extensions.insert(OtelData { builder, parent_cx });
}
Expand Down Expand Up @@ -696,14 +760,27 @@ impl Timings {
}
}

fn thread_id_integer(id: thread::ThreadId) -> u64 {
let thread_id = format!("{:?}", id);
thread_id
.trim_start_matches("ThreadId(")
.trim_end_matches(')')
.parse::<u64>()
.expect("thread ID should parse as an integer")
}

#[cfg(test)]
mod tests {
use super::*;
use crate::OtelData;
use opentelemetry::trace::{noop, SpanKind, TraceFlags};
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use std::{
borrow::Cow,
collections::HashMap,
sync::{Arc, Mutex},
thread,
time::SystemTime,
};
use tracing_subscriber::prelude::*;

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -747,6 +824,14 @@ mod tests {
}
}

impl TestTracer {
fn with_data<T>(&self, f: impl FnOnce(&OtelData) -> T) -> T {
let lock = self.0.lock().unwrap();
let data = lock.as_ref().expect("no span data has been recorded yet");
f(data)
}
}

#[derive(Debug, Clone)]
struct TestSpan(otel::SpanContext);
impl otel::Span for TestSpan {
Expand Down Expand Up @@ -799,15 +884,7 @@ mod tests {
tracing::debug_span!("request", otel.kind = %SpanKind::Server);
});

let recorded_kind = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.span_kind
.clone();
let recorded_kind = tracer.with_data(|data| data.builder.span_kind.clone());
assert_eq!(recorded_kind, Some(otel::SpanKind::Server))
}

Expand All @@ -820,14 +897,8 @@ mod tests {
tracing::collect::with_default(subscriber, || {
tracing::debug_span!("request", otel.status_code = ?otel::StatusCode::Ok);
});
let recorded_status_code = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.status_code;

let recorded_status_code = tracer.with_data(|data| data.builder.status_code);
assert_eq!(recorded_status_code, Some(otel::StatusCode::Ok))
}

Expand All @@ -843,16 +914,7 @@ mod tests {
tracing::debug_span!("request", otel.status_message = message);
});

let recorded_status_message = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.status_message
.clone();

let recorded_status_message = tracer.with_data(|data| data.builder.status_message.clone());
assert_eq!(recorded_status_message, Some(message.into()))
}

Expand All @@ -875,16 +937,8 @@ mod tests {
tracing::debug_span!("request", otel.kind = %SpanKind::Server);
});

let recorded_trace_id = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.parent_cx
.span()
.span_context()
.trace_id();
let recorded_trace_id =
tracer.with_data(|data| data.parent_cx.span().span_context().trace_id());
assert_eq!(recorded_trace_id, trace_id)
}

Expand All @@ -901,17 +955,7 @@ mod tests {
tracing::debug_span!("request");
});

let attributes = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.attributes
.as_ref()
.unwrap()
.clone();
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
Expand All @@ -930,17 +974,7 @@ mod tests {
tracing::debug_span!("request");
});

let attributes = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.attributes
.as_ref()
.unwrap()
.clone();
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
Expand All @@ -963,17 +997,7 @@ mod tests {
tracing::debug_span!("request");
});

let attributes = tracer
.0
.lock()
.unwrap()
.as_ref()
.unwrap()
.builder
.attributes
.as_ref()
.unwrap()
.clone();
let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
Expand All @@ -982,4 +1006,48 @@ mod tests {
assert!(!keys.contains(&"code.namespace"));
assert!(!keys.contains(&"code.lineno"));
}

#[test]
fn includes_thread() {
let thread = thread::current();
let expected_name = thread
.name()
.map(|name| Value::String(Cow::Owned(name.to_owned())));
let expected_id = Value::I64(thread_id_integer(thread.id()) as i64);

let tracer = TestTracer(Arc::new(Mutex::new(None)));
let subscriber = tracing_subscriber::registry()
.with(subscriber().with_tracer(tracer.clone()).with_threads(true));

tracing::collect::with_default(subscriber, || {
tracing::debug_span!("request");
});

let attributes = tracer
.with_data(|data| data.builder.attributes.as_ref().unwrap().clone())
.drain(..)
.map(|keyval| (keyval.key.as_str().to_string(), keyval.value))
.collect::<HashMap<_, _>>();
assert_eq!(attributes.get("thread.name"), expected_name.as_ref());
assert_eq!(attributes.get("thread.id"), Some(&expected_id));
}

#[test]
fn excludes_thread() {
let tracer = TestTracer(Arc::new(Mutex::new(None)));
let subscriber = tracing_subscriber::registry()
.with(subscriber().with_tracer(tracer.clone()).with_threads(false));

tracing::collect::with_default(subscriber, || {
tracing::debug_span!("request");
});

let attributes = tracer.with_data(|data| data.builder.attributes.as_ref().unwrap().clone());
let keys = attributes
.iter()
.map(|attr| attr.key.as_str())
.collect::<Vec<&str>>();
assert!(!keys.contains(&"thread.name"));
assert!(!keys.contains(&"thread.id"));
}
}