Skip to content

Commit

Permalink
Implement offset modifier for range vector aggregation in LogQL
Browse files Browse the repository at this point in the history
Implementation for #2785: the offset modifier allows changing the time offset of range vectors in a query, supporting use cases such as selective timeshift in Grafana.

Signed-off-by: garrettlish <garrettlish@163.com>
  • Loading branch information
xiancli committed Mar 9, 2021
1 parent 2d07934 commit 248448a
Show file tree
Hide file tree
Showing 9 changed files with 594 additions and 469 deletions.
30 changes: 30 additions & 0 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ func newUnwrapExpr(id string, operation string) *unwrapExpr {
type logRange struct {
left LogSelectorExpr
interval time.Duration
offset time.Duration

unwrap *unwrapExpr
}
Expand All @@ -524,6 +525,22 @@ func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr) *l
}
}

// offsetExpr holds the duration of an "offset" modifier attached to a
// range vector aggregation, e.g. the `offset 10m` in
// `count_over_time({job="mysql"}[5m] offset 10m)`.
type offsetExpr struct {
	offset time.Duration
}

// String renders the offset modifier in LogQL syntax. The result carries a
// leading space so it can be appended directly after a range expression,
// producing e.g. " offset 10m".
func (o *offsetExpr) String() string {
	// A single Sprintf suffices; building via strings.Builder for one
	// formatted fragment added allocation and ceremony for no benefit.
	return fmt.Sprintf(" %s %s", OpOffset, o.offset)
}

// newOffsetExpr wraps the given duration in an offsetExpr.
func newOffsetExpr(offset time.Duration) *offsetExpr {
	return &offsetExpr{offset: offset}
}

