Boltdb shipper query performance improvements (#2770)
* boltdb-shipper query with cursor to do multiple queries in a single transaction

* index deduper

* use fifo cache in ingesters

* snapshot dbs in ingester for reads

* retain fewer files in ingesters for queries

* set timeout in cache config instead of index read cache config, which saves us from unmarshalling the object just to check the expiry

* add comment about fifo cache invalidation

* tweaking a few comments

Co-authored-by: Edward Welch <edward.welch@grafana.com>
sandeepsukhani and slim-bean authored Oct 20, 2020
1 parent fe7aadf commit f60554c
Showing 12 changed files with 556 additions and 109 deletions.
21 changes: 16 additions & 5 deletions pkg/loki/modules.go
@@ -263,8 +263,18 @@ func (t *Loki) initStore() (_ services.Service, err error) {
 case Ingester:
     // We do not want ingester to unnecessarily keep downloading files
     t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
-    // Do not cache index from Ingester.
-    t.cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{}
+    // Use fifo cache for caching index in memory.
+    t.cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
+        EnableFifoCache: true,
+        Fifocache: cache.FifoCacheConfig{
+            MaxSizeBytes: "200 MB",
+            // We snapshot the index in ingesters every minute for reads so reduce the index cache validity by a minute.
+            // This is usually set in StorageConfig.IndexCacheValidity but since this is exclusively used for caching the index entries,
+            // I(Sandeep) am setting it here which also helps reduce some CPU cycles and allocations required for
+            // unmarshalling the cached data to check the expiry.
+            Validity: t.cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
+        },
+    }
 case Querier:
     // We do not want query to do any updates to index
     t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
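
The ingester now answers index lookups from an in-process FIFO cache, and the expiry window (Validity) lives in the cache config itself rather than being checked on the index read path. Deciding expiry at the cache layer lets a stale entry be rejected from cache metadata alone, without unmarshalling its payload, which is the CPU saving the comment above refers to. A minimal sketch of the idea, with a hypothetical fifoCache type (the real implementation is Cortex's cache.FifoCache):

```go
package cachesketch

import (
	"container/list"
	"sync"
	"time"
)

// entry is a cached payload plus the time it was stored.
type entry struct {
	key     string
	value   []byte
	created time.Time
}

// fifoCache evicts oldest-first under size pressure and treats entries
// older than validity as misses.
type fifoCache struct {
	mtx      sync.Mutex
	validity time.Duration
	maxBytes int
	curBytes int
	order    *list.List               // front = oldest, evicted first
	entries  map[string]*list.Element // key -> element in order
}

func newFifoCache(validity time.Duration, maxBytes int) *fifoCache {
	return &fifoCache{
		validity: validity,
		maxBytes: maxBytes,
		order:    list.New(),
		entries:  map[string]*list.Element{},
	}
}

func (c *fifoCache) store(key string, value []byte) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	if old, ok := c.entries[key]; ok { // overwrite: drop the stale element
		c.curBytes -= len(old.Value.(*entry).value)
		c.order.Remove(old)
	}
	c.entries[key] = c.order.PushBack(&entry{key: key, value: value, created: time.Now()})
	c.curBytes += len(value)

	// FIFO eviction: drop the oldest entries until we are back under budget.
	for c.curBytes > c.maxBytes && c.order.Len() > 0 {
		oldest := c.order.Remove(c.order.Front()).(*entry)
		c.curBytes -= len(oldest.value)
		delete(c.entries, oldest.key)
	}
}

func (c *fifoCache) fetch(key string) ([]byte, bool) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	elem, ok := c.entries[key]
	if !ok {
		return nil, false
	}
	ent := elem.Value.(*entry)
	// Expiry is decided from metadata alone; the payload is never
	// unmarshalled just to discover it has expired.
	if c.validity > 0 && time.Since(ent.created) > c.validity {
		return nil, false
	}
	return ent.value, true
}
```

Setting Validity to IndexCacheValidity minus one minute matches the snapshot cadence: reads are served from snapshots refreshed every minute, so a cached result must expire at least that much earlier than the index entry it was built from.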
@@ -293,7 +303,8 @@ func (t *Loki) initStore() (_ services.Service, err error) {
     if t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType {
         boltdbShipperConfigIdx++
     }
-    mlb, err := calculateMaxLookBack(t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
+    mlb, err := calculateMaxLookBack(t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.cfg.Ingester.QueryStoreMaxLookBackPeriod,
+        t.cfg.Ingester.MaxChunkAge, t.cfg.StorageConfig.BoltDBShipperConfig.ResyncInterval)
     if err != nil {
         return nil, err
     }
@@ -495,12 +506,12 @@ func (t *Loki) initCompactor() (services.Service, error) {
     return t.compactor, nil
 }
 
-func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
+func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge, querierResyncInterval time.Duration) (time.Duration, error) {
     if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
         return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
     }
     // When using shipper, limit max look back for query to MaxChunkAge + upload interval by shipper + 15 mins to query only data whose index is not pushed yet
-    defaultMaxLookBack := maxChunkAge + shipper.UploadInterval + (15 * time.Minute)
+    defaultMaxLookBack := maxChunkAge + shipper.UploadInterval + querierResyncInterval + (15 * time.Minute)
 
     if maxLookBackConfig == 0 {
         // If the QueryStoreMaxLookBackPeriod is still its default value of 0, set it to the default calculated value.
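
With querierResyncInterval folded into the sum and UploadInterval dropping from 15 minutes to 1 minute elsewhere in this commit, the default lookback for the values used in the test file below works out to 81 minutes:

```go
package lookbacksketch

import "time"

// Values taken from Test_calculateMaxLookBack's "default" case:
var defaultMaxLookBack = 1*time.Hour + // maxChunkAge
	1*time.Minute + // shipper.UploadInterval, lowered from 15m in this commit
	5*time.Minute + // querierResyncInterval, newly added to the sum
	15*time.Minute // fixed safety margin
// 60m + 1m + 5m + 15m = 81m, which is why the test's expected value
// changes from 90 minutes to 81 minutes.
```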
31 changes: 18 additions & 13 deletions pkg/loki/modules_test.go
@@ -9,9 +9,10 @@ import (

 func Test_calculateMaxLookBack(t *testing.T) {
     type args struct {
-        pc                chunk.PeriodConfig
-        maxLookBackConfig time.Duration
-        maxChunkAge       time.Duration
+        pc                               chunk.PeriodConfig
+        maxLookBackConfig                time.Duration
+        maxChunkAge                      time.Duration
+        querierBoltDBFilesResyncInterval time.Duration
     }
     tests := []struct {
         name string
@@ -25,10 +26,11 @@ func Test_calculateMaxLookBack(t *testing.T) {
             pc: chunk.PeriodConfig{
                 ObjectType: "filesystem",
             },
-            maxLookBackConfig: 0,
-            maxChunkAge:       1 * time.Hour,
+            maxLookBackConfig:                0,
+            maxChunkAge:                      1 * time.Hour,
+            querierBoltDBFilesResyncInterval: 5 * time.Minute,
         },
-        want:    90 * time.Minute,
+        want:    81 * time.Minute,
         wantErr: false,
     },
     {
@@ -37,8 +39,9 @@ func Test_calculateMaxLookBack(t *testing.T) {
             pc: chunk.PeriodConfig{
                 ObjectType: "filesystem",
             },
-            maxLookBackConfig: -1,
-            maxChunkAge:       1 * time.Hour,
+            maxLookBackConfig:                -1,
+            maxChunkAge:                      1 * time.Hour,
+            querierBoltDBFilesResyncInterval: 5 * time.Minute,
         },
         want:    -1,
         wantErr: false,
@@ -49,8 +52,9 @@ func Test_calculateMaxLookBack(t *testing.T) {
             pc: chunk.PeriodConfig{
                 ObjectType: "gcs",
             },
-            maxLookBackConfig: -1,
-            maxChunkAge:       1 * time.Hour,
+            maxLookBackConfig:                -1,
+            maxChunkAge:                      1 * time.Hour,
+            querierBoltDBFilesResyncInterval: 5 * time.Minute,
         },
         want:    0,
         wantErr: true,
@@ -61,16 +65,17 @@ func Test_calculateMaxLookBack(t *testing.T) {
             pc: chunk.PeriodConfig{
                 ObjectType: "filesystem",
             },
-            maxLookBackConfig: 1 * time.Hour,
-            maxChunkAge:       1 * time.Hour,
+            maxLookBackConfig:                1 * time.Hour,
+            maxChunkAge:                      1 * time.Hour,
+            querierBoltDBFilesResyncInterval: 5 * time.Minute,
         },
         want:    0,
         wantErr: true,
     },
 }
 for _, tt := range tests {
     t.Run(tt.name, func(t *testing.T) {
-        got, err := calculateMaxLookBack(tt.args.pc, tt.args.maxLookBackConfig, tt.args.maxChunkAge)
+        got, err := calculateMaxLookBack(tt.args.pc, tt.args.maxLookBackConfig, tt.args.maxChunkAge, tt.args.querierBoltDBFilesResyncInterval)
         if (err != nil) != tt.wantErr {
             t.Errorf("calculateMaxLookBack() error = %v, wantErr %v", err, tt.wantErr)
             return
20 changes: 18 additions & 2 deletions pkg/storage/store_test.go
@@ -814,8 +814,6 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) {
     store, err := NewStore(config, schemaConfig, chunkStore, nil)
     require.NoError(t, err)
 
-    defer store.Stop()
-
     // time ranges adding a chunk for each store and a chunk which overlaps both the stores
     chunksToBuildForTimeRanges := []timeRange{
         {
@@ -846,6 +844,24 @@
         addedChunkIDs[chk.ExternalKey()] = struct{}{}
     }
 
+    // recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup.
+    store.Stop()
+
+    chunkStore, err = storage.NewStore(
+        config.Config,
+        chunk.StoreConfig{},
+        schemaConfig.SchemaConfig,
+        limits,
+        nil,
+        nil,
+        cortex_util.Logger,
+    )
+
+    store, err = NewStore(config, schemaConfig, chunkStore, nil)
+    require.NoError(t, err)
+
+    defer store.Stop()
+
     // get all the chunks from both the stores
     chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName)...)
     require.NoError(t, err)
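
The comment above points at the other half of this commit: ingesters snapshot their actively written boltdb files (during startup and then every UploadInterval) and serve queries from those snapshots, which is why the test must recreate the store to see freshly written index. The snapshotting code itself is not part of this diff; a hedged sketch of what such a step can look like with bbolt's Tx.WriteTo, using hypothetical names:

```go
package snapshotsketch

import (
	"os"

	"go.etcd.io/bbolt"
)

// snapshotDB copies src under a read transaction and reopens the copy
// read-only, so queries never contend with ongoing writes on src.
func snapshotDB(src *bbolt.DB, snapshotPath string) (*bbolt.DB, error) {
	f, err := os.Create(snapshotPath)
	if err != nil {
		return nil, err
	}

	// Tx.WriteTo streams a consistent point-in-time copy of the whole DB.
	err = src.View(func(tx *bbolt.Tx) error {
		_, err := tx.WriteTo(f)
		return err
	})
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		return nil, err
	}

	return bbolt.Open(snapshotPath, 0666, &bbolt.Options{ReadOnly: true})
}
```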
28 changes: 24 additions & 4 deletions pkg/storage/stores/shipper/downloads/table.go
@@ -29,8 +29,10 @@ const (
     delimiter = "/"
 )
 
+var bucketName = []byte("index")
+
 type BoltDBIndexClient interface {
-    QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
+    QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
 }
 
 type StorageClient interface {
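
QueryWithCursor shifts transaction ownership to the caller: instead of one read transaction per QueryDB call, the caller opens a single View transaction and hands every query a cursor on it, as MultiQueries does below. A simplified sketch of such a cursor-based scan; the hashValue + "\x00" + rangeValue key layout and the callback shape are assumptions here (the real client is Cortex's local.BoltIndexClient):

```go
package cursorsketch

import (
	"bytes"

	"go.etcd.io/bbolt"
)

// queryWithCursor scans every key under a prefix using a cursor the caller
// obtained from its own read transaction; returning false from the callback
// stops the scan early.
func queryWithCursor(c *bbolt.Cursor, hashValue string, rangeValuePrefix []byte, callback func(rangeValue, value []byte) bool) {
	prefix := append([]byte(hashValue+"\x00"), rangeValuePrefix...)
	for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
		if !callback(k[len(hashValue)+1:], v) {
			return
		}
	}
}
```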
@@ -225,12 +227,30 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {

     level.Debug(log).Log("table-name", t.name, "query-count", len(queries))
 
+    id := shipper_util.NewIndexDeduper(callback)
+
     for name, db := range t.dbs {
-        for _, query := range queries {
-            if err := t.boltDBIndexClient.QueryDB(ctx, db.boltdb, query, callback); err != nil {
-                return err
-            }
-        }
+        err := db.boltdb.View(func(tx *bbolt.Tx) error {
+            bucket := tx.Bucket(bucketName)
+            if bucket == nil {
+                return nil
+            }
+
+            for _, query := range queries {
+                if err := t.boltDBIndexClient.QueryWithCursor(ctx, bucket.Cursor(), query, func(query chunk.IndexQuery, batch chunk.ReadBatch) (shouldContinue bool) {
+                    return id.Callback(query, batch)
+                }); err != nil {
+                    return err
+                }
+            }
+
+            return nil
+        })
+
+        if err != nil {
+            return err
+        }
+
         level.Debug(log).Log("queried-db", name)
     }
 
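
Since a table can hold several db files with overlapping contents (an ingester re-uploads files it is still writing to), the same index entry can now surface from more than one db within a single MultiQueries call. The shipper_util.NewIndexDeduper added by this commit (in a file not shown on this page) filters those repeats out before they reach the caller. A self-contained sketch of the idea with simplified types; the real version wraps Cortex's chunk.ReadBatch batches rather than single entries:

```go
package dedupesketch

// indexEntry identifies one index row by its (hash value, range value) pair.
type indexEntry struct {
	hashValue  string
	rangeValue string
}

// indexDeduper wraps a query callback and forwards each entry at most once,
// however many overlapping db files contain it.
type indexDeduper struct {
	seen     map[indexEntry]struct{}
	callback func(hashValue, rangeValue string, value []byte) bool
}

func newIndexDeduper(callback func(hashValue, rangeValue string, value []byte) bool) *indexDeduper {
	return &indexDeduper{
		seen:     map[indexEntry]struct{}{},
		callback: callback,
	}
}

// Callback returns true to keep iterating, mirroring the shouldContinue
// convention in the interfaces above.
func (d *indexDeduper) Callback(hashValue, rangeValue string, value []byte) bool {
	key := indexEntry{hashValue: hashValue, rangeValue: rangeValue}
	if _, ok := d.seen[key]; ok {
		return true // duplicate entry: skip it, keep iterating
	}
	d.seen[key] = struct{}{}
	return d.callback(hashValue, rangeValue, value)
}
```

This is exactly what the duplicate-index test below exercises: db1/duplicate_db1 and the partially duplicated db2 overlap, yet each record should be returned only once.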
50 changes: 50 additions & 0 deletions pkg/storage/stores/shipper/downloads/table_test.go
@@ -238,3 +238,53 @@ func TestTable_doParallelDownload(t *testing.T) {
         })
     }
 }
+
+func TestTable_DuplicateIndex(t *testing.T) {
+    tempDir, err := ioutil.TempDir("", "table-writes")
+    require.NoError(t, err)
+
+    defer func() {
+        require.NoError(t, os.RemoveAll(tempDir))
+    }()
+
+    objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
+
+    testDBs := map[string]testutil.DBRecords{
+        "db1": {
+            Start:      0,
+            NumRecords: 10,
+        },
+        "duplicate_db1": {
+            Start:      0,
+            NumRecords: 10,
+        },
+        "db2": {
+            Start:      10,
+            NumRecords: 10,
+        },
+        "partially_duplicate_db2": {
+            Start:      10,
+            NumRecords: 5,
+        },
+        "db3": {
+            Start:      20,
+            NumRecords: 10,
+        },
+    }
+
+    testutil.SetupDBTablesAtPath(t, "test", objectStoragePath, testDBs, true)
+
+    table, _, stopFunc := buildTestTable(t, "test", tempDir)
+    defer func() {
+        stopFunc()
+    }()
+
+    // build queries each looking for specific value from all the dbs
+    var queries []chunk.IndexQuery
+    for i := 5; i < 25; i++ {
+        queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))})
+    }
+
+    // query the loaded table to see if it has right data.
+    testutil.TestSingleTableQuery(t, queries, table, 5, 20)
+}
8 changes: 5 additions & 3 deletions pkg/storage/stores/shipper/shipper_index_client.go
@@ -41,12 +41,13 @@

     StorageKeyPrefix = "index/"
 
-    // UploadInterval defines interval for uploading active boltdb files from local which are being written to by ingesters.
-    UploadInterval = 15 * time.Minute
+    // UploadInterval defines interval for when we check if there are new index files to upload.
+    // It's also used to snapshot the currently written index tables so the snapshots can be used for reads.
+    UploadInterval = 1 * time.Minute
 )
 
 type boltDBIndexClient interface {
-    QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
+    QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
     NewWriteBatch() chunk.WriteBatch
     WriteToDB(ctx context.Context, db *bbolt.DB, writes local.TableWrites) error
     Stop()
@@ -124,6 +125,7 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.Registerer) error {
         Uploader:       uploader,
         IndexDir:       s.cfg.ActiveIndexDirectory,
         UploadInterval: UploadInterval,
+        DBRetainPeriod: s.cfg.ResyncInterval + 2*time.Minute,
     }
     uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, prefixedObjectClient, registerer)
     if err != nil {
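
DBRetainPeriod keeps an already-uploaded db on the ingester for one querier resync cycle plus a two-minute margin, so queriers that have not yet re-downloaded the freshly uploaded index can still be answered by the ingester. A small sketch of the retention decision this implies, with hypothetical names:

```go
package retainsketch

import "time"

// uploadedDB is a hypothetical record of an index db that has already been
// shipped to object storage but is retained locally for a while.
type uploadedDB struct {
	path       string
	uploadedAt time.Time
}

// canDrop reports whether queriers have had a full ResyncInterval (plus the
// 2 minute margin from DBRetainPeriod above) to pick up the uploaded file.
func canDrop(db uploadedDB, retainPeriod time.Duration, now time.Time) bool {
	return now.Sub(db.uploadedAt) > retainPeriod
}
```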
