Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique instrument checking #580

Merged
merged 16 commits into from
Mar 24, 2020
2 changes: 1 addition & 1 deletion api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newFixture(b *testing.B) *benchFixture {
B: b,
}
bf.sdk = sdk.New(bf, sdk.NewDefaultLabelEncoder())
bf.meter = metric.WrapMeterImpl(bf.sdk)
bf.meter = metric.WrapMeterImpl(bf.sdk, "test")
return bf
}

Expand Down
86 changes: 69 additions & 17 deletions api/global/internal/meter.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2020, OpenTelemetry Authors
jmacd marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
Expand All @@ -8,6 +22,7 @@ import (

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
)

// This file contains the forwarding implementation of metric.Provider
Expand All @@ -30,12 +45,15 @@ import (
// Bound instrument operations are implemented by delegating to the
// instrument after it is registered, with a sync.Once initializer to
// protect against races with Release().
//
// Metric uniqueness checking is implemented by calling the exported
// methods of the api/metric/registry package.

type meterProvider struct {
delegate metric.Provider

lock sync.Mutex
meters []*meter
meters map[string]*meter
}

type meter struct {
Expand All @@ -45,8 +63,9 @@ type meter struct {
name string

lock sync.Mutex
registry map[string]metric.InstrumentImpl
syncInsts []*syncImpl
asyncInsts []*obsImpl
asyncInsts []*asyncImpl
}

type instrument struct {
Expand All @@ -61,7 +80,7 @@ type syncImpl struct {
constructor func(metric.Meter) (metric.SyncImpl, error)
}

type obsImpl struct {
type asyncImpl struct {
jmacd marked this conversation as resolved.
Show resolved Hide resolved
delegate unsafe.Pointer // (*metric.AsyncImpl)

instrument
Expand Down Expand Up @@ -105,14 +124,20 @@ var _ metric.LabelSet = &labelSet{}
var _ metric.LabelSetDelegate = &labelSet{}
var _ metric.InstrumentImpl = &syncImpl{}
var _ metric.BoundSyncImpl = &syncHandle{}
var _ metric.AsyncImpl = &obsImpl{}
var _ metric.AsyncImpl = &asyncImpl{}

func (inst *instrument) Descriptor() metric.Descriptor {
return inst.descriptor
}

// Provider interface and delegation

func newMeterProvider() *meterProvider {
return &meterProvider{
meters: map[string]*meter{},
}
}

func (p *meterProvider) setDelegate(provider metric.Provider) {
p.lock.Lock()
defer p.lock.Unlock()
Expand All @@ -132,11 +157,18 @@ func (p *meterProvider) Meter(name string) metric.Meter {
return p.delegate.Meter(name)
}

if exm, ok := p.meters[name]; ok {
return exm
}

m := &meter{
provider: p,
name: name,
provider: p,
name: name,
registry: map[string]metric.InstrumentImpl{},
syncInsts: []*syncImpl{},
asyncInsts: []*asyncImpl{},
}
p.meters = append(p.meters, m)
p.meters[name] = m
return m
}

Expand Down Expand Up @@ -168,13 +200,21 @@ func (m *meter) newSync(desc metric.Descriptor, constructor func(metric.Meter) (
return constructor(*meterPtr)
}

if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex.(metric.SyncImpl), nil
}

inst := &syncImpl{
instrument: instrument{
descriptor: desc,
},
constructor: constructor,
}
m.syncInsts = append(m.syncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}

Expand Down Expand Up @@ -246,17 +286,25 @@ func (m *meter) newAsync(desc metric.Descriptor, constructor func(metric.Meter)
return constructor(*meterPtr)
}

inst := &obsImpl{
if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex, nil
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
}

inst := &asyncImpl{
instrument: instrument{
descriptor: desc,
},
constructor: constructor,
}
m.asyncInsts = append(m.asyncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}

func (obs *obsImpl) Implementation() interface{} {
func (obs *asyncImpl) Implementation() interface{} {
if implPtr := (*metric.AsyncImpl)(atomic.LoadPointer(&obs.delegate)); implPtr != nil {
return (*implPtr).Implementation()
}
Expand All @@ -273,7 +321,7 @@ func asyncCheck(has AsyncImpler, err error) (metric.AsyncImpl, error) {
return nil, err
}

func (obs *obsImpl) setDelegate(d metric.Meter) {
func (obs *asyncImpl) setDelegate(d metric.Meter) {
implPtr := new(metric.AsyncImpl)

var err error
Expand Down Expand Up @@ -360,49 +408,53 @@ func (labels *labelSet) Delegate() metric.LabelSet {

// Constructors

func (m *meter) withName(opts []metric.Option) []metric.Option {
return append(opts, metric.WithLibraryName(m.name))
}

func (m *meter) NewInt64Counter(name string, opts ...metric.Option) (metric.Int64Counter, error) {
return metric.WrapInt64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, core.Int64NumberKind, opts...),
metric.NewDescriptor(name, metric.CounterKind, core.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Counter(name, opts...))
}))
}

func (m *meter) NewFloat64Counter(name string, opts ...metric.Option) (metric.Float64Counter, error) {
return metric.WrapFloat64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, core.Float64NumberKind, opts...),
metric.NewDescriptor(name, metric.CounterKind, core.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Counter(name, opts...))
}))
}

func (m *meter) NewInt64Measure(name string, opts ...metric.Option) (metric.Int64Measure, error) {
return metric.WrapInt64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, core.Int64NumberKind, opts...),
metric.NewDescriptor(name, metric.MeasureKind, core.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Measure(name, opts...))
}))
}

func (m *meter) NewFloat64Measure(name string, opts ...metric.Option) (metric.Float64Measure, error) {
return metric.WrapFloat64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, core.Float64NumberKind, opts...),
metric.NewDescriptor(name, metric.MeasureKind, core.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Measure(name, opts...))
}))
}

func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, opts ...metric.Option) (metric.Int64Observer, error) {
return metric.WrapInt64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, core.Int64NumberKind, opts...),
metric.NewDescriptor(name, metric.ObserverKind, core.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterInt64Observer(name, callback, opts...))
}))
}

func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, opts ...metric.Option) (metric.Float64Observer, error) {
return metric.WrapFloat64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, core.Float64NumberKind, opts...),
metric.NewDescriptor(name, metric.ObserverKind, core.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterFloat64Observer(name, callback, opts...))
}))
Expand All @@ -413,7 +465,7 @@ func AtomicFieldOffsets() map[string]uintptr {
"meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"syncImpl.delegate": unsafe.Offsetof(syncImpl{}.delegate),
"obsImpl.delegate": unsafe.Offsetof(obsImpl{}.delegate),
"asyncImpl.delegate": unsafe.Offsetof(asyncImpl{}.delegate),
"labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate),
"syncHandle.delegate": unsafe.Offsetof(syncHandle{}.delegate),
}
Expand Down
Loading