Skip to content

Commit

Permalink
Merge branch 'main' into unreg-async
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias authored Dec 15, 2022
2 parents c56c140 + 14a17b3 commit 0e2ed46
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 80 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)

### Added

- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)

## [1.11.2/0.34.0] 2022-12-05

### Added
Expand Down
21 changes: 13 additions & 8 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ func unify(funcs []func(context.Context) error) func(context.Context) error {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
return unifyErrors(errs)
}
}

// unifyErrors combines multiple errors into a single error.
func unifyErrors(errs []error) error {
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}

Expand Down
16 changes: 9 additions & 7 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
)

type reader struct {
producer producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
producer sdkProducer
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}

var _ Reader = (*reader)(nil)
Expand All @@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
return r.aggregationFunc(kind)
}

func (r *reader) register(p producer) { r.producer = p }
func (r *reader) register(p sdkProducer) { r.producer = p }
func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) }
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
Expand Down
60 changes: 49 additions & 11 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ import (
// manualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
producer atomic.Value
sdkProducer atomic.Value
shutdownOnce sync.Once

mu sync.Mutex
isShutdown bool
externalProducers atomic.Value

temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
}
Expand All @@ -41,22 +45,39 @@ var _ = map[Reader]struct{}{&manualReader{}: {}}
// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
r := &manualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
return r
}

// register stores the Producer which enables the caller to read
// metrics on demand.
func (mr *manualReader) register(p producer) {
// register stores the sdkProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *manualReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
func (mr *manualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.isShutdown {
return
}
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
Expand All @@ -77,18 +98,23 @@ func (mr *manualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
mr.producer.Store(produceHolder{
mr.sdkProducer.Store(produceHolder{
produce: shutdownProducer{}.produce,
})
mr.mu.Lock()
defer mr.mu.Unlock()
mr.isShutdown = true
// release references to Producer(s)
mr.externalProducers.Store([]Producer{})
err = nil
})
return err
}

// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
p := mr.producer.Load()
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
Expand All @@ -103,7 +129,19 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
return metricdata.ResourceMetrics{}, err
}

return ph.produce(ctx)
rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
}

// manualReaderConfig contains configuration options for a ManualReader.
Expand Down
55 changes: 47 additions & 8 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
cancel: cancel,
done: make(chan struct{}),
}
r.externalProducers.Store([]Producer{})

go func() {
defer func() { close(r.done) }()
Expand All @@ -126,7 +127,11 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
// periodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
producer atomic.Value
sdkProducer atomic.Value

mu sync.Mutex
isShutdown bool
externalProducers atomic.Value

timeout time.Duration
exporter Exporter
Expand Down Expand Up @@ -166,14 +171,28 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
func (r *periodicReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
}
}

// RegisterProducer registers p as an external Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
if r.isShutdown {
return
}
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}

// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
Expand All @@ -195,12 +214,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.producer.Load())
return r.collect(ctx, r.sdkProducer.Load())
}

// collect unwraps p as a produceHolder and returns its produce results.
Expand All @@ -218,7 +238,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
}
return ph.produce(ctx)

rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
}

// export exports metric data m using r's exporter.
Expand Down Expand Up @@ -259,7 +292,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
<-r.done

// Any future call to Collect will now return ErrReaderShutdown.
ph := r.producer.Swap(produceHolder{
ph := r.sdkProducer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
})

Expand All @@ -276,6 +309,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
if err == nil || err == ErrReaderShutdown {
err = sErr
}

r.mu.Lock()
defer r.mu.Unlock()
r.isShutdown = true
// release references to Producer(s)
r.externalProducers.Store([]Producer{})
})
return err
}
20 changes: 12 additions & 8 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func (ts *periodicReaderTestSuite) SetupTest() {
}

ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader.register(testProducer{})
ts.ErrReader.register(testSDKProducer{})
ts.ErrReader.RegisterProducer(testExternalProducer{})
}

func (ts *periodicReaderTestSuite) TearDownTest() {
Expand Down Expand Up @@ -186,14 +187,15 @@ func TestPeriodicReaderRun(t *testing.T) {

exp := &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
// The testSDKProducer produces testResourceMetricsAB.
assert.Equal(t, testResourceMetricsAB, m)
return assert.AnError
},
}

r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)

Expand All @@ -210,8 +212,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
called = new(bool)
return &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
// The testSDKProducer produces testResourceMetricsA.
assert.Equal(t, testResourceMetricsAB, m)
*called = true
return assert.AnError
},
Expand All @@ -221,7 +223,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")

Expand All @@ -232,7 +235,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})
Expand Down
19 changes: 16 additions & 3 deletions sdk/metric/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ type Reader interface {
// register registers a Reader with a MeterProvider.
// The producer argument allows the Reader to signal the sdk to collect
// and send aggregated metric measurements.
register(producer)
register(sdkProducer)

// RegisterProducer registers a an external Producer with this Reader.
// The Producer is used as a source of aggregated metric data which is
// incorporated into metrics collected from the SDK.
RegisterProducer(Producer)

// temporality reports the Temporality for the instrument kind provided.
temporality(InstrumentKind) metricdata.Temporality
Expand Down Expand Up @@ -84,14 +89,22 @@ type Reader interface {
Shutdown(context.Context) error
}

// producer produces metrics for a Reader.
type producer interface {
// sdkProducer produces metrics for a Reader.
type sdkProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (metricdata.ResourceMetrics, error)
}

// Producer produces metrics for a Reader from an external source.
type Producer interface {
// Produce returns aggregated metrics from an external source.
//
// This method should be safe to call concurrently.
Produce(context.Context) ([]metricdata.ScopeMetrics, error)
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
Expand Down
Loading

0 comments on commit 0e2ed46

Please sign in to comment.