Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redesign RegisterCallback API #3584

Merged
merged 34 commits into from
Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
14a7c30
Update RegisterCallback and Callback decls
MrAlias Jan 10, 2023
4a0d32f
Update global impl
MrAlias Jan 10, 2023
7110c78
Update noop impl
MrAlias Jan 10, 2023
e87ea5f
Update SDK impl
MrAlias Jan 11, 2023
68d8224
Fix prometheus example
MrAlias Jan 11, 2023
8c88dea
Fix metric API example_test
MrAlias Jan 11, 2023
93a5d85
Remove unused registerabler
MrAlias Jan 11, 2023
0a23626
Rename ObservationRecorder to MultiObserver
MrAlias Jan 11, 2023
d0947fb
Update Callback documentation about MultiObserver
MrAlias Jan 11, 2023
053e7d1
Add changes to changelog
MrAlias Jan 11, 2023
ac2911e
Comment observer impl method
MrAlias Jan 11, 2023
2dc0046
Fix comment type ref
MrAlias Jan 11, 2023
b5fe5f4
Fix err msg for registerable
MrAlias Jan 11, 2023
2a4a70e
Remove ctx eval in observer observe
MrAlias Jan 11, 2023
65ab9df
Don't pass unneeded ctx to observer observe
MrAlias Jan 11, 2023
63e9ad0
Revert restructure to RegisterCallback
MrAlias Jan 11, 2023
04e2020
Remove erroneous bug
MrAlias Jan 11, 2023
d5c5478
Test RegisterCallback for invalid obsrvs
MrAlias Jan 11, 2023
ee01dcd
Test callbacks from foreign sources not collected
MrAlias Jan 11, 2023
a50169f
Merge branch 'main' into multi-cback-reg
hanyuancheung Jan 12, 2023
9ca6b1a
Rename Float64 and Int64 methods of MultiObserver
MrAlias Jan 12, 2023
04d2712
Rename MultiObserver to Observer
MrAlias Jan 12, 2023
355064f
Fix method name in changelog
MrAlias Jan 12, 2023
a562252
Merge branch 'main' into multi-cback-reg
MrAlias Jan 13, 2023
fa2a29e
Support registering delegating insts
MrAlias Jan 17, 2023
44b8e5f
Merge branch 'main' into multi-cback-reg
MrAlias Jan 17, 2023
40bdf13
Add TestGlobalInstRegisterCallback
MrAlias Jan 17, 2023
bf4c156
Change Observe err msg
MrAlias Jan 17, 2023
f798ed8
Unexport Unwrapper from metric global
MrAlias Jan 17, 2023
5bf6707
Clarify callbackObserver doc
MrAlias Jan 17, 2023
8142b9e
Move TestGlobalInstRegisterCallback
MrAlias Jan 17, 2023
27f50ec
Fix observation of global inst
MrAlias Jan 18, 2023
e25195d
Update err msgs
MrAlias Jan 18, 2023
eec9b94
Assert collection in TestGlobalInstRegisterCallback
MrAlias Jan 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
Instead it uses the `net.sock.peer` attributes. (#3581)
- The parameters for the `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/metric` are changed.
The slice of `instrument.Asynchronous` parameter is now passed as a variadic argument. (#3587)
- The `Callback` in `go.opentelemetry.io/otel/metric` has the added `Observer` parameter added.
This new parameter is used by `Callback` implementations to observe values for asynchronous instruments instead of calling the `Observe` method of the instrument directly. (#3584)

### Fixed

- The `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/sdk/metric` only registers a callback for instruments created by that meter.
Trying to register a callback with instruments from a different meter will result in an error being returned. (#3584)

### Deprecated

Expand Down
5 changes: 3 additions & 2 deletions example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
api "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/sdk/metric"
)
Expand Down Expand Up @@ -68,9 +69,9 @@ func main() {
if err != nil {
log.Fatal(err)
}
_, err = meter.RegisterCallback(func(ctx context.Context) error {
_, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error {
n := -10. + rand.Float64()*(90.) // [-10, 100)
gauge.Observe(ctx, n, attrs...)
o.ObserveFloat64(gauge, n, attrs...)
return nil
}, gauge)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func ExampleMeter_asynchronous_multiple() {
gcPause, _ := meter.Float64Histogram("gcPause")

_, err := meter.RegisterCallback(
func(ctx context.Context) error {
func(ctx context.Context, o metric.Observer) error {
memStats := &runtime.MemStats{}
// This call does work
runtime.ReadMemStats(memStats)

heapAlloc.Observe(ctx, int64(memStats.HeapAlloc))
gcCount.Observe(ctx, int64(memStats.NumGC))
o.ObserveInt64(heapAlloc, int64(memStats.HeapAlloc))
o.ObserveInt64(gcCount, int64(memStats.NumGC))

// This function synchronously records the pauses
computeGCPauses(ctx, gcPause, memStats.PauseNs[:])
Expand Down
47 changes: 41 additions & 6 deletions metric/internal/global/instruments.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"go.opentelemetry.io/otel/metric/instrument"
)

// unwrapper unwraps to return the underlying instrument implementation.
type unwrapper interface {
Unwrap() instrument.Asynchronous
}

type afCounter struct {
name string
opts []instrument.Float64ObserverOption
Expand All @@ -33,6 +38,9 @@ type afCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*afCounter)(nil)
var _ instrument.Float64ObservableCounter = (*afCounter)(nil)

func (i *afCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64ObservableCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -48,7 +56,7 @@ func (i *afCounter) Observe(ctx context.Context, x float64, attrs ...attribute.K
}
}

func (i *afCounter) unwrap() instrument.Asynchronous {
func (i *afCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Float64ObservableCounter)
}
Expand All @@ -64,6 +72,9 @@ type afUpDownCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*afUpDownCounter)(nil)
var _ instrument.Float64ObservableUpDownCounter = (*afUpDownCounter)(nil)

func (i *afUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64ObservableUpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -79,7 +90,7 @@ func (i *afUpDownCounter) Observe(ctx context.Context, x float64, attrs ...attri
}
}

func (i *afUpDownCounter) unwrap() instrument.Asynchronous {
func (i *afUpDownCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Float64ObservableUpDownCounter)
}
Expand All @@ -104,13 +115,16 @@ func (i *afGauge) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

var _ unwrapper = (*afGauge)(nil)
var _ instrument.Float64ObservableGauge = (*afGauge)(nil)

func (i *afGauge) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(instrument.Float64ObservableGauge).Observe(ctx, x, attrs...)
}
}

func (i *afGauge) unwrap() instrument.Asynchronous {
func (i *afGauge) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Float64ObservableGauge)
}
Expand All @@ -126,6 +140,9 @@ type aiCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*aiCounter)(nil)
var _ instrument.Int64ObservableCounter = (*aiCounter)(nil)

func (i *aiCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64ObservableCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -141,7 +158,7 @@ func (i *aiCounter) Observe(ctx context.Context, x int64, attrs ...attribute.Key
}
}

func (i *aiCounter) unwrap() instrument.Asynchronous {
func (i *aiCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Int64ObservableCounter)
}
Expand All @@ -157,6 +174,9 @@ type aiUpDownCounter struct {
instrument.Asynchronous
}

var _ unwrapper = (*aiUpDownCounter)(nil)
var _ instrument.Int64ObservableUpDownCounter = (*aiUpDownCounter)(nil)

func (i *aiUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64ObservableUpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -172,7 +192,7 @@ func (i *aiUpDownCounter) Observe(ctx context.Context, x int64, attrs ...attribu
}
}

func (i *aiUpDownCounter) unwrap() instrument.Asynchronous {
func (i *aiUpDownCounter) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Int64ObservableUpDownCounter)
}
Expand All @@ -188,6 +208,9 @@ type aiGauge struct {
instrument.Asynchronous
}

var _ unwrapper = (*aiGauge)(nil)
var _ instrument.Int64ObservableGauge = (*aiGauge)(nil)

func (i *aiGauge) setDelegate(m metric.Meter) {
ctr, err := m.Int64ObservableGauge(i.name, i.opts...)
if err != nil {
Expand All @@ -203,7 +226,7 @@ func (i *aiGauge) Observe(ctx context.Context, x int64, attrs ...attribute.KeyVa
}
}

func (i *aiGauge) unwrap() instrument.Asynchronous {
func (i *aiGauge) Unwrap() instrument.Asynchronous {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(instrument.Int64ObservableGauge)
}
Expand All @@ -220,6 +243,8 @@ type sfCounter struct {
instrument.Synchronous
}

var _ instrument.Float64Counter = (*sfCounter)(nil)

func (i *sfCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64Counter(i.name, i.opts...)
if err != nil {
Expand All @@ -244,6 +269,8 @@ type sfUpDownCounter struct {
instrument.Synchronous
}

var _ instrument.Float64UpDownCounter = (*sfUpDownCounter)(nil)

func (i *sfUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Float64UpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -268,6 +295,8 @@ type sfHistogram struct {
instrument.Synchronous
}

var _ instrument.Float64Histogram = (*sfHistogram)(nil)

func (i *sfHistogram) setDelegate(m metric.Meter) {
ctr, err := m.Float64Histogram(i.name, i.opts...)
if err != nil {
Expand All @@ -292,6 +321,8 @@ type siCounter struct {
instrument.Synchronous
}

var _ instrument.Int64Counter = (*siCounter)(nil)

func (i *siCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64Counter(i.name, i.opts...)
if err != nil {
Expand All @@ -316,6 +347,8 @@ type siUpDownCounter struct {
instrument.Synchronous
}

var _ instrument.Int64UpDownCounter = (*siUpDownCounter)(nil)

func (i *siUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.Int64UpDownCounter(i.name, i.opts...)
if err != nil {
Expand All @@ -340,6 +373,8 @@ type siHistogram struct {
instrument.Synchronous
}

var _ instrument.Int64Histogram = (*siHistogram)(nil)

func (i *siHistogram) setDelegate(m metric.Meter) {
ctr, err := m.Int64Histogram(i.name, i.opts...)
if err != nil {
Expand Down
16 changes: 10 additions & 6 deletions metric/internal/global/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func TestMeterProviderRace(t *testing.T) {
close(finish)
}

var zeroCallback metric.Callback = func(ctx context.Context, or metric.Observer) error {
return nil
}

func TestMeterRace(t *testing.T) {
mtr := &meter{}

Expand All @@ -66,7 +70,7 @@ func TestMeterRace(t *testing.T) {
_, _ = mtr.Int64Counter(name)
_, _ = mtr.Int64UpDownCounter(name)
_, _ = mtr.Int64Histogram(name)
_, _ = mtr.RegisterCallback(func(ctx context.Context) error { return nil })
_, _ = mtr.RegisterCallback(zeroCallback)
if !once {
wg.Done()
once = true
Expand All @@ -86,7 +90,7 @@ func TestMeterRace(t *testing.T) {

func TestUnregisterRace(t *testing.T) {
mtr := &meter{}
reg, err := mtr.RegisterCallback(func(ctx context.Context) error { return nil })
reg, err := mtr.RegisterCallback(zeroCallback)
require.NoError(t, err)

wg := &sync.WaitGroup{}
Expand Down Expand Up @@ -128,8 +132,8 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (instrument.Float
_, err = m.Int64ObservableGauge("test_Async_Gauge")
assert.NoError(t, err)

_, err = m.RegisterCallback(func(ctx context.Context) error {
afcounter.Observe(ctx, 3)
_, err = m.RegisterCallback(func(ctx context.Context, obs metric.Observer) error {
obs.ObserveFloat64(afcounter, 3)
return nil
}, afcounter)
require.NoError(t, err)
Expand Down Expand Up @@ -323,7 +327,7 @@ func TestRegistrationDelegation(t *testing.T) {
require.NoError(t, err)

var called0 bool
reg0, err := m.RegisterCallback(func(context.Context) error {
reg0, err := m.RegisterCallback(func(context.Context, metric.Observer) error {
called0 = true
return nil
}, actr)
Expand All @@ -334,7 +338,7 @@ func TestRegistrationDelegation(t *testing.T) {
assert.Equal(t, 0, mImpl.registry.Len(), "callback not unregistered")

var called1 bool
reg1, err := m.RegisterCallback(func(context.Context) error {
reg1, err := m.RegisterCallback(func(context.Context, metric.Observer) error {
called1 = true
return nil
}, actr)
Expand Down
16 changes: 15 additions & 1 deletion metric/internal/global/meter_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
)
Expand Down Expand Up @@ -136,11 +137,24 @@ func (r testReg) Unregister() error {
// This enables async collection.
func (m *testMeter) collect() {
ctx := context.Background()
o := observationRecorder{ctx}
for _, f := range m.callbacks {
if f == nil {
// Unregister.
continue
}
_ = f(ctx)
_ = f(ctx, o)
}
}

type observationRecorder struct {
ctx context.Context
}

func (o observationRecorder) ObserveFloat64(i instrument.Float64Observer, value float64, attr ...attribute.KeyValue) {
i.Observe(o.ctx, value, attr...)
}

func (o observationRecorder) ObserveInt64(i instrument.Int64Observer, value int64, attr ...attribute.KeyValue) {
i.Observe(o.ctx, value, attr...)
}
14 changes: 12 additions & 2 deletions metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package metric // import "go.opentelemetry.io/otel/metric"
import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
)

Expand Down Expand Up @@ -106,7 +107,8 @@ type Meter interface {
}

// Callback is a function registered with a Meter that makes observations for
// the set of instruments it is registered with.
// the set of instruments it is registered with. The Observer parameter is used
// to record measurment observations for these instruments.
//
// The function needs to complete in a finite amount of time and the deadline
// of the passed context is expected to be honored.
Expand All @@ -116,7 +118,15 @@ type Meter interface {
// the same attributes as another Callback will report.
//
// The function needs to be concurrent safe.
type Callback func(context.Context) error
type Callback func(context.Context, Observer) error

// Observer records measurements for multiple instruments in a Callback.
type Observer interface {
// ObserveFloat64 records the float64 value with attributes for obsrv.
ObserveFloat64(obsrv instrument.Float64Observer, value float64, attributes ...attribute.KeyValue)
// ObserveInt64 records the int64 value with attributes for obsrv.
ObserveInt64(obsrv instrument.Int64Observer, value int64, attributes ...attribute.KeyValue)
}

// Registration is an token representing the unique registration of a callback
// for a set of instruments with a Meter.
Expand Down
Loading