Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run store queries in ingester when using tsdb as index store #6209

Merged
merged 1 commit into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&cfg.SyncMinUtilization, "ingester.sync-min-utilization", 0, "Minimum utilization of chunk when doing synchronization.")
f.IntVar(&cfg.MaxReturnedErrors, "ingester.max-ignored-stream-errors", 10, "Maximum number of ignored stream errors to return. 0 to return all errors.")
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 2*time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper/tsdb index and filesystem object store. -1 for infinite.")
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
f.IntVar(&cfg.MaxDroppedStreams, "ingester.tailer.max-dropped-streams", 10, "Maximum number of dropped streams to keep in memory during tailing")
Expand Down Expand Up @@ -646,39 +646,41 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
return sendSampleBatches(ctx, it, queryServer)
}

// boltdbShipperMaxLookBack returns a max look back period only if active index type is boltdb-shipper.
// max look back is limited to from time of boltdb-shipper config.
// It considers previous periodic config's from time if that also has index type set to boltdb-shipper.
func (i *Ingester) boltdbShipperMaxLookBack() time.Duration {
// asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`.
// max look back is limited to from time of async store config.
// It considers previous periodic config's from time if that also has async index type.
// This is to limit the lookback to only async stores where relevant.
func (i *Ingester) asyncStoreMaxLookBack() time.Duration {
activePeriodicConfigIndex := config.ActivePeriodConfig(i.periodicConfigs)
activePeriodicConfig := i.periodicConfigs[activePeriodicConfigIndex]
if activePeriodicConfig.IndexType != config.BoltDBShipperType {
if activePeriodicConfig.IndexType != config.BoltDBShipperType && activePeriodicConfig.IndexType != config.TSDBType {
return 0
}

startTime := activePeriodicConfig.From
if activePeriodicConfigIndex != 0 && i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == config.BoltDBShipperType {
if activePeriodicConfigIndex != 0 && (i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == config.BoltDBShipperType ||
i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == config.TSDBType) {
startTime = i.periodicConfigs[activePeriodicConfigIndex-1].From
}

maxLookBack := time.Since(startTime.Time.Time())
return maxLookBack
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.
// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper or tsdb.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
orgID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
if boltdbShipperMaxLookBack == 0 {
asyncStoreMaxLookBack := i.asyncStoreMaxLookBack()
if asyncStoreMaxLookBack == 0 {
return &logproto.GetChunkIDsResponse{}, nil
}

reqStart := req.Start
reqStart = adjustQueryStartTime(boltdbShipperMaxLookBack, reqStart, time.Now())
reqStart = adjustQueryStartTime(asyncStoreMaxLookBack, reqStart, time.Now())

// parse the request
start, end := errUtil.RoundToMilliseconds(reqStart, req.End)
Expand Down Expand Up @@ -726,9 +728,9 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return resp, nil
}

// Only continue if the active index type is boltdb-shipper or QueryStore flag is true.
boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
if boltdbShipperMaxLookBack == 0 && !i.cfg.QueryStore {
// Only continue if the active index type is one of async index store types or QueryStore flag is true.
asyncStoreMaxLookBack := i.asyncStoreMaxLookBack()
if asyncStoreMaxLookBack == 0 && !i.cfg.QueryStore {
return resp, nil
}

Expand All @@ -739,8 +741,8 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
}

maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod
if boltdbShipperMaxLookBack != 0 {
maxLookBackPeriod = boltdbShipperMaxLookBack
if asyncStoreMaxLookBack != 0 {
maxLookBackPeriod = asyncStoreMaxLookBack
}
// Adjust the start time based on QueryStoreMaxLookBackPeriod.
start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now())
Expand Down
26 changes: 18 additions & 8 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func TestIngester_buildStoreRequest(t *testing.T) {
}
}

func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
func TestIngester_asyncStoreMaxLookBack(t *testing.T) {
now := model.Now()

for _, tc := range []struct {
Expand All @@ -415,7 +415,7 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
expectedMaxLookBack time.Duration
}{
{
name: "not using boltdb-shipper",
name: "not using async index store",
periodicConfigs: []config.PeriodConfig{
{
From: config.DayTime{Time: now.Add(-24 * time.Hour)},
Expand All @@ -434,7 +434,17 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
expectedMaxLookBack: time.Since(now.Add(-24 * time.Hour).Time()),
},
{
name: "active config boltdb-shipper, previous config non boltdb-shipper",
name: "just one periodic config with tsdb",
periodicConfigs: []config.PeriodConfig{
{
From: config.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "tsdb",
},
},
expectedMaxLookBack: time.Since(now.Add(-24 * time.Hour).Time()),
},
{
name: "active config boltdb-shipper, previous config non async index store",
periodicConfigs: []config.PeriodConfig{
{
From: config.DayTime{Time: now.Add(-48 * time.Hour)},
Expand All @@ -448,25 +458,25 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
expectedMaxLookBack: time.Since(now.Add(-24 * time.Hour).Time()),
},
{
name: "current and previous config both using boltdb-shipper",
name: "current and previous config both using async index store",
periodicConfigs: []config.PeriodConfig{
{
From: config.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "boltdb-shipper",
},
{
From: config.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
IndexType: "tsdb",
},
},
expectedMaxLookBack: time.Since(now.Add(-48 * time.Hour).Time()),
},
{
name: "active config non boltdb-shipper, previous config boltdb-shipper",
name: "active config non async index store, previous config tsdb",
periodicConfigs: []config.PeriodConfig{
{
From: config.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "boltdb-shipper",
IndexType: "tsdb",
},
{
From: config.DayTime{Time: now.Add(-24 * time.Hour)},
Expand All @@ -477,7 +487,7 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
ingester := Ingester{periodicConfigs: tc.periodicConfigs}
mlb := ingester.boltdbShipperMaxLookBack()
mlb := ingester.asyncStoreMaxLookBack()
require.InDelta(t, tc.expectedMaxLookBack, mlb, float64(time.Second))
})
}
Expand Down