Skip to content

Commit

Permalink
revision
Browse files Browse the repository at this point in the history
  • Loading branch information
xuan-cao-swi committed Jan 10, 2025
1 parent f152522 commit f3b8aa7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength

# The default boundaries are calculated based on the default max_size and max_scale values
def initialize(
aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta), # TODO: aggregation_temporality should be renamed to collect_aggregation_temporality for clear definition
aggregation_temporality: ENV.fetch('OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE', :delta),
max_size: MAX_SIZE,
max_scale: MAX_SCALE,
record_min_max: true,
zero_threshold: 0
)
@data_points = {}
@aggregation_temporality = aggregation_temporality
@record_min_max = record_min_max
@min = Float::INFINITY
Expand All @@ -47,19 +46,19 @@ def initialize(
@mapping = new_mapping(@scale)
end

def collect(start_time, end_time)
def collect(start_time, end_time, data_points)
if @aggregation_temporality == :delta
# Set timestamps and 'move' data point values to result.
hdps = @data_points.values.map! do |hdp|
hdps = data_points.values.map! do |hdp|
hdp.start_time_unix_nano = start_time
hdp.time_unix_nano = end_time
hdp
end
@data_points.clear
data_points.clear
hdps
else
# Assume merge is done in collector side at this point
@data_points.values.map! do |hdp|
# Update timestamps and take a snapshot.
data_points.values.map! do |hdp|
hdp.start_time_unix_nano ||= start_time # Start time of a data point is from the first observation.
hdp.time_unix_nano = end_time
hdp = hdp.dup
Expand All @@ -70,16 +69,16 @@ def collect(start_time, end_time)
end
end

# rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
def update(amount, attributes)
# rubocop:disable Metrics/MethodLength
def update(amount, attributes, data_points)
# fetch or initialize the ExponentialHistogramDataPoint
hdp = @data_points.fetch(attributes) do
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

@data_points[attributes] = ExponentialHistogramDataPoint.new(
data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
Expand Down Expand Up @@ -118,7 +117,7 @@ def update(amount, attributes)

bucket_index = @mapping.map_to_index(amount)

is_rescaling_needed = false
rescaling_needed = false
low = high = 0

if buckets.counts == [0] # special case of empty
Expand All @@ -127,17 +126,17 @@ def update(amount, attributes)
buckets.index_base = bucket_index

elsif bucket_index < buckets.index_start && (buckets.index_end - bucket_index) >= @size
is_rescaling_needed = true
rescaling_needed = true
low = bucket_index
high = buckets.index_end

elsif bucket_index > buckets.index_end && (bucket_index - buckets.index_start) >= @size
is_rescaling_needed = true
rescaling_needed = true
low = buckets.index_start
high = bucket_index
end

if is_rescaling_needed
if rescaling_needed
scale_change = get_scale_change(low, high)
downscale(scale_change, hdp.positive, hdp.negative)
new_scale = @mapping.scale - scale_change
Expand All @@ -151,21 +150,11 @@ def update(amount, attributes)
# adjust buckets based on the bucket_index
if bucket_index < buckets.index_start
span = buckets.index_end - bucket_index

if span >= buckets.counts.size
OpenTelemetry.logger.debug "buckets need to grow to #{span + 1} from #{buckets.counts.size} (max bucket size #{@size})"
buckets.grow(span + 1, @size)
end

grow_buckets(span, buckets)
buckets.index_start = bucket_index
elsif bucket_index > buckets.index_end
span = bucket_index - buckets.index_start

if span >= buckets.counts.size
OpenTelemetry.logger.debug "buckets need to grow to #{span + 1} from #{buckets.counts.size} (max bucket size #{@size})"
buckets.grow(span + 1, @size)
end

grow_buckets(span, buckets)
buckets.index_end = bucket_index
end

Expand All @@ -175,10 +164,17 @@ def update(amount, attributes)
buckets.increment_bucket(bucket_index)
nil
end
# rubocop:enable Metrics/MethodLength, Metrics/CyclomaticComplexity
# rubocop:enable Metrics/MethodLength

private

# Grow +buckets+ so it can hold an index span of +span+, capped at @size.
# No-op when the current backing array is already large enough.
def grow_buckets(span, buckets)
  if span >= buckets.counts.size
    OpenTelemetry.logger.debug "buckets need to grow to #{span + 1} from #{buckets.counts.size} (max bucket size #{@size})"
    buckets.grow(span + 1, @size)
  end
end

# Build the index mapping for the given scale: logarithm-based for
# positive scales, exponent-based for zero or negative scales.
def new_mapping(scale)
  if scale.positive?
    ExponentialHistogram::LogarithmMapping.new(scale)
  else
    ExponentialHistogram::ExponentMapping.new(scale)
  end
end
Expand All @@ -201,8 +197,7 @@ def get_scale_change(low, high)
end

def downscale(change, positive, negative)
return if change == 0
raise 'Invalid change of scale' if change.negative?
return if change <= 0

positive.downscale(change)
negative.downscale(change)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,26 @@
zero_threshold: 0
)
end

let(:data_points) { {} }
let(:record_min_max) { true }
let(:max_size) { 20 }
let(:max_scale) { 5 }
let(:aggregation_temporality) { :delta }
# Time in nano
let(:start_time) { (Time.now.to_r * 1_000_000_000).to_i }
let(:end_time) { ((Time.now + 60).to_r * 1_000_000_000).to_i }
let(:start_time) { Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) }
let(:end_time) { Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) + (60 * 1_000_000_000) }

