Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support configurable maximum of the limits parameter #1798

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ logs in Loki.
# There is no limit when unset.
[max_line_size: <string> | default = none ]

# Maximum number of log entries that will be returned for a query. 0 to disable.
[max_entries_limit_per_query: <int> | default = 5000 ]
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved

# Maximum number of active streams per user, across the cluster. 0 to disable.
# When the global limit is enabled, each ingester is configured with a dynamic
# local limit based on the replication factor and the current number of healthy
Expand Down
44 changes: 38 additions & 6 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ import (
"net/http"
"time"

"github.com/grafana/loki/pkg/loghttp"
loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
marshal_legacy "github.com/grafana/loki/pkg/logql/marshal/legacy"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/websocket"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/loghttp"
loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
marshal_legacy "github.com/grafana/loki/pkg/logql/marshal/legacy"
)

const (
Expand All @@ -43,6 +44,12 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
writeError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

if err := q.validateEntriesLimits(ctx, request.Limit); err != nil {
writeError(err, w)
return
}

query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
Expand All @@ -67,6 +74,12 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
writeError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
return
}

if err := q.validateEntriesLimits(ctx, request.Limit); err != nil {
writeError(err, w)
return
}

query := q.engine.NewInstantQuery(request.Query, request.Ts, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
Expand Down Expand Up @@ -109,6 +122,11 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
return
}

if err := q.validateEntriesLimits(ctx, request.Limit); err != nil {
writeError(err, w)
return
}

query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
Expand Down Expand Up @@ -326,3 +344,17 @@ func writeError(err error, w http.ResponseWriter) {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

func (q *Querier) validateEntriesLimits(ctx context.Context, limit uint32) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to validate this also for LogQueryHandler but only once we have verified it's a log request.

So after those lines:

	// short circuit metric queries
	if _, ok := expr.(logql.SampleExpr); ok {
		writeError(httpgrpc.Errorf(http.StatusBadRequest, "legacy endpoints only support %s result type", logql.ValueTypeStreams), w)
		return
	}

userID, err := user.ExtractOrgID(ctx)
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

maxEntriesLimit := q.limits.MaxEntriesLimitPerQuery(userID)
if int(limit) > maxEntriesLimit && maxEntriesLimit != 0 {
return httpgrpc.Errorf(http.StatusBadRequest,
"max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", limit, maxEntriesLimit)
}
return nil
}
1 change: 1 addition & 0 deletions pkg/querier/queryrange/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// Limits embeds queryrange.Limits and adds the extra per-user limits that this
// package reads.
type Limits interface {
queryrange.Limits
// QuerySplitDuration returns a duration for the given string key
// (presumably the user ID — confirm against callers).
QuerySplitDuration(string) time.Duration
// MaxEntriesLimitPerQuery returns the largest "limit" a log query may
// request for the given user ID. validateLimits treats 0 as "no maximum".
MaxEntriesLimitPerQuery(string) int
}

