Skip to content

Commit

Permalink
Deduplicate event attributes (#6649)
Browse files Browse the repository at this point in the history
  • Loading branch information
tninesling authored Jan 27, 2025
1 parent cbe4c32 commit bfad5b2
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 9 deletions.
10 changes: 8 additions & 2 deletions apollo-router/src/plugins/telemetry/dynamic_attribute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,16 @@ impl EventDynAttribute for ::tracing::Span {
match extensions.get_mut::<OtelData>() {
Some(otel_data) => match &mut otel_data.event_attributes {
Some(event_attributes) => {
event_attributes.extend(attributes);
event_attributes.extend(
attributes.map(|KeyValue { key, value }| (key, value)),
);
}
None => {
otel_data.event_attributes = Some(attributes.collect());
otel_data.event_attributes = Some(
attributes
.map(|KeyValue { key, value }| (key, value))
.collect(),
);
}
},
None => {
Expand Down
105 changes: 105 additions & 0 deletions apollo-router/src/plugins/telemetry/fmt_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,111 @@ connector:
insta::assert_snapshot!(buff.to_string());
}

#[tokio::test]
async fn test_json_logging_deduplicates_attributes() {
let buff = LogBuffer::default();
let text_format = JsonFormat {
display_span_list: false,
display_current_span: false,
display_resource: false,
..Default::default()
};
let format = Json::new(Default::default(), text_format);
let fmt_layer = FmtLayer::new(
FilteringFormatter::new(format, filter_metric_events, &RateLimit::default()),
buff.clone(),
)
.boxed();

let event_config: events::Events = serde_yaml::from_str(
r#"
subgraph:
request: info
response: warn
error: error
event.with.duplicate.attribute:
message: "this event has a duplicate attribute"
level: error
on: response
attributes:
subgraph.name: true
static: foo # This shows up twice without attribute deduplication
"#,
)
.unwrap();

::tracing::subscriber::with_default(
fmt::Subscriber::new()
.with(otel::layer().force_sampling())
.with(fmt_layer),
move || {
let test_span = info_span!("test");
let _enter = test_span.enter();

let router_events = event_config.new_router_events();
let supergraph_events = event_config.new_supergraph_events();
let subgraph_events = event_config.new_subgraph_events();

// In: Router -> Supergraph -> Subgraphs
let router_req = router::Request::fake_builder().build().unwrap();
router_events.on_request(&router_req);

let supergraph_req = supergraph::Request::fake_builder()
.query("query { foo }")
.build()
.unwrap();
supergraph_events.on_request(&supergraph_req);

let subgraph_req_1 = subgraph::Request::fake_builder()
.subgraph_name("subgraph")
.subgraph_request(http::Request::new(
graphql::Request::fake_builder()
.query("query { foo }")
.build(),
))
.build();
subgraph_events.on_request(&subgraph_req_1);

let subgraph_req_2 = subgraph::Request::fake_builder()
.subgraph_name("subgraph_bis")
.subgraph_request(http::Request::new(
graphql::Request::fake_builder()
.query("query { foo }")
.build(),
))
.build();
subgraph_events.on_request(&subgraph_req_2);

// Out: Subgraphs -> Supergraph -> Router
let subgraph_resp_1 = subgraph::Response::fake2_builder()
.data(serde_json::json!({"products": [{"id": 1234, "name": "first_name"}, {"id": 567, "name": "second_name"}]}))
.build()
.expect("expecting valid response");
subgraph_events.on_response(&subgraph_resp_1);

let subgraph_resp_2 = subgraph::Response::fake2_builder()
.data(serde_json::json!({"products": [{"id": 1234, "name": "first_name"}, {"id": 567, "name": "second_name"}], "other": {"foo": "bar"}}))
.build()
.expect("expecting valid response");
subgraph_events.on_response(&subgraph_resp_2);

let supergraph_resp = supergraph::Response::fake_builder()
.data(serde_json::json!({"data": "res"}).to_string())
.build()
.expect("expecting valid response");
supergraph_events.on_response(&supergraph_resp);

let router_resp = router::Response::fake_builder()
.data(serde_json_bytes::json!({"data": "res"}))
.build()
.expect("expecting valid response");
router_events.on_response(&router_resp);
},
);

insta::assert_snapshot!(buff.to_string());
}

#[tokio::test]
async fn test_text_logging_with_custom_events_with_instrumented() {
let buff = LogBuffer::default();
Expand Down
10 changes: 8 additions & 2 deletions apollo-router/src/plugins/telemetry/formatters/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,18 @@ where
Some(attrs) => Some(attrs),
None => {
let event_attributes = extensions.get_mut::<EventAttributes>();
event_attributes.map(|event_attributes| event_attributes.take())
event_attributes.map(|event_attributes| {
event_attributes
.take()
.into_iter()
.map(|KeyValue { key, value }| (key, value))
.collect()
})
}
}
};
if let Some(event_attributes) = event_attributes {
for KeyValue { key, value } in event_attributes {
for (key, value) in event_attributes {
serializer.serialize_entry(key.as_str(), &AttributeValue::from(value))?;
}
}
Expand Down
10 changes: 8 additions & 2 deletions apollo-router/src/plugins/telemetry/formatters/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,17 @@ where
Some(attrs) => Some(attrs),
None => {
let event_attributes = extensions.get_mut::<EventAttributes>();
event_attributes.map(|event_attributes| event_attributes.take())
event_attributes.map(|event_attributes| {
event_attributes
.take()
.into_iter()
.map(|KeyValue { key, value }| (key, value))
.collect()
})
}
};
if let Some(event_attributes) = event_attributes {
for KeyValue { key, value } in event_attributes {
for (key, value) in event_attributes {
default_visitor.log_debug_attrs(key.as_str(), &value);
}
}
Expand Down
6 changes: 5 additions & 1 deletion apollo-router/src/plugins/telemetry/otel/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,11 @@ where
let event_attributes = otel_data.as_ref().and_then(|o| o.event_attributes.clone());

if let Some(event_attributes) = event_attributes {
otel_event.attributes.extend(event_attributes)
otel_event.attributes.extend(
event_attributes
.into_iter()
.map(|(k, v)| KeyValue::new(k, v)),
)
}
}

Expand Down
8 changes: 6 additions & 2 deletions apollo-router/src/plugins/telemetry/otel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ pub(crate) mod tracer;

pub(crate) use layer::layer;
pub(crate) use layer::OpenTelemetryLayer;
use opentelemetry::KeyValue;
use opentelemetry::Key;
use opentelemetry::Value;
pub(crate) use span_ext::OpenTelemetrySpanExt;
pub(crate) use tracer::PreSampledTracer;

Expand All @@ -24,7 +25,10 @@ pub(crate) struct OtelData {
pub(crate) builder: opentelemetry::trace::SpanBuilder,

/// Attributes gathered for the next event
pub(crate) event_attributes: Option<Vec<KeyValue>>,
#[cfg(not(test))]
pub(crate) event_attributes: Option<ahash::HashMap<Key, Value>>,
#[cfg(test)]
pub(crate) event_attributes: Option<indexmap::IndexMap<Key, Value>>,

/// Forced status in case it's coming from the custom attributes
pub(crate) forced_status: Option<opentelemetry::trace::Status>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
source: apollo-router/src/plugins/telemetry/fmt_layer.rs
expression: buff.to_string()
---
{"timestamp":"[timestamp]","level":"ERROR","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","static":"foo","subgraph.name":"subgraph_bis","message":"this event has a duplicate attribute","kind":"event.with.duplicate.attribute","target":"apollo_router::plugins::telemetry::config_new::events"}
{"timestamp":"[timestamp]","level":"ERROR","trace_id":"00000000000000000000000000000000","span_id":"0000000000000000","static":"foo","message":"this event has a duplicate attribute","kind":"event.with.duplicate.attribute","target":"apollo_router::plugins::telemetry::config_new::events"}

0 comments on commit bfad5b2

Please sign in to comment.