From 85c041cf1e093de037a582846a26f2834cd3cd05 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Mon, 3 May 2021 13:51:56 +0200
Subject: [PATCH 1/2] Deprecate max_look_back_period in the chunk storage.

From now on it will be applied at the frontend, ruler and querier level.
The querier validation was missing so I added it.

Signed-off-by: Cyril Tovena
---
 cmd/migrate/main.go                      | 23 ++++-----
 pkg/logcli/query/query.go                |  2 +-
 pkg/loki/loki.go                         |  9 +++-
 pkg/loki/modules.go                      |  2 +-
 pkg/querier/querier.go                   | 60 +++++++++++++++---------
 pkg/querier/querier_test.go              | 58 +++++++++++++++++------
 pkg/storage/store.go                     | 24 ++++++++++
 production/ksonnet/loki/config.libsonnet |  1 -
 8 files changed, 125 insertions(+), 54 deletions(-)

diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go
index 2d1a150853d70..2147bd0a81e03 100644
--- a/cmd/migrate/main.go
+++ b/cmd/migrate/main.go
@@ -35,7 +35,6 @@ type syncRange struct {
 }

 func main() {
-	var defaultsConfig loki.Config

 	from := flag.String("from", "", "Start Time RFC339Nano 2006-01-02T15:04:05.999999999Z07:00")
@@ -91,7 +90,7 @@ func main() {
 	}
 	// Create a new registerer to avoid registering duplicate metrics
 	prometheus.DefaultRegisterer = prometheus.NewRegistry()
-	sourceStore, err := cortex_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
+	sourceStore, err := cortex_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig.StoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
 	if err != nil {
 		log.Println("Failed to create source store:", err)
 		os.Exit(1)
 	}
@@ -104,7 +103,7 @@
 	// Create a new registerer to avoid registering duplicate metrics
 	prometheus.DefaultRegisterer = prometheus.NewRegistry()
-	destStore, err := cortex_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
+	destStore, err := cortex_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig.StoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
 	if err != nil {
 		log.Println("Failed to create destination store:", err)
 		os.Exit(1)
 	}
@@ -175,7 +174,7 @@ func main() {
 	errorChan := make(chan error)
 	statsChan := make(chan stats)

-	//Start the parallel processors
+	// Start the parallel processors
 	var wg sync.WaitGroup
 	cancelContext, cancelFunc := context.WithCancel(ctx)
 	for i := 0; i < *parallel; i++ {
@@ -195,7 +194,7 @@ func main() {
 			syncChan <- syncRanges[i]
 			i++
 		}
-		//Everything processed, exit
+		// Everything processed, exit
 		cancelFunc()
 	}()
@@ -229,16 +228,15 @@ func main() {
 	for {
 		time.Sleep(100 * time.Second)
 	}
-
 }

 func calcSyncRanges(from, to int64, shardBy int64) []*syncRange {
-	//Calculate the sync ranges
+	// Calculate the sync ranges
 	syncRanges := []*syncRange{}
-	//diff := to - from
-	//shards := diff / shardBy
+	// diff := to - from
+	// shards := diff / shardBy
 	currentFrom := from
-	//currentTo := from
+	// currentTo := from
 	currentTo := from + shardBy
 	for currentFrom < to && currentTo <= to {
 		s := &syncRange{
@@ -286,7 +284,6 @@ func newChunkMover(ctx context.Context, source, dest storage.Store, sourceUser,
 }

 func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <-chan *syncRange, errCh chan<- error, statsCh chan<- stats) {
-
 	for {
 		select {
 		case <-ctx.Done():
@@ -306,7 +303,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
 			for i, f := range fetchers {
 				log.Printf("%v Processing Schema %v which contains %v chunks\n", threadID, i, len(schemaGroups[i]))

-				//Slice up into batches
+				// Slice up into batches
 				for j := 0; j < len(schemaGroups[i]); j += m.batch {
 					k := j + m.batch
 					if k > len(schemaGroups[i]) {
@@ -394,9 +391,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
 }

 func mustParse(t string) time.Time {
-
 	ret, err := time.Parse(time.RFC3339Nano, t)
-
 	if err != nil {
 		log.Fatalf("Unable to parse time %v", err)
 	}
diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go
index e7834ab3ed191..fc29630260da2 100644
--- a/pkg/logcli/query/query.go
+++ b/pkg/logcli/query/query.go
@@ -189,7 +189,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
 		return err
 	}

-	chunkStore, err := cortex_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
+	chunkStore, err := cortex_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
 	if err != nil {
 		return err
 	}
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 8430355205314..efcf230d78a1e 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -61,7 +61,7 @@ type Config struct {
 	IngesterClient   client.Config            `yaml:"ingester_client,omitempty"`
 	Ingester         ingester.Config          `yaml:"ingester,omitempty"`
 	StorageConfig    storage.Config           `yaml:"storage_config,omitempty"`
-	ChunkStoreConfig chunk.StoreConfig        `yaml:"chunk_store_config,omitempty"`
+	ChunkStoreConfig storage.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
 	SchemaConfig     storage.SchemaConfig     `yaml:"schema_config,omitempty"`
 	LimitsConfig     validation.Limits        `yaml:"limits_config,omitempty"`
 	TableManager     chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
@@ -138,6 +138,13 @@ func (c *Config) Validate() error {
 	if err := c.CompactorConfig.Validate(); err != nil {
 		return errors.Wrap(err, "invalid compactor config")
 	}
+	if err := c.ChunkStoreConfig.Validate(util_log.Logger); err != nil {
+		return errors.Wrap(err, "invalid chunk store config")
+	}
+	// TODO(cyriltovena): remove when MaxLookBackPeriod in the storage will be fully deprecated.
+	if c.ChunkStoreConfig.MaxLookBackPeriod > 0 {
+		c.LimitsConfig.MaxQueryLookback = c.ChunkStoreConfig.MaxLookBackPeriod
+	}
 	return nil
 }

diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index dc11d80d62d57..93e5ae18da865 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -317,7 +317,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
 		}
 	}

-	chunkStore, err := cortex_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
+	chunkStore, err := cortex_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
 	if err != nil {
 		return
 	}
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 88de1f9bc2c49..716cfe953b298 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -6,11 +6,13 @@ import (
 	"net/http"
 	"time"

+	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/common/model"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
 	"google.golang.org/grpc/health/grpc_health_v1"

+	"github.com/cortexproject/cortex/pkg/util/spanlogger"
 	cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

 	"github.com/grafana/loki/pkg/iter"
@@ -29,6 +31,8 @@ const (
 	tailerWaitEntryThrottle = time.Second / 2
 )

+var nowFunc = func() time.Time { return time.Now() }
+
 type interval struct {
 	start, end time.Time
 }
@@ -83,7 +87,8 @@ func (q *Querier) SetQueryable(queryable logql.Querier) {

 // Select Implements logql.Querier which select logs via matchers and regex filters.
 func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
-	err := q.validateQueryRequest(ctx, params)
+	var err error
+	params.Start, params.End, err = q.validateQueryRequest(ctx, params)
 	if err != nil {
 		return nil, err
 	}
@@ -125,7 +130,8 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams)
 }

 func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
-	err := q.validateQueryRequest(ctx, params)
+	var err error
+	params.Start, params.End, err = q.validateQueryRequest(ctx, params)
 	if err != nil {
 		return nil, err
 	}
@@ -247,7 +253,7 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
 		return nil, err
 	}

-	if err = q.validateQueryTimeRange(userID, *req.Start, *req.End); err != nil {
+	if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil {
 		return nil, err
 	}
@@ -303,7 +309,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
 		},
 	}

-	err = q.validateQueryRequest(ctx, histReq)
+	histReq.Start, histReq.End, err = q.validateQueryRequest(ctx, histReq)
 	if err != nil {
 		return nil, err
 	}
@@ -348,7 +354,7 @@ func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*log
 		return nil, err
 	}

-	if err = q.validateQueryTimeRange(userID, req.Start, req.End); err != nil {
+	if req.Start, req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End); err != nil {
 		return nil, err
 	}
@@ -357,11 +363,9 @@ func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*log
 	defer cancel()

 	return q.awaitSeries(ctx, req)
-
 }

 func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
-
 	// buffer the channels to the # of calls they're expecting su
 	series := make(chan [][]logproto.SeriesIdentifier, 2)
 	errs := make(chan error, 2)
@@ -465,38 +469,52 @@ func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time,
 	return ids, nil
 }

-func (q *Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) error {
+func (q *Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) {
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
-		return err
+		return time.Time{}, time.Time{}, err
 	}

 	selector, err := req.LogSelector()
 	if err != nil {
-		return err
+		return time.Time{}, time.Time{}, err
 	}
 	matchers := selector.Matchers()

 	maxStreamMatchersPerQuery := q.limits.MaxStreamsMatchersPerQuery(userID)
 	if len(matchers) > maxStreamMatchersPerQuery {
-		return httpgrpc.Errorf(http.StatusBadRequest,
+		return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest,
 			"max streams matchers per query exceeded, matchers-count > limit (%d > %d)", len(matchers), maxStreamMatchersPerQuery)
 	}

-	return q.validateQueryTimeRange(userID, req.GetStart(), req.GetEnd())
+	return validateQueryTimeRangeLimits(ctx, userID, q.limits, req.GetStart(), req.GetEnd())
 }

-func (q *Querier) validateQueryTimeRange(userID string, from time.Time, through time.Time) error {
-	if (through).Before(from) {
-		return httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
-	}
+type timeRangeLimits interface {
+	MaxQueryLookback(string) time.Duration
+	MaxQueryLength(string) time.Duration
+}

-	maxQueryLength := q.limits.MaxQueryLength(userID)
-	if maxQueryLength > 0 && (through).Sub(from) > maxQueryLength {
-		return httpgrpc.Errorf(http.StatusBadRequest, cortex_validation.ErrQueryTooLong, (through).Sub(from), maxQueryLength)
-	}
+func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits timeRangeLimits, from, through time.Time) (time.Time, time.Time, error) {
+	now := nowFunc()
+	// Clamp the time range based on the max query lookback.
+	if maxQueryLookback := limits.MaxQueryLookback(userID); maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) {
+		origStartTime := from
+		from = now.Add(-maxQueryLookback)

-	return nil
+		level.Debug(spanlogger.FromContext(ctx)).Log(
+			"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
+			"original", origStartTime,
+			"updated", from)
+
+	}
+	if maxQueryLength := limits.MaxQueryLength(userID); maxQueryLength > 0 && (through).Sub(from) > maxQueryLength {
+		return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, cortex_validation.ErrQueryTooLong, (through).Sub(from), maxQueryLength)
+	}
+	if through.Before(from) {
+		return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
+	}
+	return from, through, nil
 }

 func (q *Querier) checkTailRequestLimit(ctx context.Context) error {
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index 05d5d5073995b..a9cb1d9b253d1 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -8,26 +8,20 @@ import (
 	"testing"
 	"time"

+	"github.com/cortexproject/cortex/pkg/ring"
 	ring_client "github.com/cortexproject/cortex/pkg/ring/client"
-
+	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/grafana/loki/pkg/ingester/client"
-	"github.com/grafana/loki/pkg/validation"
-
-	"github.com/grafana/loki/pkg/storage"
-
+	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
-
+	"github.com/grafana/loki/pkg/storage"
+	"github.com/grafana/loki/pkg/validation"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
-
-	"github.com/cortexproject/cortex/pkg/ring"
-	"github.com/cortexproject/cortex/pkg/util/flagext"
-
-	"github.com/grafana/loki/pkg/logproto"
 )

 const (
@@ -326,7 +320,6 @@ func TestQuerier_SeriesAPI(t *testing.T) {
 					{Labels: map[string]string{"a": "1", "b": "2"}},
 					{Labels: map[string]string{"a": "1", "b": "3"}},
 				}, nil)
-
 			},
 			func(t *testing.T, q *Querier, req *logproto.SeriesRequest) {
 				ctx := user.InjectOrgID(context.Background(), "test")
@@ -365,7 +358,6 @@ func TestQuerier_SeriesAPI(t *testing.T) {
 }

 func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {
-
 	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
 	require.NoError(t, err)
@@ -395,7 +387,6 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-
 			req := logproto.QueryRequest{
 				Selector: `{app="foo"}`,
 				Limit:    1000,
@@ -437,7 +428,6 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {
 			queryClient.AssertExpectations(t)
 			ingesterClient.AssertExpectations(t)
 			store.AssertExpectations(t)
-
 		})
 	}
 }
@@ -691,7 +681,45 @@ func TestQuerier_buildQueryIntervals(t *testing.T) {
 				ingesterQueryInterval: ingesterQueryInterval,
 				storeQueryInterval:    storeQueryInterval,
 			})
