Skip to content

Commit

Permalink
> fix servicegraphprocessor concurrent map read and write (#16851)
Browse files Browse the repository at this point in the history
* > fix servicegraphprocessor concurrent map read and write
  • Loading branch information
kebe7jun authored Dec 27, 2022
1 parent 5c2184f commit 42de5b2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .chloggen/servicegraphprocessor-concurrent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: servicegraphprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: fix servicegraphprocessor concurrent map read and write

# One or more tracking issues related to the change
issues: [16850]
9 changes: 9 additions & 0 deletions processor/servicegraphprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type serviceGraphProcessor struct {
reqDurationBounds []float64
reqDurationSecondsBucketCounts map[string][]uint64

metricMutex sync.RWMutex
keyToMetric map[string]metricSeries

shutdownCh chan interface{}
Expand Down Expand Up @@ -308,6 +309,8 @@ func (p *serviceGraphProcessor) aggregateMetricsForEdge(e *store.Edge) {
}

func (p *serviceGraphProcessor) updateSeries(key string, dimensions pcommon.Map) {
p.metricMutex.Lock()
defer p.metricMutex.Unlock()
// Overwrite the series if it already exists
p.keyToMetric[key] = metricSeries{
dimensions: dimensions,
Expand All @@ -316,6 +319,8 @@ func (p *serviceGraphProcessor) updateSeries(key string, dimensions pcommon.Map)
}

func (p *serviceGraphProcessor) dimensionsForSeries(key string) (pcommon.Map, bool) {
p.metricMutex.RLock()
defer p.metricMutex.RUnlock()
if series, ok := p.keyToMetric[key]; ok {
return series.dimensions, true
}
Expand Down Expand Up @@ -487,15 +492,19 @@ func (p *serviceGraphProcessor) cacheLoop(d time.Duration) {
// cleanCache removes series that have not been updated in 15 minutes
func (p *serviceGraphProcessor) cleanCache() {
var staleSeries []string
p.metricMutex.RLock()
for key, series := range p.keyToMetric {
if series.lastUpdated+15*time.Minute.Milliseconds() < time.Now().UnixMilli() {
staleSeries = append(staleSeries, key)
}
}
p.metricMutex.RUnlock()

p.metricMutex.Lock()
for _, key := range staleSeries {
delete(p.keyToMetric, key)
}
p.metricMutex.Unlock()
}

// durationToMillis converts the given duration to the number of milliseconds it represents.
Expand Down

0 comments on commit 42de5b2

Please sign in to comment.