diff --git a/sdk/metric/aggregator/array/array.go b/sdk/metric/aggregator/array/array.go index 3855e8cc28ca..923ef94c2016 100644 --- a/sdk/metric/aggregator/array/array.go +++ b/sdk/metric/aggregator/array/array.go @@ -107,10 +107,14 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { // Update takes a lock to prevent concurrent Update() and Checkpoint() // calls. func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { + c.UpdateArray(number, desc) + return nil +} + +func (c *Aggregator) UpdateArray(number core.Number, desc *export.Descriptor) { c.lock.Lock() c.current = append(c.current, number) c.lock.Unlock() - return nil } // Merge combines two data sets into one. @@ -120,9 +124,14 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error return aggregator.NewInconsistentMergeError(c, oa) } + c.MergeArrayAggregator(o, desc) + return nil +} + +// MergeArrayAggregator combines two data sets into one. +func (c *Aggregator) MergeArrayAggregator(o *Aggregator, desc *export.Descriptor) { c.ckptSum.AddNumber(desc.NumberKind(), o.ckptSum) c.checkpoint = combine(c.checkpoint, o.checkpoint, desc.NumberKind()) - return nil } func (c *Aggregator) sort(kind core.NumberKind) { diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index 324edca7c93e..c2402b98cac1 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -116,10 +116,17 @@ func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) { // Update takes a lock to prevent concurrent Update() and Checkpoint() // calls. func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { + c.UpdateDDSketch(number, desc) + return nil +} + +// UpdateDDSketch adds the recorded measurement to the current data +// set. UpdateDDSketch takes a lock to prevent concurrent +// UpdateDDSketch() and Checkpoint() calls. +func (c *Aggregator) UpdateDDSketch(number core.Number, desc *export.Descriptor) { c.lock.Lock() defer c.lock.Unlock() c.current.Add(number.CoerceToFloat64(desc.NumberKind())) - return nil } // Merge combines two sketches into one. @@ -129,6 +136,11 @@ func (c *Aggregator) Merge(oa export.Aggregator, d *export.Descriptor) error { return aggregator.NewInconsistentMergeError(c, oa) } - c.checkpoint.Merge(o.checkpoint) + c.MergeDDSketchAggregator(o) return nil } + +// MergeDDSketchAggregator combines two sketches into one. +func (c *Aggregator) MergeDDSketchAggregator(o *Aggregator) { + c.checkpoint.Merge(o.checkpoint) +} diff --git a/sdk/metric/aggregator/gauge/gauge.go b/sdk/metric/aggregator/gauge/gauge.go index a9e73f73de87..7183194e5561 100644 --- a/sdk/metric/aggregator/gauge/gauge.go +++ b/sdk/metric/aggregator/gauge/gauge.go @@ -136,6 +136,15 @@ func (g *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error return aggregator.NewInconsistentMergeError(g, oa) } + g.MergeGaugeAggregator(o, desc) + return nil +} + +// MergeGaugeAggregator combines state from two aggregators. If the +// gauge is declared as monotonic, the greater value is chosen. If +// the gauge is declared as non-monotonic, the most-recently set value +// is chosen. +func (g *Aggregator) MergeGaugeAggregator(o *Aggregator, desc *export.Descriptor) { ggd := (*gaugeData)(atomic.LoadPointer(&g.checkpoint)) ogd := (*gaugeData)(atomic.LoadPointer(&o.checkpoint)) @@ -144,19 +153,18 @@ func (g *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error cmp := ggd.value.CompareNumber(desc.NumberKind(), ogd.value) if cmp > 0 { - return nil + return } if cmp < 0 { g.checkpoint = unsafe.Pointer(ogd) - return nil + return } } // Non-monotonic gauge or equal values if ggd.timestamp.After(ogd.timestamp) { - return nil + return } g.checkpoint = unsafe.Pointer(ogd) - return nil } diff --git a/sdk/metric/aggregator/minmaxsumcount/mmsc.go b/sdk/metric/aggregator/minmaxsumcount/mmsc.go index 8d9b67c7d6aa..24dec53d0d91 100644 --- a/sdk/metric/aggregator/minmaxsumcount/mmsc.go +++ b/sdk/metric/aggregator/minmaxsumcount/mmsc.go @@ -124,6 +124,11 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { // Update adds the recorded measurement to the current data set. func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error { + c.UpdateMMSC(number, desc) + return nil +} + +func (c *Aggregator) UpdateMMSC(number core.Number, desc *export.Descriptor) { kind := desc.NumberKind() c.current.count.AddUint64Atomic(1) @@ -149,7 +154,6 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export. break } } - return nil } // Merge combines two data sets into one. @@ -159,6 +163,11 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error return aggregator.NewInconsistentMergeError(c, oa) } + c.MergeMMSCAggregator(o, desc) + return nil +} + +func (c *Aggregator) MergeMMSCAggregator(o *Aggregator, desc *export.Descriptor) { c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum) c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count) @@ -168,5 +177,4 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 { c.checkpoint.max.SetNumber(o.checkpoint.max) } - return nil } diff --git a/sdk/metric/aggregator/observerarray/observerarray.go b/sdk/metric/aggregator/observerarray/observerarray.go new file mode 100644 index 000000000000..14747ec6c338 --- /dev/null +++ b/sdk/metric/aggregator/observerarray/observerarray.go @@ -0,0 +1,100 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package observerarray // import "go.opentelemetry.io/otel/sdk/metric/aggregator/observerarray" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/api/core" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/metric/aggregator/array" + "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge" +) + +type ( + Aggregator struct { + g *gauge.Aggregator + a *array.Aggregator + } +) + +var ( + _ export.Aggregator = &Aggregator{} + _ aggregator.LastValue = &Aggregator{} + _ aggregator.MinMaxSumCount = &Aggregator{} + _ aggregator.Distribution = &Aggregator{} + _ aggregator.Points = &Aggregator{} +) + +func New() *Aggregator { + return &Aggregator{ + g: gauge.New(), + a: array.New(), + } +} + +func (a *Aggregator) Update(ctx context.Context, number core.Number, descriptor *export.Descriptor) error { + if err := a.g.Update(ctx, number, descriptor); err != nil { + return err + } + a.a.UpdateArray(number, descriptor) + return nil +} + +func (a *Aggregator) Checkpoint(ctx context.Context, descriptor *export.Descriptor) { + a.g.Checkpoint(ctx, descriptor) + a.a.Checkpoint(ctx, descriptor) +} + +func (a *Aggregator) Merge(oa export.Aggregator, descriptor *export.Descriptor) error { + o, _ := oa.(*Aggregator) + if o == nil { + return aggregator.NewInconsistentMergeError(a, oa) + } + + a.g.MergeGaugeAggregator(o.g, descriptor) + a.a.MergeArrayAggregator(o.a, descriptor) + return nil +} + +func (a *Aggregator) LastValue() (core.Number, time.Time, error) { + return a.g.LastValue() +} + +func (a *Aggregator) Min() (core.Number, error) { + return a.a.Min() +} + +func (a *Aggregator) Max() (core.Number, error) { + return a.a.Max() +} + +func (a *Aggregator) Sum() (core.Number, error) { + return a.a.Sum() +} + +func (a *Aggregator) Count() (int64, error) { + return a.a.Count() +} + +func (a *Aggregator) Quantile(q float64) (core.Number, error) { + return a.a.Quantile(q) +} + +func (a *Aggregator) Points() ([]core.Number, error) { + return a.a.Points() +} diff --git a/sdk/metric/aggregator/observerddsketch/observerddsketch.go b/sdk/metric/aggregator/observerddsketch/observerddsketch.go new file mode 100644 index 000000000000..654261728981 --- /dev/null +++ b/sdk/metric/aggregator/observerddsketch/observerddsketch.go @@ -0,0 +1,95 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package observerddsketch // import "go.opentelemetry.io/otel/sdk/metric/aggregator/observerddsketch" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/api/core" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" + "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge" +) + +type ( + Aggregator struct { + g *gauge.Aggregator + d *ddsketch.Aggregator + } +) + +var ( + _ export.Aggregator = &Aggregator{} + _ aggregator.LastValue = &Aggregator{} + _ aggregator.MinMaxSumCount = &Aggregator{} + _ aggregator.Distribution = &Aggregator{} +) + +func New(cfg *ddsketch.Config, desc *export.Descriptor) *Aggregator { + return &Aggregator{ + g: gauge.New(), + d: ddsketch.New(cfg, desc), + } +} + +func (a *Aggregator) Update(ctx context.Context, number core.Number, descriptor *export.Descriptor) error { + if err := a.g.Update(ctx, number, descriptor); err != nil { + return err + } + a.d.UpdateDDSketch(number, descriptor) + return nil +} + +func (a *Aggregator) Checkpoint(ctx context.Context, descriptor *export.Descriptor) { + a.g.Checkpoint(ctx, descriptor) + a.d.Checkpoint(ctx, descriptor) +} + +func (a *Aggregator) Merge(oa export.Aggregator, descriptor *export.Descriptor) error { + o, _ := oa.(*Aggregator) + if o == nil { + return aggregator.NewInconsistentMergeError(a, oa) + } + + a.g.MergeGaugeAggregator(o.g, descriptor) + a.d.MergeDDSketchAggregator(o.d) + return nil +} + +func (a *Aggregator) LastValue() (core.Number, time.Time, error) { + return a.g.LastValue() +} + +func (a *Aggregator) Min() (core.Number, error) { + return a.d.Min() +} + +func (a *Aggregator) Max() (core.Number, error) { + return a.d.Max() +} + +func (a *Aggregator) Sum() (core.Number, error) { + return a.d.Sum() +} + +func (a *Aggregator) Count() (int64, error) { + return a.d.Count() +} + +func (a *Aggregator) Quantile(q float64) (core.Number, error) { + return a.d.Quantile(q) +} diff --git a/sdk/metric/aggregator/observermmsc/observermmsc.go b/sdk/metric/aggregator/observermmsc/observermmsc.go new file mode 100644 index 000000000000..31ea36f21604 --- /dev/null +++ b/sdk/metric/aggregator/observermmsc/observermmsc.go @@ -0,0 +1,90 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package observermmsc // import "go.opentelemetry.io/otel/sdk/metric/aggregator/observermmsc" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/api/core" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge" + "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" +) + +type ( + Aggregator struct { + g *gauge.Aggregator + mmsc *minmaxsumcount.Aggregator + } +) + +var ( + _ export.Aggregator = &Aggregator{} + _ aggregator.LastValue = &Aggregator{} + _ aggregator.MinMaxSumCount = &Aggregator{} +) + +func New(desc *export.Descriptor) *Aggregator { + return &Aggregator{ + g: gauge.New(), + mmsc: minmaxsumcount.New(desc), + } +} + +func (a *Aggregator) Update(ctx context.Context, number core.Number, descriptor *export.Descriptor) error { + if err := a.g.Update(ctx, number, descriptor); err != nil { + return err + } + a.mmsc.UpdateMMSC(number, descriptor) + return nil +} + +func (a *Aggregator) Checkpoint(ctx context.Context, descriptor *export.Descriptor) { + a.g.Checkpoint(ctx, descriptor) + a.mmsc.Checkpoint(ctx, descriptor) +} + +func (a *Aggregator) Merge(oa export.Aggregator, descriptor *export.Descriptor) error { + o, _ := oa.(*Aggregator) + if o == nil { + return aggregator.NewInconsistentMergeError(a, oa) + } + + a.g.MergeGaugeAggregator(o.g, descriptor) + a.mmsc.MergeMMSCAggregator(o.mmsc, descriptor) + return nil +} + +func (a *Aggregator) LastValue() (core.Number, time.Time, error) { + return a.g.LastValue() +} + +func (a *Aggregator) Min() (core.Number, error) { + return a.mmsc.Min() +} + +func (a *Aggregator) Max() (core.Number, error) { + return a.mmsc.Max() +} + +func (a *Aggregator) Sum() (core.Number, error) { + return a.mmsc.Sum() +} + +func (a *Aggregator) Count() (int64, error) { + return a.mmsc.Count() +} diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index d86005b0365b..a5de136e5c78 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -21,6 +21,9 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" + "go.opentelemetry.io/otel/sdk/metric/aggregator/observerarray" + "go.opentelemetry.io/otel/sdk/metric/aggregator/observerddsketch" + "go.opentelemetry.io/otel/sdk/metric/aggregator/observermmsc" ) type ( @@ -70,6 +73,8 @@ func (selectorInexpensive) AggregatorFor(descriptor *export.Descriptor) export.A switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + return observermmsc.New(descriptor) case export.MeasureKind: return minmaxsumcount.New(descriptor) default: @@ -81,6 +86,8 @@ func (s selectorSketch) AggregatorFor(descriptor *export.Descriptor) export.Aggr switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + return observerddsketch.New(s.config, descriptor) case export.MeasureKind: return ddsketch.New(s.config, descriptor) default: @@ -92,6 +99,8 @@ func (selectorExact) AggregatorFor(descriptor *export.Descriptor) export.Aggrega switch descriptor.MetricKind() { case export.GaugeKind: return gauge.New() + case export.ObserverKind: + return observerarray.New() case export.MeasureKind: return array.New() default: