Skip to content

Commit

Permalink
MQE: fix functions that could return multiple series with the same la…
Browse files Browse the repository at this point in the history
…bels (#10533)

* Add failing test

* Fix failing test cases
  • Loading branch information
charleskorn authored Jan 30, 2025
1 parent 1712177 commit 656dfb1
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 4 deletions.
12 changes: 8 additions & 4 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ func ClampFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{min, max}, memoryConsumptionTracker, f, expressionPosition, timeRange)
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) InstantVectorFunctionOperatorFactory {
Expand All @@ -245,7 +246,8 @@ func ClampMinMaxFunctionOperatorFactory(functionName string, isMin bool) Instant
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{clampTo}, memoryConsumptionTracker, f, expressionPosition, timeRange)
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}
}

Expand Down Expand Up @@ -277,7 +279,8 @@ func RoundFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracke
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{toNearest}, memoryConsumptionTracker, f, expressionPosition, timeRange)
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

func HistogramQuantileFunctionOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
Expand Down Expand Up @@ -331,7 +334,8 @@ func HistogramFractionFunctionOperatorFactory(args []types.Operator, memoryConsu
SeriesMetadataFunction: functions.DropSeriesName,
}

return functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition, timeRange), nil
o := functions.NewFunctionOverInstantVector(inner, []types.ScalarOperator{lower, upper}, memoryConsumptionTracker, f, expressionPosition, timeRange)
return operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker), nil
}

// These functions return an instant-vector.
Expand Down
99 changes: 99 additions & 0 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,18 @@
package streamingpromql

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/streamingpromql/operators/functions"
)

Expand Down Expand Up @@ -50,3 +58,94 @@ func TestRegisterScalarFunctionOperatorFactory(t *testing.T) {
// Cleanup changes to instantVectorFunctionOperatorFactories
delete(scalarFunctionOperatorFactories, "new_function")
}

// This test ensures that all functions correctly merge series after dropping the metric name.
func TestFunctionDeduplicateAndMerge(t *testing.T) {
data := `
load 30s
float_a{env="prod"} _ 0 1 _ _ _ _ _ _ _ _ _ _ _ _
float_b{env="prod"} _ _ _ _ _ _ _ _ _ _ _ _ _ 8 9
histogram_a{env="prod"} _ {{count:0}} {{count:1}} _ _ _ _ _ _ _ _ _ _ _ _
histogram_b{env="prod"} _ _ _ _ _ _ _ _ _ _ _ _ _ {{count:8}} {{count:9}}
`

storage := promqltest.LoadedStorage(t, data)
opts := NewTestEngineOpts()
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
require.NoError(t, err)

ctx := context.Background()
start := timestamp.Time(0).Add(time.Minute)
end := timestamp.Time(0).Add(7 * time.Minute)
step := time.Minute

preSelectorFunctionArgs := map[string]string{
"histogram_quantile": "0.1",
"histogram_fraction": "0, 0.1",
}

postSelectorFunctionArgs := map[string]string{
"clamp": "-Inf, Inf",
"clamp_min": "-Inf",
"clamp_max": "Inf",
"label_replace": `"__name__", "$1", "env", "(.*)"`,
}

for name := range instantVectorFunctionOperatorFactories {
if name == "vector" || name == "last_over_time" {
// This test doesn't apply to vector() because it takes a scalar parameter.
// This test doesn't apply to last_over_time() because it doesn't drop the metric name.
continue
}

t.Run(name, func(t *testing.T) {
metricType := "float"

if strings.HasPrefix(name, "histogram_") {
metricType = "histogram"
}

selector := fmt.Sprintf(`{__name__=~"%s.*"}`, metricType)

if isFunctionOverRangeVector(name) {
selector = selector + "[1m]"
}

preSelectorArgs := preSelectorFunctionArgs[name]
if preSelectorArgs != "" {
preSelectorArgs = preSelectorArgs + ", "
}

postSelectorArgs := postSelectorFunctionArgs[name]
if postSelectorArgs != "" {
postSelectorArgs = ", " + postSelectorArgs
}

expr := fmt.Sprintf("%s(%s%s%s)", name, preSelectorArgs, selector, postSelectorArgs)

q, err := engine.NewRangeQuery(ctx, storage, nil, expr, start, end, step)
require.NoError(t, err)
defer q.Close()

mimirResult := q.Exec(ctx)
require.NoError(t, mimirResult.Err)
m, err := mimirResult.Matrix()
require.NoError(t, err)

require.Len(t, m, 1, "expected a single series")
})
}
}

func isFunctionOverRangeVector(name string) bool {
if strings.HasSuffix(name, "_over_time") {
return true
}

switch name {
case "changes", "delta", "deriv", "idelta", "increase", "irate", "rate", "resets":
return true
default:
return false
}
}

0 comments on commit 656dfb1

Please sign in to comment.