From 4014204d42d55084fd90b6cb859dcde14f00231f Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 16 Dec 2022 12:02:42 -0800 Subject: [PATCH] Allow multi-instrument callbacks to be unregistered (#3522) * Update Meter RegisterCallback method Return a Registration from the method that can be used by the caller to unregister their callback. Update documentation of the method to better explain expectations of use and implementation. * Update noop impl * Update global impl * Test global Unregister concurrent safe * Use a map to track reg in global impl * Update sdk impl * Use a list for global impl * Fix prom example * Lint metric/meter.go * Fix metric example * Placeholder for changelog * Update PR number in changelog * Update sdk/metric/pipeline.go Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> * Add test unregistered callback is not called Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> --- CHANGELOG.md | 10 +- example/prometheus/main.go | 2 +- metric/example_test.go | 4 +- metric/internal/global/meter.go | 63 +++++++-- metric/internal/global/meter_test.go | 84 +++++++++++- metric/internal/global/meter_types_test.go | 21 ++- metric/meter.go | 28 +++- metric/noop.go | 8 +- sdk/metric/meter.go | 15 ++- sdk/metric/meter_test.go | 142 ++++++++++++++++++--- sdk/metric/pipeline.go | 39 ++++-- 11 files changed, 350 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 40dc1d752ab..ed5100716f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,14 +8,16 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## [Unreleased] -### Removed - -- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520) - ### Added +- Return a `Registration` from the `RegisterCallback` method of a `Meter` in the `go.opentelemetry.io/otel/metric` package. + This `Registration` can be used to unregister callbacks. (#3522) - Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524) +### Removed + +- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520) + ### Changed - Global error handler uses an atomic value instead of a mutex. (#3543) diff --git a/example/prometheus/main.go b/example/prometheus/main.go index bc15f041486..39015994517 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -68,7 +68,7 @@ func main() { if err != nil { log.Fatal(err) } - err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { n := -10. + rand.Float64()*(90.) // [-10, 100) gauge.Observe(ctx, n, attrs...) }) diff --git a/metric/example_test.go b/metric/example_test.go index bc94d7572ef..dc22989f627 100644 --- a/metric/example_test.go +++ b/metric/example_test.go @@ -61,7 +61,7 @@ func ExampleMeter_asynchronous_single() { panic(err) } - err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage}, + _, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage}, func(ctx context.Context) { // instrument.WithCallbackFunc(func(ctx context.Context) { //Do Work to get the real memoryUsage @@ -86,7 +86,7 @@ func ExampleMeter_asynchronous_multiple() { gcCount, _ := meter.AsyncInt64().Counter("gcCount") gcPause, _ := meter.SyncFloat64().Histogram("gcPause") - err := meter.RegisterCallback([]instrument.Asynchronous{ + _, err := meter.RegisterCallback([]instrument.Asynchronous{ heapAlloc, gcCount, }, diff --git a/metric/internal/global/meter.go b/metric/internal/global/meter.go index 0fa924f397c..e8c83578459 100644 --- a/metric/internal/global/meter.go +++ b/metric/internal/global/meter.go @@ -15,6 +15,7 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global" import ( + "container/list" "context" "sync" "sync/atomic" @@ -109,7 +110,8 @@ type meter struct { mtx sync.Mutex instruments []delegatedInstrument - callbacks []delegatedCallback + + registry list.List delegate atomic.Value // metric.Meter } @@ -135,12 +137,14 @@ func (m *meter) setDelegate(provider metric.MeterProvider) { inst.setDelegate(meter) } - for _, callback := range m.callbacks { - callback.setDelegate(meter) + for e := m.registry.Front(); e != nil; e = e.Next() { + r := e.Value.(*registration) + r.setDelegate(meter) + m.registry.Remove(e) } m.instruments = nil - m.callbacks = nil + m.registry.Init() } // AsyncInt64 is the namespace for the Asynchronous Integer instruments. @@ -167,20 +171,24 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { if del, ok := m.delegate.Load().(metric.Meter); ok { insts = unwrapInstruments(insts) - return del.RegisterCallback(insts, function) + return del.RegisterCallback(insts, f) } m.mtx.Lock() defer m.mtx.Unlock() - m.callbacks = append(m.callbacks, delegatedCallback{ - instruments: insts, - function: function, - }) - return nil + reg := ®istration{instruments: insts, function: f} + e := m.registry.PushBack(reg) + reg.unreg = func() error { + m.mtx.Lock() + _ = m.registry.Remove(e) + m.mtx.Unlock() + return nil + } + return reg, nil } type wrapped interface { @@ -217,17 +225,44 @@ func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider { return (*sfInstProvider)(m) } -type delegatedCallback struct { +type registration struct { instruments []instrument.Asynchronous function func(context.Context) + + unreg func() error + unregMu sync.Mutex } -func (c *delegatedCallback) setDelegate(m metric.Meter) { +func (c *registration) setDelegate(m metric.Meter) { insts := unwrapInstruments(c.instruments) - err := m.RegisterCallback(insts, c.function) + + c.unregMu.Lock() + defer c.unregMu.Unlock() + + if c.unreg == nil { + // Unregister already called. + return + } + + reg, err := m.RegisterCallback(insts, c.function) if err != nil { otel.Handle(err) } + + c.unreg = reg.Unregister +} + +func (c *registration) Unregister() error { + c.unregMu.Lock() + defer c.unregMu.Unlock() + if c.unreg == nil { + // Unregister already called. + return nil + } + + var err error + err, c.unreg = c.unreg(), nil + return err } type afInstProvider meter diff --git a/metric/internal/global/meter_test.go b/metric/internal/global/meter_test.go index 8865f06d57b..15a0bf877af 100644 --- a/metric/internal/global/meter_test.go +++ b/metric/internal/global/meter_test.go @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) { _, _ = mtr.SyncInt64().Counter(name) _, _ = mtr.SyncInt64().UpDownCounter(name) _, _ = mtr.SyncInt64().Histogram(name) - _ = mtr.RegisterCallback(nil, func(ctx context.Context) {}) + _, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {}) if !once { wg.Done() once = true @@ -86,6 +86,35 @@ func TestMeterRace(t *testing.T) { close(finish) } +func TestUnregisterRace(t *testing.T) { + mtr := &meter{} + reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(1) + finish := make(chan struct{}) + go func() { + for i, once := 0, false; ; i++ { + _ = reg.Unregister() + if !once { + wg.Done() + once = true + } + select { + case <-finish: + return + default: + } + } + }() + _ = reg.Unregister() + + wg.Wait() + mtr.setDelegate(metric.NewNoopMeterProvider()) + close(finish) +} + func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) { afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter") require.NoError(t, err) @@ -101,9 +130,10 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun _, err = m.AsyncInt64().Gauge("test_Async_Gauge") assert.NoError(t, err) - require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) { afcounter.Observe(ctx, 3) - })) + }) + require.NoError(t, err) sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter") require.NoError(t, err) @@ -257,3 +287,51 @@ func TestMeterDefersDelegations(t *testing.T) { assert.IsType(t, &afCounter{}, actr) assert.Equal(t, 1, mp.count) } + +func TestRegistrationDelegation(t *testing.T) { + // globalMeterProvider := otel.GetMeterProvider + globalMeterProvider := &meterProvider{} + + m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test") + require.IsType(t, &meter{}, m) + mImpl := m.(*meter) + + actr, err := m.AsyncFloat64().Counter("test_Async_Counter") + require.NoError(t, err) + + var called0 bool + reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + called0 = true + }) + require.NoError(t, err) + require.Equal(t, 1, mImpl.registry.Len(), "callback not registered") + // This means reg0 should not be delegated. + assert.NoError(t, reg0.Unregister()) + assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered") + + var called1 bool + reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) { + called1 = true + }) + require.NoError(t, err) + require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered") + + mp := &testMeterProvider{} + + // otel.SetMeterProvider(mp) + globalMeterProvider.setDelegate(mp) + + testCollect(t, m) // This is a hacky way to emulate a read from an exporter + require.False(t, called0, "pre-delegation unregistered callback called") + require.True(t, called1, "callback not called") + + called1 = false + assert.NoError(t, reg1.Unregister(), "unregister second callback") + + testCollect(t, m) // This is a hacky way to emulate a read from an exporter + assert.False(t, called1, "unregistered callback called") + + assert.NotPanics(t, func() { + assert.NoError(t, reg1.Unregister(), "duplicate unregister calls") + }) +} diff --git a/metric/internal/global/meter_types_test.go b/metric/internal/global/meter_types_test.go index ac6e93ebe38..53dfbc7528d 100644 --- a/metric/internal/global/meter_types_test.go +++ b/metric/internal/global/meter_types_test.go @@ -64,8 +64,21 @@ func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider { // // It is only valid to call Observe within the scope of the passed function, // and only on the instruments that were registered with this call. -func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error { - m.callbacks = append(m.callbacks, function) +func (m *testMeter) RegisterCallback(i []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { + m.callbacks = append(m.callbacks, f) + return testReg{ + f: func(idx int) func() { + return func() { m.callbacks[idx] = nil } + }(len(m.callbacks) - 1), + }, nil +} + +type testReg struct { + f func() +} + +func (r testReg) Unregister() error { + r.f() return nil } @@ -85,6 +98,10 @@ func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider { func (m *testMeter) collect() { ctx := context.Background() for _, f := range m.callbacks { + if f == nil { + // Unregister. + continue + } f(ctx) } } diff --git a/metric/meter.go b/metric/meter.go index 23e6853afbb..3a505264ca0 100644 --- a/metric/meter.go +++ b/metric/meter.go @@ -51,14 +51,30 @@ type Meter interface { // To Observe data with instruments it must be registered in a callback. AsyncFloat64() asyncfloat64.InstrumentProvider - // RegisterCallback captures the function that will be called during Collect. - // - // It is only valid to call Observe within the scope of the passed function, - // and only on the instruments that were registered with this call. - RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error - // SyncInt64 is the namespace for the Synchronous Integer instruments SyncInt64() syncint64.InstrumentProvider // SyncFloat64 is the namespace for the Synchronous Float instruments SyncFloat64() syncfloat64.InstrumentProvider + + // RegisterCallback registers f to be called during the collection of a + // measurement cycle. + // + // If Unregister of the returned Registration is called, f needs to be + // unregistered and not called during collection. + // + // The instruments f is registered with are the only instruments that f may + // observe values for. + // + // If no instruments are passed, f should not be registered nor called + // during collection. + RegisterCallback(instruments []instrument.Asynchronous, f func(context.Context)) (Registration, error) +} + +// Registration is an token representing the unique registration of a callback +// for a set of instruments with a Meter. +type Registration interface { + // Unregister removes the callback registration from a Meter. + // + // This method needs to be idempotent and concurrent safe. + Unregister() error } diff --git a/metric/noop.go b/metric/noop.go index e8b9a9a1458..7454a790337 100644 --- a/metric/noop.go +++ b/metric/noop.go @@ -64,10 +64,14 @@ func (noopMeter) SyncFloat64() syncfloat64.InstrumentProvider { } // RegisterCallback creates a register callback that does not record any metrics. -func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) error { - return nil +func (noopMeter) RegisterCallback([]instrument.Asynchronous, func(context.Context)) (Registration, error) { + return noopReg{}, nil } +type noopReg struct{} + +func (noopReg) Unregister() error { return nil } + type nonrecordingAsyncFloat64Instrument struct { instrument.Asynchronous } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 418827e9672..c2c515af35c 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -69,7 +69,7 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider { // RegisterCallback registers the function f to be called when any of the // insts Collect method is called. -func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error { +func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) (metric.Registration, error) { for _, inst := range insts { // Only register if at least one instrument has a non-drop aggregation. // Otherwise, calling f during collection will be wasted computation. @@ -91,14 +91,21 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context } } // All insts use drop aggregation. - return nil + return noopRegister{}, nil } -func (m *meter) registerCallback(f func(context.Context)) error { - m.pipes.registerCallback(f) +type noopRegister struct{} + +func (noopRegister) Unregister() error { return nil } +type callback func(context.Context) + +func (m *meter) registerCallback(c callback) (metric.Registration, error) { + return m.pipes.registerCallback(c), nil +} + // SyncInt64 returns the synchronous integer instrument provider. func (m *meter) SyncInt64() syncint64.InstrumentProvider { return syncInt64Provider{m.instProviderInt64} diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 013a52e5924..d904b118ad4 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -103,11 +103,63 @@ func TestMeterCallbackCreationConcurrency(t *testing.T) { m := NewMeterProvider().Meter("callback-concurrency") go func() { - _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) wg.Done() }() go func() { - _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + _, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {}) + wg.Done() + }() + wg.Wait() +} + +func TestNoopCallbackUnregisterConcurrency(t *testing.T) { + m := NewMeterProvider().Meter("noop-unregister-concurrency") + reg, err := m.RegisterCallback(nil, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + _ = reg.Unregister() + wg.Done() + }() + go func() { + _ = reg.Unregister() + wg.Done() + }() + wg.Wait() +} + +func TestCallbackUnregisterConcurrency(t *testing.T) { + reader := NewManualReader() + provider := NewMeterProvider(WithReader(reader)) + meter := provider.Meter("unregister-concurrency") + + actr, err := meter.AsyncFloat64().Counter("counter") + require.NoError(t, err) + + ag, err := meter.AsyncInt64().Gauge("gauge") + require.NoError(t, err) + + i := []instrument.Asynchronous{actr} + regCtr, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + require.NoError(t, err) + + i = []instrument.Asynchronous{ag} + regG, err := meter.RegisterCallback(i, func(ctx context.Context) {}) + require.NoError(t, err) + + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + _ = regCtr.Unregister() + _ = regG.Unregister() + wg.Done() + }() + go func() { + _ = regCtr.Unregister() + _ = regG.Unregister() wg.Done() }() wg.Wait() @@ -126,7 +178,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncInt64().Counter("aint") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) }) assert.NoError(t, err) @@ -150,7 +202,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncInt64().UpDownCounter("aint") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) }) assert.NoError(t, err) @@ -174,7 +226,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { gauge, err := m.AsyncInt64().Gauge("agauge") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) }) assert.NoError(t, err) @@ -196,7 +248,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncFloat64().Counter("afloat") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 3) }) assert.NoError(t, err) @@ -220,7 +272,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { ctr, err := m.AsyncFloat64().UpDownCounter("afloat") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 11) }) assert.NoError(t, err) @@ -244,7 +296,7 @@ func TestMeterCreatesInstruments(t *testing.T) { fn: func(t *testing.T, m metric.Meter) { gauge, err := m.AsyncFloat64().Gauge("agauge") assert.NoError(t, err) - err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { + _, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) { gauge.Observe(ctx, 11) }) assert.NoError(t, err) @@ -418,7 +470,7 @@ func TestMetersProvideScope(t *testing.T) { m1 := mp.Meter("scope1") ctr1, err := m1.AsyncFloat64().Counter("ctr1") assert.NoError(t, err) - err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) { ctr1.Observe(ctx, 5) }) assert.NoError(t, err) @@ -426,7 +478,7 @@ func TestMetersProvideScope(t *testing.T) { m2 := mp.Meter("scope2") ctr2, err := m2.AsyncInt64().Counter("ctr2") assert.NoError(t, err) - err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { + _, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) { ctr2.Observe(ctx, 7) }) assert.NoError(t, err) @@ -480,6 +532,53 @@ func TestMetersProvideScope(t *testing.T) { metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } +func TestUnregisterUnregisters(t *testing.T) { + r := NewManualReader() + mp := NewMeterProvider(WithReader(r)) + m := mp.Meter("TestUnregisterUnregisters") + + int64Counter, err := m.AsyncInt64().Counter("int64.counter") + require.NoError(t, err) + + int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter") + require.NoError(t, err) + + int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge") + require.NoError(t, err) + + floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter") + require.NoError(t, err) + + floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter") + require.NoError(t, err) + + floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge") + require.NoError(t, err) + + var called bool + reg, err := m.RegisterCallback([]instrument.Asynchronous{ + int64Counter, + int64UpDownCounter, + int64Gauge, + floag64Counter, + floag64UpDownCounter, + floag64Gauge, + }, func(context.Context) { called = true }) + require.NoError(t, err) + + ctx := context.Background() + _, err = r.Collect(ctx) + require.NoError(t, err) + assert.True(t, called, "callback not called for registered callback") + + called = false + require.NoError(t, reg.Unregister(), "unregister") + + _, err = r.Collect(ctx) + require.NoError(t, err) + assert.False(t, called, "callback called for unregistered callback") +} + func TestRegisterCallbackDropAggregations(t *testing.T) { aggFn := func(InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} @@ -507,14 +606,15 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { require.NoError(t, err) var called bool - require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{ + _, err = m.RegisterCallback([]instrument.Asynchronous{ int64Counter, int64UpDownCounter, int64Gauge, floag64Counter, floag64UpDownCounter, floag64Gauge, - }, func(context.Context) { called = true })) + }, func(context.Context) { called = true }) + require.NoError(t, err) data, err := r.Collect(context.Background()) require.NoError(t, err) @@ -538,10 +638,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afcounter", @@ -564,10 +665,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afupdowncounter", @@ -590,10 +692,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "afgauge", @@ -614,10 +717,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aicounter", @@ -640,10 +744,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aiupdowncounter", @@ -666,10 +771,11 @@ func TestAttributeFilter(t *testing.T) { if err != nil { return err } - return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { + _, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) { ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1)) ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2)) }) + return err }, wantMetric: metricdata.Metrics{ Name: "aigauge", diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index bc6901e5775..f9938bf617f 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -15,6 +15,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( + "container/list" "context" "errors" "fmt" @@ -22,6 +23,7 @@ import ( "sync" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/unit" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -75,7 +77,7 @@ type pipeline struct { sync.Mutex aggregations map[instrumentation.Scope][]instrumentSync - callbacks []func(context.Context) + callbacks list.List } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -94,10 +96,15 @@ func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) { } // addCallback registers a callback to be run when `produce()` is called. -func (p *pipeline) addCallback(callback func(context.Context)) { +func (p *pipeline) addCallback(c callback) (unregister func()) { p.Lock() defer p.Unlock() - p.callbacks = append(p.callbacks, callback) + e := p.callbacks.PushBack(c) + return func() { + p.Lock() + p.callbacks.Remove(e) + p.Unlock() + } } // callbackKey is a context key type used to identify context that came from the SDK. @@ -112,14 +119,15 @@ const produceKey callbackKey = 0 // // This method is safe to call concurrently. func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) { + ctx = context.WithValue(ctx, produceKey, struct{}{}) + p.Lock() defer p.Unlock() - ctx = context.WithValue(ctx, produceKey, struct{}{}) - - for _, callback := range p.callbacks { + for e := p.callbacks.Front(); e != nil; e = e.Next() { // TODO make the callbacks parallel. ( #3034 ) - callback(ctx) + f := e.Value.(callback) + f(ctx) if err := ctx.Err(); err != nil { // This means the context expired before we finished running callbacks. return metricdata.ResourceMetrics{}, err @@ -439,10 +447,21 @@ func newPipelines(res *resource.Resource, readers []Reader, views []View) pipeli return pipes } -func (p pipelines) registerCallback(fn func(context.Context)) { - for _, pipe := range p { - pipe.addCallback(fn) +func (p pipelines) registerCallback(c callback) metric.Registration { + unregs := make([]func(), len(p)) + for i, pipe := range p { + unregs[i] = pipe.addCallback(c) + } + return unregisterFuncs(unregs) +} + +type unregisterFuncs []func() + +func (u unregisterFuncs) Unregister() error { + for _, f := range u { + f() } + return nil } // resolver facilitates resolving Aggregators an instrument needs to aggregate