diff --git a/CHANGELOG.md b/CHANGELOG.md
index a2f5dafcca1b8..6e040c8bd4a19 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@ Here is the list with the changes that were produced since the previous release.
 * [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config

 ##### Changes
+* [6361](https://github.com/grafana/loki/pull/6361) **chaudum**: Sum values in unwrapped rate aggregation instead of treating them as counter.
 * [6042](https://github.com/grafana/loki/pull/6042) **slim-bean**: Add a new configuration to allow fudging of ingested timestamps to guarantee sort order of duplicate timestamps at query time.
 * [6120](https://github.com/grafana/loki/pull/6120) **KMiller-Grafana**: Rename configuration parameter fudge_duplicate_timestamp to be increment_duplicate_timestamp.
 * [5777](https://github.com/grafana/loki/pull/5777) **tatchiuleung**: storage: make Azure blobID chunk delimiter configurable
diff --git a/docs/sources/logql/metric_queries.md b/docs/sources/logql/metric_queries.md
index c34da49b67e78..e5ad7315db170 100644
--- a/docs/sources/logql/metric_queries.md
+++ b/docs/sources/logql/metric_queries.md
@@ -68,7 +68,7 @@ We currently support the functions:

 Supported function for operating over unwrapped ranges are:

-- `rate(unwrapped-range)`: calculates per second rate of all values in the specified interval.
+- `rate(unwrapped-range)`: calculates per second rate of the sum of all values in the specified interval.
 - `sum_over_time(unwrapped-range)`: the sum of all values in the specified interval.
 - `avg_over_time(unwrapped-range)`: the average value of all points in the specified interval.
 - `max_over_time(unwrapped-range)`: the maximum value of all points in the specified interval.
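The docs change above redefines `rate` over an unwrapped range as the sum of the extracted values divided by the range duration, instead of a Prometheus-style counter rate. A minimal, dependency-free sketch of that arithmetic (the helper name `unwrappedRate` and the sample values are illustrative only and not part of this patch):

```go
package main

import (
	"fmt"
	"time"
)

// unwrappedRate mirrors the documented behaviour: sum the values extracted
// from the log lines in the range and divide by the range's length in seconds.
func unwrappedRate(values []float64, selRange time.Duration) float64 {
	var sum float64
	for _, v := range values {
		sum += v
	}
	return sum / selRange.Seconds()
}

func main() {
	// 15 samples with value 1 inside a 30s window: 15 / 30 = 0.5.
	values := make([]float64, 15)
	for i := range values {
		values[i] = 1
	}
	fmt.Println(unwrappedRate(values, 30*time.Second)) // 0.5
}
```

The engine and test changes below implement and assert exactly this behaviour.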
diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go
index 79a36a150fb88..689cb04ab7828 100644
--- a/pkg/logql/engine_test.go
+++ b/pkg/logql/engine_test.go
@@ -49,15 +49,36 @@ func TestEngine_LogsRateUnwrap(t *testing.T) {
 		expected interface{}
 	}{
 		{
-			`rate({app="foo"} | unwrap foo [30s])`, time.Unix(60, 0), logproto.FORWARD, 10,
+			`rate({app="foo"} | unwrap foo [30s])`,
+			time.Unix(60, 0),
+			logproto.FORWARD,
+			10,
+			[][]logproto.Series{
+				// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
+				{newSeries(testSize, offset(46, constantValue(1)), `{app="foo"}`)},
+			},
+			[]SelectSampleParams{
+				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
+			},
+			// SUM(n=47, 61, 1) = 15
+			// 15 / 30 = 0.5
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.5}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+		},
+		{
+			`rate({app="foo"} | unwrap foo [30s])`,
+			time.Unix(60, 0),
+			logproto.FORWARD,
+			10,
 			[][]logproto.Series{
 				// 30s range the lower bound of the range is not inclusive only 15 samples will make it 60 included
-				{newSeries(testSize, offset(46, incValue(10)), `{app="foo"}`)},
+				{newSeries(testSize, offset(46, incValue(1)), `{app="foo"}`)},
 			},
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
 			},
-			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.46666766666666665}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+			// SUM(n=47, 61, n) = 810
+			// 810 / 30 = 27
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 27}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
 		},
 	} {
 		test := test
@@ -150,7 +171,9 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(30, 0), End: time.Unix(60, 0), Selector: `rate({app="foo"} | unwrap foo[30s])`}},
 			},
-			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 0.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
+			// SUM(n=46, 61, 2) = 30
+			// 30 / 30 = 1
+			promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 1.0}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
 		},
 		{
 			`count_over_time({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10,
@@ -1287,7 +1310,10 @@ func TestEngine_RangeQuery(t *testing.T) {
 		{
 			`rate(({app=~"foo|bar"} |~".+bar" | unwrap bar)[1m])`, time.Unix(60, 0), time.Unix(180, 0), 30 * time.Second, 0, logproto.FORWARD, 100,
 			[][]logproto.Series{
-				{newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`), newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`)},
+				{
+					newSeries(testSize, factor(10, constantValue(2)), `{app="foo"}`),
+					newSeries(testSize, factor(5, constantValue(2)), `{app="bar"}`),
+				},
 			},
 			[]SelectSampleParams{
 				{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(180, 0), Selector: `rate({app=~"foo|bar"}|~".+bar"|unwrap bar[1m])`}},
@@ -1295,11 +1321,11 @@
 			promql.Matrix{
 				promql.Series{
 					Metric: labels.Labels{{Name: "app", Value: "bar"}},
-					Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
+					Points: []promql.Point{{T: 60 * 1000, V: 0.4}, {T: 90 * 1000, V: 0.4}, {T: 120 * 1000, V: 0.4}, {T: 150 * 1000, V: 0.4}, {T: 180 * 1000, V: 0.4}},
 				},
 				promql.Series{
 					Metric: labels.Labels{{Name: "app", Value: "foo"}},
-					Points: []promql.Point{{T: 60 * 1000, V: 0.0}, {T: 90 * 1000, V: 0.0}, {T: 120 * 1000, V: 0.0}, {T: 150 * 1000, V: 0.0}, {T: 180 * 1000, V: 0.0}},
+					Points: []promql.Point{{T: 60 * 1000, V: 0.2}, {T: 90 * 1000, V: 0.2}, {T: 120 * 1000, V: 0.2}, {T: 150 * 1000, V: 0.2}, {T: 180 * 1000, V: 0.2}},
 				},
 			},
 		},
diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go
index 713629a45ffcf..da0168ec20f7d 100644
--- a/pkg/logql/range_vector.go
+++ b/pkg/logql/range_vector.go
@@ -218,92 +218,19 @@ func aggregator(r *syntax.RangeAggregationExpr) (RangeVectorAggregator, error) {
 	}
 }

-// rateLogs calculates the per-second rate of log lines.
+// rateLogs calculates the per-second rate of log lines or values extracted
+// from log lines
 func rateLogs(selRange time.Duration, computeValues bool) func(samples []promql.Point) float64 {
 	return func(samples []promql.Point) float64 {
 		if !computeValues {
 			return float64(len(samples)) / selRange.Seconds()
 		}
-		return extrapolatedRate(samples, selRange, true, true)
-	}
-}
-
-// extrapolatedRate function is taken from prometheus code promql/functions.go:59
-// extrapolatedRate is a utility function for rate/increase/delta.
-// It calculates the rate (allowing for counter resets if isCounter is true),
-// extrapolates if the first/last sample is close to the boundary, and returns
-// the result as either per-second (if isRate is true) or overall.
-func extrapolatedRate(samples []promql.Point, selRange time.Duration, isCounter, isRate bool) float64 {
-	// No sense in trying to compute a rate without at least two points. Drop
-	// this Vector element.
-	if len(samples) < 2 {
-		return 0
-	}
-	var (
-		rangeStart = samples[0].T - durationMilliseconds(selRange)
-		rangeEnd   = samples[len(samples)-1].T
-	)
-
-	resultValue := samples[len(samples)-1].V - samples[0].V
-	if isCounter {
-		var lastValue float64
+		var result float64
 		for _, sample := range samples {
-			if sample.V < lastValue {
-				resultValue += lastValue
-			}
-			lastValue = sample.V
-		}
-	}
-
-	// Duration between first/last samples and boundary of range.
-	durationToStart := float64(samples[0].T-rangeStart) / 1000
-	durationToEnd := float64(rangeEnd-samples[len(samples)-1].T) / 1000
-
-	sampledInterval := float64(samples[len(samples)-1].T-samples[0].T) / 1000
-	averageDurationBetweenSamples := sampledInterval / float64(len(samples)-1)
-
-	if isCounter && resultValue > 0 && samples[0].V >= 0 {
-		// Counters cannot be negative. If we have any slope at
-		// all (i.e. resultValue went up), we can extrapolate
-		// the zero point of the counter. If the duration to the
-		// zero point is shorter than the durationToStart, we
-		// take the zero point as the start of the series,
-		// thereby avoiding extrapolation to negative counter
-		// values.
-		durationToZero := sampledInterval * (samples[0].V / resultValue)
-		if durationToZero < durationToStart {
-			durationToStart = durationToZero
+			result += sample.V
 		}
+		return result / selRange.Seconds()
 	}
-
-	// If the first/last samples are close to the boundaries of the range,
-	// extrapolate the result. This is as we expect that another sample
-	// will exist given the spacing between samples we've seen thus far,
-	// with an allowance for noise.
-	extrapolationThreshold := averageDurationBetweenSamples * 1.1
-	extrapolateToInterval := sampledInterval
-
-	if durationToStart < extrapolationThreshold {
-		extrapolateToInterval += durationToStart
-	} else {
-		extrapolateToInterval += averageDurationBetweenSamples / 2
-	}
-	if durationToEnd < extrapolationThreshold {
-		extrapolateToInterval += durationToEnd
-	} else {
-		extrapolateToInterval += averageDurationBetweenSamples / 2
-	}
-	resultValue = resultValue * (extrapolateToInterval / sampledInterval)
-	if isRate {
-		seconds := selRange.Seconds()
-		resultValue = resultValue / seconds
-	}
-
-	return resultValue
-}
-
-func durationMilliseconds(d time.Duration) int64 {
-	return int64(d / (time.Millisecond / time.Nanosecond))
-}
 }

 // rateLogBytes calculates the per-second rate of log bytes.
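As a cross-check of the rewritten `rateLogs` closure against the updated expectations in `engine_test.go`, here is a standalone sketch that reproduces the arithmetic on plain `float64` values (the helper name `sumRate` is illustrative; the real closure operates on `[]promql.Point`):

```go
package main

import (
	"fmt"
	"time"
)

// sumRate reproduces the body of the rewritten rateLogs closure for the
// unwrapped case: the sum of the sample values divided by the selection
// range in seconds.
func sumRate(values []float64, selRange time.Duration) float64 {
	var sum float64
	for _, v := range values {
		sum += v
	}
	return sum / selRange.Seconds()
}

func main() {
	// Increasing values 47..61 inside a 30s window, as in the second
	// TestEngine_LogsRateUnwrap case: SUM(n=47..61, n) = 810; 810 / 30 = 27.
	// The removed extrapolation-based code returned ~0.4667 for this series
	// (the old expected value deleted from the test above).
	var values []float64
	for n := 47; n <= 61; n++ {
		values = append(values, float64(n))
	}
	fmt.Println(sumRate(values, 30*time.Second)) // 27
}
```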