feat(dataob): Implement SelectSamples (#16251)
Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
cyriltovena and owen-d authored Feb 13, 2025
1 parent 1b1471d commit 13a6c33
Showing 7 changed files with 1,000 additions and 375 deletions.
137 changes: 137 additions & 0 deletions pkg/dataobj/querier/iter.go
@@ -0,0 +1,137 @@
package querier

import (
	"context"
	"io"
	"sync"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/iter"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/log"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
)

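// Pools of reusable record and sample buffers, shared across reads to reduce allocations.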
var (
	recordsPool = sync.Pool{
		New: func() interface{} {
			records := make([]dataobj.Record, 1024)
			return &records
		},
	}
	samplesPool = sync.Pool{
		New: func() interface{} {
			samples := make([]logproto.Sample, 0, 1024)
			return &samples
		},
	}
)

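// newSampleIterator reads batches of records from the LogsReader, applies the
// sample extractor per stream, and returns a sorted iterator over the resulting series.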
func newSampleIterator(ctx context.Context,
	streams map[int64]dataobj.Stream,
	extractor syntax.SampleExtractor,
	reader *dataobj.LogsReader,
) (iter.SampleIterator, error) {
	bufPtr := recordsPool.Get().(*[]dataobj.Record)
	defer recordsPool.Put(bufPtr)
	buf := *bufPtr

	var (
		iterators       []iter.SampleIterator
		prevStreamID    int64 = -1
		streamExtractor log.StreamSampleExtractor
		series          = map[string]*logproto.Series{}
		streamHash      uint64
	)

	for {
		n, err := reader.Read(ctx, buf)
		if err != nil && err != io.EOF {
			return nil, err
		}

		// Handle end of stream or empty read
		if n == 0 {
			iterators = appendIteratorFromSeries(iterators, series)
			break
		}

		// Process records in the current batch
		for _, record := range buf[:n] {
			stream, ok := streams[record.StreamID]
			if !ok {
				continue
			}

			// Handle stream transition
			if prevStreamID != record.StreamID {
				iterators = appendIteratorFromSeries(iterators, series)
				clear(series)
				streamExtractor = extractor.ForStream(stream.Labels)
				streamHash = streamExtractor.BaseLabels().Hash()
				prevStreamID = record.StreamID
			}

			// Process the record
			timestamp := record.Timestamp.UnixNano()
			value, parsedLabels, ok := streamExtractor.ProcessString(timestamp, record.Line, record.Metadata...)
			if !ok {
				continue
			}

			// Get or create series for the parsed labels
			labelString := parsedLabels.String()
			s, exists := series[labelString]
			if !exists {
				s = createNewSeries(labelString, streamHash)
				series[labelString] = s
			}

			// Add sample to the series
			s.Samples = append(s.Samples, logproto.Sample{
				Timestamp: timestamp,
				Value:     value,
				Hash:      0, // todo write a test to verify that we should not try to dedupe when we don't have a hash
			})
		}
	}

	if len(iterators) == 0 {
		return iter.NoopSampleIterator, nil
	}

	return iter.NewSortSampleIterator(iterators), nil
}

// createNewSeries creates a new Series for the given labels and stream hash
func createNewSeries(labels string, streamHash uint64) *logproto.Series {
	samplesPtr := samplesPool.Get().(*[]logproto.Sample)
	samples := *samplesPtr
	return &logproto.Series{
		Labels:     labels,
		Samples:    samples[:0],
		StreamHash: streamHash,
	}
}

// appendIteratorFromSeries appends a new SampleIterator to the given list of iterators
func appendIteratorFromSeries(iterators []iter.SampleIterator, series map[string]*logproto.Series) []iter.SampleIterator {
	if len(series) == 0 {
		return iterators
	}

	seriesResult := make([]logproto.Series, 0, len(series))
	for _, s := range series {
		seriesResult = append(seriesResult, *s)
	}

	return append(iterators, iter.SampleIteratorWithClose(
		iter.NewMultiSeriesIterator(seriesResult),
		func() error {
			for _, s := range seriesResult {
				samplesPool.Put(&s.Samples)
			}
			return nil
		},
	))
}
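
For readers unfamiliar with the buffer-reuse idiom above, here is a minimal, self-contained sketch of the same sync.Pool pattern (illustrative only, not part of this commit): slices are stored behind pointers, trimmed to zero length when borrowed, and handed back once the consumer is done.

package main

import (
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers; storing *[]int64 rather than the slice
// itself avoids an extra allocation when the value is boxed on Put.
var bufPool = sync.Pool{
	New: func() any {
		buf := make([]int64, 0, 1024)
		return &buf
	},
}

func main() {
	bufPtr := bufPool.Get().(*[]int64)
	buf := (*bufPtr)[:0] // reuse the backing capacity, drop any stale contents

	buf = append(buf, 1, 2, 3)
	fmt.Println(len(buf), cap(buf)) // 3 1024

	bufPool.Put(&buf) // return the (possibly grown) buffer for reuse
}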
78 changes: 17 additions & 61 deletions pkg/dataobj/querier/metadata.go
@@ -17,6 +17,13 @@ import (
	"github.com/grafana/loki/v3/pkg/logql"
)

var streamsPool = sync.Pool{
	New: func() any {
		streams := make([]dataobj.Stream, 1024)
		return &streams
	},
}

// SelectSeries implements querier.Store
func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) {
	objects, err := s.objectsForTimeRange(ctx, req.Start, req.End)
@@ -129,13 +136,6 @@ func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, th
	return values, nil
}

var streamsPool = sync.Pool{
	New: func() any {
		streams := make([]dataobj.Stream, 1024)
		return &streams
	},
}

// streamProcessor handles processing of unique series with custom collection logic
type streamProcessor struct {
	predicate dataobj.StreamsPredicate
@@ -146,61 +146,25 @@ type streamProcessor struct {

// newStreamProcessor creates a new streamProcessor with the given parameters
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []*dataobj.Object, shard logql.Shard) *streamProcessor {
	// Create a time range predicate
	var predicate dataobj.StreamsPredicate = dataobj.TimeRangePredicate[dataobj.StreamsPredicate]{
		StartTime:    start,
		EndTime:      end,
		IncludeStart: true,
		IncludeEnd:   true,
	}

	// If there are any matchers, combine them with an AND predicate
	if len(matchers) > 0 {
		predicate = dataobj.AndPredicate[dataobj.StreamsPredicate]{
			Left:  predicate,
			Right: matchersToPredicate(matchers),
		}
	}

	return &streamProcessor{
		predicate: predicate,
		predicate: streamPredicate(matchers, start, end),
		seenSeries: &sync.Map{},
		objects:    objects,
		shard:      shard,
	}
}

// matchersToPredicate converts a list of matchers to a dataobj.StreamsPredicate
func matchersToPredicate(matchers []*labels.Matcher) dataobj.StreamsPredicate {
	var left dataobj.StreamsPredicate
	for _, matcher := range matchers {
		var right dataobj.StreamsPredicate
		switch matcher.Type {
		case labels.MatchEqual:
			right = dataobj.LabelMatcherPredicate{Name: matcher.Name, Value: matcher.Value}
		default:
			right = dataobj.LabelFilterPredicate{Name: matcher.Name, Keep: func(_, value string) bool {
				return matcher.Matches(value)
			}}
		}
		if left == nil {
			left = right
		} else {
			left = dataobj.AndPredicate[dataobj.StreamsPredicate]{
				Left:  left,
				Right: right,
			}
		}
	}
	return left
}

// ProcessParallel processes series from multiple readers in parallel
func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func(uint64, dataobj.Stream)) error {
	readers, err := shardStreamReaders(ctx, sp.objects, sp.shard)
	if err != nil {
		return err
	}
	defer func() {
		for _, reader := range readers {
			streamReaderPool.Put(reader)
		}
	}()

	// set predicate on all readers
	for _, reader := range readers {
@@ -263,17 +227,8 @@ func labelsToSeriesIdentifier(labels labels.Labels) logproto.SeriesIdentifier {

// shardStreamReaders fetches metadata of objects in parallel and shards them into a list of StreamsReaders
func shardStreamReaders(ctx context.Context, objects []*dataobj.Object, shard logql.Shard) ([]*dataobj.StreamsReader, error) {
	// fetch all metadata of objects in parallel
	g, ctx := errgroup.WithContext(ctx)
	metadatas := make([]dataobj.Metadata, len(objects))
	for i, obj := range objects {
		g.Go(func() error {
			var err error
			metadatas[i], err = obj.Metadata(ctx)
			return err
		})
	}
	if err := g.Wait(); err != nil {
	metadatas, err := fetchMetadatas(ctx, objects)
	if err != nil {
		return nil, err
	}
	// sectionIndex tracks the global section number across all objects to ensure consistent sharding
@@ -289,7 +244,8 @@ func shardStreamReaders(ctx context.Context, objects []*dataobj.Object, shard lo
					continue
				}
			}
			reader := dataobj.NewStreamsReader(objects[i], j)
			reader := streamReaderPool.Get().(*dataobj.StreamsReader)
			reader.Reset(objects[i], j)
			readers = append(readers, reader)
			sectionIndex++
		}
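
The streamPredicate helper now called from newStreamProcessor is defined outside these hunks. Judging from the inline code this commit removes, it presumably folds the time-range predicate and the matcher conversion into one call, roughly as in the sketch below; the name placement and exact shape are assumptions, not the verbatim helper.

// Sketch only: reconstructed from the inline code removed above.
func streamPredicate(matchers []*labels.Matcher, start, end time.Time) dataobj.StreamsPredicate {
	// Always constrain streams to the query time range.
	var predicate dataobj.StreamsPredicate = dataobj.TimeRangePredicate[dataobj.StreamsPredicate]{
		StartTime:    start,
		EndTime:      end,
		IncludeStart: true,
		IncludeEnd:   true,
	}
	// AND in one predicate per matcher: exact matches map to LabelMatcherPredicate,
	// everything else to a LabelFilterPredicate wrapping matcher.Matches.
	for _, matcher := range matchers {
		var right dataobj.StreamsPredicate
		switch matcher.Type {
		case labels.MatchEqual:
			right = dataobj.LabelMatcherPredicate{Name: matcher.Name, Value: matcher.Value}
		default:
			right = dataobj.LabelFilterPredicate{Name: matcher.Name, Keep: func(_, value string) bool {
				return matcher.Matches(value)
			}}
		}
		predicate = dataobj.AndPredicate[dataobj.StreamsPredicate]{Left: predicate, Right: right}
	}
	return predicate
}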