From 2023abb1dc5fd437454b84847207bfa8e7539190 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Mon, 17 Jul 2023 13:22:49 +1000
Subject: [PATCH 1/2] Improve performance of conversion to Prometheus remote write format

---
 .../prometheusremotewrite/helper.go           | 122 ++++++++++-------
 .../metrics_to_prw_test.go                    | 123 ++++++++++++++++++
 2 files changed, 196 insertions(+), 49 deletions(-)
 create mode 100644 pkg/translator/prometheusremotewrite/metrics_to_prw_test.go

diff --git a/pkg/translator/prometheusremotewrite/helper.go b/pkg/translator/prometheusremotewrite/helper.go
index b1ad78fc0c7e..68ad2a66500d 100644
--- a/pkg/translator/prometheusremotewrite/helper.go
+++ b/pkg/translator/prometheusremotewrite/helper.go
@@ -130,7 +130,14 @@ func addExemplar(tsMap map[string]*prompb.TimeSeries, bucketBounds []bucketBound
 // the label slice should not contain duplicate label names; this method sorts the slice by label name before creating
 // the signature.
 func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
+	length := len(datatype)
+
+	for _, lb := range *labels {
+		length += 2 + len(lb.GetName()) + len(lb.GetValue())
+	}
+
 	b := strings.Builder{}
+	b.Grow(length)
 	b.WriteString(datatype)
 
 	sort.Sort(ByLabelName(*labels))
@@ -149,8 +156,22 @@ func timeSeriesSignature(datatype string, labels *[]prompb.Label) string {
 // Unpaired string value is ignored. String pairs overwrites OTLP labels if collision happens, and the overwrite is
 // logged. Resultant label names are sanitized.
 func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, extras ...string) []prompb.Label {
+	serviceName, haveServiceName := resource.Attributes().Get(conventions.AttributeServiceName)
+	instance, haveInstanceID := resource.Attributes().Get(conventions.AttributeServiceInstanceID)
+
+	// Calculate the maximum possible number of labels we could return so we can preallocate l
+	maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2
+
+	if haveServiceName {
+		maxLabelCount++
+	}
+
+	if haveInstanceID {
+		maxLabelCount++
+	}
+
 	// map ensures no duplicate label name
-	l := map[string]prompb.Label{}
+	l := make(map[string]string, maxLabelCount)
 
 	// Ensure attributes are sorted by key for consistent merging of keys which
 	// collide when sanitized.
@@ -164,33 +185,23 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
 	for _, label := range labels {
 		var finalKey = prometheustranslator.NormalizeLabel(label.Name)
 		if existingLabel, alreadyExists := l[finalKey]; alreadyExists {
-			existingLabel.Value = existingLabel.Value + ";" + label.Value
-			l[finalKey] = existingLabel
+			l[finalKey] = existingLabel + ";" + label.Value
 		} else {
-			l[finalKey] = prompb.Label{
-				Name:  finalKey,
-				Value: label.Value,
-			}
+			l[finalKey] = label.Value
 		}
 	}
 
 	// Map service.name + service.namespace to job
-	if serviceName, ok := resource.Attributes().Get(conventions.AttributeServiceName); ok {
+	if haveServiceName {
 		val := serviceName.AsString()
 		if serviceNamespace, ok := resource.Attributes().Get(conventions.AttributeServiceNamespace); ok {
 			val = fmt.Sprintf("%s/%s", serviceNamespace.AsString(), val)
 		}
-		l[model.JobLabel] = prompb.Label{
-			Name:  model.JobLabel,
-			Value: val,
-		}
+		l[model.JobLabel] = val
 	}
 	// Map service.instance.id to instance
-	if instance, ok := resource.Attributes().Get(conventions.AttributeServiceInstanceID); ok {
-		l[model.InstanceLabel] = prompb.Label{
-			Name:  model.InstanceLabel,
-			Value: instance.AsString(),
-		}
+	if haveInstanceID {
+		l[model.InstanceLabel] = instance.AsString()
 	}
 	for key, value := range externalLabels {
 		// External labels have already been sanitized
@@ -198,10 +209,7 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
 			// Skip external labels if they are overridden by metric attributes
 			continue
 		}
-		l[key] = prompb.Label{
-			Name:  key,
-			Value: value,
-		}
+		l[key] = value
 	}
 
 	for i := 0; i < len(extras); i += 2 {
@@ -217,15 +225,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
 		if !(len(name) > 4 && name[:2] == "__" && name[len(name)-2:] == "__") {
 			name = prometheustranslator.NormalizeLabel(name)
 		}
-		l[name] = prompb.Label{
-			Name:  name,
-			Value: extras[i+1],
-		}
+		l[name] = extras[i+1]
 	}
 
 	s := make([]prompb.Label, 0, len(l))
-	for _, lb := range l {
-		s = append(s, lb)
+	for k, v := range l {
+		s = append(s, prompb.Label{Name: k, Value: v})
 	}
 
 	return s
@@ -253,6 +258,21 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
 	timestamp := convertTimeStamp(pt.Timestamp())
 	// sum, count, and buckets of the histogram should append suffix to baseName
 	baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
+	baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels)
+
+	createLabels := func(nameSuffix string, extras ...string) []prompb.Label {
+		extraLabelCount := len(extras) / 2
+		labels := make([]prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
+		copy(labels, baseLabels)
+
+		for extrasIdx := 0; extrasIdx < extraLabelCount; extrasIdx++ {
+			labels = append(labels, prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
+		}
+
+		labels = append(labels, prompb.Label{Name: nameStr, Value: baseName + nameSuffix})
+
+		return labels
+	}
 
 	// If the sum is unset, it indicates the _sum metric point should be
 	// omitted
@@ -266,7 +286,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
 			sum.Value = math.Float64frombits(value.StaleNaN)
 		}
 
-		sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr)
+		sumlabels := createLabels(sumStr)
 		addSample(tsMap, sum, sumlabels, metric.Type().String())
 
 	}
@@ -280,7 +300,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
 		count.Value = math.Float64frombits(value.StaleNaN)
 	}
 
-	countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr)
+	countlabels := createLabels(countStr)
 	addSample(tsMap, count, countlabels, metric.Type().String())
 
 	// cumulative count for conversion to cumulative histogram