+		})
+	}
+}
+
+type fakeTimeLimits struct {
+	maxQueryLookback time.Duration
+	maxQueryLength   time.Duration
+}
+func (f fakeTimeLimits) MaxQueryLookback(_ string) time.Duration { return f.maxQueryLookback }
+func (f fakeTimeLimits) MaxQueryLength(_ string) time.Duration { return f.maxQueryLength }
+
+func Test_validateQueryTimeRangeLimits(t *testing.T) {
+	now := time.Now()
+	nowFunc = func() time.Time { return now }
+	tests := []struct {
+		name        string
+		limits      timeRangeLimits
+		from        time.Time
+		through     time.Time
+		wantFrom    time.Time
+		wantThrough time.Time
+		wantErr     bool
+	}{
+		{"no change", fakeTimeLimits{1000 * time.Hour, 1000 * time.Hour}, now, now.Add(24 * time.Hour), now, now.Add(24 * time.Hour), false},
+		{"clamped to 24h", fakeTimeLimits{24 * time.Hour, 1000 * time.Hour}, now.Add(-48 * time.Hour), now, now.Add(-24 * time.Hour), now, false},
+		{"end before start", fakeTimeLimits{}, now, now.Add(-48 * time.Hour), time.Time{}, time.Time{}, true},
+		{"query too long", fakeTimeLimits{maxQueryLength: 24 * time.Hour}, now.Add(-48 * time.Hour), now, time.Time{}, time.Time{}, true},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			from, through, err := validateQueryTimeRangeLimits(context.Background(), "foo", tt.limits, tt.from, tt.through)
+			if tt.wantErr {
+				require.NotNil(t, err)
+			} else {
+				require.Nil(t, err)
+			}
+			require.Equal(t, tt.wantFrom, from, "wanted (%s) got (%s)", tt.wantFrom, from)
+			require.Equal(t, tt.wantThrough, through)
 		})
 	}
 }
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 58d8314c5f89a..9949bf61d8f34 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -11,6 +11,9 @@ import (
 	cortex_local "github.com/cortexproject/cortex/pkg/chunk/local"
 	"github.com/cortexproject/cortex/pkg/chunk/storage"
 	"github.com/cortexproject/cortex/pkg/querier/astmapper"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -69,6 +72,27 @@ func (cfg *SchemaConfig) Validate() error {
 	return cfg.SchemaConfig.Validate()
 }

+type ChunkStoreConfig struct {
+	chunk.StoreConfig `yaml:",inline"`
+
+	// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
+	// Will be deprecated in the next major release.
+	MaxLookBackPeriod model.Duration `yaml:"max_look_back_period"`
+}
+
+// RegisterFlags adds the flags required to configure this flag set.
+func (cfg *ChunkStoreConfig) RegisterFlags(f *flag.FlagSet) {
+	cfg.StoreConfig.RegisterFlags(f)
+}
+
+func (cfg *ChunkStoreConfig) Validate(logger log.Logger) error {
+	if cfg.MaxLookBackPeriod > 0 {
+		flagext.DeprecatedFlagsUsed.Inc()
+		level.Warn(logger).Log("msg", "running with DEPRECATED flag -store.max-look-back-period, use -querier.max-query-lookback instead.")
+	}
+	return cfg.StoreConfig.Validate(logger)
+}
+
 // Store is the Loki chunk store to retrieve and save chunks.
 type Store interface {
 	chunk.Store
diff --git a/production/ksonnet/loki/config.libsonnet b/production/ksonnet/loki/config.libsonnet
index 3636a5066ae46..97789716a50ed 100644
--- a/production/ksonnet/loki/config.libsonnet
+++ b/production/ksonnet/loki/config.libsonnet
@@ -281,7 +281,6 @@
           consistent_hash: true,
         },
       },
-      max_look_back_period: 0,
     },

     // Default schema config is boltdb-shipper/gcs, this will need to be overridden for other stores

From d346a8cedf04503e2f12e4590e6169aab2e0f48f Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Mon, 3 May 2021 13:53:24 +0200
Subject: [PATCH 2/2] Lint

Signed-off-by: Cyril Tovena
---
 pkg/querier/querier_test.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index a9cb1d9b253d1..f2d48dde487d3 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -11,17 +11,18 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring"
 	ring_client "github.com/cortexproject/cortex/pkg/ring/client"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
-	"github.com/grafana/loki/pkg/ingester/client"
-	"github.com/grafana/loki/pkg/logproto"
-	"github.com/grafana/loki/pkg/logql"
-	"github.com/grafana/loki/pkg/storage"
-	"github.com/grafana/loki/pkg/validation"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"
+
+	"github.com/grafana/loki/pkg/ingester/client"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql"
+	"github.com/grafana/loki/pkg/storage"
+	"github.com/grafana/loki/pkg/validation"
 )

 const (