Skip to content

Commit

Permalink
Currently while generating metrics out of traces, the start timestamp…
Browse files Browse the repository at this point in the history
… of a metric is currently tied to the root span. When a "new" child span appears for trace (eg : unhappy path on an api call which results in a new subspan or an async process that was triggered much later), the time starttimestamp for the new metric for the new child span is that of the parent span(which can be well in the past).

This MR moves the start timestamp from resource level (root span) to metric level(child span)  . (Doesn't consider delta-temporality as of now).
References: open-telemetry#35994
Upstream Fix: open-telemetry#36019
  • Loading branch information
shivanthzen committed Nov 4, 2024
1 parent 3a3b701 commit fb0278f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 39 deletions.
28 changes: 9 additions & 19 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,35 +283,25 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
*/
deltaMetricKeys := make(map[metrics.Key]bool)
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
startTime := rawMetrics.startTimestamp
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
startTime = lastTimestamp
}
// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
deltaMetricKeys[mk] = true
}
return startTime
}

sums := rawMetrics.sums
metric := sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
sums.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())

if !p.config.Histogram.Disable {
histograms := rawMetrics.histograms
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
histograms.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
events.BuildMetrics(metric, timestamp, p.config.GetAggregationTemporality())
}

for mk := range deltaMetricKeys {
Expand Down Expand Up @@ -405,13 +395,13 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
}
if !p.config.Histogram.Disable {
// aggregate histogram metrics
h := histograms.GetOrCreate(key, attributes)
h := histograms.GetOrCreate(key, attributes, startTimestamp)
p.addExemplar(span, duration, h)
h.Observe(duration)

}
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
s := sums.GetOrCreate(key, attributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
s.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand All @@ -435,7 +425,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions)
p.metricKeyToDimensions.Add(eKey, eAttributes)
}
e := events.GetOrCreate(eKey, eAttributes)
e := events.GetOrCreate(eKey, eAttributes, startTimestamp)
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
}
Expand Down Expand Up @@ -479,8 +469,8 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimesta
if !ok {
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint, startTimestamp),
attributes: attr,
startTimestamp: startTimestamp,
}
Expand Down
43 changes: 23 additions & 20 deletions connector/spanmetricsconnector/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
type Key string

type HistogramMetrics interface {
GetOrCreate(key Key, attributes pcommon.Map) Histogram
BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality)
GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram
BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality)
ClearExemplars()
}

Expand Down Expand Up @@ -47,6 +47,7 @@ type explicitHistogram struct {
bounds []float64

maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

type exponentialHistogram struct {
Expand All @@ -56,6 +57,7 @@ type exponentialHistogram struct {
histogram *structure.Histogram[float64]

maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

type generateStartTimestamp = func(Key) pcommon.Timestamp
Expand All @@ -76,7 +78,7 @@ func NewExplicitHistogramMetrics(bounds []float64, maxExemplarCount *int) Histog
}
}

func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
h = &explicitHistogram{
Expand All @@ -94,17 +96,16 @@ func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map)

func (m *explicitHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyHistogram().SetAggregationTemporality(temporality)
dps := metric.Histogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, h := range m.metrics {
for _, h := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetTimestamp(timestamp)
dp.SetStartTimestamp(h.startTimestamp)
dp.ExplicitBounds().FromRaw(h.bounds)
dp.BucketCounts().FromRaw(h.bucketCounts)
dp.SetCount(h.count)
Expand All @@ -114,6 +115,7 @@ func (m *explicitHistogramMetrics) BuildMetrics(
}
h.exemplars.CopyTo(dp.Exemplars())
h.attributes.CopyTo(dp.Attributes())
dp.Attributes().PutInt("startTimestamp", int64(h.startTimestamp))
}
}

Expand All @@ -123,7 +125,7 @@ func (m *explicitHistogramMetrics) ClearExemplars() {
}
}

func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) Histogram {
func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimeStamp pcommon.Timestamp) Histogram {
h, ok := m.metrics[key]
if !ok {
histogram := new(structure.Histogram[float64])
Expand All @@ -147,23 +149,23 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma

func (m *exponentialHistogramMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality)
dps := metric.ExponentialHistogram().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, m := range m.metrics {
for _, e := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetStartTimestamp(e.startTimestamp)
dp.SetTimestamp(timestamp)
expoHistToExponentialDataPoint(m.histogram, dp)
for i := 0; i < m.exemplars.Len(); i++ {
m.exemplars.At(i).SetTimestamp(timestamp)
expoHistToExponentialDataPoint(e.histogram, dp)
for i := 0; i < e.exemplars.Len(); i++ {
e.exemplars.At(i).SetTimestamp(timestamp)
}
m.exemplars.CopyTo(dp.Exemplars())
m.attributes.CopyTo(dp.Attributes())
e.exemplars.CopyTo(dp.Exemplars())
e.attributes.CopyTo(dp.Attributes())
dp.Attributes().PutInt("startTimestamp", int64(e.startTimestamp))
}
}

Expand Down Expand Up @@ -242,13 +244,14 @@ type Sum struct {
count uint64
exemplars pmetric.ExemplarSlice
maxExemplarCount *int
startTimestamp pcommon.Timestamp
}

func (s *Sum) Add(value uint64) {
s.count += value
}

func NewSumMetrics(maxExemplarCount *int) SumMetrics {
func NewSumMetrics(maxExemplarCount *int, startTimeStamp pcommon.Timestamp) SumMetrics {
return SumMetrics{
metrics: make(map[Key]*Sum),
maxExemplarCount: maxExemplarCount,
Expand All @@ -260,13 +263,14 @@ type SumMetrics struct {
maxExemplarCount *int
}

func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map) *Sum {
func (m *SumMetrics) GetOrCreate(key Key, attributes pcommon.Map, startTimestamp pcommon.Timestamp) *Sum {
s, ok := m.metrics[key]
if !ok {
s = &Sum{
attributes: attributes,
exemplars: pmetric.NewExemplarSlice(),
maxExemplarCount: m.maxExemplarCount,
startTimestamp: startTimestamp,
}
m.metrics[key] = s
}
Expand All @@ -285,7 +289,6 @@ func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value

func (m *SumMetrics) BuildMetrics(
metric pmetric.Metric,
startTimestamp generateStartTimestamp,
timestamp pcommon.Timestamp,
temporality pmetric.AggregationTemporality,
) {
Expand All @@ -294,9 +297,9 @@ func (m *SumMetrics) BuildMetrics(

dps := metric.Sum().DataPoints()
dps.EnsureCapacity(len(m.metrics))
for k, s := range m.metrics {
for _, s := range m.metrics {
dp := dps.AppendEmpty()
dp.SetStartTimestamp(startTimestamp(k))
dp.SetStartTimestamp(s.startTimestamp)
dp.SetTimestamp(timestamp)
dp.SetIntValue(int64(s.count))
for i := 0; i < s.exemplars.Len(); i++ {
Expand Down

0 comments on commit fb0278f

Please sign in to comment.