From 0c6f3f958c7b479940d8b1bf0fa1dede35561330 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 4 May 2020 12:35:28 -0400 Subject: [PATCH 1/2] wi Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk.go | 2 +- pkg/chunkenc/memchunk_test.go | 19 +++++++++ pkg/iter/iterator.go | 75 +++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 1 deletion(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index e46fa12191d69..8cfd0b44e8102 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -493,7 +493,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi return iterForward, nil } - return iter.NewReversedIter(iterForward, 0, false) + return iter.NewEntryReversedIter(iterForward) } func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.LineFilter) iter.EntryIterator { diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 50f7d27e46787..72b1961d41e63 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -572,6 +572,25 @@ func BenchmarkRead(b *testing.B) { } } +func BenchmarkBackwardIterator(b *testing.B) { + b.ReportAllocs() + c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize) + _ = fillChunk(c) + b.ResetTimer() + for n := 0; n < b.N; n++ { + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, nil) + if err != nil { + panic(err) + } + for iterator.Next() { + _ = iterator.Entry() + } + if err := iterator.Close(); err != nil { + b.Fatal(err) + } + } +} + func TestGenerateDataSize(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index 4af2a96f63a9e..ff49d6781ad2a 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "sync" "time" "github.com/grafana/loki/pkg/helpers" @@ -586,6 +587,80 @@ func (i *reverseIterator) Close() error { return nil } +var entryPool = sync.Pool{ + New: func() interface{} { + return &entries{ + a: make([]logproto.Entry, 0, 1024), + } + }, +} + +type entries struct { + a []logproto.Entry +} + +type reverseEntryIterator struct { + iter EntryIterator + cur logproto.Entry + entries *entries + + loaded bool +} + +// NewEntryReversedIter returns an iterator which loads all or up to N entries +// of an existing iterator, and then iterates over them backward. +// Preload entries when they are being queried with a timeout. +func NewEntryReversedIter(it EntryIterator) (EntryIterator, error) { + iter, err := &reverseEntryIterator{ + iter: it, + entries: entryPool.Get().(*entries), + }, it.Error() + + if err != nil { + return nil, err + } + + return iter, nil +} + +func (i *reverseEntryIterator) load() { + if !i.loaded { + i.loaded = true + for i.iter.Next() { + i.entries.a = append(i.entries.a, i.iter.Entry()) + } + i.iter.Close() + } +} + +func (i *reverseEntryIterator) Next() bool { + i.load() + if len(i.entries.a) == 0 { + entryPool.Put(i.entries) + i.entries = nil + return false + } + i.cur, i.entries.a = i.entries.a[len(i.entries.a)-1], i.entries.a[:len(i.entries.a)-1] + return true +} + +func (i *reverseEntryIterator) Entry() logproto.Entry { + return i.cur +} + +func (i *reverseEntryIterator) Labels() string { + return "" +} + +func (i *reverseEntryIterator) Error() error { return nil } + +func (i *reverseEntryIterator) Close() error { + if !i.loaded { + return i.iter.Close() + } + return nil +} + // ReadBatch reads a set of entries off an iterator. func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, error) { streams := map[string]*logproto.Stream{} From 4b94441a684268523552671f247197fd7e9781ac Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 5 May 2020 10:34:18 -0400 Subject: [PATCH 2/2] Add tests. Signed-off-by: Cyril Tovena --- pkg/iter/iterator.go | 40 +++++++++++++++++++++------------------ pkg/iter/iterator_test.go | 19 ++++++++++++++++++- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go index ff49d6781ad2a..c20734f540faa 100644 --- a/pkg/iter/iterator.go +++ b/pkg/iter/iterator.go @@ -587,33 +587,32 @@ func (i *reverseIterator) Close() error { return nil } -var entryPool = sync.Pool{ +var entryBufferPool = sync.Pool{ New: func() interface{} { - return &entries{ - a: make([]logproto.Entry, 0, 1024), + return &entryBuffer{ + entries: make([]logproto.Entry, 0, 1024), } }, } -type entries struct { - a []logproto.Entry +type entryBuffer struct { + entries []logproto.Entry } type reverseEntryIterator struct { - iter EntryIterator - cur logproto.Entry - entries *entries + iter EntryIterator + cur logproto.Entry + buf *entryBuffer loaded bool } -// NewEntryReversedIter returns an iterator which loads all or up to N entries -// of an existing iterator, and then iterates over them backward. -// Preload entries when they are being queried with a timeout. +// NewEntryReversedIter returns an iterator which loads all entries and iterates backward. +// The labels of entries is always empty. func NewEntryReversedIter(it EntryIterator) (EntryIterator, error) { iter, err := &reverseEntryIterator{ - iter: it, - entries: entryPool.Get().(*entries), + iter: it, + buf: entryBufferPool.Get().(*entryBuffer), }, it.Error() if err != nil { @@ -627,7 +626,7 @@ func (i *reverseEntryIterator) load() { if !i.loaded { i.loaded = true for i.iter.Next() { - i.entries.a = append(i.entries.a, i.iter.Entry()) + i.buf.entries = append(i.buf.entries, i.iter.Entry()) } i.iter.Close() } @@ -635,12 +634,12 @@ func (i *reverseEntryIterator) load() { func (i *reverseEntryIterator) Next() bool { i.load() - if len(i.entries.a) == 0 { - entryPool.Put(i.entries) - i.entries = nil + if len(i.buf.entries) == 0 { + entryBufferPool.Put(i.buf) + i.buf.entries = nil return false } - i.cur, i.entries.a = i.entries.a[len(i.entries.a)-1], i.entries.a[:len(i.entries.a)-1] + i.cur, i.buf.entries = i.buf.entries[len(i.buf.entries)-1], i.buf.entries[:len(i.buf.entries)-1] return true } @@ -655,6 +654,11 @@ func (i *reverseEntryIterator) Labels() string { func (i *reverseEntryIterator) Error() error { return nil } func (i *reverseEntryIterator) Close() error { + if i.buf.entries != nil { + i.buf.entries = i.buf.entries[:0] + entryBufferPool.Put(i.buf) + i.buf.entries = nil + } if !i.loaded { return i.iter.Close() } diff --git a/pkg/iter/iterator_test.go b/pkg/iter/iterator_test.go index d7b2d839f58db..0138d65f391ac 100644 --- a/pkg/iter/iterator_test.go +++ b/pkg/iter/iterator_test.go @@ -278,7 +278,7 @@ func TestInsert(t *testing.T) { })) } -func TestReverseEntryIterator(t *testing.T) { +func TestReverseIterator(t *testing.T) { itr1 := mkStreamIterator(inverse(offset(testSize, identity)), defaultLabels) itr2 := mkStreamIterator(inverse(offset(testSize, identity)), "{foobar: \"bazbar\"}") @@ -300,6 +300,23 @@ func TestReverseEntryIterator(t *testing.T) { assert.NoError(t, reversedIter.Close()) } +func TestReverseEntryIterator(t *testing.T) { + itr1 := mkStreamIterator(identity, defaultLabels) + + reversedIter, err := NewEntryReversedIter(itr1) + require.NoError(t, err) + + for i := int64(testSize - 1); i >= 0; i-- { + assert.Equal(t, true, reversedIter.Next()) + assert.Equal(t, identity(i), reversedIter.Entry(), fmt.Sprintln("iteration", i)) + assert.Equal(t, reversedIter.Labels(), "") + } + + assert.Equal(t, false, reversedIter.Next()) + assert.Equal(t, nil, reversedIter.Error()) + assert.NoError(t, reversedIter.Close()) +} + func TestReverseEntryIteratorUnlimited(t *testing.T) { itr1 := mkStreamIterator(offset(testSize, identity), defaultLabels) itr2 := mkStreamIterator(offset(testSize, identity), "{foobar: \"bazbar\"}")