Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move -querier.split-queries-by-interval into limits config #5233

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* [4857](https://github.com/grafana/loki/pull/4857) **jordanrushing**: New schema v12 changes chunk key structure
* [5077](https://github.com/grafana/loki/pull/5077) **trevorwhitney**: Change some default values for better out-of-the-box performance
* [5204](https://github.com/grafana/loki/pull/5204) **trevorwhitney**: Default `max_outstanding_per_tenant` to `2048`
* [5233](https://github.com/grafana/loki/pull/5233) **trevorwhitney**: Move querier.split-queries-by-interval into limits config

# 2.4.1 (2021/11/07)

Expand Down
24 changes: 1 addition & 23 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Frontend.RegisterFlags(f)
c.Ruler.RegisterFlags(f)
c.Worker.RegisterFlags(f)
c.registerQueryRangeFlagsWithChangedDefaultValues(f)
c.QueryRange.RegisterFlags(f)
c.RuntimeConfig.RegisterFlags(f)
c.MemberlistKV.RegisterFlags(f)
c.Tracing.RegisterFlags(f)
Expand Down Expand Up @@ -138,28 +138,6 @@ func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
})
}

// registerQueryRangeFlagsWithChangedDefaultValues registers the query-range
// flags on fs, replacing the defaults of selected flags with Loki-specific
// values.
//
// NB: This wrapper can be removed once Loki no longer depends on Cortex and
// owns queryrange.Config, at which point the defaults can be changed at the
// source instead.
func (c *Config) registerQueryRangeFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
	// Flag name -> value to apply before the flag is re-registered on fs.
	changedDefaults := map[string]string{
		"querier.split-queries-by-interval":     "30m",
		"querier.parallelise-shardable-queries": "true",
	}

	// A flag's default is captured at registration time and cannot be altered
	// afterwards. So the flags are first registered on a scratch flag set,
	// their values adjusted, and then each value is registered again on fs.
	scratch := flag.NewFlagSet("throwaway", flag.PanicOnError)
	c.QueryRange.RegisterFlags(scratch)

	scratch.VisitAll(func(registered *flag.Flag) {
		if replacement, found := changedDefaults[registered.Name]; found {
			// Errors from Set are deliberately ignored; a test verifies that it works.
			_ = registered.Value.Set(replacement)
		}

		fs.Var(registered.Value, registered.Name, registered.Usage)
	})
}

