Skip to content

Commit

Permalink
Kick label encoder out of sdk (#574)
Browse files Browse the repository at this point in the history
* Temporarily opt-out export.Labels from label encoding stuff

* Stop passing label encoding stuff to export.Labels

* Drop label encoding stuff from SDK

* Dogstatd exporter does not need to implement label exporter anymore

* more dogstatd exporter fixes

* export labels get back to encoding stuff

in a lame way, but improvements are coming in following commits

* Get encoded labels through export.Labels

* make SDK to provide its own implementation of export.Labels

* drop dead code

* add noop label exporter

* make export simple labels immutable

* Move the default label encoder to export package

* Simplify the simple export labels a bit

* Reserve some label exporter IDs

* Document and shuffle the code a bit

* Prepare for bring the iterator benchmark test back

We can install a callback to the Batcher's process function - this is
the place where we can access the labels, and thus test the label
iterator.

* Bring back the iterator benchmarks

* Simplifications and docs

* Fix copyright to be consistent with the rest

* Fix typo

* Put reserved label encoder IDs into constants

We get fewer comments about magic numbers that way.

* Fix the label encoder as label exporter thinko
  • Loading branch information
krnowak authored Mar 24, 2020
1 parent 6f881b4 commit d648712
Show file tree
Hide file tree
Showing 28 changed files with 353 additions and 334 deletions.
2 changes: 1 addition & 1 deletion api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newFixture(b *testing.B) *benchFixture {
bf := &benchFixture{
B: b,
}
bf.sdk = sdk.New(bf, sdk.NewDefaultLabelEncoder())
bf.sdk = sdk.New(bf)
bf.meter = metric.WrapMeterImpl(bf.sdk)
return bf
}
Expand Down
22 changes: 5 additions & 17 deletions exporters/metric/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,19 @@ type (
// https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go
Exporter struct {
*statsd.Exporter
*statsd.LabelEncoder

ReencodedLabelsCount int
labelEncoder *statsd.LabelEncoder
}
)

var (
_ export.Exporter = &Exporter{}
_ export.LabelEncoder = &Exporter{}
_ export.Exporter = &Exporter{}
)

// NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline.
// This type implements the metric.LabelEncoder interface,
// allowing the SDK's unique label encoding to be pre-computed
// for the exporter and stored in the LabelSet.
func NewRawExporter(config Config) (*Exporter, error) {
exp := &Exporter{
LabelEncoder: statsd.NewLabelEncoder(),
labelEncoder: statsd.NewLabelEncoder(),
}

var err error
Expand Down Expand Up @@ -94,11 +89,8 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e

// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(selector, false)
batcher := ungrouped.New(selector, exporter.labelEncoder, false)

// The pusher automatically recognizes that the exporter
// implements the LabelEncoder interface, which ensures the
// export encoding for labels is encoded in the LabelSet.
pusher := push.New(batcher, exporter, period)
pusher.Start()

Expand All @@ -112,10 +104,6 @@ func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) {

// AppendTags is part of the stats-internal adapter interface.
func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded, inefficient := e.LabelEncoder.ForceEncode(rec.Labels())
encoded := rec.Labels().Encoded(e.labelEncoder)
_, _ = buf.WriteString(encoded)

if inefficient {
e.ReencodedLabelsCount++
}
}
44 changes: 16 additions & 28 deletions exporters/metric/dogstatsd/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package dogstatsd_test
import (
"bytes"
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -28,43 +27,32 @@ import (
"go.opentelemetry.io/otel/exporters/metric/dogstatsd"
"go.opentelemetry.io/otel/exporters/metric/internal/statsd"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)

// TestDogstatsLabels that labels are formatted in the correct style,
// whether or not the provided labels were encoded by a statsd label
// encoder.
func TestDogstatsLabels(t *testing.T) {
for inefficientCount, encoder := range []export.LabelEncoder{
statsd.NewLabelEncoder(), // inefficientCount == 0
sdk.NewDefaultLabelEncoder(), // inefficientCount == 1
} {
t.Run(fmt.Sprintf("%T", encoder), func(t *testing.T) {
ctx := context.Background()
checkpointSet := test.NewCheckpointSet(encoder)
encoder := statsd.NewLabelEncoder()
ctx := context.Background()
checkpointSet := test.NewCheckpointSet(encoder)

desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
cagg := sum.New()
_ = cagg.Update(ctx, core.NewInt64Number(123), &desc)
cagg.Checkpoint(ctx, &desc)
desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
cagg := sum.New()
_ = cagg.Update(ctx, core.NewInt64Number(123), &desc)
cagg.Checkpoint(ctx, &desc)

checkpointSet.Add(&desc, cagg, key.New("A").String("B"))
checkpointSet.Add(&desc, cagg, key.New("A").String("B"))

var buf bytes.Buffer
exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{
Writer: &buf,
})
require.Nil(t, err)
require.Equal(t, 0, exp.ReencodedLabelsCount)
var buf bytes.Buffer
exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{
Writer: &buf,
})
require.Nil(t, err)

err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)

require.Equal(t, inefficientCount, exp.ReencodedLabelsCount)

require.Equal(t, "test.name:123|c|#A:B\n", buf.String())
})
}
require.Equal(t, "test.name:123|c|#A:B\n", buf.String())
}
2 changes: 1 addition & 1 deletion exporters/metric/internal/statsd/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var (
ErrInvalidScheme = fmt.Errorf("invalid statsd transport")
)

