From 68c02189d0d78775af0aa5b39ac709bfd58ea095 Mon Sep 17 00:00:00 2001 From: Aditya C S Date: Thu, 12 Mar 2020 17:28:45 +0530 Subject: [PATCH] Support configurable maximum of the limits parameter --- docs/configuration/README.md | 3 + pkg/querier/http.go | 44 +++++++++++++-- pkg/querier/queryrange/limits.go | 1 + pkg/querier/queryrange/roundtrip.go | 28 +++++++++ pkg/querier/queryrange/roundtrip_test.go | 72 +++++++++++++++++++++++- pkg/util/validation/limits.go | 7 +++ 6 files changed, 147 insertions(+), 8 deletions(-) diff --git a/docs/configuration/README.md b/docs/configuration/README.md index 8a28a558d928f..3608fb2fd7463 100644 --- a/docs/configuration/README.md +++ b/docs/configuration/README.md @@ -789,6 +789,9 @@ logs in Loki. # There is no limit when unset. [max_line_size: | default = none ] +# Maximum number of log entries that will be returned for a query. 0 to disable. +[max_entries_limit: | default = 5000 ] + # Maximum number of active streams per user, across the cluster. 0 to disable. # When the global limit is enabled, each ingester is configured with a dynamic # local limit based on the replication factor and the current number of healthy diff --git a/pkg/querier/http.go b/pkg/querier/http.go index ee6988ebc4cf9..73b4a12089393 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -5,12 +5,6 @@ import ( "net/http" "time" - "github.com/grafana/loki/pkg/loghttp" - loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy" - "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/logql/marshal" - marshal_legacy "github.com/grafana/loki/pkg/logql/marshal/legacy" - "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" "github.com/gorilla/websocket" @@ -18,6 +12,13 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + + "github.com/grafana/loki/pkg/loghttp" + loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/marshal" + marshal_legacy "github.com/grafana/loki/pkg/logql/marshal/legacy" ) const ( @@ -43,6 +44,12 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) { writeError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w) return } + + if err := q.validateEntriesLimits(ctx, request.Limit); err != nil { + writeError(err, w) + return + } + query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { @@ -67,6 +74,12 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) { writeError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w) return } + + if err := q.validateEntriesLimits(ctx, request.Limit); err != nil { + writeError(err, w) + return + } + query := q.engine.NewInstantQuery(request.Query, request.Ts, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { @@ -109,6 +122,11 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) { return } + if err := q.validateEntriesLimits(ctx, request.Limit); err != nil { + writeError(err, w) + return + } + query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit) result, err := query.Exec(ctx) if err != nil { @@ -326,3 +344,17 @@ func writeError(err error, w http.ResponseWriter) { http.Error(w, err.Error(), http.StatusInternalServerError) } } + +func (q *Querier) validateEntriesLimits(ctx context.Context, limit uint32) error { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + maxEntriesLimit := q.limits.MaxEntriesLimitPerQuery(userID) + if int(limit) > maxEntriesLimit && maxEntriesLimit != 0 { + return httpgrpc.Errorf(http.StatusBadRequest, + "max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", limit, maxEntriesLimit) + } + return nil +} diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 1d697f9a54915..d87ca2c90e440 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -11,6 +11,7 @@ import ( type Limits interface { queryrange.Limits QuerySplitDuration(string) time.Duration + MaxEntriesLimitPerQuery(string) int } type limits struct { diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 8f8904a36185b..11ccf90446335 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -3,6 +3,8 @@ package queryrange import ( "flag" "net/http" + "net/url" + "strconv" "strings" "github.com/cortexproject/cortex/pkg/chunk/cache" @@ -13,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logql" ) @@ -63,10 +66,15 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet // weavework server uses httpgrpc errors for status code. return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } + if _, ok := expr.(logql.SampleExpr); ok { return metricRT.RoundTrip(req) } if logSelector, ok := expr.(logql.LogSelectorExpr); ok { + if err := validateLimits(req, params, limits); err != nil { + return nil, err + } + // backport the old regexp params into the query params regexp := params.Get("regexp") if regexp != "" { @@ -87,6 +95,26 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet }, cache, nil } +// validates log entries limits +func validateLimits(req *http.Request, params url.Values, limits Limits) error { + userID, err := user.ExtractOrgID(req.Context()) + if err != nil { + return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + reqLimit, err := strconv.Atoi(params.Get("limit")) + if err != nil { + return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + maxEntriesLimit := limits.MaxEntriesLimitPerQuery(userID) + if reqLimit > maxEntriesLimit && maxEntriesLimit != 0 { + return httpgrpc.Errorf(http.StatusBadRequest, + "max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit) + } + return nil +} + // NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests with regex. func NewLogFilterTripperware( cfg Config, diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 15a957bbddd08..809145aa28496 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -286,9 +287,72 @@ func TestRegexpParamsSupport(t *testing.T) { require.NoError(t, err) } +func TestEntriesLimitsTripperware(t *testing.T) { + tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + lreq := &LokiRequest{ + Query: `{app="foo"}`, // no regex so it should go to the querier + Limit: 10000, + StartTs: testTime.Add(-6 * time.Hour), + EndTs: testTime, + Direction: logproto.FORWARD, + Path: "/loki/api/v1/query_range", + } + + ctx := user.InjectOrgID(context.Background(), "1") + req, err := lokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) + + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + _, err = tpw(rt).RoundTrip(req) + require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, "max entries limit per query exceeded, limit > max_entries_limit (10000 > 5000)"), err) +} + +func TestEntriesLimitWithZeroTripperware(t *testing.T) { + tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, nil) + if stopper != nil { + defer stopper.Stop() + } + require.NoError(t, err) + rt, err := newfakeRoundTripper() + require.NoError(t, err) + defer rt.Close() + + lreq := &LokiRequest{ + Query: `{app="foo"}`, // no regex so it should go to the querier + Limit: 10000, + StartTs: testTime.Add(-6 * time.Hour), + EndTs: testTime, + Direction: logproto.FORWARD, + Path: "/loki/api/v1/query_range", + } + + ctx := user.InjectOrgID(context.Background(), "1") + req, err := lokiCodec.EncodeRequest(ctx, lreq) + require.NoError(t, err) + + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + _, err = tpw(rt).RoundTrip(req) + require.NoError(t, err) +} + type fakeLimits struct { - maxQueryParallelism int - splits map[string]time.Duration + maxQueryParallelism int + maxEntriesLimitPerQuery int + splits map[string]time.Duration } func (f fakeLimits) QuerySplitDuration(key string) time.Duration { @@ -309,6 +373,10 @@ func (f fakeLimits) MaxQueryParallelism(string) int { return f.maxQueryParallelism } +func (f fakeLimits) MaxEntriesLimitPerQuery(string) int { + return f.maxEntriesLimitPerQuery +} + func counter() (*int, http.Handler) { count := 0 var lock sync.Mutex diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 34ed39745dfeb..98c21fbd8e3c1 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -44,6 +44,7 @@ type Limits struct { CardinalityLimit int `yaml:"cardinality_limit"` MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query"` MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests"` + MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. QuerySplitDuration time.Duration `yaml:"split_queries_by_interval"` @@ -66,6 +67,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", 14*24*time.Hour, "Maximum accepted sample age before rejecting.") f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.") f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.") + f.IntVar(&l.MaxEntriesLimitPerQuery, "validation.max-entries-limit", 5000, "Per-user entries limit per query") f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user, per ingester. 0 to disable.") f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 0, "Maximum number of active streams per user, across the cluster. 0 to disable.") @@ -236,6 +238,11 @@ func (o *Overrides) MaxLineSize(userID string) int { return o.getOverridesForUser(userID).MaxLineSize.Val() } +// MaxEntriesLimitPerQuery returns the limit to number of entries the querier should return per query. +func (o *Overrides) MaxEntriesLimitPerQuery(userID string) int { + return o.getOverridesForUser(userID).MaxEntriesLimitPerQuery +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits(userID)