Skip to content

Commit

Permalink
feat(dataobj): Implement SelectLogs using a topk min-heap (#16261)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena authored Feb 13, 2025
1 parent 896bac0 commit 8f69f0d
Show file tree
Hide file tree
Showing 4 changed files with 623 additions and 9 deletions.
250 changes: 249 additions & 1 deletion pkg/dataobj/querier/iter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package querier

import (
"container/heap"
"context"
"io"
"sort"
"sync"

"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"
)
Expand All @@ -25,8 +28,253 @@ var (
return &samples
},
}
// entryWithLabelsPool hands out reusable []entryWithLabels buffers (stored as
// pointers to slices) so the top-k heap does not allocate its storage anew
// for every query. Fresh buffers start empty with room for 1024 entries.
entryWithLabelsPool = sync.Pool{
	New: func() any {
		buf := make([]entryWithLabels, 0, 1024)
		return &buf
	},
}
)

// entryWithLabels is a single log entry bundled with the label string and the
// stream hash that the entry iterator reports alongside it.
type entryWithLabels struct {
// Labels is the string form of the labels the pipeline produced for this
// entry. It is also the tiebreaker when two entries share a timestamp.
Labels string
// StreamHash is the hash of the base labels of the stream the entry belongs to.
StreamHash uint64
// Entry is the log entry itself (timestamp, line and structured metadata).
Entry logproto.Entry
}

// newEntryIterator drains reader and returns an iterator over up to req.Limit
// entries, ordered according to req.Direction.
//
// Every record whose stream ID is present in streams is run through the
// pipeline built from the request's log selector; records the pipeline
// rejects are dropped, and records for unknown stream IDs are skipped.
// Surviving entries are offered to a topk heap, so only the entries that can
// still be part of the final result are retained while reading.
//
// An error is returned when the selector or pipeline cannot be built, or when
// the reader fails with anything other than io.EOF.
func newEntryIterator(ctx context.Context,
	streams map[int64]dataobj.Stream,
	reader *dataobj.LogsReader,
	req logql.SelectLogParams,
) (iter.EntryIterator, error) {
	// Borrow a scratch buffer for reads; it goes back to the pool on return.
	recordsPtr := recordsPool.Get().(*[]dataobj.Record)
	defer recordsPool.Put(recordsPtr)
	records := *recordsPtr

	selector, err := req.LogSelector()
	if err != nil {
		return nil, err
	}
	pipeline, err := selector.Pipeline()
	if err != nil {
		return nil, err
	}

	var (
		lastStreamID int64 = -1
		extractor    log.StreamPipeline
		hash         uint64
	)
	results := newTopK(int(req.Limit), req.Direction)

	for {
		n, readErr := reader.Read(ctx, records)
		if readErr != nil && readErr != io.EOF {
			return nil, readErr
		}
		// A read that yields no records ends the scan (including at io.EOF).
		if n == 0 {
			break
		}

		for _, rec := range records[:n] {
			stream, found := streams[rec.StreamID]
			if !found {
				continue
			}
			// The per-stream pipeline and hash are only rebuilt when the
			// stream changes between consecutive records.
			if rec.StreamID != lastStreamID {
				extractor = pipeline.ForStream(stream.Labels)
				hash = extractor.BaseLabels().Hash()
				lastStreamID = rec.StreamID
			}

			line, lbls, keep := extractor.ProcessString(rec.Timestamp.UnixNano(), rec.Line, rec.Metadata...)
			if !keep {
				continue
			}

			entry := logproto.Entry{
				Timestamp: rec.Timestamp,
				Line:      line,
			}
			// StructuredMetadata stays nil when the record carries no metadata.
			if len(rec.Metadata) > 0 {
				entry.StructuredMetadata = logproto.FromLabelsToLabelAdapters(rec.Metadata)
			}

			results.Add(entryWithLabels{
				Labels:     lbls.String(),
				StreamHash: hash,
				Entry:      entry,
			})
		}
	}
	return results.Iterator(), nil
}

