Skip to content

Commit

Permalink
aggregators and selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
krnowak committed Feb 12, 2020
1 parent 24315fc commit 8290d9f
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 10 deletions.
13 changes: 11 additions & 2 deletions sdk/metric/aggregator/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,14 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {
// Update takes a lock to prevent concurrent Update() and Checkpoint()
// calls.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
c.UpdateArray(number, desc)
return nil
}

func (c *Aggregator) UpdateArray(number core.Number, desc *export.Descriptor) {
c.lock.Lock()
c.current = append(c.current, number)
c.lock.Unlock()
return nil
}

// Merge combines two data sets into one.
Expand All @@ -120,9 +124,14 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
return aggregator.NewInconsistentMergeError(c, oa)
}

c.MergeArrayAggregator(o, desc)
return nil
}

// MergeArrayAggregator combines two data sets into one.
func (c *Aggregator) MergeArrayAggregator(o *Aggregator, desc *export.Descriptor) {
c.ckptSum.AddNumber(desc.NumberKind(), o.ckptSum)
c.checkpoint = combine(c.checkpoint, o.checkpoint, desc.NumberKind())
return nil
}

func (c *Aggregator) sort(kind core.NumberKind) {
Expand Down
16 changes: 14 additions & 2 deletions sdk/metric/aggregator/ddsketch/ddsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,17 @@ func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) {
// Update takes a lock to prevent concurrent Update() and Checkpoint()
// calls.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
c.UpdateDDSketch(number, desc)
return nil
}

// UpdateDDSketch adds the recorded measurement to the current data
// set. UpdateDDSketch takes a lock to prevent concurrent
// UpdateDDSketch() and Checkpoint() calls.
func (c *Aggregator) UpdateDDSketch(number core.Number, desc *export.Descriptor) {
c.lock.Lock()
defer c.lock.Unlock()
c.current.Add(number.CoerceToFloat64(desc.NumberKind()))
return nil
}

// Merge combines two sketches into one.
Expand All @@ -129,6 +136,11 @@ func (c *Aggregator) Merge(oa export.Aggregator, d *export.Descriptor) error {
return aggregator.NewInconsistentMergeError(c, oa)
}

c.checkpoint.Merge(o.checkpoint)
c.MergeDDSketchAggregator(o)
return nil
}

// MergeDDSketchAggregator combines two sketches into one.
func (c *Aggregator) MergeDDSketchAggregator(o *Aggregator) {
c.checkpoint.Merge(o.checkpoint)
}
16 changes: 12 additions & 4 deletions sdk/metric/aggregator/gauge/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ func (g *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
return aggregator.NewInconsistentMergeError(g, oa)
}

g.MergeGaugeAggregator(o, desc)
return nil
}

// MergeGaugeAggregator combines state from two aggregators. If the
// gauge is declared as monotonic, the greater value is chosen. If
// the gauge is declared as non-monotonic, the most-recently set value
// is chosen.
func (g *Aggregator) MergeGaugeAggregator(o *Aggregator, desc *export.Descriptor) {
ggd := (*gaugeData)(atomic.LoadPointer(&g.checkpoint))
ogd := (*gaugeData)(atomic.LoadPointer(&o.checkpoint))

Expand All @@ -144,19 +153,18 @@ func (g *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
cmp := ggd.value.CompareNumber(desc.NumberKind(), ogd.value)

if cmp > 0 {
return nil
return
}

if cmp < 0 {
g.checkpoint = unsafe.Pointer(ogd)
return nil
return
}
}
// Non-monotonic gauge or equal values
if ggd.timestamp.After(ogd.timestamp) {
return nil
return
}

g.checkpoint = unsafe.Pointer(ogd)
return nil
}
12 changes: 10 additions & 2 deletions sdk/metric/aggregator/minmaxsumcount/mmsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) {

// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.Descriptor) error {
c.UpdateMMSC(number, desc)
return nil
}

func (c *Aggregator) UpdateMMSC(number core.Number, desc *export.Descriptor) {
kind := desc.NumberKind()

c.current.count.AddUint64Atomic(1)
Expand All @@ -149,7 +154,6 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, desc *export.
break
}
}
return nil
}

// Merge combines two data sets into one.
Expand All @@ -159,6 +163,11 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
return aggregator.NewInconsistentMergeError(c, oa)
}

c.MergeMMSCAggregator(o, desc)
return nil
}

func (c *Aggregator) MergeMMSCAggregator(o *Aggregator, desc *export.Descriptor) {
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)

