Skip to content

Commit

Permalink
Have multi-instrument callback return an error (#3576)
Browse files Browse the repository at this point in the history
* Have multi-inst callback return an error

* Update PR number in changelog entry

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
  • Loading branch information
MrAlias and hanyuancheung authored Jan 8, 2023
1 parent 75a19d1 commit 82882df
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `InstrumentKindAsyncGauge` is renamed to `InstrumentKindObservableGauge`
- Update the `RegisterCallback` method of the `Meter` in the `go.opentelemetry.io/otel/sdk/metric` package to accept the added `Callback` type instead of an inline function type definition.
The underlying type of a `Callback` is the same `func(context.Context)` that the method used to accept. (#3564)
- The callback function registered with a `Meter` from the `go.opentelemetry.io/otel/metric` package is required to return an error now. (#3576)

### Deprecated

Expand Down
3 changes: 2 additions & 1 deletion example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func main() {
if err != nil {
log.Fatal(err)
}
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = meter.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
return nil
})
if err != nil {
log.Fatal(err)
Expand Down
6 changes: 4 additions & 2 deletions metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ func ExampleMeter_asynchronous_single() {
}

_, err = meter.RegisterCallback([]instrument.Asynchronous{memoryUsage},
func(ctx context.Context) {
func(ctx context.Context) error {
// instrument.WithCallbackFunc(func(ctx context.Context) {
//Do Work to get the real memoryUsage
// mem := GatherMemory(ctx)
mem := 75000

memoryUsage.Observe(ctx, int64(mem))
return nil
})
if err != nil {
fmt.Println("Failed to register callback")
Expand All @@ -90,7 +91,7 @@ func ExampleMeter_asynchronous_multiple() {
heapAlloc,
gcCount,
},
func(ctx context.Context) {
func(ctx context.Context) error {
memStats := &runtime.MemStats{}
// This call does work
runtime.ReadMemStats(memStats)
Expand All @@ -100,6 +101,7 @@ func ExampleMeter_asynchronous_multiple() {

// This function synchronously records the pauses
computeGCPauses(ctx, gcPause, memStats.PauseNs[:])
return nil
},
)

Expand Down
3 changes: 1 addition & 2 deletions metric/internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global"

import (
"container/list"
"context"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -323,7 +322,7 @@ func unwrapInstruments(instruments []instrument.Asynchronous) []instrument.Async

type registration struct {
instruments []instrument.Asynchronous
function func(context.Context)
function metric.Callback

unreg func() error
unregMu sync.Mutex
Expand Down
13 changes: 8 additions & 5 deletions metric/internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestMeterRace(t *testing.T) {
_, _ = mtr.Int64Counter(name)
_, _ = mtr.Int64UpDownCounter(name)
_, _ = mtr.Int64Histogram(name)
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
_, _ = mtr.RegisterCallback(nil, func(ctx context.Context) error { return nil })
if !once {
wg.Done()
once = true
Expand All @@ -88,7 +88,7 @@ func TestMeterRace(t *testing.T) {

func TestUnregisterRace(t *testing.T) {
mtr := &meter{}
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) {})
reg, err := mtr.RegisterCallback(nil, func(ctx context.Context) error { return nil })
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -130,8 +130,9 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Coun
_, err = m.Int64ObservableGauge("test_Async_Gauge")
assert.NoError(t, err)

_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) error {
afcounter.Observe(ctx, 3)
return nil
})
require.NoError(t, err)

Expand Down Expand Up @@ -324,8 +325,9 @@ func TestRegistrationDelegation(t *testing.T) {
require.NoError(t, err)

var called0 bool
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
reg0, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) error {
called0 = true
return nil
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "callback not registered")
Expand All @@ -334,8 +336,9 @@ func TestRegistrationDelegation(t *testing.T) {
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")

var called1 bool
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) {
reg1, err := m.RegisterCallback([]instrument.Asynchronous{actr}, func(context.Context) error {
called1 = true
return nil
})
require.NoError(t, err)
require.Equal(t, 1, mImpl.registry.Len(), "second callback not registered")
Expand Down
4 changes: 2 additions & 2 deletions metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type testMeter struct {
siUDCount int
siHist int

callbacks []func(context.Context)
callbacks []metric.Callback
}

func (m *testMeter) Int64Counter(name string, options ...instrument.Int64Option) (syncint64.Counter, error) {
Expand Down Expand Up @@ -145,6 +145,6 @@ func (m *testMeter) collect() {
// Unregister.
continue
}
f(ctx)
_ = f(ctx)
}
}
2 changes: 1 addition & 1 deletion metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type Meter interface {
// the same attributes as another Callback will report.
//
// The function needs to be concurrent safe.
type Callback func(context.Context)
type Callback func(context.Context) error

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
Expand Down
64 changes: 43 additions & 21 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func TestMeterInstrumentConcurrency(t *testing.T) {
wg.Wait()
}

var emptyCallback metric.Callback = func(ctx context.Context) error { return nil }

// A Meter Should be able register Callbacks Concurrently.
func TestMeterCallbackCreationConcurrency(t *testing.T) {
wg := &sync.WaitGroup{}
Expand All @@ -103,19 +105,19 @@ func TestMeterCallbackCreationConcurrency(t *testing.T) {
m := NewMeterProvider().Meter("callback-concurrency")

go func() {
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {})
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, emptyCallback)
wg.Done()
}()
go func() {
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, func(ctx context.Context) {})
_, _ = m.RegisterCallback([]instrument.Asynchronous{}, emptyCallback)
wg.Done()
}()
wg.Wait()
}

func TestNoopCallbackUnregisterConcurrency(t *testing.T) {
m := NewMeterProvider().Meter("noop-unregister-concurrency")
reg, err := m.RegisterCallback(nil, func(ctx context.Context) {})
reg, err := m.RegisterCallback(nil, emptyCallback)
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -143,11 +145,11 @@ func TestCallbackUnregisterConcurrency(t *testing.T) {
require.NoError(t, err)

i := []instrument.Asynchronous{actr}
regCtr, err := meter.RegisterCallback(i, func(ctx context.Context) {})
regCtr, err := meter.RegisterCallback(i, emptyCallback)
require.NoError(t, err)

i = []instrument.Asynchronous{ag}
regG, err := meter.RegisterCallback(i, func(ctx context.Context) {})
regG, err := meter.RegisterCallback(i, emptyCallback)
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -183,8 +185,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Int64ObservableCounter("aint", instrument.WithInt64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 3)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -212,8 +215,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Int64ObservableUpDownCounter("aint", instrument.WithInt64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -241,8 +245,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
gauge, err := m.Int64ObservableGauge("agauge", instrument.WithInt64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error {
gauge.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand All @@ -268,8 +273,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Float64ObservableCounter("afloat", instrument.WithFloat64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 3)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -297,8 +303,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
ctr, err := m.Float64ObservableUpDownCounter("afloat", instrument.WithFloat64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -326,8 +333,9 @@ func TestMeterCreatesInstruments(t *testing.T) {
}
gauge, err := m.Float64ObservableGauge("agauge", instrument.WithFloat64Callback(cback))
assert.NoError(t, err)
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) {
_, err = m.RegisterCallback([]instrument.Asynchronous{gauge}, func(ctx context.Context) error {
gauge.Observe(ctx, 11)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -501,16 +509,18 @@ func TestMetersProvideScope(t *testing.T) {
m1 := mp.Meter("scope1")
ctr1, err := m1.Float64ObservableCounter("ctr1")
assert.NoError(t, err)
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) {
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr1}, func(ctx context.Context) error {
ctr1.Observe(ctx, 5)
return nil
})
assert.NoError(t, err)

m2 := mp.Meter("scope2")
ctr2, err := m2.Int64ObservableCounter("ctr2")
assert.NoError(t, err)
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) {
_, err = m1.RegisterCallback([]instrument.Asynchronous{ctr2}, func(ctx context.Context) error {
ctr2.Observe(ctx, 7)
return nil
})
assert.NoError(t, err)

Expand Down Expand Up @@ -594,7 +604,10 @@ func TestUnregisterUnregisters(t *testing.T) {
floag64Counter,
floag64UpDownCounter,
floag64Gauge,
}, func(context.Context) { called = true })
}, func(context.Context) error {
called = true
return nil
})
require.NoError(t, err)

ctx := context.Background()
Expand Down Expand Up @@ -644,7 +657,10 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
floag64Counter,
floag64UpDownCounter,
floag64Gauge,
}, func(context.Context) { called = true })
}, func(context.Context) error {
called = true
return nil
})
require.NoError(t, err)

data, err := r.Collect(context.Background())
Expand All @@ -669,9 +685,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -696,9 +713,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -723,9 +741,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -748,9 +767,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -775,9 +795,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand All @@ -802,9 +823,10 @@ func TestAttributeFilter(t *testing.T) {
if err != nil {
return err
}
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
_, err = mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) error {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
})
return err
},
Expand Down
4 changes: 3 additions & 1 deletion sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
// TODO make the callbacks parallel. ( #3034 )
f := e.Value.(metric.Callback)
f(ctx)
if err := f(ctx); err != nil {
errs.append(err)
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
return metricdata.ResourceMetrics{}, err
Expand Down
Loading

0 comments on commit 82882df

Please sign in to comment.