// NewExport returns a common implementation for exporters that Export
// NewExporter returns a common implementation for exporters that Export
// statsd syntax.
func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
if config.MaxPacketSize <= 0 {
Expand Down
5 changes: 2 additions & 3 deletions exporters/metric/internal/statsd/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.opentelemetry.io/otel/exporters/metric/internal/statsd"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)

// withTagsAdapter tests a dogstatsd-style statsd exporter.
Expand All @@ -44,7 +43,7 @@ func (*withTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
}

func (ta *withTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded, _ := ta.LabelEncoder.ForceEncode(rec.Labels())
encoded := rec.Labels().Encoded(ta.LabelEncoder)
_, _ = buf.WriteString(encoded)
}

Expand Down Expand Up @@ -125,7 +124,7 @@ timer.B.D:%s|ms
t.Fatal("New error: ", err)
}

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
cdesc := metric.NewDescriptor(
"counter", metric.CounterKind, nkind)
gdesc := metric.NewDescriptor(
Expand Down
20 changes: 3 additions & 17 deletions exporters/metric/internal/statsd/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ type LabelEncoder struct {
pool sync.Pool
}

// sameCheck is used to test whether label encoders are the same.
type sameCheck interface {
isStatsd()
}

var _ export.LabelEncoder = &LabelEncoder{}
var leID = export.NewLabelEncoderID()

// NewLabelEncoder returns a new encoder for dogstatsd-syntax metric
// labels.
Expand Down Expand Up @@ -69,16 +65,6 @@ func (e *LabelEncoder) Encode(iter export.LabelIterator) string {
return buf.String()
}

func (e *LabelEncoder) isStatsd() {}

// ForceEncode returns a statsd label encoding, even if the exported
// labels were encoded by a different type of encoder. Returns a
// boolean to indicate whether the labels were in fact re-encoded, to
// test for (and warn about) efficiency.
func (e *LabelEncoder) ForceEncode(labels export.Labels) (string, bool) {
if _, ok := labels.Encoder().(sameCheck); ok {
return labels.Encoded(), false
}

return e.Encode(labels.Iter()), true
func (*LabelEncoder) ID() int64 {
return leID
}
30 changes: 0 additions & 30 deletions exporters/metric/internal/statsd/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/exporters/metric/internal/statsd"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)

var testLabels = []core.KeyValue{
Expand All @@ -44,32 +43,3 @@ func TestLabelSyntax(t *testing.T) {

require.Equal(t, "", encoder.Encode(export.LabelSlice(nil).Iter()))
}

func TestLabelForceEncode(t *testing.T) {
defaultLabelEncoder := sdk.NewDefaultLabelEncoder()
statsdLabelEncoder := statsd.NewLabelEncoder()

ls := export.LabelSlice(testLabels)
exportLabelsDefault := export.NewLabels(ls, defaultLabelEncoder.Encode(ls.Iter()), defaultLabelEncoder)
exportLabelsStatsd := export.NewLabels(ls, statsdLabelEncoder.Encode(ls.Iter()), statsdLabelEncoder)

statsdEncoding := exportLabelsStatsd.Encoded()
require.NotEqual(t, statsdEncoding, exportLabelsDefault.Encoded())

forced, repeat := statsdLabelEncoder.ForceEncode(exportLabelsDefault)
require.Equal(t, statsdEncoding, forced)
require.True(t, repeat)

forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsStatsd)
require.Equal(t, statsdEncoding, forced)
require.False(t, repeat)

// Check that this works for an embedded implementation.
exportLabelsEmbed := export.NewLabels(export.LabelSlice(testLabels), statsdEncoding, struct {
*statsd.LabelEncoder
}{LabelEncoder: statsdLabelEncoder})

forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsEmbed)
require.Equal(t, statsdEncoding, forced)
require.False(t, repeat)
}
3 changes: 1 addition & 2 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
Expand Down Expand Up @@ -153,7 +152,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), true)
batcher := defaultkeys.New(selector, export.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, period)
pusher.Start()

