Skip to content

Commit

Permalink
Revert "Cache instruments so repeatedly creating identical instrument…
Browse files Browse the repository at this point in the history
…s doesn't leak memory (open-telemetry#4820)"

This reverts commit 1978044.
  • Loading branch information
dmitryax committed Jun 28, 2024
1 parent 4987a1d commit 26a886c
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 287 deletions.
2 changes: 0 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ See our [versioning policy](VERSIONING.md) for more information about these stab
### Fixed

- Fix `ContainerID` resource detection on systemd when cgroup path has a colon. (#4449)
- Fix `go.opentelemetry.io/otel/sdk/metric` to cache instruments to avoid leaking memory when the same instrument is created multiple times. (#4820)
- Fix missing `Mix` and `Max` values for `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric` by introducing `MarshalText` and `MarshalJSON` for the `Extrema` type in `go.opentelemetry.io/sdk/metric/metricdata`. (#4827)

## [1.23.0-rc.1] 2024-01-18

Expand Down
40 changes: 0 additions & 40 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,43 +41,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}

// HasKey returns true if Lookup has previously been called with that key
//
// HasKey is safe to call concurrently.
func (c *cache[K, V]) HasKey(key K) bool {
c.Lock()
defer c.Unlock()
_, ok := c.data[key]
return ok
}

// cacheWithErr is a locking storage used to quickly return already computed values and an error.
//
// The zero value of a cacheWithErr is empty and ready to use.
//
// A cacheWithErr must not be copied after first use.
//
// All methods of a cacheWithErr are safe to call concurrently.
type cacheWithErr[K comparable, V any] struct {
cache[K, valAndErr[V]]
}

type valAndErr[V any] struct {
val V
err error
}

// Lookup returns the value stored in the cacheWithErr with the associated key
// if it exists. Otherwise, f is called and its returned value is set in the
// cacheWithErr for key and returned.
//
// Lookup is safe to call concurrently. It will hold the cacheWithErr lock, so f
// should not block excessively.
func (c *cacheWithErr[K, V]) Lookup(key K, f func() (V, error)) (V, error) {
combined := c.cache.Lookup(key, func() valAndErr[V] {
val, err := f()
return valAndErr[V]{val: val, err: err}
})
return combined.val, combined.err
}
186 changes: 50 additions & 136 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines

int64Insts *cacheWithErr[instID, *int64Inst]
float64Insts *cacheWithErr[instID, *float64Inst]
int64ObservableInsts *cacheWithErr[instID, int64Observable]
float64ObservableInsts *cacheWithErr[instID, float64Observable]

int64Resolver resolver[int64]
float64Resolver resolver[float64]
}
Expand All @@ -44,20 +39,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, instID]

var int64Insts cacheWithErr[instID, *int64Inst]
var float64Insts cacheWithErr[instID, *float64Inst]
var int64ObservableInsts cacheWithErr[instID, int64Observable]
var float64ObservableInsts cacheWithErr[instID, float64Observable]

return &meter{
scope: s,
pipes: p,
int64Insts: &int64Insts,
float64Insts: &float64Insts,
int64ObservableInsts: &int64ObservableInsts,
float64ObservableInsts: &float64ObservableInsts,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
scope: s,
pipes: p,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
}
}

Expand Down Expand Up @@ -126,49 +112,32 @@ func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (met
// int64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
key := instID{
Name: id.Name,
Description: id.Description,
Unit: id.Unit,
Kind: id.Kind,
}
if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
warnRepeatedObservableCallbacks(id)
}
return m.int64ObservableInsts.Lookup(key, func() (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
fn := cback
insert.addCallback(func(ctx context.Context) error { return fn(ctx, inst) })
}
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
return inst, validateInstrumentName(id.Name)
})
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
}

// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
//
// If Int64ObservableCounter is invoked repeatedly with the same Name,
// Description, and Unit, only the first set of callbacks provided are used.
// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
id := Instrument{
Expand Down Expand Up @@ -275,49 +244,32 @@ func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption)
// float64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
key := instID{
Name: id.Name,
Description: id.Description,
Unit: id.Unit,
Kind: id.Kind,
}
if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
warnRepeatedObservableCallbacks(id)
}
return m.float64ObservableInsts.Lookup(key, func() (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
fn := cback
insert.addCallback(func(ctx context.Context) error { return fn(ctx, inst) })
}
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
return inst, validateInstrumentName(id.Name)
})
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
}

// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
//
// If Float64ObservableCounter is invoked repeatedly with the same Name,
// Description, and Unit, only the first set of callbacks provided are used.
// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
id := Instrument{
Expand Down Expand Up @@ -391,16 +343,6 @@ func isAlphanumeric(c rune) bool {
return isAlpha(c) || ('0' <= c && c <= '9')
}

func warnRepeatedObservableCallbacks(id Instrument) {
inst := fmt.Sprintf(
"Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}",
id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit,
)
global.Warn("Repeated observable instrument creation with callbacks. Ignoring new callbacks. Use meter.RegisterCallback and Registration.Unregister to manage callbacks.",
"instrument", inst,
)
}

// RegisterCallback registers f to be called each collection cycle so it will
// make observations for insts during those cycles.
//
Expand Down Expand Up @@ -606,28 +548,14 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC

// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
return p.meter.int64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
})
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
return p.meter.int64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
})
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
}

// float64InstProvider provides float64 OpenTelemetry instruments.
Expand Down Expand Up @@ -664,28 +592,14 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog

// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
return p.meter.float64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
})
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
}

// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
return p.meter.float64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
})
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
}

type int64Observer struct {
Expand Down
Loading

0 comments on commit 26a886c

Please sign in to comment.