[deltatocumulativeprocessor] Introduce an upper bound for exp histogram buckets (open-telemetry#36874)

#### Description
This PR introduces an upper bound on the number of exponential histogram
buckets in the deltatocumulativeprocessor. Previously, when merging delta
metrics into cumulative metrics, the resulting exponential histograms could
grow very large, causing excessive memory usage and processing overhead.
Capping the number of buckets at 160 and dynamically downscaling histograms
when necessary keeps the processor efficient and stable even when handling
large merged exponential histograms.
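
For intuition, a minimal self-contained sketch of the downscaling idea (illustrative only; the processor itself operates on pdata bucket slices with offsets and zero counts): lowering the scale by one merges each pair of adjacent buckets, halving the bucket count at the cost of resolution.

```go
package main

import "fmt"

// downscaleOnce merges each pair of adjacent buckets, which is what
// lowering an exponential histogram's scale by 1 does. This toy version
// assumes the first bucket sits at an even index; the real code also
// tracks bucket offsets and zero counts.
func downscaleOnce(counts []uint64) []uint64 {
	merged := make([]uint64, 0, (len(counts)+1)/2)
	for i := 0; i < len(counts); i += 2 {
		c := counts[i]
		if i+1 < len(counts) {
			c += counts[i+1]
		}
		merged = append(merged, c)
	}
	return merged
}

func main() {
	counts := []uint64{1, 2, 3, 4, 5, 6, 7, 8}
	for len(counts) > 4 { // pretend the cap is 4 instead of 160
		counts = downscaleOnce(counts)
	}
	fmt.Println(counts) // [3 7 11 15]
}
```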

#### Link to tracking issue
Fixes open-telemetry#33277

#### Testing
Added unit tests for edge cases.

#### Documentation
Updated changelog.
euroelessar authored Jan 16, 2025
1 parent 0788185 commit 37c8044
Showing 6 changed files with 175 additions and 6 deletions.
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-cap-exphisto.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulativeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: cap the number of exponential histogram buckets at 160

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33277]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
17 changes: 17 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/add.go
@@ -12,6 +12,8 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice"
)

var maxBuckets = 160

func (dp Number) Add(in Number) Number {
switch in.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
@@ -76,6 +78,21 @@ func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
hi.SetScale(lo.Scale())
}

// Downscale if the expected number of buckets after the merge is too large.
from := expo.Scale(dp.Scale())
to := min(
expo.Limit(maxBuckets, from, dp.Positive(), in.Positive()),
expo.Limit(maxBuckets, from, dp.Negative(), in.Negative()),
)
if from != to {
expo.Downscale(dp.Positive(), from, to)
expo.Downscale(dp.Negative(), from, to)
expo.Downscale(in.Positive(), from, to)
expo.Downscale(in.Negative(), from, to)
dp.SetScale(int32(to))
in.SetScale(int32(to))
}

if dp.ZeroThreshold() != in.ZeroThreshold() {
hi, lo := expo.HiLo(dp, in, H.ZeroThreshold)
expo.WidenZero(lo.DataPoint, hi.ZeroThreshold())
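For context on what expo.Downscale does to bucket boundaries, a standalone sketch based on the OpenTelemetry exponential-histogram definition (illustrative; upperBoundary is not this package's API): at scale s the bucket base is 2^(2^-s), so lowering the scale by one squares the base and each pair of adjacent buckets collapses into one.

```go
package main

import (
	"fmt"
	"math"
)

// upperBoundary returns the upper boundary of bucket `index` at `scale`,
// per the OpenTelemetry exponential histogram definition:
// base = 2^(2^-scale); bucket i covers (base^i, base^(i+1)].
func upperBoundary(index int, scale int32) float64 {
	base := math.Pow(2, math.Pow(2, -float64(scale)))
	return math.Pow(base, float64(index+1))
}

func main() {
	// Bucket 3 at scale 0 covers (8, 16]; after downscaling to scale -1
	// it lands in bucket 3>>1 = 1, which covers (4, 16].
	fmt.Println(upperBoundary(3, 0))  // 16
	fmt.Println(upperBoundary(1, -1)) // 16
}
```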
@@ -23,6 +23,15 @@ func Merge(arel, brel Buckets) {
lo := min(a.Lower(), b.Lower())
up := max(a.Upper(), b.Upper())

// Skip leading and trailing zeros to reduce the number of buckets.
// Since the number of buckets is capped, this allows us to retain a higher scale.
for lo < up && a.Abs(lo) == 0 && b.Abs(lo) == 0 {
lo++
}
for lo < up-1 && a.Abs(up-1) == 0 && b.Abs(up-1) == 0 {
up--
}

size := up - lo

counts := pcommon.NewUInt64Slice()
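A standalone sketch of this trimming (trimZeros is a hypothetical helper, not the package's API; the real code indexes buckets by absolute position via Abs):

```go
package main

import "fmt"

// trimZeros shrinks the half-open index range [lo, up) past edge
// positions that are zero in both bucket-count slices, which are
// assumed to share the same absolute offset.
func trimZeros(a, b []uint64) (lo, up int) {
	lo, up = 0, max(len(a), len(b))
	at := func(c []uint64, i int) uint64 { // zero outside the slice
		if i < len(c) {
			return c[i]
		}
		return 0
	}
	for lo < up && at(a, lo) == 0 && at(b, lo) == 0 {
		lo++
	}
	for lo < up-1 && at(a, up-1) == 0 && at(b, up-1) == 0 {
		up--
	}
	return lo, up
}

func main() {
	a := []uint64{0, 0, 1, 5, 5, 1, 0, 0}
	b := []uint64{0, 2, 2, 3, 3, 2, 2, 0}
	fmt.Println(trimZeros(a, b)) // 1 7: only indices 1..6 carry counts
}
```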
28 changes: 28 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/expo/scale.go
@@ -6,6 +6,8 @@ package expo // import "github.com/open-telemetry/opentelemetry-collector-contri
import (
"fmt"
"math"

"go.opentelemetry.io/collector/pdata/pmetric"
)

type Scale int32
@@ -113,3 +115,29 @@ func Collapse(bs Buckets) {
counts.SetAt(i, 0)
}
}

// Limit returns a target Scale such that, after downscaling to it,
// the total bucket count after [Merge] never exceeds maxBuckets.
func Limit(maxBuckets int, scale Scale, arel, brel pmetric.ExponentialHistogramDataPointBuckets) Scale {
a, b := Abs(arel), Abs(brel)

lo := min(a.Lower(), b.Lower())
up := max(a.Upper(), b.Upper())

// Skip leading and trailing zeros.
for lo < up && a.Abs(lo) == 0 && b.Abs(lo) == 0 {
lo++
}
for lo < up-1 && a.Abs(up-1) == 0 && b.Abs(up-1) == 0 {
up--
}

// Keep downscaling until the number of buckets is within the limit.
for up-lo > maxBuckets {
lo /= 2
up /= 2
scale--
}

return scale
}
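
To see the loop converge, the same arithmetic can be re-run standalone (note that Go integer division truncates toward zero); the numbers below match the downscale_multiple_times_until_within_limit test case further down.

```go
package main

import "fmt"

func main() {
	// Index span [-6, 14) at scale 0 with maxBuckets = 8.
	lo, up, scale := -6, 14, 0
	for up-lo > 8 {
		lo /= 2 // truncates toward zero: -3/2 == -1
		up /= 2
		scale--
	}
	fmt.Println(lo, up, scale) // -1 3 -2 (span 4, within the cap)
}
```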
98 changes: 93 additions & 5 deletions processor/deltatocumulativeprocessor/internal/data/expo_test.go
@@ -22,6 +22,10 @@ func TestExpoAdd(t *testing.T) {
type bins = expotest.Bins
obs0 := expotest.Observe0

prevMaxBuckets := maxBuckets
maxBuckets = 8
defer func() { maxBuckets = prevMaxBuckets }()

cases := []struct {
name string
dp, in expdp
@@ -31,7 +35,7 @@
name: "noop",
dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0},
in: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0},
want: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0},
want: expdp{PosNeg: rawbs(nil, 5), Count: 0},
}, {
name: "simple",
dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0},
@@ -61,17 +65,17 @@
name: "zero/count",
dp: expdp{PosNeg: bins{0, 1, 2}.Into(), Zt: 0, Zc: 3, Count: 5},
in: expdp{PosNeg: bins{0, 1, 0}.Into(), Zt: 0, Zc: 2, Count: 3},
want: expdp{PosNeg: bins{0, 2, 2}.Into(), Zt: 0, Zc: 5, Count: 8},
want: expdp{PosNeg: bins{ø, 2, 2, ø, ø, ø, ø, ø}.Into(), Zt: 0, Zc: 5, Count: 8},
}, {
name: "zero/diff",
dp: expdp{PosNeg: bins{ø, ø, 0, 1, 1, 1}.Into(), Zt: 0.0, Zc: 2},
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 1}.Into(), Zt: 2.0, Zc: 2},
want: expdp{PosNeg: bins{ø, ø, ø, ø, 2, 2}.Into(), Zt: 2.0, Zc: 4 + 2*1},
want: expdp{PosNeg: bins{ø, ø, ø, ø, 2, 2, ø, ø}.Into(), Zt: 2.0, Zc: 4 + 2*1},
}, {
name: "zero/subzero",
dp: expdp{PosNeg: bins{ø, 1, 1, 1, 1, 1}.Into(), Zt: 0.2, Zc: 2},
in: expdp{PosNeg: bins{ø, ø, 1, 1, 1, 1}.Into(), Zt: 0.3, Zc: 2},
want: expdp{PosNeg: bins{ø, ø, 2, 2, 2, 2}.Into(), Zt: 0.5, Zc: 4 + 2*1},
want: expdp{PosNeg: bins{ø, ø, 2, 2, 2, 2, ø, ø}.Into(), Zt: 0.5, Zc: 4 + 2*1},
}, {
name: "negative-offset",
dp: expdp{PosNeg: rawbs([]uint64{ /* */ 1, 2}, -2)},
@@ -85,9 +89,93 @@
bs := pmetric.NewExponentialHistogramDataPointBuckets()
expotest.ObserveInto(bs, expo.Scale(0), 1, 2, 3, 4)
expotest.ObserveInto(bs, expo.Scale(0), 4, 3, 2, 1)
bs.BucketCounts().Append([]uint64{0, 0}...) // rescaling leaves zeroed memory. this is expected
return bs
}()},
}, {
name: "scale/no_downscale_within_limit",
dp: expdp{
Scale: 0,
PosNeg: bins{1, 1, 1, 1, 1, 1, 1, 1}.Into(),
Count: 8,
},
in: expdp{
Scale: 0,
PosNeg: bins{2, 2, 2, 2, 2, 2, 2, 2}.Into(),
Count: 16,
},
want: expdp{
Scale: 0,
PosNeg: bins{3, 3, 3, 3, 3, 3, 3, 3}.Into(),
Count: 24,
},
}, {
name: "scale/downscale_once_exceeds_limit",
dp: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{1, 1, 1, 1, 1, 1, 1, 1}, 0),
Count: 8,
},
in: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{2, 2, 2, 2, 2, 2, 2, 2}, 6),
Count: 16,
},
want: expdp{
Scale: -1,
PosNeg: rawbs([]uint64{2, 2, 2, 6, 4, 4, 4}, 0),
Count: 24,
},
}, {
name: "scale/downscale_multiple_times_until_within_limit",
dp: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{1, 1, 1, 1, 1, 1, 1, 1}, -6),
Count: 8,
},
in: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{2, 2, 2, 2, 2, 2, 2, 2}, 6),
Count: 16,
},
want: expdp{
Scale: -2,
PosNeg: rawbs([]uint64{2, 4, 2, 4, 8, 4}, -2),
Count: 24,
},
}, {
name: "scale/ignore_leading_trailing_zeros_in_bucket_count",
dp: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{0, 0, 1, 5, 5, 1, 0, 0}, -2),
Count: 12,
},
in: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{0, 2, 2, 3, 3, 2, 2, 0}, 0),
Count: 14,
},
want: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{1, 7, 7, 4, 3, 2, 2}, 0),
Count: 26,
},
}, {
name: "scale/downscale_with_leading_trailing_zeros",
dp: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{0, 0, 1, 10, 10, 1, 0, 0}, -4),
Count: 22,
},
in: expdp{
Scale: 0,
PosNeg: rawbs([]uint64{0, 0, 2, 10, 10, 2, 0, 0}, 4),
Count: 24,
},
want: expdp{
Scale: -1,
PosNeg: rawbs([]uint64{11, 11, 0, 0, 12, 12}, -1),
Count: 46,
},
}}

for _, cs := range cases {
@@ -42,7 +42,7 @@ resourceMetrics:
scale: 1
zeroCount: 0
positive:
bucketCounts: [0,0,0,0,0]
bucketCounts: []

-- out --
resourceMetrics:
