diff --git a/pkg/storage/chunk/local/boltdb_index_client.go b/pkg/storage/chunk/local/boltdb_index_client.go index 1c39e5d68641d..78befb790504d 100644 --- a/pkg/storage/chunk/local/boltdb_index_client.go +++ b/pkg/storage/chunk/local/boltdb_index_client.go @@ -21,8 +21,9 @@ import ( ) var ( - defaultBucketName = []byte("index") - ErrUnexistentBoltDB = errors.New("boltdb file does not exist") + IndexBucketName = []byte("index") + ErrUnexistentBoltDB = errors.New("boltdb file does not exist") + ErrEmptyIndexBucketName = errors.New("empty index bucket name") ) const ( @@ -170,11 +171,11 @@ func (b *BoltIndexClient) GetDB(name string, operation int) (*bbolt.DB, error) { return db, nil } -func (b *BoltIndexClient) WriteToDB(ctx context.Context, db *bbolt.DB, bucketName []byte, writes TableWrites) error { +func (b *BoltIndexClient) WriteToDB(_ context.Context, db *bbolt.DB, bucketName []byte, writes TableWrites) error { return db.Update(func(tx *bbolt.Tx) error { var b *bbolt.Bucket if len(bucketName) == 0 { - bucketName = defaultBucketName + return ErrEmptyIndexBucketName } // a bucket should already exist for deletes, for other writes we create one otherwise. @@ -214,7 +215,7 @@ func (b *BoltIndexClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch return err } - err = b.WriteToDB(ctx, db, nil, writes) + err = b.WriteToDB(ctx, db, IndexBucketName, writes) if err != nil { return err } @@ -237,12 +238,16 @@ func (b *BoltIndexClient) query(ctx context.Context, query chunk.IndexQuery, cal return err } - return b.QueryDB(ctx, db, query, callback) + return b.QueryDB(ctx, db, IndexBucketName, query, callback) } -func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { +func (b *BoltIndexClient) QueryDB(ctx context.Context, db *bbolt.DB, bucketName []byte, query chunk.IndexQuery, + callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { return db.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(defaultBucketName) + if len(bucketName) == 0 { + return ErrEmptyIndexBucketName + } + bucket := tx.Bucket(bucketName) if bucket == nil { return nil } diff --git a/pkg/storage/chunk/local/boltdb_index_client_test.go b/pkg/storage/chunk/local/boltdb_index_client_test.go index 0a9e97f738e89..54d8395dc179f 100644 --- a/pkg/storage/chunk/local/boltdb_index_client_test.go +++ b/pkg/storage/chunk/local/boltdb_index_client_test.go @@ -24,7 +24,7 @@ func setupDB(t *testing.T, boltdbIndexClient *BoltIndexClient, dbname string) { require.NoError(t, err) err = db.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists(defaultBucketName) + b, err := tx.CreateBucketIfNotExists(IndexBucketName) if err != nil { return err } @@ -63,7 +63,7 @@ func TestBoltDBReload(t *testing.T) { valueFromDb := []byte{} _ = droppedDb.View(func(tx *bbolt.Tx) error { - b := tx.Bucket(defaultBucketName) + b := tx.Bucket(IndexBucketName) valueFromDb = b.Get(testKey) return nil }) @@ -207,7 +207,7 @@ func TestBoltDB_Writes(t *testing.T) { { name: "deletes without initial writes", testDeletes: []string{"1", "2"}, - err: fmt.Errorf("bucket %s not found in table 3", defaultBucketName), + err: fmt.Errorf("bucket %s not found in table 3", IndexBucketName), }, } { t.Run(tc.name, func(t *testing.T) { diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go index eba19abec4b59..f700b6a0b9fa9 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table.go @@ -172,7 +172,7 @@ func (t *deleteRequestsTable) BatchWrite(ctx context.Context, batch chunk.WriteB } for _, tableWrites := range boltWriteBatch.Writes { - if err := t.boltdbIndexClient.WriteToDB(ctx, t.db, nil, tableWrites); err != nil { + if err := t.boltdbIndexClient.WriteToDB(ctx, t.db, local.IndexBucketName, tableWrites); err != nil { return err } } @@ -182,7 +182,7 @@ func (t *deleteRequestsTable) BatchWrite(ctx context.Context, batch chunk.WriteB func (t *deleteRequestsTable) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error { for _, query := range queries { - if err := t.boltdbIndexClient.QueryDB(ctx, t.db, query, callback); err != nil { + if err := t.boltdbIndexClient.QueryDB(ctx, t.db, local.IndexBucketName, query, callback); err != nil { return err } } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go index 3e9e2e97cab67..10483895cc271 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_table_test.go @@ -45,7 +45,7 @@ func TestDeleteRequestsTable(t *testing.T) { require.NoError(t, testDeleteRequestsTable.BatchWrite(context.Background(), batch)) // see if right records were written - testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, testDeleteRequestsTable.boltdbIndexClient, 0, 10) + testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, local.IndexBucketName, testDeleteRequestsTable.boltdbIndexClient, 0, 10) // upload the file to the storage require.NoError(t, testDeleteRequestsTable.uploadFile()) @@ -78,7 +78,7 @@ func TestDeleteRequestsTable(t *testing.T) { require.NotEmpty(t, testDeleteRequestsTable.dbPath) // validate records in local db - testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, testDeleteRequestsTable.boltdbIndexClient, 0, 20) + testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, testDeleteRequestsTable.db, local.IndexBucketName, testDeleteRequestsTable.boltdbIndexClient, 0, 20) } func checkRecordsInStorage(t *testing.T, storageFilePath string, start, numRecords int) { @@ -104,5 +104,5 @@ func checkRecordsInStorage(t *testing.T, storageFilePath string, start, numRecor defer boltdbIndexClient.Stop() - testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, tempDB, boltdbIndexClient, start, numRecords) + testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, tempDB, local.IndexBucketName, boltdbIndexClient, start, numRecords) } diff --git a/pkg/storage/stores/shipper/compactor/index_set.go b/pkg/storage/stores/shipper/compactor/index_set.go index 8feab4b2fbbfe..dc5cd05dd213e 100644 --- a/pkg/storage/stores/shipper/compactor/index_set.go +++ b/pkg/storage/stores/shipper/compactor/index_set.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log/level" "go.etcd.io/bbolt" + "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/storage" @@ -217,7 +218,7 @@ func (is *indexSet) writeBatch(_ string, batch []indexEntry) error { is.uploadCompactedDB = true is.removeSourceObjects = true return is.compactedDB.Batch(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists(bucketName) + b, err := tx.CreateBucketIfNotExists(local.IndexBucketName) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go index 0f193a2a6b616..82b23fceb1311 100644 --- a/pkg/storage/stores/shipper/compactor/retention/iterator_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/iterator_test.go @@ -12,6 +12,7 @@ import ( "go.etcd.io/bbolt" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/local" ) func Test_ChunkIterator(t *testing.T) { @@ -32,7 +33,7 @@ func Test_ChunkIterator(t *testing.T) { require.Len(t, tables, 1) var actual []ChunkEntry err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config) + it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) require.NoError(t, err) for it.Next() { require.NoError(t, it.Err()) @@ -53,7 +54,7 @@ func Test_ChunkIterator(t *testing.T) { // second pass we delete c2 actual = actual[:0] err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config) + it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) require.NoError(t, err) for it.Next() { actual = append(actual, it.Entry()) @@ -88,7 +89,7 @@ func Test_SeriesCleaner(t *testing.T) { require.Len(t, tables, 1) // remove c1, c2 chunk err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(bucketName), tt.config) + it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), tt.config) require.NoError(t, err) for it.Next() { require.NoError(t, it.Err()) @@ -101,7 +102,7 @@ func Test_SeriesCleaner(t *testing.T) { require.NoError(t, err) err = tables[0].DB.Update(func(tx *bbolt.Tx) error { - cleaner := newSeriesCleaner(tx.Bucket(bucketName), tt.config, tables[0].name) + cleaner := newSeriesCleaner(tx.Bucket(local.IndexBucketName), tt.config, tables[0].name) if err := cleaner.Cleanup(entryFromChunk(testSchema, c2).UserID, c2.Metric); err != nil { return err } @@ -112,7 +113,7 @@ func Test_SeriesCleaner(t *testing.T) { require.NoError(t, err) err = tables[0].DB.View(func(tx *bbolt.Tx) error { - return tx.Bucket(bucketName).ForEach(func(k, _ []byte) error { + return tx.Bucket(local.IndexBucketName).ForEach(func(k, _ []byte) error { c1SeriesID := entryFromChunk(testSchema, c1).SeriesID c2SeriesID := entryFromChunk(testSchema, c2).SeriesID series, ok, err := parseLabelIndexSeriesID(decodeKey(k)) @@ -169,7 +170,7 @@ func Benchmark_ChunkIterator(b *testing.B) { var total int64 _ = store.indexTables()[0].Update(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(bucketName) + bucket := tx.Bucket(local.IndexBucketName) for n := 0; n < b.N; n++ { it, err := newChunkIndexIterator(bucket, allSchemas[0].config) require.NoError(b, err) diff --git a/pkg/storage/stores/shipper/compactor/retention/retention.go b/pkg/storage/stores/shipper/compactor/retention/retention.go index 0b3db49e461fa..4da6447d12494 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention.go @@ -15,13 +15,11 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/local" util_log "github.com/grafana/loki/pkg/util/log" ) -var ( - bucketName = []byte("index") - chunkBucket = []byte("chunks") -) +var chunkBucket = []byte("chunks") const ( logMetricName = "logs" @@ -87,7 +85,7 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) var empty, modified bool err = db.Update(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(bucketName) + bucket := tx.Bucket(local.IndexBucketName) if bucket == nil { return nil } diff --git a/pkg/storage/stores/shipper/compactor/retention/retention_test.go b/pkg/storage/stores/shipper/compactor/retention/retention_test.go index 62d47f8067501..9ee9a5daea76e 100644 --- a/pkg/storage/stores/shipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/shipper/compactor/retention/retention_test.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/local" "github.com/grafana/loki/pkg/storage/chunk/objectclient" "github.com/grafana/loki/pkg/validation" ) @@ -205,7 +206,7 @@ func Test_EmptyTable(t *testing.T) { tables := store.indexTables() require.Len(t, tables, 1) err := tables[0].DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) + it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) require.NoError(t, err) empty, _, err := markforDelete(context.Background(), tables[0].name, noopWriter{}, it, noopCleaner{}, NewExpirationChecker(&fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: 0}, "2": {retentionPeriod: 0}}}), nil) @@ -366,7 +367,7 @@ func TestChunkRewriter(t *testing.T) { chunkClient := objectclient.NewClient(newTestObjectClient(store.chunkDir), objectclient.Base64Encoder, schemaCfg.SchemaConfig) for _, indexTable := range store.indexTables() { err := indexTable.DB.Update(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(bucketName) + bucket := tx.Bucket(local.IndexBucketName) if bucket == nil { return nil } @@ -645,10 +646,10 @@ func TestMarkForDelete_SeriesCleanup(t *testing.T) { for i, table := range tables { seriesCleanRecorder := newSeriesCleanRecorder() err := table.DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) + it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) require.NoError(t, err) - cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(bucketName)) + cr, err := newChunkRewriter(chunkClient, schema.config, table.name, tx.Bucket(local.IndexBucketName)) require.NoError(t, err) empty, isModified, err := markforDelete(context.Background(), table.name, noopWriter{}, it, seriesCleanRecorder, expirationChecker, cr) @@ -692,7 +693,7 @@ func TestMarkForDelete_DropChunkFromIndex(t *testing.T) { for i, table := range tables { err := table.DB.Update(func(tx *bbolt.Tx) error { - it, err := newChunkIndexIterator(tx.Bucket(bucketName), schema.config) + it, err := newChunkIndexIterator(tx.Bucket(local.IndexBucketName), schema.config) require.NoError(t, err) empty, _, err := markforDelete(context.Background(), table.name, noopWriter{}, it, noopCleaner{}, NewExpirationChecker(fakeLimits{perTenant: map[string]retentionLimit{"1": {retentionPeriod: retentionPeriod}}}), nil) diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index f65cd19fd73bf..b9ab02611c6d3 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -17,6 +17,7 @@ import ( "github.com/prometheus/common/model" "go.etcd.io/bbolt" + "github.com/grafana/loki/pkg/storage/chunk/local" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" "github.com/grafana/loki/pkg/storage/stores/shipper/storage" @@ -37,8 +38,6 @@ const ( recreatedCompactedDBSuffix = ".r.gz" ) -var bucketName = []byte("index") - type indexEntry struct { k, v []byte } @@ -292,18 +291,18 @@ func (t *table) compactFiles(files []storage.IndexFile) error { } // writeBatch writes a batch to compactedDB -func (t *table) writeBatch(userID string, batch []indexEntry) error { - if userID == string(bucketName) { +func (t *table) writeBatch(bucketName string, batch []indexEntry) error { + if bucketName == shipper_util.GetUnsafeString(local.IndexBucketName) { return t.writeCommonIndex(batch) } - return t.writeUserIndex(userID, batch) + return t.writeUserIndex(bucketName, batch) } // writeCommonIndex writes a batch to compactedDB func (t *table) writeCommonIndex(batch []indexEntry) error { t.uploadCompactedDB = true return t.compactedDB.Batch(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists(bucketName) + b, err := tx.CreateBucketIfNotExists(local.IndexBucketName) if err != nil { return err } diff --git a/pkg/storage/stores/shipper/downloads/index_set.go b/pkg/storage/stores/shipper/downloads/index_set.go index 064351931d455..3a89e96a6ed40 100644 --- a/pkg/storage/stores/shipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/downloads/index_set.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/grafana/loki/pkg/storage/chunk" + "github.com/grafana/loki/pkg/storage/chunk/local" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" "github.com/grafana/loki/pkg/storage/stores/shipper/storage" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" @@ -183,7 +184,7 @@ func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, return err } - userIDBytes := []byte(userID) + userIDBytes := shipper_util.GetUnsafeBytes(userID) err = t.dbsMtx.rLock(ctx) if err != nil { @@ -204,7 +205,7 @@ func (t *indexSet) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, err := db.View(func(tx *bbolt.Tx) error { bucket := tx.Bucket(userIDBytes) if bucket == nil { - bucket = tx.Bucket(defaultBucketName) + bucket = tx.Bucket(local.IndexBucketName) if bucket == nil { return nil } diff --git a/pkg/storage/stores/shipper/downloads/table.go b/pkg/storage/stores/shipper/downloads/table.go index 2e014b60a4709..3740d53f39674 100644 --- a/pkg/storage/stores/shipper/downloads/table.go +++ b/pkg/storage/stores/shipper/downloads/table.go @@ -28,8 +28,6 @@ const ( maxDownloadConcurrency = 50 ) -var defaultBucketName = []byte("index") - type BoltDBIndexClient interface { QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error } diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index 6ae977c3b84f3..0b8604108e173 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -62,6 +62,7 @@ type Config struct { ResyncInterval time.Duration `yaml:"resync_interval"` QueryReadyNumDays int `yaml:"query_ready_num_days"` IndexGatewayClientConfig IndexGatewayClientConfig `yaml:"index_gateway_client"` + BuildPerTenantIndex bool `yaml:"build_per_tenant_index"` IngesterName string `yaml:"-"` Mode int `yaml:"-"` IngesterDBRetainPeriod time.Duration `yaml:"-"` @@ -78,6 +79,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CacheTTL, "boltdb.shipper.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") f.DurationVar(&cfg.ResyncInterval, "boltdb.shipper.resync-interval", 5*time.Minute, "Resync downloaded files with the storage") f.IntVar(&cfg.QueryReadyNumDays, "boltdb.shipper.query-ready-num-days", 0, "Number of days of index to be kept downloaded for queries. Works only with tables created with 24h period.") + f.BoolVar(&cfg.BuildPerTenantIndex, "boltdb.shipper.build-per-tenant-index", false, "Build per tenant index files") } func (cfg *Config) Validate() error { @@ -134,10 +136,11 @@ func (s *Shipper) init(storageClient chunk.ObjectClient, registerer prometheus.R } cfg := uploads.Config{ - Uploader: uploader, - IndexDir: s.cfg.ActiveIndexDirectory, - UploadInterval: UploadInterval, - DBRetainPeriod: s.cfg.IngesterDBRetainPeriod, + Uploader: uploader, + IndexDir: s.cfg.ActiveIndexDirectory, + UploadInterval: UploadInterval, + DBRetainPeriod: s.cfg.IngesterDBRetainPeriod, + MakePerTenantBuckets: s.cfg.BuildPerTenantIndex, } uploadsManager, err := uploads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer) if err != nil { diff --git a/pkg/storage/stores/shipper/testutil/testutil.go b/pkg/storage/stores/shipper/testutil/testutil.go index cf2b68f6b7658..838f9d75021f2 100644 --- a/pkg/storage/stores/shipper/testutil/testutil.go +++ b/pkg/storage/stores/shipper/testutil/testutil.go @@ -20,8 +20,6 @@ import ( chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" ) -var defaultBucketName = []byte("index") - func AddRecordsToDB(t *testing.T, path string, dbClient *local.BoltIndexClient, start, numRecords int, bucketName []byte) { t.Helper() db, err := local.OpenBoltdbFile(path) @@ -31,7 +29,7 @@ func AddRecordsToDB(t *testing.T, path string, dbClient *local.BoltIndexClient, AddRecordsToBatch(batch, "test", start, numRecords) if len(bucketName) == 0 { - bucketName = defaultBucketName + bucketName = local.IndexBucketName } require.NoError(t, dbClient.WriteToDB(context.Background(), db, bucketName, batch.(*local.BoltWriteBatch).Writes["test"])) @@ -64,16 +62,16 @@ func TestSingleTableQuery(t *testing.T, userID string, queries []chunk.IndexQuer } type SingleDBQuerier interface { - QueryDB(ctx context.Context, db *bbolt.DB, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error + QueryDB(ctx context.Context, db *bbolt.DB, bucketName []byte, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error } -func TestSingleDBQuery(t *testing.T, query chunk.IndexQuery, db *bbolt.DB, querier SingleDBQuerier, start, numRecords int) { +func TestSingleDBQuery(t *testing.T, query chunk.IndexQuery, db *bbolt.DB, bucketName []byte, querier SingleDBQuerier, start, numRecords int) { t.Helper() minValue := start maxValue := start + numRecords fetchedRecords := make(map[string]string) - err := querier.QueryDB(context.Background(), db, query, makeTestCallback(t, minValue, maxValue, fetchedRecords)) + err := querier.QueryDB(context.Background(), db, bucketName, query, makeTestCallback(t, minValue, maxValue, fetchedRecords)) require.NoError(t, err) require.Len(t, fetchedRecords, numRecords) @@ -271,7 +269,7 @@ func SetupTable(t *testing.T, path string, commonDBsConfig DBsConfig, perUserDBs } } - SetupDBsAtPath(t, path, commonDBsWithDefaultBucket, true, defaultBucketName) + SetupDBsAtPath(t, path, commonDBsWithDefaultBucket, true, local.IndexBucketName) for dbName, userRecords := range commonDBsWithPerUserBucket { for userID, dbRecords := range userRecords { @@ -282,7 +280,7 @@ func SetupTable(t *testing.T, path string, commonDBsConfig DBsConfig, perUserDBs } for userID, dbRecords := range perUserDBs { - SetupDBsAtPath(t, filepath.Join(path, userID), dbRecords, true, defaultBucketName) + SetupDBsAtPath(t, filepath.Join(path, userID), dbRecords, true, local.IndexBucketName) } } diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go index ebad7afe5b864..055dab88c9283 100644 --- a/pkg/storage/stores/shipper/uploads/table.go +++ b/pkg/storage/stores/shipper/uploads/table.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/go-kit/log/level" "go.etcd.io/bbolt" @@ -35,8 +36,6 @@ const ( snapshotFileSuffix = ".snapshot" ) -var bucketName = []byte("index") - type BoltDBIndexClient interface { QueryWithCursor(_ context.Context, c *bbolt.Cursor, query chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) (shouldContinue bool)) error WriteToDB(ctx context.Context, db *bbolt.DB, bucketName []byte, writes local.TableWrites) error @@ -54,11 +53,12 @@ type dbSnapshot struct { // Table is a collection of multiple dbs created for a same table by the ingester. // All the public methods are concurrency safe and take care of mutexes to avoid any data race. type Table struct { - name string - path string - uploader string - storageClient StorageClient - boltdbIndexClient BoltDBIndexClient + name string + path string + uploader string + storageClient StorageClient + boltdbIndexClient BoltDBIndexClient + makePerTenantBuckets bool dbs map[string]*bbolt.DB dbsMtx sync.RWMutex @@ -72,17 +72,18 @@ type Table struct { } // NewTable create a new Table without looking for any existing local dbs belonging to the table. -func NewTable(path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient) (*Table, error) { +func NewTable(path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient, makePerTenantBuckets bool) (*Table, error) { err := chunk_util.EnsureDirectory(path) if err != nil { return nil, err } - return newTableWithDBs(map[string]*bbolt.DB{}, path, uploader, storageClient, boltdbIndexClient) + return newTableWithDBs(map[string]*bbolt.DB{}, path, uploader, storageClient, boltdbIndexClient, makePerTenantBuckets) } // LoadTable loads local dbs belonging to the table and creates a new Table with references to dbs if there are any otherwise it doesn't create a table -func LoadTable(path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient, metrics *metrics) (*Table, error) { +func LoadTable(path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient, + makePerTenantBuckets bool, metrics *metrics) (*Table, error) { dbs, err := loadBoltDBsFromDir(path, metrics) if err != nil { return nil, err @@ -92,20 +93,22 @@ func LoadTable(path, uploader string, storageClient StorageClient, boltdbIndexCl return nil, nil } - return newTableWithDBs(dbs, path, uploader, storageClient, boltdbIndexClient) + return newTableWithDBs(dbs, path, uploader, storageClient, boltdbIndexClient, makePerTenantBuckets) } -func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient) (*Table, error) { +func newTableWithDBs(dbs map[string]*bbolt.DB, path, uploader string, storageClient StorageClient, boltdbIndexClient BoltDBIndexClient, + makePerTenantBuckets bool) (*Table, error) { return &Table{ - name: filepath.Base(path), - path: path, - uploader: uploader, - storageClient: storageClient, - boltdbIndexClient: boltdbIndexClient, - dbs: dbs, - dbSnapshots: map[string]*dbSnapshot{}, - dbUploadTime: map[string]time.Time{}, - modifyShardsSince: time.Now().Unix(), + name: filepath.Base(path), + path: path, + uploader: uploader, + storageClient: storageClient, + boltdbIndexClient: boltdbIndexClient, + dbs: dbs, + dbSnapshots: map[string]*dbSnapshot{}, + dbUploadTime: map[string]time.Time{}, + modifyShardsSince: time.Now().Unix(), + makePerTenantBuckets: makePerTenantBuckets, }, nil } @@ -189,11 +192,21 @@ func (lt *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, c lt.dbSnapshotsMtx.RLock() defer lt.dbSnapshotsMtx.RUnlock() + userID, err := tenant.TenantID(ctx) + if err != nil { + return err + } + + userIDBytes := shipper_util.GetUnsafeBytes(userID) + for _, db := range lt.dbSnapshots { err := db.boltdb.View(func(tx *bbolt.Tx) error { - bucket := tx.Bucket(bucketName) + bucket := tx.Bucket(userIDBytes) if bucket == nil { - return nil + bucket = tx.Bucket(local.IndexBucketName) + if bucket == nil { + return nil + } } for _, query := range queries { @@ -244,6 +257,16 @@ func (lt *Table) Write(ctx context.Context, writes local.TableWrites) error { // db files are named after the time shard i.e epoch of the truncated time. // If a db file does not exist for a shard it gets created. func (lt *Table) write(ctx context.Context, tm time.Time, writes local.TableWrites) error { + writeToBucket := local.IndexBucketName + if lt.makePerTenantBuckets { + userID, err := tenant.TenantID(ctx) + if err != nil { + return err + } + + writeToBucket = []byte(userID) + } + // do not write to files older than init time otherwise we might endup modifying file which was already created and uploaded before last shutdown. shard := tm.Truncate(ShardDBsByDuration).Unix() if shard < lt.modifyShardsSince { @@ -255,7 +278,7 @@ func (lt *Table) write(ctx context.Context, tm time.Time, writes local.TableWrit return err } - return lt.boltdbIndexClient.WriteToDB(ctx, db, bucketName, writes) + return lt.boltdbIndexClient.WriteToDB(ctx, db, writeToBucket, writes) } // Stop closes all the open dbs. @@ -495,15 +518,17 @@ func loadBoltDBsFromDir(dir string, metrics *metrics) (map[string]*bbolt.DB, err hasBucket := false _ = db.View(func(tx *bbolt.Tx) error { - hasBucket = tx.Bucket(bucketName) != nil - return nil + return tx.ForEach(func(_ []byte, _ *bbolt.Bucket) error { + hasBucket = true + return nil + }) }) if !hasBucket { - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("file %s has no bucket named %s, so removing it", fullPath, bucketName)) + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("file %s has no buckets, so removing it", fullPath)) _ = db.Close() if err := os.Remove(fullPath); err != nil { - level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to remove file %s without bucket", fullPath), "err", err) + level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to remove file %s without any buckets", fullPath), "err", err) } continue } diff --git a/pkg/storage/stores/shipper/uploads/table_manager.go b/pkg/storage/stores/shipper/uploads/table_manager.go index 8494a69036850..7f0a87a4d2269 100644 --- a/pkg/storage/stores/shipper/uploads/table_manager.go +++ b/pkg/storage/stores/shipper/uploads/table_manager.go @@ -22,10 +22,11 @@ import ( ) type Config struct { - Uploader string - IndexDir string - UploadInterval time.Duration - DBRetainPeriod time.Duration + Uploader string + IndexDir string + UploadInterval time.Duration + DBRetainPeriod time.Duration + MakePerTenantBuckets bool } type TableManager struct { @@ -148,7 +149,8 @@ func (tm *TableManager) getOrCreateTable(tableName string) (*Table, error) { table, ok = tm.tables[tableName] if !ok { var err error - table, err = NewTable(filepath.Join(tm.cfg.IndexDir, tableName), tm.cfg.Uploader, tm.storageClient, tm.boltIndexClient) + table, err = NewTable(filepath.Join(tm.cfg.IndexDir, tableName), tm.cfg.Uploader, tm.storageClient, + tm.boltIndexClient, tm.cfg.MakePerTenantBuckets) if err != nil { return nil, err } @@ -235,7 +237,8 @@ func (tm *TableManager) loadTables() (map[string]*Table, error) { } level.Info(util_log.Logger).Log("msg", fmt.Sprintf("loading table %s", fileInfo.Name())) - table, err := LoadTable(filepath.Join(tm.cfg.IndexDir, fileInfo.Name()), tm.cfg.Uploader, tm.storageClient, tm.boltIndexClient, tm.metrics) + table, err := LoadTable(filepath.Join(tm.cfg.IndexDir, fileInfo.Name()), tm.cfg.Uploader, tm.storageClient, + tm.boltIndexClient, tm.cfg.MakePerTenantBuckets, tm.metrics) if err != nil { return nil, err } diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go index 9443d66569b16..8dcdaa8a8ec9d 100644 --- a/pkg/storage/stores/shipper/uploads/table_test.go +++ b/pkg/storage/stores/shipper/uploads/table_test.go @@ -14,6 +14,7 @@ import ( "github.com/klauspost/compress/gzip" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/storage/chunk" "github.com/grafana/loki/pkg/storage/chunk/local" @@ -42,11 +43,11 @@ func buildTestClients(t *testing.T, path string) (*local.BoltIndexClient, Storag type stopFunc func() -func buildTestTable(t *testing.T, path string) (*Table, *local.BoltIndexClient, stopFunc) { +func buildTestTable(t *testing.T, path string, makePerTenantBuckets bool) (*Table, *local.BoltIndexClient, stopFunc) { boltDBIndexClient, fsObjectClient := buildTestClients(t, path) indexPath := filepath.Join(path, indexDirName) - table, err := NewTable(indexPath, "test", fsObjectClient, boltDBIndexClient) + table, err := NewTable(indexPath, "test", fsObjectClient, boltDBIndexClient, makePerTenantBuckets) require.NoError(t, err) return table, boltDBIndexClient, func() { @@ -70,8 +71,9 @@ func TestLoadTable(t *testing.T) { boltDBIndexClient.Stop() }() - // setup some dbs for a table at a path. - tablePath := testutil.SetupDBsAtPath(t, filepath.Join(indexPath, "test-table"), map[string]testutil.DBRecords{ + // setup some dbs with default bucket and per tenant bucket for a table at a path. + tablePath := filepath.Join(indexPath, "test-table") + testutil.SetupDBsAtPath(t, tablePath, map[string]testutil.DBRecords{ "db1": { Start: 0, NumRecords: 10, @@ -81,6 +83,16 @@ func TestLoadTable(t *testing.T) { NumRecords: 10, }, }, false, nil) + testutil.SetupDBsAtPath(t, tablePath, map[string]testutil.DBRecords{ + "db3": { + Start: 20, + NumRecords: 10, + }, + "db4": { + Start: 30, + NumRecords: 10, + }, + }, false, []byte(userID)) // change a boltdb file to text file which would fail to open. invalidFilePath := filepath.Join(tablePath, "invalid") @@ -91,7 +103,7 @@ func TestLoadTable(t *testing.T) { require.Error(t, err) // try loading the table. - table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient, newMetrics(nil)) + table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient, false, newMetrics(nil)) require.NoError(t, err) require.NotNil(t, table) @@ -99,119 +111,120 @@ func TestLoadTable(t *testing.T) { table.Stop() }() - // verify that we still have 3 files(2 valid, 1 invalid) + // verify that we still have 5 files(4 valid, 1 invalid) filesInfo, err := ioutil.ReadDir(tablePath) require.NoError(t, err) - require.Len(t, filesInfo, 3) + require.Len(t, filesInfo, 5) require.NoError(t, table.Snapshot()) // query the loaded table to see if it has right data. - testutil.TestSingleTableQuery(t, userID, []chunk.IndexQuery{{}}, table, 0, 20) + testutil.TestSingleTableQuery(t, userID, []chunk.IndexQuery{{}}, table, 0, 40) } func TestTable_Write(t *testing.T) { - tempDir, err := ioutil.TempDir("", "table-writes") - require.NoError(t, err) - - table, boltIndexClient, stopFunc := buildTestTable(t, tempDir) - - defer func() { - stopFunc() - require.NoError(t, os.RemoveAll(tempDir)) - }() - - now := time.Now() - - // allow modifying last 5 shards - table.modifyShardsSince = now.Add(-5 * ShardDBsByDuration).Unix() - - // a couple of times for which we want to do writes to make the table create different shards - testCases := []struct { - writeTime time.Time - dbName string // set only when it is supposed to be written to a different name than usual - }{ - { - writeTime: now, - }, - { - writeTime: now.Add(-(ShardDBsByDuration + 5*time.Minute)), - }, - { - writeTime: now.Add(-(ShardDBsByDuration*3 + 3*time.Minute)), - }, - { - writeTime: now.Add(-6 * ShardDBsByDuration), // write with time older than table.modifyShardsSince - dbName: fmt.Sprint(table.modifyShardsSince), - }, - } - - numFiles := 0 - - // performing writes and checking whether the index gets written to right shard - for i, tc := range testCases { - t.Run(fmt.Sprint(i), func(t *testing.T) { - batch := boltIndexClient.NewWriteBatch() - testutil.AddRecordsToBatch(batch, "test", i*10, 10) - require.NoError(t, table.write(context.Background(), tc.writeTime, batch.(*local.BoltWriteBatch).Writes["test"])) - - numFiles++ - require.Equal(t, numFiles, len(table.dbs)) - - expectedDBName := tc.dbName - if expectedDBName == "" { - expectedDBName = fmt.Sprint(tc.writeTime.Truncate(ShardDBsByDuration).Unix()) + for _, withPerTenantBucket := range []bool{false, true} { + t.Run(fmt.Sprintf("withPerTenantBucket=%v", withPerTenantBucket), func(t *testing.T) { + tempDir := t.TempDir() + + table, boltIndexClient, stopFunc := buildTestTable(t, tempDir, withPerTenantBucket) + defer stopFunc() + + now := time.Now() + + // allow modifying last 5 shards + table.modifyShardsSince = now.Add(-5 * ShardDBsByDuration).Unix() + + // a couple of times for which we want to do writes to make the table create different shards + testCases := []struct { + writeTime time.Time + dbName string // set only when it is supposed to be written to a different name than usual + }{ + { + writeTime: now, + }, + { + writeTime: now.Add(-(ShardDBsByDuration + 5*time.Minute)), + }, + { + writeTime: now.Add(-(ShardDBsByDuration*3 + 3*time.Minute)), + }, + { + writeTime: now.Add(-6 * ShardDBsByDuration), // write with time older than table.modifyShardsSince + dbName: fmt.Sprint(table.modifyShardsSince), + }, } - db, ok := table.dbs[expectedDBName] - require.True(t, ok) - - require.NoError(t, table.Snapshot()) - // test that the table has current + previous records - testutil.TestSingleTableQuery(t, userID, []chunk.IndexQuery{{}}, table, 0, (i+1)*10) - testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, db, boltIndexClient, i*10, 10) + numFiles := 0 + + // performing writes and checking whether the index gets written to right shard + for i, tc := range testCases { + t.Run(fmt.Sprint(i), func(t *testing.T) { + batch := boltIndexClient.NewWriteBatch() + testutil.AddRecordsToBatch(batch, "test", i*10, 10) + require.NoError(t, table.write(user.InjectOrgID(context.Background(), userID), tc.writeTime, batch.(*local.BoltWriteBatch).Writes["test"])) + + numFiles++ + require.Equal(t, numFiles, len(table.dbs)) + + expectedDBName := tc.dbName + if expectedDBName == "" { + expectedDBName = fmt.Sprint(tc.writeTime.Truncate(ShardDBsByDuration).Unix()) + } + db, ok := table.dbs[expectedDBName] + require.True(t, ok) + + require.NoError(t, table.Snapshot()) + + // test that the table has current + previous records + testutil.TestSingleTableQuery(t, userID, []chunk.IndexQuery{{}}, table, 0, (i+1)*10) + bucketToQuery := local.IndexBucketName + if withPerTenantBucket { + bucketToQuery = []byte(userID) + } + testutil.TestSingleDBQuery(t, chunk.IndexQuery{}, db, bucketToQuery, boltIndexClient, i*10, 10) + }) + } }) } } func TestTable_Upload(t *testing.T) { - tempDir, err := ioutil.TempDir("", "upload") - require.NoError(t, err) + for _, withPerTenantBucket := range []bool{false, true} { + t.Run(fmt.Sprintf("withPerTenantBucket=%v", withPerTenantBucket), func(t *testing.T) { + tempDir := t.TempDir() - table, boltIndexClient, stopFunc := buildTestTable(t, tempDir) - require.NoError(t, err) + table, boltIndexClient, stopFunc := buildTestTable(t, tempDir, withPerTenantBucket) + defer stopFunc() - defer func() { - stopFunc() - require.NoError(t, os.RemoveAll(tempDir)) - }() + now := time.Now() - now := time.Now() + // write a batch for now + batch := boltIndexClient.NewWriteBatch() + testutil.AddRecordsToBatch(batch, "test", 0, 10) + require.NoError(t, table.write(user.InjectOrgID(context.Background(), userID), now, batch.(*local.BoltWriteBatch).Writes["test"])) - // write a batch for now - batch := boltIndexClient.NewWriteBatch() - testutil.AddRecordsToBatch(batch, "test", 0, 10) - require.NoError(t, table.write(context.Background(), now, batch.(*local.BoltWriteBatch).Writes["test"])) + // upload the table + require.NoError(t, table.Upload(context.Background(), true)) + require.Len(t, table.dbs, 1) - // upload the table - require.NoError(t, table.Upload(context.Background(), true)) - require.Len(t, table.dbs, 1) + // compare the local dbs for the table with the dbs in remote storage after upload to ensure they have same data + objectStorageDir := filepath.Join(tempDir, objectsStorageDirName) + compareTableWithStorage(t, table, objectStorageDir) - // compare the local dbs for the table with the dbs in remote storage after upload to ensure they have same data - objectStorageDir := filepath.Join(tempDir, objectsStorageDirName) - compareTableWithStorage(t, table, objectStorageDir) + // write a batch to another shard + batch = boltIndexClient.NewWriteBatch() + testutil.AddRecordsToBatch(batch, "test", 20, 10) + require.NoError(t, table.write(user.InjectOrgID(context.Background(), userID), now.Add(ShardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"])) - // write a batch to another shard - batch = boltIndexClient.NewWriteBatch() - testutil.AddRecordsToBatch(batch, "test", 20, 10) - require.NoError(t, table.write(context.Background(), now.Add(ShardDBsByDuration), batch.(*local.BoltWriteBatch).Writes["test"])) + // upload the dbs to storage + require.NoError(t, table.Upload(context.Background(), true)) + require.Len(t, table.dbs, 2) - // upload the dbs to storage - require.NoError(t, table.Upload(context.Background(), true)) - require.Len(t, table.dbs, 2) - - // check local dbs with remote dbs to ensure they have same data - compareTableWithStorage(t, table, objectStorageDir) + // check local dbs with remote dbs to ensure they have same data + compareTableWithStorage(t, table, objectStorageDir) + }) + } } func compareTableWithStorage(t *testing.T, table *Table, storageDir string) { @@ -284,7 +297,7 @@ func TestTable_Cleanup(t *testing.T) { testutil.AddRecordsToDB(t, notUploaded, boltDBIndexClient, 20, 10, nil) // load existing dbs - table, err := LoadTable(indexPath, "test", storageClient, boltDBIndexClient, newMetrics(nil)) + table, err := LoadTable(indexPath, "test", storageClient, boltDBIndexClient, false, newMetrics(nil)) require.NoError(t, err) require.Len(t, table.dbs, 3) @@ -422,7 +435,7 @@ func TestTable_ImmutableUploads(t *testing.T) { tableName := "test-table" tablePath := testutil.SetupDBsAtPath(t, filepath.Join(indexPath, tableName), dbs, false, nil) - table, err := LoadTable(tablePath, "test", storageClient, boltDBIndexClient, newMetrics(nil)) + table, err := LoadTable(tablePath, "test", storageClient, boltDBIndexClient, false, newMetrics(nil)) require.NoError(t, err) require.NotNil(t, table) @@ -490,8 +503,11 @@ func TestTable_MultiQueries(t *testing.T) { boltDBIndexClient.Stop() }() - // setup some dbs for a table at a path. - tablePath := testutil.SetupDBsAtPath(t, filepath.Join(indexPath, "test-table"), map[string]testutil.DBRecords{ + user1, user2 := "user1", "user2" + + // setup some dbs with default bucket and per tenant bucket for a table at a path. + tablePath := filepath.Join(indexPath, "test-table") + testutil.SetupDBsAtPath(t, tablePath, map[string]testutil.DBRecords{ "db1": { Start: 0, NumRecords: 10, @@ -501,9 +517,19 @@ func TestTable_MultiQueries(t *testing.T) { NumRecords: 10, }, }, false, nil) + testutil.SetupDBsAtPath(t, tablePath, map[string]testutil.DBRecords{ + "db3": { + Start: 20, + NumRecords: 10, + }, + "db4": { + Start: 30, + NumRecords: 10, + }, + }, false, []byte(user1)) // try loading the table. - table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient, newMetrics(nil)) + table, err := LoadTable(tablePath, "test", nil, boltDBIndexClient, false, newMetrics(nil)) require.NoError(t, err) require.NotNil(t, table) @@ -515,10 +541,13 @@ func TestTable_MultiQueries(t *testing.T) { // build queries each looking for specific value from all the dbs var queries []chunk.IndexQuery - for i := 5; i < 15; i++ { + for i := 5; i < 35; i++ { queries = append(queries, chunk.IndexQuery{ValueEqual: []byte(strconv.Itoa(i))}) } - // query the loaded table to see if it has right data. - testutil.TestSingleTableQuery(t, userID, queries, table, 5, 10) + // querying data for user1 should return both data from common index and user1's index + testutil.TestSingleTableQuery(t, user1, queries, table, 5, 30) + + // querying data for user2 should return only common index + testutil.TestSingleTableQuery(t, user2, queries, table, 5, 15) } diff --git a/pkg/storage/stores/shipper/util/queries.go b/pkg/storage/stores/shipper/util/queries.go index 561a338a1b00c..788bf27857637 100644 --- a/pkg/storage/stores/shipper/util/queries.go +++ b/pkg/storage/stores/shipper/util/queries.go @@ -3,7 +3,6 @@ package util import ( "context" "sync" - "unsafe" "github.com/grafana/loki/pkg/storage/chunk" chunk_util "github.com/grafana/loki/pkg/storage/chunk/util" @@ -84,7 +83,7 @@ func (i *IndexDeduper) isSeen(hashValue string, rangeValue []byte) bool { i.mtx.RLock() // index entries are never modified during query processing so it should be safe to reference a byte slice as a string. - rangeValueStr := yoloString(rangeValue) + rangeValueStr := GetUnsafeString(rangeValue) if _, ok := i.seenRangeValues[hashValue][rangeValueStr]; ok { i.mtx.RUnlock() @@ -144,7 +143,3 @@ func (f *filteringBatchIter) Next() bool { return false } - -func yoloString(buf []byte) string { - return *((*string)(unsafe.Pointer(&buf))) -} diff --git a/pkg/storage/stores/shipper/util/util.go b/pkg/storage/stores/shipper/util/util.go index 4e39a6a5bf262..daafc7bb3599e 100644 --- a/pkg/storage/stores/shipper/util/util.go +++ b/pkg/storage/stores/shipper/util/util.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "strings" "sync" + "unsafe" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -247,3 +248,11 @@ func IsCompressedFile(filename string) bool { func LoggerWithFilename(logger log.Logger, filename string) log.Logger { return log.With(logger, "file-name", filename) } + +func GetUnsafeBytes(s string) []byte { + return *((*[]byte)(unsafe.Pointer(&s))) +} + +func GetUnsafeString(buf []byte) string { + return *((*string)(unsafe.Pointer(&buf))) +}