Skip to content

Commit

Permalink
per user index changes on the write path (#5193)
Browse files Browse the repository at this point in the history
* per user index changes on the write path

- add a flag to enable building a per-user index from the ingesters
- when enabled, ingesters build boltdb files with one bucket per tenant
  • Loading branch information
sandeepsukhani authored Jan 24, 2022
1 parent 61ff29a commit d87f2a1
Show file tree
Hide file tree
Showing 18 changed files with 263 additions and 197 deletions.
21 changes: 13 additions & 8 deletions pkg/storage/chunk/local/boltdb_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
)

var (
defaultBucketName = []byte("index")
ErrUnexistentBoltDB = errors.New("boltdb file does not exist")
IndexBucketName = []byte("index")
ErrUnexistentBoltDB = errors.New("boltdb file does not exist")
ErrEmptyIndexBucketName = errors.New("empty index bucket name")
)

const (
Expand Down Expand Up @@ -170,11 +171,11 @@ func (b *BoltIndexClient) GetDB(name string, operation int) (*bbolt.DB, error) {
return db, nil
}

func (b *BoltIndexClient) WriteToDB(ctx context.Context, db *bbolt.DB, bucketName []byte, writes TableWrites) error {
func (b *BoltIndexClient) WriteToDB(_ context.Context, db *bbolt.DB, bucketName []byte, writes TableWrites) error {
return db.Update(func(tx *bbolt.Tx) error {
var b *bbolt.Bucket
if len(bucketName) == 0 {
bucketName = defaultBucketName
return ErrEmptyIndexBucketName
}

// a bucket should already exist for deletes, for other writes we create one otherwise.
Expand Down Expand Up @@ -214,7 +215,7 @@ func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch
return err
}

err = b.WriteToDB(ctx, db, nil, writes)
err = b.WriteToDB(ctx, db, IndexBucketName, writes)
if err != nil {
return err
}
Expand All @@ -237,12 +238,16 @@ func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, cal
return err
}

return b.QueryDB(ctx, db, query, callback)
return b.QueryDB(ctx, db, IndexBucketName, query, callback)
}

func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, bucketName []byte, query chunk.IndexQuery,
callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
return db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(defaultBucketName)
if len(bucketName) == 0 {
return ErrEmptyIndexBucketName
}
bucket := tx.Bucket(bucketName)
if bucket == nil {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/chunk/local/boltdb_index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func setupDB(t *testing.T, boltdbIndexClient *BoltIndexClient, dbname string) {
require.NoError(t, err)

err = db.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(defaultBucketName)
b, err := tx.CreateBucketIfNotExists(IndexBucketName)
if err != nil {
return err
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestBoltDBReload(t *testing.T) {

valueFromDb := []byte{}
_ = droppedDb.View(func(tx *bbolt.Tx) error {
b := tx.Bucket(defaultBucketName)
b := tx.Bucket(IndexBucketName)
valueFromDb = b.Get(testKey)
return nil
})
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestBoltDB_Writes(t *testing.T) {
{
name: "deletes without initial writes",
testDeletes: []string{"1", "2"},
err: fmt.Errorf("bucket %s not found in table 3", defaultBucketName),
err: fmt.Errorf("bucket %s not found in table 3", IndexBucketName),
},
} {
t.Run(tc.name, func(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (t *deleteRequestsTable) BatchWrite(ctx context.Context, batch chunk.WriteB
}

for _, tableWrites := range boltWriteBatch.Writes {
if err := t.boltdbIndexClient.WriteToDB(ctx, t.db, nil, tableWrites); err != nil {
if err := t.boltdbIndexClient.WriteToDB(ctx, t.db, local.IndexBucketName, tableWrites); err != nil {
return err
}
}
Expand All @@ -182,7 +182,7 @@ func (t *deleteRequestsTable) BatchWrite(ctx context.Context, batch chunk.WriteB

func (t *deleteRequestsTable) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error {
for _, query := range queries {
if err := t.boltdbIndexClient.QueryDB(ctx, t.db, query, callback); err != nil {
if err := t.boltdbIndexClient.QueryDB(ctx, t.db, local.IndexBucketName, query, callback); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDeleteRequestsTable(t *testing.T) {
require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch))

// see if right records were written
testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, testDeleteRequestsTable.boltdbIndexClient, 0, 10)
testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, local.IndexBucketName, testDeleteRequestsTable.boltdbIndexClient, 0, 10)

// upload the file to the storage
require.NoError(t, testDeleteRequestsTable.uploadFile())
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestDeleteRequestsTable(t *testing.T) {
require.NotEmpty(t, testDeleteRequestsTable.dbPath)

// validate records in local db
testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, testDeleteRequestsTable.boltdbIndexClient, 0, 20)
testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, local.IndexBucketName, testDeleteRequestsTable.boltdbIndexClient, 0, 20)
}

func checkRecordsInStorage(t *testing.T, storageFilePath string, start, numRecords int) {
Expand All @@ -104,5 +104,5 @@ func checkRecordsInStorage(t *testing.T, storageFilePath string, start, numRecor

defer boltdbIndexClient.Stop()

testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, tempDB, boltdbIndexClient, start, numRecords)
testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, tempDB, local.IndexBucketName, boltdbIndexClient, start, numRecords)
}
3 changes: 2 additions & 1 deletion pkg/storage/stores/shipper/compactor/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/log/level"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
Expand Down Expand Up @@ -217,7 +218,7 @@ func (is *indexSet) writeBatch(_ string, batch []indexEntry) error {
is.uploadCompactedDB = true
is.removeSourceObjects = true
return is.compactedDB.Batch(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(bucketName)
b, err := tx.CreateBucketIfNotExists(local.IndexBucketName)
if err != nil {
return err
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/storage/stores/shipper/compactor/retention/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/local"
)

func Test_ChunkIterator(t *testing.T) {
Expand All @@ -32,7 +33,7 @@ func Test_ChunkIterator(t *testing.T) {
require.Len(t, tables, 1)
var actual []ChunkEntry
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config)
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
require.NoError(t, err)
for it.Next() {
require.NoError(t, it.Err())
Expand All @@ -53,7 +54,7 @@ func Test_ChunkIterator(t *testing.T) {
// second pass we delete c2
actual = actual[:0]
err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config)
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
require.NoError(t, err)
for it.Next() {
actual = append(actual, it.Entry())
Expand Down Expand Up @@ -88,7 +89,7 @@ func Test_SeriesCleaner(t *testing.T) {
require.Len(t, tables, 1)
// remove c1, c2 chunk
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config)
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config)
require.NoError(t, err)
for it.Next() {
require.NoError(t, it.Err())
Expand All @@ -101,7 +102,7 @@ func Test_SeriesCleaner(t *testing.T) {
require.NoError(t, err)

err = tables[0].DB.Update(func(tx *bbolt.Tx) error {
cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config, tables[0].name)
cleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name)
if err := cleaner.Cleanup(entryFromChunk(testSchema, c2).UserID, c2.Metric); err != nil {
return err
}
Expand All @@ -112,7 +113,7 @@ func Test_SeriesCleaner(t *testing.T) {
require.NoError(t, err)

err = tables[0].DB.View(func(tx *bbolt.Tx) error {
return tx.Bucket(bucketName).ForEach(func(k, _ []byte) error {
return tx.Bucket(local.IndexBucketName).ForEach(func(k, _ []byte) error {
c1SeriesID := entryFromChunk(testSchema, c1).SeriesID
c2SeriesID := entryFromChunk(testSchema, c2).SeriesID
series, ok, err := parseLabelIndexSeriesID(decodeKey(k))
Expand Down Expand Up @@ -169,7 +170,7 @@ func Benchmark_ChunkIterator(b *testing.B) {

var total int64
_ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
bucket := tx.Bucket(local.IndexBucketName)
for n := 0; n < b.N; n++ {
it, err := newChunkIndexIterator(bucket, allSchemas[0].config)
require.NoError(b, err)
Expand Down
8 changes: 3 additions & 5 deletions pkg/storage/stores/shipper/compactor/retention/retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/local"
util_log "github.com/grafana/loki/pkg/util/log"
)

var (
bucketName = []byte("index")
chunkBucket = []byte("chunks")
)
var chunkBucket = []byte("chunks")

const (
logMetricName = "logs"
Expand Down Expand Up @@ -87,7 +85,7 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)

var empty, modified bool
err = db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
bucket := tx.Bucket(local.IndexBucketName)
if bucket == nil {
return nil
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/storage/stores/shipper/compactor/retention/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/local"
"github.com/grafana/loki/pkg/storage/chunk/objectclient"
"github.com/grafana/loki/pkg/validation"
)
Expand Down Expand Up @@ -205,7 +206,7 @@ func Test_EmptyTable(t *testing.T) {
tables := store.indexTables()
require.Len(t, tables, 1)
err := tables[0].DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config)
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
require.NoError(t, err)
empty, _, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil)
Expand Down Expand Up @@ -366,7 +367,7 @@ func TestChunkRewriter(t *testing.T) {
chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder, schemaCfg.SchemaConfig)
for _, indexTable := range store.indexTables() {
err := indexTable.DB.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(bucketName)
bucket := tx.Bucket(local.IndexBucketName)
if bucket == nil {
return nil
}
Expand Down Expand Up @@ -645,10 +646,10 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) {
for i, table := range tables {
seriesCleanRecorder := newSeriesCleanRecorder()
err := table.DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config)
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
require.NoError(t, err)

cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(bucketName))
cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(local.IndexBucketName))
require.NoError(t, err)
empty, isModified, err := markforDelete(context.Background(), table.name, noopWriter{}, it, seriesCleanRecorder,
expirationChecker, cr)
Expand Down Expand Up @@ -692,7 +693,7 @@ func TestMarkForDelete_DropChunkFromIndex(t *testing.T) {

for i, table := range tables {
err := table.DB.Update(func(tx *bbolt.Tx) error {
it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config)
it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config)
require.NoError(t, err)
empty, _, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{},
NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil)
Expand Down
11 changes: 5 additions & 6 deletions pkg/storage/stores/shipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/common/model"
"go.etcd.io/bbolt"

"github.com/grafana/loki/pkg/storage/chunk/local"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
Expand All @@ -37,8 +38,6 @@ const (
recreatedCompactedDBSuffix = ".r.gz"
)

var bucketName = []byte("index")

type indexEntry struct {
k, v []byte
}
Expand Down Expand Up @@ -292,18 +291,18 @@ func (t *table) compactFiles(files []storage.IndexFile) error {
}

// writeBatch writes a batch to compactedDB
func (t *table) writeBatch(userID string, batch []indexEntry) error {
if userID == string(bucketName) {
func (t *table) writeBatch(bucketName string, batch []indexEntry) error {
if bucketName == shipper_util.GetUnsafeString(local.IndexBucketName) {
return t.writeCommonIndex(batch)
}
return t.writeUserIndex(userID, batch)
return t.writeUserIndex(bucketName, batch)
}

// writeCommonIndex writes a batch to compactedDB
func (t *table) writeCommonIndex(batch []indexEntry) error {
t.uploadCompactedDB = true
return t.compactedDB.Batch(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(bucketName)
b, err := tx.CreateBucketIfNotExists(local.IndexBucketName)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/stores/shipper/downloads/index_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/spanlogger"

"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/local"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
Expand Down Expand Up @@ -183,7 +184,7 @@ func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery,
return err
}

userIDBytes := []byte(userID)
userIDBytes := shipper_util.GetUnsafeBytes(userID)

err = t.dbsMtx.rLock(ctx)
if err != nil {
Expand All @@ -204,7 +205,7 @@ func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery,
err := db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(userIDBytes)
if bucket == nil {
bucket = tx.Bucket(defaultBucketName)
bucket = tx.Bucket(local.IndexBucketName)
if bucket == nil {
return nil
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ const (
maxDownloadConcurrency = 50
)

var defaultBucketName = []byte("index")

type BoltDBIndexClient interface {
QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/storage/stores/shipper/shipper_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Config struct {
ResyncInterval time.Duration `yaml:"resync_interval"`
QueryReadyNumDays int `yaml:"query_ready_num_days"`
IndexGatewayClientConfig IndexGatewayClientConfig `yaml:"index_gateway_client"`
BuildPerTenantIndex bool `yaml:"build_per_tenant_index"`
IngesterName string `yaml:"-"`
Mode int `yaml:"-"`
IngesterDBRetainPeriod time.Duration `yaml:"-"`
Expand All @@ -78,6 +79,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries")
f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage")
f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.")
f.BoolVar(&cfg.BuildPerTenantIndex, "boltdb.shipper.build-per-tenant-index", false, "Build per tenant index files")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -134,10 +136,11 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R
}

cfg := uploads.Config{
Uploader: uploader,
IndexDir: s.cfg.ActiveIndexDirectory,
UploadInterval: UploadInterval,
DBRetainPeriod: s.cfg.IngesterDBRetainPeriod,
Uploader: uploader,
IndexDir: s.cfg.ActiveIndexDirectory,
UploadInterval: UploadInterval,
DBRetainPeriod: s.cfg.IngesterDBRetainPeriod,
MakePerTenantBuckets: s.cfg.BuildPerTenantIndex,
}
uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer)
if err != nil {
Expand Down
Loading

0 comments on commit d87f2a1

Please sign in to comment.