diff --git a/processor/spanmetricsprocessor/README.md b/processor/spanmetricsprocessor/README.md
index e1f3b7372800..ed4fd46cb485 100644
--- a/processor/spanmetricsprocessor/README.md
+++ b/processor/spanmetricsprocessor/README.md
@@ -74,6 +74,8 @@ The following settings can be optionally configured:
 
 - `attach_span_and_trace_id` attaches span id and trace id as attributes on metrics generated from spans if set to `true`. If not provided, will use default value of `false`.
 
+- `inherit_instrumentation_library_name`: defines whether metrics generated from spans inherit the instrumentation library name of the span they were generated from. If not provided, will use default value of `false`, in which case the instrumentation library name on the generated metrics is set to `spanmetricsprocessor`.
+
 ## Examples
 
 The following is a simple example usage of the spanmetrics processor.
@@ -116,7 +118,8 @@ processors:
         default: us-east-1
       - name: host_id
     resource_attributes_cache_size: 1000
-    aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"
+    aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA"
+    inherit_instrumentation_library_name: true
 
 exporters:
   jaeger:
diff --git a/processor/spanmetricsprocessor/config.go b/processor/spanmetricsprocessor/config.go
index 2e2d4551dd6d..1a9caef19305 100644
--- a/processor/spanmetricsprocessor/config.go
+++ b/processor/spanmetricsprocessor/config.go
@@ -73,6 +73,11 @@ type Config struct {
     // AttachSpanAndTraceID attaches span id and trace id to metrics generated from spans.
     // The default value is set to `false`.
     AttachSpanAndTraceID bool `mapstructure:"attach_span_and_trace_id"`
+
+    // InheritInstrumentationLibraryName defines whether metrics generated from spans should inherit
+    // the instrumentation library name from the span.
+    // Optional. The default value is `false`, in which case the instrumentation library name on metrics is set to `spanmetricsprocessor`.
+    InheritInstrumentationLibraryName bool `mapstructure:"inherit_instrumentation_library_name"`
 }
 
 // GetAggregationTemporality converts the string value given in the config into a MetricAggregationTemporality.
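Reviewer aside, not part of the patch: a minimal standalone sketch of the behaviour the new flag selects, mirroring the logic this change adds to `aggregateMetricsForServiceSpans`. The constant and function names below are illustrative only.

```go
package main

import "fmt"

// defaultInstrumentationLibraryName stands in for the processor's
// `instrumentationLibraryName` constant ("spanmetricsprocessor").
const defaultInstrumentationLibraryName = "spanmetricsprocessor"

// resolveInstrumentationLibraryName sketches the selection the new flag drives:
// inherit the span's instrumentation library name when the flag is enabled,
// otherwise fall back to the processor's own name.
func resolveInstrumentationLibraryName(inherit bool, spanLibraryName string) string {
	if inherit {
		return spanLibraryName
	}
	return defaultInstrumentationLibraryName
}

func main() {
	fmt.Println(resolveInstrumentationLibraryName(false, "service-a-instrumentation-library")) // spanmetricsprocessor
	fmt.Println(resolveInstrumentationLibraryName(true, "service-a-instrumentation-library"))  // service-a-instrumentation-library
}
```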
diff --git a/processor/spanmetricsprocessor/config_test.go b/processor/spanmetricsprocessor/config_test.go
index 93a9f1f346d2..b4b06143ddbf 100644
--- a/processor/spanmetricsprocessor/config_test.go
+++ b/processor/spanmetricsprocessor/config_test.go
@@ -38,31 +38,34 @@ func TestLoadConfig(t *testing.T) {
     defaultMethod := "GET"
     defaultRegion := "us-east-1"
     testcases := []struct {
-        configFile                       string
-        wantMetricsExporter              string
-        wantLatencyHistogramBuckets      []time.Duration
-        wantDimensions                   []Dimension
-        wantDimensionsCacheSize          int
-        wantResourceAttributes           []Dimension
-        wantResourceAttributesCacheSize  int
-        wantAggregationTemporality       string
-        wantAttachSpanAndTraceID         bool
+        configFile                            string
+        wantMetricsExporter                   string
+        wantLatencyHistogramBuckets           []time.Duration
+        wantDimensions                        []Dimension
+        wantDimensionsCacheSize               int
+        wantResourceAttributes                []Dimension
+        wantResourceAttributesCacheSize       int
+        wantAggregationTemporality            string
+        wantAttachSpanAndTraceID              bool
+        wantInheritInstrumentationLibraryName bool
     }{
         {
-            configFile:                       "config-2-pipelines.yaml",
-            wantMetricsExporter:              "prometheus",
-            wantAggregationTemporality:       cumulative,
-            wantDimensionsCacheSize:          500,
-            wantResourceAttributesCacheSize:  300,
-            wantAttachSpanAndTraceID:         true,
+            configFile:                            "config-2-pipelines.yaml",
+            wantMetricsExporter:                   "prometheus",
+            wantAggregationTemporality:            cumulative,
+            wantDimensionsCacheSize:               500,
+            wantResourceAttributesCacheSize:       300,
+            wantAttachSpanAndTraceID:              true,
+            wantInheritInstrumentationLibraryName: true,
         },
         {
-            configFile:                       "config-3-pipelines.yaml",
-            wantMetricsExporter:              "otlp/spanmetrics",
-            wantAggregationTemporality:       cumulative,
-            wantDimensionsCacheSize:          defaultDimensionsCacheSize,
-            wantResourceAttributesCacheSize:  defaultResourceAttributesCacheSize,
-            wantAttachSpanAndTraceID:         false,
+            configFile:                            "config-3-pipelines.yaml",
+            wantMetricsExporter:                   "otlp/spanmetrics",
+            wantAggregationTemporality:            cumulative,
+            wantDimensionsCacheSize:               defaultDimensionsCacheSize,
+            wantResourceAttributesCacheSize:       defaultResourceAttributesCacheSize,
+            wantAttachSpanAndTraceID:              false,
+            wantInheritInstrumentationLibraryName: false,
         },
         {
             configFile: "config-full.yaml",
@@ -85,9 +88,10 @@ func TestLoadConfig(t *testing.T) {
                 {"region", &defaultRegion},
                 {"host_id", nil},
             },
-            wantResourceAttributesCacheSize:  3000,
-            wantAggregationTemporality:       delta,
-            wantAttachSpanAndTraceID:         false,
+            wantResourceAttributesCacheSize:       3000,
+            wantAggregationTemporality:            delta,
+            wantAttachSpanAndTraceID:              false,
+            wantInheritInstrumentationLibraryName: false,
         },
     }
     for _, tc := range testcases {
@@ -116,15 +120,16 @@ func TestLoadConfig(t *testing.T) {
             require.NotNil(t, cfg)
 
             assert.Equal(t, &Config{
-                ProcessorSettings:            config.NewProcessorSettings(config.NewID(typeStr)),
-                MetricsExporter:              tc.wantMetricsExporter,
-                LatencyHistogramBuckets:      tc.wantLatencyHistogramBuckets,
-                Dimensions:                   tc.wantDimensions,
-                DimensionsCacheSize:          tc.wantDimensionsCacheSize,
-                ResourceAttributes:           tc.wantResourceAttributes,
-                ResourceAttributesCacheSize:  tc.wantResourceAttributesCacheSize,
-                AggregationTemporality:       tc.wantAggregationTemporality,
-                AttachSpanAndTraceID:         tc.wantAttachSpanAndTraceID,
+                ProcessorSettings:                 config.NewProcessorSettings(config.NewID(typeStr)),
+                MetricsExporter:                   tc.wantMetricsExporter,
+                LatencyHistogramBuckets:           tc.wantLatencyHistogramBuckets,
+                Dimensions:                        tc.wantDimensions,
+                DimensionsCacheSize:               tc.wantDimensionsCacheSize,
+                ResourceAttributes:                tc.wantResourceAttributes,
+                ResourceAttributesCacheSize:       tc.wantResourceAttributesCacheSize,
+                AggregationTemporality:            tc.wantAggregationTemporality,
+                AttachSpanAndTraceID:              tc.wantAttachSpanAndTraceID,
+                InheritInstrumentationLibraryName: tc.wantInheritInstrumentationLibraryName,
             },
                 cfg.Processors[config.NewID(typeStr)],
             )
diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go
index e7592fdc0b7d..ee062b40bdc6 100644
--- a/processor/spanmetricsprocessor/processor.go
+++ b/processor/spanmetricsprocessor/processor.go
@@ -65,12 +65,21 @@ type exemplarData struct {
     value float64
 }
 
+type aggregationMeta struct {
+    serviceName  string
+    instrLibName instrLibKey
+    resourceAttr pdata.AttributeMap
+}
+
 // metricKey is used to carry the stringified metric attributes
 type metricKey string
 
 // resourceKey is used to carry the stringified resource attributes
 type resourceKey string
 
+// instrLibKey is used to carry the stringified instrumentation library name
+type instrLibKey string
+
 type processorImp struct {
     // ConsumeTraces() of each instance might be called concurrently from its upstream component in the pipeline.
     // As this processor is stateful. Due to the nature of its logic, the concurrent executions of ConsumeTraces() will
@@ -92,14 +101,14 @@ type processorImp struct {
     startTime time.Time
 
     // Call & Error counts.
-    callSum map[resourceKey]map[metricKey]int64
+    callSum map[resourceKey]map[instrLibKey]map[metricKey]int64
 
     // Latency histogram.
-    latencyCount        map[resourceKey]map[metricKey]uint64
-    latencySum          map[resourceKey]map[metricKey]float64
-    latencyBucketCounts map[resourceKey]map[metricKey][]uint64
+    latencyCount        map[resourceKey]map[instrLibKey]map[metricKey]uint64
+    latencySum          map[resourceKey]map[instrLibKey]map[metricKey]float64
+    latencyBucketCounts map[resourceKey]map[instrLibKey]map[metricKey][]uint64
     latencyBounds       []float64
 
-    latencyExemplarsData map[resourceKey]map[metricKey][]exemplarData
+    latencyExemplarsData map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData
 
     // An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
     // e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }}
@@ -109,6 +118,9 @@ type processorImp struct {
     // Defines whether metrics generated from spans should attach span and trace id as dimensions.
     attachSpanAndTraceID bool
+
+    // Defines whether metrics should inherit the instrumentation library name from the span.
+    inheritInstrumentationLibraryName bool
 }
 
 func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer consumer.Traces) (*processorImp, error) {
@@ -143,21 +155,22 @@ func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer cons
     }
 
     return &processorImp{
-        logger:                  logger,
-        config:                  *pConfig,
-        startTime:               time.Now(),
-        callSum:                 make(map[resourceKey]map[metricKey]int64),
-        latencyBounds:           bounds,
-        latencySum:              make(map[resourceKey]map[metricKey]float64),
-        latencyCount:            make(map[resourceKey]map[metricKey]uint64),
-        latencyBucketCounts:     make(map[resourceKey]map[metricKey][]uint64),
-        latencyExemplarsData:    make(map[resourceKey]map[metricKey][]exemplarData),
-        nextConsumer:            nextConsumer,
-        dimensions:              pConfig.Dimensions,
-        resourceAttributes:      pConfig.ResourceAttributes,
-        resourceKeyToDimensions: resourceKeyToDimensionsCache,
-        metricKeyToDimensions:   metricKeyToDimensionsCache,
-        attachSpanAndTraceID:    pConfig.AttachSpanAndTraceID,
+        logger:                            logger,
+        config:                            *pConfig,
+        startTime:                         time.Now(),
+        callSum:                           make(map[resourceKey]map[instrLibKey]map[metricKey]int64),
+        latencyBounds:                     bounds,
+        latencySum:                        make(map[resourceKey]map[instrLibKey]map[metricKey]float64),
+        latencyCount:                      make(map[resourceKey]map[instrLibKey]map[metricKey]uint64),
+        latencyBucketCounts:               make(map[resourceKey]map[instrLibKey]map[metricKey][]uint64),
+        latencyExemplarsData:              make(map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData),
+        nextConsumer:                      nextConsumer,
+        dimensions:                        pConfig.Dimensions,
+        resourceAttributes:                pConfig.ResourceAttributes,
+        resourceKeyToDimensions:           resourceKeyToDimensionsCache,
+        metricKeyToDimensions:             metricKeyToDimensionsCache,
+        attachSpanAndTraceID:              pConfig.AttachSpanAndTraceID,
+        inheritInstrumentationLibraryName: pConfig.InheritInstrumentationLibraryName,
     }, nil
 }
 
@@ -303,20 +316,17 @@ func (p *processorImp) buildMetrics() (*pdata.Metrics, error) {
             return true
         })
 
-        ilm := rm.InstrumentationLibraryMetrics().AppendEmpty()
-        ilm.InstrumentationLibrary().SetName(instrumentationLibraryName)
-
         // build metrics per resource
         resourceAttrKey, ok := key.(resourceKey)
         if !ok {
             return nil, errors.New("resource key type assertion failed")
         }
 
-        if err := p.collectCallMetrics(ilm, resourceAttrKey); err != nil {
+        if err := p.collectCallMetrics(rm, resourceAttrKey); err != nil {
            return nil, err
        }
 
-        if err := p.collectLatencyMetrics(ilm, resourceAttrKey); err != nil {
+        if err := p.collectLatencyMetrics(rm, resourceAttrKey); err != nil {
            return nil, err
        }
 
@@ -326,64 +336,72 @@
 // collectLatencyMetrics collects the raw latency metrics, writing the data
 // into the given instrumentation library metrics.
-func (p *processorImp) collectLatencyMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) error {
-    for mKey := range p.latencyCount[resAttrKey] {
-        mLatency := ilm.Metrics().AppendEmpty()
-        mLatency.SetDataType(pdata.MetricDataTypeHistogram)
-        mLatency.SetName("latency")
-        mLatency.Histogram().SetAggregationTemporality(p.config.GetAggregationTemporality())
-
-        timestamp := pdata.TimestampFromTime(time.Now())
-
-        dpLatency := mLatency.Histogram().DataPoints().AppendEmpty()
-        dpLatency.SetStartTimestamp(pdata.TimestampFromTime(p.startTime))
-        dpLatency.SetTimestamp(timestamp)
-        dpLatency.SetExplicitBounds(p.latencyBounds)
-        dpLatency.SetBucketCounts(p.latencyBucketCounts[resAttrKey][mKey])
-        dpLatency.SetCount(p.latencyCount[resAttrKey][mKey])
-        dpLatency.SetSum(p.latencySum[resAttrKey][mKey])
-
-        setLatencyExemplars(p.latencyExemplarsData[resAttrKey][mKey], timestamp, dpLatency.Exemplars())
-
-        dimensions, err := p.getDimensionsByMetricKey(mKey)
-        if err != nil {
-            p.logger.Error(err.Error())
-            return err
+func (p *processorImp) collectLatencyMetrics(rm pdata.ResourceMetrics, resAttrKey resourceKey) error {
+    for libKey := range p.latencyCount[resAttrKey] {
+        ilm := rm.InstrumentationLibraryMetrics().AppendEmpty()
+        ilm.InstrumentationLibrary().SetName(string(libKey))
+        for mKey := range p.latencyCount[resAttrKey][libKey] {
+            mLatency := ilm.Metrics().AppendEmpty()
+            mLatency.SetDataType(pdata.MetricDataTypeHistogram)
+            mLatency.SetName("latency")
+            mLatency.Histogram().SetAggregationTemporality(p.config.GetAggregationTemporality())
+
+            timestamp := pdata.TimestampFromTime(time.Now())
+
+            dpLatency := mLatency.Histogram().DataPoints().AppendEmpty()
+            dpLatency.SetStartTimestamp(pdata.TimestampFromTime(p.startTime))
+            dpLatency.SetTimestamp(timestamp)
+            dpLatency.SetExplicitBounds(p.latencyBounds)
+            dpLatency.SetBucketCounts(p.latencyBucketCounts[resAttrKey][libKey][mKey])
+            dpLatency.SetCount(p.latencyCount[resAttrKey][libKey][mKey])
+            dpLatency.SetSum(p.latencySum[resAttrKey][libKey][mKey])
+
+            setLatencyExemplars(p.latencyExemplarsData[resAttrKey][libKey][mKey], timestamp, dpLatency.Exemplars())
+
+            dimensions, err := p.getDimensionsByMetricKey(mKey)
+            if err != nil {
+                p.logger.Error(err.Error())
+                return err
+            }
+
+            dimensions.Range(func(k string, v pdata.AttributeValue) bool {
+                dpLatency.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v))
+                return true
+            })
         }
-
-        dimensions.Range(func(k string, v pdata.AttributeValue) bool {
-            dpLatency.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v))
-            return true
-        })
     }
     return nil
 }
 
 // collectCallMetrics collects the raw call count metrics, writing the data
 // into the given instrumentation library metrics.
-func (p *processorImp) collectCallMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) error {
-    for mKey := range p.callSum[resAttrKey] {
-        mCalls := ilm.Metrics().AppendEmpty()
-        mCalls.SetDataType(pdata.MetricDataTypeIntSum)
-        mCalls.SetName("calls_total")
-        mCalls.IntSum().SetIsMonotonic(true)
-        mCalls.IntSum().SetAggregationTemporality(p.config.GetAggregationTemporality())
-
-        dpCalls := mCalls.IntSum().DataPoints().AppendEmpty()
-        dpCalls.SetStartTimestamp(pdata.TimestampFromTime(p.startTime))
-        dpCalls.SetTimestamp(pdata.TimestampFromTime(time.Now()))
-        dpCalls.SetValue(p.callSum[resAttrKey][mKey])
-
-        dimensions, err := p.getDimensionsByMetricKey(mKey)
-        if err != nil {
-            p.logger.Error(err.Error())
-            return err
+func (p *processorImp) collectCallMetrics(rm pdata.ResourceMetrics, resAttrKey resourceKey) error {
+    for libKey := range p.callSum[resAttrKey] {
+        ilm := rm.InstrumentationLibraryMetrics().AppendEmpty()
+        ilm.InstrumentationLibrary().SetName(string(libKey))
+        for mKey := range p.callSum[resAttrKey][libKey] {
+            mCalls := ilm.Metrics().AppendEmpty()
+            mCalls.SetDataType(pdata.MetricDataTypeIntSum)
+            mCalls.SetName("calls_total")
+            mCalls.IntSum().SetIsMonotonic(true)
+            mCalls.IntSum().SetAggregationTemporality(p.config.GetAggregationTemporality())
+
+            dpCalls := mCalls.IntSum().DataPoints().AppendEmpty()
+            dpCalls.SetStartTimestamp(pdata.TimestampFromTime(p.startTime))
+            dpCalls.SetTimestamp(pdata.TimestampFromTime(time.Now()))
+            dpCalls.SetValue(p.callSum[resAttrKey][libKey][mKey])
+
+            dimensions, err := p.getDimensionsByMetricKey(mKey)
+            if err != nil {
+                p.logger.Error(err.Error())
+                return err
+            }
+
+            dimensions.Range(func(k string, v pdata.AttributeValue) bool {
+                dpCalls.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v))
+                return true
+            })
         }
-
-        dimensions.Range(func(k string, v pdata.AttributeValue) bool {
-            dpCalls.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v))
-            return true
-        })
     }
     return nil
 }
@@ -420,39 +438,56 @@ func (p *processorImp) aggregateMetrics(traces pdata.Traces) {
 
 func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpans, serviceName string) {
     ilsSlice := rspans.InstrumentationLibrarySpans()
+    instrLibName := instrLibKey(instrumentationLibraryName)
     for j := 0; j < ilsSlice.Len(); j++ {
         ils := ilsSlice.At(j)
+
+        // If the config is set to inherit the instrumentation library name, take it from the
+        // span's instrumentation library; otherwise use the processor's default.
+        if p.inheritInstrumentationLibraryName {
+            instrLibName = instrLibKey(ils.InstrumentationLibrary().Name())
+        }
+
         spans := ils.Spans()
         for k := 0; k < spans.Len(); k++ {
             span := spans.At(k)
-            p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes())
+            aggrMeta := aggregationMeta{
+                serviceName:  serviceName,
+                resourceAttr: rspans.Resource().Attributes(),
+                instrLibName: instrLibName,
+            }
+            p.aggregateMetricsForSpan(span, aggrMeta)
         }
     }
 }
 
-func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span, resourceAttr pdata.AttributeMap) {
+func (p *processorImp) aggregateMetricsForSpan(span pdata.Span, aggregationMeta aggregationMeta) {
     latencyInMilliseconds := float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds())
 
     // Binary search to find the latencyInMilliseconds bucket index.
     index := sort.SearchFloat64s(p.latencyBounds, latencyInMilliseconds)
 
-    mKey := p.buildMetricKey(span, resourceAttr, p.attachSpanAndTraceID)
-    resourceAttrKey := p.buildResourceAttrKey(serviceName, resourceAttr)
+    mKey := p.buildMetricKey(span, aggregationMeta.resourceAttr, p.attachSpanAndTraceID)
+    resourceAttrKey := p.buildResourceAttrKey(aggregationMeta.serviceName, aggregationMeta.resourceAttr)
 
-    p.cacheMetricKey(span, mKey, resourceAttr)
-    p.cacheResourceAttrKey(serviceName, resourceAttr, resourceAttrKey)
-    p.updateCallMetrics(resourceAttrKey, mKey)
-    p.updateLatencyMetrics(resourceAttrKey, mKey, latencyInMilliseconds, index)
-    p.updateLatencyExemplars(resourceAttrKey, mKey, latencyInMilliseconds, span.TraceID())
+    p.cacheMetricKey(span, mKey, aggregationMeta.resourceAttr)
+    p.cacheResourceAttrKey(aggregationMeta.serviceName, aggregationMeta.resourceAttr, resourceAttrKey)
+    p.updateCallMetrics(resourceAttrKey, mKey, aggregationMeta.instrLibName)
+    p.updateLatencyMetrics(resourceAttrKey, mKey, latencyInMilliseconds, index, aggregationMeta.instrLibName)
+    p.updateLatencyExemplars(resourceAttrKey, mKey, latencyInMilliseconds, span.TraceID(), aggregationMeta.instrLibName)
 }
 
 // updateCallMetrics increments the call count for the given metric key.
-func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey) {
+func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey, iKey instrLibKey) {
     if _, ok := p.callSum[rKey]; !ok {
-        p.callSum[rKey] = make(map[metricKey]int64)
+        p.callSum[rKey] = make(map[instrLibKey]map[metricKey]int64)
     }
 
-    p.callSum[rKey][mKey]++
+    if _, ok := p.callSum[rKey][iKey]; !ok {
+        p.callSum[rKey][iKey] = make(map[metricKey]int64)
+    }
+
+    p.callSum[rKey][iKey][mKey]++
 }
 
 func (p *processorImp) reset() {
@@ -470,20 +505,24 @@ func (p *processorImp) reset() {
 // resetAccumulatedMetrics resets the internal maps used to store created metric data. Also purge the cache for
 // metricKeyToDimensions.
 func (p *processorImp) resetAccumulatedMetrics() {
-    p.callSum = make(map[resourceKey]map[metricKey]int64)
-    p.latencyCount = make(map[resourceKey]map[metricKey]uint64)
-    p.latencySum = make(map[resourceKey]map[metricKey]float64)
-    p.latencyBucketCounts = make(map[resourceKey]map[metricKey][]uint64)
+    p.callSum = make(map[resourceKey]map[instrLibKey]map[metricKey]int64)
+    p.latencyCount = make(map[resourceKey]map[instrLibKey]map[metricKey]uint64)
+    p.latencySum = make(map[resourceKey]map[instrLibKey]map[metricKey]float64)
+    p.latencyBucketCounts = make(map[resourceKey]map[instrLibKey]map[metricKey][]uint64)
 
     p.metricKeyToDimensions.Purge()
     p.resourceKeyToDimensions.Purge()
 }
 
 // updateLatencyExemplars sets the histogram exemplars for the given resource and metric key and append the exemplar data.
-func (p *processorImp) updateLatencyExemplars(rKey resourceKey, mKey metricKey, value float64, traceID pdata.TraceID) {
-    rled, ok := p.latencyExemplarsData[rKey]
+func (p *processorImp) updateLatencyExemplars(rKey resourceKey, mKey metricKey, value float64, traceID pdata.TraceID, instrLibName instrLibKey) {
+    if _, ok := p.latencyExemplarsData[rKey]; !ok {
+        p.latencyExemplarsData[rKey] = make(map[instrLibKey]map[metricKey][]exemplarData)
+    }
+
+    rled, ok := p.latencyExemplarsData[rKey][instrLibName]
     if !ok {
         rled = make(map[metricKey][]exemplarData)
-        p.latencyExemplarsData[rKey] = rled
+        p.latencyExemplarsData[rKey][instrLibName] = rled
     }
 
     rled[mKey] = append(rled[mKey], exemplarData{
@@ -496,32 +535,40 @@
 // the data structure. An exemplar is a punctual value that exists at specific moment in time
 // and should be not considered like a metrics that persist over time.
 func (p *processorImp) resetExemplarData() {
-    p.latencyExemplarsData = make(map[resourceKey]map[metricKey][]exemplarData)
+    p.latencyExemplarsData = make(map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData)
 }
 
 // updateLatencyMetrics increments the histogram counts for the given metric key and bucket index.
-func (p *processorImp) updateLatencyMetrics(rKey resourceKey, mKey metricKey, latency float64, index int) {
+func (p *processorImp) updateLatencyMetrics(rKey resourceKey, mKey metricKey, latency float64, index int, instrLibName instrLibKey) {
+    // update latency bucket counts
     if _, ok := p.latencyBucketCounts[rKey]; !ok {
-        p.latencyBucketCounts[rKey] = make(map[metricKey][]uint64)
+        p.latencyBucketCounts[rKey] = make(map[instrLibKey]map[metricKey][]uint64)
     }
-
-    if _, ok := p.latencyBucketCounts[rKey][mKey]; !ok {
-        p.latencyBucketCounts[rKey][mKey] = make([]uint64, len(p.latencyBounds))
+    if _, ok := p.latencyBucketCounts[rKey][instrLibName]; !ok {
+        p.latencyBucketCounts[rKey][instrLibName] = make(map[metricKey][]uint64)
     }
+    if _, ok := p.latencyBucketCounts[rKey][instrLibName][mKey]; !ok {
+        p.latencyBucketCounts[rKey][instrLibName][mKey] = make([]uint64, len(p.latencyBounds))
+    }
+    p.latencyBucketCounts[rKey][instrLibName][mKey][index]++
 
-    p.latencyBucketCounts[rKey][mKey][index]++
-
-    if _, ok := p.latencySum[rKey]; ok {
-        p.latencySum[rKey][mKey] += latency
-    } else {
-        p.latencySum[rKey] = map[metricKey]float64{mKey: latency}
+    // update latency sum
+    if _, ok := p.latencySum[rKey]; !ok {
+        p.latencySum[rKey] = make(map[instrLibKey]map[metricKey]float64)
     }
+    if _, ok := p.latencySum[rKey][instrLibName]; !ok {
+        p.latencySum[rKey][instrLibName] = make(map[metricKey]float64)
+    }
+    p.latencySum[rKey][instrLibName][mKey] += latency
 
-    if _, ok := p.latencyCount[rKey]; ok {
-        p.latencyCount[rKey][mKey]++
-    } else {
-        p.latencyCount[rKey] = map[metricKey]uint64{mKey: 1}
+    // update latency count
+    if _, ok := p.latencyCount[rKey]; !ok {
+        p.latencyCount[rKey] = make(map[instrLibKey]map[metricKey]uint64)
+    }
+    if _, ok := p.latencyCount[rKey][instrLibName]; !ok {
+        p.latencyCount[rKey][instrLibName] = make(map[metricKey]uint64)
     }
+    p.latencyCount[rKey][instrLibName][mKey]++
 }
 
 func (p *processorImp) buildDimensionKVs(span pdata.Span, optionalDims []Dimension, resourceAttrs pdata.AttributeMap, attachSpanAndTraceID bool) pdata.AttributeMap {
diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go
index 639a66a1bd7d..a5d894464568 100644
--- a/processor/spanmetricsprocessor/processor_test.go
+++ b/processor/spanmetricsprocessor/processor_test.go
@@ -79,8 +79,9 @@ type metricDataPoint interface {
 }
 
 type serviceSpans struct {
-    serviceName string
-    spans       []span
+    serviceName                string
+    spans                      []span
+    instrumentationLibraryName string
 }
 
 type span struct {
@@ -216,7 +217,7 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
             tcon := &mocks.TracesConsumer{}
             tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(tc.consumeTracesErr)
 
-            p := newProcessorImp(mexp, tcon, nil, cumulative, t, false)
+            p := newProcessorImp(mexp, tcon, nil, cumulative, t, false, false)
 
             traces := buildSampleTrace()
 
@@ -272,7 +273,7 @@ func TestProcessorConsumeTracesConcurrentSafe(t *testing.T) {
             tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 
             defaultNullValue := "defaultNullValue"
-            p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t, false)
+            p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t, false, false)
 
             for _, traces := range tc.traces {
                 // Test
@@ -340,7 +341,7 @@ func TestProcessorConsumeTraces(t *testing.T) {
             tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 
             defaultNullValue := "defaultNullValue"
-            p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t, false)
+            p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t, false, false)
 
             for _, traces := range tc.traces {
                 // Test
@@ -354,33 +355,102 @@ func TestProcessorConsumeTraces(t *testing.T) {
     }
 }
 
-func TestResourceCopying(t *testing.T) {
-    // Prepare
+func TestInheritInstrumentationLibraryName(t *testing.T) {
     mexp := &mocks.MetricsExporter{}
     tcon := &mocks.TracesConsumer{}
 
     mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool {
         rm := input.ResourceMetrics()
         require.Equal(t, 2, rm.Len())
+        rmA := findResourceMetricsHelper(t, rm, "service-a")
+        assert.NotEmpty(t, rmA, "resource metric containing service name `service-a` not found")
+        rmB := findResourceMetricsHelper(t, rm, "service-b")
+        assert.NotEmpty(t, rmB, "resource metric containing service name `service-b` not found")
 
-        var rmA, rmB pdata.ResourceMetrics
-        rm0 := rm.At(0)
-        rm1 := rm.At(1)
-        serviceName, ok := getServiceName(rm0)
-        assert.True(t, ok, "should get service name from resourceMetric")
+        assert.Equal(t, "service-a-instrumentation-library", rmA.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name())
+        assert.Equal(t, "service-b-instrumentation-library", rmB.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name())
 
-        if serviceName == "service-a" {
-            rmA = rm0
-            rmB = rm1
-        } else {
-            rmB = rm0
-            rmA = rm1
+        return true
+    })).Return(nil)
+
+    tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
+
+    defaultNullValue := "defaultNullValue"
+    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false, true)
+
+    traces := buildSampleTrace()
+
+    // Test
+    ctx := metadata.NewIncomingContext(context.Background(), nil)
+    err := p.ConsumeTraces(ctx, traces)
+
+    // Verify
+    assert.NoError(t, err)
+}
+
+func TestDefaultInstrumentationLibraryName(t *testing.T) {
+    mexp := &mocks.MetricsExporter{}
+    tcon := &mocks.TracesConsumer{}
+
+    mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool {
+        rm := input.ResourceMetrics()
+        require.Equal(t, 2, rm.Len())
+        rmA := findResourceMetricsHelper(t, rm, "service-a")
+        assert.NotEmpty(t, rmA, "resource metric containing service name `service-a` not found")
+        rmB := findResourceMetricsHelper(t, rm, "service-b")
+        assert.NotEmpty(t, rmB, "resource metric containing service name `service-b` not found")
+        assert.Equal(t, instrumentationLibraryName, rmA.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name())
+        assert.Equal(t, instrumentationLibraryName, rmB.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name())
+
+        return true
+    })).Return(nil)
+
+    tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
+
+    defaultNullValue := "defaultNullValue"
+    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false, false)
+
+    traces := buildSampleTrace()
+
+    // Test
+    ctx := metadata.NewIncomingContext(context.Background(), nil)
+    err := p.ConsumeTraces(ctx, traces)
+
+    // Verify
+    assert.NoError(t, err)
+}
+
+func findResourceMetricsHelper(t *testing.T, rm pdata.ResourceMetricsSlice, serviceNameExpected string) pdata.ResourceMetrics {
+    for i := 0; i < rm.Len(); i++ {
+        serviceName, ok := getServiceName(rm.At(i))
+        assert.True(t, ok, "resourceMetrics should always have service name")
+        if serviceName == serviceNameExpected {
+            return rm.At(i)
         }
+    }
+
+    return pdata.ResourceMetrics{}
+}
+
+func TestResourceCopying(t *testing.T) {
+    // Prepare
+    mexp := &mocks.MetricsExporter{}
+    tcon := &mocks.TracesConsumer{}
+
+    mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool {
+        rm := input.ResourceMetrics()
+        require.Equal(t, 2, rm.Len())
+        rmA := findResourceMetricsHelper(t, rm, "service-a")
+        assert.NotEmpty(t, rmA, "resource metric containing service name `service-a` not found")
+        rmB := findResourceMetricsHelper(t, rm, "service-b")
+        assert.NotEmpty(t, rmB, "resource metric containing service name `service-b` not found")
 
         require.Equal(t, 4, rmA.Resource().Attributes().Len())
-        require.Equal(t, 4, rmA.InstrumentationLibraryMetrics().At(0).Metrics().Len())
+        require.Equal(t, 2, rmA.InstrumentationLibraryMetrics().At(0).Metrics().Len())
+        require.Equal(t, 2, rmA.InstrumentationLibraryMetrics().At(1).Metrics().Len())
         require.Equal(t, 2, rmB.Resource().Attributes().Len())
-        require.Equal(t, 2, rmB.InstrumentationLibraryMetrics().At(0).Metrics().Len())
+        require.Equal(t, 1, rmB.InstrumentationLibraryMetrics().At(0).Metrics().Len())
+        require.Equal(t, 1, rmB.InstrumentationLibraryMetrics().At(1).Metrics().Len())
 
         wantResourceAttrServiceA := map[string]string{
             resourceAttr1: "1",
@@ -424,7 +494,7 @@ func TestResourceCopying(t *testing.T) {
     tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 
     defaultNullValue := "defaultNullValue"
-    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false)
+    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false, false)
 
     traces := buildSampleTrace()
     traces.ResourceSpans().At(0).Resource().Attributes().Insert(resourceAttr1, pdata.NewAttributeValueString("1"))
@@ -464,7 +534,7 @@ func TestProcessorConsumeTracesWithSpanAndTraceID(t *testing.T) {
 
     defaultNullValue := "defaultNullValue"
 
-    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, true)
+    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, true, false)
 
     mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool {
         return verifyConsumeMetricsInputCumulative(t, input, p.attachSpanAndTraceID, expectedSpanAndTraceIDs)
@@ -486,7 +556,7 @@ func TestMetricKeyCache(t *testing.T) {
     tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 
     defaultNullValue := "defaultNullValue"
-    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false)
+    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false, false)
 
     traces := buildSampleTrace()
 
@@ -519,7 +589,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) {
     tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 
     defaultNullValue := "defaultNullValue"
-    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, b, false)
+    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, b, true, true)
 
     traces := buildSampleTrace()
 
@@ -530,7 +600,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) {
     }
 }
 
-func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *string, temporality string, tb testing.TB, attachSpanAndTraceID bool) *processorImp {
+func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *string, temporality string, tb testing.TB, attachSpanAndTraceID bool, inheritInstrumentationLibraryName bool) *processorImp {
     localDefaultNotInSpanAttrVal := defaultNotInSpanAttrVal
     // use size 2 for LRU cache for testing purpose
     metricKeyToDimensions, err := cache.NewCache(DimensionsCacheSize)
@@ -550,12 +620,12 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de
         nextConsumer: tcon,
 
         startTime:            time.Now(),
-        callSum:              make(map[resourceKey]map[metricKey]int64),
-        latencySum:           make(map[resourceKey]map[metricKey]float64),
-        latencyCount:         make(map[resourceKey]map[metricKey]uint64),
-        latencyBucketCounts:  make(map[resourceKey]map[metricKey][]uint64),
+        callSum:              make(map[resourceKey]map[instrLibKey]map[metricKey]int64),
+        latencySum:           make(map[resourceKey]map[instrLibKey]map[metricKey]float64),
+        latencyCount:         make(map[resourceKey]map[instrLibKey]map[metricKey]uint64),
+        latencyBucketCounts:  make(map[resourceKey]map[instrLibKey]map[metricKey][]uint64),
         latencyBounds:        defaultLatencyHistogramBucketsMs,
-        latencyExemplarsData: make(map[resourceKey]map[metricKey][]exemplarData),
+        latencyExemplarsData: make(map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData),
         dimensions: []Dimension{
             // Set nil defaults to force a lookup for the attribute in the span.
             {stringAttrName, nil},
@@ -579,8 +649,9 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de
             {notInSpanResourceAttr0, &localDefaultNotInSpanAttrVal},
             {notInSpanResourceAttr1, nil},
         },
-        resourceKeyToDimensions: resourceKeyToDimensions,
-        metricKeyToDimensions:   metricKeyToDimensions,
+        resourceKeyToDimensions:           resourceKeyToDimensions,
+        metricKeyToDimensions:             metricKeyToDimensions,
+        inheritInstrumentationLibraryName: inheritInstrumentationLibraryName,
     }
 }
 
@@ -631,21 +702,29 @@ func verifyConsumeMetricsInput(t testing.TB, input pdata.Metrics, expectedTempor
 
     ilmA := rmA.InstrumentationLibraryMetrics()
-    require.Equal(t, 1, ilmA.Len())
+    require.Equal(t, 2, ilmA.Len())
     assert.Equal(t, instrumentationLibraryName, ilmA.At(0).InstrumentationLibrary().Name())
-    mA := ilmA.At(0).Metrics()
-    require.Equal(t, 4, mA.Len())
+    mA1 := ilmA.At(0).Metrics()
+    require.Equal(t, 2, mA1.Len())
+    mA2 := ilmA.At(1).Metrics()
+    require.Equal(t, 2, mA2.Len())
 
     ilmB := rmB.InstrumentationLibraryMetrics()
-    require.Equal(t, 1, ilmB.Len())
+    require.Equal(t, 2, ilmB.Len())
     assert.Equal(t, instrumentationLibraryName, ilmB.At(0).InstrumentationLibrary().Name())
-    mB := ilmB.At(0).Metrics()
-    require.Equal(t, 2, mB.Len())
+    mB1 := ilmB.At(0).Metrics()
+    require.Equal(t, 1, mB1.Len())
+    mB2 := ilmB.At(1).Metrics()
+    require.Equal(t, 1, mB2.Len())
 
-    verifyMetrics(mA, expectedTemporality, numCumulativeConsumptions, 2, t, attachSpanAndTraceID, expectedSpanAndTraceIDs)
-    verifyMetrics(mB, expectedTemporality, numCumulativeConsumptions, 1, t, attachSpanAndTraceID, expectedSpanAndTraceIDs)
+    // mA1 and mB1 contain "calls_total" metrics
+    // mA2 and mB2 contain "latency" metrics
+    verifyMetrics(mA1, expectedTemporality, numCumulativeConsumptions, 2, t, attachSpanAndTraceID, expectedSpanAndTraceIDs)
+    verifyMetrics(mA2, expectedTemporality, numCumulativeConsumptions, 0, t, attachSpanAndTraceID, expectedSpanAndTraceIDs)
+    verifyMetrics(mB1, expectedTemporality, numCumulativeConsumptions, 1, t, attachSpanAndTraceID, expectedSpanAndTraceIDs)
+    verifyMetrics(mB2, expectedTemporality, numCumulativeConsumptions, 0, t, attachSpanAndTraceID, expectedSpanAndTraceIDs)
 
     assert.Empty(t, expectedSpanAndTraceIDs, "Did not see all expected span and trace IDs in metric. Missing: ", expectedSpanAndTraceIDs)
@@ -784,7 +863,8 @@ func buildSampleTrace() pdata.Traces {
 
     initServiceSpans(
         serviceSpans{
-            serviceName: "service-a",
+            serviceName:                "service-a",
+            instrumentationLibraryName: "service-a-instrumentation-library",
             spans: []span{
                 {
                     operation: "/ping",
@@ -804,7 +884,8 @@ func buildSampleTrace() pdata.Traces {
         }, traces.ResourceSpans().AppendEmpty())
     initServiceSpans(
         serviceSpans{
-            serviceName: "service-b",
+            serviceName:                "service-b",
+            instrumentationLibraryName: "service-b-instrumentation-library",
             spans: []span{
                 {
                     operation: "/ping",
@@ -828,6 +909,10 @@ func initServiceSpans(serviceSpans serviceSpans, spans pdata.ResourceSpans) {
     spans.Resource().Attributes().InsertString(regionResourceAttrName, sampleRegion)
 
     ils := spans.InstrumentationLibrarySpans().AppendEmpty()
+    if serviceSpans.instrumentationLibraryName != "" {
+        ils.InstrumentationLibrary().SetName(serviceSpans.instrumentationLibraryName)
+    }
+
     for _, span := range serviceSpans.spans {
         initSpan(span, ils.Spans().AppendEmpty())
     }
@@ -1113,7 +1198,7 @@ func TestTraceWithoutServiceNameDoesNotGenerateMetrics(t *testing.T) {
     tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil)
 
     defaultNullValue := "defaultNullValue"
-    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false)
+    p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false, false)
 
     trace := pdata.NewTraces()
 
@@ -1179,6 +1264,7 @@ func TestProcessorUpdateLatencyExemplars(t *testing.T) {
     cfg := factory.CreateDefaultConfig().(*Config)
     traces := buildSampleTrace()
     traceID := traces.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).TraceID()
+    traceInstrLibName := instrLibKey(instrumentationLibraryName)
     mKey := metricKey("metricKey")
     rKey := resourceKey("resourceKey")
     next := new(consumertest.TracesSink)
@@ -1186,12 +1272,12 @@
     value := float64(42)
 
     // ----- call -------------------------------------------------------------
-    p.updateLatencyExemplars(rKey, mKey, value, traceID)
+    p.updateLatencyExemplars(rKey, mKey, value, traceID, traceInstrLibName)
 
     // ----- verify -----------------------------------------------------------
     assert.NoError(t, err)
-    assert.NotEmpty(t, p.latencyExemplarsData[rKey][mKey])
-    assert.Equal(t, p.latencyExemplarsData[rKey][mKey][0], exemplarData{traceID: traceID, value: value})
+    assert.NotEmpty(t, p.latencyExemplarsData[rKey][traceInstrLibName][mKey])
+    assert.Equal(t, p.latencyExemplarsData[rKey][traceInstrLibName][mKey][0], exemplarData{traceID: traceID, value: value})
 }
 
 func TestProcessorResetExemplarData(t *testing.T) {
@@ -1201,6 +1287,7 @@
 
     mKey := metricKey("metricKey")
     rKey := resourceKey("resourceKey")
+    libKey := instrLibKey(instrumentationLibraryName)
     next := new(consumertest.TracesSink)
 
     p, err := newProcessor(zaptest.NewLogger(t), cfg, next)
@@ -1209,7 +1296,7 @@
 
     // ----- verify -----------------------------------------------------------
     assert.NoError(t, err)
-    assert.Empty(t, p.latencyExemplarsData[rKey][mKey])
+    assert.Empty(t, p.latencyExemplarsData[rKey][libKey][mKey])
 }
 
 func TestBuildResourceAttrKey(t *testing.T) {
diff --git a/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml b/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml
index 853d997fe8c1..bdfbffdc7dbd 100644
--- a/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml
+++ b/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml
@@ -29,6 +29,7 @@ processors:
     dimensions_cache_size: 500
     # the max number of items in the `resource_key_to_dimensions_cache`. Default is 1000
     resource_attributes_cache_size: 300
+    inherit_instrumentation_library_name: true
     attach_span_and_trace_id: true
 service:
   pipelines:
diff --git a/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml b/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml
index fe32ab635997..ef9f3fde7bf2 100644
--- a/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml
+++ b/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml
@@ -39,6 +39,7 @@ processors:
   spanmetrics:
     metrics_exporter: otlp/spanmetrics
     aggregation_temporality: "AGGREGATION_TEMPORALITY_CUMULATIVE"
+    inherit_instrumentation_library_name: false
     attach_span_and_trace_id: false
 
 service:
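Reviewer aside, not part of the patch: a hedged, simplified sketch of the new aggregation shape this change introduces — resource attributes, then instrumentation library, then metric dimensions — with the same lazy-initialization pattern used by `updateCallMetrics`. Type and method names below are illustrative only.

```go
package main

import "fmt"

type (
	resourceKey string
	instrLibKey string
	metricKey   string
)

// callSums mirrors the three-level nesting used by the processor's counters
// after this change: resource -> instrumentation library -> metric key -> count.
type callSums map[resourceKey]map[instrLibKey]map[metricKey]int64

// increment lazily creates the inner maps before bumping the counter, following
// the same pattern updateCallMetrics uses in the patch.
func (c callSums) increment(rKey resourceKey, iKey instrLibKey, mKey metricKey) {
	if _, ok := c[rKey]; !ok {
		c[rKey] = make(map[instrLibKey]map[metricKey]int64)
	}
	if _, ok := c[rKey][iKey]; !ok {
		c[rKey][iKey] = make(map[metricKey]int64)
	}
	c[rKey][iKey][mKey]++
}

func main() {
	sums := make(callSums)
	sums.increment("service-a", "service-a-instrumentation-library", "GET /ping")
	sums.increment("service-a", "service-a-instrumentation-library", "GET /ping")
	fmt.Println(sums["service-a"]["service-a-instrumentation-library"]["GET /ping"]) // 2
}
```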