-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
Copy pathlimits.go
352 lines (302 loc) · 9.94 KB
/
limits.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
package queryrange
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/tenant"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)
const (
	// limitErrTmpl is the error template used when a query exceeds the
	// per-query max-series limit enforced by seriesLimiter.
	limitErrTmpl = "maximum of series (%d) reached for a single query"
)

var (
	// ErrMaxQueryParalellism is returned when a tenant's MaxQueryParallelism
	// limit is configured below 1, which effectively disables querying.
	// NOTE(review): the identifier misspells "Parallelism", but it is
	// exported, so renaming it would break external callers.
	ErrMaxQueryParalellism = fmt.Errorf("querying is disabled, please contact your Loki operator")
)
// Limits extends the cortex limits interface with support for per tenant splitby parameters
type Limits interface {
	queryrangebase.Limits
	logql.Limits
	// QuerySplitDurationDefault returns the default split interval applied
	// when a tenant has no override configured.
	QuerySplitDurationDefault() time.Duration
	// QuerySplitDuration returns the split interval for the given tenant.
	QuerySplitDuration(string) time.Duration
	// MaxQuerySeries returns the maximum number of series a single query may
	// return for the given tenant.
	MaxQuerySeries(string) int
	// MaxEntriesLimitPerQuery returns the maximum number of entries a single
	// query may return for the given tenant.
	MaxEntriesLimitPerQuery(string) int
	// MinShardingLookback returns the minimum lookback for the given tenant;
	// presumably used to gate query sharding — semantics defined by callers.
	MinShardingLookback(string) time.Duration
}
// limits wraps an embedded Limits with a fallback split duration that is
// used when overrides are disabled, or when a tenant override resolves to
// zero (see QuerySplitDuration).
type limits struct {
	Limits
	// splitDuration is the fallback split-by interval.
	splitDuration time.Duration
	// overrides indicates whether per-tenant overrides should be consulted.
	overrides bool
}
// QuerySplitDuration returns the split interval for the given tenant. When
// overrides are disabled, or the tenant's override resolves to zero, the
// configured fallback split duration is returned instead.
func (l limits) QuerySplitDuration(user string) time.Duration {
	if !l.overrides {
		return l.splitDuration
	}
	if override := l.Limits.QuerySplitDuration(user); override != 0 {
		return override
	}
	return l.splitDuration
}
// WithDefaultLimits constructs a Limits whose QuerySplitDuration falls back
// to the configured default whenever no per-tenant override is present.
func WithDefaultLimits(l Limits, conf queryrangebase.Config) Limits {
	// TODO(ssncferreira): Remove once cortex' split_queries_by_interval is fully deprecated in the next major release
	if conf.SplitQueriesByInterval != 0 {
		level.Warn(util_log.Logger).Log("deprecated", "yaml flag 'query_range.split_queries_by_interval' is deprecated, use yaml flag 'limits_config.split_queries_by_interval' or CLI flag -querier.split-queries-by-interval instead.",
			"default split_queries_by_interval", l.QuerySplitDurationDefault())
	}

	return limits{
		Limits:    l,
		overrides: true,
		// Use the default as the fallback split-by interval value.
		splitDuration: l.QuerySplitDurationDefault(),
	}
}
// WithSplitByLimits will construct a Limits with a static split by duration.
func WithSplitByLimits(l Limits, splitBy time.Duration) Limits {
	// overrides is left false, so the static duration is always used.
	res := limits{Limits: l}
	res.splitDuration = splitBy
	return res
}
// cacheKeyLimits intersects Limits and CacheSplitter
type cacheKeyLimits struct {
	Limits
}
// GenerateCacheKey will panic if it encounters a 0 split duration. We ensure against this by requiring
// a nonzero split interval when caching is enabled
func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request) string {
	split := l.QuerySplitDuration(userID)
	splitMs := int64(split / time.Millisecond)
	// Bucket the request start into its split interval. Embedding both the
	// bucket and the split duration in the key ensures a cache key can't be
	// reused when the interval configuration changes.
	bucket := r.GetStart() / splitMs
	return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), bucket, split)
}
// limitsMiddleware enforces per-tenant query limits (max query lookback and
// max query length) before delegating to the next handler.
type limitsMiddleware struct {
	Limits
	next queryrangebase.Handler
}
// NewLimitsMiddleware creates a new Middleware that enforces query limits.
func NewLimitsMiddleware(l Limits) queryrangebase.Middleware {
	wrap := func(next queryrangebase.Handler) queryrangebase.Handler {
		return limitsMiddleware{Limits: l, next: next}
	}
	return queryrangebase.MiddlewareFunc(wrap)
}
// Do enforces the per-tenant max-query-lookback and max-query-length limits
// before delegating to the next handler. A request entirely outside the
// allowed lookback window yields an empty response; a request partially
// outside has its start time clamped to the window.
func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
	log, ctx := spanlogger.New(ctx, "limits")
	defer log.Finish()

	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		// Use an explicit "%s" verb so '%' characters inside the error text
		// are not misinterpreted as formatting directives.
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	// Clamp the time range based on the max query lookback.
	if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLookback); maxQueryLookback > 0 {
		minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback))

		if r.GetEnd() < minStartTime {
			// The request is fully outside the allowed range, so we can return an
			// empty response.
			level.Debug(log).Log(
				"msg", "skipping the execution of the query because its time range is before the 'max query lookback' setting",
				"reqStart", util.FormatTimeMillis(r.GetStart()),
				// Fixed log key: was misspelled "redEnd".
				"reqEnd", util.FormatTimeMillis(r.GetEnd()),
				"maxQueryLookback", maxQueryLookback)

			return NewEmptyResponse(r)
		}

		if r.GetStart() < minStartTime {
			// Replace the start time in the request.
			level.Debug(log).Log(
				"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
				"original", util.FormatTimeMillis(r.GetStart()),
				"updated", util.FormatTimeMillis(minStartTime))

			r = r.WithStartEnd(minStartTime, r.GetEnd())
		}
	}

	// Enforce the max query length.
	if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 {
		queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart()))
		if queryLen > maxQueryLength {
			return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength)
		}
	}

	return l.next.Do(ctx, r)
}
// seriesLimiter tracks the set of distinct series observed across the
// (possibly concurrent) sub-requests of a single query and fails the query
// once more than maxSeries distinct series have been seen.
type seriesLimiter struct {
	// hashes holds one entry per distinct series, keyed by label hash.
	hashes map[uint64]struct{}
	// rw guards hashes and buf.
	rw  sync.RWMutex
	buf []byte // buf used for hashing to avoid allocations.

	maxSeries int
	next      queryrangebase.Handler
}
// seriesLimiterMiddleware carries the max-series limit as its integer value
// and materializes a fresh seriesLimiter per wrapped handler.
type seriesLimiterMiddleware int

// newSeriesLimiter creates a new series limiter middleware for use for a single request.
func newSeriesLimiter(maxSeries int) queryrangebase.Middleware {
	return seriesLimiterMiddleware(maxSeries)
}
// Wrap wraps a global handler and returns a per request limited handler.
// The handler returned is thread safe.
func (slm seriesLimiterMiddleware) Wrap(next queryrangebase.Handler) queryrangebase.Handler {
	limiter := &seriesLimiter{
		next:      next,
		maxSeries: int(slm),
		hashes:    map[uint64]struct{}{},
		// Pre-size the scratch buffer used for label hashing.
		buf: make([]byte, 0, 1024),
	}
	return limiter
}
// Do executes the request through the next handler, records the hash of
// every series in a successful Prometheus-style response, and rejects the
// query once the number of distinct series exceeds the configured maximum.
func (sl *seriesLimiter) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
	// Short-circuit: no need to fire a request when the limit is already hit.
	if sl.isLimitReached() {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries)
	}

	res, err := sl.next.Do(ctx, req)
	if err != nil {
		return res, err
	}

	// Only Prometheus-style responses with a populated body contribute series.
	promResp, ok := res.(*LokiPromResponse)
	if !ok || promResp.Response == nil {
		return res, nil
	}

	// Record the hash of every returned series under the write lock.
	sl.rw.Lock()
	for _, series := range promResp.Response.Data.Result {
		lbls := logproto.FromLabelAdaptersToLabels(series.Labels)
		var h uint64
		h, sl.buf = lbls.HashWithoutLabels(sl.buf, []string(nil)...)
		sl.hashes[h] = struct{}{}
	}
	sl.rw.Unlock()

	if sl.isLimitReached() {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries)
	}
	return res, nil
}
// isLimitReached reports whether the number of distinct series recorded so
// far exceeds the configured maximum.
func (sl *seriesLimiter) isLimitReached() bool {
	sl.rw.RLock()
	reached := len(sl.hashes) > sl.maxSeries
	sl.rw.RUnlock()
	return reached
}
// limitedRoundTripper bounds the number of concurrently executing
// sub-requests produced by its middleware chain to the tenant's
// MaxQueryParallelism limit (see RoundTrip).
type limitedRoundTripper struct {
	next   http.RoundTripper
	limits Limits
	codec  queryrangebase.Codec
	// middleware is the merged middleware chain whose sub-requests are
	// funneled through the bounded worker pool.
	middleware queryrangebase.Middleware
}
// NewLimitedRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`.
func NewLimitedRoundTripper(next http.RoundTripper, codec queryrangebase.Codec, limits Limits, middlewares ...queryrangebase.Middleware) http.RoundTripper {
	merged := queryrangebase.MergeMiddlewares(middlewares...)
	return limitedRoundTripper{
		next:       next,
		codec:      codec,
		limits:     limits,
		middleware: merged,
	}
}
// work is a single sub-request queued to the worker pool, together with the
// channel on which its outcome is delivered.
type work struct {
	req    queryrangebase.Request
	ctx    context.Context
	result chan result
}