type limits struct {
Expand Down
28 changes: 28 additions & 0 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package queryrange
import (
"flag"
"net/http"
"net/url"
"strconv"
"strings"

"github.com/cortexproject/cortex/pkg/chunk/cache"
Expand All @@ -13,6 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/grafana/loki/pkg/logql"
)
Expand Down Expand Up @@ -63,10 +66,15 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet
// weavework server uses httpgrpc errors for status code.
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

if _, ok := expr.(logql.SampleExpr); ok {
return metricRT.RoundTrip(req)
}
if logSelector, ok := expr.(logql.LogSelectorExpr); ok {
if err := validateLimits(req, params, limits); err != nil {
return nil, err
}

// backport the old regexp params into the query params
regexp := params.Get("regexp")
if regexp != "" {
Expand All @@ -87,6 +95,26 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet
}, cache, nil
}

// validates log entries limits
func validateLimits(req *http.Request, params url.Values, limits Limits) error {
userID, err := user.ExtractOrgID(req.Context())
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

reqLimit, err := strconv.Atoi(params.Get("limit"))
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

maxEntriesLimit := limits.MaxEntriesLimitPerQuery(userID)
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
if reqLimit > maxEntriesLimit && maxEntriesLimit != 0 {
return httpgrpc.Errorf(http.StatusBadRequest,
"max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit)
}
return nil
}

// NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests with regex.
func NewLogFilterTripperware(
cfg Config,
Expand Down
72 changes: 70 additions & 2 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"

Expand Down Expand Up @@ -286,9 +287,72 @@ func TestRegexpParamsSupport(t *testing.T) {
require.NoError(t, err)
}

// TestEntriesLimitsTripperware sends a log query asking for 10000 entries
// through a tripperware whose fake limits cap queries at 5000 entries, and
// expects the round trip to fail with the "max entries limit" 400 error.
func TestEntriesLimitsTripperware(t *testing.T) {
	tripperware, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, nil)
	if stopper != nil {
		defer stopper.Stop()
	}
	require.NoError(t, err)

	downstream, err := newfakeRoundTripper()
	require.NoError(t, err)
	defer downstream.Close()

	orgCtx := user.InjectOrgID(context.Background(), "1")
	httpReq, err := lokiCodec.EncodeRequest(orgCtx, &LokiRequest{
		Query:     `{app="foo"}`, // no regex so it should go to the querier
		Limit:     10000,
		StartTs:   testTime.Add(-6 * time.Hour),
		EndTs:     testTime,
		Direction: logproto.FORWARD,
		Path:      "/loki/api/v1/query_range",
	})
	require.NoError(t, err)

	// The org ID has to be present both in the context and in the request headers.
	httpReq = httpReq.WithContext(orgCtx)
	require.NoError(t, user.InjectOrgIDIntoHTTPRequest(orgCtx, httpReq))

	want := httpgrpc.Errorf(http.StatusBadRequest, "max entries limit per query exceeded, limit > max_entries_limit (10000 > 5000)")
	_, err = tripperware(downstream).RoundTrip(httpReq)
	require.Equal(t, want, err)
}

// TestEntriesLimitWithZeroTripperware sends a log query asking for 10000
// entries through a tripperware built with zero-valued fake limits, and
// expects the round trip to succeed because a maximum of 0 disables the check.
func TestEntriesLimitWithZeroTripperware(t *testing.T) {
	tripperware, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, nil)
	if stopper != nil {
		defer stopper.Stop()
	}
	require.NoError(t, err)

	downstream, err := newfakeRoundTripper()
	require.NoError(t, err)
	defer downstream.Close()

	orgCtx := user.InjectOrgID(context.Background(), "1")
	httpReq, err := lokiCodec.EncodeRequest(orgCtx, &LokiRequest{
		Query:     `{app="foo"}`, // no regex so it should go to the querier
		Limit:     10000,
		StartTs:   testTime.Add(-6 * time.Hour),
		EndTs:     testTime,
		Direction: logproto.FORWARD,
		Path:      "/loki/api/v1/query_range",
	})
	require.NoError(t, err)

	// The org ID has to be present both in the context and in the request headers.
	httpReq = httpReq.WithContext(orgCtx)
	require.NoError(t, user.InjectOrgIDIntoHTTPRequest(orgCtx, httpReq))

	_, err = tripperware(downstream).RoundTrip(httpReq)
	require.NoError(t, err)
}

// fakeLimits is a test stand-in for Limits; its MaxQueryParallelism and
// MaxEntriesLimitPerQuery methods return the matching field unchanged.
// NOTE(review): this listing shows both sides of a diff, which is why
// maxQueryParallelism and splits appear twice below.
type fakeLimits struct {
maxQueryParallelism int
splits map[string]time.Duration
maxQueryParallelism int
maxEntriesLimitPerQuery int
splits map[string]time.Duration
}

func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
Expand All @@ -309,6 +373,10 @@ func (f fakeLimits) MaxQueryParallelism(string) int {
return f.maxQueryParallelism
}

// MaxEntriesLimitPerQuery returns the entries cap stored on the fake; the
// user ID argument is ignored.
func (f fakeLimits) MaxEntriesLimitPerQuery(string) int {
	entriesCap := f.maxEntriesLimitPerQuery
	return entriesCap
}

func counter() (*int, http.Handler) {
count := 0
var lock sync.Mutex
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Limits struct {
CardinalityLimit int `yaml:"cardinality_limit"`
MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query"`
MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests"`
MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query"`

// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration time.Duration `yaml:"split_queries_by_interval"`
Expand All @@ -66,6 +67,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", 14*24*time.Hour, "Maximum accepted sample age before rejecting.")
f.DurationVar(&l.CreationGracePeriod, "validation.create-grace-period", 10*time.Minute, "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
f.IntVar(&l.MaxEntriesLimitPerQuery, "validation.max-entries-limit", 5000, "Per-user entries limit per query")

f.IntVar(&l.MaxLocalStreamsPerUser, "ingester.max-streams-per-user", 10e3, "Maximum number of active streams per user, per ingester. 0 to disable.")
f.IntVar(&l.MaxGlobalStreamsPerUser, "ingester.max-global-streams-per-user", 0, "Maximum number of active streams per user, across the cluster. 0 to disable.")
Expand Down Expand Up @@ -236,6 +238,11 @@ func (o *Overrides) MaxLineSize(userID string) int {
return o.getOverridesForUser(userID).MaxLineSize.Val()
}

// MaxEntriesLimitPerQuery looks up the limits that apply to userID and returns
// their MaxEntriesLimitPerQuery value, i.e. the cap on the number of entries
// the querier should return for a single query.
func (o *Overrides) MaxEntriesLimitPerQuery(userID string) int {
	userLimits := o.getOverridesForUser(userID)
	return userLimits.MaxEntriesLimitPerQuery
}

func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits(userID)
Expand Down