@@ -302,7 +322,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
 				bucket.Value = math.Float64frombits(value.StaleNaN)
 			}
 			boundStr := strconv.FormatFloat(bound, 'f', -1, 64)
-			labels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, boundStr)
+			labels := createLabels(bucketStr, leStr, boundStr)
 			sig := addSample(tsMap, bucket, labels, metric.Type().String())
 
 			bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: bound})
@@ -316,7 +336,7 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
 	} else {
 		infBucket.Value = float64(pt.Count())
 	}
-	infLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+bucketStr, leStr, pInfStr)
+	infLabels := createLabels(bucketStr, leStr, pInfStr)
 	sig := addSample(tsMap, infBucket, infLabels, metric.Type().String())
 
 	bucketBounds = append(bucketBounds, bucketBoundsData{sig: sig, bound: math.Inf(1)})
@@ -325,14 +345,8 @@ func addSingleHistogramDataPoint(pt pmetric.HistogramDataPoint, resource pcommon
 
 	// add _created time series if needed
 	startTimestamp := pt.StartTimestamp()
 	if settings.ExportCreatedMetric && startTimestamp != 0 {
-		createdLabels := createAttributes(
-			resource,
-			pt.Attributes(),
-			settings.ExternalLabels,
-			nameStr,
-			baseName+createdSuffix,
-		)
-		addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
+		labels := createLabels(createdSuffix)
+		addCreatedTimeSeriesIfNeeded(tsMap, labels, startTimestamp, metric.Type().String())
 	}
 }
@@ -443,6 +457,22 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
 	timestamp := convertTimeStamp(pt.Timestamp())
 	// sum and count of the summary should append suffix to baseName
 	baseName := prometheustranslator.BuildPromCompliantName(metric, settings.Namespace)
