Skip to content

Commit

Permalink
Cache label strings in ingester to improve memory usage. (#2926)
Browse files Browse the repository at this point in the history
* Add benchmark for base stats.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Cache labels string in ingester to improve memory usage.

This reduce allocations by a lot, but assume that labels comes in sorted which is not ensure in the distributors.
The distributors will take a performance hit, but that's easier to scale or improve later.(added some todos)

see benchmark:

```
❯ benchcmp before.txt after.txt
benchmark                     old ns/op     new ns/op     delta
Benchmark_PushInstance-16     43505         4950          -88.62%

benchmark                     old allocs     new allocs     delta
Benchmark_PushInstance-16     240            12             -95.00%

benchmark                     old bytes     new bytes     delta
Benchmark_PushInstance-16     42568         1787          -95.80%
```

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* 🤦 wrong hashing.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Nov 13, 2020
1 parent a5cc650 commit 2303f1b
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 43 deletions.
11 changes: 10 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
cortex_util "github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -205,7 +206,15 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0

for _, stream := range req.Streams {
if err := d.validator.ValidateLabels(userID, stream); err != nil {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
validationErr = httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
continue
}
// ensure labels are correctly sorted.
// todo(ctovena) we should lru cache this
stream.Labels = cortex_client.FromLabelAdaptersToLabels(ls).String()
if err := d.validator.ValidateLabels(userID, ls, stream); err != nil {
validationErr = err
continue
}
Expand Down
31 changes: 26 additions & 5 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestDistributor(t *testing.T) {
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)

d := prepare(t, limits, nil)
d := prepare(t, limits, nil, nil)
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(tc.lines, 10)
Expand All @@ -97,6 +97,21 @@ func TestDistributor(t *testing.T) {
}
}

func Test_SortLabelsOnPush(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.EnforceMetricName = false
ingester := &mockIngester{}
d := prepare(t, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck

request := makeWriteRequest(10, 10)
request.Streams[0].Labels = `{buzz="f", a="b"}`
_, err := d.Push(ctx, request)
require.NoError(t, err)
require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels)
}

func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
type testPush struct {
bytes int
Expand Down Expand Up @@ -165,7 +180,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
// Start all expected distributors
distributors := make([]*Distributor, testData.distributors)
for i := 0; i < testData.distributors; i++ {
distributors[i] = prepare(t, limits, kvStore)
distributors[i] = prepare(t, limits, kvStore, nil)
defer services.StopAndAwaitTerminated(context.Background(), distributors[i]) //nolint:errcheck
}

Expand Down Expand Up @@ -211,7 +226,7 @@ func loopbackInterfaceName() (string, error) {
return "", fmt.Errorf("can't retrieve loopback interface name")
}

func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distributor {
func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client, factory func(addr string) (ring_client.PoolClient, error)) *Distributor {
var (
distributorConfig Config
clientConfig client.Config
Expand Down Expand Up @@ -243,8 +258,11 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client) *Distri
distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
distributorConfig.DistributorRing.KVStore.Mock = kvStore
distributorConfig.DistributorRing.InstanceInterfaceNames = []string{loopbackName}
distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) {
return ingesters[addr], nil
distributorConfig.factory = factory
if factory == nil {
distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) {
return ingesters[addr], nil
}
}

d, err := New(distributorConfig, clientConfig, ingestersRing, overrides, nil)
Expand Down Expand Up @@ -279,9 +297,12 @@ func makeWriteRequest(lines int, size int) *logproto.PushRequest {
type mockIngester struct {
grpc_health_v1.HealthClient
logproto.PusherClient

pushed []*logproto.PushRequest
}

func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts ...grpc.CallOption) (*logproto.PushResponse, error) {
i.pushed = append(i.pushed, in)
return nil, nil
}

Expand Down
12 changes: 1 addition & 11 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -53,16 +52,7 @@ func (v Validator) ValidateEntry(userID string, labels string, entry logproto.En
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, stream logproto.Stream) error {
ls, err := util.ToClientLabels(stream.Labels)
if err != nil {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
return httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: %v", err)
}

func (v Validator) ValidateLabels(userID string, ls []cortex_client.LabelAdapter, stream logproto.Stream) error {
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
Expand Down
12 changes: 11 additions & 1 deletion pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/stretchr/testify/assert"
"github.com/weaveworks/common/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -149,8 +151,16 @@ func TestValidator_ValidateLabels(t *testing.T) {
v, err := NewValidator(o)
assert.NoError(t, err)

err = v.ValidateLabels(tt.userID, logproto.Stream{Labels: tt.labels})
err = v.ValidateLabels(tt.userID, mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels})
assert.Equal(t, tt.expected, err)
})
}
}