// Clone takes advantage of pass-by-value semantics to return a distinct *Config.
// This is primarily used to parse a different flag set without mutating the original *Config.
func (c *Config) Clone() flagext.Registerer {
Expand Down
6 changes: 1 addition & 5 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,12 @@ func (l limits) QuerySplitDuration(user string) time.Duration {
}

// WithDefaultLimits will construct a Limits with a default value for QuerySplitDuration when no overrides are present.
func WithDefaultLimits(l Limits, conf queryrangebase.Config) Limits {
func WithDefaultLimits(l Limits) Limits {
res := limits{
Limits: l,
overrides: true,
}

if conf.SplitQueriesByInterval != 0 {
res.splitDuration = conf.SplitQueriesByInterval
}

return res
}

Expand Down
13 changes: 5 additions & 8 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,14 @@ import (

func TestLimits(t *testing.T) {
l := fakeLimits{
splits: map[string]time.Duration{"a": time.Minute},
querySplitDuration: time.Hour,
splits: map[string]time.Duration{"a": time.Minute},
}

require.Equal(t, l.QuerySplitDuration("a"), time.Minute)
require.Equal(t, l.QuerySplitDuration("b"), time.Duration(0))

wrapped := WithDefaultLimits(l, queryrangebase.Config{
SplitQueriesByInterval: time.Hour,
})
require.Equal(t, l.QuerySplitDuration("b"), time.Hour)

wrapped := WithDefaultLimits(l)
require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute)
require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour)

Expand All @@ -52,10 +50,9 @@ func TestLimits(t *testing.T) {

func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// split in 7, with at most 2 running in parallel.
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2, querySplitDuration: time.Hour}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
124 changes: 2 additions & 122 deletions pkg/querier/queryrange/queryrangebase/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,13 @@ import (
"io"
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/flagext"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/tenant"
)

const day = 24 * time.Hour
Expand All @@ -54,7 +43,7 @@ var (

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
//Deprecated
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
Expand All @@ -67,10 +56,9 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use an a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", false, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
}
Expand Down Expand Up @@ -133,114 +121,6 @@ func (f RoundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}

// NewTripperware returns a Tripperware configured with middlewares to limit, align, split, retry and cache requests.
//
// The middleware chain is assembled in this order: limits (always), step
// alignment (cfg.AlignQueriesWithStep), split-by-interval
// (cfg.SplitQueriesByInterval != 0), results cache (cfg.CacheResults), query
// sharding (cfg.ShardedQueries) and retries (cfg.MaxRetries > 0).
//
// Besides the Tripperware it returns the results cache, which is nil unless
// cfg.CacheResults is set. An error is returned when sharding is enabled with
// a zero minShardingLookback, or when the results cache middleware cannot be
// built.
//
// The returned Tripperware counts every request per tenant, but only requests
// whose path ends in "/query_range" go through the middleware chain; all
// other requests are passed straight to the next round tripper.
func NewTripperware(
	cfg Config,
	log log.Logger,
	limits Limits,
	codec Codec,
	cacheExtractor Extractor,
	schema chunk.SchemaConfig,
	engineOpts promql.EngineOpts,
	minShardingLookback time.Duration,
	registerer prometheus.Registerer,
	cacheGenNumberLoader CacheGenNumberLoader,
) (Tripperware, cache.Cache, error) {
	// Validate the sharding configuration before registering metrics or
	// building the results cache, so an invalid configuration does not leave
	// behind resources the caller never receives.
	if cfg.ShardedQueries && minShardingLookback == 0 {
		return nil, nil, errInvalidMinShardingLookback
	}

	// Per tenant query metrics.
	queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_query_frontend_queries_total",
		Help: "Total queries sent per tenant.",
	}, []string{"op", "user"})

	activeUsers := util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
		err := util.DeleteMatchingLabels(queriesPerTenant, map[string]string{"user": user})
		if err != nil {
			level.Warn(log).Log("msg", "failed to remove cortex_query_frontend_queries_total metric for user", "user", user, "err", err)
		}
	})

	// Metric used to keep track of each middleware execution duration.
	metrics := NewInstrumentMiddlewareMetrics(registerer)

	queryRangeMiddleware := []Middleware{NewLimitsMiddleware(limits)}
	if cfg.AlignQueriesWithStep {
		queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
	}
	if cfg.SplitQueriesByInterval != 0 {
		staticIntervalFn := func(_ Request) time.Duration { return cfg.SplitQueriesByInterval }
		queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, codec, registerer))
	}

	var resultsCache cache.Cache
	if cfg.CacheResults {
		shouldCache := func(r Request) bool {
			return !r.GetCachingOptions().Disabled
		}
		// The cache is bound to a name that does not shadow the imported cache package.
		queryCacheMiddleware, c, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, codec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer)
		if err != nil {
			return nil, nil, err
		}
		resultsCache = c
		queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("results_cache", metrics), queryCacheMiddleware)
	}

	if cfg.ShardedQueries {
		shardingware := NewQueryShardMiddleware(
			log,
			promql.NewEngine(engineOpts),
			schema.Configs,
			codec,
			minShardingLookback,
			metrics,
			registerer,
		)

		queryRangeMiddleware = append(
			queryRangeMiddleware,
			shardingware, // instrumentation is included in the sharding middleware
		)
	}

	if cfg.MaxRetries > 0 {
		queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
	}

	// Start cleanup. If cleaner stops or fail, we will simply not clean the metrics for inactive users.
	_ = activeUsers.StartAsync(context.Background())
	return func(next http.RoundTripper) http.RoundTripper {
		// The chain always holds at least the limits middleware, so this guard
		// is purely defensive.
		if len(queryRangeMiddleware) == 0 {
			return next
		}

		queryrange := NewRoundTripper(next, codec, cfg.ForwardHeaders, queryRangeMiddleware...)
		return RoundTripFunc(func(r *http.Request) (*http.Response, error) {
			isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range")
			op := "query"
			if isQueryRange {
				op = "query_range"
			}

			tenantIDs, err := tenant.TenantIDs(r.Context())
			// This should never happen anyways because we have auth middleware before this.
			if err != nil {
				return nil, err
			}
			userStr := tenant.JoinTenantIDs(tenantIDs)
			activeUsers.UpdateUserTimestamp(userStr, time.Now())
			queriesPerTenant.WithLabelValues(op, userStr).Inc()

			// Only range queries go through the middleware chain.
			if !isQueryRange {
				return next.RoundTrip(r)
			}
			return queryrange.RoundTrip(r)
		})
	}, resultsCache, nil
}

type roundTripper struct {
next http.RoundTripper
handler Handler
Expand Down
124 changes: 0 additions & 124 deletions pkg/querier/queryrange/queryrangebase/roundtrip_test.go

This file was deleted.

11 changes: 11 additions & 0 deletions pkg/querier/queryrange/queryrangebase/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,14 @@ func Test_evaluateAtModifier(t *testing.T) {
func toMs(t time.Duration) int64 {
return int64(t / time.Millisecond)
}

// singleHostRoundTripper is an http.RoundTripper that sends every request to
// one fixed host over plain HTTP and delegates the actual exchange to next.
type singleHostRoundTripper struct {
	host string            // host every request is redirected to
	next http.RoundTripper // round tripper that performs the rewritten request
}

// RoundTrip forwards r to s.host using the "http" scheme via s.next and
// returns whatever s.next returns.
//
// The request is cloned before its URL is rewritten: the http.RoundTripper
// contract forbids modifying the caller's request, and the caller may still
// inspect or reuse it afterwards.
func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	redirected := r.Clone(r.Context())
	redirected.URL.Scheme = "http"
	redirected.URL.Host = s.host
	return s.next.RoundTrip(redirected)
}
Loading