From 91d837e79c08648e4533200bbe9307c2729843cc Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 27 Jan 2022 12:13:49 -0500 Subject: [PATCH] Refactor/remove global splitby (#5243) * cleans up internal limits struct, removes unnecessary logic * moves querier.split-queries-by-interval to limits, simplifies resulting code. * docs, changelog * flag supports model.duration instead of time.duration * lint and adds new error for deprecated field * reregisters queryrange flags * comment lint * ensures we validate queryrange config and simplifies embedded struct validate() * 0 still disables * simplify cache key generation and protect against divide by zero --- CHANGELOG.md | 1 + docs/sources/configuration/_index.md | 6 ++-- docs/sources/upgrading/_index.md | 19 +++++++++++ pkg/loki/loki.go | 27 +++------------ pkg/querier/queryrange/limits.go | 33 ++++-------------- pkg/querier/queryrange/limits_test.go | 34 +++++++++++++------ .../queryrange/queryrangebase/roundtrip.go | 18 ++++++---- pkg/querier/queryrange/roundtrip.go | 14 -------- pkg/querier/queryrange/roundtrip_test.go | 13 +++---- .../queryrange/split_by_interval_test.go | 10 +++--- pkg/validation/limits.go | 4 +++ 11 files changed, 83 insertions(+), 96 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2b5f766f004e..2bba95c74daeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [5243](https://github.com/grafana/loki/pull/5243) **owen-d**: moves `querier.split-queries-by-interval` to limits code only. * [5139](https://github.com/grafana/loki/pull/5139) **DylanGuedes**: Drop support for legacy configuration rules format. * [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail. * [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped. diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 491a6b1379f7f..50c887325c90b 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2263,10 +2263,8 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -frontend.min-sharding-lookback [min_sharding_lookback: | default = 0s] -# Split queries by an interval and execute in parallel, 0 disables it. You -# should use in multiple of 24 hours (same as the storage bucketing scheme), -# to avoid queriers downloading and processing the same chunks. This also -# determines how cache keys are chosen when result caching is enabled +# Split queries by an interval and execute in parallel, any value less than zero disables it. +# This also determines how cache keys are chosen when result caching is enabled # CLI flag: -querier.split-queries-by-interval [split_queries_by_interval: | default = 30m] ``` diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md index 0f176ba9bf188..a23ac66b8e73b 100644 --- a/docs/sources/upgrading/_index.md +++ b/docs/sources/upgrading/_index.md @@ -33,6 +33,25 @@ The output is incredibly verbose as it shows the entire internal config struct u ### Loki +#### `querier.split-queries-by-interval` flag migrated yaml path and default value. + +The CLI flag `querier.split-queries-by-interval` has changed it's corresponding yaml equivalent from +```yaml +query_range: + split_queries_by_interval: 10m +``` +-> +``` +limits_config: + split_queries_by_interval: 10m + +``` + +Additionally, it has a new default value of `30m` rather than `0`. + +This is part of it's migration path from a global configuration to a per-tenant one (still subject to default tenant limits in the `limits_config`). +It keeps it's CLI flag as `querier.split-queries-by-interval`. + #### Dropped support for old Prometheus rules configuration format Alerting rules previously could be specified in two formats: 1.x format (legacy one, named `v0` internally) and 2.x. diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index dab636d2539db..6a10d54610eef 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -109,7 +109,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Frontend.RegisterFlags(f) c.Ruler.RegisterFlags(f) c.Worker.RegisterFlags(f) - c.registerQueryRangeFlagsWithChangedDefaultValues(f) + c.QueryRange.RegisterFlags(f) c.RuntimeConfig.RegisterFlags(f) c.MemberlistKV.RegisterFlags(f) c.Tracing.RegisterFlags(f) @@ -138,28 +138,6 @@ func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { }) } -func (c *Config) registerQueryRangeFlagsWithChangedDefaultValues(fs *flag.FlagSet) { - throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError) - // NB: We can remove this after removing Loki's dependency on Cortex and bringing in the queryrange.Config. - // That will let us change the defaults there rather than include wrapper functions like this one. - // Register to throwaway flags first. Default values are remembered during registration and cannot be changed, - // but we can take values from throwaway flag set and reregister into supplied flags with new default values. - c.QueryRange.RegisterFlags(throwaway) - - throwaway.VisitAll(func(f *flag.Flag) { - // Ignore errors when setting new values. We have a test to verify that it works. - switch f.Name { - case "querier.split-queries-by-interval": - _ = f.Value.Set("30m") - - case "querier.parallelise-shardable-queries": - _ = f.Value.Set("true") - } - - fs.Var(f.Value, f.Name, f.Usage) - }) -} - // Clone takes advantage of pass-by-value semantics to return a distinct *Config. // This is primarily used to parse a different flag set without mutating the original *Config. func (c *Config) Clone() flagext.Registerer { @@ -222,6 +200,9 @@ func (c *Config) Validate() error { ) } } + if err := c.QueryRange.Validate(); err != nil { + return errors.Wrap(err, "invalid query_range config") + } return nil } diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 442019b9945c0..b4eaba0910c9f 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -43,32 +43,10 @@ type Limits interface { type limits struct { Limits splitDuration time.Duration - overrides bool } func (l limits) QuerySplitDuration(user string) time.Duration { - if !l.overrides { - return l.splitDuration - } - dur := l.Limits.QuerySplitDuration(user) - if dur == 0 { - return l.splitDuration - } - return dur -} - -// WithDefaults will construct a Limits with a default value for QuerySplitDuration when no overrides are present. -func WithDefaultLimits(l Limits, conf queryrangebase.Config) Limits { - res := limits{ - Limits: l, - overrides: true, - } - - if conf.SplitQueriesByInterval != 0 { - res.splitDuration = conf.SplitQueriesByInterval - } - - return res + return l.splitDuration } // WithSplitByLimits will construct a Limits with a static split by duration. @@ -84,11 +62,14 @@ type cacheKeyLimits struct { Limits } -// GenerateCacheKey will panic if it encounters a 0 split duration. We ensure against this by requiring -// a nonzero split interval when caching is enabled func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request) string { split := l.QuerySplitDuration(userID) - currentInterval := r.GetStart() / int64(split/time.Millisecond) + + var currentInterval int64 + if denominator := int64(split / time.Millisecond); denominator > 0 { + currentInterval = r.GetStart() / denominator + } + // include both the currentInterval and the split duration in key to ensure // a cache key can't be reused when an interval changes return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split) diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 9f210606e002c..0f65da3cfb645 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -27,15 +27,12 @@ func TestLimits(t *testing.T) { splits: map[string]time.Duration{"a": time.Minute}, } - require.Equal(t, l.QuerySplitDuration("a"), time.Minute) - require.Equal(t, l.QuerySplitDuration("b"), time.Duration(0)) + wrapped := WithSplitByLimits(l, time.Hour) - wrapped := WithDefaultLimits(l, queryrangebase.Config{ - SplitQueriesByInterval: time.Hour, - }) - - require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute) + // Test default require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour) + // Ensure we override the underlying implementation + require.Equal(t, wrapped.QuerySplitDuration("a"), time.Hour) r := &LokiRequest{ Query: "qry", @@ -45,17 +42,17 @@ func TestLimits(t *testing.T) { require.Equal( t, - fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Minute/time.Millisecond), int64(time.Minute)), + fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)), cacheKeyLimits{wrapped}.GenerateCacheKey("a", r), ) } func Test_seriesLimiter(t *testing.T) { cfg := testConfig - cfg.SplitQueriesByInterval = time.Hour cfg.CacheResults = false // split in 7 with 2 in // max. - tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil) + l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour) + tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -269,3 +266,20 @@ func Test_MaxQueryLookBack(t *testing.T) { _, err = tpw(rt).RoundTrip(req) require.NoError(t, err) } + +func Test_GenerateCacheKey_NoDivideZero(t *testing.T) { + l := cacheKeyLimits{WithSplitByLimits(nil, 0)} + start := time.Now() + r := &LokiRequest{ + Query: "qry", + StartTs: start, + Step: int64(time.Minute / time.Millisecond), + } + + require.Equal( + t, + fmt.Sprintf("foo:qry:%d:0:0", r.GetStep()), + l.GenerateCacheKey("foo", r), + ) + +} diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip.go b/pkg/querier/queryrange/queryrangebase/roundtrip.go index 27bd1dc4f0582..1cf6f934c65e6 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip.go @@ -54,12 +54,14 @@ var ( // Config for query_range middleware chain. type Config struct { + // Deprecated: SplitQueriesByInterval will be removed in the next major release SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"` - AlignQueriesWithStep bool `yaml:"align_queries_with_step"` - ResultsCacheConfig `yaml:"results_cache"` - CacheResults bool `yaml:"cache_results"` - MaxRetries int `yaml:"max_retries"` - ShardedQueries bool `yaml:"parallelise_shardable_queries"` + + AlignQueriesWithStep bool `yaml:"align_queries_with_step"` + ResultsCacheConfig `yaml:"results_cache"` + CacheResults bool `yaml:"cache_results"` + MaxRetries int `yaml:"max_retries"` + ShardedQueries bool `yaml:"parallelise_shardable_queries"` // List of headers which query_range middleware chain would forward to downstream querier. ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"` } @@ -67,16 +69,18 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.") - f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use an a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled") f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.") f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.") - f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", false, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.") + f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.") f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.") cfg.ResultsCacheConfig.RegisterFlags(f) } // Validate validates the config. func (cfg *Config) Validate() error { + if cfg.SplitQueriesByInterval != 0 { + return errors.New("the yaml flag `split_queries_by_interval` must now be set in the `limits_config` section instead of the `query_range` config section") + } if cfg.CacheResults { if err := cfg.ResultsCacheConfig.Validate(); err != nil { return errors.Wrap(err, "invalid ResultsCache config") diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 167ce482665de..f9be0ad0cecef 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -9,7 +9,6 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/go-kit/log" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/weaveworks/common/httpgrpc" @@ -31,16 +30,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Config.RegisterFlags(f) } -// Validate validates the config. -func (cfg *Config) Validate() error { - if cfg.CacheResults { - if err := cfg.ResultsCacheConfig.Validate(); err != nil { - return errors.Wrap(err, "invalid ResultsCache config") - } - } - return nil -} - // Stopper gracefully shutdown resources created type Stopper interface { Stop() @@ -54,9 +43,6 @@ func NewTripperware( schema chunk.SchemaConfig, registerer prometheus.Registerer, ) (queryrangebase.Tripperware, Stopper, error) { - // Ensure that QuerySplitDuration uses configuration defaults. - // This avoids divide by zero errors when determining cache keys where user specific overrides don't exist. - limits = WithDefaultLimits(limits, cfg.Config) instrumentMetrics := queryrangebase.NewInstrumentMiddlewareMetrics(registerer) retryMetrics := queryrangebase.NewRetryMiddlewareMetrics(registerer) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 478ed82239c2c..f737bc588ff4e 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -33,10 +33,9 @@ import ( var ( testTime = time.Date(2019, 12, 02, 11, 10, 10, 10, time.UTC) testConfig = Config{queryrangebase.Config{ - SplitQueriesByInterval: 4 * time.Hour, - AlignQueriesWithStep: true, - MaxRetries: 3, - CacheResults: true, + AlignQueriesWithStep: true, + MaxRetries: 3, + CacheResults: true, ResultsCacheConfig: queryrangebase.ResultsCacheConfig{ CacheConfig: cache.Config{ EnableFifoCache: true, @@ -109,7 +108,8 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. func TestMetricsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil) + l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -402,7 +402,8 @@ func TestUnhandledPath(t *testing.T) { } func TestRegexpParamsSupport(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil) + l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index acb16e05788eb..5264f04e630c1 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -581,7 +581,7 @@ func Test_splitByInterval_Do(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, @@ -749,7 +749,7 @@ func Test_series_splitByInterval_Do(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, @@ -830,7 +830,7 @@ func Test_ExitEarly(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, @@ -907,9 +907,7 @@ func Test_DoesntDeadlock(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{ - maxQueryParallelism: n, - }, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: n}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index d25e1e5ad476b..cb6c4fef4c0e3 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -184,6 +184,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.PerTenantOverridePeriod.Set("10s") f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.") + + _ = l.QuerySplitDuration.Set("30m") + f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled") } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -208,6 +211,7 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { // Validate validates that this limits config is valid. func (l *Limits) Validate() error { + if l.StreamRetention != nil { for i, rule := range l.StreamRetention { matchers, err := logql.ParseMatchers(rule.Selector)