Skip to content

Commit

Permalink
max line lengths (component + tenant overrides) (#1686)
Browse files Browse the repository at this point in the history
* bytesize flag support

* max line limits

* configurable tenant override options for max line size

* bytesize yaml support

* validator methods

* distributor validator type

* human friendly line size output, distributor test

* invalid labels distributor test

* docs + vendor

* removes dead limits code
  • Loading branch information
owen-d authored Feb 13, 2020
1 parent 2f061f3 commit d6b8587
Show file tree
Hide file tree
Showing 17 changed files with 640 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features

* [1686](https://github.com/grafana/loki/pull/1686) **owen-d**: Introduces the `distributor.max-line-size` flag and associated yaml config. When set, lines longer than the configured size are rejected.
* [1662](https://github.com/grafana/loki/pull/1662) **owen-d**: Introduces binary operators in LogQL
* [1572](https://github.com/grafana/loki/pull/1572) **owen-d**: Introduces the `querier.query-ingesters-within` flag and associated yaml config. When enabled, queries for a time range that do not overlap this lookback interval will not be sent to the ingesters.
* [1558](https://github.com/grafana/loki/pull/1558) **owen-d**: Introduces `ingester.max-chunk-age` which specifies the maximum chunk age before it's cut.
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,10 @@ logs in Loki.
# Maximum number of active streams per user, per ingester. 0 to disable.
[max_streams_per_user: <int> | default = 10000]
# Maximum line size on ingestion path. Example: 256kb.
# There is no limit when unset.
[max_line_size: <string> | default = none]
# Maximum number of active streams per user, across the cluster. 0 to disable.
# When the global limit is enabled, each ingester is configured with a dynamic
# local limit based on the replication factor and the current number of healthy
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/Microsoft/go-winio v0.4.12 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/bmatcuk/doublestar v1.2.2
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
github.com/containerd/containerd v1.3.2 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee h1:BnPxIde0gjtTnc9Er7cxvBk8DHLWhEux0SxayC8dP6I=
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v1.0.0/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
Expand Down
26 changes: 9 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -76,7 +75,7 @@ type Distributor struct {
cfg Config
clientCfg client.Config
ingestersRing ring.ReadRing
overrides *validation.Overrides
validator *Validator
pool *cortex_client.Pool

// The global rate limiter requires a distributors ring to count
Expand All @@ -96,6 +95,11 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
}
}

validator, err := NewValidator(overrides)
if err != nil {
return nil, err
}

// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsRing *ring.Lifecycler
Expand All @@ -119,7 +123,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
clientCfg: clientCfg,
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
overrides: overrides,
validator: validator,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
}
Expand Down Expand Up @@ -194,16 +198,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0

for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
if err := d.validator.ValidateLabels(userID, stream.Labels); err != nil {
validationErr = err
continue
}

entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := cortex_validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
if err := d.validator.ValidateEntry(userID, entry); err != nil {
validationErr = err
continue
}
Expand Down Expand Up @@ -281,16 +283,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

// validateLabels parses the stream's label string and validates the parsed
// labels for the given tenant. A parse failure is reported as an HTTP 400
// httpgrpc error.
func (d *Distributor) validateLabels(userID, labels string) error {
	ls, err := util.ToClientLabels(labels)
	if err != nil {
		// Pass the message as an argument, never as the format string: a '%'
		// in the parse error would otherwise be interpreted as a formatting
		// verb and garble the message returned to the client.
		return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}

	// everything in `ValidateLabels` returns `httpgrpc.Errorf` errors, no sugaring needed
	return cortex_validation.ValidateLabels(d.overrides, userID, ls)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
err := d.sendSamplesErr(ctx, ingester, streamTrackers)
Expand Down
21 changes: 21 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
fe "github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/util/validation"
)

Expand All @@ -43,6 +44,8 @@ func TestDistributor(t *testing.T) {

for i, tc := range []struct {
lines int
maxLineSize uint64
mangleLabels bool
expectedResponse *logproto.PushResponse
expectedError error
}{
Expand All @@ -54,17 +57,35 @@ func TestDistributor(t *testing.T) {
lines: 100,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100 bytes) exceeded while adding 100 lines for a total size of 1000 bytes"),
},
{
lines: 100,
maxLineSize: 1,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "max line size (1B) exceeded while adding (10B) size line"),
},
{
lines: 100,
mangleLabels: true,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "parse error at line 1, col 4: literal not terminated"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.EnforceMetricName = false
limits.IngestionRateMB = ingestionRateLimit
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)

d := prepare(t, limits, nil)

request := makeWriteRequest(tc.lines, 10)

if tc.mangleLabels {
request.Streams[0].Labels = `{ab"`
}

response, err := d.Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.expectedError, err)
Expand Down
16 changes: 16 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package distributor

import "time"

// Limits is the set of per-tenant limits and related settings the distributor
// needs. Every lookup is keyed by the tenant's userID.
type Limits interface {
	// Sample age settings.
	CreationGracePeriod(userID string) time.Duration
	RejectOldSamples(userID string) bool
	RejectOldSamplesMaxAge(userID string) time.Duration

	// Label and metric-name settings.
	EnforceMetricName(userID string) bool
	MaxLabelNamesPerSeries(userID string) int
	MaxLabelNameLength(userID string) int
	MaxLabelValueLength(userID string) int

	// MaxLineSize is the maximum accepted length, in bytes, of one log line.
	// The Validator treats 0 as "no limit".
	MaxLineSize(userID string) int
}
62 changes: 62 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package distributor

import (
"errors"
"net/http"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/weaveworks/common/httpgrpc"
)

// Validator checks pushed log entries and stream labels against the embedded
// Limits. Because Limits is embedded, a Validator can itself be handed to the
// cortex validation helpers as their limits argument.
type Validator struct {
Limits
}

// NewValidator wraps the given Limits in a Validator. It returns an error
// when l is a nil interface value.
func NewValidator(l Limits) (*Validator, error) {
	if l == nil {
		return nil, errors.New("nil Limits")
	}

	v := &Validator{Limits: l}
	return v, nil
}

// ValidateEntry returns an error if the entry is invalid for the given tenant.
// It first runs the upstream cortex sample validation against the entry's
// timestamp (converted to milliseconds), then enforces the tenant's maximum
// line size. A nil return means the entry is acceptable.
func (v Validator) ValidateEntry(userID string, entry logproto.Entry) error {
	sample := cortex_client.Sample{
		TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
	}
	if err := cortex_validation.ValidateSample(v, userID, metricName, sample); err != nil {
		return err
	}

	limit := v.MaxLineSize(userID)
	lineLen := len(entry.Line)

	// A limit of 0 means no maximum is enforced.
	if limit == 0 || lineLen <= limit {
		return nil
	}

	// An httpgrpc error is returned only for parity with the upstream
	// cortex_validation package, which reports its failures the same way;
	// HTTP status codes are otherwise an orthogonal concern here.
	return httpgrpc.Errorf(
		http.StatusBadRequest,
		"max line size (%s) exceeded while adding (%s) size line",
		flagext.ByteSize(uint64(limit)).String(),
		flagext.ByteSize(uint64(lineLen)).String(),
	)
}

// ValidateLabels returns an error if the labels are invalid. The label string
// is parsed first; a parse failure is reported as an HTTP 400 httpgrpc error.
// Otherwise the parsed labels are checked by the upstream cortex validation
// using this Validator as the source of limits.
func (v Validator) ValidateLabels(userID string, labels string) error {
	ls, err := util.ToClientLabels(labels)
	if err != nil {
		// An httpgrpc error is returned only for parity with the upstream
		// cortex_validation package, which reports its failures the same way.
		//
		// The message is passed as an argument, never as the format string:
		// a '%' in the parse error would otherwise be interpreted as a
		// formatting verb and garble the message returned to the client.
		return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
	}
	return cortex_validation.ValidateLabels(v, userID, ls)
}
46 changes: 46 additions & 0 deletions pkg/util/flagext/bytesize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package flagext

import (
"strings"

"github.com/c2h5oh/datasize"
)

// ByteSize is a flag parsing compatibility type for constructing human friendly sizes.
// It implements flag.Value & flag.Getter, and can also be unmarshalled from YAML.
// The underlying value is the size expressed as a number of bytes.
type ByteSize uint64

// String renders the size using the datasize package's formatting. It is the
// String half of flag.Value.
func (bs ByteSize) String() string {
	size := datasize.ByteSize(bs)
	return size.String()
}

// Set parses a human friendly size such as "100kb" or "100 KB" and stores the
// resulting number of bytes in bs. When the text cannot be parsed the error is
// returned and bs is left unchanged. It is the Set half of flag.Value.
func (bs *ByteSize) Set(s string) error {
	// The datasize parser currently handles "MB" but not "Mb", so the input
	// is upper-cased first as a convenience.
	normalized := []byte(strings.ToUpper(s))

	var parsed datasize.ByteSize
	err := parsed.UnmarshalText(normalized)
	if err != nil {
		return err
	}

	*bs = ByteSize(parsed.Bytes())
	return nil
}

// Get returns the size as an int (the result of Val) boxed in an interface
// value, which is what flag.Getter requires.
func (bs ByteSize) Get() interface{} {
	var boxed interface{} = bs.Val()
	return boxed
}

// Val returns the size in bytes as an int.
//
// ByteSize is a uint64 while int is at most 64 bits and signed, so a plain
// conversion would silently wrap for large values (for example "3GB" on a
// 32-bit platform) and yield a negative or zero result. Values that do not
// fit are therefore saturated to the largest representable int instead.
func (bs ByteSize) Val() int {
	// Largest value an int can hold on the current platform.
	const maxInt = int(^uint(0) >> 1)

	if uint64(bs) > uint64(maxInt) {
		return maxInt
	}
	return int(bs)
}

// UnmarshalYAML implements the Unmarshaler interface of the yaml pkg. The
// YAML value is decoded as a string and then parsed with Set, so the same
// human friendly sizes are accepted as on the command line.
func (bs *ByteSize) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var raw string
	if err := unmarshal(&raw); err != nil {
		return err
	}
	return bs.Set(raw)
}
104 changes: 104 additions & 0 deletions pkg/util/flagext/bytesize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package flagext

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)

