diff --git a/api/global/internal/benchmark_test.go b/api/global/internal/benchmark_test.go index eb2b4f57e13..6ed92a8ebba 100644 --- a/api/global/internal/benchmark_test.go +++ b/api/global/internal/benchmark_test.go @@ -49,7 +49,7 @@ func newFixture(b *testing.B) *benchFixture { bf := &benchFixture{ B: b, } - bf.sdk = sdk.New(bf, sdk.NewDefaultLabelEncoder()) + bf.sdk = sdk.New(bf) bf.meter = metric.WrapMeterImpl(bf.sdk) return bf } diff --git a/exporters/metric/dogstatsd/dogstatsd.go b/exporters/metric/dogstatsd/dogstatsd.go index 586a4a5926c..da4966f27b8 100644 --- a/exporters/metric/dogstatsd/dogstatsd.go +++ b/exporters/metric/dogstatsd/dogstatsd.go @@ -40,24 +40,19 @@ type ( // https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go Exporter struct { *statsd.Exporter - *statsd.LabelEncoder - ReencodedLabelsCount int + labelEncoder *statsd.LabelEncoder } ) var ( - _ export.Exporter = &Exporter{} - _ export.LabelEncoder = &Exporter{} + _ export.Exporter = &Exporter{} ) // NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline. -// This type implements the metric.LabelEncoder interface, -// allowing the SDK's unique label encoding to be pre-computed -// for the exporter and stored in the LabelSet. func NewRawExporter(config Config) (*Exporter, error) { exp := &Exporter{ - LabelEncoder: statsd.NewLabelEncoder(), + labelEncoder: statsd.NewLabelEncoder(), } var err error @@ -94,11 +89,8 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e // The ungrouped batcher ensures that the export sees the full // set of labels as dogstatsd tags. - batcher := ungrouped.New(selector, false) + batcher := ungrouped.New(selector, exporter.labelEncoder, false) - // The pusher automatically recognizes that the exporter - // implements the LabelEncoder interface, which ensures the - // export encoding for labels is encoded in the LabelSet. pusher := push.New(batcher, exporter, period) pusher.Start() @@ -112,10 +104,6 @@ func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) { // AppendTags is part of the stats-internal adapter interface. func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) { - encoded, inefficient := e.LabelEncoder.ForceEncode(rec.Labels()) + encoded := rec.Labels().Encoded(e.labelEncoder) _, _ = buf.WriteString(encoded) - - if inefficient { - e.ReencodedLabelsCount++ - } } diff --git a/exporters/metric/dogstatsd/dogstatsd_test.go b/exporters/metric/dogstatsd/dogstatsd_test.go index c35f7abbe7a..d79dd1fd15d 100644 --- a/exporters/metric/dogstatsd/dogstatsd_test.go +++ b/exporters/metric/dogstatsd/dogstatsd_test.go @@ -17,7 +17,6 @@ package dogstatsd_test import ( "bytes" "context" - "fmt" "testing" "github.com/stretchr/testify/require" @@ -28,8 +27,6 @@ import ( "go.opentelemetry.io/otel/exporters/metric/dogstatsd" "go.opentelemetry.io/otel/exporters/metric/internal/statsd" "go.opentelemetry.io/otel/exporters/metric/test" - export "go.opentelemetry.io/otel/sdk/export/metric" - sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" ) @@ -37,34 +34,25 @@ import ( // whether or not the provided labels were encoded by a statsd label // encoder. func TestDogstatsLabels(t *testing.T) { - for inefficientCount, encoder := range []export.LabelEncoder{ - statsd.NewLabelEncoder(), // inefficientCount == 0 - sdk.NewDefaultLabelEncoder(), // inefficientCount == 1 - } { - t.Run(fmt.Sprintf("%T", encoder), func(t *testing.T) { - ctx := context.Background() - checkpointSet := test.NewCheckpointSet(encoder) + encoder := statsd.NewLabelEncoder() + ctx := context.Background() + checkpointSet := test.NewCheckpointSet(encoder) - desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind) - cagg := sum.New() - _ = cagg.Update(ctx, core.NewInt64Number(123), &desc) - cagg.Checkpoint(ctx, &desc) + desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind) + cagg := sum.New() + _ = cagg.Update(ctx, core.NewInt64Number(123), &desc) + cagg.Checkpoint(ctx, &desc) - checkpointSet.Add(&desc, cagg, key.New("A").String("B")) + checkpointSet.Add(&desc, cagg, key.New("A").String("B")) - var buf bytes.Buffer - exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{ - Writer: &buf, - }) - require.Nil(t, err) - require.Equal(t, 0, exp.ReencodedLabelsCount) + var buf bytes.Buffer + exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{ + Writer: &buf, + }) + require.Nil(t, err) - err = exp.Export(ctx, checkpointSet) - require.Nil(t, err) + err = exp.Export(ctx, checkpointSet) + require.Nil(t, err) - require.Equal(t, inefficientCount, exp.ReencodedLabelsCount) - - require.Equal(t, "test.name:123|c|#A:B\n", buf.String()) - }) - } + require.Equal(t, "test.name:123|c|#A:B\n", buf.String()) } diff --git a/exporters/metric/internal/statsd/conn.go b/exporters/metric/internal/statsd/conn.go index 0256f84a679..e08d8d2b9ee 100644 --- a/exporters/metric/internal/statsd/conn.go +++ b/exporters/metric/internal/statsd/conn.go @@ -86,7 +86,7 @@ var ( ErrInvalidScheme = fmt.Errorf("invalid statsd transport") ) -// NewExport returns a common implementation for exporters that Export +// NewExporter returns a common implementation for exporters that Export // statsd syntax. func NewExporter(config Config, adapter Adapter) (*Exporter, error) { if config.MaxPacketSize <= 0 { diff --git a/exporters/metric/internal/statsd/conn_test.go b/exporters/metric/internal/statsd/conn_test.go index 14e4fcf0f06..b8b99e9ec5e 100644 --- a/exporters/metric/internal/statsd/conn_test.go +++ b/exporters/metric/internal/statsd/conn_test.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/otel/exporters/metric/internal/statsd" "go.opentelemetry.io/otel/exporters/metric/test" export "go.opentelemetry.io/otel/sdk/export/metric" - sdk "go.opentelemetry.io/otel/sdk/metric" ) // withTagsAdapter tests a dogstatsd-style statsd exporter. @@ -44,7 +43,7 @@ func (*withTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) { } func (ta *withTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) { - encoded, _ := ta.LabelEncoder.ForceEncode(rec.Labels()) + encoded := rec.Labels().Encoded(ta.LabelEncoder) _, _ = buf.WriteString(encoded) } @@ -125,7 +124,7 @@ timer.B.D:%s|ms t.Fatal("New error: ", err) } - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) cdesc := metric.NewDescriptor( "counter", metric.CounterKind, nkind) gdesc := metric.NewDescriptor( diff --git a/exporters/metric/internal/statsd/labels.go b/exporters/metric/internal/statsd/labels.go index 769e1291ec5..6e7ac742e71 100644 --- a/exporters/metric/internal/statsd/labels.go +++ b/exporters/metric/internal/statsd/labels.go @@ -31,12 +31,8 @@ type LabelEncoder struct { pool sync.Pool } -// sameCheck is used to test whether label encoders are the same. -type sameCheck interface { - isStatsd() -} - var _ export.LabelEncoder = &LabelEncoder{} +var leID = export.NewLabelEncoderID() // NewLabelEncoder returns a new encoder for dogstatsd-syntax metric // labels. @@ -69,16 +65,6 @@ func (e *LabelEncoder) Encode(iter export.LabelIterator) string { return buf.String() } -func (e *LabelEncoder) isStatsd() {} - -// ForceEncode returns a statsd label encoding, even if the exported -// labels were encoded by a different type of encoder. Returns a -// boolean to indicate whether the labels were in fact re-encoded, to -// test for (and warn about) efficiency. -func (e *LabelEncoder) ForceEncode(labels export.Labels) (string, bool) { - if _, ok := labels.Encoder().(sameCheck); ok { - return labels.Encoded(), false - } - - return e.Encode(labels.Iter()), true +func (*LabelEncoder) ID() int64 { + return leID } diff --git a/exporters/metric/internal/statsd/labels_test.go b/exporters/metric/internal/statsd/labels_test.go index 1eb92d6eff8..09a0bf5fffb 100644 --- a/exporters/metric/internal/statsd/labels_test.go +++ b/exporters/metric/internal/statsd/labels_test.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/exporters/metric/internal/statsd" export "go.opentelemetry.io/otel/sdk/export/metric" - sdk "go.opentelemetry.io/otel/sdk/metric" ) var testLabels = []core.KeyValue{ @@ -44,32 +43,3 @@ func TestLabelSyntax(t *testing.T) { require.Equal(t, "", encoder.Encode(export.LabelSlice(nil).Iter())) } - -func TestLabelForceEncode(t *testing.T) { - defaultLabelEncoder := sdk.NewDefaultLabelEncoder() - statsdLabelEncoder := statsd.NewLabelEncoder() - - ls := export.LabelSlice(testLabels) - exportLabelsDefault := export.NewLabels(ls, defaultLabelEncoder.Encode(ls.Iter()), defaultLabelEncoder) - exportLabelsStatsd := export.NewLabels(ls, statsdLabelEncoder.Encode(ls.Iter()), statsdLabelEncoder) - - statsdEncoding := exportLabelsStatsd.Encoded() - require.NotEqual(t, statsdEncoding, exportLabelsDefault.Encoded()) - - forced, repeat := statsdLabelEncoder.ForceEncode(exportLabelsDefault) - require.Equal(t, statsdEncoding, forced) - require.True(t, repeat) - - forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsStatsd) - require.Equal(t, statsdEncoding, forced) - require.False(t, repeat) - - // Check that this works for an embedded implementation. - exportLabelsEmbed := export.NewLabels(export.LabelSlice(testLabels), statsdEncoding, struct { - *statsd.LabelEncoder - }{LabelEncoder: statsdLabelEncoder}) - - forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsEmbed) - require.Equal(t, statsdEncoding, forced) - require.False(t, repeat) -} diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 8ed9cbcd209..8263607381c 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/otel/api/global" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" "go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -153,7 +152,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h // it could try again on the next scrape and no data would be lost, only resolution. // // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. - batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), true) + batcher := defaultkeys.New(selector, export.NewDefaultLabelEncoder(), true) pusher := push.New(batcher, exporter, period) pusher.Start() diff --git a/exporters/metric/prometheus/prometheus_test.go b/exporters/metric/prometheus/prometheus_test.go index c3afeb620d3..aad70ff849b 100644 --- a/exporters/metric/prometheus/prometheus_test.go +++ b/exporters/metric/prometheus/prometheus_test.go @@ -29,7 +29,7 @@ import ( "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/metric/prometheus" "go.opentelemetry.io/otel/exporters/metric/test" - sdk "go.opentelemetry.io/otel/sdk/metric" + export "go.opentelemetry.io/otel/sdk/export/metric" ) func TestPrometheusExporter(t *testing.T) { @@ -41,7 +41,7 @@ func TestPrometheusExporter(t *testing.T) { } var expected []string - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) counter := metric.NewDescriptor( "counter", metric.CounterKind, core.Float64NumberKind) diff --git a/exporters/metric/stdout/stdout.go b/exporters/metric/stdout/stdout.go index 96bd57a8b23..1d342a656da 100644 --- a/exporters/metric/stdout/stdout.go +++ b/exporters/metric/stdout/stdout.go @@ -60,6 +60,9 @@ type Config struct { // exporter may wish to configure quantiles on a per-metric // basis. Quantiles []float64 + + // LabelEncoder encodes the labels + LabelEncoder export.LabelEncoder } type expoBatch struct { @@ -100,6 +103,9 @@ func NewRawExporter(config Config) (*Exporter, error) { } } } + if config.LabelEncoder == nil { + config.LabelEncoder = export.NewDefaultLabelEncoder() + } return &Exporter{ config: config, }, nil @@ -131,7 +137,7 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, e if err != nil { return nil, err } - batcher := ungrouped.New(selector, true) + batcher := ungrouped.New(selector, exporter.config.LabelEncoder, true) pusher := push.New(batcher, exporter, period) pusher.Start() @@ -218,7 +224,8 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) var materializedKeys []string if iter.Len() > 0 { - materializedKeys = append(materializedKeys, record.Labels().Encoded()) + encoded := record.Labels().Encoded(e.config.LabelEncoder) + materializedKeys = append(materializedKeys, encoded) } for _, k := range desc.Keys() { diff --git a/exporters/metric/stdout/stdout_test.go b/exporters/metric/stdout/stdout_test.go index e037d80a45e..b7e9dcb21ae 100644 --- a/exporters/metric/stdout/stdout_test.go +++ b/exporters/metric/stdout/stdout_test.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/otel/exporters/metric/test" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" @@ -94,7 +93,7 @@ func TestStdoutTimestamp(t *testing.T) { before := time.Now() - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) ctx := context.Background() desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Int64NumberKind) @@ -140,7 +139,7 @@ func TestStdoutTimestamp(t *testing.T) { func TestStdoutCounterFormat(t *testing.T) { fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind) cagg := sum.New() @@ -157,7 +156,7 @@ func TestStdoutCounterFormat(t *testing.T) { func TestStdoutLastValueFormat(t *testing.T) { fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind) lvagg := lastvalue.New() @@ -174,7 +173,7 @@ func TestStdoutLastValueFormat(t *testing.T) { func TestStdoutMinMaxSumCount(t *testing.T) { fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind) magg := minmaxsumcount.New(&desc) @@ -194,7 +193,7 @@ func TestStdoutMeasureFormat(t *testing.T) { PrettyPrint: true, }) - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) desc := metric.NewDescriptor("test.name", metric.MeasureKind, core.Float64NumberKind) magg := array.New() @@ -248,7 +247,7 @@ func TestStdoutNoData(t *testing.T) { fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) magg := tc magg.Checkpoint(fix.ctx, &desc) @@ -265,7 +264,7 @@ func TestStdoutNoData(t *testing.T) { func TestStdoutLastValueNotSet(t *testing.T) { fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) desc := metric.NewDescriptor("test.name", metric.ObserverKind, core.Float64NumberKind) lvagg := lastvalue.New() @@ -281,7 +280,7 @@ func TestStdoutLastValueNotSet(t *testing.T) { func TestStdoutCounterWithUnspecifiedKeys(t *testing.T) { fix := newFixture(t, stdout.Config{}) - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) keys := []core.Key{key.New("C"), key.New("D")} diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index 09fcc5d4ef2..6e3d1127b8d 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -52,10 +52,9 @@ func (p *CheckpointSet) Reset() { // If there is an existing record with the same descriptor and LabelSet // the stored aggregator will be returned and should be merged. func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) { - ls := export.LabelSlice(labels) - elabels := export.NewLabels(ls, p.encoder.Encode(ls.Iter()), p.encoder) + elabels := export.NewSimpleLabels(p.encoder, labels...) - key := desc.Name() + "_" + elabels.Encoded() + key := desc.Name() + "_" + elabels.Encoded(p.encoder) if record, ok := p.records[key]; ok { return record.Aggregator(), false } diff --git a/exporters/otlp/internal/transform/metric_test.go b/exporters/otlp/internal/transform/metric_test.go index 3eec2e57568..7a7bf94a90b 100644 --- a/exporters/otlp/internal/transform/metric_test.go +++ b/exporters/otlp/internal/transform/metric_test.go @@ -150,7 +150,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) { metric.WithKeys(test.keys...), metric.WithDescription(test.description), metric.WithUnit(test.unit)) - labels := export.NewLabels(export.LabelSlice(test.labels), "", nil) + labels := export.NewSimpleLabels(export.NoopLabelEncoder{}, test.labels...) got, err := minMaxSumCount(&desc, labels, mmsc) if assert.NoError(t, err) { assert.Equal(t, test.expected, got.MetricDescriptor) @@ -160,7 +160,7 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) { func TestMinMaxSumCountDatapoints(t *testing.T) { desc := metric.NewDescriptor("", metric.MeasureKind, core.Int64NumberKind) - labels := export.NewLabels(export.LabelSlice(nil), "", nil) + labels := export.NewSimpleLabels(export.NoopLabelEncoder{}) mmsc := minmaxsumcount.New(&desc) assert.NoError(t, mmsc.Update(context.Background(), 1, &desc)) assert.NoError(t, mmsc.Update(context.Background(), 10, &desc)) @@ -251,7 +251,7 @@ func TestSumMetricDescriptor(t *testing.T) { metric.WithDescription(test.description), metric.WithUnit(test.unit), ) - labels := export.NewLabels(export.LabelSlice(test.labels), "", nil) + labels := export.NewSimpleLabels(export.NoopLabelEncoder{}, test.labels...) got, err := sum(&desc, labels, sumAgg.New()) if assert.NoError(t, err) { assert.Equal(t, test.expected, got.MetricDescriptor) @@ -261,7 +261,7 @@ func TestSumMetricDescriptor(t *testing.T) { func TestSumInt64DataPoints(t *testing.T) { desc := metric.NewDescriptor("", metric.MeasureKind, core.Int64NumberKind) - labels := export.NewLabels(export.LabelSlice(nil), "", nil) + labels := export.NewSimpleLabels(export.NoopLabelEncoder{}) s := sumAgg.New() assert.NoError(t, s.Update(context.Background(), core.Number(1), &desc)) s.Checkpoint(context.Background(), &desc) @@ -275,7 +275,7 @@ func TestSumInt64DataPoints(t *testing.T) { func TestSumFloat64DataPoints(t *testing.T) { desc := metric.NewDescriptor("", metric.MeasureKind, core.Float64NumberKind) - labels := export.NewLabels(export.LabelSlice(nil), "", nil) + labels := export.NewSimpleLabels(export.NoopLabelEncoder{}) s := sumAgg.New() assert.NoError(t, s.Update(context.Background(), core.NewFloat64Number(1), &desc)) s.Checkpoint(context.Background(), &desc) diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index d24481cffe4..414b0206f80 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -30,7 +30,8 @@ import ( "go.opentelemetry.io/otel/api/metric" metricapi "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporters/otlp" - export "go.opentelemetry.io/otel/sdk/export/trace" + exportmetric "go.opentelemetry.io/otel/sdk/export/metric" + exporttrace "go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" "go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -110,7 +111,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) } selector := simple.NewWithExactMeasure() - batcher := ungrouped.New(selector, true) + batcher := ungrouped.New(selector, exportmetric.NewDefaultLabelEncoder(), true) pusher := push.New(batcher, exp, 60*time.Second) pusher.Start() @@ -321,7 +322,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { // reconnect. for j := 0; j < 3; j++ { - exp.ExportSpans(context.Background(), []*export.SpanData{{Name: "in the midst"}}) + exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}) // Now resurrect the collector by making a new one but reusing the // old address, and the collector should reconnect automatically. @@ -332,7 +333,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { n := 10 for i := 0; i < n; i++ { - exp.ExportSpans(context.Background(), []*export.SpanData{{Name: "Resurrected"}}) + exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "Resurrected"}}) } nmaSpans := nmc.getSpans() diff --git a/sdk/metric/labelencoder.go b/sdk/export/metric/labelencoder.go similarity index 72% rename from sdk/metric/labelencoder.go rename to sdk/export/metric/labelencoder.go index fe2a9d9c4df..91d7f29001f 100644 --- a/sdk/metric/labelencoder.go +++ b/sdk/export/metric/labelencoder.go @@ -19,7 +19,6 @@ import ( "sync" "go.opentelemetry.io/otel/api/core" - export "go.opentelemetry.io/otel/sdk/export/metric" ) // escapeChar is used to ensure uniqueness of the label encoding where @@ -32,17 +31,20 @@ const escapeChar = '\\' type defaultLabelEncoder struct { // pool is a pool of labelset builders. The buffers in this // pool grow to a size that most label encodings will not - // allocate new memory. This pool reduces the number of - // allocations per new LabelSet to 3, typically, as seen in - // the benchmarks. (It should be 2--one for the LabelSet - // object and one for the buffer.String() here--see the extra - // allocation in the call to sort.Stable). + // allocate new memory. pool sync.Pool // *bytes.Buffer } -var _ export.LabelEncoder = &defaultLabelEncoder{} +var _ LabelEncoder = &defaultLabelEncoder{} -func NewDefaultLabelEncoder() export.LabelEncoder { +// NewDefaultLabelEncoder returns a label encoder that encodes labels +// in such a way that each escaped label's key is followed by an equal +// sign and then by an escaped label's value. All key-value pairs are +// separated by a comma. +// +// Escaping is done by prepending a backslash before either a +// backslash, equal sign or a comma. +func NewDefaultLabelEncoder() LabelEncoder { return &defaultLabelEncoder{ pool: sync.Pool{ New: func() interface{} { @@ -52,7 +54,9 @@ func NewDefaultLabelEncoder() export.LabelEncoder { } } -func (d *defaultLabelEncoder) Encode(iter export.LabelIterator) string { +// Encode is a part of an implementation of the LabelEncoder +// interface. +func (d *defaultLabelEncoder) Encode(iter LabelIterator) string { buf := d.pool.Get().(*bytes.Buffer) defer d.pool.Put(buf) buf.Reset() @@ -75,6 +79,11 @@ func (d *defaultLabelEncoder) Encode(iter export.LabelIterator) string { return buf.String() } +// ID is a part of an implementation of the LabelEncoder interface. +func (*defaultLabelEncoder) ID() int64 { + return defaultLabelEncoderID +} + func copyAndEscape(buf *bytes.Buffer, val string) { for _, ch := range val { switch ch { diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 90e73e69dd6..e231e234a0d 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -16,11 +16,33 @@ package metric // import "go.opentelemetry.io/otel/sdk/export/metric" import ( "context" + "sync/atomic" "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/metric" ) +const ( + // reserved ID for the noop label encoder + noopLabelEncoderID int64 = 1 + iota + // reserved ID for the default label encoder + defaultLabelEncoderID + + // this must come last in enumeration + lastLabelEncoderID +) + +// labelEncoderIDCounter is for generating IDs for other label +// encoders. +var labelEncoderIDCounter int64 = lastLabelEncoderID + +// NewLabelEncoderID returns a unique label encoder ID. It should be +// called once per each type of label encoder. Preferably in init() or +// in var definition. +func NewLabelEncoderID() int64 { + return atomic.AddInt64(&labelEncoderIDCounter, 1) +} + // Batcher is responsible for deciding which kind of aggregation to // use (via AggregationSelector), gathering exported results from the // SDK during collection, and deciding over which dimensions to group @@ -64,12 +86,7 @@ type Batcher interface { // Process is called by the SDK once per internal record, // passing the export Record (a Descriptor, the corresponding - // Labels, and the checkpointed Aggregator). The Batcher - // should be prepared to process duplicate (Descriptor, - // Labels) pairs during this pass due to race conditions, but - // this will usually be the ordinary course of events, as - // Aggregators are typically merged according the output set - // of labels. + // Labels, and the checkpointed Aggregator). // // The Context argument originates from the controller that // orchestrates collection. @@ -169,22 +186,6 @@ type Exporter interface { Export(context.Context, CheckpointSet) error } -// Convenience function that creates a slice of labels from the passed -// iterator. The iterator is set up to start from the beginning before -// creating the slice. -func IteratorToSlice(iter LabelIterator) []core.KeyValue { - l := iter.Len() - if l == 0 { - return nil - } - iter.idx = -1 - slice := make([]core.KeyValue, 0, l) - for iter.Next() { - slice = append(slice, iter.Label()) - } - return slice -} - // LabelStorage provides an access to the ordered labels. type LabelStorage interface { // NumLabels returns a number of labels in the storage. @@ -260,27 +261,42 @@ func (i *LabelIterator) Len() int { return i.storage.NumLabels() } +// Convenience function that creates a slice of labels from the passed +// iterator. The iterator is set up to start from the beginning before +// creating the slice. +func IteratorToSlice(iter LabelIterator) []core.KeyValue { + l := iter.Len() + if l == 0 { + return nil + } + iter.idx = -1 + slice := make([]core.KeyValue, 0, l) + for iter.Next() { + slice = append(slice, iter.Label()) + } + return slice +} + // LabelEncoder enables an optimization for export pipelines that use // text to encode their label sets. // -// This interface allows configuring the encoder used in the SDK -// and/or the Batcher so that by the time the exporter is called, the -// same encoding may be used. -// -// If none is provided, a default will be used. +// This interface allows configuring the encoder used in the Batcher +// so that by the time the exporter is called, the same encoding may +// be used. type LabelEncoder interface { // Encode is called (concurrently) in instrumentation context. - // It should return a unique representation of the labels - // suitable for the SDK to use as a map key. // - // The exported Labels object retains a reference to its - // LabelEncoder to determine which encoding was used. - // - // The expectation is that Exporters with a pre-determined to - // syntax for serialized label sets should implement - // LabelEncoder, thus avoiding duplicate computation in the - // export path. + // The expectation is that when setting up an export pipeline + // both the batcher and the exporter will use the same label + // encoder to avoid the duplicate computation of the encoded + // labels in the export path. Encode(LabelIterator) string + + // ID should return a unique positive number associated with + // the label encoder. Stateless label encoders could return + // the same number regardless of an instance, stateful label + // encoders should return a number depending on their state. + ID() int64 } // CheckpointSet allows a controller to access a complete checkpoint of @@ -311,36 +327,42 @@ type Record struct { // including the labels in an appropriate order (as defined by the // Batcher). If the batcher does not re-order labels, they are // presented in sorted order by the SDK. -type Labels struct { - storage LabelStorage - encoded string - encoder LabelEncoder +type Labels interface { + Iter() LabelIterator + Encoded(LabelEncoder) string } -// NewLabels builds a Labels object, consisting of an ordered set of -// labels, a unique encoded representation, and the encoder that -// produced it. -func NewLabels(storage LabelStorage, encoded string, encoder LabelEncoder) Labels { - return Labels{ - storage: storage, - encoded: encoded, - encoder: encoder, - } +type labels struct { + encoderID int64 + encoded string + slice LabelSlice } -// Iter returns an iterator over ordered labels. -func (l Labels) Iter() LabelIterator { - return NewLabelIterator(l.storage) +var _ Labels = &labels{} + +// NewSimpleLabels builds a Labels object, consisting of an ordered +// set of labels in a provided slice and a unique encoded +// representation generated by the passed encoder. +func NewSimpleLabels(encoder LabelEncoder, kvs ...core.KeyValue) Labels { + l := &labels{ + encoderID: encoder.ID(), + slice: kvs, + } + l.encoded = encoder.Encode(l.Iter()) + return l } -// Encoded is a pre-encoded form of the ordered labels. -func (l Labels) Encoded() string { - return l.encoded +// Iter is a part of an implementation of the Labels interface. +func (l *labels) Iter() LabelIterator { + return l.slice.Iter() } -// Encoder is the encoder that computed the Encoded() representation. -func (l Labels) Encoder() LabelEncoder { - return l.encoder +// Encoded is a part of an implementation of the Labels interface. +func (l *labels) Encoded(encoder LabelEncoder) string { + if l.encoderID == encoder.ID() { + return l.encoded + } + return encoder.Encode(l.Iter()) } // NewRecord allows Batcher implementations to construct export diff --git a/sdk/export/metric/noop.go b/sdk/export/metric/noop.go new file mode 100644 index 00000000000..d460cd85859 --- /dev/null +++ b/sdk/export/metric/noop.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +// NoopLabelEncoder does no encoding at all. +type NoopLabelEncoder struct{} + +var _ LabelEncoder = NoopLabelEncoder{} + +// Encode is a part of an implementation of the LabelEncoder +// interface. It returns an empty string. +func (NoopLabelEncoder) Encode(LabelIterator) string { + return "" +} + +// ID is a part of an implementation of the LabelEncoder interface. +func (NoopLabelEncoder) ID() int64 { + return noopLabelEncoderID +} diff --git a/sdk/metric/batcher/defaultkeys/defaultkeys.go b/sdk/metric/batcher/defaultkeys/defaultkeys.go index ca0093bb730..521a52e4eab 100644 --- a/sdk/metric/batcher/defaultkeys/defaultkeys.go +++ b/sdk/metric/batcher/defaultkeys/defaultkeys.go @@ -109,7 +109,8 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error { } // Compute an encoded lookup key. - encoded := b.labelEncoder.Encode(export.LabelSlice(outputLabels).Iter()) + elabels := export.NewSimpleLabels(b.labelEncoder, outputLabels...) + encoded := elabels.Encoded(b.labelEncoder) // Merge this aggregator with all preceding aggregators that // map to the same set of `outputLabels` labels. @@ -137,11 +138,7 @@ func (b *Batcher) Process(_ context.Context, record export.Record) error { return err } } - b.aggCheckpoint[key] = export.NewRecord( - desc, - export.NewLabels(export.LabelSlice(outputLabels), encoded, b.labelEncoder), - agg, - ) + b.aggCheckpoint[key] = export.NewRecord(desc, elabels, agg) return nil } diff --git a/sdk/metric/batcher/defaultkeys/defaultkeys_test.go b/sdk/metric/batcher/defaultkeys/defaultkeys_test.go index 5c7e8c37e67..45a6d76697d 100644 --- a/sdk/metric/batcher/defaultkeys/defaultkeys_test.go +++ b/sdk/metric/batcher/defaultkeys/defaultkeys_test.go @@ -49,7 +49,7 @@ func TestGroupingStateless(t *testing.T) { checkpointSet := b.CheckpointSet() b.FinishedCollection() - records := test.Output{} + records := test.NewOutput(test.GroupEncoder) err := checkpointSet.ForEach(records.AddTo) require.NoError(t, err) @@ -65,7 +65,7 @@ func TestGroupingStateless(t *testing.T) { "lastvalue.a/G=": 30, // labels3 = last value "lastvalue.b/G=H": 10, // labels1 "lastvalue.b/G=": 30, // labels3 = last value - }, records) + }, records.Map) // Verify that state is reset by FinishedCollection() checkpointSet = b.CheckpointSet() @@ -91,24 +91,24 @@ func TestGroupingStateful(t *testing.T) { checkpointSet := b.CheckpointSet() b.FinishedCollection() - records1 := test.Output{} + records1 := test.NewOutput(test.GroupEncoder) err := checkpointSet.ForEach(records1.AddTo) require.NoError(t, err) require.EqualValues(t, map[string]float64{ "sum.a/C=D": 10, // labels1 "sum.b/C=D": 10, // labels1 - }, records1) + }, records1.Map) // Test that state was NOT reset checkpointSet = b.CheckpointSet() b.FinishedCollection() - records2 := test.Output{} + records2 := test.NewOutput(test.GroupEncoder) err = checkpointSet.ForEach(records2.AddTo) require.NoError(t, err) - require.EqualValues(t, records1, records2) + require.EqualValues(t, records1.Map, records2.Map) // Update and re-checkpoint the original record. _ = caggA.Update(ctx, core.NewInt64Number(20), &test.CounterADesc) @@ -121,11 +121,11 @@ func TestGroupingStateful(t *testing.T) { checkpointSet = b.CheckpointSet() b.FinishedCollection() - records3 := test.Output{} + records3 := test.NewOutput(test.GroupEncoder) err = checkpointSet.ForEach(records3.AddTo) require.NoError(t, err) - require.EqualValues(t, records1, records3) + require.EqualValues(t, records1.Map, records3.Map) // Now process the second update _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, caggA)) @@ -134,12 +134,12 @@ func TestGroupingStateful(t *testing.T) { checkpointSet = b.CheckpointSet() b.FinishedCollection() - records4 := test.Output{} + records4 := test.NewOutput(test.GroupEncoder) err = checkpointSet.ForEach(records4.AddTo) require.NoError(t, err) require.EqualValues(t, map[string]float64{ "sum.a/C=D": 30, "sum.b/C=D": 30, - }, records4) + }, records4.Map) } diff --git a/sdk/metric/batcher/test/test.go b/sdk/metric/batcher/test/test.go index 1b1814bc4a5..acf0c23a755 100644 --- a/sdk/metric/batcher/test/test.go +++ b/sdk/metric/batcher/test/test.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" ) @@ -34,7 +33,10 @@ type ( Encoder struct{} // Output collects distinct metric/label set outputs. - Output map[string]float64 + Output struct { + Map map[string]float64 + labelEncoder export.LabelEncoder + } // testAggregationSelector returns aggregators consistent with // the test variables below, needed for testing stateful @@ -57,7 +59,7 @@ var ( // SdkEncoder uses a non-standard encoder like K1~V1&K2~V2 SdkEncoder = &Encoder{} // GroupEncoder uses the SDK default encoder - GroupEncoder = sdk.NewDefaultLabelEncoder() + GroupEncoder = export.NewDefaultLabelEncoder() // LastValue groups are (labels1), (labels2+labels3) // Counter groups are (labels1+labels2), (labels3) @@ -68,8 +70,17 @@ var ( Labels2 = makeLabels(SdkEncoder, key.String("C", "D"), key.String("E", "F")) // Labels3 is the empty set Labels3 = makeLabels(SdkEncoder) + + leID = export.NewLabelEncoderID() ) +func NewOutput(labelEncoder export.LabelEncoder) Output { + return Output{ + Map: make(map[string]float64), + labelEncoder: labelEncoder, + } +} + // NewAggregationSelector returns a policy that is consistent with the // test descriptors above. I.e., it returns sum.New() for counter // instruments and lastvalue.New for lastValue instruments. @@ -89,8 +100,7 @@ func (*testAggregationSelector) AggregatorFor(desc *metric.Descriptor) export.Ag } func makeLabels(encoder export.LabelEncoder, labels ...core.KeyValue) export.Labels { - ls := export.LabelSlice(labels) - return export.NewLabels(ls, encoder.Encode(ls.Iter()), encoder) + return export.NewSimpleLabels(encoder, labels...) } func (Encoder) Encode(iter export.LabelIterator) string { @@ -107,6 +117,10 @@ func (Encoder) Encode(iter export.LabelIterator) string { return sb.String() } +func (Encoder) ID() int64 { + return leID +} + // LastValueAgg returns a checkpointed lastValue aggregator w/ the specified descriptor and value. func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator { ctx := context.Background() @@ -138,8 +152,8 @@ func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator { // AddTo adds a name/label-encoding entry with the lastValue or counter // value to the output map. func (o Output) AddTo(rec export.Record) error { - labels := rec.Labels() - key := fmt.Sprint(rec.Descriptor().Name(), "/", labels.Encoded()) + encoded := rec.Labels().Encoded(o.labelEncoder) + key := fmt.Sprint(rec.Descriptor().Name(), "/", encoded) var value float64 if s, ok := rec.Aggregator().(aggregator.Sum); ok { @@ -151,6 +165,6 @@ func (o Output) AddTo(rec export.Record) error { } else { panic(fmt.Sprintf("Unhandled aggregator type: %T", rec.Aggregator())) } - o[key] = value + o.Map[key] = value return nil } diff --git a/sdk/metric/batcher/ungrouped/ungrouped.go b/sdk/metric/batcher/ungrouped/ungrouped.go index cc289b98401..6eced0460c1 100644 --- a/sdk/metric/batcher/ungrouped/ungrouped.go +++ b/sdk/metric/batcher/ungrouped/ungrouped.go @@ -25,9 +25,10 @@ import ( type ( Batcher struct { - selector export.AggregationSelector - batchMap batchMap - stateful bool + selector export.AggregationSelector + batchMap batchMap + stateful bool + labelEncoder export.LabelEncoder } batchKey struct { @@ -46,11 +47,12 @@ type ( var _ export.Batcher = &Batcher{} var _ export.CheckpointSet = batchMap{} -func New(selector export.AggregationSelector, stateful bool) *Batcher { +func New(selector export.AggregationSelector, labelEncoder export.LabelEncoder, stateful bool) *Batcher { return &Batcher{ - selector: selector, - batchMap: batchMap{}, - stateful: stateful, + selector: selector, + batchMap: batchMap{}, + stateful: stateful, + labelEncoder: labelEncoder, } } @@ -60,9 +62,10 @@ func (b *Batcher) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator func (b *Batcher) Process(_ context.Context, record export.Record) error { desc := record.Descriptor() + encoded := record.Labels().Encoded(b.labelEncoder) key := batchKey{ descriptor: desc, - encoded: record.Labels().Encoded(), + encoded: encoded, } agg := record.Aggregator() value, ok := b.batchMap[key] diff --git a/sdk/metric/batcher/ungrouped/ungrouped_test.go b/sdk/metric/batcher/ungrouped/ungrouped_test.go index 6ff999f12d7..8a5829203e0 100644 --- a/sdk/metric/batcher/ungrouped/ungrouped_test.go +++ b/sdk/metric/batcher/ungrouped/ungrouped_test.go @@ -30,7 +30,7 @@ import ( func TestUngroupedStateless(t *testing.T) { ctx := context.Background() - b := ungrouped.New(test.NewAggregationSelector(), false) + b := ungrouped.New(test.NewAggregationSelector(), test.SdkEncoder, false) // Set initial lastValue values _ = b.Process(ctx, test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10)) @@ -61,7 +61,7 @@ func TestUngroupedStateless(t *testing.T) { checkpointSet := b.CheckpointSet() b.FinishedCollection() - records := test.Output{} + records := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records.AddTo) // Output lastvalue should have only the "G=H" and "G=" keys. @@ -79,7 +79,7 @@ func TestUngroupedStateless(t *testing.T) { "lastvalue.b/G~H&C~D": 50, // labels1 "lastvalue.b/C~D&E~F": 20, // labels2 "lastvalue.b/": 30, // labels3 - }, records) + }, records.Map) // Verify that state was reset checkpointSet = b.CheckpointSet() @@ -92,7 +92,7 @@ func TestUngroupedStateless(t *testing.T) { func TestUngroupedStateful(t *testing.T) { ctx := context.Background() - b := ungrouped.New(test.NewAggregationSelector(), true) + b := ungrouped.New(test.NewAggregationSelector(), test.SdkEncoder, true) counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10) caggA := counterA.Aggregator() @@ -105,22 +105,22 @@ func TestUngroupedStateful(t *testing.T) { checkpointSet := b.CheckpointSet() b.FinishedCollection() - records1 := test.Output{} + records1 := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records1.AddTo) require.EqualValues(t, map[string]float64{ "sum.a/G~H&C~D": 10, // labels1 "sum.b/G~H&C~D": 10, // labels1 - }, records1) + }, records1.Map) // Test that state was NOT reset checkpointSet = b.CheckpointSet() b.FinishedCollection() - records2 := test.Output{} + records2 := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records2.AddTo) - require.EqualValues(t, records1, records2) + require.EqualValues(t, records1.Map, records2.Map) // Update and re-checkpoint the original record. _ = caggA.Update(ctx, core.NewInt64Number(20), &test.CounterADesc) @@ -133,10 +133,10 @@ func TestUngroupedStateful(t *testing.T) { checkpointSet = b.CheckpointSet() b.FinishedCollection() - records3 := test.Output{} + records3 := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records3.AddTo) - require.EqualValues(t, records1, records3) + require.EqualValues(t, records1.Map, records3.Map) // Now process the second update _ = b.Process(ctx, export.NewRecord(&test.CounterADesc, test.Labels1, caggA)) @@ -145,11 +145,11 @@ func TestUngroupedStateful(t *testing.T) { checkpointSet = b.CheckpointSet() b.FinishedCollection() - records4 := test.Output{} + records4 := test.NewOutput(test.SdkEncoder) _ = checkpointSet.ForEach(records4.AddTo) require.EqualValues(t, map[string]float64{ "sum.a/G~H&C~D": 30, "sum.b/G~H&C~D": 30, - }, records4) + }, records4.Map) } diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 99bbb2b9b55..243422e5f18 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -32,24 +32,27 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" ) +type processFunc func(context.Context, export.Record) error + type benchFixture struct { meter metric.MeterMust sdk *sdk.SDK B *testing.B + pcb processFunc } -func newFixtureWithEncoder(b *testing.B, encoder export.LabelEncoder) *benchFixture { +func newFixture(b *testing.B) *benchFixture { b.ReportAllocs() bf := &benchFixture{ B: b, } - bf.sdk = sdk.New(bf, encoder) + bf.sdk = sdk.New(bf) bf.meter = metric.Must(metric.WrapMeterImpl(bf.sdk)) return bf } -func newFixture(b *testing.B) *benchFixture { - return newFixtureWithEncoder(b, sdk.NewDefaultLabelEncoder()) +func (f *benchFixture) setProcessCallback(cb processFunc) { + f.pcb = cb } func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { @@ -71,8 +74,11 @@ func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega return nil } -func (*benchFixture) Process(context.Context, export.Record) error { - return nil +func (f *benchFixture) Process(ctx context.Context, rec export.Record) error { + if f.pcb == nil { + return nil + } + return f.pcb(ctx, rec) } func (*benchFixture) CheckpointSet() export.CheckpointSet { @@ -198,32 +204,25 @@ func BenchmarkAcquireReleaseExistingHandle(b *testing.B) { // Iterators -type benchmarkEncoder struct { - b *testing.B -} - -var _ export.LabelEncoder = benchmarkEncoder{} - -var benchmarkEncoderVar core.KeyValue - -func (e benchmarkEncoder) Encode(li export.LabelIterator) string { - var kv core.KeyValue - e.b.StartTimer() - for i := 0; i < e.b.N; i++ { - iter := li - // test getting only the first element - if iter.Next() { - kv = iter.Label() - } - } - e.b.StopTimer() - benchmarkEncoderVar = kv - return "foo=bar" -} +var benchmarkIteratorVar core.KeyValue func benchmarkIterator(b *testing.B, n int) { - encoder := benchmarkEncoder{b: b} - fix := newFixtureWithEncoder(b, encoder) + fix := newFixture(b) + fix.setProcessCallback(func(ctx context.Context, rec export.Record) error { + var kv core.KeyValue + li := rec.Labels().Iter() + fix.B.StartTimer() + for i := 0; i < fix.B.N; i++ { + iter := li + // test getting only the first element + if iter.Next() { + kv = iter.Label() + } + } + fix.B.StopTimer() + benchmarkIteratorVar = kv + return nil + }) labs := fix.sdk.Labels(makeLabels(n)...) cnt := fix.meter.NewInt64Counter("int64.counter") ctx := context.Background() diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 2778259bb94..7477c05bda5 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -69,24 +69,13 @@ var _ Ticker = realTicker{} // using the provided batcher, exporter, collection period, and SDK // configuration options to configure an SDK with periodic collection. // The batcher itself is configured with the aggregation selector policy. -// -// If the Exporter implements the export.LabelEncoder interface, the -// exporter will be used as the label encoder for the SDK itself, -// otherwise the SDK will be configured with the default label -// encoder. func New(batcher export.Batcher, exporter export.Exporter, period time.Duration, opts ...Option) *Controller { - lencoder, _ := exporter.(export.LabelEncoder) - - if lencoder == nil { - lencoder = sdk.NewDefaultLabelEncoder() - } - c := &Config{ErrorHandler: sdk.DefaultErrorHandler} for _, opt := range opts { opt.Apply(c) } - impl := sdk.New(batcher, lencoder, sdk.WithResource(c.Resource), sdk.WithErrorHandler(c.ErrorHandler)) + impl := sdk.New(batcher, sdk.WithResource(c.Resource), sdk.WithErrorHandler(c.ErrorHandler)) return &Controller{ sdk: impl, meter: metric.WrapMeterImpl(impl), diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index bcd5be3c5d7..a5378d17d11 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -29,7 +29,6 @@ import ( "go.opentelemetry.io/otel/exporters/metric/test" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - sdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/controller/push" ) @@ -68,7 +67,7 @@ var _ push.Clock = mockClock{} var _ push.Ticker = mockTicker{} func newFixture(t *testing.T) testFixture { - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder()) batcher := &testBatcher{ t: t, diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 1e1a171f005..8f6650c55af 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -28,7 +28,7 @@ import ( "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - sdk "go.opentelemetry.io/otel/sdk/metric" + metricsdk "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" batchTest "go.opentelemetry.io/otel/sdk/metric/batcher/test" @@ -42,8 +42,6 @@ type correctnessBatcher struct { records []export.Record } -type testLabelEncoder struct{} - func (cb *correctnessBatcher) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { name := descriptor.Name() switch { @@ -69,16 +67,12 @@ func (cb *correctnessBatcher) Process(_ context.Context, record export.Record) e return nil } -func (testLabelEncoder) Encode(iter export.LabelIterator) string { - return fmt.Sprint(export.IteratorToSlice(iter)) -} - func TestInputRangeTestCounter(t *testing.T) { ctx := context.Background() batcher := &correctnessBatcher{ t: t, } - sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) + sdk := metricsdk.New(batcher) meter := metric.WrapMeterImpl(sdk) var sdkErr error @@ -113,7 +107,7 @@ func TestInputRangeTestMeasure(t *testing.T) { batcher := &correctnessBatcher{ t: t, } - sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) + sdk := metricsdk.New(batcher) meter := metric.WrapMeterImpl(sdk) var sdkErr error @@ -151,7 +145,7 @@ func TestDisabledInstrument(t *testing.T) { batcher := &correctnessBatcher{ t: t, } - sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) + sdk := metricsdk.New(batcher) meter := metric.WrapMeterImpl(sdk) measure := Must(meter).NewFloat64Measure("name.disabled") @@ -167,7 +161,7 @@ func TestRecordNaN(t *testing.T) { batcher := &correctnessBatcher{ t: t, } - sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) + sdk := metricsdk.New(batcher) meter := metric.WrapMeterImpl(sdk) var sdkErr error @@ -181,31 +175,12 @@ func TestRecordNaN(t *testing.T) { require.Error(t, sdkErr) } -func TestSDKAltLabelEncoder(t *testing.T) { - ctx := context.Background() - batcher := &correctnessBatcher{ - t: t, - } - sdk := sdk.New(batcher, testLabelEncoder{}) - meter := metric.WrapMeterImpl(sdk) - - measure := Must(meter).NewFloat64Measure("measure") - measure.Record(ctx, 1, sdk.Labels(key.String("A", "B"), key.String("C", "D"))) - - sdk.Collect(ctx) - - require.Equal(t, 1, len(batcher.records)) - - labels := batcher.records[0].Labels() - require.Equal(t, `[{A {8 0 B}} {C {8 0 D}}]`, labels.Encoded()) -} - func TestSDKLabelsDeduplication(t *testing.T) { ctx := context.Background() batcher := &correctnessBatcher{ t: t, } - sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) + sdk := metricsdk.New(batcher) meter := metric.WrapMeterImpl(sdk) counter := Must(meter).NewInt64Counter("counter") @@ -270,7 +245,7 @@ func TestSDKLabelsDeduplication(t *testing.T) { } func TestDefaultLabelEncoder(t *testing.T) { - encoder := sdk.NewDefaultLabelEncoder() + encoder := export.NewDefaultLabelEncoder() encoded := encoder.Encode(export.LabelSlice([]core.KeyValue{key.String("A", "B"), key.String("C", "D")}).Iter()) require.Equal(t, `A=B,C=D`, encoded) @@ -303,7 +278,7 @@ func TestObserverCollection(t *testing.T) { batcher := &correctnessBatcher{ t: t, } - sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) + sdk := metricsdk.New(batcher) meter := metric.WrapMeterImpl(sdk) _ = Must(meter).RegisterFloat64Observer("float.observer", func(result metric.Float64ObserverResult) { @@ -327,7 +302,7 @@ func TestObserverCollection(t *testing.T) { require.Equal(t, 4, collected) require.Equal(t, 4, len(batcher.records)) - out := batchTest.Output{} + out := batchTest.NewOutput(export.NewDefaultLabelEncoder()) for _, rec := range batcher.records { _ = out.AddTo(rec) } @@ -336,6 +311,6 @@ func TestObserverCollection(t *testing.T) { "float.observer/C=D": -1, "int.observer/": 1, "int.observer/A=B": 1, - }, out) + }, out.Map) } diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index e66f7604855..8cc10335be9 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -60,9 +60,6 @@ type ( // batcher is the configured batcher+configuration. batcher export.Batcher - // lencoder determines how labels are uniquely encoded. - labelEncoder export.LabelEncoder - // collectLock prevents simultaneous calls to Collect(). collectLock sync.Mutex @@ -85,6 +82,12 @@ type ( // represents an internalized set of labels that may be used // repeatedly. labels struct { + // cachedEncoderID needs to be aligned for atomic access + cachedEncoderID int64 + // cachedEncoded is an encoded version of ordered + // labels + cachedEncoded string + meter *SDK // ordered is the output of sorting and deduplicating // the labels, copied into an array of the correct @@ -98,9 +101,6 @@ type ( // cachedValue contains a `reflect.Value` of the `ordered` // member cachedValue reflect.Value - // cachedEncoded contains an encoded version of the - // `ordered` member - cachedEncoded string } // mapkey uniquely describes a metric instrument in terms of @@ -168,6 +168,7 @@ var ( _ api.BoundSyncImpl = &record{} _ api.Resourcer = &SDK{} _ export.LabelStorage = &labels{} + _ export.Labels = &labels{} kvType = reflect.TypeOf(core.KeyValue{}) ) @@ -306,7 +307,7 @@ func (s *syncInstrument) RecordOne(ctx context.Context, number core.Number, ls a // batcher will call Collect() when it receives a request to scrape // current metric values. A push-based batcher should configure its // own periodic collection. -func New(batcher export.Batcher, labelEncoder export.LabelEncoder, opts ...Option) *SDK { +func New(batcher export.Batcher, opts ...Option) *SDK { c := &Config{ErrorHandler: DefaultErrorHandler} for _, opt := range opts { opt.Apply(c) @@ -317,7 +318,6 @@ func New(batcher export.Batcher, labelEncoder export.LabelEncoder, opts ...Optio ordered: [0]core.KeyValue{}, }, batcher: batcher, - labelEncoder: labelEncoder, errorHandler: c.ErrorHandler, resource: c.Resource, } @@ -366,14 +366,69 @@ func (m *SDK) Labels(kvs ...core.KeyValue) api.LabelSet { return ls } +// NumLabels is a part of an implementation of the export.LabelStorage +// interface. func (ls *labels) NumLabels() int { return ls.cachedValue.Len() } +// GetLabel is a part of an implementation of the export.LabelStorage +// interface. func (ls *labels) GetLabel(idx int) core.KeyValue { return ls.cachedValue.Index(idx).Interface().(core.KeyValue) } +// Iter is a part of an implementation of the export.Labels interface. +func (ls *labels) Iter() export.LabelIterator { + return export.NewLabelIterator(ls) +} + +// Encoded is a part of an implementation of the export.Labels +// interface. +func (ls *labels) Encoded(encoder export.LabelEncoder) string { + id := encoder.ID() + if id <= 0 { + // Punish misbehaving encoders by not even trying to + // cache them + return encoder.Encode(ls.Iter()) + } + cachedID := atomic.LoadInt64(&ls.cachedEncoderID) + // If cached ID is less than zero, it means that other + // goroutine is currently caching the encoded labels and the + // ID of the encoder. Wait until it's done - it's a + // nonblocking op. + for cachedID < 0 { + // Let other goroutine finish its work. + runtime.Gosched() + cachedID = atomic.LoadInt64(&ls.cachedEncoderID) + } + // At this point, cachedID is either 0 (nothing cached) or + // some other number. + // + // If cached ID is the same as ID of the passed encoder, we've + // got the fast path. + if cachedID == id { + return ls.cachedEncoded + } + // If we are here, either some other encoder cached its + // encoded labels or the cache is still for the taking. Either + // way, we need to compute the encoded labels anyway. + encoded := encoder.Encode(ls.Iter()) + // If some other encoder took the cache, then we just return + // our encoded labels. That's a slow path. + if cachedID > 0 { + return encoded + } + // Try to take the cache for ourselves. This is the place + // where other encoders may be "blocked". + if atomic.CompareAndSwapInt64(&ls.cachedEncoderID, 0, -1) { + // The cache is ours. + ls.cachedEncoded = encoded + atomic.StoreInt64(&ls.cachedEncoderID, id) + } + return encoded +} + func (ls *labels) computeOrdered(kvs []core.KeyValue) { ls.ordered = computeOrderedFixed(kvs) if ls.ordered == nil { @@ -382,14 +437,6 @@ func (ls *labels) computeOrdered(kvs []core.KeyValue) { ls.cachedValue = reflect.ValueOf(ls.ordered) } -func (ls *labels) ensureEncoded(encoder export.LabelEncoder) { - if ls.cachedEncoded != "" { - return - } - iter := export.NewLabelIterator(ls) - ls.cachedEncoded = encoder.Encode(iter) -} - func computeOrderedFixed(kvs []core.KeyValue) orderedLabels { switch len(kvs) { case 1: @@ -567,9 +614,7 @@ func (m *SDK) checkpoint(ctx context.Context, descriptor *metric.Descriptor, rec } recorder.Checkpoint(ctx, descriptor) - labels.ensureEncoded(m.labelEncoder) - exportLabels := export.NewLabels(labels, labels.cachedEncoded, m.labelEncoder) - exportRecord := export.NewRecord(descriptor, exportLabels, recorder) + exportRecord := export.NewRecord(descriptor, labels, recorder) err := m.batcher.Process(ctx, exportRecord) if err != nil { m.errorHandler(err) diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 1ce1714fd2e..f2d49d7d73c 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -296,7 +296,7 @@ func stressTest(t *testing.T, impl testImpl) { lused: map[string]bool{}, } cc := concurrency() - sdk := New(fixture, NewDefaultLabelEncoder()) + sdk := New(fixture) meter := metric.WrapMeterImpl(sdk) fixture.wg.Add(cc + 1)