Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support service correlation for mirroring spans #431

Merged
merged 15 commits into from
Jan 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class EnrichedSpanConstants {
public static final String GRPC_REQUEST_URL = "grpc.request.url";
public static final String GRPC_REQUEST_ENDPOINT = "grpc.request.endpoint";
public static final String DROP_TRACE_ATTRIBUTE = "drop.trace";
public static final String CALLER_SERVICE_ID = "CALLER_SERVICE_ID";
public static final String CALLER_SERVICE_NAME = "CALLER_SERVICE_NAME";

/**
* Returns the constant value for the given Enum.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public static Optional<String> getFullHttpUrl(Event event) {
}

public static Optional<String> getDestinationIpAddress(Event event) {
return HttpSemanticConventionUtils.getDestinationIpAddress(event);
return HttpSemanticConventionUtils.getPeerIpAddress(event);
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
}

public static Optional<String> getPath(Event event) {
Expand Down
5 changes: 5 additions & 0 deletions raw-spans-grouper/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ kafka-topic-creator:
partitions: 8
configs:
cleanup.policy: "[compact, delete]"
raw-spans-to-structured-traces-grouping-job-mirroring-exit-spans-state-store-changelog:
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
replicationFactor: 3
partitions: 8
configs:
cleanup.policy: "[compact, delete]"

zookeeper:
address: zookeeper:2181
Expand Down
1 change: 1 addition & 0 deletions raw-spans-grouper/raw-spans-grouper/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
because("https://snyk.io/vuln/SNYK-JAVA-ORGGLASSFISHJERSEYCORE-1255637")
}
implementation(project(":span-normalizer:span-normalizer-api"))
implementation(project(":semantic-convention-utils"))
implementation(libs.hypertrace.data.model)
implementation(libs.hypertrace.serviceFramework.framework)
implementation(libs.hypertrace.serviceFramework.metrics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class RawSpanGrouperConstants {
public static final String RAW_SPANS_GROUPER_JOB_CONFIG = "raw-spans-grouper-job-config";
public static final String SPAN_STATE_STORE_NAME = "span-data-store";
public static final String TRACE_STATE_STORE = "trace-state-store";
public static final String MIRRORING_EXIT_SPANS_STATE_STORE = "mirroring-exit-spans-state-store";
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
public static final String OUTPUT_TOPIC_PRODUCER = "output-topic-producer";
public static final String SPANS_PER_TRACE_METRIC = "spans_per_trace";
public static final String TRACE_CREATION_TIME = "trace.creation.time";
Expand All @@ -17,4 +18,5 @@ public class RawSpanGrouperConstants {
public static final String DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT = "default.max.span.count";
public static final String DROPPED_SPANS_COUNTER = "hypertrace.dropped.spans";
public static final String TRUNCATED_TRACES_COUNTER = "hypertrace.truncated.traces";
public static final String CALLER_SERVICE_NAME = "caller.service.name";
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.hypertrace.core.rawspansgrouper;

import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INPUT_TOPIC_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.MIRRORING_EXIT_SPANS_STATE_STORE;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
Expand All @@ -26,6 +27,8 @@
import org.hypertrace.core.kafkastreams.framework.partitioner.GroupPartitionerBuilder;
import org.hypertrace.core.kafkastreams.framework.partitioner.KeyHashPartitioner;
import org.hypertrace.core.serviceframework.config.ConfigClient;
import org.hypertrace.core.spannormalizer.IpResolutionExitSpanIdentity;
import org.hypertrace.core.spannormalizer.IpResolutionStateStoreValue;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.TraceIdentity;
import org.hypertrace.core.spannormalizer.TraceState;
Expand Down Expand Up @@ -72,8 +75,17 @@ public StreamsBuilder buildTopology(
Stores.persistentKeyValueStore(SPAN_STATE_STORE_NAME), keySerde, valueSerde)
.withCachingEnabled();

StoreBuilder<KeyValueStore<IpResolutionExitSpanIdentity, IpResolutionStateStoreValue>>
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
mirroringExitSpansStoreBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(MIRRORING_EXIT_SPANS_STATE_STORE),
keySerde,
valueSerde)
.withCachingEnabled();

streamsBuilder.addStateStore(spanStoreBuilder);
streamsBuilder.addStateStore(traceStateStoreBuilder);
streamsBuilder.addStateStore(mirroringExitSpansStoreBuilder);

StreamPartitioner<TraceIdentity, StructuredTrace> groupPartitioner =
new GroupPartitionerBuilder<TraceIdentity, StructuredTrace>()
Expand All @@ -93,7 +105,8 @@ public StreamsBuilder buildTopology(
RawSpansProcessor::new,
Named.as(RawSpansProcessor.class.getSimpleName()),
SPAN_STATE_STORE_NAME,
TRACE_STATE_STORE)
TRACE_STATE_STORE,
MIRRORING_EXIT_SPANS_STATE_STORE)
.to(outputTopic, outputTopicProducer);

return streamsBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package org.hypertrace.core.rawspansgrouper;

import static org.hypertrace.core.datamodel.shared.AvroBuilderCache.fastNewBuilder;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.CALLER_SERVICE_NAME;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.DROPPED_SPANS_COUNTER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.INFLIGHT_TRACE_MAX_SPAN_COUNT;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.MIRRORING_EXIT_SPANS_STATE_STORE;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.OUTPUT_TOPIC_PRODUCER;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.RAW_SPANS_GROUPER_JOB_CONFIG;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY;
Expand All @@ -21,6 +23,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -33,13 +37,24 @@
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.core.datamodel.AttributeValue;
import org.hypertrace.core.datamodel.Attributes;
import org.hypertrace.core.datamodel.Event;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.Timestamps;
import org.hypertrace.core.datamodel.shared.HexUtils;
import org.hypertrace.core.datamodel.shared.trace.AttributeValueCreator;
import org.hypertrace.core.datamodel.shared.trace.StructuredTraceBuilder;
import org.hypertrace.core.rawspansgrouper.utils.RawSpansGrouperUtils;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.hypertrace.core.spannormalizer.IpResolutionExitSpanIdentity;
import org.hypertrace.core.spannormalizer.IpResolutionStateStoreValue;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.TraceIdentity;
import org.hypertrace.core.spannormalizer.TraceState;
import org.hypertrace.semantic.convention.utils.http.HttpSemanticConventionUtils;
import org.hypertrace.semantic.convention.utils.span.SpanSemanticConventionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,11 +71,15 @@ public class RawSpansProcessor
private static final Logger logger = LoggerFactory.getLogger(RawSpansProcessor.class);
private static final String PROCESSING_LATENCY_TIMER =
"hypertrace.rawspansgrouper.processing.latency";
private static final String MIRRORING_SPAN_ATTRIBUTE_NAME_CONFIG =
"mirroring.span.attribute.name";
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
private static final ConcurrentMap<String, Timer> tenantToSpansGroupingTimer =
new ConcurrentHashMap<>();
private ProcessorContext context;
private KeyValueStore<SpanIdentity, RawSpan> spanStore;
private KeyValueStore<TraceIdentity, TraceState> traceStateStore;
private KeyValueStore<IpResolutionExitSpanIdentity, IpResolutionStateStoreValue>
mirroringExitSpansStateStore;
private long groupingWindowTimeoutMs;
private To outputTopic;
private double dataflowSamplingPercent = -1;
Expand All @@ -74,6 +93,8 @@ public class RawSpansProcessor
// counter for number of truncated traces per tenant
private static final ConcurrentMap<String, Counter> truncatedTracesCounter =
new ConcurrentHashMap<>();
private String mirroringSpanAttributeName;
private RawSpansGrouperUtils rawSpansGrouperUtils;

@Override
public void init(ProcessorContext context) {
Expand All @@ -82,7 +103,14 @@ public void init(ProcessorContext context) {
(KeyValueStore<SpanIdentity, RawSpan>) context.getStateStore(SPAN_STATE_STORE_NAME);
this.traceStateStore =
(KeyValueStore<TraceIdentity, TraceState>) context.getStateStore(TRACE_STATE_STORE);
this.mirroringExitSpansStateStore =
(KeyValueStore<IpResolutionExitSpanIdentity, IpResolutionStateStoreValue>)
context.getStateStore(MIRRORING_EXIT_SPANS_STATE_STORE);
Config jobConfig = (Config) (context.appConfigs().get(RAW_SPANS_GROUPER_JOB_CONFIG));
this.mirroringSpanAttributeName =
jobConfig.hasPath(MIRRORING_SPAN_ATTRIBUTE_NAME_CONFIG)
? jobConfig.getString(MIRRORING_SPAN_ATTRIBUTE_NAME_CONFIG)
: null;
this.groupingWindowTimeoutMs =
jobConfig.getLong(SPAN_GROUPBY_SESSION_WINDOW_INTERVAL_CONFIG_KEY) * 1000;

Expand All @@ -91,14 +119,14 @@ public void init(ProcessorContext context) {
&& jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY) <= 100) {
this.dataflowSamplingPercent = jobConfig.getDouble(DATAFLOW_SAMPLING_PERCENT_CONFIG_KEY);
}
this.rawSpansGrouperUtils = new RawSpansGrouperUtils(dataflowSamplingPercent);

if (jobConfig.hasPath(INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
Config subConfig = jobConfig.getConfig(INFLIGHT_TRACE_MAX_SPAN_COUNT);
subConfig.entrySet().stream()
subConfig
.entrySet()
.forEach(
(entry) -> {
maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey()));
});
(entry) -> maxSpanCountMap.put(entry.getKey(), subConfig.getLong(entry.getKey())));
}

if (jobConfig.hasPath(DEFAULT_INFLIGHT_TRACE_MAX_SPAN_COUNT)) {
Expand All @@ -114,15 +142,20 @@ public KeyValue<TraceIdentity, StructuredTrace> transform(TraceIdentity key, Raw
long currentTimeMs = System.currentTimeMillis();

TraceState traceState = traceStateStore.get(key);
boolean firstEntry = (traceState == null);

if (shouldDropSpan(key, traceState)) {
return null;
}

Event event = value.getEvent();
if (isMirroringSpan(event)) {
processMirroringSpan(key, value, traceState, currentTimeMs);
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

boolean firstEntry = (traceState == null);
String tenantId = key.getTenantId();
ByteBuffer traceId = value.getTraceId();
ByteBuffer spanId = value.getEvent().getEventId();
ByteBuffer spanId = event.getEventId();
spanStore.put(new SpanIdentity(tenantId, traceId, spanId), value);

/*
Expand Down Expand Up @@ -169,6 +202,91 @@ public KeyValue<TraceIdentity, StructuredTrace> transform(TraceIdentity key, Raw
return null;
}

private void processMirroringSpan(
TraceIdentity key, RawSpan value, TraceState traceState, long currentTimeMs) {
Event event = value.getEvent();
String tenantId = key.getTenantId();
ByteBuffer traceId = value.getTraceId();
boolean firstEntry = (traceState == null);
final Optional<String> maybeEnvironment =
HttpSemanticConventionUtils.getEnvironmentForSpan(event);

if (SpanSemanticConventionUtils.isClientSpanForOCFormat(
event.getAttributes().getAttributeMap())) {
final Optional<String> maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event);
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
final Optional<String> maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event);
final Optional<String> maybePeerPort = HttpSemanticConventionUtils.getPeerPort(event);
if (maybeEnvironment.isPresent()
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
&& maybeHostAddr.isPresent()
&& maybePeerAddr.isPresent()
&& maybePeerPort.isPresent()) {
final String serviceName = event.getServiceName();
mirroringExitSpansStateStore.put(
IpResolutionExitSpanIdentity.newBuilder()
.setTenantId(tenantId)
.setEnvironment(maybeEnvironment.get())
.setHostAddr(maybeHostAddr.get())
.setPeerAddr(maybePeerAddr.get())
.setPeerPort(maybePeerPort.get())
.build(),
IpResolutionStateStoreValue.newBuilder().setServiceName(serviceName).build());
}
} else {
final Optional<String> maybePeerAddr = HttpSemanticConventionUtils.getPeerIpAddress(event);
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
final Optional<String> maybeHostAddr = HttpSemanticConventionUtils.getHostIpAddress(event);
final Optional<String> maybeHostPort = HttpSemanticConventionUtils.getHostPort(event);
if (maybeEnvironment.isPresent()
&& maybePeerAddr.isPresent()
&& maybeHostAddr.isPresent()
&& maybeHostPort.isPresent()) {
final IpResolutionExitSpanIdentity ipResolutionIdentity =
IpResolutionExitSpanIdentity.newBuilder()
.setTenantId(tenantId)
.setEnvironment(maybeEnvironment.get())
.setHostAddr(maybePeerAddr.get())
.setPeerAddr(maybeHostAddr.get())
.setPeerPort(maybeHostPort.get())
.build();
final IpResolutionStateStoreValue ipResolutionStateStoreValue =
mirroringExitSpansStateStore.get(ipResolutionIdentity);
if (Objects.nonNull(ipResolutionStateStoreValue)) {
event
.getAttributes()
.getAttributeMap()
.put(
CALLER_SERVICE_NAME,
AttributeValueCreator.create(ipResolutionStateStoreValue.getServiceName()));
}
}
}

Timestamps timestamps =
rawSpansGrouperUtils.trackEndToEndLatencyTimestamps(
currentTimeMs, firstEntry ? currentTimeMs : traceState.getTraceStartTimestamp());
StructuredTrace trace =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We build a StructuredTrace in two places — 1) TraceEmitPunctuator, 2) the bypass condition.

This seems to be a third place. What is the condition for this? It's not very clear. Will the same trace also be emitted via TraceEmitPunctuator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the same trace will not also be emitted via TraceEmitPunctuator, as we return in the caller function:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment

StructuredTraceBuilder.buildStructuredTraceFromRawSpans(
List.of(value), traceId, tenantId, timestamps);
context.forward(key, trace, outputTopic);
}

private boolean isMirroringSpan(Event event) {
final Attributes attributes = event.getAttributes();
if (Objects.isNull(attributes)) {
return false;
}

final Map<String, AttributeValue> attributeMap = attributes.getAttributeMap();
if (Objects.isNull(attributeMap)) {
sanket-mundra marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

final String value =
attributeMap
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have access to EnrichedSpanUtils here? It generally has all these functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But that is an enriched attribute, right? Here we will not have the enriched attributes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use SpanAttributeUtils.getStringAttributeWithDefault().

.getOrDefault(this.mirroringSpanAttributeName, AttributeValue.newBuilder().build())
.getValue();
return Objects.nonNull(value) && Boolean.parseBoolean(value);
}

private boolean shouldDropSpan(TraceIdentity key, TraceState traceState) {
int inFlightSpansPerTrace =
traceState != null ? traceState.getSpanIds().size() : Integer.MIN_VALUE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.hypertrace.core.rawspansgrouper;

import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.SPANS_PER_TRACE_METRIC;
import static org.hypertrace.core.rawspansgrouper.RawSpanGrouperConstants.TRACE_CREATION_TIME;

import com.google.common.util.concurrent.RateLimiter;
import io.micrometer.core.instrument.Counter;
Expand All @@ -28,11 +27,11 @@
import org.apache.kafka.streams.state.KeyValueStore;
import org.hypertrace.core.datamodel.RawSpan;
import org.hypertrace.core.datamodel.StructuredTrace;
import org.hypertrace.core.datamodel.TimestampRecord;
import org.hypertrace.core.datamodel.Timestamps;
import org.hypertrace.core.datamodel.shared.DataflowMetricUtils;
import org.hypertrace.core.datamodel.shared.HexUtils;
import org.hypertrace.core.datamodel.shared.trace.StructuredTraceBuilder;
import org.hypertrace.core.rawspansgrouper.utils.RawSpansGrouperUtils;
import org.hypertrace.core.serviceframework.metrics.PlatformMetricsRegistry;
import org.hypertrace.core.spannormalizer.SpanIdentity;
import org.hypertrace.core.spannormalizer.TraceIdentity;
Expand Down Expand Up @@ -69,14 +68,13 @@ class TraceEmitPunctuator implements Punctuator {
"hypertrace.rawspansgrouper.trace.with.duplicate.spans";
private static final ConcurrentMap<String, Counter> tenantToTraceWithDuplicateSpansCounter =
new ConcurrentHashMap<>();

private final double dataflowSamplingPercent;
private final TraceIdentity key;
private final ProcessorContext context;
private final KeyValueStore<SpanIdentity, RawSpan> spanStore;
private final KeyValueStore<TraceIdentity, TraceState> traceStateStore;
private final To outputTopicProducer;
private final long groupingWindowTimeoutMs;
private final RawSpansGrouperUtils rawSpansGrouperUtils;
private Cancellable cancellable;

TraceEmitPunctuator(
Expand All @@ -93,7 +91,7 @@ class TraceEmitPunctuator implements Punctuator {
this.traceStateStore = traceStateStore;
this.outputTopicProducer = outputTopicProducer;
this.groupingWindowTimeoutMs = groupingWindowTimeoutMs;
this.dataflowSamplingPercent = dataflowSamplingPercent;
this.rawSpansGrouperUtils = new RawSpansGrouperUtils(dataflowSamplingPercent);
}

public void setCancellable(Cancellable cancellable) {
Expand Down Expand Up @@ -167,7 +165,8 @@ public void punctuate(long timestamp) {

recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId)));
Timestamps timestamps =
trackEndToEndLatencyTimestamps(timestamp, traceState.getTraceStartTimestamp());
rawSpansGrouperUtils.trackEndToEndLatencyTimestamps(
timestamp, traceState.getTraceStartTimestamp());
StructuredTrace trace =
StructuredTraceBuilder.buildStructuredTraceFromRawSpans(
rawSpanList, traceId, tenantId, timestamps);
Expand Down Expand Up @@ -235,22 +234,6 @@ public void punctuate(long timestamp) {
}
}

private Timestamps trackEndToEndLatencyTimestamps(
long currentTimestamp, long firstSpanTimestamp) {
Timestamps timestamps = null;
if (!(Math.random() * 100 <= dataflowSamplingPercent)) {
spansGrouperArrivalLagTimer.record(
currentTimestamp - firstSpanTimestamp, TimeUnit.MILLISECONDS);
Map<String, TimestampRecord> records = new HashMap<>();
records.put(
DataflowMetricUtils.SPAN_ARRIVAL_TIME,
new TimestampRecord(DataflowMetricUtils.SPAN_ARRIVAL_TIME, firstSpanTimestamp));
records.put(TRACE_CREATION_TIME, new TimestampRecord(TRACE_CREATION_TIME, currentTimestamp));
timestamps = new Timestamps(records);
}
return timestamps;
}

private void recordSpansPerTrace(double count, Iterable<Tag> tags) {
DistributionSummary summary =
DistributionSummary.builder(SPANS_PER_TRACE_METRIC)
Expand Down
Loading