// entryHeap implements a min-heap of entries (heap.Interface) ordered by a
// custom less function. The less function is selected per query direction
// (FORWARD/BACKWARD, see lessFn) so that the root of the heap (index 0) is
// always the entry we want to evict first.
//
// For FORWARD direction:
//   - 'a' is "less" than 'b' when a.Entry.Timestamp.After(b.Entry.Timestamp)
//   - Example: [t3, t2, t1] where t3 is most recent, t3 will be at the root (index 0)
//
// For BACKWARD direction:
//   - 'a' is "less" than 'b' when a.Entry.Timestamp.Before(b.Entry.Timestamp)
//   - Example: [t1, t2, t3] where t1 is oldest, t1 will be at the root (index 0)
//
// In both cases, when timestamps are equal the labels string is used as a
// tiebreaker.
type entryHeap struct {
// less reports whether a orders before b, i.e. whether a is evicted before b.
less func(a, b entryWithLabels) bool
// entries is the backing storage that container/heap arranges as a heap.
entries []entryWithLabels
}

// Push appends x to the end of the backing slice. It is the heap.Interface
// hook invoked by heap.Push, which then restores the heap ordering; it does
// not reorder anything itself. x must be an entryWithLabels, otherwise the
// type assertion panics.
func (h *entryHeap) Push(x any) {
h.entries = append(h.entries, x.(entryWithLabels))
}

// Pop removes the last element of the backing slice and returns it. It is the
// heap.Interface hook used by heap.Pop, which moves the root to the end of
// the slice before calling it. Calling it on an empty heap panics.
func (h *entryHeap) Pop() any {
	last := len(h.entries) - 1
	popped := h.entries[last]
	h.entries = h.entries[:last]
	return popped
}

// Len returns the number of entries currently held in the heap.
func (h *entryHeap) Len() int {
return len(h.entries)
}

// Less reports whether the entry at index i orders before the entry at index
// j according to the heap's less function.
func (h *entryHeap) Less(i, j int) bool {
return h.less(h.entries[i], h.entries[j])
}

// Swap exchanges the entries at indexes i and j.
func (h *entryHeap) Swap(i, j int) {
h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
}

// lessFn returns the heap ordering for the given query direction. The
// returned function reports whether a must be evicted before b:
//   - FORWARD: the entry with the later timestamp is evicted first
//   - BACKWARD: the entry with the earlier timestamp is evicted first
//
// Entries with equal timestamps are ordered by their labels string in both
// directions. Any other direction value panics.
func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool {
	var evictNewest bool
	switch direction {
	case logproto.FORWARD:
		evictNewest = true
	case logproto.BACKWARD:
		evictNewest = false
	default:
		panic("invalid direction")
	}

	return func(a, b entryWithLabels) bool {
		ta, tb := a.Entry.Timestamp, b.Entry.Timestamp
		if ta.Equal(tb) {
			return a.Labels < b.Labels
		}
		if evictNewest {
			return ta.After(tb)
		}
		return ta.Before(tb)
	}
}

// topk maintains a min-heap of the k most relevant entries.
// The heap is ordered by timestamp (and labels as tiebreaker) based on the direction:
//   - For FORWARD: keeps k oldest entries by evicting newest entries first
//     Example with k=3: If entries arrive as [t1,t2,t3,t4,t5], heap will contain [t1,t2,t3]
//   - For BACKWARD: keeps k newest entries by evicting oldest entries first
//     Example with k=3: If entries arrive as [t1,t2,t3,t4,t5], heap will contain [t3,t4,t5]
type topk struct {
// k is the maximum number of entries Add lets the heap grow to.
k int
// minHeap holds the retained entries with the next eviction candidate at index 0.
minHeap entryHeap
}

// newTopK returns a topk that retains at most k entries, ordered for the
// given direction (see lessFn). Its backing storage is borrowed from
// entryWithLabelsPool.
//
// It panics if k is not positive or if direction is neither FORWARD nor
// BACKWARD.
func newTopK(k int, direction logproto.Direction) *topk {
	if k <= 0 {
		panic("k must be greater than 0")
	}
	pooled := entryWithLabelsPool.Get().(*[]entryWithLabels)
	return &topk{
		k: k,
		minHeap: entryHeap{
			less: lessFn(direction),
			// A recycled slice may come back with a non-zero length (for
			// example from an iterator closed before it was fully consumed).
			// Truncate it so leftovers from a previous query can never count
			// toward k or leak into this query's results; the capacity is
			// kept for reuse.
			entries: (*pooled)[:0],
		},
	}
}

