From 502af0ddf3840b181c4ad1b299d939fac3fdc84d Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 26 Jan 2022 17:00:56 -0500 Subject: [PATCH 01/10] cleans up internal limits struct, removes unnecessary logic --- pkg/querier/queryrange/limits.go | 24 +------------------ pkg/querier/queryrange/limits_test.go | 16 ++++++------- pkg/querier/queryrange/roundtrip.go | 3 --- pkg/querier/queryrange/roundtrip_test.go | 13 +++++----- .../queryrange/split_by_interval_test.go | 10 ++++---- 5 files changed, 19 insertions(+), 47 deletions(-) diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 442019b9945c0..1c618622cbeb7 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -43,32 +43,10 @@ type Limits interface { type limits struct { Limits splitDuration time.Duration - overrides bool } func (l limits) QuerySplitDuration(user string) time.Duration { - if !l.overrides { - return l.splitDuration - } - dur := l.Limits.QuerySplitDuration(user) - if dur == 0 { - return l.splitDuration - } - return dur -} - -// WithDefaults will construct a Limits with a default value for QuerySplitDuration when no overrides are present. -func WithDefaultLimits(l Limits, conf queryrangebase.Config) Limits { - res := limits{ - Limits: l, - overrides: true, - } - - if conf.SplitQueriesByInterval != 0 { - res.splitDuration = conf.SplitQueriesByInterval - } - - return res + return l.splitDuration } // WithSplitByLimits will construct a Limits with a static split by duration. diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 9f210606e002c..e4163c5aa137c 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -27,15 +27,12 @@ func TestLimits(t *testing.T) { splits: map[string]time.Duration{"a": time.Minute}, } - require.Equal(t, l.QuerySplitDuration("a"), time.Minute) - require.Equal(t, l.QuerySplitDuration("b"), time.Duration(0)) + wrapped := WithSplitByLimits(l, time.Hour) - wrapped := WithDefaultLimits(l, queryrangebase.Config{ - SplitQueriesByInterval: time.Hour, - }) - - require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute) + // Test default require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour) + // Ensure we override the underlying implementation + require.Equal(t, wrapped.QuerySplitDuration("a"), time.Hour) r := &LokiRequest{ Query: "qry", @@ -45,7 +42,7 @@ func TestLimits(t *testing.T) { require.Equal( t, - fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Minute/time.Millisecond), int64(time.Minute)), + fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)), cacheKeyLimits{wrapped}.GenerateCacheKey("a", r), ) } @@ -55,7 +52,8 @@ func Test_seriesLimiter(t *testing.T) { cfg.SplitQueriesByInterval = time.Hour cfg.CacheResults = false // split in 7 with 2 in // max. - tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil) + l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour) + tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 167ce482665de..8a86f84d63ce8 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -54,9 +54,6 @@ func NewTripperware( schema chunk.SchemaConfig, registerer prometheus.Registerer, ) (queryrangebase.Tripperware, Stopper, error) { - // Ensure that QuerySplitDuration uses configuration defaults. - // This avoids divide by zero errors when determining cache keys where user specific overrides don't exist. - limits = WithDefaultLimits(limits, cfg.Config) instrumentMetrics := queryrangebase.NewInstrumentMiddlewareMetrics(registerer) retryMetrics := queryrangebase.NewRetryMiddlewareMetrics(registerer) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 478ed82239c2c..f737bc588ff4e 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -33,10 +33,9 @@ import ( var ( testTime = time.Date(2019, 12, 02, 11, 10, 10, 10, time.UTC) testConfig = Config{queryrangebase.Config{ - SplitQueriesByInterval: 4 * time.Hour, - AlignQueriesWithStep: true, - MaxRetries: 3, - CacheResults: true, + AlignQueriesWithStep: true, + MaxRetries: 3, + CacheResults: true, ResultsCacheConfig: queryrangebase.ResultsCacheConfig{ CacheConfig: cache.Config{ EnableFifoCache: true, @@ -109,7 +108,8 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. func TestMetricsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil) + l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -402,7 +402,8 @@ func TestUnhandledPath(t *testing.T) { } func TestRegexpParamsSupport(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil) + l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index acb16e05788eb..5264f04e630c1 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -581,7 +581,7 @@ func Test_splitByInterval_Do(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, @@ -749,7 +749,7 @@ func Test_series_splitByInterval_Do(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, @@ -830,7 +830,7 @@ func Test_ExitEarly(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, @@ -907,9 +907,7 @@ func Test_DoesntDeadlock(t *testing.T) { }, nil }) - l := WithDefaultLimits(fakeLimits{ - maxQueryParallelism: n, - }, queryrangebase.Config{SplitQueriesByInterval: time.Hour}) + l := WithSplitByLimits(fakeLimits{maxQueryParallelism: n}, time.Hour) split := SplitByIntervalMiddleware( l, LokiCodec, From 157abe053aaf9ff123d90e066047b1aa4deaeb63 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 26 Jan 2022 17:20:16 -0500 Subject: [PATCH 02/10] moves querier.split-queries-by-interval to limits, simplifies resulting code. --- pkg/loki/loki.go | 23 ------------------- pkg/querier/queryrange/limits.go | 11 ++++++++- .../queryrange/queryrangebase/roundtrip.go | 15 ++++++------ pkg/validation/limits.go | 4 ++++ 4 files changed, 22 insertions(+), 31 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index dab636d2539db..0d6e4b28bcd5f 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -109,7 +109,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Frontend.RegisterFlags(f) c.Ruler.RegisterFlags(f) c.Worker.RegisterFlags(f) - c.registerQueryRangeFlagsWithChangedDefaultValues(f) c.RuntimeConfig.RegisterFlags(f) c.MemberlistKV.RegisterFlags(f) c.Tracing.RegisterFlags(f) @@ -138,28 +137,6 @@ func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) { }) } -func (c *Config) registerQueryRangeFlagsWithChangedDefaultValues(fs *flag.FlagSet) { - throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError) - // NB: We can remove this after removing Loki's dependency on Cortex and bringing in the queryrange.Config. - // That will let us change the defaults there rather than include wrapper functions like this one. - // Register to throwaway flags first. Default values are remembered during registration and cannot be changed, - // but we can take values from throwaway flag set and reregister into supplied flags with new default values. - c.QueryRange.RegisterFlags(throwaway) - - throwaway.VisitAll(func(f *flag.Flag) { - // Ignore errors when setting new values. We have a test to verify that it works. - switch f.Name { - case "querier.split-queries-by-interval": - _ = f.Value.Set("30m") - - case "querier.parallelise-shardable-queries": - _ = f.Value.Set("true") - } - - fs.Var(f.Value, f.Name, f.Usage) - }) -} - // Clone takes advantage of pass-by-value semantics to return a distinct *Config. // This is primarily used to parse a different flag set without mutating the original *Config. func (c *Config) Clone() flagext.Registerer { diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 1c618622cbeb7..f653725e2f276 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -66,7 +66,16 @@ type cacheKeyLimits struct { // a nonzero split interval when caching is enabled func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request) string { split := l.QuerySplitDuration(userID) - currentInterval := r.GetStart() / int64(split/time.Millisecond) + + // Ensure that we don't divide by zero when calculating the interval. + // Since we encode the original split in the key, + // we won't accidentally conflate two keys with different derived splits. + var splitInterval int64 = 1 + if x := int64(split / time.Millisecond); x > splitInterval { + splitInterval = x + } + + currentInterval := r.GetStart() / splitInterval // include both the currentInterval and the split duration in key to ensure // a cache key can't be reused when an interval changes return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split) diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip.go b/pkg/querier/queryrange/queryrangebase/roundtrip.go index 27bd1dc4f0582..b8b561c053681 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip.go @@ -54,12 +54,14 @@ var ( // Config for query_range middleware chain. type Config struct { + // Deprecated: SplitQueriesByInterval will be removed in the next major release SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"` - AlignQueriesWithStep bool `yaml:"align_queries_with_step"` - ResultsCacheConfig `yaml:"results_cache"` - CacheResults bool `yaml:"cache_results"` - MaxRetries int `yaml:"max_retries"` - ShardedQueries bool `yaml:"parallelise_shardable_queries"` + + AlignQueriesWithStep bool `yaml:"align_queries_with_step"` + ResultsCacheConfig `yaml:"results_cache"` + CacheResults bool `yaml:"cache_results"` + MaxRetries int `yaml:"max_retries"` + ShardedQueries bool `yaml:"parallelise_shardable_queries"` // List of headers which query_range middleware chain would forward to downstream querier. ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"` } @@ -67,10 +69,9 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.") - f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use an a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled") f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.") f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.") - f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", false, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.") + f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.") f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.") cfg.ResultsCacheConfig.RegisterFlags(f) } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index d25e1e5ad476b..df37275bac6e7 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -184,6 +184,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.PerTenantOverridePeriod.Set("10s") f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.") + f.DurationVar(&l.QuerySplitDuration, "querier.split-queries-by-interval", 30*time.Minute, "Split queries by an interval and execute in parallel, any value less than 0 disables it. This also determines how cache keys are chosen when result caching is enabled") } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -208,6 +209,9 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { // Validate validates that this limits config is valid. func (l *Limits) Validate() error { + if l.QuerySplitDuration < 0 { + l.QuerySplitDuration = 0 + } if l.StreamRetention != nil { for i, rule := range l.StreamRetention { matchers, err := logql.ParseMatchers(rule.Selector) From 3a0f6aca7d0d7d2939368b234f1f4ab169adbecd Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 26 Jan 2022 17:24:05 -0500 Subject: [PATCH 03/10] docs, changelog --- CHANGELOG.md | 1 + docs/sources/configuration/_index.md | 6 ++---- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 105851a23f6ea..7927263ab55fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [5243](https://github.com/grafana/loki/pull/5243) **owen-d**: moves `querier.split-queries-by-interval` to limits code only. * [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail. * [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped. * [5187](https://github.com/grafana/loki/pull/5187) **aknuds1** Rename metric `cortex_experimental_features_in_use_total` to `loki_experimental_features_in_use_total` and metric `log_messages_total` to `loki_log_messages_total`. diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 491a6b1379f7f..50c887325c90b 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2263,10 +2263,8 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -frontend.min-sharding-lookback [min_sharding_lookback: | default = 0s] -# Split queries by an interval and execute in parallel, 0 disables it. You -# should use in multiple of 24 hours (same as the storage bucketing scheme), -# to avoid queriers downloading and processing the same chunks. This also -# determines how cache keys are chosen when result caching is enabled +# Split queries by an interval and execute in parallel, any value less than zero disables it. +# This also determines how cache keys are chosen when result caching is enabled # CLI flag: -querier.split-queries-by-interval [split_queries_by_interval: | default = 30m] ``` From 7c096106d311bcc9c49542f01e4f4751a1968cf9 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 26 Jan 2022 17:27:42 -0500 Subject: [PATCH 04/10] flag supports model.duration instead of time.duration --- pkg/validation/limits.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index df37275bac6e7..eabb4e8c1b673 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -184,7 +184,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.PerTenantOverridePeriod.Set("10s") f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.") - f.DurationVar(&l.QuerySplitDuration, "querier.split-queries-by-interval", 30*time.Minute, "Split queries by an interval and execute in parallel, any value less than 0 disables it. This also determines how cache keys are chosen when result caching is enabled") + + _ = l.QuerySplitDuration.Set("30m") + f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, any value less than 0 disables it. This also determines how cache keys are chosen when result caching is enabled") } // UnmarshalYAML implements the yaml.Unmarshaler interface. From 762621667638072384c56585e38083240c7fbd80 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 26 Jan 2022 17:39:18 -0500 Subject: [PATCH 05/10] lint and adds new error for deprecated field --- pkg/querier/queryrange/limits_test.go | 1 - pkg/querier/queryrange/queryrangebase/roundtrip.go | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index e4163c5aa137c..0f1ebf1f0d856 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -49,7 +49,6 @@ func TestLimits(t *testing.T) { func Test_seriesLimiter(t *testing.T) { cfg := testConfig - cfg.SplitQueriesByInterval = time.Hour cfg.CacheResults = false // split in 7 with 2 in // max. l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour) diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip.go b/pkg/querier/queryrange/queryrangebase/roundtrip.go index b8b561c053681..c02e50ed9869c 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip.go @@ -78,6 +78,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Validate validates the config. func (cfg *Config) Validate() error { + if cfg.SplitQueriesByInterval != 0 { + return errors.New("The yaml flag `split_queries_by_interval` must now be set in the `limits_config` section instead of the `query_range` config section.") + } if cfg.CacheResults { if err := cfg.ResultsCacheConfig.Validate(); err != nil { return errors.Wrap(err, "invalid ResultsCache config") From f04175fef8814ec8f14125fa8763d328a791a47f Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 27 Jan 2022 09:02:54 -0500 Subject: [PATCH 06/10] reregisters queryrange flags --- pkg/loki/loki.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 0d6e4b28bcd5f..f023c6f5901ac 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -109,6 +109,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Frontend.RegisterFlags(f) c.Ruler.RegisterFlags(f) c.Worker.RegisterFlags(f) + c.QueryRange.RegisterFlags(f) c.RuntimeConfig.RegisterFlags(f) c.MemberlistKV.RegisterFlags(f) c.Tracing.RegisterFlags(f) From cf2c82164aeac7f8007e62d47dbddcfd2348d75f Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 27 Jan 2022 09:03:38 -0500 Subject: [PATCH 07/10] comment lint --- pkg/querier/queryrange/queryrangebase/roundtrip.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip.go b/pkg/querier/queryrange/queryrangebase/roundtrip.go index c02e50ed9869c..1cf6f934c65e6 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip.go @@ -79,7 +79,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Validate validates the config. func (cfg *Config) Validate() error { if cfg.SplitQueriesByInterval != 0 { - return errors.New("The yaml flag `split_queries_by_interval` must now be set in the `limits_config` section instead of the `query_range` config section.") + return errors.New("the yaml flag `split_queries_by_interval` must now be set in the `limits_config` section instead of the `query_range` config section") } if cfg.CacheResults { if err := cfg.ResultsCacheConfig.Validate(); err != nil { From 3638bc0d087c7d42492c9c589e4b3a001a62de43 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 27 Jan 2022 09:48:08 -0500 Subject: [PATCH 08/10] ensures we validate queryrange config and simplifies embedded struct validate() --- pkg/loki/loki.go | 3 +++ pkg/querier/queryrange/roundtrip.go | 11 ----------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index f023c6f5901ac..6a10d54610eef 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -200,6 +200,9 @@ func (c *Config) Validate() error { ) } } + if err := c.QueryRange.Validate(); err != nil { + return errors.Wrap(err, "invalid query_range config") + } return nil } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 8a86f84d63ce8..f9be0ad0cecef 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -9,7 +9,6 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/go-kit/log" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/weaveworks/common/httpgrpc" @@ -31,16 +30,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Config.RegisterFlags(f) } -// Validate validates the config. -func (cfg *Config) Validate() error { - if cfg.CacheResults { - if err := cfg.ResultsCacheConfig.Validate(); err != nil { - return errors.Wrap(err, "invalid ResultsCache config") - } - } - return nil -} - // Stopper gracefully shutdown resources created type Stopper interface { Stop() From 9118a4ff03b53799aa9a089a8c41308430fa5bcf Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 27 Jan 2022 10:27:53 -0500 Subject: [PATCH 09/10] 0 still disables --- docs/sources/upgrading/_index.md | 20 ++++++++++++++++++++ pkg/validation/limits.go | 6 ++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md index 797747e284c56..b585f047af8f7 100644 --- a/docs/sources/upgrading/_index.md +++ b/docs/sources/upgrading/_index.md @@ -33,6 +33,26 @@ The output is incredibly verbose as it shows the entire internal config struct u ### Loki +#### `querier.split-queries-by-interval` flag migrated yaml path and default value. + +The CLI flag `querier.split-queries-by-interval` has changed it's corresponding yaml equivalent from +```yaml +query_range: + split_queries_by_interval: 10m +``` +-> +``` +limits_config: + split_queries_by_interval: 10m + +``` + +Additionally, it has a new default value of `30m` rather than `0`. + +This is part of it's migration path from a global configuration to a per-tenant one (still subject to default tenant limits in the `limits_config`). +It keeps it's CLI flag as `querier.split-queries-by-interval`. + + #### Error responses from API The body of HTTP error responses from API endpoints changed from plain text to diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index eabb4e8c1b673..cb6c4fef4c0e3 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -186,7 +186,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.") _ = l.QuerySplitDuration.Set("30m") - f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, any value less than 0 disables it. This also determines how cache keys are chosen when result caching is enabled") + f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled") } // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -211,9 +211,7 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { // Validate validates that this limits config is valid. func (l *Limits) Validate() error { - if l.QuerySplitDuration < 0 { - l.QuerySplitDuration = 0 - } + if l.StreamRetention != nil { for i, rule := range l.StreamRetention { matchers, err := logql.ParseMatchers(rule.Selector) From 36de8d44e93e4c5df1af50126b995f3d651e721d Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 27 Jan 2022 10:48:18 -0500 Subject: [PATCH 10/10] simplify cache key generation and protect against divide by zero --- pkg/querier/queryrange/limits.go | 12 +++--------- pkg/querier/queryrange/limits_test.go | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index f653725e2f276..b4eaba0910c9f 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -62,20 +62,14 @@ type cacheKeyLimits struct { Limits } -// GenerateCacheKey will panic if it encounters a 0 split duration. We ensure against this by requiring -// a nonzero split interval when caching is enabled func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request) string { split := l.QuerySplitDuration(userID) - // Ensure that we don't divide by zero when calculating the interval. - // Since we encode the original split in the key, - // we won't accidentally conflate two keys with different derived splits. - var splitInterval int64 = 1 - if x := int64(split / time.Millisecond); x > splitInterval { - splitInterval = x + var currentInterval int64 + if denominator := int64(split / time.Millisecond); denominator > 0 { + currentInterval = r.GetStart() / denominator } - currentInterval := r.GetStart() / splitInterval // include both the currentInterval and the split duration in key to ensure // a cache key can't be reused when an interval changes return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split) diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 0f1ebf1f0d856..0f65da3cfb645 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -266,3 +266,20 @@ func Test_MaxQueryLookBack(t *testing.T) { _, err = tpw(rt).RoundTrip(req) require.NoError(t, err) } + +func Test_GenerateCacheKey_NoDivideZero(t *testing.T) { + l := cacheKeyLimits{WithSplitByLimits(nil, 0)} + start := time.Now() + r := &LokiRequest{ + Query: "qry", + StartTs: start, + Step: int64(time.Minute / time.Millisecond), + } + + require.Equal( + t, + fmt.Sprintf("foo:qry:%d:0:0", r.GetStep()), + l.GenerateCacheKey("foo", r), + ) + +}