Remove redundant spans (#10055)
**What this PR does / why we need it**:
- Stop using `instrument.CollectedRequest` on all Index calls, since it emits spans that are redundant with existing ones and don't improve Loki's monitoring while making it more expensive to run (a minimal sketch of the span-free approach follows this list)
- Add events to `QuerySample` to improve its monitoring, so we can better track what is holding back its performance
- Remove the redundant `Ingester.GetChunkRef` span
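
For context, `instrument.CollectedRequest` both records latency in the supplied collector and starts a new tracing span on every call. A minimal sketch of a span-free replacement in the spirit of `loki_instrument.TimeRequest` could look like the following; the `Collector` interface and the function body here are assumptions for illustration, and the real helper in `pkg/util/instrument` may differ:

```go
package instrumentsketch

import (
	"context"
	"time"
)

// Collector mirrors the shape of the collector passed to CollectedRequest
// (Register/Before/After); it is declared locally so the sketch is self-contained.
type Collector interface {
	Register()
	Before(ctx context.Context, method string, start time.Time)
	After(ctx context.Context, method, statusCode string, start time.Time)
}

// TimeRequest times f and records the observation in col, but never starts a
// tracing span, which is the redundant work CollectedRequest was doing.
func TimeRequest(ctx context.Context, method string, col Collector,
	toStatusCode func(error) string, f func(context.Context) error) error {
	start := time.Now()
	col.Before(ctx, method, start)
	err := f(ctx)
	col.After(ctx, method, toStatusCode(err), start)
	return err
}
```

Call sites keep the same shape as before; only the helper changes, as the `pkg/storage/stores/index/index.go` hunks below show.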
DylanGuedes authored Jul 26, 2023
1 parent 94f0597 commit 122313c
Showing 3 changed files with 34 additions and 22 deletions.
34 changes: 20 additions & 14 deletions pkg/ingester/ingester.go
@@ -920,6 +920,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())
sp := opentracing.SpanFromContext(ctx)

instanceID, err := tenant.TenantID(ctx)
if err != nil {
@@ -930,10 +931,14 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
if err != nil {
return err
}

it, err := instance.QuerySample(ctx, logql.SelectSampleParams{SampleQueryRequest: req})
if err != nil {
return err
}
if sp != nil {
sp.LogKV("event", "finished instance query sample", "selector", req.Selector, "start", req.Start, "end", req.End)
}

if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{
@@ -1107,8 +1112,7 @@ func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*lo
}

func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.GetStats")
defer sp.Finish()
sp := opentracing.SpanFromContext(ctx)

user, err := tenant.TenantID(ctx)
if err != nil {
Expand Down Expand Up @@ -1150,16 +1154,18 @@ func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest
}

merged := index_stats.MergeStats(resps...)
sp.LogKV(
"user", user,
"from", req.From.Time(),
"through", req.Through.Time(),
"matchers", syntax.MatchersString(matchers),
"streams", merged.Streams,
"chunks", merged.Chunks,
"bytes", merged.Bytes,
"entries", merged.Entries,
)
if sp != nil {
sp.LogKV(
"user", user,
"from", req.From.Time(),
"through", req.Through.Time(),
"matchers", syntax.MatchersString(matchers),
"streams", merged.Streams,
"chunks", merged.Chunks,
"bytes", merged.Bytes,
"entries", merged.Entries,
)
}

return &merged, nil
}
@@ -1288,9 +1294,9 @@ func (i *Ingester) TailersCount(ctx context.Context, _ *logproto.TailersCountReq
return &resp, nil
}

// buildStoreRequest returns a store request from an ingester request, returns nit if QueryStore is set to false in configuration.
// buildStoreRequest returns a store request from an ingester request, returns nil if QueryStore is set to false in configuration.
// The request may be truncated due to QueryStoreMaxLookBackPeriod which limits the range of request to make sure
// we only query enough to not miss any data and not add too to many duplicates by covering the who time range in query.
// we only query enough to not miss any data and not add too many duplicates by covering the whole time range in query.
func buildStoreRequest(cfg Config, start, end, now time.Time) (time.Time, time.Time, bool) {
if !cfg.QueryStore {
return time.Time{}, time.Time{}, false
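The `ingester.go` hunks above replace `opentracing.StartSpanFromContext` (which always creates a child span) with `opentracing.SpanFromContext` (which only returns a span if the caller already started one) and attach events to that existing span. A self-contained sketch of the pattern, using a hypothetical helper name, might look like this:

```go
package main

import (
	"context"
	"fmt"

	"github.com/opentracing/opentracing-go"
)

// queryStats is a hypothetical helper illustrating the pattern used above:
// reuse the caller's span instead of starting a new one, and guard against
// the case where no span is present in the context.
func queryStats(ctx context.Context, user string) error {
	sp := opentracing.SpanFromContext(ctx) // nil if the caller did not trace this request

	// ... do the actual work here ...

	if sp != nil {
		// LogKV attaches a structured event to the existing span rather than
		// emitting a separate, redundant span for this call.
		sp.LogKV("event", "stats merged", "user", user)
	}
	return nil
}

func main() {
	// With no span in the context, queryStats simply skips the event.
	fmt.Println(queryStats(context.Background(), "tenant-1"))
}
```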
7 changes: 6 additions & 1 deletion pkg/ingester/instance.go
@@ -905,6 +905,8 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
}

func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error {
sp := opentracing.SpanFromContext(ctx)

stats := stats.FromContext(ctx)
for !isDone(ctx) {
batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
@@ -927,8 +929,11 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
}

stats.Reset()

if sp != nil {
sp.LogKV("event", "sent batch", "size", size)
}
}

return nil
}

15 changes: 8 additions & 7 deletions pkg/storage/stores/index/index.go
@@ -11,6 +11,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
loki_instrument "github.com/grafana/loki/pkg/util/instrument"
)

type Filterable interface {
@@ -56,7 +57,7 @@ func NewMonitoredReaderWriter(rw ReaderWriter, reg prometheus.Registerer) Reader
func (m monitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
var chunks []logproto.ChunkRef

if err := instrument.CollectedRequest(ctx, "chunk_refs", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
if err := loki_instrument.TimeRequest(ctx, "chunk_refs", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
chunks, err = m.rw.GetChunkRefs(ctx, userID, from, through, matchers...)
return err
@@ -69,7 +70,7 @@ func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, fro

func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
var lbls []labels.Labels
if err := instrument.CollectedRequest(ctx, "series", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
if err := loki_instrument.TimeRequest(ctx, "series", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
lbls, err = m.rw.GetSeries(ctx, userID, from, through, matchers...)
return err
@@ -82,7 +83,7 @@ func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, use

func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
var values []string
if err := instrument.CollectedRequest(ctx, "label_values", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
if err := loki_instrument.TimeRequest(ctx, "label_values", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
values, err = m.rw.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
return err
@@ -95,7 +96,7 @@ func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, user

func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
var values []string
if err := instrument.CollectedRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
if err := loki_instrument.TimeRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
values, err = m.rw.LabelNamesForMetricName(ctx, userID, from, through, metricName)
return err
@@ -108,7 +109,7 @@ func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, t

func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
var sts *stats.Stats
if err := instrument.CollectedRequest(ctx, "stats", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
if err := loki_instrument.TimeRequest(ctx, "stats", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
sts, err = m.rw.Stats(ctx, userID, from, through, matchers...)
return err
@@ -121,7 +122,7 @@ func (m monitoredReaderWriter) Volume(ctx context.Context, userID string, from, t

func (m monitoredReaderWriter) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) {
var vol *logproto.VolumeResponse
if err := instrument.CollectedRequest(ctx, "volume", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
if err := loki_instrument.TimeRequest(ctx, "volume", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
vol, err = m.rw.Volume(ctx, userID, from, through, limit, targetLabels, aggregateBy, matchers...)
return err
@@ -137,7 +138,7 @@ func (m monitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFi
}

func (m monitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
return instrument.CollectedRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
return loki_instrument.TimeRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
return m.rw.IndexChunk(ctx, from, through, chk)
})
}

0 comments on commit 122313c

Please sign in to comment.