pkg/ingester: handle labels mapping to the same fast fingerprint. #1247

Merged · 8 commits · Dec 5, 2019
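Context for reviewers: the ingester keys its in-memory streams by the fast fingerprint of the incoming label set, and fast fingerprints are not collision-free, so two different label sets could previously end up appended to the same stream. This PR inserts a fingerprint mapper (the fpMapper built via newFPMapper in instance.go) between the raw fast fingerprint and the streams map, so colliding label sets get distinct fingerprints. The sketch below only illustrates that remapping idea under simplified assumptions: the type and helper names are invented, it ignores locking and per-user limits, and a production mapper (like the one this PR adds) must also keep the synthetic fingerprints it mints from clashing with genuine ones.

```go
package fpsketch

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

// fpMapping remembers which label set a mapped fingerprint was handed to.
type fpMapping struct {
	lbls   labels.Labels
	mapped model.Fingerprint
}

// naiveMapper is an illustrative stand-in for the PR's fpMapper: it returns a
// stable fingerprint per distinct label set, even when two sets share the
// same raw fast fingerprint. Not goroutine-safe; in the ingester the caller
// holds streamsMtx.
type naiveMapper struct {
	mappings map[model.Fingerprint][]fpMapping // keyed by raw fingerprint
	next     model.Fingerprint                 // next synthetic fingerprint to hand out
}

func newNaiveMapper() *naiveMapper {
	return &naiveMapper{
		mappings: map[model.Fingerprint][]fpMapping{},
		// Arbitrary starting point for synthetic values; a real mapper must
		// also make sure these cannot collide with raw fingerprints.
		next: model.Fingerprint(1) << 62,
	}
}

func (m *naiveMapper) mapFP(raw model.Fingerprint, lbls labels.Labels) model.Fingerprint {
	// Reuse the fingerprint previously assigned to this exact label set.
	for _, prev := range m.mappings[raw] {
		if labels.Equal(prev.lbls, lbls) {
			return prev.mapped
		}
	}
	mapped := raw
	if len(m.mappings[raw]) > 0 {
		// Collision: a different label set already owns the raw value, so
		// hand out a fresh synthetic fingerprint instead.
		mapped = m.next
		m.next++
	}
	m.mappings[raw] = append(m.mappings[raw], fpMapping{lbls: lbls, mapped: mapped})
	return mapped
}
```

The actual change threads this through instance.go (mapFP is applied to the raw client.FastFingerprint before any streams-map lookup) and stores the mapped fingerprint on the stream, which is why the new test below can assert that even colliding streams flush chunks with distinct fingerprints.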
pkg/ingester/flush.go (4 additions, 5 deletions)
@@ -15,7 +15,6 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/chunkenc"
loki_util "github.com/grafana/loki/pkg/util"
@@ -206,7 +205,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
return nil
}

func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, []client.LabelAdapter) {
func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint, immediate bool) ([]*chunkDesc, labels.Labels) {
instance.streamsMtx.Lock()
defer instance.streamsMtx.Unlock()

@@ -261,19 +260,19 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream) {

if len(stream.chunks) == 0 {
delete(instance.streams, stream.fp)
instance.index.Delete(client.FromLabelAdaptersToLabels(stream.labels), stream.fp)
instance.index.Delete(stream.labels, stream.fp)
instance.streamsRemovedTotal.Inc()
memoryStreams.Dec()
}
}

func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs []client.LabelAdapter, cs []*chunkDesc) error {
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
}

labelsBuilder := labels.NewBuilder(client.FromLabelAdaptersToLabels(labelPairs))
labelsBuilder := labels.NewBuilder(labelPairs)
labelsBuilder.Set(nameLabel, logsValue)
metric := labelsBuilder.Labels()

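The net effect of the flush.go changes is that the flush path now carries labels.Labels end to end instead of []client.LabelAdapter: the per-call client.FromLabelAdaptersToLabels conversions (and the client import) go away, and the adapter-to-labels conversion happens once, when the stream is created (see the index.Add change in instance.go further down). Below is a condensed before/after of the flushChunks label handling; the constant values are assumptions added only to make the sketch self-contained.

```go
package flushsketch

import (
	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/prometheus/prometheus/pkg/labels"
)

// Assumed values; in pkg/ingester/flush.go these constants already exist
// next to flushChunks.
const (
	nameLabel = "__name__"
	logsValue = "logs"
)

// Before this PR: stream labels arrived as []client.LabelAdapter and were
// converted to labels.Labels on every flush.
func chunkMetricOld(labelPairs []client.LabelAdapter) labels.Labels {
	builder := labels.NewBuilder(client.FromLabelAdaptersToLabels(labelPairs))
	builder.Set(nameLabel, logsValue)
	return builder.Labels()
}

// After this PR: the stream already stores sorted labels.Labels, so they feed
// the builder directly and only the __name__ label is added for the store.
func chunkMetricNew(labelPairs labels.Labels) labels.Labels {
	builder := labels.NewBuilder(labelPairs)
	builder.Set(nameLabel, logsValue)
	return builder.Labels()
}
```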
pkg/ingester/flush_test.go (82 additions, 26 deletions)
@@ -42,18 +42,64 @@ func TestChunkFlushingIdle(t *testing.T) {
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
userIDs, testData := pushTestSamples(t, ing)
testData := pushTestSamples(t, ing)

// wait beyond idle time so samples flush
time.Sleep(cfg.MaxChunkIdle * 2)
store.checkData(t, userIDs, testData)
store.checkData(t, testData)
}

func TestChunkFlushingShutdown(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig(t))
userIDs, testData := pushTestSamples(t, ing)
testData := pushTestSamples(t, ing)
ing.Shutdown()
store.checkData(t, userIDs, testData)
store.checkData(t, testData)
}

func TestFlushingCollidingLabels(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushCheckPeriod = 20 * time.Millisecond
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond

store, ing := newTestStore(t, cfg)
defer store.Stop()

const userID = "testUser"
ctx := user.InjectOrgID(context.Background(), userID)

// checkData only iterates between unix seconds 0 and 1000
now := time.Unix(0, 0)

req := &logproto.PushRequest{Streams: []*logproto.Stream{
// some colliding label sets
{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}.String(), Entries: entries(5, now)},
{Labels: model.LabelSet{"app": "l", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now)},
{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now)},
}}

sort.Slice(req.Streams, func(i, j int) bool {
return req.Streams[i].Labels < req.Streams[j].Labels
})

_, err := ing.Push(ctx, req)
require.NoError(t, err)

// force flush
ing.Shutdown()

// verify that we get all the data back
store.checkData(t, map[string][]*logproto.Stream{userID: req.Streams})

// make sure all chunks have different fingerprints, even colliding ones.
chunkFingerprints := map[model.Fingerprint]bool{}
for _, c := range store.getChunksForUser(userID) {
require.False(t, chunkFingerprints[c.Fingerprint])
chunkFingerprints[c.Fingerprint] = true
}
}

type testStore struct {
@@ -103,12 +149,19 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
if err != nil {
return err
}
for _, chunk := range chunks {
for ix, chunk := range chunks {
for _, label := range chunk.Metric {
if label.Value == "" {
return fmt.Errorf("Chunk has blank label %q", label.Name)
}
}

// remove __name__ label
if chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chunk.Metric)
labelsBuilder.Del("__name__")
chunks[ix].Metric = labelsBuilder.Labels()
}
}
s.chunks[userID] = append(s.chunks[userID], chunks...)
return nil
@@ -124,7 +177,7 @@ func (s *testStore) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (

func (s *testStore) Stop() {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) ([]string, map[string][]*logproto.Stream) {
func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]*logproto.Stream {
userIDs := []string{"1", "2", "3"}

// Create test samples.
@@ -141,7 +194,7 @@ func pushTestSamples(t *testing.T, ing logproto.PusherServer) ([]string, map[str
})
require.NoError(t, err)
}
return userIDs, testData
return testData
}

func buildTestStreams(offset int) []*logproto.Stream {
@@ -170,27 +223,30 @@ func buildTestStreams(offset int) []*logproto.Stream {
}

// check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[string][]*logproto.Stream) {
func (s *testStore) checkData(t *testing.T, testData map[string][]*logproto.Stream) {
for userID, expected := range testData {
streams := s.getStreamsForUser(t, userID)
require.Equal(t, expected, streams)
}
}

func (s *testStore) getStreamsForUser(t *testing.T, userID string) []*logproto.Stream {
var streams []*logproto.Stream
for _, c := range s.getChunksForUser(userID) {
lokiChunk := c.Data.(*chunkenc.Facade).LokiChunk()
streams = append(streams, buildStreamsFromChunk(t, c.Metric.String(), lokiChunk))
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].Labels < streams[j].Labels
})
return streams
}

func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
s.mtx.Lock()
defer s.mtx.Unlock()
for _, userID := range userIDs {
chunks := s.chunks[userID]
streams := []*logproto.Stream{}
for _, chunk := range chunks {
lokiChunk := chunk.Data.(*chunkenc.Facade).LokiChunk()
if chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chunk.Metric)
labelsBuilder.Del("__name__")
chunk.Metric = labelsBuilder.Labels()
}
labels := chunk.Metric.String()
streams = append(streams, buildStreamsFromChunk(t, labels, lokiChunk))
}
sort.Slice(streams, func(i, j int) bool {
return streams[i].Labels < streams[j].Labels
})
require.Equal(t, testData[userID], streams)
}

return s.chunks[userID]
}

func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
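The new TestFlushingCollidingLabels relies on the label sets above genuinely sharing fast fingerprints. The following is not part of the PR, but if you want to convince yourself which of those sets collide, a quick standalone check along these lines works; it assumes that prometheus/common/model's LabelSet.FastFingerprint produces the same hash the ingester computes through the Cortex client helpers.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

func main() {
	// The label sets from TestFlushingCollidingLabels.
	sets := []model.LabelSet{
		{"app": "l", "uniq0": "0", "uniq1": "1"},
		{"app": "m", "uniq0": "1", "uniq1": "1"},
		{"app": "l", "uniq0": "1", "uniq1": "0"},
		{"app": "m", "uniq0": "0", "uniq1": "0"},
		{"app": "l", "uniq0": "0", "uniq1": "0"},
		{"app": "m", "uniq0": "1", "uniq1": "0"},
	}

	// Group the sets by fast fingerprint; any bucket holding more than one
	// distinct set is a collision the ingester now has to disambiguate.
	byFP := map[model.Fingerprint][]string{}
	for _, ls := range sets {
		fp := ls.FastFingerprint()
		byFP[fp] = append(byFP[fp], ls.String())
	}
	for fp, members := range byFP {
		if len(members) > 1 {
			fmt.Printf("%v is shared by %v\n", fp, members)
		}
	}
}
```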
pkg/ingester/instance.go (24 additions, 10 deletions)
@@ -50,8 +50,9 @@ var (

type instance struct {
streamsMtx sync.RWMutex
streams map[model.Fingerprint]*stream
streams map[model.Fingerprint]*stream // we use 'mapped' fingerprints here.
index *index.InvertedIndex
mapper *fpMapper // using the mapper requires holding streamsMtx, because it calls back into the instance

instanceID string

@@ -66,7 +67,7 @@ type instance struct {
}

func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance {
return &instance{
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
instanceID: instanceID,
@@ -78,6 +79,8 @@ func newInstance(instanceID string, blockSize int, limits *validation.Overrides)
tailers: map[uint32]*tailer{},
limits: limits,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
}

// consumeChunk manually adds a chunk that was received during ingester chunk
Expand All @@ -86,11 +89,13 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

fp := client.FastFingerprint(labels)
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

stream, ok := i.streams[fp]
if !ok {
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
@@ -136,7 +141,8 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
}

func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, error) {
fp := client.FastFingerprint(labels)
rawFp := client.FastFingerprint(labels)
fp := i.mapper.mapFP(rawFp, labels)

stream, ok := i.streams[fp]
if ok {
@@ -146,8 +152,8 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
if len(i.streams) >= i.limits.MaxStreamsPerUser(i.instanceID) {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
}
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
@@ -156,6 +162,15 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return stream, nil
}

// Return labels associated with given fingerprint. Used by fingerprint mapper. Must hold streamsMtx.
func (i *instance) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
s := i.streams[fp]
if s == nil {
return nil
}
return s.labels
}

func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
expr, err := (logql.SelectParams{QueryRequest: req}).LogSelector()
if err != nil {
@@ -210,9 +225,8 @@ outer:
if !ok {
return nil, ErrStreamMissing
}
lbs := client.FromLabelAdaptersToLabels(stream.labels)
for _, filter := range filters {
if !filter.Matches(lbs.Get(filter.Name)) {
if !filter.Matches(stream.labels.Get(filter.Name)) {
continue outer
}
}
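To summarize how the instance.go pieces fit together: Push and consumeChunk take streamsMtx, compute the raw fast fingerprint, let the mapper turn it into a collision-free one (the mapper may call back into getLabelsFromFingerprint, which reads i.streams, hence the locking requirement), and only then look up or create the stream, with index.Add converting the label adapters into sorted labels.Labels exactly once. Below is a condensed, self-contained sketch of that flow; the interfaces and the instanceSketch type are invented for the example, and limits, metrics and error handling are elided.

```go
package fpsketch

import (
	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

// Minimal stand-ins so the flow compiles on its own; the real types live in
// pkg/ingester and carry much more state.
type stream struct {
	fp     model.Fingerprint
	labels labels.Labels
}

type invertedIndex interface {
	// Add records the labels under fp and returns them sorted, matching how
	// the instance.go hunks above use index.Add.
	Add(las []client.LabelAdapter, fp model.Fingerprint) labels.Labels
}

type fingerprintMapper interface {
	MapFP(raw model.Fingerprint, las []client.LabelAdapter) model.Fingerprint
}

type instanceSketch struct {
	streams map[model.Fingerprint]*stream // keyed by mapped fingerprints
	index   invertedIndex
	mapper  fingerprintMapper
}

// getOrCreateStream, condensed. The caller already holds streamsMtx, which
// the mapper needs because it may call back into the instance to ask which
// labels currently own a fingerprint.
func (i *instanceSketch) getOrCreateStream(las []client.LabelAdapter) *stream {
	rawFp := client.FastFingerprint(las)
	fp := i.mapper.MapFP(rawFp, las)

	if s, ok := i.streams[fp]; ok {
		return s
	}

	// The adapters are converted to sorted labels.Labels exactly once, at
	// stream creation; the flush path then reuses stream.labels as-is.
	sorted := i.index.Add(las, fp)
	s := &stream{fp: fp, labels: sorted}
	i.streams[fp] = s
	return s
}
```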