diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 72a7926da3328..4af2a96f63a9e 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -238,6 +238,15 @@ func (i *heapIterator) Next() bool { }) } + // shortcut if we have a single tuple. + if len(i.tuples) == 1 { + i.currEntry = i.tuples[0].Entry + i.currLabels = i.tuples[0].Labels() + i.requeue(i.tuples[0].EntryIterator, false) + i.tuples = i.tuples[:0] + return true + } + // Find in tuples which entry occurs most often which, due to quorum based // replication, is guaranteed to be the correct next entry. t := mostCommon(i.tuples) @@ -250,7 +259,10 @@ func (i *heapIterator) Next() bool { i.requeue(i.tuples[j].EntryIterator, true) continue } - i.stats.TotalDuplicates++ + // we count as duplicates only if the tuple is not the one (t) used to fill the current entry + if i.tuples[j] != t { + i.stats.TotalDuplicates++ + } i.requeue(i.tuples[j].EntryIterator, false) } i.tuples = i.tuples[:0] diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index 756b54932d51c..d7b2d839f58db 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/stats" ) const testSize = 10 @@ -378,3 +379,132 @@ func Test_PeekingIterator(t *testing.T) { t.Fatal("should not be ok.") } } + +func Test_DuplicateCount(t *testing.T) { + stream := &logproto.Stream{ + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo", + }, + { + Timestamp: time.Unix(0, 3), + Line: "foo", + }, + }, + } + + for _, test := range []struct { + name string + iters []EntryIterator + direction logproto.Direction + expectedDuplicates int64 + }{ + { + "empty b", + []EntryIterator{}, + logproto.BACKWARD, + 0, + }, + { + "empty f", + []EntryIterator{}, + logproto.FORWARD, + 0, + }, + { + "replication 2 b", + []EntryIterator{ + NewStreamIterator(stream), + NewStreamIterator(stream), + }, + logproto.BACKWARD, + 3, + }, + { + "replication 2 f", + []EntryIterator{ + NewStreamIterator(stream), + NewStreamIterator(stream), + }, + logproto.FORWARD, + 3, + }, + { + "replication 3 f", + []EntryIterator{ + NewStreamIterator(stream), + NewStreamIterator(stream), + NewStreamIterator(stream), + NewStreamIterator(&logproto.Stream{ + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 4), + Line: "bar", + }, + }}), + }, + logproto.FORWARD, + 6, + }, + { + "replication 3 b", + []EntryIterator{ + NewStreamIterator(stream), + NewStreamIterator(stream), + NewStreamIterator(stream), + NewStreamIterator(&logproto.Stream{ + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 4), + Line: "bar", + }, + }}), + }, + logproto.BACKWARD, + 6, + }, + { + "single f", + []EntryIterator{ + NewStreamIterator(&logproto.Stream{ + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 4), + Line: "bar", + }, + }}), + }, + logproto.FORWARD, + 0, + }, + { + "single b", + []EntryIterator{ + NewStreamIterator(&logproto.Stream{ + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 4), + Line: "bar", + }, + }}), + }, + logproto.BACKWARD, + 0, + }, + } { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + ctx = stats.NewContext(ctx) + it := NewHeapIterator(ctx, test.iters, test.direction) + defer it.Close() + for it.Next() { + } + require.Equal(t, test.expectedDuplicates, stats.GetChunkData(ctx).TotalDuplicates) + }) + } +}