func mustParseLabels(s string) []client.LabelAdapter {
labels, err := util.ToClientLabels(s)
if err != nil {
panic(err)
}
return labels
}
5 changes: 3 additions & 2 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

stream, ok := instance.streams[fp]
stream, ok := instance.streamsByFP[fp]
if !ok {
return nil, nil
}
Expand Down Expand Up @@ -300,7 +300,8 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {
memoryChunks.Sub(float64(prevNumChunks - len(stream.chunks)))

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
delete(instance.streamsByFP, stream.fp)
delete(instance.streams, stream.labelsString)
instance.index.Delete(stream.labels, stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.WithLabelValues(instance.instanceID).Dec()
Expand Down
59 changes: 36 additions & 23 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ var (
type instance struct {
cfg *Config
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream // we use 'mapped' fingerprints here.
index *index.InvertedIndex
mapper *fpMapper // using of mapper needs streamsMtx because it calls back

buf []byte // buffer used to compute fps.
streams map[string]*stream
streamsByFP map[model.Fingerprint]*stream

index *index.InvertedIndex
mapper *fpMapper // using of mapper needs streamsMtx because it calls back

instanceID string

Expand All @@ -81,10 +85,12 @@ type instance struct {

func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
i := &instance{
cfg: cfg,
streams: map[model.Fingerprint]*stream{},
index: index.New(),
instanceID: instanceID,
cfg: cfg,
streams: map[string]*stream{},
streamsByFP: map[model.Fingerprint]*stream{},
buf: make([]byte, 0, 1024),
index: index.New(),
instanceID: instanceID,

streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
Expand All @@ -106,14 +112,14 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)
fp := i.getHashForLabels(labels)

stream, ok := i.streams[fp]
stream, ok := i.streamsByFP[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streams[fp] = stream
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.WithLabelValues(i.instanceID).Inc()
i.addTailersToNewStream(stream)
Expand Down Expand Up @@ -153,19 +159,12 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
}

func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

stream, ok := i.streams[fp]
stream, ok := i.streams[pushReqStream.Labels]
if ok {
return stream, nil
}

err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
Expand All @@ -176,19 +175,33 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
}

labels, err := util.ToClientLabels(pushReqStream.Labels)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
i.streams[fp] = stream
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

memoryStreams.WithLabelValues(i.instanceID).Inc()
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)

return stream, nil
}

func (i *instance) getHashForLabels(labels []client.LabelAdapter) model.Fingerprint {
var fp uint64
lbsModel := client.FromLabelAdaptersToLabels(labels)
fp, i.buf = lbsModel.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.mapFP(model.Fingerprint(fp), labels)
}

// Return labels associated with given fingerprint. Used by fingerprint mapper. Must hold streamsMtx.
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
s := i.streams[fp]
s := i.streamsByFP[fp]
if s == nil {
return nil
}
Expand Down Expand Up @@ -361,7 +374,7 @@ func (i *instance) forMatchingStreams(

outer:
for _, streamID := range ids {
stream, ok := i.streams[streamID]
stream, ok := i.streamsByFP[streamID]
if !ok {
return ErrStreamMissing
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,43 @@ func makeRandomLabels() string {
}
return ls.Labels().String()
}

func Benchmark_PushInstance(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

i := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0)
ctx := context.Background()

for n := 0; n < b.N; n++ {
_ = i.Push(ctx, &logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
{
Labels: `{cpu="35",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
{
Labels: `{cpu="89",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`,
Entries: []logproto.Entry{
{Timestamp: time.Now(), Line: "1"},
{Timestamp: time.Now(), Line: "2"},
{Timestamp: time.Now(), Line: "3"},
},
},
},
})
}
}

0 comments on commit 2303f1b

Please sign in to comment.