describe '#collect' do
it 'returns all the data points' do
expbh.update(1.03, {})
expbh.update(1.23, {})
expbh.update(0, {})
expbh.update(1.03, {}, data_points)
expbh.update(1.23, {}, data_points)
expbh.update(0, {}, data_points)

expbh.update(1.45, { 'foo' => 'bar' })
expbh.update(1.67, { 'foo' => 'bar' })
expbh.update(1.45, { 'foo' => 'bar' }, data_points)
expbh.update(1.67, { 'foo' => 'bar' }, data_points)

exphdps = expbh.collect(start_time, end_time)
exphdps = expbh.collect(start_time, end_time, data_points)

_(exphdps.size).must_equal(2)
_(exphdps[0].attributes).must_equal({})
Expand Down Expand Up @@ -68,7 +70,8 @@

# The corresponding Go test is TestAlternatingGrowth1 where:
# agg := NewFloat64(NewConfig(WithMaxSize(4)))
# agg is an instance of github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram/structure.Histogram[float64]
# agg is an instance of (go package) github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/histogram/structure.Histogram[float64]
# agg permalink: https://github.com/lightstep/otel-launcher-go/blob/v1.34.0/lightstep/sdk/metric/aggregator/histogram/histogram.go#L34
expbh = OpenTelemetry::SDK::Metrics::Aggregation::ExponentialBucketHistogram.new(
aggregation_temporality: aggregation_temporality,
record_min_max: record_min_max,
Expand All @@ -77,11 +80,11 @@
zero_threshold: 0
)

expbh.update(2, {})
expbh.update(4, {})
expbh.update(1, {})
expbh.update(2, {}, data_points)
expbh.update(4, {}, data_points)
expbh.update(1, {}, data_points)

exphdps = expbh.collect(start_time, end_time)
exphdps = expbh.collect(start_time, end_time, data_points)

_(exphdps.size).must_equal(1)
_(exphdps[0].attributes).must_equal({})
Expand Down Expand Up @@ -110,14 +113,14 @@
zero_threshold: 0
)

expbh.update(2, {})
expbh.update(2, {})
expbh.update(2, {})
expbh.update(1, {})
expbh.update(8, {})
expbh.update(0.5, {})
expbh.update(2, {}, data_points)
expbh.update(2, {}, data_points)
expbh.update(2, {}, data_points)
expbh.update(1, {}, data_points)
expbh.update(8, {}, data_points)
expbh.update(0.5, {}, data_points)

exphdps = expbh.collect(start_time, end_time)
exphdps = expbh.collect(start_time, end_time, data_points)

_(exphdps.size).must_equal(1)
_(exphdps[0].attributes).must_equal({})
Expand Down Expand Up @@ -178,10 +181,10 @@
)

permutation.each do |value|
expbh.update(value, {})
expbh.update(value, {}, data_points)
end

exphdps = expbh.collect(start_time, end_time)
exphdps = expbh.collect(start_time, end_time, data_points)

assert_equal expected[:scale], exphdps[0].scale
assert_equal expected[:offset], exphdps[0].positive.offset
Expand All @@ -201,11 +204,11 @@
zero_threshold: 0
)

expbh.update(Float::MAX, {})
expbh.update(1, {})
expbh.update(2**-1074, {})
expbh.update(Float::MAX, {}, data_points)
expbh.update(1, {}, data_points)
expbh.update(2**-1074, {}, data_points)

exphdps = expbh.collect(start_time, end_time)
exphdps = expbh.collect(start_time, end_time, data_points)

assert_equal Float::MAX, exphdps[0].sum
assert_equal 3, exphdps[0].count
Expand All @@ -225,10 +228,10 @@
)

[1, 3, 5, 7, 9].each do |value|
expbh.update(value, {})
expbh.update(value, {}, data_points)
end

exphdps = expbh.collect(start_time, end_time)
exphdps = expbh.collect(start_time, end_time, data_points)

assert_equal 1, exphdps[0].min
assert_equal 9, exphdps[0].max
Expand All @@ -240,10 +243,10 @@
)

[-1, -3, -5, -7, -9].each do |value|
expbh.update(value, {})
expbh.update(value, {}, data_points)
end

exphdps = expbh.collect(start_time, end_time)
exphdps = expbh.collect(start_time, end_time, data_points)

assert_equal(-9, exphdps[0].min)
assert_equal(-1, exphdps[0].max)
Expand Down

0 comments on commit f3b8aa7

Please sign in to comment.