Deprecate max_look_back_period in the chunk storage. #3677

Merged · 2 commits · May 3, 2021
23 changes: 9 additions & 14 deletions cmd/migrate/main.go
@@ -35,7 +35,6 @@ type syncRange struct {
}

func main() {

var defaultsConfig loki.Config

from := flag.String("from", "", "Start Time RFC339Nano 2006-01-02T15:04:05.999999999Z07:00")
@@ -91,7 +90,7 @@ func main() {
}
// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
sourceStore, err := cortex_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
sourceStore, err := cortex_storage.NewStore(sourceConfig.StorageConfig.Config, sourceConfig.ChunkStoreConfig.StoreConfig, sourceConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create source store:", err)
os.Exit(1)
@@ -104,7 +103,7 @@ func main() {

// Create a new registerer to avoid registering duplicate metrics
prometheus.DefaultRegisterer = prometheus.NewRegistry()
destStore, err := cortex_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
destStore, err := cortex_storage.NewStore(destConfig.StorageConfig.Config, destConfig.ChunkStoreConfig.StoreConfig, destConfig.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
log.Println("Failed to create destination store:", err)
os.Exit(1)
@@ -175,7 +174,7 @@ func main() {
errorChan := make(chan error)
statsChan := make(chan stats)

//Start the parallel processors
// Start the parallel processors
var wg sync.WaitGroup
cancelContext, cancelFunc := context.WithCancel(ctx)
for i := 0; i < *parallel; i++ {
@@ -195,7 +194,7 @@ func main() {
syncChan <- syncRanges[i]
i++
}
//Everything processed, exit
// Everything processed, exit
cancelFunc()
}()

@@ -229,16 +228,15 @@ func main() {
for {
time.Sleep(100 * time.Second)
}

}

func calcSyncRanges(from, to int64, shardBy int64) []*syncRange {
//Calculate the sync ranges
// Calculate the sync ranges
syncRanges := []*syncRange{}
//diff := to - from
//shards := diff / shardBy
// diff := to - from
// shards := diff / shardBy
currentFrom := from
//currentTo := from
// currentTo := from
currentTo := from + shardBy
for currentFrom < to && currentTo <= to {
s := &syncRange{
@@ -286,7 +284,6 @@ func newChunkMover(ctx context.Context, source, dest storage.Store, sourceUser,
}

func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <-chan *syncRange, errCh chan<- error, statsCh chan<- stats) {

for {
select {
case <-ctx.Done():
@@ -306,7 +303,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
for i, f := range fetchers {
log.Printf("%v Processing Schema %v which contains %v chunks\n", threadID, i, len(schemaGroups[i]))

//Slice up into batches
// Slice up into batches
for j := 0; j < len(schemaGroups[i]); j += m.batch {
k := j + m.batch
if k > len(schemaGroups[i]) {
@@ -394,9 +391,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
}

func mustParse(t string) time.Time {

ret, err := time.Parse(time.RFC3339Nano, t)

if err != nil {
log.Fatalf("Unable to parse time %v", err)
}
2 changes: 1 addition & 1 deletion pkg/logcli/query/query.go
@@ -189,7 +189,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}

chunkStore, err := cortex_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
chunkStore, err := cortex_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
return err
}
9 changes: 8 additions & 1 deletion pkg/loki/loki.go
@@ -61,7 +61,7 @@ type Config struct {
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"`
ChunkStoreConfig storage.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig storage.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
@@ -138,6 +138,13 @@ func (c *Config) Validate() error {
if err := c.CompactorConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid compactor config")
}
if err := c.ChunkStoreConfig.Validate(util_log.Logger); err != nil {
return errors.Wrap(err, "invalid chunk store config")
}
// TODO(cyriltovena): remove when MaxLookBackPeriod in the storage will be fully deprecated.
if c.ChunkStoreConfig.MaxLookBackPeriod > 0 {
c.LimitsConfig.MaxQueryLookback = c.ChunkStoreConfig.MaxLookBackPeriod
}
return nil
}

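The pkg/storage side of this change is not part of the hunks shown here, but the updated call sites (`ChunkStoreConfig.StoreConfig`) and the `Validate(util_log.Logger)` call imply a thin wrapper around the Cortex store config. A minimal sketch of what such a wrapper could look like, assuming it embeds `chunk.StoreConfig` and keeps the deprecated `max_look_back_period` field so existing configs still parse (names, flag wiring, and the warning text are illustrative, not the actual Loki code):

```go
// Hypothetical sketch only: the real pkg/storage change is not shown in this diff.
package storage

import (
	"flag"

	"github.com/cortexproject/cortex/pkg/chunk"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/common/model"
)

// ChunkStoreConfig embeds the Cortex chunk.StoreConfig, which is why every
// cortex_storage.NewStore call site above now passes ChunkStoreConfig.StoreConfig.
type ChunkStoreConfig struct {
	chunk.StoreConfig `yaml:",inline"`

	// Deprecated: use limits_config.max_query_lookback instead.
	MaxLookBackPeriod model.Duration `yaml:"max_look_back_period"`
}

// RegisterFlags registers the embedded store flags plus the deprecated one.
func (cfg *ChunkStoreConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.StoreConfig.RegisterFlags(f)
	f.Var(&cfg.MaxLookBackPeriod, "store.max-look-back-period",
		"Deprecated: use -querier.max-query-lookback instead.")
}

// Validate warns when the deprecated field is still set and defers the rest to
// the embedded Cortex config (assuming it exposes Validate(logger), as the
// Validate(util_log.Logger) call in loki.go suggests).
func (cfg *ChunkStoreConfig) Validate(logger log.Logger) error {
	if cfg.MaxLookBackPeriod > 0 {
		level.Warn(logger).Log("msg",
			"chunk_store_config.max_look_back_period is deprecated, use limits_config.max_query_lookback instead")
	}
	return cfg.StoreConfig.Validate(logger)
}
```

With a shape like this, the `Config.Validate` hunk above can bridge a still-set `MaxLookBackPeriod` into `LimitsConfig.MaxQueryLookback` until the deprecated field is removed for good.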
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -317,7 +317,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
}

chunkStore, err := cortex_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
chunkStore, err := cortex_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig.StoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
return
}
60 changes: 39 additions & 21 deletions pkg/querier/querier.go
@@ -6,11 +6,13 @@ import (
"net/http"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/util/spanlogger"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"

"github.com/grafana/loki/pkg/iter"
@@ -29,6 +31,8 @@ const (
tailerWaitEntryThrottle = time.Second / 2
)

var nowFunc = func() time.Time { return time.Now() }

type interval struct {
start, end time.Time
}
@@ -83,7 +87,8 @@ func (q *Querier) SetQueryable(queryable logql.Querier) {

// Select Implements logql.Querier which select logs via matchers and regex filters.
func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
err := q.validateQueryRequest(ctx, params)
var err error
params.Start, params.End, err = q.validateQueryRequest(ctx, params)
if err != nil {
return nil, err
}
@@ -125,7 +130,8 @@ func (q *Querier) SelectLogs(ctx context.Context, params logql.SelectLogParams)
}

func (q *Querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
err := q.validateQueryRequest(ctx, params)
var err error
params.Start, params.End, err = q.validateQueryRequest(ctx, params)
if err != nil {
return nil, err
}
@@ -247,7 +253,7 @@ func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logpr
return nil, err
}

if err = q.validateQueryTimeRange(userID, *req.Start, *req.End); err != nil {
if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil {
return nil, err
}

@@ -303,7 +309,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
},
}

err = q.validateQueryRequest(ctx, histReq)
histReq.Start, histReq.End, err = q.validateQueryRequest(ctx, histReq)
if err != nil {
return nil, err
}
@@ -348,7 +354,7 @@ func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*log
return nil, err
}

if err = q.validateQueryTimeRange(userID, req.Start, req.End); err != nil {
if req.Start, req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End); err != nil {
return nil, err
}

@@ -357,11 +363,9 @@ func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*log
defer cancel()

return q.awaitSeries(ctx, req)

}

func (q *Querier) awaitSeries(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {

// buffer the channels to the # of calls they're expecting su
series := make(chan [][]logproto.SeriesIdentifier, 2)
errs := make(chan error, 2)
@@ -465,38 +469,52 @@ func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time,
return ids, nil
}

func (q *Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) error {
func (q *Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return err
return time.Time{}, time.Time{}, err
}

selector, err := req.LogSelector()
if err != nil {
return err
return time.Time{}, time.Time{}, err
}
matchers := selector.Matchers()

maxStreamMatchersPerQuery := q.limits.MaxStreamsMatchersPerQuery(userID)
if len(matchers) > maxStreamMatchersPerQuery {
return httpgrpc.Errorf(http.StatusBadRequest,
return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest,
"max streams matchers per query exceeded, matchers-count > limit (%d > %d)", len(matchers), maxStreamMatchersPerQuery)
}

return q.validateQueryTimeRange(userID, req.GetStart(), req.GetEnd())
return validateQueryTimeRangeLimits(ctx, userID, q.limits, req.GetStart(), req.GetEnd())
}

func (q *Querier) validateQueryTimeRange(userID string, from time.Time, through time.Time) error {
if (through).Before(from) {
return httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
}
type timeRangeLimits interface {
MaxQueryLookback(string) time.Duration
MaxQueryLength(string) time.Duration
}

maxQueryLength := q.limits.MaxQueryLength(userID)
if maxQueryLength > 0 && (through).Sub(from) > maxQueryLength {
return httpgrpc.Errorf(http.StatusBadRequest, cortex_validation.ErrQueryTooLong, (through).Sub(from), maxQueryLength)
}
func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits timeRangeLimits, from, through time.Time) (time.Time, time.Time, error) {
now := nowFunc()
// Clamp the time range based on the max query lookback.
if maxQueryLookback := limits.MaxQueryLookback(userID); maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) {
origStartTime := from
from = now.Add(-maxQueryLookback)

return nil
level.Debug(spanlogger.FromContext(ctx)).Log(
"msg", "the start time of the query has been manipulated because of the 'max query lookback' setting",
"original", origStartTime,
"updated", from)

}
if maxQueryLength := limits.MaxQueryLength(userID); maxQueryLength > 0 && (through).Sub(from) > maxQueryLength {
return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, cortex_validation.ErrQueryTooLong, (through).Sub(from), maxQueryLength)
}
if through.Before(from) {
return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
}
return from, through, nil
}

func (q *Querier) checkTailRequestLimit(ctx context.Context) error {
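To make the new query-time behaviour concrete: where the old `max_look_back_period` capped how far back the chunk store would look, the querier now narrows the requested range up front. The snippet below is a standalone illustration of the same clamping arithmetic; it is not the Loki function itself, and the real `validateQueryTimeRangeLimits` also enforces `max_query_length` and `through >= from`, as the tests that follow exercise.

```go
package main

import (
	"fmt"
	"time"
)

// clampToLookback mimics the lookback clamp: `from` is never allowed to be
// older than now-lookback (a lookback of 0 disables the clamp).
func clampToLookback(now, from, through time.Time, lookback time.Duration) (time.Time, time.Time) {
	if lookback > 0 && from.Before(now.Add(-lookback)) {
		from = now.Add(-lookback)
	}
	return from, through
}

func main() {
	now := time.Now()
	// A query over the last 48h with a 24h max_query_lookback is narrowed,
	// not rejected: only the start time moves.
	from, through := clampToLookback(now, now.Add(-48*time.Hour), now, 24*time.Hour)
	fmt.Println(now.Sub(from), now.Sub(through)) // prints "24h0m0s 0s"
}
```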
59 changes: 44 additions & 15 deletions pkg/querier/querier_test.go
@@ -8,26 +8,21 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/validation"

"github.com/grafana/loki/pkg/storage"

"github.com/grafana/loki/pkg/logql"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/validation"
)

const (
@@ -326,7 +321,6 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{Labels: map[string]string{"a": "1", "b": "2"}},
{Labels: map[string]string{"a": "1", "b": "3"}},
}, nil)

},
func(t *testing.T, q *Querier, req *logproto.SeriesRequest) {
ctx := user.InjectOrgID(context.Background(), "test")
@@ -365,7 +359,6 @@ func TestQuerier_SeriesAPI(t *testing.T) {
}

func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

@@ -395,7 +388,6 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {

req := logproto.QueryRequest{
Selector: `{app="foo"}`,
Limit: 1000,
@@ -437,7 +429,6 @@ func TestQuerier_IngesterMaxQueryLookback(t *testing.T) {
queryClient.AssertExpectations(t)
ingesterClient.AssertExpectations(t)
store.AssertExpectations(t)

})
}
}
@@ -691,7 +682,45 @@ func TestQuerier_buildQueryIntervals(t *testing.T) {
ingesterQueryInterval: ingesterQueryInterval,
storeQueryInterval: storeQueryInterval,
})
})
}
}

type fakeTimeLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration
}

func (f fakeTimeLimits) MaxQueryLookback(_ string) time.Duration { return f.maxQueryLookback }
func (f fakeTimeLimits) MaxQueryLength(_ string) time.Duration { return f.maxQueryLength }

func Test_validateQueryTimeRangeLimits(t *testing.T) {
now := time.Now()
nowFunc = func() time.Time { return now }
tests := []struct {
name string
limits timeRangeLimits
from time.Time
through time.Time
wantFrom time.Time
wantThrough time.Time
wantErr bool
}{
{"no change", fakeTimeLimits{1000 * time.Hour, 1000 * time.Hour}, now, now.Add(24 * time.Hour), now, now.Add(24 * time.Hour), false},
{"clamped to 24h", fakeTimeLimits{24 * time.Hour, 1000 * time.Hour}, now.Add(-48 * time.Hour), now, now.Add(-24 * time.Hour), now, false},
{"end before start", fakeTimeLimits{}, now, now.Add(-48 * time.Hour), time.Time{}, time.Time{}, true},
{"query too long", fakeTimeLimits{maxQueryLength: 24 * time.Hour}, now.Add(-48 * time.Hour), now, time.Time{}, time.Time{}, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
from, through, err := validateQueryTimeRangeLimits(context.Background(), "foo", tt.limits, tt.from, tt.through)
if tt.wantErr {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
require.Equal(t, tt.wantFrom, from, "wanted (%s) got (%s)", tt.wantFrom, from)
require.Equal(t, tt.wantThrough, through)
})
}
}