const (
// vector ops
OpTypeSum = "sum"
Expand Down Expand Up @@ -582,6 +599,7 @@ const (

OpPipe = "|"
OpUnwrap = "unwrap"
OpOffset = "offset"

// conversion Op
OpConvBytes = "bytes"
Expand Down Expand Up @@ -623,11 +641,16 @@ type rangeAggregationExpr struct {
operation string

params *float64
offset *offsetExpr
grouping *grouping
implicit
}

// newRangeAggregationExpr builds a range vector aggregation with no offset
// modifier. It delegates to newRangeAggregationExprWithOffset, passing a nil
// offset, so all validation lives in one place.
func newRangeAggregationExpr(left *logRange, operation string, gr *grouping, stringParams *string) SampleExpr {
	return newRangeAggregationExprWithOffset(left, operation, nil, gr, stringParams)
}

func newRangeAggregationExprWithOffset(left *logRange, operation string, offset *offsetExpr, gr *grouping, stringParams *string) SampleExpr {
var params *float64
if stringParams != nil {
if operation != OpRangeTypeQuantile {
Expand All @@ -645,9 +668,13 @@ func newRangeAggregationExpr(left *logRange, operation string, gr *grouping, str
panic(newParseError(fmt.Sprintf("parameter required for operation %s", operation), 0, 0))
}
}
if offset != nil {
left.offset = offset.offset
}
e := &rangeAggregationExpr{
left: left,
operation: operation,
offset: offset,
grouping: gr,
params: params,
}
Expand Down Expand Up @@ -695,6 +722,9 @@ func (e *rangeAggregationExpr) String() string {
sb.WriteString(",")
}
sb.WriteString(e.left.String())
if e.offset != nil {
sb.WriteString(e.offset.String())
}
sb.WriteString(")")
if e.grouping != nil {
sb.WriteString(e.grouping.String())
Expand Down
24 changes: 24 additions & 0 deletions pkg/logql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,27 @@ func Test_SampleExpr_String(t *testing.T) {
for _, tc := range []string{
`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] offset 10m )`,
`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum(count_over_time({job="mysql"}[5m]))`,
`sum(count_over_time({job="mysql"}[5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | json [5m]))`,
`sum(count_over_time({job="mysql"} | json [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
`sum(count_over_time({job="mysql"} | logfmt [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | unpack | json [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m] offset 10m))`,
`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
`avg( rate( ( {job="nginx"} |= "GET" ) [10s] ) ) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s])) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s] offset 10m)) by (region)`,
`sum by (cluster) (count_over_time({job="mysql"}[5m]))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m])) / sum by (cluster) (count_over_time({job="postgres"}[5m])) `,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m)) / sum by (cluster) (count_over_time({job="postgres"}[5m] offset 10m)) `,
`
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
Expand All @@ -86,6 +94,8 @@ func Test_SampleExpr_String(t *testing.T) {
)`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m])`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m] offset 10m)`,
`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms|unwrap latency [5m])`,
`sum by (job) (
sum_over_time({namespace="tns"} |= "level=error" | json | foo=5 and bar<25ms | unwrap latency[5m])
Expand Down Expand Up @@ -130,6 +140,20 @@ func Test_SampleExpr_String(t *testing.T) {
`10 / (5/2)`,
`10 / (count_over_time({job="postgres"}[5m])/2)`,
`{app="foo"} | json response_status="response.status.code", first_param="request.params[0]"`,
`label_replace(
sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m] offset 1h
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m] offset 1h)
),
"foo",
"$1",
"service",
"(.*):.*"
)
`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseExpr(tc)
Expand Down
14 changes: 9 additions & 5 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ func (ev *DefaultEvaluator) StepEvaluator(
nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.left.interval),
End: q.End(),
Start: q.Start().Add(-rangExpr.left.interval).Add(-rangExpr.left.offset),
End: q.End().Add(-rangExpr.left.offset),
Selector: e.String(), // intentionally send the the vector for reducing labels.
Shards: q.Shards(),
},
Expand All @@ -191,8 +191,8 @@ func (ev *DefaultEvaluator) StepEvaluator(
case *rangeAggregationExpr:
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-e.left.interval),
End: q.End(),
Start: q.Start().Add(-e.left.interval).Add(-e.left.offset),
End: q.End().Add(-e.left.offset),
Selector: expr.String(),
Shards: q.Shards(),
},
Expand Down Expand Up @@ -412,11 +412,15 @@ func rangeAggEvaluator(
if err != nil {
return nil, err
}
var offset int64
if expr.offset != nil {
offset = expr.offset.offset.Nanoseconds()
}
iter := newRangeVectorIterator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
q.Start().UnixNano(), q.End().UnixNano(), offset,
)
if expr.operation == OpRangeTypeAbsent {
return &absentRangeVectorEvaluator{
Expand Down
10 changes: 9 additions & 1 deletion pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
JSONExpression log.JSONExpression
JSONExpressionList []log.JSONExpression
UnwrapExpr *unwrapExpr
OffsetExpr *offsetExpr
}

%start root
Expand Down Expand Up @@ -90,6 +91,7 @@ import (
%type <JSONExpressionList> jsonExpressionList
%type <UnwrapExpr> unwrapExpr
%type <UnitFilter> unitFilter
%type <OffsetExpr> offsetExpr

%token <bytes> BYTES
%token <str> IDENTIFIER STRING NUMBER
Expand All @@ -98,7 +100,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
ABSENT_OVER_TIME LABEL_REPLACE UNPACK
ABSENT_OVER_TIME LABEL_REPLACE UNPACK OFFSET

// Operators are listed with increasing precedence.
%left <binOp> OR
Expand Down Expand Up @@ -166,6 +168,10 @@ rangeAggregationExpr:
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr CLOSE_PARENTHESIS { $$ = newRangeAggregationExpr($5, $1, nil, &$3) }
| rangeOp OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExpr($3, $1, $5, nil) }
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExpr($5, $1, $7, &$3) }
| rangeOp OPEN_PARENTHESIS logRangeExpr offsetExpr CLOSE_PARENTHESIS { $$ = newRangeAggregationExprWithOffset($3, $1, $4, nil, nil) }
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr offsetExpr CLOSE_PARENTHESIS { $$ = newRangeAggregationExprWithOffset($5, $1, $6, nil, &$3) }
| rangeOp OPEN_PARENTHESIS logRangeExpr offsetExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExprWithOffset($3, $1, $4, $6, nil) }
| rangeOp OPEN_PARENTHESIS NUMBER COMMA logRangeExpr offsetExpr CLOSE_PARENTHESIS grouping { $$ = newRangeAggregationExprWithOffset($5, $1, $6, $8, &$3) }
;

vectorAggregationExpr:
Expand Down Expand Up @@ -363,6 +369,8 @@ rangeOp:
| ABSENT_OVER_TIME { $$ = OpRangeTypeAbsent }
;

offsetExpr:
OFFSET DURATION { $$ = newOffsetExpr( $2 ) }

labels:
IDENTIFIER { $$ = []string{ $1 } }
Expand Down
Loading

0 comments on commit 248448a

Please sign in to comment.