From bd321fb4f2f34198aa5487fbc7613139003e19b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= <249032432@qq.com> Date: Wed, 6 Jan 2021 01:20:56 +0800 Subject: [PATCH] lru cache logql.ParseLabels (#3092) * lru cache logql.ParseLabels * lru cache logql.ParseLabels * lru cache logql.ParseLabels * lru cache logql.ParseLabels * lru cache logql.ParseLabels --- pkg/distributor/distributor.go | 37 ++++++++++++++++++++++------- pkg/distributor/distributor_test.go | 18 ++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 755fda975c3c9..471ba3edc8a45 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -12,6 +12,7 @@ import ( cortex_util "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/services" + lru "github.com/hashicorp/golang-lru" "github.com/pkg/errors" "go.uber.org/atomic" @@ -51,6 +52,8 @@ var ( Name: "distributor_lines_received_total", Help: "The total number of lines received per tenant", }, []string{"tenant"}) + + maxLabelCacheSize = 100000 ) // Config for a Distributor. @@ -86,6 +89,7 @@ type Distributor struct { // Per-user rate limiter. ingestionRateLimiter *limiter.RateLimiter + labelCache *lru.Cache } // New a distributor creates. @@ -121,6 +125,10 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr ingestionRateStrategy = newLocalIngestionRateStrategy(overrides) } + labelCache, err := lru.New(maxLabelCacheSize) + if err != nil { + return nil, err + } d := Distributor{ cfg: cfg, clientCfg: clientCfg, @@ -129,6 +137,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr validator: validator, pool: cortex_distributor.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger), ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second), + labelCache: labelCache, } servs = append(servs, d.pool) @@ -206,19 +215,11 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log validatedSamplesCount := 0 for _, stream := range req.Streams { - ls, err := logql.ParseLabels(stream.Labels) + stream.Labels, err = d.parseStreamLabels(userID, stream.Labels, &stream) if err != nil { - validationErr = httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err) - continue - } - // ensure labels are correctly sorted. - // todo(ctovena) we should lru cache this - stream.Labels = ls.String() - if err := d.validator.ValidateLabels(userID, ls, stream); err != nil { validationErr = err continue } - entries := make([]logproto.Entry, 0, len(stream.Entries)) for _, entry := range stream.Entries { if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil { @@ -356,3 +357,21 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } + +func (d *Distributor) parseStreamLabels(userID string, key string, stream *logproto.Stream) (string, error) { + labelVal, ok := d.labelCache.Get(key) + if ok { + return labelVal.(string), nil + } + ls, err := logql.ParseLabels(key) + if err != nil { + return "", httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err) + } + // ensure labels are correctly sorted. + if err := d.validator.ValidateLabels(userID, ls, *stream); err != nil { + return "", err + } + lsVal := ls.String() + d.labelCache.Add(key, lsVal) + return lsVal, nil +} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2d25029fa9adc..a51c1619fecfa 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -112,6 +112,24 @@ func Test_SortLabelsOnPush(t *testing.T) { require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels) } +func Benchmark_SortLabelsOnPush(b *testing.B) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.EnforceMetricName = false + ingester := &mockIngester{} + d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + request := makeWriteRequest(10, 10) + for n := 0; n < b.N; n++ { + stream := request.Streams[0] + stream.Labels = `{buzz="f", a="b"}` + _, err := d.parseStreamLabels("123", stream.Labels, &stream) + if err != nil { + panic("parseStreamLabels fail,err:" + err.Error()) + } + } +} + func TestDistributor_PushIngestionRateLimiter(t *testing.T) { type testPush struct { bytes int