// result carries the response (or error) of one executed sub-request.
type result struct {
	response queryrangebase.Response
	err      error
}

// newWork wraps a request in a work item. The result channel is buffered
// with capacity 1 so the executing worker never blocks on delivery.
func newWork(ctx context.Context, req queryrangebase.Request) work {
	return work{
		req:    req,
		ctx:    ctx,
		result: make(chan result, 1),
	}
}
// RoundTrip decodes the incoming HTTP request, runs it through the merged
// middleware chain, and executes the resulting sub-requests on a worker pool
// whose size is the tenant's MaxQueryParallelism limit. A limit below 1
// rejects the query outright.
func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
	var (
		wg           sync.WaitGroup
		intermediate = make(chan work)
		ctx, cancel  = context.WithCancel(r.Context())
	)
	// Cancel outstanding work and wait for every worker to exit before
	// returning, so no goroutine outlives this call.
	defer func() {
		cancel()
		wg.Wait()
	}()

	// Do not forward any request header.
	request, err := rt.codec.DecodeRequest(ctx, r, nil)
	if err != nil {
		return nil, err
	}

	if span := opentracing.SpanFromContext(ctx); span != nil {
		request.LogToSpan(span)
	}

	userid, err := tenant.TenantID(ctx)
	if err != nil {
		// Use an explicit "%s" verb so '%' characters inside the error text
		// are not misinterpreted as formatting directives.
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	parallelism := rt.limits.MaxQueryParallelism(userid)
	if parallelism < 1 {
		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", ErrMaxQueryParalellism.Error())
	}

	// Start the bounded worker pool; each worker pulls sub-requests until
	// the round trip finishes and ctx is cancelled.
	for i := 0; i < parallelism; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case w := <-intermediate:
					resp, err := rt.do(w.ctx, w.req)
					// result is buffered, so this send never blocks.
					w.result <- result{response: resp, err: err}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	response, err := rt.middleware.Wrap(
		queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
			w := newWork(ctx, r)
			select {
			case intermediate <- w:
			case <-ctx.Done():
				return nil, ctx.Err()
			}
			select {
			case response := <-w.result:
				return response.response, response.err
			case <-ctx.Done():
				return nil, ctx.Err()
			}
		})).Do(ctx, request)
	if err != nil {
		return nil, err
	}
	return rt.codec.EncodeResponse(ctx, response)
}
// do executes a single sub-request against the wrapped round tripper:
// it encodes the request, injects the tenant org ID header, performs the
// HTTP round trip, and decodes the response.
func (rt limitedRoundTripper) do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
	request, err := rt.codec.EncodeRequest(ctx, r)
	if err != nil {
		return nil, err
	}

	if err := user.InjectOrgIDIntoHTTPRequest(ctx, request); err != nil {
		// Use an explicit "%s" verb so '%' characters inside the error text
		// are not misinterpreted as formatting directives.
		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	response, err := rt.next.RoundTrip(request)
	if err != nil {
		return nil, err
	}
	// Close the body once decoding completes; the error is deliberately ignored.
	defer func() { _ = response.Body.Close() }()

	return rt.codec.DecodeResponse(ctx, response, r)
}