Skip to content

Commit

Permalink
Add a flag which allows the queriers to run completely standalone and…
Browse files Browse the repository at this point in the history
… only query stored data. (grafana#3700)
  • Loading branch information
slim-bean authored May 17, 2021
1 parent 210dc35 commit fa24735
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
5 changes: 5 additions & 0 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ The `querier_config` block configures the Loki Querier.
# CLI flag: -querier.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]
# Only query the store, do not attempt to query any ingesters,
# useful for running a standalone querier pool opearting only against stored data.
# CLI flag: -querier.query-store-only
[query_store_only: <boolean> | default = false]
# Configuration options for the LogQL engine.
engine:
# Timeout for query execution
Expand Down
4 changes: 4 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ func (t *Loki) initStore() (_ services.Service, err error) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch t.Cfg.Target {
case Querier, Ruler:
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
if t.Cfg.Querier.QueryStoreOnly {
break
}
// Use AsyncStore to query both ingesters local store and chunk store for store queries.
// Only queriers should use the AsyncStore, it should never be used in ingesters.
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier,
Expand Down
38 changes: 23 additions & 15 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
IngesterQueryStoreMaxLookback time.Duration `yaml:"-"`
Engine logql.EngineOpts `yaml:"engine,omitempty"`
MaxConcurrent int `yaml:"max_concurrent"`
QueryStoreOnly bool `yaml:"query_store_only"`
}

// RegisterFlags register flags.
Expand All @@ -56,6 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.ExtraQueryDelay, "querier.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
f.BoolVar(&cfg.QueryStoreOnly, "querier.query-store-only", false, "Queriers should only query the store and not try to query any ingesters")
}

// Querier handlers queries.
Expand Down Expand Up @@ -96,7 +98,7 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams)
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

iters := []iter.EntryIterator{}
if ingesterQueryInterval != nil {
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
// because the initial request is used below to query stores
queryRequestCopy := *params.QueryRequest
Expand Down Expand Up @@ -139,7 +141,7 @@ func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSamplePa
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(params.Start, params.End)

iters := []iter.SampleIterator{}
if ingesterQueryInterval != nil {
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
// Make a copy of the request before modifying
// because the initial request is used below to query stores
queryRequestCopy := *params.SampleQueryRequest
Expand Down Expand Up @@ -261,9 +263,12 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()

ingesterValues, err := q.ingesterQuerier.Label(ctx, req)
if err != nil {
return nil, err
var ingesterValues [][]string
if !q.cfg.QueryStoreOnly {
ingesterValues, err = q.ingesterQuerier.Label(ctx, req)
if err != nil {
return nil, err
}
}

from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
Expand Down Expand Up @@ -371,17 +376,20 @@ func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest)
errs := make(chan error, 2)

// fetch series from ingesters and store concurrently
if q.cfg.QueryStoreOnly {
series <- [][]logproto.SeriesIdentifier{}
} else {
go func() {
// fetch series identifiers from ingesters
resps, err := q.ingesterQuerier.Series(ctx, req)
if err != nil {
errs <- err
return
}

go func() {
// fetch series identifiers from ingesters
resps, err := q.ingesterQuerier.Series(ctx, req)
if err != nil {
errs <- err
return
}

series <- resps
}()
series <- resps
}()
}

go func() {
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups())
Expand Down

0 comments on commit fa24735

Please sign in to comment.