Skip to content

Commit

Permalink
Fix duplicate instrumentation memory leak (#5754)
Browse files Browse the repository at this point in the history
Fixes #5753

The added test fails on main, but passes after the fix.

---------

Co-authored-by: Sam Xie <sam@samxie.me>
  • Loading branch information
dashpole and XSAM authored Aug 29, 2024
1 parent 080a198 commit e47618f
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Fixed

- Fix memory leak in the global `MeterProvider` when identical instruments are repeatedly created. (#5754)

### Removed

- Drop support for [Go 1.21]. (#5736, #5740)
Expand Down
143 changes: 127 additions & 16 deletions internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package global // import "go.opentelemetry.io/otel/internal/global"

import (
"container/list"
"reflect"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -76,7 +77,7 @@ func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Me
return val
}

t := &meter{name: name, opts: opts}
t := &meter{name: name, opts: opts, instruments: make(map[instID]delegatedInstrument)}
p.meters[key] = t
return t
}
Expand All @@ -92,7 +93,7 @@ type meter struct {
opts []metric.MeterOption

mtx sync.Mutex
instruments []delegatedInstrument
instruments map[instID]delegatedInstrument

registry list.List

Expand All @@ -103,6 +104,18 @@ type delegatedInstrument interface {
setDelegate(metric.Meter)
}

// instID are the identifying properties of a instrument.
type instID struct {
// name is the name of the stream.
name string
// description is the description of the stream.
description string
// kind defines the functional group of the instrument.
kind reflect.Type
// unit is the unit of the stream.
unit string
}

// setDelegate configures m to delegate all Meter functionality to Meters
// created by provider.
//
Expand Down Expand Up @@ -139,7 +152,14 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
m.mtx.Lock()
defer m.mtx.Unlock()
i := &siCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewInt64CounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -150,7 +170,14 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
m.mtx.Lock()
defer m.mtx.Unlock()
i := &siUpDownCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewInt64UpDownCounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -161,7 +188,14 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
m.mtx.Lock()
defer m.mtx.Unlock()
i := &siHistogram{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewInt64HistogramConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -172,7 +206,14 @@ func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (met
m.mtx.Lock()
defer m.mtx.Unlock()
i := &siGauge{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewInt64GaugeConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -183,7 +224,14 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
m.mtx.Lock()
defer m.mtx.Unlock()
i := &aiCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewInt64ObservableCounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -194,7 +242,14 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
m.mtx.Lock()
defer m.mtx.Unlock()
i := &aiUpDownCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -205,7 +260,14 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
m.mtx.Lock()
defer m.mtx.Unlock()
i := &aiGauge{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewInt64ObservableGaugeConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -216,7 +278,14 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
m.mtx.Lock()
defer m.mtx.Unlock()
i := &sfCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewFloat64CounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -227,7 +296,14 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
m.mtx.Lock()
defer m.mtx.Unlock()
i := &sfUpDownCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewFloat64UpDownCounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -238,7 +314,14 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
m.mtx.Lock()
defer m.mtx.Unlock()
i := &sfHistogram{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewFloat64HistogramConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -249,7 +332,14 @@ func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption)
m.mtx.Lock()
defer m.mtx.Unlock()
i := &sfGauge{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewFloat64GaugeConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -260,7 +350,14 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
m.mtx.Lock()
defer m.mtx.Unlock()
i := &afCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewFloat64ObservableCounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -271,7 +368,14 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
m.mtx.Lock()
defer m.mtx.Unlock()
i := &afUpDownCounter{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand All @@ -282,7 +386,14 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs
m.mtx.Lock()
defer m.mtx.Unlock()
i := &afGauge{name: name, opts: options}
m.instruments = append(m.instruments, i)
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
id := instID{
name: name,
kind: reflect.TypeOf(i),
description: cfg.Description(),
unit: cfg.Unit(),
}
m.instruments[id] = i
return i, nil
}

Expand Down
16 changes: 14 additions & 2 deletions internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var zeroCallback metric.Callback = func(ctx context.Context, or metric.Observer)
}

func TestMeterConcurrentSafe(t *testing.T) {
mtr := &meter{}
mtr := &meter{instruments: make(map[instID]delegatedInstrument)}

wg := &sync.WaitGroup{}
wg.Add(1)
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestMeterConcurrentSafe(t *testing.T) {
}

func TestUnregisterConcurrentSafe(t *testing.T) {
mtr := &meter{}
mtr := &meter{instruments: make(map[instID]delegatedInstrument)}
reg, err := mtr.RegisterCallback(zeroCallback)
require.NoError(t, err)

Expand Down Expand Up @@ -176,6 +176,18 @@ func testCollect(t *testing.T, m metric.Meter) {
tMeter.collect()
}

func TestInstrumentIdentity(t *testing.T) {
globalMeterProvider := &meterProvider{}
m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
tMeter := m.(*meter)
testSetupAllInstrumentTypes(t, m)
assert.Len(t, tMeter.instruments, 14)
// Creating the same instruments multiple times should not increase the
// number of instruments.
testSetupAllInstrumentTypes(t, m)
assert.Len(t, tMeter.instruments, 14)
}

func TestMeterProviderDelegatesCalls(t *testing.T) {
// The global MeterProvider should directly call the underlying MeterProvider
// if it is set prior to Meter() being called.
Expand Down

0 comments on commit e47618f

Please sign in to comment.