Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Querier: prevent unnecessary calls to ingesters #5984

Merged
merged 16 commits into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## Main
* [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters.
* [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9.
* [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config
* [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist.
Expand Down
77 changes: 61 additions & 16 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,19 +237,42 @@ func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT t
return deletes, nil
}

func (q *SingleTenantQuerier) isWithinIngesterMaxLookbackPeriod(maxLookback time.Duration, queryEnd time.Time) bool {
// if no lookback limits are configured, always consider this within the range of the lookback period
if maxLookback <= 0 {
return true
}

// find the first instance that we would want to query the ingester from...
ingesterOldestStartTime := time.Now().Add(-maxLookback)

// ...and if the query range ends before that, don't query the ingester
return queryEnd.After(ingesterOldestStartTime)
}

func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod() time.Duration {
mlb := time.Duration(-1)
if q.cfg.IngesterQueryStoreMaxLookback != 0 {
// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
mlb = q.cfg.IngesterQueryStoreMaxLookback
} else if q.cfg.QueryIngestersWithin != 0 {
mlb = q.cfg.QueryIngestersWithin
}

return mlb
}

func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
limitQueryInterval := false
// ingesterMLB having -1 means query ingester for whole duration.
ingesterMLB := time.Duration(-1)
if q.cfg.IngesterQueryStoreMaxLookback != 0 {
// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
limitQueryInterval = true
ingesterMLB = q.cfg.IngesterQueryStoreMaxLookback
} else if q.cfg.QueryIngestersWithin != 0 {
ingesterMLB = q.cfg.QueryIngestersWithin
}

ingesterMLB := q.calculateIngesterMaxLookbackPeriod()

// query ingester for whole duration.
if ingesterMLB == -1 {
i := &interval{
Expand All @@ -266,15 +289,18 @@ func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time
return i, i
}

ingesterQueryWithinRange := q.isWithinIngesterMaxLookbackPeriod(ingesterMLB, queryEnd)

// see if there is an overlap between ingester query interval and actual query interval, if not just do the store query.
ingesterOldestStartTime := time.Now().Add(-ingesterMLB)
if queryEnd.Before(ingesterOldestStartTime) {
if !ingesterQueryWithinRange {
return nil, &interval{
start: queryStart,
end: queryEnd,
}
}

ingesterOldestStartTime := time.Now().Add(-ingesterMLB)

// if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval.
if !limitQueryInterval {
i := &interval{
Expand Down Expand Up @@ -327,17 +353,25 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(*req.Start, *req.End)

var ingesterValues [][]string
if !q.cfg.QueryStoreOnly {
ingesterValues, err = q.ingesterQuerier.Label(ctx, req)
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
timeFramedReq := *req
timeFramedReq.Start = &ingesterQueryInterval.start
timeFramedReq.End = &ingesterQueryInterval.end

ingesterValues, err = q.ingesterQuerier.Label(ctx, &timeFramedReq)
if err != nil {
return nil, err
}
}

var storeValues []string
if !q.cfg.QueryIngesterOnly {
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
from := model.TimeFromUnixNano(storeQueryInterval.start.UnixNano())
through := model.TimeFromUnixNano(storeQueryInterval.end.UnixNano())

if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
if err != nil {
Expand Down Expand Up @@ -440,31 +474,42 @@ func (q *SingleTenantQuerier) awaitSeries(ctx context.Context, req *logproto.Ser
series := make(chan [][]logproto.SeriesIdentifier, 2)
errs := make(chan error, 2)

ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(req.Start, req.End)

// fetch series from ingesters and store concurrently
if q.cfg.QueryStoreOnly {
series <- [][]logproto.SeriesIdentifier{}
} else {
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
timeFramedReq := *req
timeFramedReq.Start = ingesterQueryInterval.start
timeFramedReq.End = ingesterQueryInterval.end

go func() {
// fetch series identifiers from ingesters
resps, err := q.ingesterQuerier.Series(ctx, req)
resps, err := q.ingesterQuerier.Series(ctx, &timeFramedReq)
if err != nil {
errs <- err
return
}

series <- resps
}()
} else {
// If only queriying the store or the query range does not overlap with the ingester max lookback period (defined by `query_ingesters_within`)
// then don't call out to the ingesters, and send an empty result back to the channel
series <- [][]logproto.SeriesIdentifier{}
}

if !q.cfg.QueryIngesterOnly {
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
go func() {
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards)
storeValues, err := q.seriesForMatchers(ctx, storeQueryInterval.start, storeQueryInterval.end, req.GetGroups(), req.Shards)
if err != nil {
errs <- err
return
}
series <- [][]logproto.SeriesIdentifier{storeValues}
}()
} else {
// If we are not querying the store, send an empty result back to the channel
series <- [][]logproto.SeriesIdentifier{}
}

var sets [][]logproto.SeriesIdentifier
Expand Down
Loading