Skip to content

Commit

Permalink
fix(approx_topk): Map approx_topk operation in all cases (#16131)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Feb 14, 2025
1 parent 8bd6400 commit 6d691ac
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 61 deletions.
134 changes: 74 additions & 60 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,66 +293,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
return nil, 0, fmt.Errorf("approx_topk is not enabled. See -limits.shard_aggregations")
}

// TODO(owen-d): integrate bounded sharding with approx_topk
// I'm not doing this now because it uses a separate code path and may not handle
// bounded shards in the same way
shards, bytesPerShard, err := m.shards.Resolver().Shards(expr)
if err != nil {
return nil, 0, err
}

// approx_topk(k, inner) ->
// topk(
// k,
// eval_cms(
// __count_min_sketch__(inner, shard=1) ++ __count_min_sketch__(inner, shard=2)...
// )
// )

countMinSketchExpr := syntax.MustClone(expr)
countMinSketchExpr.Operation = syntax.OpTypeCountMinSketch
countMinSketchExpr.Params = 0

// Even if this query is not sharded the user wants an approximation. This is helpful if some
// inferred label has a very high cardinality. Note that the querier does not support CountMinSketchEvalExpr
// which is why it's evaluated on the front end.
if shards == 0 {
return &syntax.VectorAggregationExpr{
Left: &CountMinSketchEvalExpr{
downstreams: []DownstreamSampleExpr{{
SampleExpr: countMinSketchExpr,
}},
},
Grouping: expr.Grouping,
Operation: syntax.OpTypeTopK,
Params: expr.Params,
}, bytesPerShard, nil
}

downstreams := make([]DownstreamSampleExpr, 0, shards)
for shard := 0; shard < shards; shard++ {
s := NewPowerOfTwoShard(index.ShardAnnotation{
Shard: uint32(shard),
Of: uint32(shards),
})
downstreams = append(downstreams, DownstreamSampleExpr{
shard: &ShardWithChunkRefs{
Shard: s,
},
SampleExpr: countMinSketchExpr,
})
}

sharded := &CountMinSketchEvalExpr{
downstreams: downstreams,
}

return &syntax.VectorAggregationExpr{
Left: sharded,
Grouping: expr.Grouping,
Operation: syntax.OpTypeTopK,
Params: expr.Params,
}, bytesPerShard, nil
return m.mapApproxTopk(expr, false)
default:
// this should not be reachable. If an operation is shardable it should
// have an optimization listed. Nonetheless, we log this as a warning
Expand All @@ -367,6 +308,16 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
}
return expr, exprStats.Bytes, nil
}
} else {
// if this AST contains unshardable operations, we still need to rewrite some operations (e.g. approx_topk) as if it had 0 shards as they are not supported on the querier
switch expr.Operation {
case syntax.OpTypeApproxTopK:
level.Error(util_log.Logger).Log(
"msg", "encountered unshardable approx_topk operation",
"operation", expr.Operation,
)
return m.mapApproxTopk(expr, true)
}
}

// if this AST contains unshardable operations, don't shard this at this level,
Expand All @@ -388,6 +339,69 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
}, bytesPerShard, nil
}

func (m ShardMapper) mapApproxTopk(expr *syntax.VectorAggregationExpr, forceNoShard bool) (*syntax.VectorAggregationExpr, uint64, error) {
// TODO(owen-d): integrate bounded sharding with approx_topk
// I'm not doing this now because it uses a separate code path and may not handle
// bounded shards in the same way
shards, bytesPerShard, err := m.shards.Resolver().Shards(expr)
if err != nil {
return nil, 0, err
}

// approx_topk(k, inner) ->
// topk(
// k,
// eval_cms(
// __count_min_sketch__(inner, shard=1) ++ __count_min_sketch__(inner, shard=2)...
// )
// )

countMinSketchExpr := syntax.MustClone(expr)
countMinSketchExpr.Operation = syntax.OpTypeCountMinSketch
countMinSketchExpr.Params = 0

// Even if this query is not sharded the user wants an approximation. This is helpful if some
// inferred label has a very high cardinality. Note that the querier does not support CountMinSketchEvalExpr
// which is why it's evaluated on the front end.
if shards == 0 || forceNoShard {
return &syntax.VectorAggregationExpr{
Left: &CountMinSketchEvalExpr{
downstreams: []DownstreamSampleExpr{{
SampleExpr: countMinSketchExpr,
}},
},
Grouping: expr.Grouping,
Operation: syntax.OpTypeTopK,
Params: expr.Params,
}, bytesPerShard, nil
}

downstreams := make([]DownstreamSampleExpr, 0, shards)
for shard := 0; shard < shards; shard++ {
s := NewPowerOfTwoShard(index.ShardAnnotation{
Shard: uint32(shard),
Of: uint32(shards),
})
downstreams = append(downstreams, DownstreamSampleExpr{
shard: &ShardWithChunkRefs{
Shard: s,
},
SampleExpr: countMinSketchExpr,
})
}

sharded := &CountMinSketchEvalExpr{
downstreams: downstreams,
}

return &syntax.VectorAggregationExpr{
Left: sharded,
Grouping: expr.Grouping,
Operation: syntax.OpTypeTopK,
Params: expr.Params,
}, bytesPerShard, nil
}

