Skip to content

Commit

Permalink
Customisation support in Jaeger Propagator. (#852)
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyvelichko authored Aug 5, 2022
1 parent c04b6ba commit c15b2f1
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 26 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ jobs:
- name: doc
run: cargo doc --no-deps --all-features
env:
CARGO_INCREMENTAL: '0'
RUSTDOCFLAGS: -Dwarnings
- uses: actions-rs/cargo@v1
with:
Expand Down
6 changes: 6 additions & 0 deletions opentelemetry-jaeger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v0.16.1

### Changed

- add propagator initialisation with custom headers and baggage prefix

## v0.16.0

### Changed
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "opentelemetry-jaeger"
version = "0.16.0"
version = "0.16.1"
description = "Jaeger exporter for OpenTelemetry"
homepage = "https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger"
repository = "https://github.com/open-telemetry/opentelemetry-rust/tree/main/opentelemetry-jaeger"
Expand Down
154 changes: 129 additions & 25 deletions opentelemetry-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,6 @@ mod exporter;
pub mod testing;

mod propagator {
use once_cell::sync::Lazy;
use opentelemetry::{
global::{self, Error},
propagation::{text_map_propagator::FieldIter, Extractor, Injector, TextMapPropagator},
Expand All @@ -342,8 +341,6 @@ mod propagator {

const TRACE_FLAG_DEBUG: TraceFlags = TraceFlags::new(0x04);

static JAEGER_HEADER_FIELD: Lazy<[String; 1]> = Lazy::new(|| [JAEGER_HEADER.to_owned()]);

/// The Jaeger propagator propagates span contexts in [Jaeger propagation format].
///
/// Cross-cutting concerns send their state to the next process using `Propagator`s,
Expand All @@ -356,9 +353,12 @@ mod propagator {
/// ## Examples
/// ```
/// # use opentelemetry::{global, trace::{Tracer, TraceContextExt}, Context};
/// # use opentelemetry_jaeger::Propagator as JaegerPropagator;
/// # fn send_request() {
/// // setup jaeger propagator
/// global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
/// global::set_text_map_propagator(JaegerPropagator::default());
/// // You also can init propagator with custom header name
/// // global::set_text_map_propagator(JaegerPropagator::with_custom_header("my-custom-header"));
///
/// // before sending requests to downstream services.
/// let mut headers = std::collections::HashMap::new(); // replace by http header of the outgoing request
Expand All @@ -374,7 +374,9 @@ mod propagator {
/// # fn receive_request() {
/// // Receive the request sent above on the other service...
/// // setup jaeger propagator
/// global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
/// global::set_text_map_propagator(JaegerPropagator::new());
/// // You also can init propagator with custom header name
/// // global::set_text_map_propagator(JaegerPropagator::with_custom_header("my-custom-header"));
///
/// let headers = std::collections::HashMap::new(); // replace this with http header map from incoming requests.
/// let parent_context = global::get_text_map_propagator(|propagator| {
Expand All @@ -387,20 +389,63 @@ mod propagator {
/// ```
///
/// [jaeger propagation format]: https://www.jaegertracing.io/docs/1.18/client-libraries/#propagation-format
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct Propagator {
_private: (),
baggage_prefix: &'static str,
header_name: &'static str,
fields: [String; 1],
}

// Implement default using Propagator::new() to not break compatibility with previous versions
impl Default for Propagator {
fn default() -> Self {
Propagator::new()
}
}

impl Propagator {
/// Create a Jaeger propagator
pub fn new() -> Self {
Propagator::default()
Self::with_custom_header_and_baggage(JAEGER_HEADER, JAEGER_BAGGAGE_PREFIX)
}

/// Create a Jaeger propagator with custom header name
pub fn with_custom_header(custom_header_name: &'static str) -> Self {
Self::with_custom_header_and_baggage(custom_header_name, JAEGER_BAGGAGE_PREFIX)
}

/// Create a Jaeger propagator with custom header name and baggage prefix
///
/// NOTE: it's implicitly fallback to the default header names when the ane of provided custom_* is empty
/// Default header-name is `uber-trace-id` and baggage-prefix is `uberctx-`
/// The format of serialized context and baggage's stays unchanged and not depending
/// on provided header name and prefix.
pub fn with_custom_header_and_baggage(
custom_header_name: &'static str,
custom_baggage_prefix: &'static str,
) -> Self {
let custom_header_name = if custom_header_name.trim().is_empty() {
JAEGER_HEADER
} else {
custom_header_name
};

let custom_baggage_prefix = if custom_baggage_prefix.trim().is_empty() {
JAEGER_BAGGAGE_PREFIX
} else {
custom_baggage_prefix
};

Propagator {
baggage_prefix: custom_baggage_prefix.trim(),
header_name: custom_header_name.trim(),
fields: [custom_header_name.to_owned()],
}
}

/// Extract span context from header value
fn extract_span_context(&self, extractor: &dyn Extractor) -> Result<SpanContext, ()> {
let mut header_value = Cow::from(extractor.get(JAEGER_HEADER).unwrap_or(""));
let mut header_value = Cow::from(extractor.get(self.header_name).unwrap_or(""));
// if there is no :, it means header_value could be encoded as url, try decode first
if !header_value.contains(':') {
header_value = Cow::from(header_value.replace("%3A", ":"));
Expand Down Expand Up @@ -464,17 +509,17 @@ mod propagator {
}

fn extract_trace_state(&self, extractor: &dyn Extractor) -> Result<TraceState, ()> {
let uber_context_keys = extractor
let baggage_keys = extractor
.keys()
.into_iter()
.filter(|key| key.starts_with(JAEGER_BAGGAGE_PREFIX))
.filter(|key| key.starts_with(self.baggage_prefix))
.filter_map(|key| {
extractor
.get(key)
.map(|value| (key.to_string(), value.to_string()))
});

match TraceState::from_key_value(uber_context_keys) {
match TraceState::from_key_value(baggage_keys) {
Ok(trace_state) => Ok(trace_state),
Err(trace_state_err) => {
global::handle_error(Error::Trace(TraceError::Other(Box::new(
Expand All @@ -501,13 +546,13 @@ mod propagator {
0x00
};
let header_value = format!(
"{:032x}:{:016x}:{:01}:{:01}",
"{:032x}:{:016x}:{:01}:{:01x}",
span_context.trace_id(),
span_context.span_id(),
DEPRECATED_PARENT_SPAN,
flag,
);
injector.set(JAEGER_HEADER, header_value);
injector.set(self.header_name, header_value);
}
}

Expand All @@ -518,7 +563,7 @@ mod propagator {
}

fn fields(&self) -> FieldIter<'_> {
FieldIter::new(JAEGER_HEADER_FIELD.as_ref())
FieldIter::new(self.fields.as_ref())
}
}

Expand Down Expand Up @@ -645,6 +690,32 @@ mod propagator {
]
}

/// Try to extract the context using the created Propagator with custom header name
/// from the Extractor under the `context_key` key.
fn _test_extract_with_header(construct_header: &'static str, context_key: &'static str) {
let propagator = Propagator::with_custom_header(construct_header);
for (trace_id, span_id, flag, expected) in get_extract_data() {
let mut map: HashMap<String, String> = HashMap::new();
map.set(context_key, format!("{}:{}:0:{}", trace_id, span_id, flag));
let context = propagator.extract(&map);
assert_eq!(context.span().span_context(), &expected);
}
}

/// Try to inject the context using the created Propagator with custom header name
/// and expect the serialized context existence under `expect_header` key.
fn _test_inject_with_header(construct_header: &'static str, expect_header: &'static str) {
let propagator = Propagator::with_custom_header(construct_header);
for (span_context, header_value) in get_inject_data() {
let mut injector = HashMap::new();
propagator.inject_context(
&Context::current_with_span(TestSpan(span_context)),
&mut injector,
);
assert_eq!(injector.get(expect_header), Some(&header_value));
}
}

#[test]
fn test_extract_empty() {
let map: HashMap<String, String> = HashMap::new();
Expand All @@ -654,14 +725,22 @@ mod propagator {
}

#[test]
fn test_extract() {
fn test_inject_extract_with_default() {
let propagator = Propagator::default();
for (span_context, header_value) in get_inject_data() {
let mut injector = HashMap::new();
propagator.inject_context(
&Context::current_with_span(TestSpan(span_context)),
&mut injector,
);
assert_eq!(injector.get(JAEGER_HEADER), Some(&header_value));
}
for (trace_id, span_id, flag, expected) in get_extract_data() {
let mut map: HashMap<String, String> = HashMap::new();
map.set(
JAEGER_HEADER,
format!("{}:{}:0:{}", trace_id, span_id, flag),
);
let propagator = Propagator::new();
let context = propagator.extract(&map);
assert_eq!(context.span().span_context(), &expected);
}
Expand Down Expand Up @@ -712,16 +791,41 @@ mod propagator {
);
}

#[test]
fn test_extract() {
_test_extract_with_header(JAEGER_HEADER, JAEGER_HEADER)
}

#[test]
fn test_inject() {
let propagator = Propagator::new();
for (span_context, header_value) in get_inject_data() {
let mut injector = HashMap::new();
propagator.inject_context(
&Context::current_with_span(TestSpan(span_context)),
&mut injector,
);
assert_eq!(injector.get(JAEGER_HEADER), Some(&header_value));
_test_inject_with_header(JAEGER_HEADER, JAEGER_HEADER)
}

#[test]
fn test_extract_with_invalid_header() {
for construct in &["", " "] {
_test_extract_with_header(construct, JAEGER_HEADER)
}
}

#[test]
fn test_extract_with_valid_header() {
for construct in &["custom-header", "custom-header ", " custom-header "] {
_test_extract_with_header(construct, "custom-header")
}
}

#[test]
fn test_inject_with_invalid_header() {
for construct in &["", " "] {
_test_inject_with_header(construct, JAEGER_HEADER)
}
}

#[test]
fn test_inject_with_valid_header() {
for construct in &["custom-header", "custom-header ", " custom-header "] {
_test_inject_with_header(construct, "custom-header")
}
}
}
Expand Down

0 comments on commit c15b2f1

Please sign in to comment.