diff --git a/CHANGELOG.md b/CHANGELOG.md index 16d41db689089..f1b38503cdea5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## Main +* [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters. * [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9. * [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config * [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist. diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 66886b468415e..a53e59229ec9e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -237,19 +237,42 @@ func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT t return deletes, nil } +func (q *SingleTenantQuerier) isWithinIngesterMaxLookbackPeriod(maxLookback time.Duration, queryEnd time.Time) bool { + // if no lookback limits are configured, always consider this within the range of the lookback period + if maxLookback <= 0 { + return true + } + + // find the earliest instant that we would want to query the ingester from... + ingesterOldestStartTime := time.Now().Add(-maxLookback) + + // ...and if the query range ends before that, don't query the ingester + return queryEnd.After(ingesterOldestStartTime) +} + +func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod() time.Duration { + mlb := time.Duration(-1) + if q.cfg.IngesterQueryStoreMaxLookback != 0 { + // IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range. 
+ mlb = q.cfg.IngesterQueryStoreMaxLookback + } else if q.cfg.QueryIngestersWithin != 0 { + mlb = q.cfg.QueryIngestersWithin + } + + return mlb +} + func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) { // limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries. limitQueryInterval := false // ingesterMLB having -1 means query ingester for whole duration. - ingesterMLB := time.Duration(-1) if q.cfg.IngesterQueryStoreMaxLookback != 0 { // IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range. limitQueryInterval = true - ingesterMLB = q.cfg.IngesterQueryStoreMaxLookback - } else if q.cfg.QueryIngestersWithin != 0 { - ingesterMLB = q.cfg.QueryIngestersWithin } + ingesterMLB := q.calculateIngesterMaxLookbackPeriod() + // query ingester for whole duration. if ingesterMLB == -1 { i := &interval{ @@ -266,15 +289,18 @@ func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time return i, i } + ingesterQueryWithinRange := q.isWithinIngesterMaxLookbackPeriod(ingesterMLB, queryEnd) + // see if there is an overlap between ingester query interval and actual query interval, if not just do the store query. - ingesterOldestStartTime := time.Now().Add(-ingesterMLB) - if queryEnd.Before(ingesterOldestStartTime) { + if !ingesterQueryWithinRange { return nil, &interval{ start: queryStart, end: queryEnd, } } + ingesterOldestStartTime := time.Now().Add(-ingesterMLB) + // if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval. 
if !limitQueryInterval { i := &interval{ @@ -327,17 +353,25 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout)) defer cancel() + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(*req.Start, *req.End) + var ingesterValues [][]string - if !q.cfg.QueryStoreOnly { - ingesterValues, err = q.ingesterQuerier.Label(ctx, req) + if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { + timeFramedReq := *req + timeFramedReq.Start = &ingesterQueryInterval.start + timeFramedReq.End = &ingesterQueryInterval.end + + ingesterValues, err = q.ingesterQuerier.Label(ctx, &timeFramedReq) if err != nil { return nil, err } } var storeValues []string - if !q.cfg.QueryIngesterOnly { - from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano()) + if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { + from := model.TimeFromUnixNano(storeQueryInterval.start.UnixNano()) + through := model.TimeFromUnixNano(storeQueryInterval.end.UnixNano()) + if req.Values { storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name) if err != nil { @@ -440,13 +474,17 @@ func (q *SingleTenantQuerier) awaitSeries(ctx context.Context, req *logproto.Ser series := make(chan [][]logproto.SeriesIdentifier, 2) errs := make(chan error, 2) + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(req.Start, req.End) + // fetch series from ingesters and store concurrently - if q.cfg.QueryStoreOnly { - series <- [][]logproto.SeriesIdentifier{} - } else { + if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { + timeFramedReq := *req + timeFramedReq.Start = ingesterQueryInterval.start + timeFramedReq.End = ingesterQueryInterval.end + go func() { // fetch series identifiers from ingesters - resps, err := q.ingesterQuerier.Series(ctx, req) + resps, err := q.ingesterQuerier.Series(ctx, 
&timeFramedReq) if err != nil { errs <- err return @@ -454,17 +492,24 @@ func (q *SingleTenantQuerier) awaitSeries(ctx context.Context, req *logproto.Ser series <- resps }() + } else { + // If only querying the store or the query range does not overlap with the ingester max lookback period (defined by `query_ingesters_within`) + // then don't call out to the ingesters, and send an empty result back to the channel + series <- [][]logproto.SeriesIdentifier{} } - if !q.cfg.QueryIngesterOnly { + if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { go func() { - storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards) + storeValues, err := q.seriesForMatchers(ctx, storeQueryInterval.start, storeQueryInterval.end, req.GetGroups(), req.Shards) if err != nil { errs <- err return } series <- [][]logproto.SeriesIdentifier{storeValues} }() + } else { + // If we are not querying the store, send an empty result back to the channel + series <- [][]logproto.SeriesIdentifier{} } var sets [][]logproto.SeriesIdentifier diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 1242a4e4c1632..b8ec0067a04d8 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -686,6 +686,328 @@ func TestQuerier_buildQueryIntervals(t *testing.T) { } } +func TestQuerier_calculateIngesterMaxLookbackPeriod(t *testing.T) { + for _, tc := range []struct { + name string + ingesterQueryStoreMaxLookback time.Duration + queryIngestersWithin time.Duration + expected time.Duration + }{ + { + name: "defaults are set; infinite lookback period if no values are set", + expected: -1, + }, + { + name: "only setting ingesterQueryStoreMaxLookback", + ingesterQueryStoreMaxLookback: time.Hour, + expected: time.Hour, + }, + { + name: "setting both ingesterQueryStoreMaxLookback and queryIngestersWithin; ingesterQueryStoreMaxLookback takes precedence", + ingesterQueryStoreMaxLookback: time.Hour, + queryIngestersWithin: time.Minute, + 
expected: time.Hour, + }, + { + name: "only setting queryIngestersWithin", + queryIngestersWithin: time.Minute, + expected: time.Minute, + }, + } { + t.Run(tc.name, func(t *testing.T) { + querier := SingleTenantQuerier{cfg: Config{ + IngesterQueryStoreMaxLookback: tc.ingesterQueryStoreMaxLookback, + QueryIngestersWithin: tc.queryIngestersWithin, + }} + + assert.Equal(t, tc.expected, querier.calculateIngesterMaxLookbackPeriod()) + }) + } +} + +func TestQuerier_isWithinIngesterMaxLookbackPeriod(t *testing.T) { + overlappingQuery := interval{ + start: time.Now().Add(-6 * time.Hour), + end: time.Now(), + } + + nonOverlappingQuery := interval{ + start: time.Now().Add(-24 * time.Hour), + end: time.Now().Add(-12 * time.Hour), + } + + for _, tc := range []struct { + name string + ingesterQueryStoreMaxLookback time.Duration + queryIngestersWithin time.Duration + overlappingWithinRange bool + nonOverlappingWithinRange bool + }{ + { + name: "default values, query ingesters and store for whole duration", + overlappingWithinRange: true, + nonOverlappingWithinRange: true, + }, + { + name: "ingesterQueryStoreMaxLookback set to 1h", + ingesterQueryStoreMaxLookback: time.Hour, + overlappingWithinRange: true, + nonOverlappingWithinRange: false, + }, + { + name: "ingesterQueryStoreMaxLookback set to 10h", + ingesterQueryStoreMaxLookback: 10 * time.Hour, + overlappingWithinRange: true, + nonOverlappingWithinRange: false, + }, + { + name: "ingesterQueryStoreMaxLookback set to 1h and queryIngestersWithin set to 16h, ingesterQueryStoreMaxLookback takes precedence", + ingesterQueryStoreMaxLookback: time.Hour, + queryIngestersWithin: 16 * time.Hour, // if used, this would put the nonOverlapping query in range + overlappingWithinRange: true, + nonOverlappingWithinRange: false, + }, + { + name: "ingesterQueryStoreMaxLookback set to -1, query just ingesters", + ingesterQueryStoreMaxLookback: -1, + overlappingWithinRange: true, + nonOverlappingWithinRange: true, + }, + { + name: 
"queryIngestersWithin set to 1h", + queryIngestersWithin: time.Hour, + overlappingWithinRange: true, + nonOverlappingWithinRange: false, + }, + { + name: "queryIngestersWithin set to 10h", + queryIngestersWithin: 10 * time.Hour, + overlappingWithinRange: true, + nonOverlappingWithinRange: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + querier := SingleTenantQuerier{cfg: Config{ + IngesterQueryStoreMaxLookback: tc.ingesterQueryStoreMaxLookback, + QueryIngestersWithin: tc.queryIngestersWithin, + }} + + lookbackPeriod := querier.calculateIngesterMaxLookbackPeriod() + assert.Equal(t, tc.overlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, overlappingQuery.end)) + assert.Equal(t, tc.nonOverlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, nonOverlappingQuery.end)) + }) + } +} + +func TestQuerier_RequestingIngesters(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + + requestMapping := map[string]struct { + ingesterMethod string + storeMethod string + }{ + "SelectLogs": { + ingesterMethod: "Query", + storeMethod: "SelectLogs", + }, + "SelectSamples": { + ingesterMethod: "QuerySample", + storeMethod: "SelectSamples", + }, + "LabelValuesForMetricName": { + ingesterMethod: "Label", + storeMethod: "LabelValuesForMetricName", + }, + "LabelNamesForMetricName": { + ingesterMethod: "Label", + storeMethod: "LabelNamesForMetricName", + }, + "Series": { + ingesterMethod: "Series", + storeMethod: "Series", + }, + } + + tests := []struct { + desc string + start, end time.Time + setIngesterQueryStoreMaxLookback bool + expectedCallsStore int + expectedCallsIngesters int + }{ + { + desc: "Data in storage and ingesters", + start: time.Now().Add(-time.Hour * 2), + end: time.Now(), + expectedCallsStore: 1, + expectedCallsIngesters: 1, + }, + { + desc: "Data in ingesters (IngesterQueryStoreMaxLookback not set)", + start: time.Now().Add(-time.Minute * 15), + end: time.Now(), + expectedCallsStore: 1, + 
expectedCallsIngesters: 1, + }, + { + desc: "Data only in storage", + start: time.Now().Add(-time.Hour * 2), + end: time.Now().Add(-time.Hour * 1), + expectedCallsStore: 1, + expectedCallsIngesters: 0, + }, + { + desc: "Data in ingesters (IngesterQueryStoreMaxLookback set)", + start: time.Now().Add(-time.Minute * 15), + end: time.Now(), + setIngesterQueryStoreMaxLookback: true, + expectedCallsStore: 0, + expectedCallsIngesters: 1, + }, + } + + requests := []struct { + name string + do func(querier *SingleTenantQuerier, start, end time.Time) error + }{ + { + name: "SelectLogs", + do: func(querier *SingleTenantQuerier, start, end time.Time) error { + _, err := querier.SelectLogs(ctx, logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: "{type=\"test\", fail=\"yes\"} |= \"foo\"", + Limit: 10, + Start: start, + End: end, + Direction: logproto.FORWARD, + }, + }) + + return err + }, + }, + { + name: "SelectSamples", + do: func(querier *SingleTenantQuerier, start, end time.Time) error { + _, err := querier.SelectSamples(ctx, logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: "count_over_time({foo=\"bar\"}[5m])", + Start: start, + End: end, + }, + }) + return err + }, + }, + { + name: "LabelValuesForMetricName", + do: func(querier *SingleTenantQuerier, start, end time.Time) error { + _, err := querier.Label(ctx, &logproto.LabelRequest{ + Name: "type", + Values: true, + Start: &start, + End: &end, + }) + return err + }, + }, + { + name: "LabelNamesForMetricName", + do: func(querier *SingleTenantQuerier, start, end time.Time) error { + _, err := querier.Label(ctx, &logproto.LabelRequest{ + Values: false, + Start: &start, + End: &end, + }) + return err + }, + }, + { + name: "Series", + do: func(querier *SingleTenantQuerier, start, end time.Time) error { + _, err := querier.Series(ctx, &logproto.SeriesRequest{ + Start: start, + End: end, + }) + return err + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, 
func(t *testing.T) { + + conf := mockQuerierConfig() + conf.QueryIngestersWithin = time.Minute * 30 + if tc.setIngesterQueryStoreMaxLookback { + conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin + } + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + for _, request := range requests { + t.Run(request.name, func(t *testing.T) { + ingesterClient, store, querier, err := setupIngesterQuerierMocks(conf, limits) + require.NoError(t, err) + + err = request.do(querier, tc.start, tc.end) + require.NoError(t, err) + + callsIngesters := ingesterClient.GetMockedCallsByMethod(requestMapping[request.name].ingesterMethod) + assert.Equal(t, tc.expectedCallsIngesters, len(callsIngesters)) + + callsStore := store.GetMockedCallsByMethod(requestMapping[request.name].storeMethod) + assert.Equal(t, tc.expectedCallsStore, len(callsStore)) + }) + } + }) + } +} + +func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*querierClientMock, *storeMock, *SingleTenantQuerier, error) { + queryClient := newQueryClientMock() + queryClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 1)}), nil) + + querySampleClient := newQuerySampleClientMock() + querySampleClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 1)}), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil) + ingesterClient.On("QuerySample", mock.Anything, mock.Anything, mock.Anything).Return(querySampleClient, nil) + ingesterClient.On("Label", mock.Anything, mock.Anything, mock.Anything).Return(mockLabelResponse([]string{"bar"}), nil) + ingesterClient.On("Series", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.SeriesResponse{ + Series: []logproto.SeriesIdentifier{ + { + Labels: map[string]string{"bar": "1"}, + }, + }, + }, nil) + + store := newStoreMock() + store.On("SelectLogs", mock.Anything, 
mock.Anything).Return(mockStreamIterator(0, 1), nil) + store.On("SelectSamples", mock.Anything, mock.Anything).Return(mockSampleIterator(querySampleClient), nil) + store.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"1", "2", "3"}, nil) + store.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, nil) + store.On("Series", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{ + {Labels: map[string]string{"foo": "1"}}, + }, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + + if err != nil { + return nil, nil, nil, err + } + + return ingesterClient, store, querier, nil +} + type fakeTimeLimits struct { maxQueryLookback time.Duration maxQueryLength time.Duration