func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downstreamRecorder, topLevel bool) (syntax.SampleExpr, uint64, error) {
subMapped, bytesPerShard, err := m.Map(expr.Left, r, topLevel)
if err != nil {
Expand Down
115 changes: 114 additions & 1 deletion pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestMappingStrings(t *testing.T) {
sum by (foo_extracted) (
downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
)
/
/
sum by (foo_extracted) (
downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
)
Expand Down Expand Up @@ -652,6 +652,52 @@ func TestMapping(t *testing.T) {
},
},
},
{
in: `rate({foo="bar"}[5m]) > 3`,
expr: &syntax.BinOpExpr{
Op: syntax.OpTypeGT,
SampleExpr: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 0,
Of: 2,
}).Bind(nil),
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRangeExpr{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
},
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 1,
Of: 2,
}).Bind(nil),
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRangeExpr{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
},
},
},
next: nil,
},
},
RHS: &syntax.LiteralExpr{Val: 3},
Opts: &syntax.BinOpOptions{
ReturnBool: false,
VectorMatching: &syntax.VectorMatching{},
},
},
},
{
in: `count_over_time({foo="bar"}[5m])`,
expr: &ConcatSampleExpr{
Expand Down Expand Up @@ -833,6 +879,73 @@ func TestMapping(t *testing.T) {
},
},
},
{
in: `approx_topk(3, rate({foo="bar"}[5m]) > 3)`,
expr: &syntax.VectorAggregationExpr{
Grouping: &syntax.Grouping{},
Params: 3,
Operation: syntax.OpTypeTopK,
Left: &CountMinSketchEvalExpr{
downstreams: []DownstreamSampleExpr{
{
shard: nil,
SampleExpr: &syntax.VectorAggregationExpr{
Operation: syntax.OpTypeCountMinSketch,
Left: &syntax.BinOpExpr{
Op: syntax.OpTypeGT,
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRangeExpr{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
},
},
RHS: &syntax.LiteralExpr{Val: 3},
Opts: &syntax.BinOpOptions{
ReturnBool: false,
VectorMatching: &syntax.VectorMatching{},
},
},
Grouping: &syntax.Grouping{},
},
},
},
},
},
},
{
// A contrived query that is not shardable: but we must rewrite `approx_topk` as if it had 0 shards because the approx_topk operation is not supported on the querier
in: `approx_topk(3, topk(5, rate({foo="bar"}[5m])))`,
expr: &syntax.VectorAggregationExpr{
Left: &CountMinSketchEvalExpr{
downstreams: []DownstreamSampleExpr{{
SampleExpr: &syntax.VectorAggregationExpr{
Operation: syntax.OpTypeCountMinSketch,
Left: &syntax.VectorAggregationExpr{Operation: syntax.OpTypeTopK,
Params: 5,
Left: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeRate,
Left: &syntax.LogRangeExpr{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
},
Interval: 5 * time.Minute,
},
},
Grouping: &syntax.Grouping{},
},
Grouping: &syntax.Grouping{},
},
shard: nil,
}},
},
Grouping: &syntax.Grouping{},
Operation: syntax.OpTypeTopK,
Params: 3,
},
},
{
in: `count(rate({foo="bar"}[5m]))`,
expr: &syntax.VectorAggregationExpr{
Expand Down

0 comments on commit 6d691ac

Please sign in to comment.