// Add offers a new entry to the topk heap.
// If the heap has less than k entries, the entry is added directly.
// Otherwise the entry is compared with the root (index 0), which is the
// current eviction candidate: if the root does not order before the new
// entry, the new entry is discarded; if it does, the new entry replaces the
// root and the heap is re-established.
//
// For FORWARD direction:
//   - Root contains newest entry (to be evicted first)
//   - New entries that are newer than root are discarded
//     Example: With k=3 and heap=[t1,t2,t3], a new entry t4 is discarded
//
// For BACKWARD direction:
//   - Root contains oldest entry (to be evicted first)
//   - New entries that are older than root are discarded
//     Example: With k=3 and heap=[t3,t4,t5], a new entry t2 is discarded
func (t *topk) Add(r entryWithLabels) {
	if t.minHeap.Len() < t.k {
		heap.Push(&t.minHeap, r)
		return
	}
	if !t.minHeap.less(t.minHeap.entries[0], r) {
		return
	}
	// Overwrite the root in place and sift it down once. This evicts the same
	// entry as heap.Pop followed by heap.Push, but with a single sift and
	// without shrinking and re-growing the backing slice.
	t.minHeap.entries[0] = r
	heap.Fix(&t.minHeap, 0)
}

// sliceIterator walks a pre-sorted slice of entries, exposing one entry at a
// time together with its labels and stream hash.
type sliceIterator struct {
// entries holds the entries not yet returned; Next consumes it from the front.
entries []entryWithLabels
// curr is the entry most recently selected by Next.
curr entryWithLabels
}

// Iterator sorts the retained entries in place into result order and returns
// an iterator over them. The iterator shares the heap's backing slice.
func (t *topk) Iterator() iter.EntryIterator {
	entries, less := t.minHeap.entries, t.minHeap.less

	// The heap ordering puts the entry to evict first, which is the opposite
	// of the order results are returned in:
	//   - FORWARD evicts newest first, but results go oldest first
	//   - BACKWARD evicts oldest first, but results go newest first
	// Comparing (j, i) instead of (i, j) reverses the heap ordering and
	// yields the desired final ordering.
	sort.Slice(entries, func(i, j int) bool {
		return less(entries[j], entries[i])
	})
	return &sliceIterator{entries: entries}
}

// Next advances to the next entry, making it available through At, Labels and
// StreamHash. It returns false, leaving the current entry untouched, once no
// entries remain.
func (s *sliceIterator) Next() bool {
	if len(s.entries) > 0 {
		s.curr, s.entries = s.entries[0], s.entries[1:]
		return true
	}
	return false
}

// At returns the log entry selected by the most recent call to Next.
func (s *sliceIterator) At() logproto.Entry {
return s.curr.Entry
}

// Err always returns nil: the iterator only walks an in-memory slice.
func (s *sliceIterator) Err() error {
return nil
}

// Labels returns the labels string of the entry selected by the most recent
// call to Next.
func (s *sliceIterator) Labels() string {
return s.curr.Labels
}

// StreamHash returns the stream hash of the entry selected by the most recent
// call to Next.
func (s *sliceIterator) StreamHash() uint64 {
return s.curr.StreamHash
}

// Close hands the iterator's remaining storage back to entryWithLabelsPool
// and always returns nil. After Close, Next reports false; calling Close
// again is a no-op.
func (s *sliceIterator) Close() error {
	if s.entries == nil {
		// Nothing to recycle (already closed, or never had storage).
		return nil
	}
	// Pool an empty view of the storage: if the iterator is closed before
	// being fully consumed, the unread entries must not be visible to the
	// next user of the pooled slice.
	recycled := s.entries[:0]
	// Drop our own reference and pool a pointer to a local header rather than
	// to the struct field, so the pool never aliases this iterator and a
	// second Close cannot put the same storage into the pool twice.
	s.entries = nil
	entryWithLabelsPool.Put(&recycled)
	return nil
}

func newSampleIterator(ctx context.Context,
streams map[int64]dataobj.Stream,
extractor syntax.SampleExtractor,
Expand Down Expand Up @@ -91,7 +339,7 @@ func newSampleIterator(ctx context.Context,
s.Samples = append(s.Samples, logproto.Sample{
Timestamp: timestamp,
Value: value,
Hash: 0, // todo write a test to verify that we should not try to dedupe when we don't have a hash
Hash: 0,
})
}
}
Expand Down
Loading

0 comments on commit 8f69f0d

Please sign in to comment.