Sum values in unwrapped rate aggregation instead of treating them as counter (#6361) (#6555)

* Revert unwrapped rate aggregation to previous implementation

This PR reverts the implementation introduced in #5013 back to the original
implementation, which sums the values extracted from the log lines instead
of treating them like a Prometheus counter metric.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
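
To make the restored semantics concrete, here is a minimal, self-contained Go sketch (not Loki's actual code; the point type merely mirrors promql.Point, and the helper name sumRate is hypothetical):

package main

import (
	"fmt"
	"time"
)

// point mirrors promql.Point: a millisecond timestamp and an extracted value.
type point struct {
	T int64
	V float64
}

// sumRate implements the restored behavior: sum every value extracted from
// the log lines in the window and divide by the window length in seconds.
// The reverted #5013 implementation instead treated the samples as a
// monotonic counter and extrapolated its slope, Prometheus-style.
func sumRate(samples []point, selRange time.Duration) float64 {
	var sum float64
	for _, s := range samples {
		sum += s.V
	}
	return sum / selRange.Seconds()
}

func main() {
	// Three unwrapped values inside a 30s window.
	samples := []point{{T: 30_000, V: 10}, {T: 40_000, V: 20}, {T: 50_000, V: 30}}
	fmt.Println(sumRate(samples, 30*time.Second)) // (10+20+30)/30 = 2
}

Summing makes the result independent of counter resets and of the slope between samples, which is what you want when the unwrapped values are per-line measurements rather than a monotonically increasing counter.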

* Move changelog entry

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove unused/dead code

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Clean changelog

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
(cherry picked from commit b315ed0)

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
grafanabot and chaudum authored Jun 30, 2022
1 parent 2f2a4d1 commit 1facce1
Showing 4 changed files with 40 additions and 86 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ Here is the list with the changes that were produced since the previous release.
 * [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config

 ##### Changes
+* [6361](https://github.com/grafana/loki/pull/6361) **chaudum**: Sum values in unwrapped rate aggregation instead of treating them as counter.
 * [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
 * [6120](https://github.com/grafana/loki/pull/6120) **KMiller-Grafana**: Rename configuration parameter fudge_duplicate_timestamp to be increment_duplicate_timestamp.
 * [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable
2 changes: 1 addition & 1 deletion docs/sources/logql/metric_queries.md
@@ -68,7 +68,7 @@ We currently support the functions:
 Supported function for operating over unwrapped ranges are:
-- `rate(unwrapped-range)`: calculates per second rate of all values in the specified interval.
+- `rate(unwrapped-range)`: calculates per second rate of the sum of all values in the specified interval.
 - `sum_over_time(unwrapped-range)`: the sum of all values in the specified interval.
 - `avg_over_time(unwrapped-range)`: the average value of all points in the specified interval.
 - `max_over_time(unwrapped-range)`: the maximum value of all points in the specified interval.
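Concretely: if `unwrap foo` extracts the values 2, 2 and 2 within a 30s window, `rate(...)` now returns (2+2+2)/30 = 0.2 per second, rather than the slope-style estimate that treating the samples as a Prometheus counter would produce.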
40 changes: 33 additions & 7 deletions pkg/logql/engine_test.go
@@ -49,15 +49,36 @@ func TestEngine_LogsRateUnwrap(t *testing.T) {
 		expected interface{}
 	}{
 		{
-			`rate({app="foo"} | unwrap foo [30s])`, time.Unix(60, 0), logproto.FORWARD, 10,
+			`rate({app="foo"} | unwrap foo [30s])`,
+			time.Unix(60, 0),
+			logproto.FORWARD,
+			10,
+			[][]logproto.Series{
+				// 30s range: the lower bound is exclusive, so only 15 samples fall in the range (t=60 included)
+				{newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)},
+			},
+			[]SelectSampleParams{
+				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
+			},
+			// SUM(n=47, 61, 1) = 15
+			// 15 / 30 = 0.5
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.5}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+		},
+		{
+			`rate({app="foo"} | unwrap foo [30s])`,
+			time.Unix(60, 0),
+			logproto.FORWARD,
+			10,
 			[][]logproto.Series{
 				// 30s range: the lower bound is exclusive, so only 15 samples fall in the range (t=60 included)
-				{newSeries(testSize, offset(46, incValue(10)), `{app="foo"}`)},
+				{newSeries(testSize, offset(46, incValue(1)), `{app="foo"}`)},
 			},
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
 			},
-			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.46666766666666665}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+			// SUM(n=47, 61, n) = 810
+			// 810 / 30 = 27
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 27}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
 		},
 	} {
 		test := test
@@ -150,7 +171,9 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
 			},
-			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+			// SUM(n=47, 61, 2) = 30
+			// 30 / 30 = 1
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 1.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
 		},
 		{
 			`count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10,
@@ -1287,19 +1310,22 @@ func TestEngine_RangeQuery(t *testing.T) {
 		{
 			`rate(({app=~"foo|bar"} |~".+bar" | unwrap bar)[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
 			[][]logproto.Series{
-				{newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`), newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`)},
+				{
+					newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`),
+					newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`),
+				},
 			},
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"|unwrap bar[1m])`}},
 			},
 			promql.Matrix{
 				promql.Series{
 					Metric: labels.Labels{{Name: "app", Value: "bar"}},
-					Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
+					Points: []promql.Point{{T: 60 * 1000, V: 0.4}, {T: 90 * 1000, V: 0.4}, {T: 120 * 1000, V: 0.4}, {T: 150 * 1000, V: 0.4}, {T: 180 * 1000, V: 0.4}},
 				},
 				promql.Series{
 					Metric: labels.Labels{{Name: "app", Value: "foo"}},
-					Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
+					Points: []promql.Point{{T: 60 * 1000, V: 0.2}, {T: 90 * 1000, V: 0.2}, {T: 120 * 1000, V: 0.2}, {T: 150 * 1000, V: 0.2}, {T: 180 * 1000, V: 0.2}},
 				},
 			},
 		},
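The new expectations follow directly from the summed-value semantics: `{app="bar"}` carries the value 2 every 5s, so a 1m window holds 12 samples and the rate is 24/60 = 0.4, while `{app="foo"}` carries 2 every 10s, giving 12/60 = 0.2. Both series are constant, so the reverted counter-based implementation saw zero slope, hence the old expected values of 0.0.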
83 changes: 5 additions & 78 deletions pkg/logql/range_vector.go
@@ -218,92 +218,19 @@ func aggregator(r *syntax.RangeAggregationExpr) (RangeVectorAggregator, error) {
 	}
 }

-// rateLogs calculates the per-second rate of log lines.
+// rateLogs calculates the per-second rate of log lines or values extracted
+// from log lines
 func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.Point) float64 {
 	return func(samples []promql.Point) float64 {
 		if !computeValues {
 			return float64(len(samples)) / selRange.Seconds()
 		}
-		return extrapolatedRate(samples, selRange, true, true)
+		var result float64
+		for _, sample := range samples {
+			result += sample.V
+		}
+		return result / selRange.Seconds()
 	}
 }

-// extrapolatedRate function is taken from prometheus code promql/functions.go:59
-// extrapolatedRate is a utility function for rate/increase/delta.
-// It calculates the rate (allowing for counter resets if isCounter is true),
-// extrapolates if the first/last sample is close to the boundary, and returns
-// the result as either per-second (if isRate is true) or overall.
-func extrapolatedRate(samples []promql.Point, selRange time.Duration, isCounter, isRate bool) float64 {
-	// No sense in trying to compute a rate without at least two points. Drop
-	// this Vector element.
-	if len(samples) < 2 {
-		return 0
-	}
-	var (
-		rangeStart = samples[0].T - durationMilliseconds(selRange)
-		rangeEnd   = samples[len(samples)-1].T
-	)
-
-	resultValue := samples[len(samples)-1].V - samples[0].V
-	if isCounter {
-		var lastValue float64
-		for _, sample := range samples {
-			if sample.V < lastValue {
-				resultValue += lastValue
-			}
-			lastValue = sample.V
-		}
-	}
-
-	// Duration between first/last samples and boundary of range.
-	durationToStart := float64(samples[0].T-rangeStart) / 1000
-	durationToEnd := float64(rangeEnd-samples[len(samples)-1].T) / 1000
-
-	sampledInterval := float64(samples[len(samples)-1].T-samples[0].T) / 1000
-	averageDurationBetweenSamples := sampledInterval / float64(len(samples)-1)
-
-	if isCounter && resultValue > 0 && samples[0].V >= 0 {
-		// Counters cannot be negative. If we have any slope at
-		// all (i.e. resultValue went up), we can extrapolate
-		// the zero point of the counter. If the duration to the
-		// zero point is shorter than the durationToStart, we
-		// take the zero point as the start of the series,
-		// thereby avoiding extrapolation to negative counter
-		// values.
-		durationToZero := sampledInterval * (samples[0].V / resultValue)
-		if durationToZero < durationToStart {
-			durationToStart = durationToZero
-		}
-	}
-
-	// If the first/last samples are close to the boundaries of the range,
-	// extrapolate the result. This is as we expect that another sample
-	// will exist given the spacing between samples we've seen thus far,
-	// with an allowance for noise.
-	extrapolationThreshold := averageDurationBetweenSamples * 1.1
-	extrapolateToInterval := sampledInterval
-
-	if durationToStart < extrapolationThreshold {
-		extrapolateToInterval += durationToStart
-	} else {
-		extrapolateToInterval += averageDurationBetweenSamples / 2
-	}
-	if durationToEnd < extrapolationThreshold {
-		extrapolateToInterval += durationToEnd
-	} else {
-		extrapolateToInterval += averageDurationBetweenSamples / 2
-	}
-	resultValue = resultValue * (extrapolateToInterval / sampledInterval)
-	if isRate {
-		seconds := selRange.Seconds()
-		resultValue = resultValue / seconds
-	}
-
-	return resultValue
-}
-
-func durationMilliseconds(d time.Duration) int64 {
-	return int64(d / (time.Millisecond / time.Nanosecond))
-}
-
 // rateLogBytes calculates the per-second rate of log bytes.
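For reference, here is rateLogs as it stands after this revert, reassembled from the hunk above; the inline comments are editorial, so treat this as a sketch of the resulting file state rather than a verbatim copy:

// rateLogs calculates the per-second rate of log lines or values extracted
// from log lines
func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.Point) float64 {
	return func(samples []promql.Point) float64 {
		if !computeValues {
			// Plain log rate: number of lines per second in the window.
			return float64(len(samples)) / selRange.Seconds()
		}
		// Unwrapped rate: sum of the extracted values per second.
		var result float64
		for _, sample := range samples {
			result += sample.V
		}
		return result / selRange.Seconds()
	}
}

Dropping extrapolatedRate also removes the only caller of durationMilliseconds, which is why both can be deleted outright in this commit.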