+	baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels)
+
+	createLabels := func(name string, extras ...string) []prompb.Label {
+		extraLabelCount := len(extras) / 2
+		labels := make([]prompb.Label, len(baseLabels), len(baseLabels)+extraLabelCount+1) // +1 for name
+		copy(labels, baseLabels)
+
+		for extrasIdx := 0; extrasIdx < extraLabelCount; extrasIdx++ {
+			labels = append(labels, prompb.Label{Name: extras[extrasIdx], Value: extras[extrasIdx+1]})
+		}
+
+		labels = append(labels, prompb.Label{Name: nameStr, Value: name})
+
+		return labels
+	}
+
 	// treat sum as a sample in an individual TimeSeries
 	sum := &prompb.Sample{
 		Value:     pt.Sum(),
@@ -451,7 +481,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
 	if pt.Flags().NoRecordedValue() {
 		sum.Value = math.Float64frombits(value.StaleNaN)
 	}
-	sumlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+sumStr)
+	sumlabels := createLabels(baseName + sumStr)
 	addSample(tsMap, sum, sumlabels, metric.Type().String())
 
 	// treat count as a sample in an individual TimeSeries
@@ -462,7 +492,7 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
 	if pt.Flags().NoRecordedValue() {
 		count.Value = math.Float64frombits(value.StaleNaN)
 	}
-	countlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName+countStr)
+	countlabels := createLabels(baseName + countStr)
 	addSample(tsMap, count, countlabels, metric.Type().String())
 
 	// process each percentile/quantile
@@ -476,20 +506,14 @@ func addSingleSummaryDataPoint(pt pmetric.SummaryDataPoint, resource pcommon.Res
 			quantile.Value = math.Float64frombits(value.StaleNaN)
 		}
 		percentileStr := strconv.FormatFloat(qt.Quantile(), 'f', -1, 64)
-		qtlabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nameStr, baseName, quantileStr, percentileStr)
+		qtlabels := createLabels(baseName, quantileStr, percentileStr)
 		addSample(tsMap, quantile, qtlabels, metric.Type().String())
 	}
 
 	// add _created time series if needed
 	startTimestamp := pt.StartTimestamp()
 	if settings.ExportCreatedMetric && startTimestamp != 0 {
-		createdLabels := createAttributes(
-			resource,
-			pt.Attributes(),
-			settings.ExternalLabels,
-			nameStr,
-			baseName+createdSuffix,
-		)
+		createdLabels := createLabels(baseName + createdSuffix)
 		addCreatedTimeSeriesIfNeeded(tsMap, createdLabels, startTimestamp, metric.Type().String())
 	}
 }
diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go
new file mode 100644
index 000000000000..8be6bdcd60a9
--- /dev/null
+++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go
@@ -0,0 +1,123 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package prometheusremotewrite
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+	"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
+)
+
+func BenchmarkFromMetrics(b *testing.B) {
+	for _, resourceAttributeCount := range []int{0, 5, 50} {
+		b.Run(fmt.Sprintf("resource attribute count: %v", resourceAttributeCount), func(b *testing.B) {
+			for _, histogramCount := range []int{0, 1000} {
+				b.Run(fmt.Sprintf("histogram count: %v", histogramCount), func(b *testing.B) {
+					nonHistogramCounts := []int{0, 1000}
+
+					if resourceAttributeCount == 0 && histogramCount == 0 {
+						// Don't bother running a scenario where we'll generate no series.
+						nonHistogramCounts = []int{1000}
+					}
+
+					for _, nonHistogramCount := range nonHistogramCounts {
+						b.Run(fmt.Sprintf("non-histogram count: %v", nonHistogramCount), func(b *testing.B) {
+							for _, labelsPerMetric := range []int{2, 20} {
+								b.Run(fmt.Sprintf("labels per metric: %v", labelsPerMetric), func(b *testing.B) {
+									for _, exemplarsPerSeries := range []int{0, 5, 10} {
+										b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) {
+											payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries)
+
+											for i := 0; i < b.N; i++ {
+												_, err := FromMetrics(payload.Metrics(), Settings{})
+
+												if err != nil {
+													require.NoError(b, err)
+												}
+											}
+										})
+									}
+								})
+							}
+						})
+					}
+				})
+			}
+		})
+	}
+}
+
+func createExportRequest(resourceAttributeCount int, histogramCount int, nonHistogramCount int, labelsPerMetric int, exemplarsPerSeries int) pmetricotlp.ExportRequest {
+	request := pmetricotlp.NewExportRequest()
+
+	rm := request.Metrics().ResourceMetrics().AppendEmpty()
+	generateAttributes(rm.Resource().Attributes(), "resource", resourceAttributeCount)
+
+	metrics := rm.ScopeMetrics().AppendEmpty().Metrics()
+	ts := pcommon.NewTimestampFromTime(time.Now())
+
+	for i := 1; i <= histogramCount; i++ {
+		m := metrics.AppendEmpty()
+		m.SetEmptyHistogram()
+		m.SetName(fmt.Sprintf("histogram-%v", i))
+		m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		h := m.Histogram().DataPoints().AppendEmpty()
+		h.SetTimestamp(ts)
+
+		// Set 50 samples, 10 each with values 0.5, 1, 2, 4, and 8
+		h.SetCount(50)
+		h.SetSum(155)
+		h.BucketCounts().FromRaw([]uint64{10, 10, 10, 10, 10, 0})
+		h.ExplicitBounds().FromRaw([]float64{.5, 1, 2, 4, 8, 16}) // Bucket boundaries include the upper limit (ie. each sample is on the upper limit of its bucket)
+
+		generateAttributes(h.Attributes(), "series", labelsPerMetric)
+		generateExemplars(h.Exemplars(), exemplarsPerSeries, ts)
+	}
+
+	for i := 1; i <= nonHistogramCount; i++ {
+		m := metrics.AppendEmpty()
+		m.SetEmptySum()
+		m.SetName(fmt.Sprintf("sum-%v", i))
+		m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		point := m.Sum().DataPoints().AppendEmpty()
+		point.SetTimestamp(ts)
+		point.SetDoubleValue(1.23)
+		generateAttributes(point.Attributes(), "series", labelsPerMetric)
+		generateExemplars(point.Exemplars(), exemplarsPerSeries, ts)
+	}
+
+	for i := 1; i <= nonHistogramCount; i++ {
+		m := metrics.AppendEmpty()
+		m.SetEmptyGauge()
+		m.SetName(fmt.Sprintf("gauge-%v", i))
+		point := m.Gauge().DataPoints().AppendEmpty()
+		point.SetTimestamp(ts)
+		point.SetDoubleValue(1.23)
+		generateAttributes(point.Attributes(), "series", labelsPerMetric)
+		generateExemplars(point.Exemplars(), exemplarsPerSeries, ts)
+	}
+
+	return request
+}
+
+func generateAttributes(m pcommon.Map, prefix string, count int) {
+	for i := 1; i <= count; i++ {
+		m.PutStr(fmt.Sprintf("%v-name-%v", prefix, i), fmt.Sprintf("value-%v", i))
+	}
+}
+
+func generateExemplars(exemplars pmetric.ExemplarSlice, count int, ts pcommon.Timestamp) {
+	for i := 1; i <= count; i++ {
+		e := exemplars.AppendEmpty()
+		e.SetTimestamp(ts)
+		e.SetDoubleValue(2.22)
+		e.SetSpanID(pcommon.SpanID{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08})
+		e.SetTraceID(pcommon.TraceID{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f})
+	}
+}

From beffb82bb827946a8b67579cbb6c4d1ed49a722b Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Mon, 17 Jul 2023 13:35:59 +1000
Subject: [PATCH 2/2] Add changelog entry.

---
 .../prometheus-remote-write-performance.yaml | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)
 create mode 100755 .chloggen/prometheus-remote-write-performance.yaml

diff --git a/.chloggen/prometheus-remote-write-performance.yaml b/.chloggen/prometheus-remote-write-performance.yaml
new file mode 100755
index 000000000000..d96c994cbeb5
--- /dev/null
+++ b/.chloggen/prometheus-remote-write-performance.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusremotewrite
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: improve the latency and memory utilisation of the conversion from OpenTelemetry to Prometheus remote write
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [24288]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: