Skip to content

Commit

Permalink
Refactor/remove global splitby (#5243)
Browse files Browse the repository at this point in the history
* cleans up internal limits struct, removes unnecessary logic

* moves querier.split-queries-by-interval to limits, simplifies resulting code.

* docs, changelog

* flag supports model.duration instead of time.duration

* lint and adds new error for deprecated field

* reregisters queryrange flags

* comment lint

* ensures we validate queryrange config and simplifies embedded struct validate()

* 0 still disables

* simplify cache key generation and protect against divide by zero
  • Loading branch information
owen-d authored Jan 27, 2022
1 parent 3d135e5 commit 91d837e
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## Main

* [5243](https://github.com/grafana/loki/pull/5243) **owen-d**: moves `querier.split-queries-by-interval` to limits code only.
* [5139](https://github.com/grafana/loki/pull/5139) **DylanGuedes**: Drop support for legacy configuration rules format.
* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail.
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped.
Expand Down
6 changes: 2 additions & 4 deletions docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2263,10 +2263,8 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.min-sharding-lookback
[min_sharding_lookback: <duration> | default = 0s]

# Split queries by an interval and execute in parallel, 0 disables it. You
# should use in multiple of 24 hours (same as the storage bucketing scheme),
# to avoid queriers downloading and processing the same chunks. This also
# determines how cache keys are chosen when result caching is enabled
# Split queries by an interval and execute in parallel, any value less than zero disables it.
# This also determines how cache keys are chosen when result caching is enabled
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 30m]
```
Expand Down
19 changes: 19 additions & 0 deletions docs/sources/upgrading/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ The output is incredibly verbose as it shows the entire internal config struct u

### Loki

#### `querier.split-queries-by-interval` flag migrated yaml path and default value.

The CLI flag `querier.split-queries-by-interval` has changed it's corresponding yaml equivalent from
```yaml
query_range:
split_queries_by_interval: 10m
```
->
```
limits_config:
split_queries_by_interval: 10m

```

Additionally, it has a new default value of `30m` rather than `0`.

This is part of it's migration path from a global configuration to a per-tenant one (still subject to default tenant limits in the `limits_config`).
It keeps it's CLI flag as `querier.split-queries-by-interval`.

#### Dropped support for old Prometheus rules configuration format

Alerting rules previously could be specified in two formats: 1.x format (legacy one, named `v0` internally) and 2.x.
Expand Down
27 changes: 4 additions & 23 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Frontend.RegisterFlags(f)
c.Ruler.RegisterFlags(f)
c.Worker.RegisterFlags(f)
c.registerQueryRangeFlagsWithChangedDefaultValues(f)
c.QueryRange.RegisterFlags(f)
c.RuntimeConfig.RegisterFlags(f)
c.MemberlistKV.RegisterFlags(f)
c.Tracing.RegisterFlags(f)
Expand Down Expand Up @@ -138,28 +138,6 @@ func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
})
}

func (c *Config) registerQueryRangeFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError)
// NB: We can remove this after removing Loki's dependency on Cortex and bringing in the queryrange.Config.
// That will let us change the defaults there rather than include wrapper functions like this one.
// Register to throwaway flags first. Default values are remembered during registration and cannot be changed,
// but we can take values from throwaway flag set and reregister into supplied flags with new default values.
c.QueryRange.RegisterFlags(throwaway)

throwaway.VisitAll(func(f *flag.Flag) {
// Ignore errors when setting new values. We have a test to verify that it works.
switch f.Name {
case "querier.split-queries-by-interval":
_ = f.Value.Set("30m")

case "querier.parallelise-shardable-queries":
_ = f.Value.Set("true")
}

fs.Var(f.Value, f.Name, f.Usage)
})
}

// Clone takes advantage of pass-by-value semantics to return a distinct *Config.
// This is primarily used to parse a different flag set without mutating the original *Config.
func (c *Config) Clone() flagext.Registerer {
Expand Down Expand Up @@ -222,6 +200,9 @@ func (c *Config) Validate() error {
)
}
}
if err := c.QueryRange.Validate(); err != nil {
return errors.Wrap(err, "invalid query_range config")
}
return nil
}

Expand Down
33 changes: 7 additions & 26 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,10 @@ type Limits interface {
type limits struct {
Limits
splitDuration time.Duration
overrides bool
}

func (l limits) QuerySplitDuration(user string) time.Duration {
if !l.overrides {
return l.splitDuration
}
dur := l.Limits.QuerySplitDuration(user)
if dur == 0 {
return l.splitDuration
}
return dur
}

// WithDefaults will construct a Limits with a default value for QuerySplitDuration when no overrides are present.
func WithDefaultLimits(l Limits, conf queryrangebase.Config) Limits {
res := limits{
Limits: l,
overrides: true,
}

if conf.SplitQueriesByInterval != 0 {
res.splitDuration = conf.SplitQueriesByInterval
}

return res
return l.splitDuration
}

// WithSplitByLimits will construct a Limits with a static split by duration.
Expand All @@ -84,11 +62,14 @@ type cacheKeyLimits struct {
Limits
}

// GenerateCacheKey will panic if it encounters a 0 split duration. We ensure against this by requiring
// a nonzero split interval when caching is enabled
func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request) string {
split := l.QuerySplitDuration(userID)
currentInterval := r.GetStart() / int64(split/time.Millisecond)

var currentInterval int64
if denominator := int64(split / time.Millisecond); denominator > 0 {
currentInterval = r.GetStart() / denominator
}

// include both the currentInterval and the split duration in key to ensure
// a cache key can't be reused when an interval changes
return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split)
Expand Down
34 changes: 24 additions & 10 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,12 @@ func TestLimits(t *testing.T) {
splits: map[string]time.Duration{"a": time.Minute},
}

require.Equal(t, l.QuerySplitDuration("a"), time.Minute)
require.Equal(t, l.QuerySplitDuration("b"), time.Duration(0))
wrapped := WithSplitByLimits(l, time.Hour)

wrapped := WithDefaultLimits(l, queryrangebase.Config{
SplitQueriesByInterval: time.Hour,
})

require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute)
// Test default
require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour)
// Ensure we override the underlying implementation
require.Equal(t, wrapped.QuerySplitDuration("a"), time.Hour)

r := &LokiRequest{
Query: "qry",
Expand All @@ -45,17 +42,17 @@ func TestLimits(t *testing.T) {

require.Equal(
t,
fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Minute/time.Millisecond), int64(time.Minute)),
fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)),
cacheKeyLimits{wrapped}.GenerateCacheKey("a", r),
)
}

func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// split in 7 with 2 in // max.
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil)
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -269,3 +266,20 @@ func Test_MaxQueryLookBack(t *testing.T) {
_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
}

func Test_GenerateCacheKey_NoDivideZero(t *testing.T) {
l := cacheKeyLimits{WithSplitByLimits(nil, 0)}
start := time.Now()
r := &LokiRequest{
Query: "qry",
StartTs: start,
Step: int64(time.Minute / time.Millisecond),
}

require.Equal(
t,
fmt.Sprintf("foo:qry:%d:0:0", r.GetStep()),
l.GenerateCacheKey("foo", r),
)

}
18 changes: 11 additions & 7 deletions pkg/querier/queryrange/queryrangebase/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,33 @@ var (

// Config for query_range middleware chain.
type Config struct {
// Deprecated: SplitQueriesByInterval will be removed in the next major release
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`

AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use an a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", false, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
}

// Validate validates the config.
func (cfg *Config) Validate() error {
if cfg.SplitQueriesByInterval != 0 {
return errors.New("the yaml flag `split_queries_by_interval` must now be set in the `limits_config` section instead of the `query_range` config section")
}
if cfg.CacheResults {
if err := cfg.ResultsCacheConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid ResultsCache config")
Expand Down
14 changes: 0 additions & 14 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
Expand All @@ -31,16 +30,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Config.RegisterFlags(f)
}

// Validate validates the config.
func (cfg *Config) Validate() error {
if cfg.CacheResults {
if err := cfg.ResultsCacheConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
return nil
}

// Stopper gracefully shutdown resources created
type Stopper interface {
Stop()
Expand All @@ -54,9 +43,6 @@ func NewTripperware(
schema chunk.SchemaConfig,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
// Ensure that QuerySplitDuration uses configuration defaults.
// This avoids divide by zero errors when determining cache keys where user specific overrides don't exist.
limits = WithDefaultLimits(limits, cfg.Config)

instrumentMetrics := queryrangebase.NewInstrumentMiddlewareMetrics(registerer)
retryMetrics := queryrangebase.NewRetryMiddlewareMetrics(registerer)
Expand Down
13 changes: 7 additions & 6 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ import (
var (
testTime = time.Date(2019, 12, 02, 11, 10, 10, 10, time.UTC)
testConfig = Config{queryrangebase.Config{
SplitQueriesByInterval: 4 * time.Hour,
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
CacheConfig: cache.Config{
EnableFifoCache: true,
Expand Down Expand Up @@ -109,7 +108,8 @@ var (

// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil)
l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -402,7 +402,8 @@ func TestUnhandledPath(t *testing.T) {
}

func TestRegexpParamsSupport(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil)
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ func Test_splitByInterval_Do(t *testing.T) {
}, nil
})

l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,
Expand Down Expand Up @@ -749,7 +749,7 @@ func Test_series_splitByInterval_Do(t *testing.T) {
}, nil
})

l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,
Expand Down Expand Up @@ -830,7 +830,7 @@ func Test_ExitEarly(t *testing.T) {
}, nil
})

l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,
Expand Down Expand Up @@ -907,9 +907,7 @@ func Test_DoesntDeadlock(t *testing.T) {
}, nil
})

l := WithDefaultLimits(fakeLimits{
maxQueryParallelism: n,
}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: n}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,
Expand Down
4 changes: 4 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

_ = l.PerTenantOverridePeriod.Set("10s")
f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.")

_ = l.QuerySplitDuration.Set("30m")
f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled")
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand All @@ -208,6 +211,7 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {

// Validate validates that this limits config is valid.
func (l *Limits) Validate() error {

if l.StreamRetention != nil {
for i, rule := range l.StreamRetention {
matchers, err := logql.ParseMatchers(rule.Selector)
Expand Down

0 comments on commit 91d837e

Please sign in to comment.