Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle duplicate Aggregators and log instrument conflicts #3251

Merged
merged 29 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
bc44d82
Add the cache type
MrAlias Sep 29, 2022
29ea02d
Add cache unit tests
MrAlias Sep 29, 2022
87324d8
Test cache concurrency
MrAlias Sep 29, 2022
010b999
Add the instrumentCache
MrAlias Sep 29, 2022
8889f32
Use the instrumentCache to deduplicate creation
MrAlias Sep 30, 2022
40ae7fa
Drop unique check from addAggregator
MrAlias Sep 30, 2022
c7d8d2f
Fix aggregatorCache* docs
MrAlias Sep 30, 2022
45ac559
Update cachedAggregator and aggregator method docs
MrAlias Sep 30, 2022
d0acf9b
Remove unnecessary type constraint
MrAlias Sep 30, 2022
0cec45a
Remove unused errAlreadyRegistered
MrAlias Sep 30, 2022
cae41cc
Rename to not shadow imports
MrAlias Sep 30, 2022
1514d9d
Add changes to changelog
MrAlias Sep 30, 2022
52e6508
Fix changelog English
MrAlias Sep 30, 2022
668d5d8
Merge branch 'main' into agg-cache
MrAlias Oct 2, 2022
ab1ee04
Merge branch 'main' into agg-cache
MrAlias Oct 3, 2022
c41328c
Store resolvers in the meter instead of caches
MrAlias Oct 3, 2022
1f83658
Test all Aggregator[N] impls are comparable
MrAlias Oct 3, 2022
71c19e6
Fix lint
MrAlias Oct 3, 2022
2bb7517
Add documentation that Aggregators need to be comparable
MrAlias Oct 4, 2022
edaf70a
Merge branch 'main' into agg-cache
MrAlias Oct 4, 2022
a453ab7
Merge branch 'main' into agg-cache
MrAlias Oct 4, 2022
cc91e8a
Update sdk/metric/internal/aggregator.go
MrAlias Oct 10, 2022
5832c56
Update sdk/metric/instrument.go
MrAlias Oct 10, 2022
7096a68
Update sdk/metric/instrument.go
MrAlias Oct 10, 2022
32f47bf
Update sdk/metric/internal/aggregator_test.go
MrAlias Oct 10, 2022
d90819f
Merge branch 'main' into agg-cache
MrAlias Oct 10, 2022
8b0ac23
Merge branch 'main' into agg-cache
MrAlias Oct 11, 2022
b876373
Fix pipeline_test.go use of newInstrumentCache
MrAlias Oct 11, 2022
b913d7a
Merge branch 'main' into agg-cache
MrAlias Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Upgrade `golang.org/x/sys/unix` from `v0.0.0-20210423185535-09eb48e85fd7` to `v0.0.0-20220919091848-fb04ddd9f9c8`.
This addresses [GO-2022-0493](https://pkg.go.dev/vuln/GO-2022-0493). (#3235)

### Fixed

- Return the same instrument every time a user makes the exact same instrument creation call. (#3229, #3251)
- Return the existing instrument when a view transforms a creation call to match an existing instrument. (#3240, #3251)
- Log a warning when a conflicting instrument (e.g. description, unit, data-type) is created instead of returning an error. (#3251)

## [0.32.1] Metric SDK (Alpha) - 2022-09-22

### Changed
Expand Down
110 changes: 110 additions & 0 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache is a locking storage used to quickly return already computed values.
//
// The zero value of a cache is empty and ready to use.
//
// A cache must not be copied after first use.
//
// All methods of a cache are safe to call concurrently.
type cache[K comparable, V any] struct {
sync.Mutex
data map[K]V
}

// Lookup returns the value stored in the cache with the accociated key if it
// exists. Otherwise, f is called and its returned value is set in the cache
// for key and returned.
//
// Lookup is safe to call concurrently. It will hold the cache lock, so f
// should not block excessively.
func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.Lock()
defer c.Unlock()

if c.data == nil {
val := f()
c.data = map[K]V{key: val}
return val
}
if v, ok := c.data[key]; ok {
return v
}
val := f()
c.data[key] = val
return val
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[instrumentID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, instrumentID]
}

// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
if ac == nil {
ac = &cache[instrumentID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, instrumentID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exist in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that instrumentID will be
// returned along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
got := c.views.Lookup(id.Name, func() instrumentID { return id })
return got, id == got
}
76 changes: 76 additions & 0 deletions sdk/metric/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCache(t *testing.T) {
k0, k1 := "one", "two"
v0, v1 := 1, 2

c := cache[string, int]{}

var got int
require.NotPanics(t, func() {
got = c.Lookup(k0, func() int { return v0 })
}, "zero-value cache panics on Lookup")
assert.Equal(t, v0, got, "zero-value cache did not return fallback")

assert.Equal(t, v0, c.Lookup(k0, func() int { return v1 }), "existing key")

assert.Equal(t, v1, c.Lookup(k1, func() int { return v1 }), "non-existing key")
}

func TestCacheConcurrency(t *testing.T) {
const (
key = "k"
goroutines = 10
timeoutSec = 5
)

c := cache[string, int]{}
var wg sync.WaitGroup
for n := 0; n < goroutines; n++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
assert.NotPanics(t, func() {
c.Lookup(key, func() int { return i })
})
}(n)
}

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

assert.Eventually(t, func() bool {
select {
case <-done:
return true
default:
return false
}
}, timeoutSec*time.Second, 10*time.Millisecond)
}
23 changes: 23 additions & 0 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,32 @@ import (
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// instrumentID are the identifying properties of an instrument.
type instrumentID struct {
// Name is the name of the instrument.
Name string
// Description is the description of the instrument.
Description string
// Unit is the unit of the instrument.
Unit unit.Unit
// Aggregation is the aggregation data type of the instrument.
Aggregation string
// Monotonic is the monotonicity of an instruments data type. This field is
// not used for all data types, so a zero value needs to understood in the
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
// context of Aggregation.
Monotonic bool
// Temporality is the temporality of an instruments data type. This field
// is not used for all data types.
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
Temporality metricdata.Temporality
// Number is the number type of the instrument.
Number string
}

type instrumentImpl[N int64 | float64] struct {
instrument.Asynchronous
instrument.Synchronous
Expand Down
25 changes: 21 additions & 4 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ func (r *meterRegistry) Get(s instrumentation.Scope) *meter {
type meter struct {
instrumentation.Scope

// aggregatorCache* ensures no duplicate Aggregators are created for the
// same instrument within the scope of all instruments this meter owns.
//
// Duplicate instrument creations for different number types are identified
// in the viewCache. Since the conflict is "resolvable", a valid aggregator
// still needs to be returned when this occurs. Therefore, instruments of
// different numbers are not tracked with the same cache.
aggregatorCacheInt64 cache[instrumentID, aggVal[int64]]
aggregatorCacheFloat64 cache[instrumentID, aggVal[float64]]
// viewCache ensures instrument conflicts this meter is asked to create are
// logged to the user.
viewCache cache[string, instrumentID]
MrAlias marked this conversation as resolved.
Show resolved Hide resolved

pipes pipelines
}

Expand All @@ -91,12 +104,14 @@ var _ metric.Meter = (*meter)(nil)

// AsyncInt64 returns the asynchronous integer instrument provider.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
return asyncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
c := newInstrumentCache(&m.aggregatorCacheInt64, &m.viewCache)
return asyncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}

// AsyncFloat64 returns the asynchronous floating-point instrument provider.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
c := newInstrumentCache(&m.aggregatorCacheFloat64, &m.viewCache)
return asyncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}

// RegisterCallback registers the function f to be called when any of the
Expand All @@ -108,10 +123,12 @@ func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context

// SyncInt64 returns the synchronous integer instrument provider.
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
return syncInt64Provider{scope: m.Scope, resolve: newResolver[int64](m.pipes)}
c := newInstrumentCache(&m.aggregatorCacheInt64, &m.viewCache)
return syncInt64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}

// SyncFloat64 returns the synchronous floating-point instrument provider.
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
return syncFloat64Provider{scope: m.Scope, resolve: newResolver[float64](m.pipes)}
c := newInstrumentCache(&m.aggregatorCacheFloat64, &m.viewCache)
return syncFloat64Provider{scope: m.Scope, resolve: newResolver(m.pipes, c)}
}
Loading