// Test_ByteSize checks that Set accepts human friendly sizes (in any letter
// case, with or without a space before the unit), rejects garbage, and that
// Get reports the parsed size in bytes as an int.
func Test_ByteSize(t *testing.T) {
	type testCase struct {
		in  string
		err bool
		out int
	}

	cases := []testCase{
		{in: "abc", err: true},
		{in: "", err: false, out: 0},
		{in: "0", err: false, out: 0},
		{in: "1b", err: false, out: 1},
		{in: "100kb", err: false, out: 100 << 10},
		{in: "100 KB", err: false, out: 100 << 10},
		// ensure lowercase works
		{in: "50mb", err: false, out: 50 << 20},
		// ensure mixed capitalization works
		{in: "50Mb", err: false, out: 50 << 20},
		{in: "256GB", err: false, out: 256 << 30},
	}

	for _, tc := range cases {
		t.Run(tc.in, func(t *testing.T) {
			var bs ByteSize

			err := bs.Set(tc.in)
			if tc.err {
				require.NotNil(t, err)
				return
			}

			require.Nil(t, err)
			require.Equal(t, tc.out, bs.Get().(int))
		})
	}
}

// Test_ByteSizeYAML checks that a ByteSize can be unmarshalled from a YAML
// scalar and that an unparseable scalar surfaces as an error.
func Test_ByteSizeYAML(t *testing.T) {
	type testCase struct {
		in  string
		err bool
		out ByteSize
	}

	cases := []testCase{
		{in: "256GB", out: ByteSize(256 << 30)},
		{in: "abc", err: true},
	}

	for _, tc := range cases {
		t.Run(tc.in, func(t *testing.T) {
			var got ByteSize

			err := yaml.Unmarshal([]byte(tc.in), &got)
			if tc.err {
				require.NotNil(t, err)
				return
			}

			require.Nil(t, err)
			require.Equal(t, tc.out, got)
		})
	}
}
Loading

0 comments on commit d6b8587

Please sign in to comment.