From 459df463fbbbeadfcadbc88ad3df6f4929ee2086 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens <pablo.baeyens@datadoghq.com> Date: Tue, 21 Sep 2021 09:47:29 +0200 Subject: [PATCH 1/5] [exporter/datadogexporter] Use a `Consumer` interface for decoupling from zorkian's package --- .../internal/metrics/consumer.go | 117 +++++ .../internal/metrics/consumer_test.go | 84 ++++ .../internal/translator/consumer.go | 69 +++ .../internal/translator/metrics_translator.go | 171 ++++--- .../translator/metrics_translator_test.go | 423 +++++++++--------- .../internal/translator/sketches_test.go | 29 +- exporter/datadogexporter/metrics_exporter.go | 7 +- 7 files changed, 607 insertions(+), 293 deletions(-) create mode 100644 exporter/datadogexporter/internal/metrics/consumer.go create mode 100644 exporter/datadogexporter/internal/metrics/consumer_test.go create mode 100644 exporter/datadogexporter/internal/translator/consumer.go diff --git a/exporter/datadogexporter/internal/metrics/consumer.go b/exporter/datadogexporter/internal/metrics/consumer.go new file mode 100644 index 000000000000..191785650555 --- /dev/null +++ b/exporter/datadogexporter/internal/metrics/consumer.go @@ -0,0 +1,117 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "gopkg.in/zorkian/go-datadog-api.v2" + + "github.com/DataDog/datadog-agent/pkg/quantile" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/sketches" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/translator" +) + +var _ translator.Consumer = (*Consumer)(nil) +var _ translator.HostConsumer = (*Consumer)(nil) + +// Consumer is the metrics Consumer. +type Consumer struct { + ms []datadog.Metric + sl sketches.SketchSeriesList + seenHosts map[string]struct{} +} + +// NewConsumer creates a new zorkian consumer. +func NewConsumer() *Consumer { + return &Consumer{ + seenHosts: make(map[string]struct{}), + } +} + +// toDataType maps translator datatypes to zorkian's datatypes. +func (c *Consumer) toDataType(dt translator.MetricDataType) (out MetricDataType) { + out = MetricDataType("unknown") + + switch dt { + case translator.Count: + out = Count + case translator.Gauge: + out = Gauge + } + + return +} + +// runningMetrics gets the running metrics for the exporter. +func (c *Consumer) runningMetrics(timestamp uint64, buildInfo component.BuildInfo) (series []datadog.Metric) { + for host := range c.seenHosts { + // Report the host as running + runningMetric := DefaultMetrics("metrics", host, timestamp, buildInfo) + series = append(series, runningMetric...) + } + + return +} + +// All gets all metrics (consumed metrics and running metrics). +func (c *Consumer) All(timestamp uint64, buildInfo component.BuildInfo) ([]datadog.Metric, sketches.SketchSeriesList) { + series := c.ms + series = append(series, c.runningMetrics(timestamp, buildInfo)...) + return series, c.sl +} + +// ConsumeTimeSeries implements the translator.Consumer interface. +func (c *Consumer) ConsumeTimeSeries( + _ context.Context, + name string, + typ translator.MetricDataType, + timestamp uint64, + value float64, + tags []string, + host string, +) { + dt := c.toDataType(typ) + met := NewMetric(name, dt, timestamp, value, tags) + met.SetHost(host) + c.ms = append(c.ms, met) +} + +// ConsumeSketch implements the translator.Consumer interface. +func (c *Consumer) ConsumeSketch( + _ context.Context, + name string, + timestamp uint64, + sketch *quantile.Sketch, + tags []string, + host string, +) { + c.sl = append(c.sl, sketches.SketchSeries{ + Name: name, + Tags: tags, + Host: host, + Interval: 1, + Points: []sketches.SketchPoint{{ + Ts: int64(timestamp / 1e9), + Sketch: sketch, + }}, + }) +} + +// ConsumeHost implements the translator.HostConsumer interface. +func (c *Consumer) ConsumeHost(host string) { + c.seenHosts[host] = struct{}{} +} diff --git a/exporter/datadogexporter/internal/metrics/consumer_test.go b/exporter/datadogexporter/internal/metrics/consumer_test.go new file mode 100644 index 000000000000..4aab2eced970 --- /dev/null +++ b/exporter/datadogexporter/internal/metrics/consumer_test.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/model/pdata" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/translator" +) + +func newTestCache() *translator.TTLCache { + cache := translator.NewTTLCache(1800, 3600) + return cache +} + +type testProvider string + +func (t testProvider) Hostname(context.Context) (string, error) { + return string(t), nil +} + +func newTranslator(logger *zap.Logger, cfg config.MetricsConfig) *translator.Translator { + return translator.New(newTestCache(), logger, cfg, testProvider("fallbackHostname")) +} + +func TestRunningMetrics(t *testing.T) { + ms := pdata.NewMetrics() + rms := ms.ResourceMetrics() + + rm := rms.AppendEmpty() + resAttrs := rm.Resource().Attributes() + resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) + + rm = rms.AppendEmpty() + resAttrs = rm.Resource().Attributes() + resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) + + rm = rms.AppendEmpty() + resAttrs = rm.Resource().Attributes() + resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-2")) + + rms.AppendEmpty() + + cfg := config.MetricsConfig{} + logger, _ := zap.NewProduction() + tr := newTranslator(logger, cfg) + + ctx := context.Background() + consumer := NewConsumer() + tr.MapMetrics(ctx, ms, consumer) + + runningHostnames := []string{} + for _, metric := range consumer.runningMetrics(0, component.BuildInfo{}) { + if metric.Host != nil { + runningHostnames = append(runningHostnames, *metric.Host) + } + } + + assert.ElementsMatch(t, + runningHostnames, + []string{"fallbackHostname", "resource-hostname-1", "resource-hostname-2"}, + ) + +} diff --git a/exporter/datadogexporter/internal/translator/consumer.go b/exporter/datadogexporter/internal/translator/consumer.go new file mode 100644 index 000000000000..21442dd19885 --- /dev/null +++ b/exporter/datadogexporter/internal/translator/consumer.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package translator + +import ( + "context" + + "github.com/DataDog/datadog-agent/pkg/quantile" +) + +type MetricDataType int + +const ( + // Gauge is the Datadog Gauge metric type. + Gauge MetricDataType = iota + // Count is the Datadog Count metric type. + Count +) + +// TimeSeriesConsumer is timeseries consumer. +type TimeSeriesConsumer interface { + // ConsumeTimeSeries consumes a timeseries-style metric. + ConsumeTimeSeries( + ctx context.Context, + name string, + typ MetricDataType, + timestamp uint64, + value float64, + tags []string, + host string, + ) +} + +// SketchConsumer is a pkg/quantile sketch consumer. +type SketchConsumer interface { + // ConsumeSketch consumes a pkg/quantile-style sketch. + ConsumeSketch( + ctx context.Context, + name string, + timestamp uint64, + sketch *quantile.Sketch, + tags []string, + host string, + ) +} + +// Consumer is a metrics consumer. +type Consumer interface { + TimeSeriesConsumer + SketchConsumer +} + +// HostConsumer is a hostname consumer. +type HostConsumer interface { + // ConsumeHost consumes a hostname. + ConsumeHost(host string) +} diff --git a/exporter/datadogexporter/internal/translator/metrics_translator.go b/exporter/datadogexporter/internal/translator/metrics_translator.go index bee6a96fba75..5b2ad29598de 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator.go @@ -19,18 +19,13 @@ import ( "fmt" "math" "strconv" - "time" - "github.com/DataDog/datadog-agent/pkg/quantile" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" - "gopkg.in/zorkian/go-datadog-api.v2" + "github.com/DataDog/datadog-agent/pkg/quantile" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/sketches" ) const metricName string = "metric name" @@ -51,12 +46,11 @@ type Translator struct { prevPts *TTLCache logger *zap.Logger cfg config.MetricsConfig - buildInfo component.BuildInfo fallbackHostnameProvider HostnameProvider } -func New(cache *TTLCache, params component.ExporterCreateSettings, cfg config.MetricsConfig, fallbackHostProvider HostnameProvider) *Translator { - return &Translator{cache, params.Logger, cfg, params.BuildInfo, fallbackHostProvider} +func New(cache *TTLCache, logger *zap.Logger, cfg config.MetricsConfig, fallbackHostProvider HostnameProvider) *Translator { + return &Translator{cache, logger, cfg, fallbackHostProvider} } // getTags maps an attributeMap into a slice of Datadog tags @@ -95,8 +89,16 @@ func (t *Translator) isSkippable(name string, v float64) bool { } // mapNumberMetrics maps double datapoints into Datadog metrics -func (t *Translator) mapNumberMetrics(name string, dt metrics.MetricDataType, slice pdata.NumberDataPointSlice, attrTags []string) []datadog.Metric { - ms := make([]datadog.Metric, 0, slice.Len()) +func (t *Translator) mapNumberMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + dt MetricDataType, + slice pdata.NumberDataPointSlice, + attrTags []string, + host string, +) { + for i := 0; i < slice.Len(); i++ { p := slice.At(i) tags := getTags(p.Attributes()) @@ -113,16 +115,19 @@ func (t *Translator) mapNumberMetrics(name string, dt metrics.MetricDataType, sl continue } - ms = append(ms, - metrics.NewMetric(name, dt, uint64(p.Timestamp()), val, tags), - ) + consumer.ConsumeTimeSeries(ctx, name, dt, uint64(p.Timestamp()), val, tags, host) } - return ms } // mapNumberMonotonicMetrics maps monotonic datapoints into Datadog metrics -func (t *Translator) mapNumberMonotonicMetrics(name string, slice pdata.NumberDataPointSlice, attrTags []string) []datadog.Metric { - ms := make([]datadog.Metric, 0, slice.Len()) +func (t *Translator) mapNumberMonotonicMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + slice pdata.NumberDataPointSlice, + attrTags []string, + host string, +) { for i := 0; i < slice.Len(); i++ { p := slice.At(i) ts := uint64(p.Timestamp()) @@ -142,10 +147,9 @@ func (t *Translator) mapNumberMonotonicMetrics(name string, slice pdata.NumberDa } if dx, ok := t.prevPts.putAndGetDiff(name, tags, ts, val); ok { - ms = append(ms, metrics.NewCount(name, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, name, Count, ts, dx, tags, host) } } - return ms } func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBound float64) { @@ -161,7 +165,16 @@ func getBounds(p pdata.HistogramDataPoint, idx int) (lowerBound float64, upperBo return } -func (t *Translator) getSketchBuckets(name string, ts uint64, p pdata.HistogramDataPoint, delta bool, tags []string) sketches.SketchSeries { +func (t *Translator) getSketchBuckets( + ctx context.Context, + consumer SketchConsumer, + name string, + ts uint64, + p pdata.HistogramDataPoint, + delta bool, + tags []string, + host string, +) { as := &quantile.Agent{} for j := range p.BucketCounts() { lowerBound, upperBound := getBounds(p, j) @@ -181,21 +194,21 @@ func (t *Translator) getSketchBuckets(name string, ts uint64, p pdata.HistogramD } } - return sketches.SketchSeries{ - Name: name, - Tags: tags, - Interval: 1, - Points: []sketches.SketchPoint{{ - Ts: int64(p.Timestamp() / 1e9), - Sketch: as.Finish(), - }}, - } + + consumer.ConsumeSketch(ctx, name, ts, as.Finish(), tags, host) } -func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, delta bool, tags []string) []datadog.Metric { +func (t *Translator) getLegacyBuckets( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + p pdata.HistogramDataPoint, + delta bool, + tags []string, + host string, +) { // We have a single metric, 'bucket', which is tagged with the bucket bounds. See: // https://github.com/DataDog/integrations-core/blob/7.30.1/datadog_checks_base/datadog_checks/base/checks/openmetrics/v2/transformers/histogram.py - ms := make([]datadog.Metric, 0, len(p.BucketCounts())) fullName := fmt.Sprintf("%s.bucket", name) for idx, val := range p.BucketCounts() { lowerBound, upperBound := getBounds(p, idx) @@ -207,12 +220,11 @@ func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, d count := float64(val) ts := uint64(p.Timestamp()) if delta { - ms = append(ms, metrics.NewCount(fullName, ts, count, bucketTags)) + consumer.ConsumeTimeSeries(ctx, fullName, Count, ts, count, bucketTags, host) } else if dx, ok := t.prevPts.putAndGetDiff(fullName, bucketTags, ts, count); ok { - ms = append(ms, metrics.NewCount(fullName, ts, dx, bucketTags)) + consumer.ConsumeTimeSeries(ctx, fullName, Count, ts, dx, bucketTags, host) } } - return ms } // mapHistogramMetrics maps double histogram metrics slices to Datadog metrics @@ -228,12 +240,15 @@ func (t *Translator) getLegacyBuckets(name string, p pdata.HistogramDataPoint, d // We follow a similar approach to our OpenMetrics check: // we report sum and count by default; buckets count can also // be reported (opt-in) tagged by lower bound. -func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataPointSlice, delta bool, attrTags []string) (ms []datadog.Metric, sl sketches.SketchSeriesList) { - // Allocate assuming none are nil and no buckets - ms = make([]datadog.Metric, 0, 2*slice.Len()) - if t.cfg.HistConfig.Mode == histogramModeDistributions { - sl = make(sketches.SketchSeriesList, 0, slice.Len()) - } +func (t *Translator) mapHistogramMetrics( + ctx context.Context, + consumer Consumer, + name string, + slice pdata.HistogramDataPointSlice, + delta bool, + attrTags []string, + host string, +) { for i := 0; i < slice.Len(); i++ { p := slice.At(i) ts := uint64(p.Timestamp()) @@ -244,9 +259,9 @@ func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataP count := float64(p.Count()) countName := fmt.Sprintf("%s.count", name) if delta { - ms = append(ms, metrics.NewCount(countName, ts, count, tags)) + consumer.ConsumeTimeSeries(ctx, countName, Count, ts, count, tags, host) } else if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, count); ok { - ms = append(ms, metrics.NewCount(countName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, countName, Count, ts, dx, tags, host) } } @@ -255,21 +270,20 @@ func (t *Translator) mapHistogramMetrics(name string, slice pdata.HistogramDataP sumName := fmt.Sprintf("%s.sum", name) if !t.isSkippable(sumName, p.Sum()) { if delta { - ms = append(ms, metrics.NewCount(sumName, ts, sum, tags)) + consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, sum, tags, host) } else if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, sum); ok { - ms = append(ms, metrics.NewCount(sumName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, dx, tags, host) } } } switch t.cfg.HistConfig.Mode { case histogramModeCounters: - ms = append(ms, t.getLegacyBuckets(name, p, delta, tags)...) + t.getLegacyBuckets(ctx, consumer, name, p, delta, tags, host) case histogramModeDistributions: - sl = append(sl, t.getSketchBuckets(name, ts, p, true, tags)) + t.getSketchBuckets(ctx, consumer, name, ts, p, true, tags, host) } } - return } // formatFloat formats a float number as close as possible to what @@ -300,9 +314,15 @@ func getQuantileTag(quantile float64) string { } // mapSummaryMetrics maps summary datapoints into Datadog metrics -func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPointSlice, attrTags []string) []datadog.Metric { - // Allocate assuming none are nil and no quantiles - ms := make([]datadog.Metric, 0, 2*slice.Len()) +func (t *Translator) mapSummaryMetrics( + ctx context.Context, + consumer TimeSeriesConsumer, + name string, + slice pdata.SummaryDataPointSlice, + attrTags []string, + host string, +) { + for i := 0; i < slice.Len(); i++ { p := slice.At(i) ts := uint64(p.Timestamp()) @@ -313,7 +333,7 @@ func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPoint { countName := fmt.Sprintf("%s.count", name) if dx, ok := t.prevPts.putAndGetDiff(countName, tags, ts, float64(p.Count())); ok && !t.isSkippable(countName, dx) { - ms = append(ms, metrics.NewCount(countName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, countName, Count, ts, dx, tags, host) } } @@ -321,7 +341,7 @@ func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPoint sumName := fmt.Sprintf("%s.sum", name) if !t.isSkippable(sumName, p.Sum()) { if dx, ok := t.prevPts.putAndGetDiff(sumName, tags, ts, p.Sum()); ok { - ms = append(ms, metrics.NewCount(sumName, ts, dx, tags)) + consumer.ConsumeTimeSeries(ctx, sumName, Count, ts, dx, tags, host) } } } @@ -337,20 +357,15 @@ func (t *Translator) mapSummaryMetrics(name string, slice pdata.SummaryDataPoint } quantileTags := append(tags, getQuantileTag(q.Quantile())) - ms = append(ms, - metrics.NewGauge(fullName, ts, q.Value(), quantileTags), - ) + consumer.ConsumeTimeSeries(ctx, fullName, Gauge, ts, q.Value(), quantileTags, host) } } } - return ms } // MapMetrics maps OTLP metrics into the DataDog format -func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl sketches.SketchSeriesList) { - pushTime := uint64(time.Now().UTC().UnixNano()) +func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics, consumer Consumer) { rms := md.ResourceMetrics() - seenHosts := make(map[string]struct{}) for i := 0; i < rms.Len(); i++ { rm := rms.At(i) @@ -370,7 +385,10 @@ func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl s host = fallbackHost } } - seenHosts[host] = struct{}{} + + if c, ok := consumer.(HostConsumer); ok { + c.ConsumeHost(host) + } ilms := rm.InstrumentationLibraryMetrics() for j := 0; j < ilms.Len(); j++ { @@ -378,21 +396,19 @@ func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl s metricsArray := ilm.Metrics() for k := 0; k < metricsArray.Len(); k++ { md := metricsArray.At(k) - var datapoints []datadog.Metric - var sketchesPoints sketches.SketchSeriesList switch md.DataType() { case pdata.MetricDataTypeGauge: - datapoints = t.mapNumberMetrics(md.Name(), metrics.Gauge, md.Gauge().DataPoints(), attributeTags) + t.mapNumberMetrics(ctx, consumer, md.Name(), Gauge, md.Gauge().DataPoints(), attributeTags, host) case pdata.MetricDataTypeSum: switch md.Sum().AggregationTemporality() { case pdata.AggregationTemporalityCumulative: if t.cfg.SendMonotonic && isCumulativeMonotonic(md) { - datapoints = t.mapNumberMonotonicMetrics(md.Name(), md.Sum().DataPoints(), attributeTags) + t.mapNumberMonotonicMetrics(ctx, consumer, md.Name(), md.Sum().DataPoints(), attributeTags, host) } else { - datapoints = t.mapNumberMetrics(md.Name(), metrics.Gauge, md.Sum().DataPoints(), attributeTags) + t.mapNumberMetrics(ctx, consumer, md.Name(), Gauge, md.Sum().DataPoints(), attributeTags, host) } case pdata.AggregationTemporalityDelta: - datapoints = t.mapNumberMetrics(md.Name(), metrics.Count, md.Sum().DataPoints(), attributeTags) + t.mapNumberMetrics(ctx, consumer, md.Name(), Count, md.Sum().DataPoints(), attributeTags, host) default: // pdata.AggregationTemporalityUnspecified or any other not supported type t.logger.Debug("Unknown or unsupported aggregation temporality", zap.String(metricName, md.Name()), @@ -404,7 +420,7 @@ func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl s switch md.Histogram().AggregationTemporality() { case pdata.AggregationTemporalityCumulative, pdata.AggregationTemporalityDelta: delta := md.Histogram().AggregationTemporality() == pdata.AggregationTemporalityDelta - datapoints, sketchesPoints = t.mapHistogramMetrics(md.Name(), md.Histogram().DataPoints(), delta, attributeTags) + t.mapHistogramMetrics(ctx, consumer, md.Name(), md.Histogram().DataPoints(), delta, attributeTags, host) default: // pdata.AggregationTemporalityUnspecified or any other not supported type t.logger.Debug("Unknown or unsupported aggregation temporality", zap.String("metric name", md.Name()), @@ -413,31 +429,12 @@ func (t *Translator) MapMetrics(md pdata.Metrics) (series []datadog.Metric, sl s continue } case pdata.MetricDataTypeSummary: - datapoints = t.mapSummaryMetrics(md.Name(), md.Summary().DataPoints(), attributeTags) + t.mapSummaryMetrics(ctx, consumer, md.Name(), md.Summary().DataPoints(), attributeTags, host) default: // pdata.MetricDataTypeNone or any other not supported type t.logger.Debug("Unknown or unsupported metric type", zap.String(metricName, md.Name()), zap.Any("data type", md.DataType())) continue } - - for i := range datapoints { - datapoints[i].SetHost(host) - } - - for i := range sl { - sl[i].Host = host - } - - series = append(series, datapoints...) - sl = append(sl, sketchesPoints...) } } } - - for host := range seenHosts { - // Report the host as running - runningMetric := metrics.DefaultMetrics("metrics", host, pushTime, t.buildInfo) - series = append(series, runningMetric...) - } - - return } diff --git a/exporter/datadogexporter/internal/translator/metrics_translator_test.go b/exporter/datadogexporter/internal/translator/metrics_translator_test.go index 1621777273de..f143fa5318fa 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator_test.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator_test.go @@ -20,20 +20,16 @@ import ( "testing" "time" + "github.com/DataDog/datadog-agent/pkg/quantile" gocache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" - "gopkg.in/zorkian/go-datadog-api.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" ) var defaultCfg = config.MetricsConfig{ @@ -44,19 +40,6 @@ var defaultCfg = config.MetricsConfig{ }, } -func TestMetricValue(t *testing.T) { - var ( - name = "name" - value = math.Pi - ts = uint64(time.Now().UnixNano()) - tags = []string{"tool:opentelemetry", "version:0.1.0"} - ) - - metric := metrics.NewGauge(name, ts, value, tags) - assert.Equal(t, string(metrics.Gauge), metric.GetType()) - assert.Equal(t, tags, metric.Tags) -} - func TestGetTags(t *testing.T) { attributes := pdata.NewAttributeMapFromMap(map[string]pdata.AttributeValue{ "key1": pdata.NewAttributeValueString("val1"), @@ -131,15 +114,51 @@ func (t testProvider) Hostname(context.Context) (string, error) { } func newTranslator(logger *zap.Logger, cfg config.MetricsConfig) *Translator { - params := component.ExporterCreateSettings{ - BuildInfo: component.BuildInfo{ - Version: "1.0", - }, - TelemetrySettings: component.TelemetrySettings{ - Logger: logger, + return New(newTestCache(), logger, cfg, testProvider("fallbackHostname")) +} + +type metric struct { + name string + typ MetricDataType + timestamp uint64 + value float64 + tags []string + host string +} + +var _ TimeSeriesConsumer = (*mockTimeSeriesConsumer)(nil) + +type mockTimeSeriesConsumer struct { + metrics []metric +} + +func (m *mockTimeSeriesConsumer) ConsumeTimeSeries( + _ context.Context, + name string, + typ MetricDataType, + ts uint64, + val float64, + tags []string, + host string, +) { + m.metrics = append(m.metrics, + metric{ + name: name, + typ: typ, + timestamp: ts, + value: val, + tags: tags, + host: host, }, - } - return New(newTestCache(), params, cfg, testProvider("fallbackHostname")) + ) +} + +func newGauge(name string, ts uint64, val float64, tags []string) metric { + return metric{name: name, typ: Gauge, timestamp: ts, value: val, tags: tags} +} + +func newCount(name string, ts uint64, val float64, tags []string) metric { + return metric{name: name, typ: Count, timestamp: ts, value: val, tags: tags} } func TestMapIntMetrics(t *testing.T) { @@ -148,22 +167,29 @@ func TestMapIntMetrics(t *testing.T) { point := slice.AppendEmpty() point.SetIntVal(17) point.SetTimestamp(ts) + ctx := context.Background() tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.test", Gauge, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("int64.test", metrics.Gauge, slice, []string{}), - []datadog.Metric{metrics.NewGauge("int64.test", uint64(ts), 17, []string{})}, + consumer.metrics, + []metric{newGauge("int64.test", uint64(ts), 17, []string{})}, ) + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.delta.test", Count, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("int64.delta.test", metrics.Count, slice, []string{}), - []datadog.Metric{metrics.NewCount("int64.delta.test", uint64(ts), 17, []string{})}, + consumer.metrics, + []metric{newCount("int64.delta.test", uint64(ts), 17, []string{})}, ) // With attribute tags + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "int64.test", Gauge, slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("int64.test", metrics.Gauge, slice, []string{"attribute_tag:attribute_value"}), - []datadog.Metric{metrics.NewGauge("int64.test", uint64(ts), 17, []string{"attribute_tag:attribute_value"})}, + consumer.metrics, + []metric{newGauge("int64.test", uint64(ts), 17, []string{"attribute_tag:attribute_value"})}, ) } @@ -173,22 +199,29 @@ func TestMapDoubleMetrics(t *testing.T) { point := slice.AppendEmpty() point.SetDoubleVal(math.Pi) point.SetTimestamp(ts) + ctx := context.Background() tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.test", Gauge, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("float64.test", metrics.Gauge, slice, []string{}), - []datadog.Metric{metrics.NewGauge("float64.test", uint64(ts), math.Pi, []string{})}, + consumer.metrics, + []metric{newGauge("float64.test", uint64(ts), math.Pi, []string{})}, ) + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.delta.test", Count, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("float64.delta.test", metrics.Count, slice, []string{}), - []datadog.Metric{metrics.NewCount("float64.delta.test", uint64(ts), math.Pi, []string{})}, + consumer.metrics, + []metric{newCount("float64.delta.test", uint64(ts), math.Pi, []string{})}, ) // With attribute tags + consumer = &mockTimeSeriesConsumer{} + tr.mapNumberMetrics(ctx, consumer, "float64.test", Gauge, slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapNumberMetrics("float64.test", metrics.Gauge, slice, []string{"attribute_tag:attribute_value"}), - []datadog.Metric{metrics.NewGauge("float64.test", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"})}, + consumer.metrics, + []metric{newGauge("float64.test", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"})}, ) } @@ -216,15 +249,17 @@ func TestMapIntMonotonicMetrics(t *testing.T) { // Map to Datadog format metricName := "metric.example" - expected := make([]datadog.Metric, len(deltas)) + expected := make([]metric, len(deltas)) for i, val := range deltas { - expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), float64(val), []string{}) + expected[i] = newCount(metricName, uint64(seconds(i+1)), float64(val), []string{}) } + ctx := context.Background() + consumer := &mockTimeSeriesConsumer{} tr := newTranslator(zap.NewNop(), defaultCfg) - output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") - assert.ElementsMatch(t, output, expected) + assert.ElementsMatch(t, expected, consumer.metrics) } func TestMapIntMonotonicDifferentDimensions(t *testing.T) { @@ -259,14 +294,17 @@ func TestMapIntMonotonicDifferentDimensions(t *testing.T) { point.SetTimestamp(seconds(1)) point.Attributes().InsertString("key1", "valB") + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(1)), 20, []string{}), - metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), - metrics.NewCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 20, []string{}), + newCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + newCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), }, ) } @@ -283,12 +321,15 @@ func TestMapIntMonotonicWithReboot(t *testing.T) { point.SetIntVal(val) } + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{}), - metrics.NewCount(metricName, uint64(seconds(3)), 20, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 30, []string{}), + newCount(metricName, uint64(seconds(3)), 20, []string{}), }, ) } @@ -307,12 +348,15 @@ func TestMapIntMonotonicOutOfOrder(t *testing.T) { point.SetIntVal(val) } + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(2)), 2, []string{}), - metrics.NewCount(metricName, uint64(seconds(3)), 1, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 2, []string{}), + newCount(metricName, uint64(seconds(3)), 1, []string{}), }, ) } @@ -336,15 +380,17 @@ func TestMapDoubleMonotonicMetrics(t *testing.T) { // Map to Datadog format metricName := "metric.example" - expected := make([]datadog.Metric, len(deltas)) + expected := make([]metric, len(deltas)) for i, val := range deltas { - expected[i] = metrics.NewCount(metricName, uint64(seconds(i+1)), val, []string{}) + expected[i] = newCount(metricName, uint64(seconds(i+1)), val, []string{}) } + ctx := context.Background() + consumer := &mockTimeSeriesConsumer{} tr := newTranslator(zap.NewNop(), defaultCfg) - output := tr.mapNumberMonotonicMetrics(metricName, slice, []string{}) + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") - assert.ElementsMatch(t, expected, output) + assert.ElementsMatch(t, expected, consumer.metrics) } func TestMapDoubleMonotonicDifferentDimensions(t *testing.T) { @@ -379,14 +425,17 @@ func TestMapDoubleMonotonicDifferentDimensions(t *testing.T) { point.SetTimestamp(seconds(1)) point.Attributes().InsertString("key1", "valB") + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(1)), 20, []string{}), - metrics.NewCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), - metrics.NewCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(1)), 20, []string{}), + newCount(metricName, uint64(seconds(1)), 30, []string{"key1:valA"}), + newCount(metricName, uint64(seconds(1)), 40, []string{"key1:valB"}), }, ) } @@ -403,12 +452,15 @@ func TestMapDoubleMonotonicWithReboot(t *testing.T) { point.SetDoubleVal(val) } + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(2)), 30, []string{}), - metrics.NewCount(metricName, uint64(seconds(6)), 20, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 30, []string{}), + newCount(metricName, uint64(seconds(6)), 20, []string{}), }, ) } @@ -427,16 +479,28 @@ func TestMapDoubleMonotonicOutOfOrder(t *testing.T) { point.SetDoubleVal(val) } + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) + consumer := &mockTimeSeriesConsumer{} + tr.mapNumberMonotonicMetrics(ctx, consumer, metricName, slice, []string{}, "") assert.ElementsMatch(t, - tr.mapNumberMonotonicMetrics(metricName, slice, []string{}), - []datadog.Metric{ - metrics.NewCount(metricName, uint64(seconds(2)), 2, []string{}), - metrics.NewCount(metricName, uint64(seconds(3)), 1, []string{}), + consumer.metrics, + []metric{ + newCount(metricName, uint64(seconds(2)), 2, []string{}), + newCount(metricName, uint64(seconds(3)), 1, []string{}), }, ) } +type mockFullConsumer struct { + mockTimeSeriesConsumer + anySketch bool +} + +func (c *mockFullConsumer) ConsumeSketch(_ context.Context, _ string, _ uint64, _ *quantile.Sketch, _ []string, _ string) { + c.anySketch = true +} + func TestMapDeltaHistogramMetrics(t *testing.T) { ts := pdata.NewTimestampFromTime(time.Now()) slice := pdata.NewHistogramDataPointSlice() @@ -447,61 +511,54 @@ func TestMapDeltaHistogramMetrics(t *testing.T) { point.SetExplicitBounds([]float64{0}) point.SetTimestamp(ts) - noBuckets := []datadog.Metric{ - metrics.NewCount("doubleHist.test.count", uint64(ts), 20, []string{}), - metrics.NewCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{}), + noBuckets := []metric{ + newCount("doubleHist.test.count", uint64(ts), 20, []string{}), + newCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{}), } - buckets := []datadog.Metric{ - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0"}), - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf"}), + buckets := []metric{ + newCount("doubleHist.test.bucket", uint64(ts), 2, []string{"lower_bound:-inf", "upper_bound:0"}), + newCount("doubleHist.test.bucket", uint64(ts), 18, []string{"lower_bound:0", "upper_bound:inf"}), } + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) delta := true tr.cfg.HistConfig.Mode = histogramModeNoBuckets - res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // No buckets - noBuckets, - ) + consumer := &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, noBuckets, consumer.metrics) + assert.False(t, consumer.anySketch) tr.cfg.HistConfig.Mode = histogramModeCounters - res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // buckets - append(noBuckets, buckets...), - ) + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.ElementsMatch(t, append(noBuckets, buckets...), consumer.metrics) + assert.False(t, consumer.anySketch) // With attribute tags - noBucketsAttributeTags := []datadog.Metric{ - metrics.NewCount("doubleHist.test.count", uint64(ts), 20, []string{"attribute_tag:attribute_value"}), - metrics.NewCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"}), + noBucketsAttributeTags := []metric{ + newCount("doubleHist.test.count", uint64(ts), 20, []string{"attribute_tag:attribute_value"}), + newCount("doubleHist.test.sum", uint64(ts), math.Pi, []string{"attribute_tag:attribute_value"}), } - bucketsAttributeTags := []datadog.Metric{ - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 2, []string{"attribute_tag:attribute_value", "lower_bound:-inf", "upper_bound:0"}), - metrics.NewCount("doubleHist.test.bucket", uint64(ts), 18, []string{"attribute_tag:attribute_value", "lower_bound:0", "upper_bound:inf"}), + bucketsAttributeTags := []metric{ + newCount("doubleHist.test.bucket", uint64(ts), 2, []string{"attribute_tag:attribute_value", "lower_bound:-inf", "upper_bound:0"}), + newCount("doubleHist.test.bucket", uint64(ts), 18, []string{"attribute_tag:attribute_value", "lower_bound:0", "upper_bound:inf"}), } tr.cfg.HistConfig.Mode = histogramModeNoBuckets - res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // No buckets - noBucketsAttributeTags, - ) + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, noBucketsAttributeTags, consumer.metrics) + assert.False(t, consumer.anySketch) tr.cfg.HistConfig.Mode = histogramModeCounters - res, sl = tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}) - require.Empty(t, sl) - assert.ElementsMatch(t, - res, // buckets - append(noBucketsAttributeTags, bucketsAttributeTags...), - ) + consumer = &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{"attribute_tag:attribute_value"}, "") + assert.ElementsMatch(t, append(noBucketsAttributeTags, bucketsAttributeTags...), consumer.metrics) + assert.False(t, consumer.anySketch) } func TestMapCumulativeHistogramMetrics(t *testing.T) { @@ -520,21 +577,23 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { point.SetExplicitBounds([]float64{0}) point.SetTimestamp(seconds(2)) - expected := []datadog.Metric{ - metrics.NewCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), - metrics.NewCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), - metrics.NewCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), - metrics.NewCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), + expected := []metric{ + newCount("doubleHist.test.count", uint64(seconds(2)), 30, []string{}), + newCount("doubleHist.test.sum", uint64(seconds(2)), 20, []string{}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 11, []string{"lower_bound:-inf", "upper_bound:0"}), + newCount("doubleHist.test.bucket", uint64(seconds(2)), 2, []string{"lower_bound:0", "upper_bound:inf"}), } + ctx := context.Background() tr := newTranslator(zap.NewNop(), defaultCfg) delta := false tr.cfg.HistConfig.Mode = histogramModeCounters - res, sl := tr.mapHistogramMetrics("doubleHist.test", slice, delta, []string{}) - require.Empty(t, sl) + consumer := &mockFullConsumer{} + tr.mapHistogramMetrics(ctx, consumer, "doubleHist.test", slice, delta, []string{}, "") + assert.False(t, consumer.anySketch) assert.ElementsMatch(t, - res, + consumer.metrics, expected, ) } @@ -597,93 +656,61 @@ func TestMapSummaryMetrics(t *testing.T) { c := newTestCache() c.cache.Set(c.metricDimensionsToMapKey("summary.example.count", tags), numberCounter{0, 1}, gocache.NoExpiration) c.cache.Set(c.metricDimensionsToMapKey("summary.example.sum", tags), numberCounter{0, 1}, gocache.NoExpiration) - return New(c, componenttest.NewNopExporterCreateSettings(), config.MetricsConfig{Quantiles: quantiles}, testProvider("fallbackHostname")) + return New(c, zap.NewNop(), config.MetricsConfig{Quantiles: quantiles}, testProvider("fallbackHostname")) } - noQuantiles := []datadog.Metric{ - metrics.NewCount("summary.example.count", uint64(ts), 100, []string{}), - metrics.NewCount("summary.example.sum", uint64(ts), 10_000, []string{}), + noQuantiles := []metric{ + newCount("summary.example.count", uint64(ts), 100, []string{}), + newCount("summary.example.sum", uint64(ts), 10_000, []string{}), } - quantiles := []datadog.Metric{ - metrics.NewGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0"}), + quantiles := []metric{ + newGauge("summary.example.quantile", uint64(ts), 0, []string{"quantile:0"}), + newGauge("summary.example.quantile", uint64(ts), 100, []string{"quantile:0.5"}), + newGauge("summary.example.quantile", uint64(ts), 500, []string{"quantile:0.999"}), + newGauge("summary.example.quantile", uint64(ts), 600, []string{"quantile:1.0"}), } + ctx := context.Background() tr := newTranslator([]string{}, false) + consumer := &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{}), + consumer.metrics, noQuantiles, ) tr = newTranslator([]string{}, true) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{}), + consumer.metrics, append(noQuantiles, quantiles...), ) - noQuantilesAttr := []datadog.Metric{ - metrics.NewCount("summary.example.count", uint64(ts), 100, []string{"attribute_tag:attribute_value"}), - metrics.NewCount("summary.example.sum", uint64(ts), 10_000, []string{"attribute_tag:attribute_value"}), + noQuantilesAttr := []metric{ + newCount("summary.example.count", uint64(ts), 100, []string{"attribute_tag:attribute_value"}), + newCount("summary.example.sum", uint64(ts), 10_000, []string{"attribute_tag:attribute_value"}), } - quantilesAttr := []datadog.Metric{ - metrics.NewGauge("summary.example.quantile", uint64(ts), 0, []string{"attribute_tag:attribute_value", "quantile:0"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 100, []string{"attribute_tag:attribute_value", "quantile:0.5"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 500, []string{"attribute_tag:attribute_value", "quantile:0.999"}), - metrics.NewGauge("summary.example.quantile", uint64(ts), 600, []string{"attribute_tag:attribute_value", "quantile:1.0"}), + quantilesAttr := []metric{ + newGauge("summary.example.quantile", uint64(ts), 0, []string{"attribute_tag:attribute_value", "quantile:0"}), + newGauge("summary.example.quantile", uint64(ts), 100, []string{"attribute_tag:attribute_value", "quantile:0.5"}), + newGauge("summary.example.quantile", uint64(ts), 500, []string{"attribute_tag:attribute_value", "quantile:0.999"}), + newGauge("summary.example.quantile", uint64(ts), 600, []string{"attribute_tag:attribute_value", "quantile:1.0"}), } tr = newTranslator([]string{"attribute_tag:attribute_value"}, false) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{"attribute_tag:attribute_value"}), + consumer.metrics, noQuantilesAttr, ) tr = newTranslator([]string{"attribute_tag:attribute_value"}, true) + consumer = &mockTimeSeriesConsumer{} + tr.mapSummaryMetrics(ctx, consumer, "summary.example", slice, []string{"attribute_tag:attribute_value"}, "") assert.ElementsMatch(t, - tr.mapSummaryMetrics("summary.example", slice, []string{"attribute_tag:attribute_value"}), + consumer.metrics, append(noQuantilesAttr, quantilesAttr...), ) } -func TestRunningMetrics(t *testing.T) { - ms := pdata.NewMetrics() - rms := ms.ResourceMetrics() - - rm := rms.AppendEmpty() - resAttrs := rm.Resource().Attributes() - resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) - - rm = rms.AppendEmpty() - resAttrs = rm.Resource().Attributes() - resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-1")) - - rm = rms.AppendEmpty() - resAttrs = rm.Resource().Attributes() - resAttrs.Insert(attributes.AttributeDatadogHostname, pdata.NewAttributeValueString("resource-hostname-2")) - - rms.AppendEmpty() - - cfg := config.MetricsConfig{} - tr := newTranslator(zap.NewNop(), cfg) - - series, sl := tr.MapMetrics(ms) - require.Empty(t, sl) - - runningHostnames := []string{} - - for _, metric := range series { - if *metric.Metric == "otel.datadog_exporter.metrics.running" { - if metric.Host != nil { - runningHostnames = append(runningHostnames, *metric.Host) - } - } - } - - assert.ElementsMatch(t, - runningHostnames, - []string{"fallbackHostname", "resource-hostname-1", "resource-hostname-2"}, - ) - -} - const ( testHostname = "res-hostname" ) @@ -850,25 +877,15 @@ func createTestMetrics() pdata.Metrics { return md } -func removeRunningMetrics(series []datadog.Metric) []datadog.Metric { - filtered := []datadog.Metric{} - for _, m := range series { - if m.GetMetric() != "otel.datadog_exporter.metrics.running" { - filtered = append(filtered, m) - } - } - return filtered -} - -func testGauge(name string, val float64) datadog.Metric { - m := metrics.NewGauge(name, 0, val, []string{}) - m.SetHost(testHostname) +func testGauge(name string, val float64) metric { + m := newGauge(name, 0, val, []string{}) + m.host = testHostname return m } -func testCount(name string, val float64, seconds uint64) datadog.Metric { - m := metrics.NewCount(name, seconds*1e9, val, []string{}) - m.SetHost(testHostname) +func testCount(name string, val float64, seconds uint64) metric { + m := newCount(name, seconds*1e9, val, []string{}) + m.host = testHostname return m } @@ -877,12 +894,13 @@ func TestMapMetrics(t *testing.T) { core, observed := observer.New(zapcore.DebugLevel) testLogger := zap.New(core) + ctx := context.Background() + consumer := &mockFullConsumer{} tr := newTranslator(testLogger, defaultCfg) - series, sl := tr.MapMetrics(md) - require.Empty(t, sl) + tr.MapMetrics(ctx, md, consumer) + assert.False(t, consumer.anySketch) - filtered := removeRunningMetrics(series) - assert.ElementsMatch(t, filtered, []datadog.Metric{ + assert.ElementsMatch(t, consumer.metrics, []metric{ testGauge("int.gauge", 1), testGauge("double.gauge", math.Pi), testCount("int.delta.sum", 2, 0), @@ -1000,12 +1018,13 @@ func TestNaNMetrics(t *testing.T) { core, observed := observer.New(zapcore.DebugLevel) testLogger := zap.New(core) + ctx := context.Background() tr := newTranslator(testLogger, defaultCfg) - series, sl := tr.MapMetrics(md) - require.Empty(t, sl) + consumer := &mockFullConsumer{} + tr.MapMetrics(ctx, md, consumer) + assert.False(t, consumer.anySketch) - filtered := removeRunningMetrics(series) - assert.ElementsMatch(t, filtered, []datadog.Metric{ + assert.ElementsMatch(t, consumer.metrics, []metric{ testCount("nan.histogram.count", 20, 0), testCount("nan.summary.count", 100, 2), }) diff --git a/exporter/datadogexporter/internal/translator/sketches_test.go b/exporter/datadogexporter/internal/translator/sketches_test.go index 9a83b4131a70..63a38094cc06 100644 --- a/exporter/datadogexporter/internal/translator/sketches_test.go +++ b/exporter/datadogexporter/internal/translator/sketches_test.go @@ -15,6 +15,7 @@ package translator import ( + "context" "fmt" "math" "testing" @@ -27,6 +28,24 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" ) +var _ SketchConsumer = (*sketchConsumer)(nil) + +type sketchConsumer struct { + sk *quantile.Sketch +} + +// ConsumeSketch implements the translator.Consumer interface. +func (c *sketchConsumer) ConsumeSketch( + _ context.Context, + _ string, + _ uint64, + sketch *quantile.Sketch, + _ []string, + _ string, +) { + c.sk = sketch +} + func TestHistogramSketches(t *testing.T) { N := 1_000 M := 50_000.0 @@ -87,12 +106,15 @@ func TestHistogramSketches(t *testing.T) { defaultEps := 1.0 / 128.0 tol := 1e-8 cfg := quantile.Default() + ctx := context.Background() tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) for _, test := range tests { t.Run(test.name, func(t *testing.T) { p := fromCDF(test.cdf) - sk := tr.getSketchBuckets("test", 0, p, true, []string{}).Points[0].Sketch + consumer := &sketchConsumer{} + tr.getSketchBuckets(ctx, consumer, "test", 0, p, true, []string{}, "") + sk := consumer.sk // Check the minimum is 0.0 assert.Equal(t, 0.0, sk.Quantile(cfg, 0)) @@ -177,11 +199,14 @@ func TestInfiniteBounds(t *testing.T) { }, } + ctx := context.Background() tr := newTranslator(zap.NewNop(), config.MetricsConfig{}) for _, testInstance := range tests { t.Run(testInstance.name, func(t *testing.T) { p := testInstance.getHist() - sk := tr.getSketchBuckets("test", 0, p, true, []string{}).Points[0].Sketch + consumer := &sketchConsumer{} + tr.getSketchBuckets(ctx, consumer, "test", 0, p, true, []string{}, "") + sk := consumer.sk assert.InDelta(t, sk.Basic.Sum, p.Sum(), 1) assert.Equal(t, uint64(sk.Basic.Cnt), p.Count()) }) diff --git a/exporter/datadogexporter/metrics_exporter.go b/exporter/datadogexporter/metrics_exporter.go index 926a006bdb7e..85e1b34f79c0 100644 --- a/exporter/datadogexporter/metrics_exporter.go +++ b/exporter/datadogexporter/metrics_exporter.go @@ -66,7 +66,7 @@ func newMetricsExporter(ctx context.Context, params component.ExporterCreateSett sweepInterval = cfg.Metrics.DeltaTTL / 2 } prevPts := translator.NewTTLCache(sweepInterval, cfg.Metrics.DeltaTTL) - tr := translator.New(prevPts, params, cfg.Metrics, &hostProvider{params.Logger, cfg}) + tr := translator.New(prevPts, params.Logger, cfg.Metrics, &hostProvider{params.Logger, cfg}) return &metricsExporter{params, cfg, ctx, client, tr} } @@ -115,7 +115,10 @@ func (exp *metricsExporter) PushMetricsData(ctx context.Context, md pdata.Metric }) } - ms, sl := exp.tr.MapMetrics(md) + consumer := metrics.NewConsumer() + pushTime := uint64(time.Now().UTC().UnixNano()) + exp.tr.MapMetrics(ctx, md, consumer) + ms, sl := consumer.All(pushTime, exp.params.BuildInfo) metrics.ProcessMetrics(ms, exp.cfg) if len(ms) > 0 { From 8a3d299c6271ae0d013238c3fafd59ef803d5e85 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens <pablo.baeyens@datadoghq.com> Date: Wed, 22 Sep 2021 18:23:01 +0200 Subject: [PATCH 2/5] [exporter/datadogexporter] Add comment for HostConsumer interface --- exporter/datadogexporter/internal/translator/consumer.go | 1 + .../datadogexporter/internal/translator/metrics_translator.go | 1 + 2 files changed, 2 insertions(+) diff --git a/exporter/datadogexporter/internal/translator/consumer.go b/exporter/datadogexporter/internal/translator/consumer.go index 21442dd19885..5cc6fb544a94 100644 --- a/exporter/datadogexporter/internal/translator/consumer.go +++ b/exporter/datadogexporter/internal/translator/consumer.go @@ -63,6 +63,7 @@ type Consumer interface { } // HostConsumer is a hostname consumer. +// It is an optional interface that can be implemented by a Consumer. type HostConsumer interface { // ConsumeHost consumes a hostname. ConsumeHost(host string) diff --git a/exporter/datadogexporter/internal/translator/metrics_translator.go b/exporter/datadogexporter/internal/translator/metrics_translator.go index 5b2ad29598de..c708ebb4ee39 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator.go @@ -386,6 +386,7 @@ func (t *Translator) MapMetrics(ctx context.Context, md pdata.Metrics, consumer } } + // Track hosts if the consumer is a HostConsumer. if c, ok := consumer.(HostConsumer); ok { c.ConsumeHost(host) } From 7f25329d50cc87f6ee5e195db6ba63e09f126562 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens <pablo.baeyens@datadoghq.com> Date: Thu, 23 Sep 2021 16:35:33 +0200 Subject: [PATCH 3/5] [exporter/datadogexporter] Fix tests after merge --- .../datadogexporter/internal/metrics/consumer.go | 2 +- .../internal/translator/metrics_translator.go | 4 ++-- .../internal/translator/metrics_translator_test.go | 13 +++++++++---- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/exporter/datadogexporter/internal/metrics/consumer.go b/exporter/datadogexporter/internal/metrics/consumer.go index 191785650555..05723fd27c1a 100644 --- a/exporter/datadogexporter/internal/metrics/consumer.go +++ b/exporter/datadogexporter/internal/metrics/consumer.go @@ -17,10 +17,10 @@ package metrics import ( "context" + "github.com/DataDog/datadog-agent/pkg/quantile" "go.opentelemetry.io/collector/component" "gopkg.in/zorkian/go-datadog-api.v2" - "github.com/DataDog/datadog-agent/pkg/quantile" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/sketches" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/translator" ) diff --git a/exporter/datadogexporter/internal/translator/metrics_translator.go b/exporter/datadogexporter/internal/translator/metrics_translator.go index 50aae841d775..ab354c132479 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator.go @@ -20,10 +20,10 @@ import ( "math" "strconv" + "github.com/DataDog/datadog-agent/pkg/quantile" "go.opentelemetry.io/collector/model/pdata" "go.uber.org/zap" - "github.com/DataDog/datadog-agent/pkg/quantile" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/config" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/attributes" ) @@ -359,7 +359,7 @@ func (t *Translator) mapSummaryMetrics( quantileTags := []string{getQuantileTag(q.Quantile())} quantileTags = append(quantileTags, tags...) - consumer.ConsumeTimeSeries(ctx, fullName, Gauge, ts, q.Value(), quantileTags, host) + consumer.ConsumeTimeSeries(ctx, fullName, Gauge, ts, q.Value(), quantileTags, host) } } } diff --git a/exporter/datadogexporter/internal/translator/metrics_translator_test.go b/exporter/datadogexporter/internal/translator/metrics_translator_test.go index 40d55815840a..dff6784dc943 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator_test.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator_test.go @@ -601,6 +601,7 @@ func TestMapCumulativeHistogramMetrics(t *testing.T) { func TestLegacyBucketsTags(t *testing.T) { // Test that passing the same tags slice doesn't reuse the slice. cfg := config.MetricsConfig{} + ctx := context.Background() tr := newTranslator(zap.NewNop(), cfg) tags := make([]string, 0, 10) @@ -609,16 +610,20 @@ func TestLegacyBucketsTags(t *testing.T) { pointOne.SetBucketCounts([]uint64{2, 18}) pointOne.SetExplicitBounds([]float64{0}) pointOne.SetTimestamp(seconds(0)) - seriesOne := tr.getLegacyBuckets("test.histogram.one", pointOne, true, tags) + consumer := &mockTimeSeriesConsumer{} + tr.getLegacyBuckets(ctx, consumer, "test.histogram.one", pointOne, true, tags, "") + seriesOne := consumer.metrics pointTwo := pdata.NewHistogramDataPoint() pointTwo.SetBucketCounts([]uint64{2, 18}) pointTwo.SetExplicitBounds([]float64{1}) pointTwo.SetTimestamp(seconds(0)) - seriesTwo := tr.getLegacyBuckets("test.histogram.two", pointTwo, true, tags) + consumer = &mockTimeSeriesConsumer{} + tr.getLegacyBuckets(ctx, consumer, "test.histogram.two", pointTwo, true, tags, "") + seriesTwo := consumer.metrics - assert.ElementsMatch(t, seriesOne[0].Tags, []string{"lower_bound:-inf", "upper_bound:0"}) - assert.ElementsMatch(t, seriesTwo[0].Tags, []string{"lower_bound:-inf", "upper_bound:1.0"}) + assert.ElementsMatch(t, seriesOne[0].tags, []string{"lower_bound:-inf", "upper_bound:0"}) + assert.ElementsMatch(t, seriesTwo[0].tags, []string{"lower_bound:-inf", "upper_bound:1.0"}) } func TestFormatFloat(t *testing.T) { From b9b21d4192eb99268bc9aea3c43eaeb4b9dde06f Mon Sep 17 00:00:00 2001 From: Pablo Baeyens <pablo.baeyens@datadoghq.com> Date: Thu, 30 Sep 2021 15:53:10 +0200 Subject: [PATCH 4/5] Backport datadog-agent changes This adds a comment added originally on: - DataDog/datadog-agent@a258ba7, --- exporter/datadogexporter/internal/translator/consumer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/datadogexporter/internal/translator/consumer.go b/exporter/datadogexporter/internal/translator/consumer.go index 5cc6fb544a94..87653209eb28 100644 --- a/exporter/datadogexporter/internal/translator/consumer.go +++ b/exporter/datadogexporter/internal/translator/consumer.go @@ -20,6 +20,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/quantile" ) +// MetricDataType is a timeseries-style metric type. type MetricDataType int const ( From dd93635463aa2624fea21374410877a437a51b6d Mon Sep 17 00:00:00 2001 From: Pablo Baeyens <pablo.baeyens@datadoghq.com> Date: Tue, 5 Oct 2021 17:08:09 +0200 Subject: [PATCH 5/5] Fix differences with datadog-agent code --- .../internal/translator/metrics_translator.go | 2 ++ .../internal/translator/metrics_translator_test.go | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/exporter/datadogexporter/internal/translator/metrics_translator.go b/exporter/datadogexporter/internal/translator/metrics_translator.go index 5a291d16998b..1ccd11499ea5 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator.go @@ -29,12 +29,14 @@ import ( const metricName string = "metric name" +// Translator is a metrics translator. type Translator struct { prevPts *ttlCache logger *zap.Logger cfg translatorConfig } +// New creates a new translator with given options. func New(logger *zap.Logger, options ...Option) (*Translator, error) { cfg := translatorConfig{ HistMode: HistogramModeDistributions, diff --git a/exporter/datadogexporter/internal/translator/metrics_translator_test.go b/exporter/datadogexporter/internal/translator/metrics_translator_test.go index 41a1c4460844..d2c687b2a049 100644 --- a/exporter/datadogexporter/internal/translator/metrics_translator_test.go +++ b/exporter/datadogexporter/internal/translator/metrics_translator_test.go @@ -933,7 +933,8 @@ func TestMapMetrics(t *testing.T) { ctx := context.Background() consumer := &mockFullConsumer{} tr := newTranslator(t, testLogger) - tr.MapMetrics(ctx, md, consumer) + err := tr.MapMetrics(ctx, md, consumer) + require.NoError(t, err) assert.False(t, consumer.anySketch) assert.ElementsMatch(t, consumer.metrics, []metric{ @@ -1057,8 +1058,9 @@ func TestNaNMetrics(t *testing.T) { ctx := context.Background() tr := newTranslator(t, testLogger) consumer := &mockFullConsumer{} - tr.MapMetrics(ctx, md, consumer) + err := tr.MapMetrics(ctx, md, consumer) assert.False(t, consumer.anySketch) + require.NoError(t, err) assert.ElementsMatch(t, consumer.metrics, []metric{ testCount("nan.histogram.count", 20, 0),