Expand Down
4 changes: 2 additions & 2 deletions exporters/metric/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/exporters/metric/test"
sdk "go.opentelemetry.io/otel/sdk/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
)

func TestPrometheusExporter(t *testing.T) {
Expand All @@ -41,7 +41,7 @@ func TestPrometheusExporter(t *testing.T) {
}

var expected []string
checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

counter := metric.NewDescriptor(
"counter", metric.CounterKind, core.Float64NumberKind)
Expand Down
11 changes: 9 additions & 2 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type Config struct {
// exporter may wish to configure quantiles on a per-metric
// basis.
Quantiles []float64

// LabelEncoder encodes the labels
LabelEncoder export.LabelEncoder
}

type expoBatch struct {
Expand Down Expand Up @@ -100,6 +103,9 @@ func NewRawExporter(config Config) (*Exporter, error) {
}
}
}
if config.LabelEncoder == nil {
config.LabelEncoder = export.NewDefaultLabelEncoder()
}
return &Exporter{
config: config,
}, nil
Expand Down Expand Up @@ -131,7 +137,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e
if err != nil {
return nil, err
}
batcher := ungrouped.New(selector, true)
batcher := ungrouped.New(selector, exporter.config.LabelEncoder, true)
pusher := push.New(batcher, exporter, period)
pusher.Start()

Expand Down Expand Up @@ -218,7 +224,8 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
var materializedKeys []string

if iter.Len() > 0 {
materializedKeys = append(materializedKeys, record.Labels().Encoded())
encoded := record.Labels().Encoded(e.config.LabelEncoder)
materializedKeys = append(materializedKeys, encoded)
}

for _, k := range desc.Keys() {
Expand Down
17 changes: 8 additions & 9 deletions exporters/metric/stdout/stdout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
Expand Down Expand Up @@ -94,7 +93,7 @@ func TestStdoutTimestamp(t *testing.T) {

before := time.Now()

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Int64NumberKind)
Expand Down Expand Up @@ -140,7 +139,7 @@ func TestStdoutTimestamp(t *testing.T) {
func TestStdoutCounterFormat(t *testing.T) {
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
cagg := sum.New()
Expand All @@ -157,7 +156,7 @@ func TestStdoutCounterFormat(t *testing.T) {
func TestStdoutLastValueFormat(t *testing.T) {
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind)
lvagg := lastvalue.New()
Expand All @@ -174,7 +173,7 @@ func TestStdoutLastValueFormat(t *testing.T) {
func TestStdoutMinMaxSumCount(t *testing.T) {
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind)
magg := minmaxsumcount.New(&desc)
Expand All @@ -194,7 +193,7 @@ func TestStdoutMeasureFormat(t *testing.T) {
PrettyPrint: true,
})

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind)
magg := array.New()
Expand Down Expand Up @@ -248,7 +247,7 @@ func TestStdoutNoData(t *testing.T) {

fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

magg := tc
magg.Checkpoint(fix.ctx, &desc)
Expand All @@ -265,7 +264,7 @@ func TestStdoutNoData(t *testing.T) {
func TestStdoutLastValueNotSet(t *testing.T) {
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind)
lvagg := lastvalue.New()
Expand All @@ -281,7 +280,7 @@ func TestStdoutLastValueNotSet(t *testing.T) {
func TestStdoutCounterWithUnspecifiedKeys(t *testing.T) {
fix := newFixture(t, stdout.Config{})

checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder())
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())

keys := []core.Key{key.New("C"), key.New("D")}

Expand Down
Loading

0 comments on commit d648712

Please sign in to comment.