Skip to content

Commit

Permalink
Adds async instruments and providers. (#3084)
Browse files Browse the repository at this point in the history
* Adds instrument providers and instruments.

* Don't return nil instrument, return with error

* removed sync

* Added a number of tests.

Signed-off-by: GitHub <noreply@github.com>

* Address PR comments

* fix error messages

* fixes typo in test name

Signed-off-by: GitHub <noreply@github.com>

* Fix lint issues

* moved the testCallback into the TestMeterCreateInstrument

Signed-off-by: GitHub <noreply@github.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
MadVikingGod and MrAlias authored Aug 25, 2022
1 parent 5c9ff25 commit 2eec66f
Show file tree
Hide file tree
Showing 6 changed files with 540 additions and 21 deletions.
55 changes: 55 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/sdk/metric/internal"
)

type instrumentImpl[N int64 | float64] struct {
instrument.Asynchronous

aggregators []internal.Aggregator[N]
}

var _ asyncfloat64.Counter = &instrumentImpl[float64]{}
var _ asyncfloat64.UpDownCounter = &instrumentImpl[float64]{}
var _ asyncfloat64.Gauge = &instrumentImpl[float64]{}
var _ asyncint64.Counter = &instrumentImpl[int64]{}
var _ asyncint64.UpDownCounter = &instrumentImpl[int64]{}
var _ asyncint64.Gauge = &instrumentImpl[int64]{}

func (i *instrumentImpl[N]) Observe(ctx context.Context, val N, attrs ...attribute.KeyValue) {
// Only record a value if this is being called from the MetricProvider.
_, ok := ctx.Value(produceKey).(struct{})
if !ok {
return
}
if err := ctx.Err(); err != nil {
return
}
for _, agg := range i.aggregators {
agg.Aggregate(val, attribute.NewSet(attrs...))
}
}
151 changes: 151 additions & 0 deletions sdk/metric/instrument_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.18
// +build go1.18

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"fmt"

"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/view"
)

type asyncInt64Provider struct {
scope instrumentation.Scope
registry *pipelineRegistry[int64]
}

var _ asyncint64.InstrumentProvider = asyncInt64Provider{}

// Counter creates an instrument for recording increasing values.
func (p asyncInt64Provider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := p.registry.createAggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Kind: view.AsyncCounter,
}, cfg.Unit())
if len(aggs) == 0 && err != nil {
err = fmt.Errorf("instrument does not match any view: %w", err)
}

return &instrumentImpl[int64]{
aggregators: aggs,
}, err
}

// UpDownCounter creates an instrument for recording changes of a value.
func (p asyncInt64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := p.registry.createAggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Kind: view.AsyncUpDownCounter,
}, cfg.Unit())
if len(aggs) == 0 && err != nil {
err = fmt.Errorf("instrument does not match any view: %w", err)
}
return &instrumentImpl[int64]{
aggregators: aggs,
}, err
}

// Gauge creates an instrument for recording the current value.
func (p asyncInt64Provider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := p.registry.createAggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Kind: view.AsyncGauge,
}, cfg.Unit())
if len(aggs) == 0 && err != nil {
err = fmt.Errorf("instrument does not match any view: %w", err)
}
return &instrumentImpl[int64]{
aggregators: aggs,
}, err
}

type asyncFloat64Provider struct {
scope instrumentation.Scope
registry *pipelineRegistry[float64]
}

var _ asyncfloat64.InstrumentProvider = asyncFloat64Provider{}

// Counter creates an instrument for recording increasing values.
func (p asyncFloat64Provider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := p.registry.createAggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Kind: view.AsyncCounter,
}, cfg.Unit())
if len(aggs) == 0 && err != nil {
err = fmt.Errorf("instrument does not match any view: %w", err)
}
return &instrumentImpl[float64]{
aggregators: aggs,
}, err
}

// UpDownCounter creates an instrument for recording changes of a value.
func (p asyncFloat64Provider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := p.registry.createAggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Kind: view.AsyncUpDownCounter,
}, cfg.Unit())
if len(aggs) == 0 && err != nil {
err = fmt.Errorf("instrument does not match any view: %w", err)
}
return &instrumentImpl[float64]{
aggregators: aggs,
}, err
}

// Gauge creates an instrument for recording the current value.
func (p asyncFloat64Provider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) {
cfg := instrument.NewConfig(opts...)

aggs, err := p.registry.createAggregators(view.Instrument{
Scope: p.scope,
Name: name,
Description: cfg.Description(),
Kind: view.AsyncGauge,
}, cfg.Unit())
if len(aggs) == 0 && err != nil {
err = fmt.Errorf("instrument does not match any view: %w", err)
}
return &instrumentImpl[float64]{
aggregators: aggs,
}, err
}
27 changes: 19 additions & 8 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type meterRegistry struct {
sync.Mutex

meters map[instrumentation.Scope]*meter

intRegistry *pipelineRegistry[int64]
floatRegistry *pipelineRegistry[float64]
}

// Get returns a registered meter matching the instrumentation scope if it
Expand All @@ -56,7 +59,11 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
defer r.Unlock()

if r.meters == nil {
m := &meter{Scope: s}
m := &meter{
Scope: s,
intRegistry: r.intRegistry,
floatRegistry: r.floatRegistry,
}
r.meters = map[instrumentation.Scope]*meter{s: m}
return m
}
Expand All @@ -66,7 +73,11 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
return m
}

m = &meter{Scope: s}
m = &meter{
Scope: s,
intRegistry: r.intRegistry,
floatRegistry: r.floatRegistry,
}
r.meters[s] = m
return m
}
Expand All @@ -93,28 +104,28 @@ func (r *meterRegistry) Range(f func(*meter) bool) {
type meter struct {
instrumentation.Scope

// TODO (#2815, 2814): implement.
intRegistry *pipelineRegistry[int64]
floatRegistry *pipelineRegistry[float64]
}

// Compile-time check meter implements metric.Meter.
var _ metric.Meter = (*meter)(nil)

// AsyncInt64 returns the asynchronous integer instrument provider.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
// TODO (#2815): implement.
return nil
return asyncInt64Provider{scope: m.Scope, registry: m.intRegistry}
}

// AsyncFloat64 returns the asynchronous floating-point instrument provider.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
// TODO (#2815): implement.
return nil
return asyncFloat64Provider{scope: m.Scope, registry: m.floatRegistry}
}

// RegisterCallback registers the function f to be called when any of the
// insts Collect method is called.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
// TODO (#2815): implement.
// Because the pipelines are shared only one of the registries needs to be invoked
m.intRegistry.registerCallback(f)
return nil
}

Expand Down
Loading

0 comments on commit 2eec66f

Please sign in to comment.