Skip to content

Commit

Permalink
Preserve interval parameter when splitting queries by time (grafana#5622
Browse files Browse the repository at this point in the history
) (grafana#5815)

* Preserve interval parameter when splitting queries by time

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add tests for interval param on Loki downstream requests

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add changelog entry for grafana#5622

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
(cherry picked from commit 95b8856)

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
2 people authored and splitice committed May 21, 2022
1 parent 5ef1f50 commit 0acfd6e
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 75 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
* [5051](https://github.com/grafana/loki/pull/5051) **liguozhong**: New promtail pipeline: Promtail Rate Limit stage.
* [5707](https://github.com/grafana/loki/pull/5707) **franzwong** Promtail: Rename config name limit_config to limits_config.
* [5780](https://github.com/grafana/loki/pull/5780) **simonswine**: Update alpine image to 3.15.4.
## Main
* [5622](https://github.com/grafana/loki/pull/5622) **chaudum**: Fix bug in query splitter that caused `interval` query parameter to be ignored and therefore returning more logs than expected.
* [5521](https://github.com/grafana/loki/pull/5521) **cstyan**: Move stream lag configuration to top level clients config struct and refactor stream lag metric, this resolves a bug with duplicate metric collection when a single Promtail binary is running multiple Promtail clients.
* [5568](https://github.com/grafana/loki/pull/5568) **afayngelerindbx**: Fix canary panics due to concurrent execution of `confirmMissing`
* [5552](https://github.com/grafana/loki/pull/5552) **jiachengxu**: Loki mixin: add `DiskSpaceUtilizationPanel`
* [5541](https://github.com/grafana/loki/pull/5541) **bboreham**: Queries: reject very deeply nested regexps which could crash Loki.
* [5536](https://github.com/grafana/loki/pull/5536) **jiachengxu**: Loki mixin: make labelsSelector in loki chunks dashboards configurable
* [5535](https://github.com/grafana/loki/pull/5535) **jiachengxu**: Loki mixins: use labels selector for loki chunks dashboard
Expand Down
16 changes: 11 additions & 5 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (r *LokiRequest) LogToSpan(sp opentracing.Span) {
otlog.String("start", timestamp.Time(r.GetStart()).String()),
otlog.String("end", timestamp.Time(r.GetEnd()).String()),
otlog.Int64("step (ms)", r.GetStep()),
otlog.Int64("interval (ms)", r.GetInterval()),
otlog.Int64("limit", int64(r.GetLimit())),
otlog.String("direction", r.GetDirection().String()),
otlog.String("shards", strings.Join(r.GetShards(), ",")),
Expand Down Expand Up @@ -210,10 +211,10 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []
Direction: req.Direction,
StartTs: req.Start.UTC(),
EndTs: req.End.UTC(),
// GetStep must return milliseconds
Step: int64(req.Step) / 1e6,
Path: r.URL.Path,
Shards: req.Shards,
Step: req.Step.Milliseconds(),
Interval: req.Interval.Milliseconds(),
Path: r.URL.Path,
Shards: req.Shards,
}, nil
case InstantQueryOp:
req, err := loghttp.ParseInstantQuery(r)
Expand Down Expand Up @@ -277,6 +278,9 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http
if request.Step != 0 {
params["step"] = []string{fmt.Sprintf("%f", float64(request.Step)/float64(1e3))}
}
if request.Interval != 0 {
params["interval"] = []string{fmt.Sprintf("%f", float64(request.Interval)/float64(1e3))}
}
u := &url.URL{
// the request could come /api/prom/query but we want to only use the new api.
Path: "/loki/api/v1/query_range",
Expand Down Expand Up @@ -812,7 +816,9 @@ func (p paramsRangeWrapper) End() time.Time {
func (p paramsRangeWrapper) Step() time.Duration {
return time.Duration(p.GetStep() * 1e6)
}
func (p paramsRangeWrapper) Interval() time.Duration { return 0 }
func (p paramsRangeWrapper) Interval() time.Duration {
return time.Duration(p.GetInterval() * 1e6)
}
func (p paramsRangeWrapper) Direction() logproto.Direction {
return p.GetDirection()
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,27 @@ func Test_codec_DecodeRequest(t *testing.T) {
wantErr bool
}{
{"wrong", func() (*http.Request, error) { return http.NewRequest(http.MethodGet, "/bad?step=bad", nil) }, nil, true},
{"ok", func() (*http.Request, error) {
{"query_range", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/query_range?start=%d&end=%d&query={foo="bar"}&step=1&limit=200&direction=FORWARD`, start.UnixNano(), end.UnixNano()), nil)
fmt.Sprintf(`/query_range?start=%d&end=%d&query={foo="bar"}&step=10&limit=200&direction=FORWARD`, start.UnixNano(), end.UnixNano()), nil)
}, &LokiRequest{
Query: `{foo="bar"}`,
Limit: 200,
Step: 1000, // step is expected in ms.
Step: 10000, // step is expected in ms
Direction: logproto.FORWARD,
Path: "/query_range",
StartTs: start,
EndTs: end,
}, false},
{"ok", func() (*http.Request, error) {
{"query_range", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/query_range?start=%d&end=%d&query={foo="bar"}&step=86400&limit=200&direction=FORWARD`, start.UnixNano(), end.UnixNano()), nil)
fmt.Sprintf(`/query_range?start=%d&end=%d&query={foo="bar"}&interval=10&limit=200&direction=BACKWARD`, start.UnixNano(), end.UnixNano()), nil)
}, &LokiRequest{
Query: `{foo="bar"}`,
Limit: 200,
Step: 86400000, // step is expected in ms.
Direction: logproto.FORWARD,
Step: 14000, // step is expected in ms; calculated default if request param not present
Interval: 10000, // interval is expected in ms
Direction: logproto.BACKWARD,
Path: "/query_range",
StartTs: start,
EndTs: end,
Expand Down Expand Up @@ -222,7 +223,8 @@ func Test_codec_EncodeRequest(t *testing.T) {
toEncode := &LokiRequest{
Query: `{foo="bar"}`,
Limit: 200,
Step: 86400000,
Step: 86400000, // nanoseconds
Interval: 10000000, // nanoseconds
Direction: logproto.FORWARD,
Path: "/query_range",
StartTs: start,
Expand All @@ -238,12 +240,14 @@ func Test_codec_EncodeRequest(t *testing.T) {
require.Equal(t, fmt.Sprintf("%d", 200), got.URL.Query().Get("limit"))
require.Equal(t, `FORWARD`, got.URL.Query().Get("direction"))
require.Equal(t, "86400.000000", got.URL.Query().Get("step"))
require.Equal(t, "10000.000000", got.URL.Query().Get("interval"))

// testing a full roundtrip
req, err := LokiCodec.DecodeRequest(context.TODO(), got, nil)
require.NoError(t, err)
require.Equal(t, toEncode.Query, req.(*LokiRequest).Query)
require.Equal(t, toEncode.Step, req.(*LokiRequest).Step)
require.Equal(t, toEncode.Interval, req.(*LokiRequest).Interval)
require.Equal(t, toEncode.StartTs, req.(*LokiRequest).StartTs)
require.Equal(t, toEncode.EndTs, req.(*LokiRequest).EndTs)
require.Equal(t, toEncode.Direction, req.(*LokiRequest).Direction)
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package queryrange
import (
"context"
"fmt"
"time"

"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -26,7 +25,7 @@ type DownstreamHandler struct {
}

func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebase.Request {
if params.Start() == params.End() {
if params.Start().Equal(params.End()) {
return &LokiInstantRequest{
Query: params.Query(),
Limit: params.Limit(),
Expand All @@ -39,7 +38,8 @@ func ParamsToLokiRequest(params logql.Params, shards logql.Shards) queryrangebas
return &LokiRequest{
Query: params.Query(),
Limit: params.Limit(),
Step: int64(params.Step() / time.Millisecond),
Step: params.Step().Milliseconds(),
Interval: params.Interval().Milliseconds(),
StartTs: params.Start(),
EndTs: params.End(),
Direction: params.Direction(),
Expand Down
Loading

0 comments on commit 0acfd6e

Please sign in to comment.