From cb6252e5ea98ff55aad1043c79710a557045dd12 Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Thu, 13 Jan 2022 12:09:27 +1100 Subject: [PATCH 1/8] starts adding config; adds todos in processor --- processor/spanmetricsprocessor/README.md | 3 + processor/spanmetricsprocessor/config.go | 5 ++ processor/spanmetricsprocessor/config_test.go | 44 ++++++------ processor/spanmetricsprocessor/processor.go | 67 +++++++++++-------- 4 files changed, 72 insertions(+), 47 deletions(-) diff --git a/processor/spanmetricsprocessor/README.md b/processor/spanmetricsprocessor/README.md index 89587cd23f1a..20e2e8044c3e 100644 --- a/processor/spanmetricsprocessor/README.md +++ b/processor/spanmetricsprocessor/README.md @@ -70,6 +70,9 @@ The following settings can be optionally configured: - `resource_attributes_cache_size`: the max number of items in the `resource_key_to_dimensions_cache`. If not provided, will use default value size `1000`. + +- `inherit_instrumentation_library_name`: defines whether the metrics generated from spans should inherit the same instrumentation library name as the span. If not provided, will use default value of `false` which will define the instrumentation library name as `spanmetricsprocessor` on metrics. + ## Examples The following is a simple example usage of the spanmetrics processor. diff --git a/processor/spanmetricsprocessor/config.go b/processor/spanmetricsprocessor/config.go index ec80caa01f00..762764bb768b 100644 --- a/processor/spanmetricsprocessor/config.go +++ b/processor/spanmetricsprocessor/config.go @@ -69,6 +69,11 @@ type Config struct { // memory growing indefinitely over the lifetime of the collector. // Optional. See defaultResourceAttributesCacheSize in processor.go for the default value. ResourceAttributesCacheSize int `mapstructure:"resource_attributes_cache_size"` + + // InheritInstrumentationLibraryName defines whether metrics generated from spans should inherit + // the instrumentation library name from the span. + // Optional. The default value is `false` which will define the instrumentation library name on metrics as `spanmetricsprocessor`. + InheritInstrumentationLibraryName bool `mapstructure:"inherit_instrumentation_library_name"` } // GetAggregationTemporality converts the string value given in the config into a MetricAggregationTemporality. diff --git a/processor/spanmetricsprocessor/config_test.go b/processor/spanmetricsprocessor/config_test.go index 4e5d2f41d124..fcf4c4d57694 100644 --- a/processor/spanmetricsprocessor/config_test.go +++ b/processor/spanmetricsprocessor/config_test.go @@ -38,28 +38,31 @@ func TestLoadConfig(t *testing.T) { defaultMethod := "GET" defaultRegion := "us-east-1" testcases := []struct { - configFile string - wantMetricsExporter string - wantLatencyHistogramBuckets []time.Duration - wantDimensions []Dimension - wantDimensionsCacheSize int - wantResourceAttributes []Dimension - wantResourceAttributesCacheSize int - wantAggregationTemporality string + configFile string + wantMetricsExporter string + wantLatencyHistogramBuckets []time.Duration + wantDimensions []Dimension + wantDimensionsCacheSize int + wantResourceAttributes []Dimension + wantResourceAttributesCacheSize int + wantAggregationTemporality string + wantInheritInstrumentationLibraryName bool }{ { - configFile: "config-2-pipelines.yaml", - wantMetricsExporter: "prometheus", - wantAggregationTemporality: cumulative, - wantDimensionsCacheSize: 500, - wantResourceAttributesCacheSize: 300, + configFile: "config-2-pipelines.yaml", + wantMetricsExporter: "prometheus", + wantAggregationTemporality: cumulative, + wantDimensionsCacheSize: 500, + wantResourceAttributesCacheSize: 300, + wantInheritInstrumentationLibraryName: true, }, { - configFile: "config-3-pipelines.yaml", - wantMetricsExporter: "otlp/spanmetrics", - wantAggregationTemporality: cumulative, - wantDimensionsCacheSize: defaultDimensionsCacheSize, - wantResourceAttributesCacheSize: defaultResourceAttributesCacheSize, + configFile: "config-3-pipelines.yaml", + wantMetricsExporter: "otlp/spanmetrics", + wantAggregationTemporality: cumulative, + wantDimensionsCacheSize: defaultDimensionsCacheSize, + wantResourceAttributesCacheSize: defaultResourceAttributesCacheSize, + wantInheritInstrumentationLibraryName: false, }, { configFile: "config-full.yaml", @@ -82,8 +85,9 @@ func TestLoadConfig(t *testing.T) { {"region", &defaultRegion}, {"host_id", nil}, }, - wantResourceAttributesCacheSize: 3000, - wantAggregationTemporality: delta, + wantResourceAttributesCacheSize: 3000, + wantAggregationTemporality: delta, + wantInheritInstrumentationLibraryName: false, }, } for _, tc := range testcases { diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index bc350f4c4389..d1419910e329 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -68,6 +68,9 @@ type metricKey string // resourceKey is used to carry the stringified resource attributes type resourceKey string +// instrLibKey is used to carry the stringified instrumentation library name +type instrLibKey string + type processorImp struct { // ConsumeTraces() of each instance might be called concurrently from its upstream component in the pipeline. // As this processor is stateful. Due to the nature of its logic, the concurrent executions of ConsumeTraces() will @@ -92,17 +95,20 @@ type processorImp struct { callSum map[resourceKey]map[metricKey]int64 // Latency histogram. - latencyCount map[resourceKey]map[metricKey]uint64 - latencySum map[resourceKey]map[metricKey]float64 - latencyBucketCounts map[resourceKey]map[metricKey][]uint64 + latencyCount map[resourceKey]map[instrLibKey]map[metricKey]uint64 + latencySum map[resourceKey]map[instrLibKey]map[metricKey]float64 + latencyBucketCounts map[resourceKey]map[instrLibKey]map[metricKey][]uint64 latencyBounds []float64 - latencyExemplarsData map[resourceKey]map[metricKey][]exemplarData + latencyExemplarsData map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData // An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values: // e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }} metricKeyToDimensions *cache.Cache // An LRU cache of resourceattributekey-value maps keyed by a unique identifier formed by a concatenation of its values. resourceKeyToDimensions *cache.Cache + + // Defines whether metrics should inherit instrumentation library name from span + inheritInstrumentationLibraryName bool } func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer consumer.Traces) (*processorImp, error) { @@ -137,20 +143,21 @@ func newProcessor(logger *zap.Logger, config config.Processor, nextConsumer cons } return &processorImp{ - logger: logger, - config: *pConfig, - startTime: time.Now(), - callSum: make(map[resourceKey]map[metricKey]int64), - latencyBounds: bounds, - latencySum: make(map[resourceKey]map[metricKey]float64), - latencyCount: make(map[resourceKey]map[metricKey]uint64), - latencyBucketCounts: make(map[resourceKey]map[metricKey][]uint64), - latencyExemplarsData: make(map[resourceKey]map[metricKey][]exemplarData), - nextConsumer: nextConsumer, - dimensions: pConfig.Dimensions, - resourceAttributes: pConfig.ResourceAttributes, - resourceKeyToDimensions: resourceKeyToDimensionsCache, - metricKeyToDimensions: metricKeyToDimensionsCache, + logger: logger, + config: *pConfig, + startTime: time.Now(), + callSum: make(map[resourceKey]map[instrLibKey]map[metricKey]int64), + latencyBounds: bounds, + latencySum: make(map[resourceKey]map[instrLibKey]map[metricKey]float64), + latencyCount: make(map[resourceKey]map[instrLibKey]map[metricKey]uint64), + latencyBucketCounts: make(map[resourceKey]map[instrLibKey]map[metricKey][]uint64), + latencyExemplarsData: make(map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData), + nextConsumer: nextConsumer, + dimensions: pConfig.Dimensions, + resourceAttributes: pConfig.ResourceAttributes, + resourceKeyToDimensions: resourceKeyToDimensionsCache, + metricKeyToDimensions: metricKeyToDimensionsCache, + inheritInstrumentationLibraryName: pConfig.InheritInstrumentationLibraryName, }, nil } @@ -297,6 +304,7 @@ func (p *processorImp) buildMetrics() (*pdata.Metrics, error) { }) ilm := rm.InstrumentationLibraryMetrics().AppendEmpty() + //TODO: CLAIRE: this should be done inside the functions now ilm.InstrumentationLibrary().SetName(instrumentationLibraryName) // build metrics per resource @@ -320,6 +328,7 @@ func (p *processorImp) buildMetrics() (*pdata.Metrics, error) { // collectLatencyMetrics collects the raw latency metrics, writing the data // into the given instrumentation library metrics. func (p *processorImp) collectLatencyMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) error { + // TODO: CLAIRE for mKey := range p.latencyCount[resAttrKey] { mLatency := ilm.Metrics().AppendEmpty() mLatency.SetDataType(pdata.MetricDataTypeHistogram) @@ -352,6 +361,7 @@ func (p *processorImp) collectLatencyMetrics(ilm pdata.InstrumentationLibraryMet // collectCallMetrics collects the raw call count metrics, writing the data // into the given instrumentation library metrics. func (p *processorImp) collectCallMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) error { + // TODO: CLAIRE for mKey := range p.callSum[resAttrKey] { mCalls := ilm.Metrics().AppendEmpty() mCalls.SetDataType(pdata.MetricDataTypeSum) @@ -409,15 +419,16 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpan ilsSlice := rspans.InstrumentationLibrarySpans() for j := 0; j < ilsSlice.Len(); j++ { ils := ilsSlice.At(j) + instrLibName := ils.InstrumentationLibrary().Name() spans := ils.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes()) + p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes(), instrLibName) } } } -func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span, resourceAttr pdata.AttributeMap) { +func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span, resourceAttr pdata.AttributeMap, instrLibName string) { latencyInMilliseconds := float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) // Binary search to find the latencyInMilliseconds bucket index. @@ -428,18 +439,18 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Sp p.cacheMetricKey(span, mKey, resourceAttr) p.cacheResourceAttrKey(serviceName, resourceAttr, resourceAttrKey) - p.updateCallMetrics(resourceAttrKey, mKey) - p.updateLatencyMetrics(resourceAttrKey, mKey, latencyInMilliseconds, index) - p.updateLatencyExemplars(resourceAttrKey, mKey, latencyInMilliseconds, span.TraceID()) + p.updateCallMetrics(resourceAttrKey, mKey, instrLibName) + p.updateLatencyMetrics(resourceAttrKey, mKey, latencyInMilliseconds, index, instrLibName) + p.updateLatencyExemplars(resourceAttrKey, mKey, latencyInMilliseconds, span.TraceID(), instrLibName) } // updateCallMetrics increments the call count for the given metric key. -func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey) { +func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey, instrLibName string) { if _, ok := p.callSum[rKey]; !ok { p.callSum[rKey] = make(map[metricKey]int64) } - p.callSum[rKey][mKey]++ + p.callSum[rKey][instrLibName][mKey]++ } func (p *processorImp) reset() { @@ -466,7 +477,8 @@ func (p *processorImp) resetAccumulatedMetrics() { } // updateLatencyExemplars sets the histogram exemplars for the given resource and metric key and append the exemplar data. -func (p *processorImp) updateLatencyExemplars(rKey resourceKey, mKey metricKey, value float64, traceID pdata.TraceID) { +func (p *processorImp) updateLatencyExemplars(rKey resourceKey, mKey metricKey, value float64, traceID pdata.TraceID, instrLibName string) { + // TODO: CLAIRE: FIX THIS: instrLibName rled, ok := p.latencyExemplarsData[rKey] if !ok { rled = make(map[metricKey][]exemplarData) @@ -487,7 +499,8 @@ func (p *processorImp) resetExemplarData() { } // updateLatencyMetrics increments the histogram counts for the given metric key and bucket index. -func (p *processorImp) updateLatencyMetrics(rKey resourceKey, mKey metricKey, latency float64, index int) { +func (p *processorImp) updateLatencyMetrics(rKey resourceKey, mKey metricKey, latency float64, index int, instrLibName string) { + // TODO: CLAIRE: FIX THIS if _, ok := p.latencyBucketCounts[rKey]; !ok { p.latencyBucketCounts[rKey] = make(map[metricKey][]uint64) } From 6335480bb182c9680b85c267bb90894db93645fb Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Thu, 13 Jan 2022 12:09:58 +1100 Subject: [PATCH 2/8] adds config --- processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml | 1 + processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml | 1 + 2 files changed, 2 insertions(+) diff --git a/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml b/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml index f22b76adb1e7..816257923aab 100644 --- a/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml +++ b/processor/spanmetricsprocessor/testdata/config-2-pipelines.yaml @@ -30,6 +30,7 @@ processors: dimensions_cache_size: 500 # the max number of items in the `resource_key_to_dimensions_cache`. Default is 1000 resource_attributes_cache_size: 300 + inherit_instrumentation_library_name: true service: pipelines: traces: diff --git a/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml b/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml index 089cc87b4026..a31f47383cfd 100644 --- a/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml +++ b/processor/spanmetricsprocessor/testdata/config-3-pipelines.yaml @@ -41,6 +41,7 @@ processors: spanmetrics: metrics_exporter: otlp/spanmetrics aggregation_temporality: "AGGREGATION_TEMPORALITY_CUMULATIVE" + inherit_instrumentation_library_name: false service: pipelines: From 002affa8baee17bba7e1e75fe94908943d9f0c70 Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Thu, 13 Jan 2022 13:54:13 +1100 Subject: [PATCH 3/8] work on adding instrumentation library name to aggregation step of consume traces --- processor/spanmetricsprocessor/processor.go | 62 +++++++++++++-------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index d1419910e329..7c6b25d5a1b2 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -92,7 +92,7 @@ type processorImp struct { startTime time.Time // Call & Error counts. - callSum map[resourceKey]map[metricKey]int64 + callSum map[resourceKey]map[instrLibKey]map[metricKey]int64 // Latency histogram. latencyCount map[resourceKey]map[instrLibKey]map[metricKey]uint64 @@ -428,7 +428,7 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpan } } -func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span, resourceAttr pdata.AttributeMap, instrLibName string) { +func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span, resourceAttr pdata.AttributeMap, instrLibName instrLibKey) { latencyInMilliseconds := float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) // Binary search to find the latencyInMilliseconds bucket index. @@ -445,9 +445,13 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Sp } // updateCallMetrics increments the call count for the given metric key. -func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey, instrLibName string) { +func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey, instrLibName instrLibKey) { if _, ok := p.callSum[rKey]; !ok { - p.callSum[rKey] = make(map[metricKey]int64) + p.callSum[rKey] = make(map[instrLibKey]map[metricKey]int64) + } + + if _, ok := p.callSum[rKey][instrLibName]; !ok { + p.callSum[rKey][instrLibName] = make(map[metricKey]int64) } p.callSum[rKey][instrLibName][mKey]++ @@ -477,12 +481,15 @@ func (p *processorImp) resetAccumulatedMetrics() { } // updateLatencyExemplars sets the histogram exemplars for the given resource and metric key and append the exemplar data. -func (p *processorImp) updateLatencyExemplars(rKey resourceKey, mKey metricKey, value float64, traceID pdata.TraceID, instrLibName string) { - // TODO: CLAIRE: FIX THIS: instrLibName - rled, ok := p.latencyExemplarsData[rKey] +func (p *processorImp) updateLatencyExemplars(rKey resourceKey, mKey metricKey, value float64, traceID pdata.TraceID, instrLibName instrLibKey) { + if _, ok := p.latencyExemplarsData[rKey]; !ok { + p.latencyExemplarsData[rKey] = make(map[instrLibKey]map[metricKey][]exemplarData) + } + + rled, ok := p.latencyExemplarsData[rKey][instrLibName] if !ok { rled = make(map[metricKey][]exemplarData) - p.latencyExemplarsData[rKey] = rled + p.latencyExemplarsData[rKey][instrLibName] = rled } rled[mKey] = append(rled[mKey], exemplarData{ @@ -499,29 +506,36 @@ func (p *processorImp) resetExemplarData() { } // updateLatencyMetrics increments the histogram counts for the given metric key and bucket index. -func (p *processorImp) updateLatencyMetrics(rKey resourceKey, mKey metricKey, latency float64, index int, instrLibName string) { - // TODO: CLAIRE: FIX THIS +func (p *processorImp) updateLatencyMetrics(rKey resourceKey, mKey metricKey, latency float64, index int, instrLibName instrLibKey) { + // update latency bucket counts if _, ok := p.latencyBucketCounts[rKey]; !ok { - p.latencyBucketCounts[rKey] = make(map[metricKey][]uint64) + p.latencyBucketCounts[rKey] = make(map[instrLibKey]map[metricKey][]uint64) } - - if _, ok := p.latencyBucketCounts[rKey][mKey]; !ok { - p.latencyBucketCounts[rKey][mKey] = make([]uint64, len(p.latencyBounds)) + if _, ok := p.latencyBucketCounts[rKey][instrLibName]; !ok { + p.latencyBucketCounts[rKey][instrLibName] = make(map[metricKey][]uint64) } + if _, ok := p.latencyBucketCounts[rKey][instrLibName][mKey]; !ok { + p.latencyBucketCounts[rKey][instrLibName][mKey] = make([]uint64, len(p.latencyBounds)) + } + p.latencyBucketCounts[rKey][instrLibName][mKey][index]++ - p.latencyBucketCounts[rKey][mKey][index]++ - - if _, ok := p.latencySum[rKey]; ok { - p.latencySum[rKey][mKey] += latency - } else { - p.latencySum[rKey] = map[metricKey]float64{mKey: latency} + // update latency sum + if _, ok := p.latencySum[rKey]; !ok { + p.latencySum[rKey] = make(map[instrLibKey]map[metricKey]float64) + } + if _, ok := p.latencySum[rKey][instrLibName]; !ok { + p.latencySum[rKey][instrLibName] = make(map[metricKey]float64) } + p.latencySum[rKey][instrLibName][mKey] += latency - if _, ok := p.latencyCount[rKey]; ok { - p.latencyCount[rKey][mKey]++ - } else { - p.latencyCount[rKey] = map[metricKey]uint64{mKey: 1} + // update latency count + if _, ok := p.latencyCount[rKey]; !ok { + p.latencyCount[rKey] = make(map[instrLibKey]map[metricKey]uint64) + } + if _, ok := p.latencyCount[rKey][instrLibName]; !ok { + p.latencyCount[rKey][instrLibName] = make(map[metricKey]uint64) } + p.latencyCount[rKey][instrLibName][mKey]++ } func (p *processorImp) buildDimensionKVs(span pdata.Span, optionalDims []Dimension, resourceAttrs pdata.AttributeMap) pdata.AttributeMap { From bd901184a20d0381c50e6ee188d70b348e38bbba Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Thu, 13 Jan 2022 13:56:50 +1100 Subject: [PATCH 4/8] fixes type cast instrLibKey --- processor/spanmetricsprocessor/processor.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index 8fbdfa5a8450..f7bc8f6d5286 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -427,7 +427,7 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpan ilsSlice := rspans.InstrumentationLibrarySpans() for j := 0; j < ilsSlice.Len(); j++ { ils := ilsSlice.At(j) - instrLibName := ils.InstrumentationLibrary().Name() + instrLibName := instrLibKey(ils.InstrumentationLibrary().Name()) spans := ils.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) @@ -480,10 +480,10 @@ func (p *processorImp) reset() { // resetAccumulatedMetrics resets the internal maps used to store created metric data. Also purge the cache for // metricKeyToDimensions. func (p *processorImp) resetAccumulatedMetrics() { - p.callSum = make(map[resourceKey]map[metricKey]int64) - p.latencyCount = make(map[resourceKey]map[metricKey]uint64) - p.latencySum = make(map[resourceKey]map[metricKey]float64) - p.latencyBucketCounts = make(map[resourceKey]map[metricKey][]uint64) + p.callSum = make(map[resourceKey]map[instrLibKey]map[metricKey]int64) + p.latencyCount = make(map[resourceKey]map[instrLibKey]map[metricKey]uint64) + p.latencySum = make(map[resourceKey]map[instrLibKey]map[metricKey]float64) + p.latencyBucketCounts = make(map[resourceKey]map[instrLibKey]map[metricKey][]uint64) p.metricKeyToDimensions.Purge() p.resourceKeyToDimensions.Purge() } @@ -510,7 +510,7 @@ func (p *processorImp) updateLatencyExemplars(rKey resourceKey, mKey metricKey, // the data structure. An exemplar is a punctual value that exists at specific moment in time // and should be not considered like a metrics that persist over time. func (p *processorImp) resetExemplarData() { - p.latencyExemplarsData = make(map[resourceKey]map[metricKey][]exemplarData) + p.latencyExemplarsData = make(map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData) } // updateLatencyMetrics increments the histogram counts for the given metric key and bucket index. From cf04417955dfc5cd76bd7ecaaa4b64a1f65912eb Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Fri, 14 Jan 2022 10:39:34 +1100 Subject: [PATCH 5/8] fii=nish logic; Fix tests; Add tests --- processor/spanmetricsprocessor/config_test.go | 1 + processor/spanmetricsprocessor/processor.go | 127 +++++++------ .../spanmetricsprocessor/processor_test.go | 174 +++++++++++++----- 3 files changed, 194 insertions(+), 108 deletions(-) diff --git a/processor/spanmetricsprocessor/config_test.go b/processor/spanmetricsprocessor/config_test.go index 569b7f996aa9..d0b0ba80d650 100644 --- a/processor/spanmetricsprocessor/config_test.go +++ b/processor/spanmetricsprocessor/config_test.go @@ -124,6 +124,7 @@ func TestLoadConfig(t *testing.T) { ResourceAttributes: tc.wantResourceAttributes, ResourceAttributesCacheSize: tc.wantResourceAttributesCacheSize, AggregationTemporality: tc.wantAggregationTemporality, + InheritInstrumentationLibraryName: tc.wantInheritInstrumentationLibraryName, }, cfg.Processors[config.NewID(typeStr)], ) diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index f7bc8f6d5286..ca1f654e87c2 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -305,21 +305,17 @@ func (p *processorImp) buildMetrics() (*pdata.Metrics, error) { return true }) - ilm := rm.InstrumentationLibraryMetrics().AppendEmpty() - //TODO: CLAIRE: this should be done inside the functions now - ilm.InstrumentationLibrary().SetName(instrumentationLibraryName) - // build metrics per resource resourceAttrKey, ok := key.(resourceKey) if !ok { return nil, errors.New("resource key type assertion failed") } - if err := p.collectCallMetrics(ilm, resourceAttrKey); err != nil { + if err := p.collectCallMetrics(rm, resourceAttrKey); err != nil { return nil, err } - if err := p.collectLatencyMetrics(ilm, resourceAttrKey); err != nil { + if err := p.collectLatencyMetrics(rm, resourceAttrKey); err != nil { return nil, err } @@ -329,66 +325,72 @@ func (p *processorImp) buildMetrics() (*pdata.Metrics, error) { // collectLatencyMetrics collects the raw latency metrics, writing the data // into the given instrumentation library metrics. -func (p *processorImp) collectLatencyMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) error { - // TODO: CLAIRE - for mKey := range p.latencyCount[resAttrKey] { - mLatency := ilm.Metrics().AppendEmpty() - mLatency.SetDataType(pdata.MetricDataTypeHistogram) - mLatency.SetName("latency") - mLatency.Histogram().SetAggregationTemporality(p.config.GetAggregationTemporality()) - - timestamp := pdata.TimestampFromTime(time.Now()) - - dpLatency := mLatency.Histogram().DataPoints().AppendEmpty() - dpLatency.SetStartTimestamp(pdata.TimestampFromTime(p.startTime)) - dpLatency.SetTimestamp(timestamp) - dpLatency.SetExplicitBounds(p.latencyBounds) - dpLatency.SetBucketCounts(p.latencyBucketCounts[resAttrKey][mKey]) - dpLatency.SetCount(p.latencyCount[resAttrKey][mKey]) - dpLatency.SetSum(p.latencySum[resAttrKey][mKey]) - - setLatencyExemplars(p.latencyExemplarsData[resAttrKey][mKey], timestamp, dpLatency.Exemplars()) - - dimensions, err := p.getDimensionsByMetricKey(mKey) - if err != nil { - p.logger.Error(err.Error()) - return err +func (p *processorImp) collectLatencyMetrics(rm pdata.ResourceMetrics, resAttrKey resourceKey) error { + for libKey := range p.latencyCount[resAttrKey] { + ilm := rm.InstrumentationLibraryMetrics().AppendEmpty() + ilm.InstrumentationLibrary().SetName(string(libKey)) + for mKey := range p.latencyCount[resAttrKey][libKey] { + mLatency := ilm.Metrics().AppendEmpty() + mLatency.SetDataType(pdata.MetricDataTypeHistogram) + mLatency.SetName("latency") + mLatency.Histogram().SetAggregationTemporality(p.config.GetAggregationTemporality()) + + timestamp := pdata.TimestampFromTime(time.Now()) + + dpLatency := mLatency.Histogram().DataPoints().AppendEmpty() + dpLatency.SetStartTimestamp(pdata.TimestampFromTime(p.startTime)) + dpLatency.SetTimestamp(timestamp) + dpLatency.SetExplicitBounds(p.latencyBounds) + dpLatency.SetBucketCounts(p.latencyBucketCounts[resAttrKey][libKey][mKey]) + dpLatency.SetCount(p.latencyCount[resAttrKey][libKey][mKey]) + dpLatency.SetSum(p.latencySum[resAttrKey][libKey][mKey]) + + setLatencyExemplars(p.latencyExemplarsData[resAttrKey][libKey][mKey], timestamp, dpLatency.Exemplars()) + + dimensions, err := p.getDimensionsByMetricKey(mKey) + if err != nil { + p.logger.Error(err.Error()) + return err + } + + dimensions.Range(func(k string, v pdata.AttributeValue) bool { + dpLatency.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v)) + return true + }) } - - dimensions.Range(func(k string, v pdata.AttributeValue) bool { - dpLatency.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v)) - return true - }) } return nil } // collectCallMetrics collects the raw call count metrics, writing the data // into the given instrumentation library metrics. -func (p *processorImp) collectCallMetrics(ilm pdata.InstrumentationLibraryMetrics, resAttrKey resourceKey) error { - // TODO: CLAIRE - for mKey := range p.callSum[resAttrKey] { - mCalls := ilm.Metrics().AppendEmpty() - mCalls.SetDataType(pdata.MetricDataTypeIntSum) - mCalls.SetName("calls_total") - mCalls.IntSum().SetIsMonotonic(true) - mCalls.IntSum().SetAggregationTemporality(p.config.GetAggregationTemporality()) - - dpCalls := mCalls.IntSum().DataPoints().AppendEmpty() - dpCalls.SetStartTimestamp(pdata.TimestampFromTime(p.startTime)) - dpCalls.SetTimestamp(pdata.TimestampFromTime(time.Now())) - dpCalls.SetValue(p.callSum[resAttrKey][mKey]) - - dimensions, err := p.getDimensionsByMetricKey(mKey) - if err != nil { - p.logger.Error(err.Error()) - return err +func (p *processorImp) collectCallMetrics(rm pdata.ResourceMetrics, resAttrKey resourceKey) error { + for libKey := range p.callSum[resAttrKey] { + ilm := rm.InstrumentationLibraryMetrics().AppendEmpty() + ilm.InstrumentationLibrary().SetName(string(libKey)) + for mKey := range p.callSum[resAttrKey][libKey] { + mCalls := ilm.Metrics().AppendEmpty() + mCalls.SetDataType(pdata.MetricDataTypeIntSum) + mCalls.SetName("calls_total") + mCalls.IntSum().SetIsMonotonic(true) + mCalls.IntSum().SetAggregationTemporality(p.config.GetAggregationTemporality()) + + dpCalls := mCalls.IntSum().DataPoints().AppendEmpty() + dpCalls.SetStartTimestamp(pdata.TimestampFromTime(p.startTime)) + dpCalls.SetTimestamp(pdata.TimestampFromTime(time.Now())) + dpCalls.SetValue(p.callSum[resAttrKey][libKey][mKey]) + + dimensions, err := p.getDimensionsByMetricKey(mKey) + if err != nil { + p.logger.Error(err.Error()) + return err + } + + dimensions.Range(func(k string, v pdata.AttributeValue) bool { + dpCalls.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v)) + return true + }) } - - dimensions.Range(func(k string, v pdata.AttributeValue) bool { - dpCalls.LabelsMap().Upsert(k, tracetranslator.AttributeValueToString(v)) - return true - }) } return nil } @@ -425,9 +427,16 @@ func (p *processorImp) aggregateMetrics(traces pdata.Traces) { func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpans, serviceName string) { ilsSlice := rspans.InstrumentationLibrarySpans() + instrLibName := instrLibKey(instrumentationLibraryName) for j := 0; j < ilsSlice.Len(); j++ { ils := ilsSlice.At(j) - instrLibName := instrLibKey(ils.InstrumentationLibrary().Name()) + + // if confing is set to inherit instrumentation library name, then assume from trace + // otherwise use default + if p.inheritInstrumentationLibraryName { + instrLibName = instrLibKey(ils.InstrumentationLibrary().Name()) + } + spans := ils.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go index d69dcd196b2a..ae12b2a5da2f 100644 --- a/processor/spanmetricsprocessor/processor_test.go +++ b/processor/spanmetricsprocessor/processor_test.go @@ -79,8 +79,9 @@ type metricDataPoint interface { } type serviceSpans struct { - serviceName string - spans []span + serviceName string + spans []span + instrumentationLibraryName string } type span struct { @@ -214,7 +215,7 @@ func TestProcessorConsumeTracesErrors(t *testing.T) { tcon := &mocks.TracesConsumer{} tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(tc.consumeTracesErr) - p := newProcessorImp(mexp, tcon, nil, cumulative, t) + p := newProcessorImp(mexp, tcon, nil, cumulative, t, false) traces := buildSampleTrace() @@ -270,7 +271,7 @@ func TestProcessorConsumeTracesConcurrentSafe(t *testing.T) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := "defaultNullValue" - p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t) + p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t, false) for _, traces := range tc.traces { // Test @@ -337,7 +338,7 @@ func TestProcessorConsumeTraces(t *testing.T) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := "defaultNullValue" - p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t) + p := newProcessorImp(mexp, tcon, &defaultNullValue, tc.aggregationTemporality, t, false) for _, traces := range tc.traces { // Test @@ -351,33 +352,91 @@ func TestProcessorConsumeTraces(t *testing.T) { } } +func TestInheritInstrumentationLibraryName(t *testing.T) { + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { + rmA, rmB := extractResourceMetricsHelper(t, input) + + assert.Equal(t, "service-a-instrumentation-library", rmA.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) + assert.Equal(t, "service-b-instrumentation-library", rmB.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) + + return true + })).Return(nil) + + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + defaultNullValue := "defaultNullValue" + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, true) + + traces := buildSampleTrace() + + // Test + ctx := metadata.NewIncomingContext(context.Background(), nil) + err := p.ConsumeTraces(ctx, traces) + + // Verify + assert.NoError(t, err) +} + +func TestDefaultInstrumentationLibraryName(t *testing.T) { + mexp := &mocks.MetricsExporter{} + tcon := &mocks.TracesConsumer{} + + mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { + rmA, rmB := extractResourceMetricsHelper(t, input) + assert.Equal(t, instrumentationLibraryName, rmA.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) + assert.Equal(t, instrumentationLibraryName, rmB.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) + + return true + })).Return(nil) + + tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) + + defaultNullValue := "defaultNullValue" + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false) + + traces := buildSampleTrace() + + // Test + ctx := metadata.NewIncomingContext(context.Background(), nil) + err := p.ConsumeTraces(ctx, traces) + + // Verify + assert.NoError(t, err) +} + +func extractResourceMetricsHelper(t *testing.T, input pdata.Metrics) (pdata.ResourceMetrics, pdata.ResourceMetrics) { + rm := input.ResourceMetrics() + require.Equal(t, 2, rm.Len()) + + rm0 := rm.At(0) + rm1 := rm.At(1) + serviceName, ok := getServiceName(rm0) + assert.True(t, ok, "should get service name from resourceMetric") + + if serviceName == "service-a" { + return rm0, rm1 + } + + return rm1, rm0 +} + func TestResourceCopying(t *testing.T) { // Prepare mexp := &mocks.MetricsExporter{} tcon := &mocks.TracesConsumer{} mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { - rm := input.ResourceMetrics() - require.Equal(t, 2, rm.Len()) - - var rmA, rmB pdata.ResourceMetrics - rm0 := rm.At(0) - rm1 := rm.At(1) - serviceName, ok := getServiceName(rm0) - assert.True(t, ok, "should get service name from resourceMetric") - - if serviceName == "service-a" { - rmA = rm0 - rmB = rm1 - } else { - rmB = rm0 - rmA = rm1 - } + rmA, rmB := extractResourceMetricsHelper(t, input) require.Equal(t, 4, rmA.Resource().Attributes().Len()) - require.Equal(t, 4, rmA.InstrumentationLibraryMetrics().At(0).Metrics().Len()) + require.Equal(t, 2, rmA.InstrumentationLibraryMetrics().At(0).Metrics().Len()) + require.Equal(t, 2, rmA.InstrumentationLibraryMetrics().At(1).Metrics().Len()) require.Equal(t, 2, rmB.Resource().Attributes().Len()) - require.Equal(t, 2, rmB.InstrumentationLibraryMetrics().At(0).Metrics().Len()) + require.Equal(t, 1, rmB.InstrumentationLibraryMetrics().At(0).Metrics().Len()) + require.Equal(t, 1, rmB.InstrumentationLibraryMetrics().At(1).Metrics().Len()) wantResourceAttrServiceA := map[string]string{ resourceAttr1: "1", @@ -421,7 +480,7 @@ func TestResourceCopying(t *testing.T) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := "defaultNullValue" - p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t) + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false) traces := buildSampleTrace() traces.ResourceSpans().At(0).Resource().Attributes().Insert(resourceAttr1, pdata.NewAttributeValueString("1")) @@ -443,7 +502,7 @@ func TestMetricKeyCache(t *testing.T) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := "defaultNullValue" - p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t) + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false) traces := buildSampleTrace() @@ -476,7 +535,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := "defaultNullValue" - p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, b) + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, b, false) traces := buildSampleTrace() @@ -487,7 +546,7 @@ func BenchmarkProcessorConsumeTraces(b *testing.B) { } } -func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *string, temporality string, tb testing.TB) *processorImp { +func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, defaultNullValue *string, temporality string, tb testing.TB, inheritInstrumentationLibraryName bool) *processorImp { localDefaultNotInSpanAttrVal := defaultNotInSpanAttrVal // use size 2 for LRU cache for testing purpose metricKeyToDimensions, err := cache.NewCache(DimensionsCacheSize) @@ -507,12 +566,12 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de nextConsumer: tcon, startTime: time.Now(), - callSum: make(map[resourceKey]map[metricKey]int64), - latencySum: make(map[resourceKey]map[metricKey]float64), - latencyCount: make(map[resourceKey]map[metricKey]uint64), - latencyBucketCounts: make(map[resourceKey]map[metricKey][]uint64), + callSum: make(map[resourceKey]map[instrLibKey]map[metricKey]int64), + latencySum: make(map[resourceKey]map[instrLibKey]map[metricKey]float64), + latencyCount: make(map[resourceKey]map[instrLibKey]map[metricKey]uint64), + latencyBucketCounts: make(map[resourceKey]map[instrLibKey]map[metricKey][]uint64), latencyBounds: defaultLatencyHistogramBucketsMs, - latencyExemplarsData: make(map[resourceKey]map[metricKey][]exemplarData), + latencyExemplarsData: make(map[resourceKey]map[instrLibKey]map[metricKey][]exemplarData), dimensions: []Dimension{ // Set nil defaults to force a lookup for the attribute in the span. {stringAttrName, nil}, @@ -535,8 +594,9 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de {notInSpanResourceAttr0, &localDefaultNotInSpanAttrVal}, {notInSpanResourceAttr1, nil}, }, - resourceKeyToDimensions: resourceKeyToDimensions, - metricKeyToDimensions: metricKeyToDimensions, + resourceKeyToDimensions: resourceKeyToDimensions, + metricKeyToDimensions: metricKeyToDimensions, + inheritInstrumentationLibraryName: inheritInstrumentationLibraryName, } } @@ -587,21 +647,29 @@ func verifyConsumeMetricsInput(t testing.TB, input pdata.Metrics, expectedTempor ilmA := rmA.InstrumentationLibraryMetrics() - require.Equal(t, 1, ilmA.Len()) + require.Equal(t, 2, ilmA.Len()) assert.Equal(t, instrumentationLibraryName, ilmA.At(0).InstrumentationLibrary().Name()) - mA := ilmA.At(0).Metrics() - require.Equal(t, 4, mA.Len()) + mA1 := ilmA.At(0).Metrics() + require.Equal(t, 2, mA1.Len()) + mA2 := ilmA.At(1).Metrics() + require.Equal(t, 2, mA2.Len()) ilmB := rmB.InstrumentationLibraryMetrics() - require.Equal(t, 1, ilmB.Len()) + require.Equal(t, 2, ilmB.Len()) assert.Equal(t, instrumentationLibraryName, ilmB.At(0).InstrumentationLibrary().Name()) - mB := ilmB.At(0).Metrics() - require.Equal(t, 2, mB.Len()) + mB1 := ilmB.At(0).Metrics() + require.Equal(t, 1, mB1.Len()) + mB2 := ilmB.At(1).Metrics() + require.Equal(t, 1, mB2.Len()) - verifyMetrics(mA, expectedTemporality, numCumulativeConsumptions, 2, t) - verifyMetrics(mB, expectedTemporality, numCumulativeConsumptions, 1, t) + // mA1 and mbB1 contains "calls_total" metrics + // mA2 and mB2 contains "latency" metrics + verifyMetrics(mA1, expectedTemporality, numCumulativeConsumptions, 2, t) + verifyMetrics(mA2, expectedTemporality, numCumulativeConsumptions, 0, t) + verifyMetrics(mB1, expectedTemporality, numCumulativeConsumptions, 1, t) + verifyMetrics(mB2, expectedTemporality, numCumulativeConsumptions, 0, t) return true } @@ -727,7 +795,8 @@ func buildSampleTrace() pdata.Traces { initServiceSpans( serviceSpans{ - serviceName: "service-a", + serviceName: "service-a", + instrumentationLibraryName: "service-a-instrumentation-library", spans: []span{ { operation: "/ping", @@ -743,7 +812,8 @@ func buildSampleTrace() pdata.Traces { }, traces.ResourceSpans().AppendEmpty()) initServiceSpans( serviceSpans{ - serviceName: "service-b", + serviceName: "service-b", + instrumentationLibraryName: "service-b-instrumentation-library", spans: []span{ { operation: "/ping", @@ -765,6 +835,10 @@ func initServiceSpans(serviceSpans serviceSpans, spans pdata.ResourceSpans) { spans.Resource().Attributes().InsertString(regionResourceAttrName, sampleRegion) ils := spans.InstrumentationLibrarySpans().AppendEmpty() + if serviceSpans.instrumentationLibraryName != "" { + ils.InstrumentationLibrary().SetName(serviceSpans.instrumentationLibraryName) + } + for _, span := range serviceSpans.spans { initSpan(span, ils.Spans().AppendEmpty()) } @@ -1033,7 +1107,7 @@ func TestTraceWithoutServiceNameDoesNotGenerateMetrics(t *testing.T) { tcon.On("ConsumeTraces", mock.Anything, mock.Anything).Return(nil) defaultNullValue := "defaultNullValue" - p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t) + p := newProcessorImp(mexp, tcon, &defaultNullValue, cumulative, t, false) trace := pdata.NewTraces() @@ -1099,6 +1173,7 @@ func TestProcessorUpdateLatencyExemplars(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) traces := buildSampleTrace() traceID := traces.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).TraceID() + traceInstrLibName := instrLibKey(instrumentationLibraryName) mKey := metricKey("metricKey") rKey := resourceKey("resourceKey") next := new(consumertest.TracesSink) @@ -1106,12 +1181,12 @@ func TestProcessorUpdateLatencyExemplars(t *testing.T) { value := float64(42) // ----- call ------------------------------------------------------------- - p.updateLatencyExemplars(rKey, mKey, value, traceID) + p.updateLatencyExemplars(rKey, mKey, value, traceID, traceInstrLibName) // ----- verify ----------------------------------------------------------- assert.NoError(t, err) - assert.NotEmpty(t, p.latencyExemplarsData[rKey][mKey]) - assert.Equal(t, p.latencyExemplarsData[rKey][mKey][0], exemplarData{traceID: traceID, value: value}) + assert.NotEmpty(t, p.latencyExemplarsData[rKey][traceInstrLibName][mKey]) + assert.Equal(t, p.latencyExemplarsData[rKey][traceInstrLibName][mKey][0], exemplarData{traceID: traceID, value: value}) } func TestProcessorResetExemplarData(t *testing.T) { @@ -1121,6 +1196,7 @@ func TestProcessorResetExemplarData(t *testing.T) { mKey := metricKey("metricKey") rKey := resourceKey("resourceKey") + libKey := instrLibKey(instrumentationLibraryName) next := new(consumertest.TracesSink) p, err := newProcessor(zaptest.NewLogger(t), cfg, next) @@ -1129,7 +1205,7 @@ func TestProcessorResetExemplarData(t *testing.T) { // ----- verify ----------------------------------------------------------- assert.NoError(t, err) - assert.Empty(t, p.latencyExemplarsData[rKey][mKey]) + assert.Empty(t, p.latencyExemplarsData[rKey][libKey][mKey]) } func TestBuildResourceAttrKey(t *testing.T) { From 4cb60656ae7c30fd31723d8513a0e78334941e05 Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Tue, 18 Jan 2022 14:57:06 +1100 Subject: [PATCH 6/8] updates readme --- processor/spanmetricsprocessor/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processor/spanmetricsprocessor/README.md b/processor/spanmetricsprocessor/README.md index 1252d0fc2594..ed4fd46cb485 100644 --- a/processor/spanmetricsprocessor/README.md +++ b/processor/spanmetricsprocessor/README.md @@ -118,7 +118,8 @@ processors: default: us-east-1 - name: host_id resource_attributes_cache_size: 1000 - aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA" + aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA" + inherit_instrumentation_library_name: true exporters: jaeger: From 3dfd091451b277344782b666eef5a348be2cf58c Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Thu, 20 Jan 2022 14:42:40 +1100 Subject: [PATCH 7/8] fixes PR comments --- processor/spanmetricsprocessor/processor.go | 39 ++++++++++++------- .../spanmetricsprocessor/processor_test.go | 33 ++++++++++++++-- 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index ad4baed3b26b..ee062b40bdc6 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -65,6 +65,12 @@ type exemplarData struct { value float64 } +type aggregationMeta struct { + serviceName string + instrLibName instrLibKey + resourceAttr pdata.AttributeMap +} + // metricKey is used to carry the stringified metric attributes type metricKey string @@ -436,7 +442,7 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpan for j := 0; j < ilsSlice.Len(); j++ { ils := ilsSlice.At(j) - // if confing is set to inherit instrumentation library name, then assume from trace + // if config is set to inherit instrumentation library name, then assume from trace // otherwise use default if p.inheritInstrumentationLibraryName { instrLibName = instrLibKey(ils.InstrumentationLibrary().Name()) @@ -445,38 +451,43 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans pdata.ResourceSpan spans := ils.Spans() for k := 0; k < spans.Len(); k++ { span := spans.At(k) - p.aggregateMetricsForSpan(serviceName, span, rspans.Resource().Attributes(), instrLibName) + aggrMeta := aggregationMeta{ + serviceName: serviceName, + resourceAttr: rspans.Resource().Attributes(), + instrLibName: instrLibName, + } + p.aggregateMetricsForSpan(span, aggrMeta) } } } -func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Span, resourceAttr pdata.AttributeMap, instrLibName instrLibKey) { +func (p *processorImp) aggregateMetricsForSpan(span pdata.Span, aggregationMeta aggregationMeta) { latencyInMilliseconds := float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) // Binary search to find the latencyInMilliseconds bucket index. index := sort.SearchFloat64s(p.latencyBounds, latencyInMilliseconds) - mKey := p.buildMetricKey(span, resourceAttr, p.attachSpanAndTraceID) - resourceAttrKey := p.buildResourceAttrKey(serviceName, resourceAttr) + mKey := p.buildMetricKey(span, aggregationMeta.resourceAttr, p.attachSpanAndTraceID) + resourceAttrKey := p.buildResourceAttrKey(aggregationMeta.serviceName, aggregationMeta.resourceAttr) - p.cacheMetricKey(span, mKey, resourceAttr) - p.cacheResourceAttrKey(serviceName, resourceAttr, resourceAttrKey) - p.updateCallMetrics(resourceAttrKey, mKey, instrLibName) - p.updateLatencyMetrics(resourceAttrKey, mKey, latencyInMilliseconds, index, instrLibName) - p.updateLatencyExemplars(resourceAttrKey, mKey, latencyInMilliseconds, span.TraceID(), instrLibName) + p.cacheMetricKey(span, mKey, aggregationMeta.resourceAttr) + p.cacheResourceAttrKey(aggregationMeta.serviceName, aggregationMeta.resourceAttr, resourceAttrKey) + p.updateCallMetrics(resourceAttrKey, mKey, aggregationMeta.instrLibName) + p.updateLatencyMetrics(resourceAttrKey, mKey, latencyInMilliseconds, index, aggregationMeta.instrLibName) + p.updateLatencyExemplars(resourceAttrKey, mKey, latencyInMilliseconds, span.TraceID(), aggregationMeta.instrLibName) } // updateCallMetrics increments the call count for the given metric key. -func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey, instrLibName instrLibKey) { +func (p *processorImp) updateCallMetrics(rKey resourceKey, mKey metricKey, iKey instrLibKey) { if _, ok := p.callSum[rKey]; !ok { p.callSum[rKey] = make(map[instrLibKey]map[metricKey]int64) } - if _, ok := p.callSum[rKey][instrLibName]; !ok { - p.callSum[rKey][instrLibName] = make(map[metricKey]int64) + if _, ok := p.callSum[rKey][iKey]; !ok { + p.callSum[rKey][iKey] = make(map[metricKey]int64) } - p.callSum[rKey][instrLibName][mKey]++ + p.callSum[rKey][iKey][mKey]++ } func (p *processorImp) reset() { diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go index 417ad47678d8..6aed0f95a385 100644 --- a/processor/spanmetricsprocessor/processor_test.go +++ b/processor/spanmetricsprocessor/processor_test.go @@ -360,7 +360,12 @@ func TestInheritInstrumentationLibraryName(t *testing.T) { tcon := &mocks.TracesConsumer{} mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { - rmA, rmB := extractResourceMetricsHelper(t, input) + rm := input.ResourceMetrics() + require.Equal(t, 2, rm.Len()) + rmA := findResourceMetricsHelper(t, rm, "service-a") + assert.NotEmpty(t, rmA, "resource metric containing service name `service-a` not found") + rmB := findResourceMetricsHelper(t, rm, "service-b") + assert.NotEmpty(t, rmB, "resource metric containing service name `service-b` not found") assert.Equal(t, "service-a-instrumentation-library", rmA.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) assert.Equal(t, "service-b-instrumentation-library", rmB.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) @@ -388,7 +393,12 @@ func TestDefaultInstrumentationLibraryName(t *testing.T) { tcon := &mocks.TracesConsumer{} mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { - rmA, rmB := extractResourceMetricsHelper(t, input) + rm := input.ResourceMetrics() + require.Equal(t, 2, rm.Len()) + rmA := findResourceMetricsHelper(t, rm, "service-a") + assert.NotEmpty(t, rmA, "resource metric containing service name `service-a` not found") + rmB := findResourceMetricsHelper(t, rm, "service-b") + assert.NotEmpty(t, rmB, "resource metric containing service name `service-b` not found") assert.Equal(t, instrumentationLibraryName, rmA.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) assert.Equal(t, instrumentationLibraryName, rmB.InstrumentationLibraryMetrics().At(0).InstrumentationLibrary().Name()) @@ -426,13 +436,30 @@ func extractResourceMetricsHelper(t *testing.T, input pdata.Metrics) (pdata.Reso return rm1, rm0 } +func findResourceMetricsHelper(t *testing.T, rm pdata.ResourceMetricsSlice, serviceNameExpected string) pdata.ResourceMetrics { + for i := 0; i < rm.Len(); i++ { + serviceName, ok := getServiceName(rm.At(i)) + assert.True(t, ok, "resourceMetrics should always have service name") + if serviceName == serviceNameExpected { + return rm.At(i) + } + } + + return pdata.ResourceMetrics{} +} + func TestResourceCopying(t *testing.T) { // Prepare mexp := &mocks.MetricsExporter{} tcon := &mocks.TracesConsumer{} mexp.On("ConsumeMetrics", mock.Anything, mock.MatchedBy(func(input pdata.Metrics) bool { - rmA, rmB := extractResourceMetricsHelper(t, input) + rm := input.ResourceMetrics() + require.Equal(t, 2, rm.Len()) + rmA := findResourceMetricsHelper(t, rm, "service-a") + assert.NotEmpty(t, rmA, "resource metric containing service name `service-a` not found") + rmB := findResourceMetricsHelper(t, rm, "service-b") + assert.NotEmpty(t, rmB, "resource metric containing service name `service-b` not found") require.Equal(t, 4, rmA.Resource().Attributes().Len()) require.Equal(t, 2, rmA.InstrumentationLibraryMetrics().At(0).Metrics().Len()) From fecee58355695a133d759ec3abfc0b5e473d87dd Mon Sep 17 00:00:00 2001 From: Claire Fei Date: Fri, 21 Jan 2022 08:20:41 +1100 Subject: [PATCH 8/8] remove unused function --- processor/spanmetricsprocessor/processor_test.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go index 6aed0f95a385..a5d894464568 100644 --- a/processor/spanmetricsprocessor/processor_test.go +++ b/processor/spanmetricsprocessor/processor_test.go @@ -420,22 +420,6 @@ func TestDefaultInstrumentationLibraryName(t *testing.T) { assert.NoError(t, err) } -func extractResourceMetricsHelper(t *testing.T, input pdata.Metrics) (pdata.ResourceMetrics, pdata.ResourceMetrics) { - rm := input.ResourceMetrics() - require.Equal(t, 2, rm.Len()) - - rm0 := rm.At(0) - rm1 := rm.At(1) - serviceName, ok := getServiceName(rm0) - assert.True(t, ok, "should get service name from resourceMetric") - - if serviceName == "service-a" { - return rm0, rm1 - } - - return rm1, rm0 -} - func findResourceMetricsHelper(t *testing.T, rm pdata.ResourceMetricsSlice, serviceNameExpected string) pdata.ResourceMetrics { for i := 0; i < rm.Len(); i++ { serviceName, ok := getServiceName(rm.At(i))