Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

max line lengths (component + tenant overrides) #1686

Merged
merged 10 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features

* [1686](https://github.com/grafana/loki/pull/1686) **owen-d**: Introduces the `distributor.max-line-size` flag and associated yaml config. When enabled, lines longer than this config will not be accepted.
* [1662](https://github.com/grafana/loki/pull/1662) **owen-d**: Introduces binary operators in LogQL
* [1572](https://github.com/grafana/loki/pull/1572) **owen-d**: Introduces the `querier.query-ingesters-within` flag and associated yaml config. When enabled, queries for a time range that do not overlap this lookback interval will not be sent to the ingesters.
* [1558](https://github.com/grafana/loki/pull/1558) **owen-d**: Introduces `ingester.max-chunk-age` which specifies the maximum chunk age before it's cut.
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,10 @@ logs in Loki.
# Maximum number of active streams per user, per ingester. 0 to disable.
[max_streams_per_user: <int> | default = 10000]

# Maximum line size on ingestion path. Example: 256kb.
# There is no limit when unset.
[max_line_size: <string> | default = none ]

# Maximum number of active streams per user, across the cluster. 0 to disable.
# When the global limit is enabled, each ingester is configured with a dynamic
# local limit based on the replication factor and the current number of healthy
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/Microsoft/go-winio v0.4.12 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/bmatcuk/doublestar v1.2.2
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
github.com/containerd/containerd v1.3.2 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee h1:BnPxIde0gjtTnc9Er7cxvBk8DHLWhEux0SxayC8dP6I=
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v1.0.0/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
Expand Down
26 changes: 9 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -76,7 +75,7 @@ type Distributor struct {
cfg Config
clientCfg client.Config
ingestersRing ring.ReadRing
overrides *validation.Overrides
validator *Validator
pool *cortex_client.Pool

// The global rate limiter requires a distributors ring to count
Expand All @@ -96,6 +95,11 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
}
}

validator, err := NewValidator(overrides)
if err != nil {
return nil, err
}

// Create the configured ingestion rate limit strategy (local or global).
var ingestionRateStrategy limiter.RateLimiterStrategy
var distributorsRing *ring.Lifecycler
Expand All @@ -119,7 +123,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
clientCfg: clientCfg,
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
overrides: overrides,
validator: validator,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
}
Expand Down Expand Up @@ -194,16 +198,14 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesCount := 0

for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
if err := d.validator.ValidateLabels(userID, stream.Labels); err != nil {
validationErr = err
continue
}

entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := cortex_validation.ValidateSample(d.overrides, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
if err := d.validator.ValidateEntry(userID, entry); err != nil {
validationErr = err
continue
}
Expand Down Expand Up @@ -281,16 +283,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

func (d *Distributor) validateLabels(userID, labels string) error {
ls, err := util.ToClientLabels(labels)
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

// everything in `ValidateLabels` returns `httpgrpc.Errorf` errors, no sugaring needed
return cortex_validation.ValidateLabels(d.overrides, userID, ls)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) {
err := d.sendSamplesErr(ctx, ingester, streamTrackers)
Expand Down
21 changes: 21 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
fe "github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/util/validation"
)

Expand All @@ -43,6 +44,8 @@ func TestDistributor(t *testing.T) {

for i, tc := range []struct {
lines int
maxLineSize uint64
mangleLabels bool
expectedResponse *logproto.PushResponse
expectedError error
}{
Expand All @@ -54,17 +57,35 @@ func TestDistributor(t *testing.T) {
lines: 100,
expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (100 bytes) exceeded while adding 100 lines for a total size of 1000 bytes"),
},
{
lines: 100,
maxLineSize: 1,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "max line size (1B) exceeded while adding (10B) size line"),
},
{
lines: 100,
mangleLabels: true,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "parse error at line 1, col 4: literal not terminated"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.EnforceMetricName = false
limits.IngestionRateMB = ingestionRateLimit
limits.IngestionBurstSizeMB = ingestionRateLimit
limits.MaxLineSize = fe.ByteSize(tc.maxLineSize)

d := prepare(t, limits, nil)

request := makeWriteRequest(tc.lines, 10)

if tc.mangleLabels {
request.Streams[0].Labels = `{ab"`
}

response, err := d.Push(ctx, request)
assert.Equal(t, tc.expectedResponse, response)
assert.Equal(t, tc.expectedError, err)
Expand Down
16 changes: 16 additions & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package distributor

import "time"

// Limits is an interface for distributor limits/related configs
type Limits interface {
MaxLineSize(userID string) int
EnforceMetricName(userID string) bool
MaxLabelNamesPerSeries(userID string) int
MaxLabelNameLength(userID string) int
MaxLabelValueLength(userID string) int

CreationGracePeriod(userID string) time.Duration
RejectOldSamples(userID string) bool
RejectOldSamplesMaxAge(userID string) time.Duration
}
62 changes: 62 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package distributor

import (
"errors"
"net/http"
"time"

cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/weaveworks/common/httpgrpc"
)

type Validator struct {
Limits
}

func NewValidator(l Limits) (*Validator, error) {
if l == nil {
return nil, errors.New("nil Limits")
}
return &Validator{l}, nil
}

// ValidateEntry returns an error if the entry is invalid
func (v Validator) ValidateEntry(userID string, entry logproto.Entry) error {
if err := cortex_validation.ValidateSample(v, userID, metricName, cortex_client.Sample{
TimestampMs: entry.Timestamp.UnixNano() / int64(time.Millisecond),
}); err != nil {
return err
}

if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
return httpgrpc.Errorf(
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
http.StatusBadRequest,
"max line size (%s) exceeded while adding (%s) size line",
flagext.ByteSize(uint64(maxSize)).String(),
flagext.ByteSize(uint64(len(entry.Line))).String(),
)
}

return nil
}

// Validate labels returns an error if the labels are invalid
func (v Validator) ValidateLabels(userID string, labels string) error {
ls, err := util.ToClientLabels(labels)
if err != nil {
// I wish we didn't return httpgrpc errors here as it seems
// an orthogonal concept (we need not use ValidateLabels in this context)
// but the upstream cortex_validation pkg uses it, so we keep this
// for parity.
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
return cortex_validation.ValidateLabels(v, userID, ls)
}
46 changes: 46 additions & 0 deletions pkg/util/flagext/bytesize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package flagext

import (
"strings"

"github.com/c2h5oh/datasize"
)

// ByteSize is a flag parsing compatibility type for constructing human friendly sizes.
// It implements flag.Value & flag.Getter.
type ByteSize uint64

func (bs ByteSize) String() string {
return datasize.ByteSize(bs).String()
}

func (bs *ByteSize) Set(s string) error {
var v datasize.ByteSize

// Bytesize currently doesn't handle things like Mb, but only handles MB.
// Therefore we capitalize just for convenience
if err := v.UnmarshalText([]byte(strings.ToUpper(s))); err != nil {
return err
}
*bs = ByteSize(v.Bytes())
return nil
}

func (bs ByteSize) Get() interface{} {
return bs.Val()
}

func (bs ByteSize) Val() int {
return int(bs)
}

/// UnmarshalYAML the Unmarshaler interface of the yaml pkg.
func (bs *ByteSize) UnmarshalYAML(unmarshal func(interface{}) error) error {
var str string
err := unmarshal(&str)
if err != nil {
return err
}

return bs.Set(str)
}
104 changes: 104 additions & 0 deletions pkg/util/flagext/bytesize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package flagext

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)

func Test_ByteSize(t *testing.T) {
for _, tc := range []struct {
in string
err bool
out int
}{
{
in: "abc",
err: true,
},
{
in: "",
err: false,
out: 0,
},
{
in: "0",
err: false,
out: 0,
},
{
in: "1b",
err: false,
out: 1,
},
{
in: "100kb",
err: false,
out: 100 << 10,
},
{
in: "100 KB",
err: false,
out: 100 << 10,
},
{
// ensure lowercase works
in: "50mb",
err: false,
out: 50 << 20,
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
},
{
// ensure mixed capitalization works
in: "50Mb",
err: false,
out: 50 << 20,
},
{
in: "256GB",
err: false,
out: 256 << 30,
},
} {
t.Run(tc.in, func(t *testing.T) {
var bs ByteSize

err := bs.Set(tc.in)
if tc.err {
require.NotNil(t, err)
} else {
require.Nil(t, err)
require.Equal(t, tc.out, bs.Get().(int))
}

})
}
}

func Test_ByteSizeYAML(t *testing.T) {
for _, tc := range []struct {
in string
err bool
out ByteSize
}{
{
in: "256GB",
out: ByteSize(256 << 30),
},
{
in: "abc",
err: true,
},
} {
t.Run(tc.in, func(t *testing.T) {
var out ByteSize
err := yaml.Unmarshal([]byte(tc.in), &out)
if tc.err {
require.NotNil(t, err)
} else {
require.Nil(t, err)
require.Equal(t, tc.out, out)
}
})
}
}
Loading