Skip to content

Commit

Permalink
Move asyncronous metrics state helper out of apimetric/metrictes (#1234)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored Oct 11, 2020
1 parent 02cd123 commit f60f51d
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 14 deletions.
5 changes: 3 additions & 2 deletions api/metric/metrictest/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/otel/api/metric"
apimetric "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
internalmetric "go.opentelemetry.io/otel/internal/metric"
"go.opentelemetry.io/otel/label"
)

Expand All @@ -44,7 +45,7 @@ type (

MeasurementBatches []Batch

asyncInstruments *AsyncInstrumentState
asyncInstruments *internalmetric.AsyncInstrumentState
}

Measurement struct {
Expand Down Expand Up @@ -115,7 +116,7 @@ func (m *MeterImpl) doRecordSingle(ctx context.Context, labels []label.KeyValue,

func NewMeterProvider() (*MeterImpl, apimetric.MeterProvider) {
impl := &MeterImpl{
asyncInstruments: NewAsyncInstrumentState(),
asyncInstruments: internalmetric.NewAsyncInstrumentState(),
}
return impl, registry.NewMeterProvider(impl)
}
Expand Down
22 changes: 11 additions & 11 deletions api/metric/metrictest/async.go → internal/metric/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package metrictest
package metric

import (
"context"
Expand All @@ -21,7 +21,7 @@ import (
"sync"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/metric"
api "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/label"
)

Expand All @@ -32,7 +32,7 @@ var ErrInvalidAsyncRunner = errors.New("unknown async runner type")
// the SDK to provide support for running observer callbacks.
type AsyncCollector interface {
// CollectAsync passes a batch of observations to the MeterImpl.
CollectAsync(labels []label.KeyValue, observation ...metric.Observation)
CollectAsync(labels []label.KeyValue, observation ...api.Observation)
}

// AsyncInstrumentState manages an ordered set of asynchronous
Expand Down Expand Up @@ -60,18 +60,18 @@ type AsyncInstrumentState struct {

// instruments maintains the set of instruments in the order
// they were registered.
instruments []metric.AsyncImpl
instruments []api.AsyncImpl
}

// asyncRunnerPair is a map entry for Observer callback runners.
type asyncRunnerPair struct {
// runner is used as a map key here. The API ensures
// that all callbacks are pointers for this reason.
runner metric.AsyncRunner
runner api.AsyncRunner

// inst refers to a non-nil instrument when `runner` is a
// AsyncSingleRunner.
inst metric.AsyncImpl
inst api.AsyncImpl
}

// NewAsyncInstrumentState returns a new *AsyncInstrumentState, for
Expand All @@ -86,7 +86,7 @@ func NewAsyncInstrumentState() *AsyncInstrumentState {
// Instruments returns the asynchronous instruments managed by this
// object, the set that should be checkpointed after observers are
// run.
func (a *AsyncInstrumentState) Instruments() []metric.AsyncImpl {
func (a *AsyncInstrumentState) Instruments() []api.AsyncImpl {
a.lock.Lock()
defer a.lock.Unlock()
return a.instruments
Expand All @@ -96,7 +96,7 @@ func (a *AsyncInstrumentState) Instruments() []metric.AsyncImpl {
// object. This should be called during NewAsyncInstrument() and
// assumes that errors (e.g., duplicate registration) have already
// been checked.
func (a *AsyncInstrumentState) Register(inst metric.AsyncImpl, runner metric.AsyncRunner) {
func (a *AsyncInstrumentState) Register(inst api.AsyncImpl, runner api.AsyncRunner) {
a.lock.Lock()
defer a.lock.Unlock()

Expand All @@ -110,7 +110,7 @@ func (a *AsyncInstrumentState) Register(inst metric.AsyncImpl, runner metric.Asy
rp := asyncRunnerPair{
runner: runner,
}
if _, ok := runner.(metric.AsyncSingleRunner); ok {
if _, ok := runner.(api.AsyncSingleRunner); ok {
rp.inst = inst
}

Expand All @@ -131,12 +131,12 @@ func (a *AsyncInstrumentState) Run(ctx context.Context, collector AsyncCollector
// other implementations are possible because the
// interface has un-exported methods.

if singleRunner, ok := rp.runner.(metric.AsyncSingleRunner); ok {
if singleRunner, ok := rp.runner.(api.AsyncSingleRunner); ok {
singleRunner.Run(ctx, rp.inst, collector.CollectAsync)
continue
}

if multiRunner, ok := rp.runner.(metric.AsyncBatchRunner); ok {
if multiRunner, ok := rp.runner.(api.AsyncBatchRunner); ok {
multiRunner.Run(ctx, collector.CollectAsync)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/metric"
api "go.opentelemetry.io/otel/api/metric"
internal "go.opentelemetry.io/otel/api/metric/metrictest"
internal "go.opentelemetry.io/otel/internal/metric"
"go.opentelemetry.io/otel/label"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator"
Expand Down

0 comments on commit f60f51d

Please sign in to comment.