Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up EntrySortIterator by 20%. #5318

Merged
merged 14 commits into from
Feb 9, 2022
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ require (
require (
github.com/google/renameio/v2 v2.0.0
github.com/mattn/go-ieproxy v0.0.1
github.com/psilva261/timsort/v2 v2.0.0
github.com/xdg-go/scram v1.0.2
gopkg.in/Graylog2/go-gelf.v2 v2.0.0-20191017102106-1550ee647df0
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,8 @@ github.com/prometheus/prometheus v0.0.0-20200609090129-a6600f564e3c/go.mod h1:S5
github.com/prometheus/prometheus v1.8.2-0.20211119115433-692a54649ed7 h1:8rwRA5BKEAWtawaP4ozroDm3xMDrp9POAoT3HkQ3ZHw=
github.com/prometheus/prometheus v1.8.2-0.20211119115433-692a54649ed7/go.mod h1:outfylaI89+D5IO87TRPRmxfucIobTO3Rb0l2TKqpj0=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/psilva261/timsort/v2 v2.0.0 h1:w7ePtcPItveXl/xu4Nae6S6p9/bTfo+swTXixfU7hBg=
github.com/psilva261/timsort/v2 v2.0.0/go.mod h1:NB+lnymkBaIJSJ376BB0n0LsdmqpWeG+w3KnzJ1ELqM=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
Expand Down
108 changes: 79 additions & 29 deletions pkg/iter/entry_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"container/heap"
"context"
"io"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -323,12 +324,13 @@ func (i *mergeEntryIterator) Len() int {
}

type entrySortIterator struct {
heap interface {
heap.Interface
Peek() EntryIterator
}
//heap interface {
// heap.Interface
// Peek() EntryIterator
//}
is []EntryIterator
prefetched bool
dir logproto.Direction

currEntry entryWithLabels
errs []error
Expand All @@ -344,28 +346,61 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr
if len(is) == 1 {
return is[0]
}
result := &entrySortIterator{is: is}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
case logproto.FORWARD:
result.heap = &iteratorMinHeap{iteratorHeap: make([]EntryIterator, 0, len(is))}
default:
panic("bad direction")
}

result := &entrySortIterator{is: is, dir: direction}
return result
}

// init initialize the underlaying heap
func (it *entrySortIterator) less(i, j int) bool {
t1, t2 := it.is[i].Entry().Timestamp, it.is[j].Entry().Timestamp
if it.dir == logproto.BACKWARD {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
t2, t1 = t1, t2

}

un1 := t1.UnixNano()
un2 := t2.UnixNano()

switch {
case un1 < un2:
return true
case un1 > un2:
return false
default: // un1 == un2:
return it.is[i].Labels() < it.is[j].Labels()
}
}

func (it *entrySortIterator) lessThan(t1 time.Time, l1 string, j int) bool {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
t2 := it.is[j].Entry().Timestamp
if it.dir == logproto.BACKWARD {
t2, t1 = t1, t2
}
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

un1 := t1.UnixNano()
un2 := t2.UnixNano()

switch {
case un1 < un2:
return true
case un1 > un2:
return false
default: // un1 == un2:
return l1 < it.is[j].Labels()
}
}

// init throws out empty iterators and sorts them.
func (i *entrySortIterator) init() {
if i.prefetched {
return
}

i.prefetched = true
tmp := make([]EntryIterator, 0, len(i.is))
for _, it := range i.is {
if it.Next() {
i.heap.Push(it)
tmp = append(tmp, it)
continue
}

Expand All @@ -374,35 +409,55 @@ func (i *entrySortIterator) init() {
}
util.LogError("closing iterator", it.Close)
}
heap.Init(i.heap)
i.is = tmp
sort.Slice(i.is, i.less)
}

// We can now clear the list of input iterators to merge, given they have all
// been processed and the non empty ones have been pushed to the heap
i.is = nil
func (i *entrySortIterator) fix() {
t1 := i.is[0].Entry().Timestamp
l1 := i.is[0].Labels()
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

// shortcut
if len(i.is) <= 1 || i.lessThan(t1, l1, 1) {
return
}

// First element is out of place. So we reposition it.
index := sort.Search(len(i.is), func(in int) bool { return i.lessThan(t1, l1, in) })

head := i.is[0]
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if index == len(i.is) {
i.is = append(i.is[1:], head)
} else {
i.is = append(i.is[1:index+1], i.is[index:]...)
i.is[index] = head
}
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
}

func (i *entrySortIterator) Next() bool {
i.init()

if i.heap.Len() == 0 {
if len(i.is) == 0 {
return false
}

next := i.heap.Peek()
next := i.is[0]
i.currEntry.entry = next.Entry()
i.currEntry.labels = next.Labels()
// if the top iterator is empty, we remove it.
if !next.Next() {
heap.Pop(i.heap)
i.is = i.is[1:]
if err := next.Error(); err != nil {
i.errs = append(i.errs, err)
}
util.LogError("closing iterator", next.Close)
return true
}
if i.heap.Len() > 1 {
heap.Fix(i.heap, 0)

if len(i.is) > 1 {
i.fix()
}

return true
}

Expand All @@ -426,11 +481,6 @@ func (i *entrySortIterator) Error() error {
}

func (i *entrySortIterator) Close() error {
for i.heap.Len() > 0 {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if err := i.heap.Pop().(EntryIterator).Close(); err != nil {
return err
}
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/iter/entry_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ func BenchmarkSortIterator(b *testing.B) {
})

b.Run("sort", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
Expand Down
3 changes: 3 additions & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,9 @@ github.com/prometheus/prometheus/util/teststorage
github.com/prometheus/prometheus/util/testutil
github.com/prometheus/prometheus/util/treecache
github.com/prometheus/prometheus/web/api/v1
# github.com/psilva261/timsort/v2 v2.0.0
## explicit; go 1.13
github.com/psilva261/timsort/v2
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
# github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
## explicit
github.com/rcrowley/go-metrics
Expand Down