Skip to content

Commit

Permalink
Loki: Per Tenant Runtime Configs (#3460)
Browse files Browse the repository at this point in the history
* Add per tenant configs mechanism

* Add per tenant logging of stream creation

* fix tests

* enable per tenant configs on push request logging.

* fixing up the stream log, changing log levels to debug
  • Loading branch information
slim-bean authored Mar 10, 2021
1 parent 93f2b2d commit 8012362
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 51 deletions.
1 change: 1 addition & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ auth_enabled: false

server:
http_listen_port: 3100
grpc_listen_port: 9096

ingester:
wal:
Expand Down
5 changes: 4 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -65,6 +66,7 @@ type Distributor struct {

cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
Expand All @@ -82,7 +84,7 @@ type Distributor struct {
}

// New a distributor creates.
func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) {
func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
Expand Down Expand Up @@ -121,6 +123,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
tenantConfigs: configs,
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
validator: validator,
Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
fe "github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -313,7 +314,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client, factory
}
}

d, err := New(distributorConfig, clientConfig, ingestersRing, overrides, nil)
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

Expand Down
50 changes: 45 additions & 5 deletions pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"fmt"
"math"
"net/http"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/dustin/go-humanize"
gokit "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -43,29 +45,67 @@ const applicationJSON = "application/json"

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
req, err := ParseRequest(r)
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
req, err := ParseRequest(logger, userID, r)
if err != nil {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", http.StatusBadRequest,
"err", err,
)
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if d.tenantConfigs.LogPushRequestStreams(userID) {
var sb strings.Builder
for _, s := range req.Streams {
sb.WriteString(s.Labels)
}
level.Debug(logger).Log(
"msg", "push request streams",
"streams", sb.String(),
)
}

_, err = d.Push(r.Context(), req)
if err == nil {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request successful",
)
}
w.WriteHeader(http.StatusNoContent)
return
}

resp, ok := httpgrpc.HTTPResponseFromError(err)
if ok {
http.Error(w, string(resp.Body), int(resp.Code))
body := string(resp.Body)
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", resp.Code,
"err", body,
)
}
http.Error(w, body, int(resp.Code))
} else {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", http.StatusInternalServerError,
"err", err.Error(),
)
}
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
userID, _ := user.ExtractOrgID(r.Context())
logger := util_log.WithContext(r.Context(), util_log.Logger)
func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) {

var body lokiutil.SizeReader

Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"testing"

util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -75,7 +76,7 @@ func TestParseRequest(t *testing.T) {
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}
data, err := ParseRequest(request)
data, err := ParseRequest(util_log.Logger, "", request)
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)
Expand Down
23 changes: 12 additions & 11 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -70,7 +71,7 @@ func TestIngesterWAL(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestIngesterWAL(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -127,7 +128,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -152,7 +153,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -198,7 +199,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -258,7 +259,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand All @@ -279,7 +280,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -303,7 +304,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand All @@ -324,7 +325,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -455,7 +456,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
Expand Down Expand Up @@ -505,7 +506,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil)

require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/runtime"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
Expand Down Expand Up @@ -257,7 +258,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

ing, err := New(cfg, client.Config{}, store, limits, nil)
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

Expand Down
11 changes: 7 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper"
errUtil "github.com/grafana/loki/pkg/util"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -121,8 +122,9 @@ func (cfg *Config) Validate() error {
type Ingester struct {
services.Service

cfg Config
clientConfig client.Config
cfg Config
clientConfig client.Config
tenantConfigs *runtime.TenantConfigs

shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown
instancesMtx sync.RWMutex
Expand Down Expand Up @@ -168,7 +170,7 @@ type ChunkStore interface {
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
Expand All @@ -178,6 +180,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
Expand Down Expand Up @@ -401,7 +404,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch)
i.instances[instanceID] = inst
}
return inst
Expand Down
5 changes: 3 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)

Expand All @@ -37,7 +38,7 @@ func TestIngester(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}

i, err := New(ingesterConfig, client.Config{}, store, limits, nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -219,7 +220,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}

i, err := New(ingesterConfig, client.Config{}, store, overrides, nil)
i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down
Loading

0 comments on commit 8012362

Please sign in to comment.