From 7c86e651ac71b31bf85e46303931291eafcc0027 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 8 Jul 2024 11:19:22 +0200 Subject: [PATCH] fix: Fixes pattern pruning stability (#13429) --- pkg/pattern/ingester_querier.go | 116 +++++++++++------- pkg/pattern/ingester_querier_test.go | 176 ++++++++++++--------------- pkg/pattern/testdata/patterns.txt | 75 ------------ 3 files changed, 147 insertions(+), 220 deletions(-) delete mode 100644 pkg/pattern/testdata/patterns.txt diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index 1e18852eddaa3..e0d77cc0148de 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -5,23 +5,27 @@ import ( "errors" "math" "net/http" + "sort" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/ring" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/errgroup" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/grafana/loki/v3/pkg/pattern/drain" loki_iter "github.com/grafana/loki/v3/pkg/iter" pattern_iter "github.com/grafana/loki/v3/pkg/pattern/iter" ) // TODO(kolesnikovae): parametrise QueryPatternsRequest -const minClusterSize = 30 +const ( + minClusterSize = 30 + maxPatterns = 300 +) var ErrParseQuery = errors.New("only byte_over_time and count_over_time queries without filters are supported") @@ -132,36 +136,63 @@ func (q *IngesterQuerier) querySample(ctx context.Context, req *logproto.QuerySa return iterators, nil } -func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse { - pruneConfig := drain.DefaultConfig() - pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them - +func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int64, metrics *ingesterQuerierMetrics) *logproto.QueryPatternsResponse { patternsBefore := len(resp.Series) - d := drain.New(pruneConfig, "", nil) - for _, p := range resp.Series { - d.TrainPattern(p.GetPattern(), p.Samples) + total := make([]int64, len(resp.Series)) + + for i, p := range resp.Series { + for _, s := range p.Samples { + total[i] += s.Value + } } - resp.Series = resp.Series[:0] - for _, cluster := range d.Clusters() { - if cluster.Size < minClusterSize { - continue + // Create a slice of structs to keep Series and total together + type SeriesWithTotal struct { + Series *logproto.PatternSeries + Total int64 + } + + seriesWithTotals := make([]SeriesWithTotal, len(resp.Series)) + for i := range resp.Series { + seriesWithTotals[i] = SeriesWithTotal{ + Series: resp.Series[i], + Total: total[i], } - pattern := d.PatternString(cluster) - if pattern == "" { - continue + } + + // Sort the slice of structs by the Total field + sort.Slice(seriesWithTotals, func(i, j int) bool { + return seriesWithTotals[i].Total > seriesWithTotals[j].Total + }) + + // Initialize a variable to keep track of the position for valid series + pos := 0 + + // Iterate over the seriesWithTotals + for i := range seriesWithTotals { + if seriesWithTotals[i].Total >= minClusterSize { + // Place the valid series at the current position + resp.Series[pos] = seriesWithTotals[i].Series + pos++ } - resp.Series = append(resp.Series, - logproto.NewPatternSeries(pattern, cluster.Samples())) } + + // Slice the resp.Series to include only the valid series + resp.Series = resp.Series[:pos] + + if len(resp.Series) > maxPatterns { + resp.Series = resp.Series[:maxPatterns] + } + metrics.patternsPrunedTotal.Add(float64(patternsBefore - len(resp.Series))) metrics.patternsRetainedTotal.Add(float64(len(resp.Series))) + return resp } // ForAllIngesters runs f, in parallel, for all ingesters func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) { - replicationSet, err := q.ringClient.Ring().GetReplicationSetForOperation(ring.Read) + replicationSet, err := q.ringClient.Ring().GetAllHealthy(ring.Read) if err != nil { return nil, err } @@ -174,32 +205,29 @@ type ResponseFromIngesters struct { response interface{} } -// forGivenIngesters runs f, in parallel, for given ingesters func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) { - cfg := ring.DoUntilQuorumConfig{ - // Nothing here - } - results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (ResponseFromIngesters, error) { - client, err := q.ringClient.Pool().GetClientFor(ingester.Addr) - if err != nil { - return ResponseFromIngesters{addr: ingester.Addr}, err - } - - resp, err := f(ctx, client.(logproto.PatternClient)) - if err != nil { - return ResponseFromIngesters{addr: ingester.Addr}, err - } - - return ResponseFromIngesters{ingester.Addr, resp}, nil - }, func(ResponseFromIngesters) { - // Nothing to do - }) - if err != nil { + g, ctx := errgroup.WithContext(ctx) + responses := make([]ResponseFromIngesters, len(replicationSet.Instances)) + + for i, ingester := range replicationSet.Instances { + ingester := ingester + i := i + g.Go(func() error { + client, err := q.ringClient.Pool().GetClientFor(ingester.Addr) + if err != nil { + return err + } + + resp, err := f(ctx, client.(logproto.PatternClient)) + if err != nil { + return err + } + responses[i] = ResponseFromIngesters{addr: ingester.Addr, response: resp} + return nil + }) + } + if err := g.Wait(); err != nil { return nil, err } - - responses := make([]ResponseFromIngesters, 0, len(results)) - responses = append(responses, results...) - - return responses, err + return responses, nil } diff --git a/pkg/pattern/ingester_querier_test.go b/pkg/pattern/ingester_querier_test.go index ec64221274a00..6fc125dc7f0a1 100644 --- a/pkg/pattern/ingester_querier_test.go +++ b/pkg/pattern/ingester_querier_test.go @@ -1,16 +1,13 @@ package pattern import ( - "bufio" "context" - "os" "testing" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" @@ -20,106 +17,83 @@ import ( "github.com/grafana/loki/v3/pkg/pattern/metric" ) -func Test_prunePatterns(t *testing.T) { - file, err := os.Open(`testdata/patterns.txt`) - require.NoError(t, err) - defer file.Close() - - resp := new(logproto.QueryPatternsResponse) - scanner := bufio.NewScanner(file) - for scanner.Scan() { - resp.Series = append(resp.Series, logproto.NewPatternSeries(scanner.Text(), []*logproto.PatternSample{})) - } - require.NoError(t, scanner.Err()) - - startingPatterns := len(resp.Series) - prunePatterns(resp, 0, newIngesterQuerierMetrics(prometheus.DefaultRegisterer, "test")) - - expectedPatterns := []string{ - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_> <_>`, - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_> <_>`, - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_> <_>`, - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_> <_>`, - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_> <_>`, - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_> <_>`, - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_> <_>`, - `<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_> <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type"`, - `<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_> <_> <_>`, +func TestPrunePatterns(t *testing.T) { + metrics := newIngesterQuerierMetrics(prometheus.NewRegistry(), "test") + testCases := []struct { + name string + inputSeries []*logproto.PatternSeries + minClusterSize int64 + expectedSeries []*logproto.PatternSeries + expectedPruned int + expectedRetained int + }{ + { + name: "No pruning needed", + inputSeries: []*logproto.PatternSeries{ + {Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 40}}}, + {Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 35}}}, + }, + minClusterSize: 20, + expectedSeries: []*logproto.PatternSeries{ + {Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 40}}}, + {Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 35}}}, + }, + expectedPruned: 0, + expectedRetained: 2, + }, + { + name: "Pruning some patterns", + inputSeries: []*logproto.PatternSeries{ + {Pattern: `{app="test1"}`, Samples: []*logproto.PatternSample{{Value: 10}}}, + {Pattern: `{app="test2"}`, Samples: []*logproto.PatternSample{{Value: 5}}}, + {Pattern: `{app="test3"}`, Samples: []*logproto.PatternSample{{Value: 50}}}, + }, + minClusterSize: 20, + expectedSeries: []*logproto.PatternSeries{ + {Pattern: `{app="test3"}`, Samples: []*logproto.PatternSample{{Value: 50}}}, + }, + expectedPruned: 2, + expectedRetained: 1, + }, + { + name: "Limit patterns to maxPatterns", + inputSeries: func() []*logproto.PatternSeries { + series := make([]*logproto.PatternSeries, maxPatterns+10) + for i := 0; i < maxPatterns+10; i++ { + series[i] = &logproto.PatternSeries{ + Pattern: `{app="test"}`, + Samples: []*logproto.PatternSample{{Value: int64(maxPatterns + 10 - i)}}, + } + } + return series + }(), + minClusterSize: 0, + expectedSeries: func() []*logproto.PatternSeries { + series := make([]*logproto.PatternSeries, maxPatterns) + for i := 0; i < maxPatterns; i++ { + series[i] = &logproto.PatternSeries{ + Pattern: `{app="test"}`, + Samples: []*logproto.PatternSample{{Value: int64(maxPatterns + 10 - i)}}, + } + } + return series + }(), + expectedPruned: 10, + expectedRetained: maxPatterns, + }, } - patterns := make([]string, 0, len(resp.Series)) - for _, p := range resp.Series { - patterns = append(patterns, p.GetPattern()) - } - slices.Sort(patterns) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + resp := &logproto.QueryPatternsResponse{ + Series: tc.inputSeries, + } + result := prunePatterns(resp, tc.minClusterSize, metrics) - require.Equal(t, expectedPatterns, patterns) - require.Less(t, len(patterns), startingPatterns, "prunePatterns should remove duplicates") + require.Equal(t, len(tc.expectedSeries), len(result.Series)) + require.Equal(t, tc.expectedSeries, result.Series) + }) + } } func Test_Samples(t *testing.T) { @@ -281,7 +255,7 @@ func (f *fakeRing) Get( } func (f *fakeRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) { - panic("not implemented") + return ring.ReplicationSet{}, nil } func (f *fakeRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) { diff --git a/pkg/pattern/testdata/patterns.txt b/pkg/pattern/testdata/patterns.txt deleted file mode 100644 index 4aa449faf0e2c..0000000000000 --- a/pkg/pattern/testdata/patterns.txt +++ /dev/null @@ -1,75 +0,0 @@ -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=3 <_> <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=0 <_> <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=2 <_> <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=6 <_> <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=wrapper.go:48 level=info component=distributor msg="sample remote write" eventType=bi <_> <_> <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" <_> partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=7 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=1 <_> <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=6 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=0 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=2 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=3 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=5 <_> <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=0, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=2, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=4 <_> <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=4 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=4, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=1 <_> <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=1, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=6, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=aggregator.go:139 level=info msg="received kafka message" topic=cortex-dev-01-aggregations partition=7 <_> <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=5, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_> -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=3, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> sampleTimestamp=2024-04-03 <_> +0000 UTC, <_> -<_> caller=offset_committer.go:174 level=info msg="partition offset committer committed offset" topic=cortex-dev-01-aggregations partition=5 handledMessageTime="2024-04-03 <_> +0000 UTC" <_> <_> +0000 UTC" <_> currentBuckets="unsupported value type" -<_> caller=batcher.go:155 level=info msg="batcher: processing aggregation result" result="user=9960, partitionID=7, <_> <_> <_> <_> <_> <_> <_> <_> <_> <_> +0000 UTC, <_>