From 42de5b2bcc15b5b66a2225ac8e27d61b8f98e15c Mon Sep 17 00:00:00 2001 From: Kebe Date: Tue, 27 Dec 2022 23:33:12 +0800 Subject: [PATCH] > fix servicegraphprocessor concurrent map read and write (#16851) * > fix servicegraphprocessor concurrent map read and write --- .chloggen/servicegraphprocessor-concurrent.yaml | 11 +++++++++++ processor/servicegraphprocessor/processor.go | 9 +++++++++ 2 files changed, 20 insertions(+) create mode 100644 .chloggen/servicegraphprocessor-concurrent.yaml diff --git a/.chloggen/servicegraphprocessor-concurrent.yaml b/.chloggen/servicegraphprocessor-concurrent.yaml new file mode 100644 index 000000000000..8a08caac2f8c --- /dev/null +++ b/.chloggen/servicegraphprocessor-concurrent.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: servicegraphprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: fix servicegraphprocessor concurrent map read and write + +# One or more tracking issues related to the change +issues: [16850] diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go index 78a803411c31..bf39de6eacfb 100644 --- a/processor/servicegraphprocessor/processor.go +++ b/processor/servicegraphprocessor/processor.go @@ -74,6 +74,7 @@ type serviceGraphProcessor struct { reqDurationBounds []float64 reqDurationSecondsBucketCounts map[string][]uint64 + metricMutex sync.RWMutex keyToMetric map[string]metricSeries shutdownCh chan interface{} @@ -308,6 +309,8 @@ func (p *serviceGraphProcessor) aggregateMetricsForEdge(e *store.Edge) { } func (p *serviceGraphProcessor) updateSeries(key string, dimensions pcommon.Map) { + p.metricMutex.Lock() + defer p.metricMutex.Unlock() // Overwrite the series if it already exists p.keyToMetric[key] = metricSeries{ dimensions: dimensions, @@ -316,6 +319,8 @@ func (p *serviceGraphProcessor) updateSeries(key string, dimensions pcommon.Map) } func (p *serviceGraphProcessor) dimensionsForSeries(key string) (pcommon.Map, bool) { + p.metricMutex.RLock() + defer p.metricMutex.RUnlock() if series, ok := p.keyToMetric[key]; ok { return series.dimensions, true } @@ -487,15 +492,19 @@ func (p *serviceGraphProcessor) cacheLoop(d time.Duration) { // cleanCache removes series that have not been updated in 15 minutes func (p *serviceGraphProcessor) cleanCache() { var staleSeries []string + p.metricMutex.RLock() for key, series := range p.keyToMetric { if series.lastUpdated+15*time.Minute.Milliseconds() < time.Now().UnixMilli() { staleSeries = append(staleSeries, key) } } + p.metricMutex.RUnlock() + p.metricMutex.Lock() for _, key := range staleSeries { delete(p.keyToMetric, key) } + p.metricMutex.Unlock() } // durationToMillis converts the given duration to the number of milliseconds it represents.