Skip to content

Commit

Permalink
lru cache logql.ParseLabels (#3092)
Browse files Browse the repository at this point in the history
* lru cache logql.ParseLabels

* lru cache logql.ParseLabels

* lru cache logql.ParseLabels

* lru cache logql.ParseLabels

* lru cache logql.ParseLabels
  • Loading branch information
liguozhong authored Jan 5, 2021
1 parent 5572366 commit bd321fb
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 9 deletions.
37 changes: 28 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/services"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"go.uber.org/atomic"

Expand Down Expand Up @@ -51,6 +52,8 @@ var (
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})

maxLabelCacheSize = 100000
)

// Config for a Distributor.
Expand Down Expand Up @@ -86,6 +89,7 @@ type Distributor struct {

// Per-user rate limiter.
ingestionRateLimiter *limiter.RateLimiter
labelCache *lru.Cache
}

// New a distributor creates.
Expand Down Expand Up @@ -121,6 +125,10 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
ingestionRateStrategy = newLocalIngestionRateStrategy(overrides)
}

labelCache, err := lru.New(maxLabelCacheSize)
if err != nil {
return nil, err
}
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
Expand All @@ -129,6 +137,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
validator: validator,
pool: cortex_distributor.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
labelCache: labelCache,
}

servs = append(servs, d.pool)
Expand Down Expand Up @@ -206,19 +215,11 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0

for _, stream := range req.Streams {
ls, err := logql.ParseLabels(stream.Labels)
stream.Labels, err = d.parseStreamLabels(userID, stream.Labels, &stream)
if err != nil {
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
continue
}
// ensure labels are correctly sorted.
// todo(ctovena) we should lru cache this
stream.Labels = ls.String()
if err := d.validator.ValidateLabels(userID, ls, stream); err != nil {
validationErr = err
continue
}

entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
Expand Down Expand Up @@ -356,3 +357,21 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.Ingester
func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

func (d *Distributor) parseStreamLabels(userID string, key string, stream *logproto.Stream) (string, error) {
labelVal, ok := d.labelCache.Get(key)
if ok {
return labelVal.(string), nil
}
ls, err := logql.ParseLabels(key)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
}
// ensure labels are correctly sorted.
if err := d.validator.ValidateLabels(userID, ls, *stream); err != nil {
return "", err
}
lsVal := ls.String()
d.labelCache.Add(key, lsVal)
return lsVal, nil
}
18 changes: 18 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,24 @@ func Test_SortLabelsOnPush(t *testing.T) {
require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels)
}

func Benchmark_SortLabelsOnPush(b *testing.B) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.EnforceMetricName = false
ingester := &mockIngester{}
d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
request := makeWriteRequest(10, 10)
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, err := d.parseStreamLabels("123", stream.Labels, &stream)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
}
}

func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
type testPush struct {
bytes int
Expand Down

0 comments on commit bd321fb

Please sign in to comment.