Deprecate querier split_queries_by_interval
ssncferreira committed Jan 20, 2022
1 parent dad7fca commit 305e520
Showing 10 changed files with 61 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## Main

* [5184](https://github.com/grafana/loki/pull/5184) **ssncferreira** Deprecate `querier.split-queries-by-interval` in favor of `limits.split-queries-by-interval`.
* [5170](https://github.com/grafana/loki/pull/5170) **chaudum** Fix deadlock in Promtail caused when targets got removed from a target group by the discovery manager.
* [5163](https://github.com/grafana/loki/pull/5163) **chaudum** Fix regression in fluentd plugin introduced with #5107 that caused `NoMethodError` when parsing non-string values of log lines.
* [5144](https://github.com/grafana/loki/pull/5144) **dannykopping** Ruler: fix remote write basic auth credentials.
9 changes: 7 additions & 2 deletions docs/sources/configuration/_index.md
@@ -387,8 +387,13 @@ The `frontend` block configures the Loki query-frontend.
The `query_range` block configures query splitting and caching in the Loki query-frontend.

```yaml
# Deprecated: Split queries by an interval and execute in parallel, 0 disables it.
# Use -limits.split-queries-by-interval instead.
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 30m]
# Deprecated: Split queries by day and execute in parallel.
# Use -querier.split-queries-by-interval instead.
# Use -limits.split-queries-by-interval instead.
# CLI flag: -querier.split-queries-by-day
[split_queries_by_day: <boolean> | default = false]
@@ -2267,7 +2272,7 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# should use a multiple of 24 hours (same as the storage bucketing scheme),
# to avoid queriers downloading and processing the same chunks. This also
# determines how cache keys are chosen when result caching is enabled
# CLI flag: -querier.split-queries-by-interval
# CLI flag: -limits.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 30m]
```
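
Taken together, the two documentation hunks above describe a straightforward migration: the interval moves from the query-frontend's `query_range` block to `limits_config`, where it becomes a per-tenant limit. A minimal sketch, with illustrative values:

```yaml
# Before (deprecated with this change): set on the query-frontend.
query_range:
  split_queries_by_interval: 30m

# After: set as a limit; the default remains 30m and the value can now be
# overridden per tenant.
limits_config:
  split_queries_by_interval: 30m
```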
3 changes: 0 additions & 3 deletions pkg/loki/loki.go
@@ -148,9 +148,6 @@ func (c *Config) registerQueryRangeFlagsWithChangedDefaultValues(fs *flag.FlagSe
throwaway.VisitAll(func(f *flag.Flag) {
// Ignore errors when setting new values. We have a test to verify that it works.
switch f.Name {
case "querier.split-queries-by-interval":
_ = f.Value.Set("30m")

case "querier.parallelise-shardable-queries":
_ = f.Value.Set("true")
}
8 changes: 4 additions & 4 deletions pkg/querier/queryrange/limits.go
@@ -32,6 +32,7 @@ const (
type Limits interface {
queryrangebase.Limits
logql.Limits
QuerySplitDurationDefault() time.Duration
QuerySplitDuration(string) time.Duration
MaxQuerySeries(string) int
MaxEntriesLimitPerQuery(string) int
@@ -56,15 +57,14 @@ func (l limits) QuerySplitDuration(user string) time.Duration {
}

// WithDefaultLimits constructs a Limits with a default value for QuerySplitDuration when no overrides are present.
func WithDefaultLimits(l Limits, conf queryrangebase.Config) Limits {
func WithDefaultLimits(l Limits) Limits {
res := limits{
Limits: l,
overrides: true,
}

if conf.SplitQueriesByInterval != 0 {
res.splitDuration = conf.SplitQueriesByInterval
}
// Set as the default split by interval value
res.splitDuration = l.QuerySplitDurationDefault()

return res
}
20 changes: 10 additions & 10 deletions pkg/querier/queryrange/limits_test.go
@@ -24,18 +24,19 @@ import (

func TestLimits(t *testing.T) {
l := fakeLimits{
splits: map[string]time.Duration{"a": time.Minute},
splitDefault: 30 * time.Second,
splits: map[string]time.Duration{"a": time.Minute},
}

require.Equal(t, l.QuerySplitDuration("a"), time.Minute)
require.Equal(t, l.QuerySplitDuration("b"), time.Duration(0))
require.Equal(t, 30*time.Second, l.QuerySplitDurationDefault())
require.Equal(t, time.Minute, l.QuerySplitDuration("a"))
require.Equal(t, 30*time.Second, l.QuerySplitDuration("b"))

wrapped := WithDefaultLimits(l, queryrangebase.Config{
SplitQueriesByInterval: time.Hour,
})
wrapped := WithDefaultLimits(l)

require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute)
require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour)
require.Equal(t, 30*time.Second, wrapped.QuerySplitDurationDefault())
require.Equal(t, time.Minute, wrapped.QuerySplitDuration("a"))
require.Equal(t, 30*time.Second, wrapped.QuerySplitDuration("b"))

r := &LokiRequest{
Query: "qry",
@@ -52,10 +53,9 @@ func TestLimits(t *testing.T) {

func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// split into 7 subqueries with at most 2 executed in parallel.
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2, splitDefault: time.Hour}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
10 changes: 9 additions & 1 deletion pkg/querier/queryrange/queryrangebase/roundtrip.go
@@ -52,6 +52,7 @@ var (

// Config for query_range middleware chain.
type Config struct {
// Deprecated: SplitQueriesByInterval will be removed in the next major release
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
@@ -65,11 +66,18 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme) to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled.")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", false, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")

// TODO(ssncferreira): delete when querier.split-queries-by-interval is fully deprecated.
dur, err := time.ParseDuration("30m")
if err != nil {
dur = 0
}
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", dur, "Deprecated: Split queries by an interval and execute in parallel, 0 disables it. Use -limits.split-queries-by-interval instead.")

cfg.ResultsCacheConfig.RegisterFlags(f)
}

2 changes: 1 addition & 1 deletion pkg/querier/queryrange/roundtrip.go
@@ -56,7 +56,7 @@ func NewTripperware(
) (queryrangebase.Tripperware, Stopper, error) {
// Ensure that QuerySplitDuration uses configuration defaults.
// This avoids divide by zero errors when determining cache keys where user specific overrides don't exist.
limits = WithDefaultLimits(limits, cfg.Config)
limits = WithDefaultLimits(limits)

instrumentMetrics := queryrangebase.NewInstrumentMiddlewareMetrics(registerer)
retryMetrics := queryrangebase.NewRetryMiddlewareMetrics(registerer)
23 changes: 15 additions & 8 deletions pkg/querier/queryrange/roundtrip_test.go
@@ -33,10 +33,9 @@ import (
var (
testTime = time.Date(2019, 12, 02, 11, 10, 10, 10, time.UTC)
testConfig = Config{queryrangebase.Config{
SplitQueriesByInterval: 4 * time.Hour,
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
CacheConfig: cache.Config{
EnableFifoCache: true,
@@ -109,7 +108,7 @@ var (

// those tests are mostly for testing the glue between all components and making sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32, splitDefault: 4 * time.Hour}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -402,7 +401,7 @@ func TestUnhandledPath(t *testing.T) {
}

func TestRegexpParamsSupport(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{splitDefault: 4 * time.Hour}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@@ -552,15 +551,23 @@ type fakeLimits struct {
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
splitDefault time.Duration
splits map[string]time.Duration
minShardingLookback time.Duration
}

func (f fakeLimits) QuerySplitDurationDefault() time.Duration {
return f.splitDefault
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
if f.splits == nil {
return 0
return f.splitDefault
}
if val, ok := f.splits[key]; ok {
return val
}
return f.splits[key]
return f.splitDefault
}

func (f fakeLimits) MaxQueryLength(string) time.Duration {
9 changes: 5 additions & 4 deletions pkg/querier/queryrange/split_by_interval_test.go
@@ -551,7 +551,7 @@ func Test_splitByInterval_Do(t *testing.T) {
}, nil
})

l := WithDefaultLimits(fakeLimits{}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithDefaultLimits(fakeLimits{splitDefault: time.Hour})
split := SplitByIntervalMiddleware(
l,
LokiCodec,
@@ -719,7 +719,7 @@ func Test_series_splitByInterval_Do(t *testing.T) {
}, nil
})

l := WithDefaultLimits(fakeLimits{}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithDefaultLimits(fakeLimits{splitDefault: time.Hour})
split := SplitByIntervalMiddleware(
l,
LokiCodec,
@@ -800,7 +800,7 @@ func Test_ExitEarly(t *testing.T) {
}, nil
})

l := WithDefaultLimits(fakeLimits{}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithDefaultLimits(fakeLimits{splitDefault: time.Hour})
split := SplitByIntervalMiddleware(
l,
LokiCodec,
@@ -879,7 +879,8 @@ func Test_DoesntDeadlock(t *testing.T) {

l := WithDefaultLimits(fakeLimits{
maxQueryParallelism: n,
}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
splitDefault: time.Hour,
})
split := SplitByIntervalMiddleware(
l,
LokiCodec,
10 changes: 9 additions & 1 deletion pkg/validation/limits.go
@@ -184,6 +184,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

_ = l.PerTenantOverridePeriod.Set("10s")
f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with which to reload the overrides.")

_ = l.QuerySplitDuration.Set("30m")
f.Var(&l.QuerySplitDuration, "limits.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme) to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled.")
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
@@ -380,7 +383,12 @@ func (o *Overrides) MinShardingLookback(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MinShardingLookback)
}

// QuerySplitDuration returns the tenant specific splitby interval applied in the query frontend.
// QuerySplitDurationDefault returns the default global split by interval applied in the query frontend.
func (o *Overrides) QuerySplitDurationDefault() time.Duration {
return time.Duration(o.defaultLimits.QuerySplitDuration)
}

// QuerySplitDuration returns the tenant specific split by interval applied in the query frontend.
func (o *Overrides) QuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).QuerySplitDuration)
}
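
Because `split_queries_by_interval` is now read through the `Overrides` type, it can be tuned per tenant like any other limit. A sketch of a per-tenant override, assuming the usual `overrides:` layout of Loki's runtime configuration file and a hypothetical tenant ID:

```yaml
# Runtime (overrides) config file -- hypothetical tenant "tenant-a".
overrides:
  "tenant-a":
    split_queries_by_interval: 1h   # this tenant splits hourly
# Tenants without an override fall back to the limits_config default (30m).
```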
