diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 0e1fe45e49e63..f9bc7ca9c4b85 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -920,6 +920,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error { // initialize stats collection for ingester queries. _, ctx := stats.NewContext(queryServer.Context()) + sp := opentracing.SpanFromContext(ctx) instanceID, err := tenant.TenantID(ctx) if err != nil { @@ -930,10 +931,14 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log if err != nil { return err } + it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req}) if err != nil { return err } + if sp != nil { + sp.LogKV("event", "finished instance query sample", "selector", req.Selector, "start", req.Start, "end", req.End) + } if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{ @@ -1107,8 +1112,7 @@ func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo } func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.GetStats") - defer sp.Finish() + sp := opentracing.SpanFromContext(ctx) user, err := tenant.TenantID(ctx) if err != nil { @@ -1150,16 +1154,18 @@ func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest } merged := index_stats.MergeStats(resps...) - sp.LogKV( - "user", user, - "from", req.From.Time(), - "through", req.Through.Time(), - "matchers", syntax.MatchersString(matchers), - "streams", merged.Streams, - "chunks", merged.Chunks, - "bytes", merged.Bytes, - "entries", merged.Entries, - ) + if sp != nil { + sp.LogKV( + "user", user, + "from", req.From.Time(), + "through", req.Through.Time(), + "matchers", syntax.MatchersString(matchers), + "streams", merged.Streams, + "chunks", merged.Chunks, + "bytes", merged.Bytes, + "entries", merged.Entries, + ) + } return &merged, nil } @@ -1288,9 +1294,9 @@ func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountReq return &resp, nil } -// buildStoreRequest returns a store request from an ingester request, returns nit if QueryStore is set to false in configuration. +// buildStoreRequest returns a store request from an ingester request, returns nil if QueryStore is set to false in configuration. // The request may be truncated due to QueryStoreMaxLookBackPeriod which limits the range of request to make sure -// we only query enough to not miss any data and not add too to many duplicates by covering the who time range in query. +// we only query enough to not miss any data and not add too many duplicates by covering the whole time range in query. func buildStoreRequest(cfg Config, start, end, now time.Time) (time.Time, time.Time, bool) { if !cfg.QueryStore { return time.Time{}, time.Time{}, false diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index c3c200e59031a..45dddf7163bf9 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -905,6 +905,8 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ } func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error { + sp := opentracing.SpanFromContext(ctx) + stats := stats.FromContext(ctx) for !isDone(ctx) { batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize) @@ -927,8 +929,11 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer } stats.Reset() - + if sp != nil { + sp.LogKV("event", "sent batch", "size", size) + } } + return nil } diff --git a/pkg/storage/stores/index/index.go b/pkg/storage/stores/index/index.go index e2c73edfb92ab..2c46ef799dfdd 100644 --- a/pkg/storage/stores/index/index.go +++ b/pkg/storage/stores/index/index.go @@ -11,6 +11,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/stores/index/stats" + loki_instrument "github.com/grafana/loki/pkg/util/instrument" ) type Filterable interface { @@ -56,7 +57,7 @@ func NewMonitoredReaderWriter(rw ReaderWriter, reg prometheus.Registerer) Reader func (m monitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) { var chunks []logproto.ChunkRef - if err := instrument.CollectedRequest(ctx, "chunk_refs", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + if err := loki_instrument.TimeRequest(ctx, "chunk_refs", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error chunks, err = m.rw.GetChunkRefs(ctx, userID, from, through, matchers...) return err @@ -69,7 +70,7 @@ func (m monitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) { var lbls []labels.Labels - if err := instrument.CollectedRequest(ctx, "series", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + if err := loki_instrument.TimeRequest(ctx, "series", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error lbls, err = m.rw.GetSeries(ctx, userID, from, through, matchers...) return err @@ -82,7 +83,7 @@ func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, fro func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) { var values []string - if err := instrument.CollectedRequest(ctx, "label_values", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + if err := loki_instrument.TimeRequest(ctx, "label_values", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error values, err = m.rw.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...) return err @@ -95,7 +96,7 @@ func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, use func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) { var values []string - if err := instrument.CollectedRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + if err := loki_instrument.TimeRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error values, err = m.rw.LabelNamesForMetricName(ctx, userID, from, through, metricName) return err @@ -108,7 +109,7 @@ func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, user func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) { var sts *stats.Stats - if err := instrument.CollectedRequest(ctx, "stats", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + if err := loki_instrument.TimeRequest(ctx, "stats", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error sts, err = m.rw.Stats(ctx, userID, from, through, matchers...) return err @@ -121,7 +122,7 @@ func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, t func (m monitoredReaderWriter) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) { var vol *logproto.VolumeResponse - if err := instrument.CollectedRequest(ctx, "volume", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + if err := loki_instrument.TimeRequest(ctx, "volume", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { var err error vol, err = m.rw.Volume(ctx, userID, from, through, limit, targetLabels, aggregateBy, matchers...) return err @@ -137,7 +138,7 @@ func (m monitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFi } func (m monitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error { - return instrument.CollectedRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { + return loki_instrument.TimeRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error { return m.rw.IndexChunk(ctx, from, through, chk) }) }