Expand All @@ -168,5 +177,4 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *export.Descriptor) error
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
c.checkpoint.max.SetNumber(o.checkpoint.max)
}
return nil
}
100 changes: 100 additions & 0 deletions sdk/metric/aggregator/observerarray/observerarray.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observerarray // import "go.opentelemetry.io/otel/sdk/metric/aggregator/observerarray"

import (
"context"
"time"

"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/gauge"
)

type (
Aggregator struct {
g *gauge.Aggregator
a *array.Aggregator
}
)

var (
_ export.Aggregator = &Aggregator{}
_ aggregator.LastValue = &Aggregator{}
_ aggregator.MinMaxSumCount = &Aggregator{}
_ aggregator.Distribution = &Aggregator{}
_ aggregator.Points = &Aggregator{}
)

func New() *Aggregator {
return &Aggregator{
g: gauge.New(),
a: array.New(),
}
}

func (a *Aggregator) Update(ctx context.Context, number core.Number, descriptor *export.Descriptor) error {
if err := a.g.Update(ctx, number, descriptor); err != nil {
return err
}
a.a.UpdateArray(number, descriptor)
return nil
}

func (a *Aggregator) Checkpoint(ctx context.Context, descriptor *export.Descriptor) {
a.g.Checkpoint(ctx, descriptor)
a.a.Checkpoint(ctx, descriptor)
}

func (a *Aggregator) Merge(oa export.Aggregator, descriptor *export.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(a, oa)
}

a.g.MergeGaugeAggregator(o.g, descriptor)
a.a.MergeArrayAggregator(o.a, descriptor)
return nil
}

func (a *Aggregator) LastValue() (core.Number, time.Time, error) {
return a.g.LastValue()
}

func (a *Aggregator) Min() (core.Number, error) {
return a.a.Min()
}

func (a *Aggregator) Max() (core.Number, error) {
return a.a.Max()
}

func (a *Aggregator) Sum() (core.Number, error) {
return a.a.Sum()
}

func (a *Aggregator) Count() (int64, error) {
return a.a.Count()
}

func (a *Aggregator) Quantile(q float64) (core.Number, error) {
return a.a.Quantile(q)
}

func (a *Aggregator) Points() ([]core.Number, error) {
return a.a.Points()
}
95 changes: 95 additions & 0 deletions sdk/metric/aggregator/observerddsketch/observerddsketch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observerddsketch // import "go.opentelemetry.io/otel/sdk/metric/aggregator/observerddsketch"

import (
"context"
"time"

"go.opentelemetry.io/otel/api/core"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/gauge"
)

type (
Aggregator struct {
g *gauge.Aggregator
d *ddsketch.Aggregator
}
)

var (
_ export.Aggregator = &Aggregator{}
_ aggregator.LastValue = &Aggregator{}
_ aggregator.MinMaxSumCount = &Aggregator{}
_ aggregator.Distribution = &Aggregator{}
)

func New(cfg *ddsketch.Config, desc *export.Descriptor) *Aggregator {
return &Aggregator{
g: gauge.New(),
d: ddsketch.New(cfg, desc),
}
}

func (a *Aggregator) Update(ctx context.Context, number core.Number, descriptor *export.Descriptor) error {
if err := a.g.Update(ctx, number, descriptor); err != nil {
return err
}
a.d.UpdateDDSketch(number, descriptor)
return nil
}

func (a *Aggregator) Checkpoint(ctx context.Context, descriptor *export.Descriptor) {
a.g.Checkpoint(ctx, descriptor)
a.d.Checkpoint(ctx, descriptor)
}

func (a *Aggregator) Merge(oa export.Aggregator, descriptor *export.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(a, oa)
}

a.g.MergeGaugeAggregator(o.g, descriptor)
a.d.MergeDDSketchAggregator(o.d)
return nil
}

func (a *Aggregator) LastValue() (core.Number, time.Time, error) {
return a.g.LastValue()
}

func (a *Aggregator) Min() (core.Number, error) {
return a.d.Min()
}

func (a *Aggregator) Max() (core.Number, error) {
return a.d.Max()
}

func (a *Aggregator) Sum() (core.Number, error) {
return a.d.Sum()
}

func (a *Aggregator) Count() (int64, error) {
return a.d.Count()
}

func (a *Aggregator) Quantile(q float64) (core.Number, error) {
return a.d.Quantile(q)
}
Loading

0 comments on commit 8290d9f

Please sign in to comment.