feat(dataobj/querier): Add logging and improve stream processing metrics (#16325)
cyriltovena authored Feb 17, 2025
1 parent 8fd8697 commit 49bcaf4
Showing 5 changed files with 77 additions and 27 deletions.
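All of the log lines added in this commit are emitted at debug level via go-kit/log, so they only appear when the querier's logger lets debug records through. A minimal sketch of such a logger (the writer and the component label here are illustrative, not taken from this commit):

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	// Logfmt logger on stderr that allows debug-level records through.
	logger := level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.AllowDebug())
	logger = log.With(logger, "component", "dataobj-querier")

	// Same style as the messages introduced below.
	level.Debug(logger).Log("msg", "processing streams", "total_readers", 4)
}
```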
45 changes: 36 additions & 9 deletions pkg/dataobj/querier/metadata.go
@@ -8,8 +8,11 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/dataobj"
@@ -47,7 +50,7 @@ func (s *Store) SelectSeries(ctx context.Context, req logql.SelectLogParams) ([]

uniqueSeries := &sync.Map{}

processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard)
processor := newStreamProcessor(req.Start, req.End, matchers, objects, shard, s.logger)

err = processor.ProcessParallel(ctx, func(h uint64, stream dataobj.Stream) {
uniqueSeries.Store(h, labelsToSeriesIdentifier(stream.Labels))
@@ -76,7 +79,7 @@ func (s *Store) LabelNamesForMetricName(ctx context.Context, _ string, from, thr
return nil, err
}

processor := newStreamProcessor(start, end, matchers, objects, noShard)
processor := newStreamProcessor(start, end, matchers, objects, noShard, s.logger)
uniqueNames := sync.Map{}

err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) {
@@ -115,7 +118,7 @@ func (s *Store) LabelValuesForMetricName(ctx context.Context, _ string, from, th
return nil, err
}

processor := newStreamProcessor(start, end, matchers, objects, noShard)
processor := newStreamProcessor(start, end, matchers, objects, noShard, s.logger)
uniqueValues := sync.Map{}

err = processor.ProcessParallel(ctx, func(_ uint64, stream dataobj.Stream) {
@@ -142,15 +145,17 @@ type streamProcessor struct {
seenSeries *sync.Map
objects []*dataobj.Object
shard logql.Shard
logger log.Logger
}

// newStreamProcessor creates a new streamProcessor with the given parameters
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []*dataobj.Object, shard logql.Shard) *streamProcessor {
func newStreamProcessor(start, end time.Time, matchers []*labels.Matcher, objects []*dataobj.Object, shard logql.Shard, logger log.Logger) *streamProcessor {
return &streamProcessor{
predicate: streamPredicate(matchers, start, end),
seenSeries: &sync.Map{},
objects: objects,
shard: shard,
logger: logger,
}
}

@@ -166,6 +171,9 @@ func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func
}
}()

start := time.Now()
level.Debug(sp.logger).Log("msg", "processing streams", "total_readers", len(readers))

// set predicate on all readers
for _, reader := range readers {
if err := reader.SetPredicate(sp.predicate); err != nil {
@@ -174,28 +182,46 @@ func (sp *streamProcessor) ProcessParallel(ctx context.Context, onNewStream func
}

g, ctx := errgroup.WithContext(ctx)
var processedStreams atomic.Int64
for _, reader := range readers {
g.Go(func() error {
return sp.processSingleReader(ctx, reader, onNewStream)
n, err := sp.processSingleReader(ctx, reader, onNewStream)
if err != nil {
return err
}
processedStreams.Add(n)
return nil
})
}
return g.Wait()
err = g.Wait()
if err != nil {
return err
}

level.Debug(sp.logger).Log("msg", "finished processing streams",
"total_readers", len(readers),
"total_streams_processed", processedStreams.Load(),
"duration", time.Since(start),
)

return nil
}

func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *dataobj.StreamsReader, onNewStream func(uint64, dataobj.Stream)) error {
func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *dataobj.StreamsReader, onNewStream func(uint64, dataobj.Stream)) (int64, error) {
var (
streamsPtr = streamsPool.Get().(*[]dataobj.Stream)
streams = *streamsPtr
buf = make([]byte, 0, 1024)
h uint64
processed int64
)

defer streamsPool.Put(streamsPtr)

for {
n, err := reader.Read(ctx, streams)
if err != nil && err != io.EOF {
return err
return processed, err
}
if n == 0 && err == io.EOF {
break
@@ -207,9 +233,10 @@ func (sp *streamProcessor) processSingleReader(ctx context.Context, reader *data
continue
}
onNewStream(h, stream)
processed++
}
}
return nil
return processed, nil
}

func labelsToSeriesIdentifier(labels labels.Labels) logproto.SeriesIdentifier {
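The counting pattern above—processSingleReader returns how many streams it handled, and ProcessParallel accumulates the per-reader counts in a shared atomic.Int64 under an errgroup—can be sketched in isolation roughly as follows (the worker and its input are placeholders, not the real StreamsReader API):

```go
package main

import (
	"context"
	"fmt"

	"go.uber.org/atomic"
	"golang.org/x/sync/errgroup"
)

// processOne stands in for processSingleReader: it reports how many items it
// handled so the caller can aggregate a total across goroutines.
func processOne(_ context.Context, items []int) (int64, error) {
	return int64(len(items)), nil
}

func main() {
	batches := [][]int{{1, 2, 3}, {4, 5}, {6}}

	g, ctx := errgroup.WithContext(context.Background())
	var processed atomic.Int64
	for _, batch := range batches {
		// Relies on Go 1.22+ per-iteration loop variables, as the diff above does.
		g.Go(func() error {
			n, err := processOne(ctx, batch)
			if err != nil {
				return err
			}
			processed.Add(n)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	fmt.Println("total streams processed:", processed.Load())
}
```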
7 changes: 4 additions & 3 deletions pkg/dataobj/querier/metadata_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@@ -23,7 +24,7 @@ func TestStore_SelectSeries(t *testing.T) {
// Setup test data
now := setupTestData(t, builder)

store := NewStore(builder.bucket)
store := NewStore(builder.bucket, log.NewNopLogger())
ctx := user.InjectOrgID(context.Background(), testTenant)

tests := []struct {
@@ -166,7 +167,7 @@ func TestStore_LabelNamesForMetricName(t *testing.T) {
// Setup test data
now := setupTestData(t, builder)

store := NewStore(builder.bucket)
store := NewStore(builder.bucket, log.NewNopLogger())
ctx := user.InjectOrgID(context.Background(), testTenant)

tests := []struct {
@@ -234,7 +235,7 @@ func TestStore_LabelValuesForMetricName(t *testing.T) {
// Setup test data
now := setupTestData(t, builder)

store := NewStore(builder.bucket)
store := NewStore(builder.bucket, log.NewNopLogger())
ctx := user.InjectOrgID(context.Background(), testTenant)

tests := []struct {
46 changes: 34 additions & 12 deletions pkg/dataobj/querier/store.go
@@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@@ -90,12 +92,14 @@ func (c *Config) PeriodConfig() config.PeriodConfig {
// Store implements querier.Store for querying data objects.
type Store struct {
bucket objstore.Bucket
logger log.Logger
}

// NewStore creates a new Store.
func NewStore(bucket objstore.Bucket) *Store {
func NewStore(bucket objstore.Bucket, logger log.Logger) *Store {
return &Store{
bucket: bucket,
logger: logger,
}
}

@@ -110,7 +114,7 @@ func (s *Store) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter
return nil, err
}

return selectLogs(ctx, objects, shard, req)
return selectLogs(ctx, objects, shard, req, s.logger)
}

// SelectSamples implements querier.Store
@@ -129,7 +133,7 @@ func (s *Store) SelectSamples(ctx context.Context, req logql.SelectSampleParams)
return nil, err
}

return selectSamples(ctx, objects, shard, expr, req.Start, req.End)
return selectSamples(ctx, objects, shard, expr, req.Start, req.End, s.logger)
}

// Stats implements querier.Store
@@ -161,19 +165,21 @@ func (s *Store) objectsForTimeRange(ctx context.Context, from, through time.Time
return nil, err
}

level.Debug(s.logger).Log("msg", "found data objects for time range", "count", len(files), "from", from, "through", through)

objects := make([]*dataobj.Object, 0, len(files))
for _, path := range files {
objects = append(objects, dataobj.FromBucket(s.bucket, path))
}
return objects, nil
}

func selectLogs(ctx context.Context, objects []*dataobj.Object, shard logql.Shard, req logql.SelectLogParams) (iter.EntryIterator, error) {
func selectLogs(ctx context.Context, objects []*dataobj.Object, shard logql.Shard, req logql.SelectLogParams, logger log.Logger) (iter.EntryIterator, error) {
selector, err := req.LogSelector()
if err != nil {
return nil, err
}
shardedObjects, err := shardObjects(ctx, objects, shard)
shardedObjects, err := shardObjects(ctx, objects, shard, logger)
if err != nil {
return nil, err
}
@@ -211,13 +217,8 @@ func selectLogs(ctx context.Context, objects []*dataobj.Object, shard logql.Shar
return iter.NewSortEntryIterator(iterators, req.Direction), nil
}

func selectSamples(ctx context.Context, objects []*dataobj.Object, shard logql.Shard, expr syntax.SampleExpr, start, end time.Time) (iter.SampleIterator, error) {
selector, err := expr.Selector()
if err != nil {
return nil, err
}

shardedObjects, err := shardObjects(ctx, objects, shard)
func selectSamples(ctx context.Context, objects []*dataobj.Object, shard logql.Shard, expr syntax.SampleExpr, start, end time.Time, logger log.Logger) (iter.SampleIterator, error) {
shardedObjects, err := shardObjects(ctx, objects, shard, logger)
if err != nil {
return nil, err
}
@@ -227,6 +228,11 @@ func selectSamples(ctx context.Context, objects []*dataobj.Object, shard logql.S
shardedObjectsPool.Put(obj)
}
}()
selector, err := expr.Selector()
if err != nil {
return nil, err
}

streamsPredicate := streamPredicate(selector.Matchers(), start, end)
// TODO: support more predicates and combine with log.Pipeline.
logsPredicate := dataobj.TimeRangePredicate[dataobj.LogsPredicate]{
@@ -268,14 +274,24 @@ func shardObjects(
ctx context.Context,
objects []*dataobj.Object,
shard logql.Shard,
logger log.Logger,
) ([]*shardedObject, error) {
metadatas, err := fetchMetadatas(ctx, objects)
if err != nil {
return nil, err
}

// Count total sections before sharding
var totalSections int
for _, metadata := range metadatas {
totalSections += metadata.LogsSections
}
logger = log.With(logger, "objects", len(objects))
logger = log.With(logger, "total_sections", totalSections)

// sectionIndex tracks the global section number across all objects to ensure consistent sharding
var sectionIndex uint64
var shardedSections int
shardedReaders := make([]*shardedObject, 0, len(objects))

for i, metadata := range metadatas {
@@ -288,6 +304,7 @@
continue
}
}
shardedSections++

if reader == nil {
reader = shardedObjectsPool.Get().(*shardedObject)
@@ -305,6 +322,11 @@
}
}

level.Debug(logger).Log("msg", "sharding sections",
"sharded_sections", shardedSections,
"sharded_objects", len(shardedReaders),
"shard_factor", shard.PowerOfTwo.Of)

return shardedReaders, nil
}

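The sharding loop above walks every logs section across all objects, assigns it a running global index, and keeps only the sections that fall into this querier's shard. Assuming the power-of-two check boils down to an index-modulo test (the exact logql.Shard logic may differ), the selection can be sketched as:

```go
// keepSection reports whether the section with the given global index belongs
// to shard shardIndex out of shardFactor shards. Illustrative assumption only,
// not the exact Loki implementation.
func keepSection(globalIndex, shardIndex, shardFactor uint64) bool {
	if shardFactor <= 1 {
		return true // no sharding requested, keep everything
	}
	return globalIndex%shardFactor == shardIndex
}
```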
4 changes: 2 additions & 2 deletions pkg/dataobj/querier/store_test.go
@@ -38,7 +38,7 @@ func TestStore_SelectSamples(t *testing.T) {

// Setup test data
now := setupTestData(t, builder)
store := NewStore(builder.bucket)
store := NewStore(builder.bucket, log.NewNopLogger())
ctx := user.InjectOrgID(context.Background(), testTenant)

tests := []struct {
@@ -191,7 +191,7 @@ func TestStore_SelectLogs(t *testing.T) {

// Setup test data
now := setupTestData(t, builder)
store := NewStore(builder.bucket)
store := NewStore(builder.bucket, log.NewNopLogger())
ctx := user.InjectOrgID(context.Background(), testTenant)

tests := []struct {
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -422,7 +422,7 @@ func (t *Loki) getQuerierStore() (querier.Store, error) {

storeCombiner := querier.NewStoreCombiner([]querier.StoreConfig{
{
Store: dataobjquerier.NewStore(store),
Store: dataobjquerier.NewStore(store, log.With(util_log.Logger, "component", "dataobj-querier")),
From: t.Cfg.DataObj.Querier.From.Time,
},
{
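Since NewStore now requires a logger, every call site has to supply one; go-kit's no-op logger is a reasonable default where output isn't wanted. A sketch of the two typical constructions (import paths assumed from the repository layout, bucket and base logger assumed to be in scope):

```go
package main

import (
	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"

	dataobjquerier "github.com/grafana/loki/v3/pkg/dataobj/querier"
)

// newQuerierStore shows the two typical ways to satisfy the new logger argument.
func newQuerierStore(bucket objstore.Bucket, logger log.Logger, quiet bool) *dataobjquerier.Store {
	if quiet {
		// e.g. in tests, as metadata_test.go and store_test.go now do
		return dataobjquerier.NewStore(bucket, log.NewNopLogger())
	}
	// component-scoped logger, mirroring the modules.go wiring above
	return dataobjquerier.NewStore(bucket, log.With(logger, "component", "dataobj-querier"))
}
```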
