From 24c666da4a5a3c7cb9aa2704fbb5e8a6e76500e9 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Mon, 18 Jul 2022 13:34:08 +0100 Subject: [PATCH 01/14] Implement cumulative-to-delta over histograms --- .../internal/tracking/identity.go | 8 +- .../internal/tracking/identity_test.go | 21 ++++ .../cumulativetodeltaprocessor/processor.go | 116 ++++++++++++++++++ 3 files changed, 144 insertions(+), 1 deletion(-) diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go index fa590da7b4b1..784622291063 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go @@ -32,6 +32,7 @@ type MetricIdentity struct { StartTimestamp pcommon.Timestamp Attributes pcommon.Map MetricValueType pmetric.NumberDataPointValueType + MetricField string } const A = int32('A') @@ -75,6 +76,11 @@ func (mi *MetricIdentity) Write(b *bytes.Buffer) { }) b.WriteByte(SEP) b.WriteString(strconv.FormatInt(int64(mi.StartTimestamp), 36)) + + if mi.MetricField != "" { + b.WriteByte(SEP) + b.WriteString(mi.MetricField) + } } func (mi *MetricIdentity) IsFloatVal() bool { @@ -82,5 +88,5 @@ func (mi *MetricIdentity) IsFloatVal() bool { } func (mi *MetricIdentity) IsSupportedMetricType() bool { - return mi.MetricDataType == pmetric.MetricDataTypeSum + return mi.MetricDataType == pmetric.MetricDataTypeSum || mi.MetricDataType == pmetric.MetricDataTypeHistogram } diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go index cd40b13b23af..26b2223ea834 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go @@ -43,6 +43,7 @@ func TestMetricIdentity_Write(t *testing.T) { StartTimestamp pcommon.Timestamp Attributes pcommon.Map MetricValueType pmetric.NumberDataPointValueType + MetricField string } tests := []struct { name string @@ -72,6 +73,18 @@ func TestMetricIdentity_Write(t *testing.T) { }, want: []string{"C" + SEPSTR + "B", "Y"}, }, + { + name: "histogram sum", + fields: fields{ + Resource: resource, + InstrumentationLibrary: il, + Attributes: attributes, + MetricDataType: pmetric.MetricDataTypeHistogram, + MetricValueType: pmetric.NumberDataPointValueTypeInt, + MetricField: "bound_100", + }, + want: []string{"D" + SEPSTR + "B", "bound_100"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -85,6 +98,7 @@ func TestMetricIdentity_Write(t *testing.T) { StartTimestamp: tt.fields.StartTimestamp, Attributes: tt.fields.Attributes, MetricValueType: tt.fields.MetricValueType, + MetricField: tt.fields.MetricField, } b := &bytes.Buffer{} mi.Write(b) @@ -159,6 +173,13 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) { fields: fields{ MetricDataType: pmetric.MetricDataTypeHistogram, }, + want: true, + }, + { + name: "gauge", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeGauge, + }, want: false, }, } diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index c42f70ef26c2..4c755cc688f0 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -15,9 +15,12 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor" import ( + "bytes" "context" + "fmt" "math" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -89,6 +92,47 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme MetricIsMonotonic: ms.IsMonotonic(), } ctdp.convertDataPoints(ms.DataPoints(), baseIdentity) + ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + return ms.DataPoints().Len() == 0 + case pmetric.MetricDataTypeHistogram: + ms := m.Histogram() + if ms.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative { + return false + } + + histogramIdentities := make([]tracking.MetricIdentity, 0, 16) + + countIdentity := tracking.MetricIdentity{ + Resource: rm.Resource(), + InstrumentationLibrary: ilm.Scope(), + MetricDataType: m.DataType(), + MetricName: m.Name(), + MetricUnit: m.Unit(), + MetricIsMonotonic: true, + MetricValueType: pmetric.NumberDataPointValueTypeInt, + MetricField: "count", + } + + sumIdentity := countIdentity + sumIdentity.MetricField = "sum" + sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble + + histogramIdentities = append(histogramIdentities, countIdentity, sumIdentity) + + if ms.DataPoints().Len() == 0 { + return false + } + + firstDataPoint := ms.DataPoints().At(0) + for index := 0; index < firstDataPoint.BucketCounts().Len(); index++ { + metricField := fmt.Sprintf("bucket_%d", index) + bucketIdentity := countIdentity + bucketIdentity.MetricField = metricField + histogramIdentities = append(histogramIdentities, bucketIdentity) + } + + ctdp.convertHistogramDataPoints(ms.DataPoints(), &histogramIdentities) + ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) return ms.DataPoints().Len() == 0 default: @@ -159,3 +203,75 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId }) } } + +func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *[]tracking.MetricIdentity) { + + if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { + dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { + countId := (*baseIdentities)[0] + countId.StartTimestamp = dp.StartTimestamp() + countId.Attributes = dp.Attributes() + countPoint := tracking.MetricPoint{ + Identity: countId, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + IntValue: int64(dp.Count()), + }, + } + countDelta, countValid := ctdp.deltaCalculator.Convert(countPoint) + if !countValid { + return true + } + + dp.SetCount(uint64(countDelta.IntValue)) + + if dp.HasSum() { + sumId := (*baseIdentities)[1] + sumId.StartTimestamp = dp.StartTimestamp() + sumId.Attributes = dp.Attributes() + sumPoint := tracking.MetricPoint{ + Identity: sumId, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + FloatValue: dp.Sum(), + }, + } + sumDelta, sumValid := ctdp.deltaCalculator.Convert(sumPoint) + if !sumValid { + return true + } + + dp.SetSum(sumDelta.FloatValue) + } + + firstBucketIndex := 2 + rawCounts := dp.BucketCounts().AsRaw() + for index := 0; index < len(rawCounts); index++ { + bucketId := (*baseIdentities)[firstBucketIndex+index] + bucketId.StartTimestamp = dp.StartTimestamp() + bucketId.Attributes = dp.Attributes() + testBytes := &bytes.Buffer{} + bucketId.Write(testBytes) + fmt.Println(testBytes.String()) + bucketPoint := tracking.MetricPoint{ + Identity: bucketId, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + IntValue: int64(rawCounts[index]), + }, + } + bucketDelta, bucketValid := ctdp.deltaCalculator.Convert(bucketPoint) + if !bucketValid { + return true + } + + rawCounts[index] = uint64(bucketDelta.IntValue) + } + dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(rawCounts)) + + dp.SetStartTimestamp(countDelta.StartTimestamp) + + return false + }) + } +} From e73cd5d07e8a8da7dd7e14a79c3755578bc2e0ae Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Mon, 18 Jul 2022 14:18:18 +0100 Subject: [PATCH 02/14] Add unit tests for use with histograms --- .../cumulativetodeltaprocessor/processor.go | 4 - .../processor_test.go | 145 ++++++++++++++++-- 2 files changed, 131 insertions(+), 18 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 4c755cc688f0..055cf567d05b 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -15,7 +15,6 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor" import ( - "bytes" "context" "fmt" "math" @@ -250,9 +249,6 @@ func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{ bucketId := (*baseIdentities)[firstBucketIndex+index] bucketId.StartTimestamp = dp.StartTimestamp() bucketId.Attributes = dp.Attributes() - testBytes := &bytes.Buffer{} - bucketId.Write(testBytes) - fmt.Println(testBytes.String()) bucketPoint := tracking.MetricPoint{ Identity: bucketId, Value: tracking.ValuePoint{ diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index a7101595168d..74842499fe76 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -33,12 +33,20 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset" ) -type testMetric struct { +type testSumMetric struct { metricNames []string metricValues [][]float64 isCumulative []bool } +type testHistogramMetric struct { + metricNames []string + metricCounts [][]uint64 + metricSums [][]float64 + metricBuckets [][][]uint64 + isCumulative []bool +} + type cumulativeToDeltaTest struct { name string metrics []string @@ -53,12 +61,12 @@ var ( { name: "legacy_cumulative_to_delta_one_positive", metrics: []string{"metric_1"}, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 200, 500}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 100, 300}, {4}}, isCumulative: []bool{false, true}, @@ -67,12 +75,12 @@ var ( { name: "legacy_cumulative_to_delta_nan_value", metrics: []string{"metric_1"}, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 200, math.NaN()}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 100, math.NaN()}, {4}}, isCumulative: []bool{false, true}, @@ -88,12 +96,12 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, @@ -108,17 +116,77 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 200, 500}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 100, 300}, {4}}, isCumulative: []bool{false, true}, }), }, + { + name: "cumulative_to_delta_histogram_one_positive", + include: MatchMetrics{ + Metrics: []string{"metric_1"}, + Config: filterset.Config{ + MatchType: "strict", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{100, 200, 500}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}, {4}}, + metricSums: [][]float64{{100, 100, 300}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{false, true}, + }), + }, + { + name: "cumulative_to_delta_histogram_one_positive_without_sums", + include: MatchMetrics{ + Metrics: []string{"metric_1"}, + Config: filterset.Config{ + MatchType: "strict", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}, {4}}, + metricSums: [][]float64{{}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{false, true}, + }), + }, { name: "cumulative_to_delta_nan_value", include: MatchMetrics{ @@ -128,12 +196,12 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 200, math.NaN()}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100, 100, math.NaN()}, {4}}, isCumulative: []bool{false, true}, @@ -156,12 +224,12 @@ var ( RegexpConfig: nil, }, }, - inMetrics: generateTestMetrics(testMetric{ + inMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestMetrics(testMetric{ + outMetrics: generateTestSumMetrics(testSumMetric{ metricNames: []string{"metric_1", "metric_2"}, metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, @@ -240,6 +308,20 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { } } + if eM.DataType() == pmetric.MetricDataTypeHistogram { + eDataPoints := eM.Histogram().DataPoints() + aDataPoints := aM.Histogram().DataPoints() + + require.Equal(t, eDataPoints.Len(), aDataPoints.Len()) + require.Equal(t, eM.Histogram().AggregationTemporality(), aM.Histogram().AggregationTemporality()) + + for j := 0; j < eDataPoints.Len(); j++ { + require.Equal(t, eDataPoints.At(j).Count(), aDataPoints.At(j).Count()) + require.Equal(t, eDataPoints.At(j).HasSum(), aDataPoints.At(j).HasSum()) + require.Equal(t, eDataPoints.At(j).Sum(), aDataPoints.At(j).Sum()) + require.Equal(t, eDataPoints.At(j).BucketCounts().AsRaw(), aDataPoints.At(j).BucketCounts().AsRaw()) + } + } } require.NoError(t, mgp.Shutdown(ctx)) @@ -247,7 +329,7 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { } } -func generateTestMetrics(tm testMetric) pmetric.Metrics { +func generateTestSumMetrics(tm testSumMetric) pmetric.Metrics { md := pmetric.NewMetrics() now := time.Now() @@ -277,6 +359,41 @@ func generateTestMetrics(tm testMetric) pmetric.Metrics { return md } +func generateTestHistogramMetrics(tm testHistogramMetric) pmetric.Metrics { + md := pmetric.NewMetrics() + now := time.Now() + + rm := md.ResourceMetrics().AppendEmpty() + ms := rm.ScopeMetrics().AppendEmpty().Metrics() + for i, name := range tm.metricNames { + m := ms.AppendEmpty() + m.SetName(name) + m.SetDataType(pmetric.MetricDataTypeHistogram) + + hist := m.Histogram() + + if tm.isCumulative[i] { + hist.SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + } else { + hist.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) + } + + for index, count := range tm.metricCounts[i] { + dp := m.Histogram().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) + dp.SetCount(count) + + sums := tm.metricSums[i] + if len(sums) > 0 { + dp.SetSum(sums[index]) + } + dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(tm.metricBuckets[i][index])) + } + } + + return md +} + func BenchmarkConsumeMetrics(b *testing.B) { c := consumertest.NewNop() params := component.ProcessorCreateSettings{ From 24acf48ba6b646df33935f3533c461b5dcb51d33 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Mon, 18 Jul 2022 16:46:59 +0100 Subject: [PATCH 03/14] Update changelog --- unreleased/issue-12423.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100755 unreleased/issue-12423.yaml diff --git a/unreleased/issue-12423.yaml b/unreleased/issue-12423.yaml new file mode 100755 index 000000000000..88318fa60605 --- /dev/null +++ b/unreleased/issue-12423.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cumulativetodeltaprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Cumulative Histogram metrics will now be converted to delta temporality and are no longer skipped. + +# One or more tracking issues related to the change +issues: [12423] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This means that consumers who currently rely on Histograms being skipped would need to ensure they are excluded via config. From 67977af9cea5a17aae018259422f63113e6044d2 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Mon, 18 Jul 2022 16:51:12 +0100 Subject: [PATCH 04/14] Update README --- processor/cumulativetodeltaprocessor/README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 6807646066f1..c7ebee1231b4 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -9,7 +9,7 @@ ## Description -The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum metrics to monotonic, delta sum metrics. Non-monotonic sums are excluded. +The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum and histogram metrics to monotonic, delta metrics. Non-monotonic sums are excluded. ## Configuration @@ -31,7 +31,7 @@ processors: # processor name: cumulativetodelta cumulativetodelta: - # list the exact cumulative sum metrics to convert to delta + # list the exact cumulative sum or histogram metrics to convert to delta include: metrics: - @@ -47,8 +47,8 @@ processors: # processor name: cumulativetodelta cumulativetodelta: - # Convert cumulative sum metrics to delta - # if and only if 'metric' is in the name + # Convert cumulative sum or histogram metrics to delta + # if and only if 'metric' is in the name include: metrics: - "*metric*" @@ -60,8 +60,8 @@ processors: # processor name: cumulativetodelta cumulativetodelta: - # Convert cumulative sum metrics to delta - # if and only if 'metric' is not in the name + # Convert cumulative sum or histogram metrics to delta + # if and only if 'metric' is not in the name exclude: metrics: - "*metric*" @@ -73,7 +73,7 @@ processors: # processor name: cumulativetodelta cumulativetodelta: # If include/exclude are not specified - # convert all cumulative sum metrics to delta + # convert all cumulative sum or histogram metrics to delta ``` ## Warnings From 529a3e9bc3a65d92ab247cfcae7f21b413f09180 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Tue, 19 Jul 2022 10:14:52 +0100 Subject: [PATCH 05/14] Collect histogram identities into a struct --- .../internal/tracking/identity.go | 6 ++++++ .../cumulativetodeltaprocessor/processor.go | 21 +++++++++++-------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go index 784622291063..ffd9bc2a367b 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go @@ -35,6 +35,12 @@ type MetricIdentity struct { MetricField string } +type HistogramIdentities struct { + CountIdentity MetricIdentity + SumIdentity MetricIdentity + BucketIdentities []MetricIdentity +} + const A = int32('A') const SEP = byte(0x1E) const SEPSTR = string(SEP) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 055cf567d05b..d8674f0f8c8d 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -99,8 +99,6 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme return false } - histogramIdentities := make([]tracking.MetricIdentity, 0, 16) - countIdentity := tracking.MetricIdentity{ Resource: rm.Resource(), InstrumentationLibrary: ilm.Scope(), @@ -116,7 +114,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme sumIdentity.MetricField = "sum" sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble - histogramIdentities = append(histogramIdentities, countIdentity, sumIdentity) + bucketIdentities := make([]tracking.MetricIdentity, 0, 16) if ms.DataPoints().Len() == 0 { return false @@ -127,7 +125,13 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme metricField := fmt.Sprintf("bucket_%d", index) bucketIdentity := countIdentity bucketIdentity.MetricField = metricField - histogramIdentities = append(histogramIdentities, bucketIdentity) + bucketIdentities = append(bucketIdentities, bucketIdentity) + } + + histogramIdentities := tracking.HistogramIdentities{ + CountIdentity: countIdentity, + SumIdentity: sumIdentity, + BucketIdentities: bucketIdentities, } ctdp.convertHistogramDataPoints(ms.DataPoints(), &histogramIdentities) @@ -203,11 +207,11 @@ func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseId } } -func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *[]tracking.MetricIdentity) { +func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{}, baseIdentities *tracking.HistogramIdentities) { if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { - countId := (*baseIdentities)[0] + countId := baseIdentities.CountIdentity countId.StartTimestamp = dp.StartTimestamp() countId.Attributes = dp.Attributes() countPoint := tracking.MetricPoint{ @@ -225,7 +229,7 @@ func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{ dp.SetCount(uint64(countDelta.IntValue)) if dp.HasSum() { - sumId := (*baseIdentities)[1] + sumId := baseIdentities.SumIdentity sumId.StartTimestamp = dp.StartTimestamp() sumId.Attributes = dp.Attributes() sumPoint := tracking.MetricPoint{ @@ -243,10 +247,9 @@ func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{ dp.SetSum(sumDelta.FloatValue) } - firstBucketIndex := 2 rawCounts := dp.BucketCounts().AsRaw() for index := 0; index < len(rawCounts); index++ { - bucketId := (*baseIdentities)[firstBucketIndex+index] + bucketId := baseIdentities.BucketIdentities[index] bucketId.StartTimestamp = dp.StartTimestamp() bucketId.Attributes = dp.Attributes() bucketPoint := tracking.MetricPoint{ From 2ef3fea1ce5a8484fd38c53aecd7a0b54be743b7 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Tue, 19 Jul 2022 13:16:15 +0100 Subject: [PATCH 06/14] Refactor convertHistogramDataPoints and don't convert unless all changes are valid --- .../cumulativetodeltaprocessor/processor.go | 97 +++++++++---------- 1 file changed, 47 insertions(+), 50 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index d8674f0f8c8d..3c2c92bac15a 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -164,6 +164,32 @@ func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) b (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName)) } +func (ctdp *cumulativeToDeltaProcessor) convertFloatValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value float64) (tracking.DeltaValue, bool) { + id.StartTimestamp = dp.StartTimestamp() + id.Attributes = dp.Attributes() + trackingPoint := tracking.MetricPoint{ + Identity: id, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + FloatValue: value, + }, + } + return ctdp.deltaCalculator.Convert(trackingPoint) +} + +func (ctdp *cumulativeToDeltaProcessor) convertIntValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value int64) (tracking.DeltaValue, bool) { + id.StartTimestamp = dp.StartTimestamp() + id.Attributes = dp.Attributes() + trackingPoint := tracking.MetricPoint{ + Identity: id, + Value: tracking.ValuePoint{ + ObservedTimestamp: dp.Timestamp(), + IntValue: value, + }, + } + return ctdp.deltaCalculator.Convert(trackingPoint) +} + func (ctdp *cumulativeToDeltaProcessor) convertDataPoints(in interface{}, baseIdentity tracking.MetricIdentity) { if dps, ok := in.(pmetric.NumberDataPointSlice); ok { @@ -212,65 +238,36 @@ func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{ if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { countId := baseIdentities.CountIdentity - countId.StartTimestamp = dp.StartTimestamp() - countId.Attributes = dp.Attributes() - countPoint := tracking.MetricPoint{ - Identity: countId, - Value: tracking.ValuePoint{ - ObservedTimestamp: dp.Timestamp(), - IntValue: int64(dp.Count()), - }, - } - countDelta, countValid := ctdp.deltaCalculator.Convert(countPoint) - if !countValid { - return true - } + countDelta, countValid := ctdp.convertIntValue(countId, dp, int64(dp.Count())) - dp.SetCount(uint64(countDelta.IntValue)) + hasSum := dp.HasSum() && !math.IsNaN(dp.Sum()) + sumDelta, sumValid := tracking.DeltaValue{}, true - if dp.HasSum() { + if hasSum { sumId := baseIdentities.SumIdentity - sumId.StartTimestamp = dp.StartTimestamp() - sumId.Attributes = dp.Attributes() - sumPoint := tracking.MetricPoint{ - Identity: sumId, - Value: tracking.ValuePoint{ - ObservedTimestamp: dp.Timestamp(), - FloatValue: dp.Sum(), - }, - } - sumDelta, sumValid := ctdp.deltaCalculator.Convert(sumPoint) - if !sumValid { - return true - } - - dp.SetSum(sumDelta.FloatValue) + sumDelta, sumValid = ctdp.convertFloatValue(sumId, dp, dp.Sum()) } - rawCounts := dp.BucketCounts().AsRaw() - for index := 0; index < len(rawCounts); index++ { + bucketsValid := true + rawBucketCounts := dp.BucketCounts().AsRaw() + for index := 0; index < len(rawBucketCounts); index++ { bucketId := baseIdentities.BucketIdentities[index] - bucketId.StartTimestamp = dp.StartTimestamp() - bucketId.Attributes = dp.Attributes() - bucketPoint := tracking.MetricPoint{ - Identity: bucketId, - Value: tracking.ValuePoint{ - ObservedTimestamp: dp.Timestamp(), - IntValue: int64(rawCounts[index]), - }, - } - bucketDelta, bucketValid := ctdp.deltaCalculator.Convert(bucketPoint) - if !bucketValid { - return true - } - - rawCounts[index] = uint64(bucketDelta.IntValue) + bucketDelta, bucketValid := ctdp.convertIntValue(bucketId, dp, int64(rawBucketCounts[index])) + rawBucketCounts[index] = uint64(bucketDelta.IntValue) + bucketsValid = bucketsValid && bucketValid } - dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(rawCounts)) - dp.SetStartTimestamp(countDelta.StartTimestamp) + if countValid && sumValid && bucketsValid { + dp.SetStartTimestamp(countDelta.StartTimestamp) + dp.SetCount(uint64(countDelta.IntValue)) + if hasSum { + dp.SetSum(sumDelta.FloatValue) + } + dp.SetBucketCounts(pcommon.NewImmutableUInt64Slice(rawBucketCounts)) + return false + } - return false + return true }) } } From 3d903bc9c1e71604daacd86a35ba3c0e1f630261 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Tue, 19 Jul 2022 13:21:48 +0100 Subject: [PATCH 07/14] Add test case for NaN in Histogram sums --- .../processor_test.go | 112 ++++++++++++------ 1 file changed, 73 insertions(+), 39 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index 74842499fe76..98f4aba8bbea 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -127,6 +127,54 @@ var ( isCumulative: []bool{false, true}, }), }, + { + name: "cumulative_to_delta_nan_value", + include: MatchMetrics{ + Metrics: []string{"_1"}, + Config: filterset.Config{ + MatchType: "regexp", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestSumMetrics(testSumMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricValues: [][]float64{{100, 200, math.NaN()}, {4}}, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestSumMetrics(testSumMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricValues: [][]float64{{100, 100, math.NaN()}, {4}}, + isCumulative: []bool{false, true}, + }), + }, + { + name: "cumulative_to_delta_exclude_precedence", + metrics: nil, + include: MatchMetrics{ + Metrics: []string{".*"}, + Config: filterset.Config{ + MatchType: "regexp", + RegexpConfig: nil, + }, + }, + exclude: MatchMetrics{ + Metrics: []string{".*"}, + Config: filterset.Config{ + MatchType: "regexp", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestSumMetrics(testSumMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricValues: [][]float64{{100}, {4}}, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestSumMetrics(testSumMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricValues: [][]float64{{100}, {4}}, + isCumulative: []bool{true, true}, + }), + }, { name: "cumulative_to_delta_histogram_one_positive", include: MatchMetrics{ @@ -158,7 +206,7 @@ var ( }), }, { - name: "cumulative_to_delta_histogram_one_positive_without_sums", + name: "cumulative_to_delta_histogram_nan_sum", include: MatchMetrics{ Metrics: []string{"metric_1"}, Config: filterset.Config{ @@ -169,7 +217,7 @@ var ( inMetrics: generateTestHistogramMetrics(testHistogramMetric{ metricNames: []string{"metric_1", "metric_2"}, metricCounts: [][]uint64{{100, 200, 500}, {4}}, - metricSums: [][]float64{{}, {4}}, + metricSums: [][]float64{{100, math.NaN(), 500}, {4}}, metricBuckets: [][][]uint64{ {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, {{4, 4, 4}}, @@ -179,7 +227,7 @@ var ( outMetrics: generateTestHistogramMetrics(testHistogramMetric{ metricNames: []string{"metric_1", "metric_2"}, metricCounts: [][]uint64{{100, 100, 300}, {4}}, - metricSums: [][]float64{{}, {4}}, + metricSums: [][]float64{{100, math.NaN(), 400}, {4}}, metricBuckets: [][][]uint64{ {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, {{4, 4, 4}}, @@ -188,51 +236,33 @@ var ( }), }, { - name: "cumulative_to_delta_nan_value", + name: "cumulative_to_delta_histogram_one_positive_without_sums", include: MatchMetrics{ - Metrics: []string{"_1"}, + Metrics: []string{"metric_1"}, Config: filterset.Config{ - MatchType: "regexp", + MatchType: "strict", RegexpConfig: nil, }, }, - inMetrics: generateTestSumMetrics(testSumMetric{ - metricNames: []string{"metric_1", "metric_2"}, - metricValues: [][]float64{{100, 200, math.NaN()}, {4}}, - isCumulative: []bool{true, true}, - }), - outMetrics: generateTestSumMetrics(testSumMetric{ + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ metricNames: []string{"metric_1", "metric_2"}, - metricValues: [][]float64{{100, 100, math.NaN()}, {4}}, - isCumulative: []bool{false, true}, - }), - }, - { - name: "cumulative_to_delta_exclude_precedence", - metrics: nil, - include: MatchMetrics{ - Metrics: []string{".*"}, - Config: filterset.Config{ - MatchType: "regexp", - RegexpConfig: nil, - }, - }, - exclude: MatchMetrics{ - Metrics: []string{".*"}, - Config: filterset.Config{ - MatchType: "regexp", - RegexpConfig: nil, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, }, - }, - inMetrics: generateTestSumMetrics(testSumMetric{ - metricNames: []string{"metric_1", "metric_2"}, - metricValues: [][]float64{{100}, {4}}, isCumulative: []bool{true, true}, }), - outMetrics: generateTestSumMetrics(testSumMetric{ + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ metricNames: []string{"metric_1", "metric_2"}, - metricValues: [][]float64{{100}, {4}}, - isCumulative: []bool{true, true}, + metricCounts: [][]uint64{{100, 100, 300}, {4}}, + metricSums: [][]float64{{}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{false, true}, }), }, } @@ -318,7 +348,11 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { for j := 0; j < eDataPoints.Len(); j++ { require.Equal(t, eDataPoints.At(j).Count(), aDataPoints.At(j).Count()) require.Equal(t, eDataPoints.At(j).HasSum(), aDataPoints.At(j).HasSum()) - require.Equal(t, eDataPoints.At(j).Sum(), aDataPoints.At(j).Sum()) + if math.IsNaN(eDataPoints.At(j).Sum()) { + require.True(t, math.IsNaN(aDataPoints.At(j).Sum())) + } else { + require.Equal(t, eDataPoints.At(j).Sum(), aDataPoints.At(j).Sum()) + } require.Equal(t, eDataPoints.At(j).BucketCounts().AsRaw(), aDataPoints.At(j).BucketCounts().AsRaw()) } } From de2c6928e399870c5d75cc2051e81956a30aa275 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Tue, 19 Jul 2022 19:23:33 +0100 Subject: [PATCH 08/14] Fix lint errors --- processor/cumulativetodeltaprocessor/processor.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 3c2c92bac15a..ceee1217a650 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -237,21 +237,21 @@ func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{ if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { - countId := baseIdentities.CountIdentity + countID := baseIdentities.CountIdentity countDelta, countValid := ctdp.convertIntValue(countId, dp, int64(dp.Count())) hasSum := dp.HasSum() && !math.IsNaN(dp.Sum()) sumDelta, sumValid := tracking.DeltaValue{}, true if hasSum { - sumId := baseIdentities.SumIdentity + sumID := baseIdentities.SumIdentity sumDelta, sumValid = ctdp.convertFloatValue(sumId, dp, dp.Sum()) } bucketsValid := true rawBucketCounts := dp.BucketCounts().AsRaw() for index := 0; index < len(rawBucketCounts); index++ { - bucketId := baseIdentities.BucketIdentities[index] + bucketID := baseIdentities.BucketIdentities[index] bucketDelta, bucketValid := ctdp.convertIntValue(bucketId, dp, int64(rawBucketCounts[index])) rawBucketCounts[index] = uint64(bucketDelta.IntValue) bucketsValid = bucketsValid && bucketValid From 3349bf74beb3b9ab31ca78e0e28f4c986a7d9805 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Wed, 20 Jul 2022 15:30:54 +0100 Subject: [PATCH 09/14] Address review comments --- .../internal/tracking/identity_test.go | 21 ++++++++++ .../cumulativetodeltaprocessor/processor.go | 41 +++++++++++-------- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go index 26b2223ea834..edb0017ef935 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go @@ -175,6 +175,13 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) { }, want: true, }, + { + name: "none", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeNone, + }, + want: false, + }, { name: "gauge", fields: fields{ @@ -182,6 +189,20 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) { }, want: false, }, + { + name: "exponential_histogram", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeExponentialHistogram, + }, + want: false, + }, + { + name: "summary", + fields: fields{ + MetricDataType: pmetric.MetricDataTypeSummary, + }, + want: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index ceee1217a650..71e574d01112 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -99,6 +99,10 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme return false } + if ms.DataPoints().Len() == 0 { + return false + } + countIdentity := tracking.MetricIdentity{ Resource: rm.Resource(), InstrumentationLibrary: ilm.Scope(), @@ -114,19 +118,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme sumIdentity.MetricField = "sum" sumIdentity.MetricValueType = pmetric.NumberDataPointValueTypeDouble - bucketIdentities := make([]tracking.MetricIdentity, 0, 16) - - if ms.DataPoints().Len() == 0 { - return false - } - - firstDataPoint := ms.DataPoints().At(0) - for index := 0; index < firstDataPoint.BucketCounts().Len(); index++ { - metricField := fmt.Sprintf("bucket_%d", index) - bucketIdentity := countIdentity - bucketIdentity.MetricField = metricField - bucketIdentities = append(bucketIdentities, bucketIdentity) - } + bucketIdentities := makeBucketIdentities(countIdentity, ms.DataPoints().At(0)) histogramIdentities := tracking.HistogramIdentities{ CountIdentity: countIdentity, @@ -149,6 +141,19 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme return md, nil } +func makeBucketIdentities(baseIdentity tracking.MetricIdentity, dp pmetric.HistogramDataPoint) []tracking.MetricIdentity { + numBuckets := dp.BucketCounts().Len() + bucketIdentities := make([]tracking.MetricIdentity, numBuckets) + + for index := 0; index < numBuckets; index++ { + bucketIdentity := baseIdentity + bucketIdentity.MetricField = fmt.Sprintf("bucket_%d", index) + bucketIdentities[index] = bucketIdentity + } + + return bucketIdentities +} + func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error { ctdp.cancelFunc() return nil @@ -164,7 +169,7 @@ func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) b (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName)) } -func (ctdp *cumulativeToDeltaProcessor) convertFloatValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value float64) (tracking.DeltaValue, bool) { +func (ctdp *cumulativeToDeltaProcessor) convertHistogramFloatValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value float64) (tracking.DeltaValue, bool) { id.StartTimestamp = dp.StartTimestamp() id.Attributes = dp.Attributes() trackingPoint := tracking.MetricPoint{ @@ -177,7 +182,7 @@ func (ctdp *cumulativeToDeltaProcessor) convertFloatValue(id tracking.MetricIden return ctdp.deltaCalculator.Convert(trackingPoint) } -func (ctdp *cumulativeToDeltaProcessor) convertIntValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value int64) (tracking.DeltaValue, bool) { +func (ctdp *cumulativeToDeltaProcessor) convertHistogramIntValue(id tracking.MetricIdentity, dp pmetric.HistogramDataPoint, value int64) (tracking.DeltaValue, bool) { id.StartTimestamp = dp.StartTimestamp() id.Attributes = dp.Attributes() trackingPoint := tracking.MetricPoint{ @@ -238,21 +243,21 @@ func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in interface{ if dps, ok := in.(pmetric.HistogramDataPointSlice); ok { dps.RemoveIf(func(dp pmetric.HistogramDataPoint) bool { countID := baseIdentities.CountIdentity - countDelta, countValid := ctdp.convertIntValue(countId, dp, int64(dp.Count())) + countDelta, countValid := ctdp.convertHistogramIntValue(countID, dp, int64(dp.Count())) hasSum := dp.HasSum() && !math.IsNaN(dp.Sum()) sumDelta, sumValid := tracking.DeltaValue{}, true if hasSum { sumID := baseIdentities.SumIdentity - sumDelta, sumValid = ctdp.convertFloatValue(sumId, dp, dp.Sum()) + sumDelta, sumValid = ctdp.convertHistogramFloatValue(sumID, dp, dp.Sum()) } bucketsValid := true rawBucketCounts := dp.BucketCounts().AsRaw() for index := 0; index < len(rawBucketCounts); index++ { bucketID := baseIdentities.BucketIdentities[index] - bucketDelta, bucketValid := ctdp.convertIntValue(bucketId, dp, int64(rawBucketCounts[index])) + bucketDelta, bucketValid := ctdp.convertHistogramIntValue(bucketID, dp, int64(rawBucketCounts[index])) rawBucketCounts[index] = uint64(bucketDelta.IntValue) bucketsValid = bucketsValid && bucketValid } From 0a9432c205f829ec3c7109984f3d0d740b975245 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Wed, 20 Jul 2022 16:00:46 +0100 Subject: [PATCH 10/14] Add feature gate for histogram support --- .../cumulativetodeltaprocessor/processor.go | 37 +++++++++---- .../processor_test.go | 52 ++++++++++++++++--- 2 files changed, 74 insertions(+), 15 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 71e574d01112..fc35ecfb36e9 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -21,27 +21,42 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/service/featuregate" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" ) +const enableHistogramSupportGateID = "processor.cumulativetodeltaprocessor.EnableHistogramSupport" + +var enableHistogramSupportGate = featuregate.Gate{ + ID: enableHistogramSupportGateID, + Enabled: false, + Description: "wip", +} + +func init() { + featuregate.GetRegistry().MustRegister(enableHistogramSupportGate) +} + type cumulativeToDeltaProcessor struct { - metrics map[string]struct{} - includeFS filterset.FilterSet - excludeFS filterset.FilterSet - logger *zap.Logger - deltaCalculator *tracking.MetricTracker - cancelFunc context.CancelFunc + metrics map[string]struct{} + includeFS filterset.FilterSet + excludeFS filterset.FilterSet + logger *zap.Logger + deltaCalculator *tracking.MetricTracker + cancelFunc context.CancelFunc + histogramSupportEnabled bool } func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { ctx, cancel := context.WithCancel(context.Background()) p := &cumulativeToDeltaProcessor{ - logger: logger, - deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness), - cancelFunc: cancel, + logger: logger, + deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness), + cancelFunc: cancel, + histogramSupportEnabled: featuregate.GetRegistry().IsEnabled(enableHistogramSupportGateID), } if len(config.Metrics) > 0 { p.logger.Warn("The 'metrics' configuration is deprecated. Use 'include'/'exclude' instead.") @@ -94,6 +109,10 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme ms.SetAggregationTemporality(pmetric.MetricAggregationTemporalityDelta) return ms.DataPoints().Len() == 0 case pmetric.MetricDataTypeHistogram: + if !ctdp.histogramSupportEnabled { + return false + } + ms := m.Histogram() if ms.AggregationTemporality() != pmetric.MetricAggregationTemporalityCumulative { return false diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index 98f4aba8bbea..79ca6cbfcd7a 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/service/featuregate" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset" @@ -48,12 +49,13 @@ type testHistogramMetric struct { } type cumulativeToDeltaTest struct { - name string - metrics []string - include MatchMetrics - exclude MatchMetrics - inMetrics pmetric.Metrics - outMetrics pmetric.Metrics + name string + metrics []string + include MatchMetrics + exclude MatchMetrics + inMetrics pmetric.Metrics + outMetrics pmetric.Metrics + histogramSupportEnabled bool } var ( @@ -204,6 +206,7 @@ var ( }, isCumulative: []bool{false, true}, }), + histogramSupportEnabled: true, }, { name: "cumulative_to_delta_histogram_nan_sum", @@ -234,6 +237,7 @@ var ( }, isCumulative: []bool{false, true}, }), + histogramSupportEnabled: true, }, { name: "cumulative_to_delta_histogram_one_positive_without_sums", @@ -264,6 +268,38 @@ var ( }, isCumulative: []bool{false, true}, }), + histogramSupportEnabled: true, + }, + { + name: "cumulative_to_delta_histogram_ignored_without_feature", + include: MatchMetrics{ + Metrics: []string{"metric_1"}, + Config: filterset.Config{ + MatchType: "strict", + RegexpConfig: nil, + }, + }, + inMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{100, 200, 500}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + outMetrics: generateTestHistogramMetrics(testHistogramMetric{ + metricNames: []string{"metric_1", "metric_2"}, + metricCounts: [][]uint64{{100, 200, 500}, {4}}, + metricSums: [][]float64{{100, 200, 500}, {4}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + {{4, 4, 4}}, + }, + isCumulative: []bool{true, true}, + }), + histogramSupportEnabled: false, }, } ) @@ -271,6 +307,10 @@ var ( func TestCumulativeToDeltaProcessor(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { + registry := featuregate.GetRegistry() + registry.Apply(map[string]bool{ + enableHistogramSupportGateID: test.histogramSupportEnabled, + }) // next stores the results of the filter metric processor next := new(consumertest.MetricsSink) cfg := &Config{ From 34835d1149cb92642f2c39cdde6a1e546fe2345d Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Wed, 20 Jul 2022 16:02:25 +0100 Subject: [PATCH 11/14] Update processor/cumulativetodeltaprocessor/README.md Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> --- processor/cumulativetodeltaprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index c7ebee1231b4..a0e0c0c9c79b 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -9,7 +9,7 @@ ## Description -The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum and histogram metrics to monotonic, delta metrics. Non-monotonic sums are excluded. +The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum and histogram metrics to monotonic, delta metrics. Non-monotonic sums and exponential histograms are excluded. ## Configuration From c4a9d6fe3eefb92184121eef39f31e24cf632dff Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Thu, 21 Jul 2022 10:12:44 +0100 Subject: [PATCH 12/14] Fix formatting --- .../cumulativetodeltaprocessor/internal/tracking/identity.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go index ffd9bc2a367b..0c7caa7cd332 100644 --- a/processor/cumulativetodeltaprocessor/internal/tracking/identity.go +++ b/processor/cumulativetodeltaprocessor/internal/tracking/identity.go @@ -36,8 +36,8 @@ type MetricIdentity struct { } type HistogramIdentities struct { - CountIdentity MetricIdentity - SumIdentity MetricIdentity + CountIdentity MetricIdentity + SumIdentity MetricIdentity BucketIdentities []MetricIdentity } From cb5146b4a2aa1344d001c1e1b8b2cf264a148110 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Thu, 21 Jul 2022 15:42:39 +0100 Subject: [PATCH 13/14] Update README to document new feature flag. --- processor/cumulativetodeltaprocessor/README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index a0e0c0c9c79b..4173d1d0947c 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -11,6 +11,8 @@ The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum and histogram metrics to monotonic, delta metrics. Non-monotonic sums and exponential histograms are excluded. +Histogram conversion is currently behind a [feature gate](#feature-gate-configurations) and will only be converted if the feature flag is set. + ## Configuration Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative metrics and converts them from cumulative to delta. @@ -76,6 +78,14 @@ processors: # convert all cumulative sum or histogram metrics to delta ``` +## Feature gate configurations + +The **processor.cumulativetodeltaprocessor.EnableHistogramSupport** feature flag controls whether cumulative histograms will be converted to delta temporality or not. It is disabled by default, meaning histograms will not be modified by the processor. + +Pass `--feature-gates processor.cumulativetodeltaprocessor.EnableHistogramSupport` to enable this feature. + +This feature flag will be removed, and histograms will be enabled by default in release v0.60.0, September 2022. + ## Warnings - [Statefulness](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/standard-warnings.md#statefulness): The cumulativetodelta processor's calculates delta by remembering the previous value of a metric. For this reason, the calculation is only accurate if the metric is continuously sent to the same instance of the collector. As a result, the cumulativetodelta processor may not work as expected if used in a deployment of multiple collectors. When using this processor it is best for the data source to being sending data to a single collector. From f14a200bdbdaea6eea2fa07f914aafd1a9e5de40 Mon Sep 17 00:00:00 2001 From: Vi <1149443+mistodon@users.noreply.github.com> Date: Thu, 21 Jul 2022 16:33:24 +0100 Subject: [PATCH 14/14] Update processor/cumulativetodeltaprocessor/README.md Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> --- processor/cumulativetodeltaprocessor/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 4173d1d0947c..900be174b10b 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -80,7 +80,7 @@ processors: ## Feature gate configurations -The **processor.cumulativetodeltaprocessor.EnableHistogramSupport** feature flag controls whether cumulative histograms will be converted to delta temporality or not. It is disabled by default, meaning histograms will not be modified by the processor. +The **processor.cumulativetodeltaprocessor.EnableHistogramSupport** feature flag controls whether cumulative histograms delta conversion is supported or not. It is disabled by default, meaning histograms will not be modified by the processor. If enabled, which histograms are converted is still subjected to the processor's include/exclude filtering. Pass `--feature-gates processor.cumulativetodeltaprocessor.EnableHistogramSupport` to enable this feature.