Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: albertteoh <albert.teoh@logz.io>
  • Loading branch information
albertteoh committed Jan 26, 2021
1 parent 9a46143 commit bfee7b8
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 48 deletions.
94 changes: 47 additions & 47 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ import (
)

const (
serviceNameKey = conventions.AttributeServiceName
operationKey = "operation" // is there a constant we can refer to?
spanKindKey = tracetranslator.TagSpanKind
statusCodeKey = tracetranslator.TagStatusCode
serviceNameKey = conventions.AttributeServiceName
operationKey = "operation" // is there a constant we can refer to?
spanKindKey = tracetranslator.TagSpanKind
statusCodeKey = tracetranslator.TagStatusCode
metricKeySeparator = string(byte(0))
)

var (
Expand Down Expand Up @@ -76,15 +77,9 @@ type processorImp struct {
latencyBucketCounts map[metricKey][]uint64
latencyBounds []float64

// Builds the unique identifier for a metric: an ordered concatenation the metric's dimension values.
metricKeyBuilder strings.Builder

// A cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
// e.g. { "foo/barOK": { "serviceName": "foo", "operation": "/bar", "status_code": "OK" }}
metricKeyToDimensions map[metricKey]dimKV

// A reusable buffer for storing intermediate dimension key-values as they are added to the metric key.
dimensionsBuffer map[string]string
}

func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer consumer.TracesConsumer) *processorImp {
Expand Down Expand Up @@ -114,7 +109,6 @@ func newProcessor(logger *zap.Logger, config configmodels.Exporter, nextConsumer
latencyBucketCounts: make(map[metricKey][]uint64),
nextConsumer: nextConsumer,
dimensions: pConfig.Dimensions,
dimensionsBuffer: make(map[string]string),
metricKeyToDimensions: make(map[metricKey]dimKV),
}
}
Expand Down Expand Up @@ -296,7 +290,8 @@ func (p *processorImp) aggregateMetricsForSpan(serviceName string, span pdata.Sp
index := sort.SearchFloat64s(p.latencyBounds, latencyInMilliseconds)

p.lock.Lock()
key := p.buildKey(serviceName, span)
key := buildKey(serviceName, span, p.dimensions)
p.cache(serviceName, span, key)
p.updateCallMetrics(key)
p.updateLatencyMetrics(key, latencyInMilliseconds, index)
p.lock.Unlock()
Expand All @@ -317,63 +312,68 @@ func (p *processorImp) updateLatencyMetrics(key metricKey, latency float64, inde
p.latencyBucketCounts[key][index]++
}

func (p *processorImp) addDimension(key, value string) {
func buildDimensionKVs(serviceName string, span pdata.Span, optionalDims []Dimension) dimKV {
dims := make(dimKV)
dims[serviceNameKey] = serviceName
dims[operationKey] = span.Name()
dims[spanKindKey] = span.Kind().String()
dims[statusCodeKey] = span.Status().Code().String()
spanAttr := span.Attributes()
var value string
for _, d := range optionalDims {
// Set the default if configured, otherwise this metric will have no value set for the dimension.
if d.Default != nil {
value = *d.Default
}
if attr, ok := spanAttr.Get(d.Name); ok {
value = tracetranslator.AttributeValueToString(attr, false)
}
dims[d.Name] = value
}
return dims
}

func concatDimensionValue(metricKeyBuilder *strings.Builder, value string, prefixSep bool) {
// It's worth noting that from pprof benchmarks, WriteString is the most expensive operation of this processor.
// Specifically, the need to grow the underlying []byte slice to make room for the appended string.
p.metricKeyBuilder.WriteString(value)
p.dimensionsBuffer[key] = value
if prefixSep {
metricKeyBuilder.WriteString(metricKeySeparator)
}
metricKeyBuilder.WriteString(value)
}

// buildKey builds the metric key from the service name and span metadata such as operation, kind, status_code and
// any additional dimensions the user has configured.
// The metric key is a simple concatenation of dimension values.
func (p *processorImp) buildKey(serviceName string, span pdata.Span) metricKey {
// Reset back to a clean state.
defer func() {
// Compiler will optimize this map clearing operation.
// https://github.com/golang/go/blob/master/doc/go1.11.html#L447
for k := range p.dimensionsBuffer {
delete(p.dimensionsBuffer, k)
}
p.metricKeyBuilder.Reset()
}()
func buildKey(serviceName string, span pdata.Span, optionalDims []Dimension) metricKey {
var metricKeyBuilder strings.Builder
concatDimensionValue(&metricKeyBuilder, serviceName, false)
concatDimensionValue(&metricKeyBuilder, span.Name(), true)
concatDimensionValue(&metricKeyBuilder, span.Kind().String(), true)
concatDimensionValue(&metricKeyBuilder, span.Status().Code().String(), true)

p.addDimension(serviceNameKey, serviceName)
p.addDimension(operationKey, span.Name())
p.addDimension(spanKindKey, span.Kind().String())
p.addDimension(statusCodeKey, span.Status().Code().String())
spanAttr := span.Attributes()
var value string
for _, d := range p.dimensions {
for _, d := range optionalDims {
// Set the default if configured, otherwise this metric will have no value set for the dimension.
if d.Default != nil {
value = *d.Default
}
if attr, ok := spanAttr.Get(d.Name); ok {
value = tracetranslator.AttributeValueToString(attr, false)
}
p.addDimension(d.Name, value)
concatDimensionValue(&metricKeyBuilder, value, true)
}
metricKey := metricKey(p.metricKeyBuilder.String())

p.cache(metricKey)

return metricKey
k := metricKey(metricKeyBuilder.String())
return k
}

// cache caches the dimension key-value map stored in the dimensionsBuffer for the given metricKey.
// cache the dimension key-value map for the metricKey if there is a cache miss.
// This enables a lookup of the dimension key-value map when constructing the metric like so:
// LabelsMap().InitFromMap(p.metricKeyToDimensions[key])
func (p *processorImp) cache(metricKey metricKey) {
if _, ok := p.metricKeyToDimensions[metricKey]; ok {
return
}
// Create new map to copy the buffer contents.
targetMap := make(dimKV)

// Copy from the original map to the target map
for key, value := range p.dimensionsBuffer {
targetMap[key] = value
func (p *processorImp) cache(serviceName string, span pdata.Span, k metricKey) {
if _, ok := p.metricKeyToDimensions[k]; !ok {
p.metricKeyToDimensions[k] = buildDimensionKVs(serviceName, span, p.dimensions)
}
p.metricKeyToDimensions[metricKey] = targetMap
}
13 changes: 12 additions & 1 deletion processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func newProcessorImp(mexp *mocks.MetricsExporter, tcon *mocks.TracesConsumer, de
{nullAttrName, defaultNullValue},
},
metricKeyToDimensions: make(map[metricKey]dimKV),
dimensionsBuffer: make(map[string]string),
}
}

Expand Down Expand Up @@ -481,3 +480,15 @@ func newOTLPExporters(t *testing.T) (*otlpexporter.Config, component.MetricsExpo
require.NoError(t, err)
return otlpConfig, mexp, texp
}

func TestBuildKey(t *testing.T) {
span0 := pdata.NewSpan()
span0.SetName("c")
k0 := buildKey("ab", span0, nil)

span1 := pdata.NewSpan()
span1.SetName("bc")
k1 := buildKey("a", span1, nil)

assert.NotEqual(t, k0, k1)
}

0 comments on commit bfee7b8

Please sign in to comment.