
fix: Adjust with offset in last, first and quantile over time queries. (#15915)

**What this PR does / why we need it**:
Query sharding for `last_over_time`, `first_over_time` and `quantile_over_time` did not adjust the start and end of the underlying range query by the passed `offset`.

This change fixes the adjustment and adds tests covering positive and negative offsets.
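
For illustration only (this sketch is not part of the commit, and the helper name is hypothetical): with a query such as `last_over_time({app="foo"} | unwrap value [1m] offset 5m)`, each downstream shard query must be evaluated over the window shifted back by the offset, and the merged results must then be lined up with the outer query's steps again. A minimal sketch of the window adjustment:

```go
package main

import (
	"fmt"
	"time"
)

// adjustForOffset is a hypothetical helper (not Loki's API). It shifts the
// evaluation window of a sharded range query back by the selector's offset,
// which is the adjustment the downstream queries were missing.
func adjustForOffset(start, end time.Time, offset time.Duration) (time.Time, time.Time) {
	if offset == 0 {
		return start, end
	}
	return start.Add(-offset), end.Add(-offset)
}

func main() {
	start := time.Unix(1000, 0)
	end := time.Unix(1060, 0)

	// A 5m offset means the shards must read data from 5 minutes earlier.
	s, e := adjustForOffset(start, end, 5*time.Minute)
	fmt.Println(s.Unix(), e.Unix()) // 700 760
}
```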
jeschkies authored Jan 28, 2025
1 parent 57c27d9 commit 83339cb
Showing 6 changed files with 62 additions and 22 deletions.
pkg/logql/downstream.go: 7 changes (5 additions & 2 deletions)

@@ -5,6 +5,7 @@ import (
 "errors"
 "fmt"
 "strings"
+"time"

 "github.com/go-kit/log"
 "github.com/go-kit/log/level"
@@ -304,6 +305,7 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
 type MergeFirstOverTimeExpr struct {
 syntax.SampleExpr
 downstreams []DownstreamSampleExpr
+offset time.Duration
 }

 func (e MergeFirstOverTimeExpr) String() string {
@@ -332,6 +334,7 @@ func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
 type MergeLastOverTimeExpr struct {
 syntax.SampleExpr
 downstreams []DownstreamSampleExpr
+offset time.Duration
 }

 func (e MergeLastOverTimeExpr) String() string {
@@ -590,7 +593,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
 }
 }

-return NewMergeFirstOverTimeStepEvaluator(params, xs), nil
+return NewMergeFirstOverTimeStepEvaluator(params, xs, e.offset), nil
 case *MergeLastOverTimeExpr:
 queries := make([]DownstreamQuery, len(e.downstreams))

@@ -625,7 +628,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
 return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
 }
 }
-return NewMergeLastOverTimeStepEvaluator(params, xs), nil
+return NewMergeLastOverTimeStepEvaluator(params, xs, e.offset), nil
 case *CountMinSketchEvalExpr:
 queries := make([]DownstreamQuery, len(e.downstreams))

pkg/logql/downstream_test.go: 14 changes (10 additions & 4 deletions)

@@ -64,6 +64,7 @@ func TestMappingEquivalence(t *testing.T) {
 {`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil},
 {`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil},
 {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
+{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s)`, true, []string{ShardQuantileOverTime}},
 {
 `
 (quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1)
@@ -75,8 +76,12 @@ func TestMappingEquivalence(t *testing.T) {
 },
 {`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}},
 {`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}},
+{`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardFirstOverTime}},
+{`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardFirstOverTime}},
 {`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}},
 {`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}},
+{`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardLastOverTime}},
+{`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardLastOverTime}},
 // topk prefers already-seen values in tiebreakers. Since the test data generates
 // the same log lines for each series & the resulting promql.Vectors aren't deterministically
 // sorted by labels, we don't expect this to pass.
@@ -190,6 +195,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
 }{
 {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05},
 {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02},
+{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, 0.02},
 } {
 q := NewMockQuerier(
 shards,
@@ -241,8 +247,8 @@ func TestMappingEquivalenceSketches(t *testing.T) {
 // plus set step and interval to 0
 params, err := NewLiteralParams(
 tc.query,
-time.Unix(1, 0),
-time.Unix(1, 0),
+time.Unix(10, 0),
+time.Unix(10, 0),
 0,
 0,
 logproto.FORWARD,
@@ -294,7 +300,7 @@ func TestApproxTopkSketches(t *testing.T) {
 shardedQuery string
 regularQuery string
 realtiveError float64
-//cardinalityEstimate int
+// cardinalityEstimate int
 }{
 // Note:our data generation results in less spread between topk things for 10k streams than for 100k streams
 // if we have 1k streams, we can get much more accurate results for topk 10 than topk 100
@@ -304,7 +310,7 @@ func TestApproxTopkSketches(t *testing.T) {
 shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
 regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
 realtiveError: 0.0012,
-//cardinalityEstimate: 3,
+// cardinalityEstimate: 3,
 },
 {
 labelShards: 10,
pkg/logql/first_last_over_time.go: 29 changes (27 additions & 2 deletions)

@@ -16,6 +16,15 @@ func newFirstWithTimestampIterator(
 it iter.PeekingSampleIterator,
 selRange, step, start, end, offset int64,
 ) RangeVectorIterator {
+// forces at least one step.
+if step == 0 {
+step = 1
+}
+if offset != 0 {
+start = start - offset
+end = end - offset
+}
+
 inner := &batchRangeVectorIterator{
 iter: it,
 step: step,
@@ -70,6 +79,15 @@ func newLastWithTimestampIterator(
 it iter.PeekingSampleIterator,
 selRange, step, start, end, offset int64,
 ) RangeVectorIterator {
+// forces at least one step.
+if step == 0 {
+step = 1
+}
+if offset != 0 {
+start = start - offset
+end = end - offset
+}
+
 inner := &batchRangeVectorIterator{
 iter: it,
 step: step,
@@ -127,6 +145,7 @@ type mergeOverTimeStepEvaluator struct {
 step time.Duration
 matrices []promql.Matrix
 merge func(promql.Vector, int, int, promql.Series) promql.Vector
+offset time.Duration
 }

 // Next returns the first or last element within one step of each matrix.
@@ -170,6 +189,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {

 // inRange returns true if t is in step range of ts.
 func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
+// The time stamp needs to be adjusted because the original datapoint at t is
+// from a shifted query.
+ts -= e.offset.Milliseconds()
+
 // special case instant queries
 if e.step.Milliseconds() == 0 {
 return true
@@ -181,7 +204,7 @@ func (*mergeOverTimeStepEvaluator) Close() error { return nil }

 func (*mergeOverTimeStepEvaluator) Error() error { return nil }

-func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
+func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator {
 if len(m) == 0 {
 return EmptyEvaluator[SampleVector]{}
 }
@@ -199,6 +222,7 @@ func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEv
 step: step,
 matrices: m,
 merge: mergeFirstOverTime,
+offset: offset,
 }
 }

@@ -218,7 +242,7 @@ func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.S
 return vec
 }

-func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
+func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator {
 if len(m) == 0 {
 return EmptyEvaluator[SampleVector]{}
 }
@@ -236,6 +260,7 @@ func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEva
 step: step,
 matrices: m,
 merge: mergeLastOverTime,
+offset: offset,
 }
 }

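Side note, as an illustration rather than part of the diff: the `inRange` correction above is needed because the downstream matrices carry timestamps from the shifted query window, while the merge evaluator steps over the original, un-shifted window. A rough numeric sketch of that alignment (simplified range check, millisecond timestamps):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	offset := 2 * time.Second
	step := 10 * time.Second

	ts := int64(10_000) // step timestamp of the outer, un-shifted query (ms)
	t := int64(7_500)   // datapoint timestamp from the shifted downstream query (ms)

	// Mirroring the `ts -= e.offset.Milliseconds()` adjustment: shift the step
	// timestamp back by the offset so both values are on the same timeline,
	// then apply a simplified "within one step" check.
	ts -= offset.Milliseconds()
	inRange := t > ts-step.Milliseconds() && t <= ts
	fmt.Println(inRange) // true
}
```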
pkg/logql/quantile_over_time_sketch.go: 28 changes (17 additions & 11 deletions)

@@ -185,6 +185,15 @@ func newQuantileSketchIterator(
 it iter.PeekingSampleIterator,
 selRange, step, start, end, offset int64,
 ) RangeVectorIterator {
+// forces at least one step.
+if step == 0 {
+step = 1
+}
+if offset != 0 {
+start = start - offset
+end = end - offset
+}
+
 inner := &batchRangeVectorIterator{
 iter: it,
 step: step,
@@ -302,23 +311,20 @@ func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluat
 // QuantileSketchMatrixStepEvaluator steps through a matrix of quantile sketch
 // vectors, ie t-digest or DDSketch structures per time step.
 type QuantileSketchMatrixStepEvaluator struct {
-start, end, ts time.Time
-step time.Duration
-m ProbabilisticQuantileMatrix
+end, ts time.Time
+step time.Duration
+m ProbabilisticQuantileMatrix
 }

 func NewQuantileSketchMatrixStepEvaluator(m ProbabilisticQuantileMatrix, params Params) *QuantileSketchMatrixStepEvaluator {
 var (
-start = params.Start()
-end = params.End()
-step = params.Step()
+step = params.Step()
 )
 return &QuantileSketchMatrixStepEvaluator{
-start: start,
-end: end,
-ts: start.Add(-step), // will be corrected on first Next() call
-step: step,
-m: m,
+end: params.End(),
+ts: params.Start().Add(-step), // will be corrected on first Next() call
+step: step,
+m: m,
 }
 }

pkg/logql/range_vector_test.go: 4 changes (1 addition & 3 deletions)

@@ -67,7 +67,6 @@ func newPoint(t time.Time, v float64) promql.FPoint {
 }

 func Benchmark_RangeVectorIteratorCompare(b *testing.B) {
-
 // no overlap test case.
 buildStreamingIt := func() (RangeVectorIterator, error) {
 tt := struct {
@@ -183,7 +182,6 @@ func Benchmark_RangeVectorIteratorCompare(b *testing.B) {
 }
 }
 })
-
 }

 func Benchmark_RangeVectorIterator(b *testing.B) {
@@ -214,7 +212,6 @@ func Benchmark_RangeVectorIterator(b *testing.B) {
 i++
 }
 }
-
 }

 func Test_RangeVectorIterator_InstantQuery(t *testing.T) {
@@ -445,6 +442,7 @@ func Test_RangeVectorIterator(t *testing.T) {
 time.Unix(110, 0), time.Unix(120, 0),
 },
 {
+// TODO: use this test case
 (5 * time.Second).Nanoseconds(), // no overlap
 (30 * time.Second).Nanoseconds(),
 (10 * time.Second).Nanoseconds(),
pkg/logql/shardmapper.go: 2 changes (2 additions & 0 deletions)

@@ -593,6 +593,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,

 return &MergeFirstOverTimeExpr{
 downstreams: downstreams,
+offset: expr.Left.Offset,
 }, bytesPerShard, nil
 case syntax.OpRangeTypeLast:
 if !m.lastOverTimeSharding {
@@ -623,6 +624,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,

 return &MergeLastOverTimeExpr{
 downstreams: downstreams,
+offset: expr.Left.Offset,
 }, bytesPerShard, nil
 default:
 // don't shard if there's not an appropriate optimization
