From f969192f06b8fa4c0bddea888636e7e15618eb5c Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 21 Jul 2021 17:52:34 +0530 Subject: [PATCH 1/4] use previously compacted file as seed file and copy index into it --- pkg/storage/stores/shipper/compactor/table.go | 68 +++++++++++--- .../stores/shipper/compactor/table_test.go | 92 ++++++++++++------- 2 files changed, 114 insertions(+), 46 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 2973aa4278049..87a65355273b0 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "time" "github.com/cortexproject/cortex/pkg/chunk" @@ -120,12 +121,10 @@ func (t *table) compact(tableHasExpiredStreams bool) error { if err != nil { return err } - t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(downloadAt) + t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt) if err != nil { return err } - // no need to enforce write to disk, we'll upload and delete the file anyway. - t.compactedDB.NoSync = true } if t.compactedDB == nil { @@ -157,15 +156,23 @@ func (t *table) compact(tableHasExpiredStreams bool) error { func (t *table) compactFiles(objects []chunk.StorageObject) error { var err error - // create a new compacted db - t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))) + level.Info(util_log.Logger).Log("msg", "starting compaction of dbs") + + compactedDBName := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) + seedFileIdx, err := findSeedObjectIdx(objects) + if err != nil { + return err + } + + err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[seedFileIdx].Key, compactedDBName, false) + if err != nil { + return err + } + + t.compactedDB, err = openBoltdbFileWithNoSync(compactedDBName) if err != nil { return err } - // no need to enforce write to disk, we'll upload and delete the file anyway. - // in case of failure we'll restart the whole process anyway. - t.compactedDB.NoSync = true - level.Info(util_log.Logger).Log("msg", "starting compaction of dbs") errChan := make(chan error) readObjectChan := make(chan string) @@ -220,7 +227,11 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { // send all files to readObjectChan go func() { - for _, object := range objects { + for i, object := range objects { + // skip seed file + if i == seedFileIdx { + continue + } select { case readObjectChan <- object.Key: case <-t.quit: @@ -295,11 +306,11 @@ func (t *table) writeBatch(batch []indexEntry) error { func (t *table) readFile(path string) error { level.Debug(util_log.Logger).Log("msg", "reading file for compaction", "path", path) - db, err := shipper_util.SafeOpenBoltdbFile(path) + db, err := openBoltdbFileWithNoSync(path) if err != nil { return err } - db.NoSync = true + defer func() { if err := db.Close(); err != nil { level.Error(util_log.Logger).Log("msg", "failed to close db", "path", path, "err", err) @@ -406,3 +417,36 @@ func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error { return nil } + +// openBoltdbFileWithNoSync opens a boltdb file and configures it to not sync the file to disk. +// Compaction process is idempotent and we do not retain the files so there is no need to sync them to disk. 
+func openBoltdbFileWithNoSync(path string) (*bbolt.DB, error) { + boltdb, err := shipper_util.SafeOpenBoltdbFile(path) + if err != nil { + return nil, err + } + + // no need to enforce write to disk, we'll upload and delete the file anyway. + boltdb.NoSync = true + + return boltdb, nil +} + +// findSeedObjectIdx returns index of object to use as seed which would then get index from all the files written to. +// It tries to find previously compacted file(which has uploaderName) which would be the biggest file. +// In a large cluster, using previously compacted file as seed would significantly reduce compaction time. +// If it can't find a previously compacted file, it would just use the first file from the list of files. +func findSeedObjectIdx(objects []chunk.StorageObject) (int, error) { + for i, object := range objects { + dbName, err := shipper_util.GetDBNameFromObjectKey(object.Key) + if err != nil { + return 0, err + } + + if strings.HasPrefix(dbName, uploaderName) { + return i, nil + } + } + + return 0, nil +} diff --git a/pkg/storage/stores/shipper/compactor/table_test.go b/pkg/storage/stores/shipper/compactor/table_test.go index 6942d0af0676e..65b3c91f1e330 100644 --- a/pkg/storage/stores/shipper/compactor/table_test.go +++ b/pkg/storage/stores/shipper/compactor/table_test.go @@ -24,51 +24,75 @@ const ( ) func TestTable_Compaction(t *testing.T) { - tempDir, err := ioutil.TempDir("", "table-compaction") - require.NoError(t, err) + for _, tc := range []struct { + name string + withCompactedFile bool + }{ + { + name: "without compacted file", + withCompactedFile: false, + }, + { + name: "with compacted file", + withCompactedFile: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + tempDir, err := ioutil.TempDir("", fmt.Sprintf("table-compaction-%v", tc.withCompactedFile)) + require.NoError(t, err) - defer func() { - require.NoError(t, os.RemoveAll(tempDir)) - }() + defer func() { + require.NoError(t, os.RemoveAll(tempDir)) + }() - objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) - tablePathInStorage := filepath.Join(objectStoragePath, tableName) - tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) + objectStoragePath := filepath.Join(tempDir, objectsStorageDirName) + tablePathInStorage := filepath.Join(objectStoragePath, tableName) + tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName) - // setup some dbs - numDBs := compactMinDBs * 2 - numRecordsPerDB := 100 + // setup some dbs + numDBs := compactMinDBs * 2 + numRecordsPerDB := 100 - dbsToSetup := make(map[string]testutil.DBRecords) - for i := 0; i < numDBs; i++ { - dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{ - Start: i * numRecordsPerDB, - NumRecords: (i + 1) * numRecordsPerDB, - } - } + dbsToSetup := make(map[string]testutil.DBRecords) + for i := 0; i < numDBs; i++ { + dbsToSetup[fmt.Sprint(i)] = testutil.DBRecords{ + Start: i * numRecordsPerDB, + NumRecords: (i + 1) * numRecordsPerDB, + } + } - testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true) + if tc.withCompactedFile { + // add a compacted file with some overlap with previously created dbs + dbsToSetup[fmt.Sprintf("%s-0", uploaderName)] = testutil.DBRecords{ + Start: (numDBs / 2) * numRecordsPerDB, + NumRecords: (numDBs + 10) * numRecordsPerDB, + } + } - // setup exact same copy of dbs for comparison. 
- testutil.SetupDBTablesAtPath(t, "test-copy", objectStoragePath, dbsToSetup, false) + testutil.SetupDBTablesAtPath(t, tableName, objectStoragePath, dbsToSetup, true) - // do the compaction - objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) - require.NoError(t, err) + // setup exact same copy of dbs for comparison. + testutil.SetupDBTablesAtPath(t, "test-copy", objectStoragePath, dbsToSetup, false) - table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) - require.NoError(t, err) + // do the compaction + objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath}) + require.NoError(t, err) - require.NoError(t, table.compact(false)) + table, err := newTable(context.Background(), tableWorkingDirectory, objectClient, false, nil) + require.NoError(t, err) - // verify that we have only 1 file left in storage after compaction. - files, err := ioutil.ReadDir(tablePathInStorage) - require.NoError(t, err) - require.Len(t, files, 1) - require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) + require.NoError(t, table.compact(false)) + + // verify that we have only 1 file left in storage after compaction. + files, err := ioutil.ReadDir(tablePathInStorage) + require.NoError(t, err) + require.Len(t, files, 1) + require.True(t, strings.HasSuffix(files[0].Name(), ".gz")) - // verify we have all the kvs in compacted db which were there in source dbs. - compareCompactedDB(t, filepath.Join(tablePathInStorage, files[0].Name()), filepath.Join(objectStoragePath, "test-copy")) + // verify we have all the kvs in compacted db which were there in source dbs. + compareCompactedDB(t, filepath.Join(tablePathInStorage, files[0].Name()), filepath.Join(objectStoragePath, "test-copy")) + }) + } } type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, int64, error) From f3e2552cf8ea13114d397949a97d976f69a04e8d Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 21 Jul 2021 18:17:15 +0530 Subject: [PATCH 2/4] allow configuring compactor to compact multiple tables at a time --- .../stores/shipper/compactor/compactor.go | 68 +++++++++++++++---- 1 file changed, 56 insertions(+), 12 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 5799438e5ec5a..3855050788080 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -40,6 +40,7 @@ type Config struct { RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"` RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"` DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` + MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` } // RegisterFlags registers flags. @@ -52,6 +53,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.") f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. 
Ideally this should be set to at least 24h.") + f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") } func (cfg *Config) IsDefaults() bool { @@ -61,6 +63,9 @@ func (cfg *Config) IsDefaults() bool { } func (cfg *Config) Validate() error { + if cfg.MaxCompactionParallelism < 1 { + return errors.New("max compaction parallelism must be >= 1") + } return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix) } @@ -246,23 +251,62 @@ func (c *Compactor) RunCompaction(ctx context.Context) error { tables[i] = strings.TrimSuffix(string(dir), delimiter) } - for _, tableName := range tables { - if tableName == deletion.DeleteRequestsTableName { - // we do not want to compact or apply retention on delete requests table - continue + compactTablesChan := make(chan string) + errChan := make(chan error) + + for i := 0; i < c.cfg.MaxCompactionParallelism; i++ { + go func() { + var err error + defer func() { + errChan <- err + }() + + for { + select { + case tableName, ok := <-compactTablesChan: + if !ok { + return + } + + err = c.CompactTable(ctx, tableName) + if err != nil { + return + } + case <-ctx.Done(): + return + } + } + }() + } + + go func() { + for _, tableName := range tables { + if tableName == deletion.DeleteRequestsTableName { + // we do not want to compact or apply retention on delete requests table + continue + } + + select { + case compactTablesChan <- tableName: + case <-ctx.Done(): + return + } } - if err := c.CompactTable(ctx, tableName); err != nil { + + close(compactTablesChan) + }() + + var firstErr error + // read all the errors + for i := 0; i < c.cfg.MaxCompactionParallelism; i++ { + err := <-errChan + if err != nil && firstErr == nil { status = statusFailure - } - // check if context was cancelled before going for next table. 
- select { - case <-ctx.Done(): - return nil - default: + firstErr = err } } - return nil + return firstErr } type expirationChecker struct { From 445d47e5fd2dec2ec720bf3ee18ab5ee8aa9ff7c Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 21 Jul 2021 18:27:07 +0530 Subject: [PATCH 3/4] logging improvements --- .../stores/shipper/compactor/compactor.go | 2 ++ pkg/storage/stores/shipper/compactor/table.go | 35 +++++++++++-------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/compactor.go b/pkg/storage/stores/shipper/compactor/compactor.go index 3855050788080..b26604ff1f53f 100644 --- a/pkg/storage/stores/shipper/compactor/compactor.go +++ b/pkg/storage/stores/shipper/compactor/compactor.go @@ -268,10 +268,12 @@ func (c *Compactor) RunCompaction(ctx context.Context) error { return } + level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName) err = c.CompactTable(ctx, tableName) if err != nil { return } + level.Info(util_log.Logger).Log("msg", "finished compacting table", "table-name", tableName) case <-ctx.Done(): return } diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index 87a65355273b0..dc11d6db43f9f 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -13,6 +13,7 @@ import ( chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" util_log "github.com/cortexproject/cortex/pkg/util/log" util_math "github.com/cortexproject/cortex/pkg/util/math" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "go.etcd.io/bbolt" @@ -43,6 +44,7 @@ type table struct { tableMarker retention.TableMarker compactedDB *bbolt.DB + logger log.Logger ctx context.Context quit chan struct{} @@ -63,6 +65,7 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O applyRetention: applyRetention, tableMarker: tableMarker, } + table.logger = log.With(util_log.Logger, "table-name", table.name) return &table, nil } @@ -73,12 +76,12 @@ func (t *table) compact(tableHasExpiredStreams bool) error { return err } - level.Info(util_log.Logger).Log("msg", "listed files", "count", len(objects)) + level.Info(t.logger).Log("msg", "listed files", "count", len(objects)) defer func() { err := t.cleanup() if err != nil { - level.Error(util_log.Logger).Log("msg", "failed to cleanup table", "name", t.name) + level.Error(t.logger).Log("msg", "failed to cleanup table") } }() @@ -86,7 +89,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error { if !applyRetention { if len(objects) < compactMinDBs { - level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects))) + level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects))) return nil } if err := t.compactFiles(objects); err != nil { @@ -128,7 +131,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error { } if t.compactedDB == nil { - level.Info(util_log.Logger).Log("msg", "skipping compaction no files found.") + level.Info(t.logger).Log("msg", "skipping compaction no files found.") return nil } @@ -156,7 +159,7 @@ func (t *table) compact(tableHasExpiredStreams bool) error { func (t *table) compactFiles(objects []chunk.StorageObject) error { var err error - level.Info(util_log.Logger).Log("msg", "starting compaction of dbs") + level.Info(t.logger).Log("msg", "starting compaction of dbs") compactedDBName := 
filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())) seedFileIdx, err := findSeedObjectIdx(objects) @@ -164,6 +167,8 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { return err } + level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", objects[seedFileIdx].Key)) + err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[seedFileIdx].Key, compactedDBName, false) if err != nil { return err @@ -213,7 +218,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { err = t.readFile(downloadAt) if err != nil { - level.Error(util_log.Logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err) + level.Error(t.logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err) return } case <-t.quit: @@ -241,7 +246,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { } } - level.Debug(util_log.Logger).Log("msg", "closing readObjectChan") + level.Debug(t.logger).Log("msg", "closing readObjectChan") close(readObjectChan) }() @@ -268,7 +273,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { default: } - level.Info(util_log.Logger).Log("msg", "finished compacting the dbs") + level.Info(t.logger).Log("msg", "finished compacting the dbs") return nil } @@ -304,7 +309,7 @@ func (t *table) writeBatch(batch []indexEntry) error { // readFile reads a boltdb file from a path and writes the index in batched mode to compactedDB func (t *table) readFile(path string) error { - level.Debug(util_log.Logger).Log("msg", "reading file for compaction", "path", path) + level.Debug(t.logger).Log("msg", "reading file for compaction", "path", path) db, err := openBoltdbFileWithNoSync(path) if err != nil { @@ -313,11 +318,11 @@ func (t *table) readFile(path string) error { defer func() { if err := db.Close(); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to close db", "path", path, "err", err) + level.Error(t.logger).Log("msg", "failed to close db", "path", path, "err", err) } if err = os.Remove(path); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to remove file", "path", path, "err", err) + level.Error(t.logger).Log("msg", "failed to remove file", "path", path, "err", err) } }() @@ -390,23 +395,23 @@ func (t *table) upload() error { defer func() { if err := compressedDB.Close(); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to close file", "path", compactedDBPath, "err", err) + level.Error(t.logger).Log("msg", "failed to close file", "path", compactedDBPath, "err", err) } if err := os.Remove(compressedDBPath); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to remove file", "path", compressedDBPath, "err", err) + level.Error(t.logger).Log("msg", "failed to remove file", "path", compressedDBPath, "err", err) } }() objectKey := fmt.Sprintf("%s.gz", shipper_util.BuildObjectKey(t.name, uploaderName, fmt.Sprint(time.Now().Unix()))) - level.Info(util_log.Logger).Log("msg", "uploading the compacted file", "objectKey", objectKey) + level.Info(t.logger).Log("msg", "uploading the compacted file", "objectKey", objectKey) return t.storageClient.PutObject(t.ctx, objectKey, compressedDB) } // removeObjectsFromStorage deletes objects from storage. 
func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error { - level.Info(util_log.Logger).Log("msg", "removing source db files from storage", "count", len(objects)) + level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(objects)) for _, object := range objects { err := t.storageClient.DeleteObject(t.ctx, object.Key) From c6f1b1628b5941ee17f5908a1b48e71c13ce5b04 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 21 Jul 2021 18:47:19 +0530 Subject: [PATCH 4/4] fix broken test --- pkg/storage/stores/shipper/compactor/compactor_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/storage/stores/shipper/compactor/compactor_test.go b/pkg/storage/stores/shipper/compactor/compactor_test.go index e330d5b92592c..09d3611d464f6 100644 --- a/pkg/storage/stores/shipper/compactor/compactor_test.go +++ b/pkg/storage/stores/shipper/compactor/compactor_test.go @@ -50,6 +50,7 @@ func TestIsDefaults(t *testing.T) { RetentionDeleteDelay: 2 * time.Hour, RetentionDeleteWorkCount: 150, DeleteRequestCancelPeriod: 24 * time.Hour, + MaxCompactionParallelism: 1, }, true}, } { t.Run(fmt.Sprint(i), func(t *testing.T) {
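
A minimal sketch of the seed-selection logic that patch 1 adds in findSeedObjectIdx, runnable in isolation. `storageObject`, `dbNameFromObjectKey`, and the `uploaderName` value `"compactor"` are simplified stand-ins for chunk.StorageObject, shipper_util.GetDBNameFromObjectKey, and the package constant; the keys in main are made up. The point is only that a previously compacted file, when present, is preferred as the seed because it is usually the largest db.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// storageObject is a simplified stand-in for chunk.StorageObject.
type storageObject struct {
	Key string
}

// uploaderName is assumed to match the prefix the compactor uses when uploading
// previously compacted files.
const uploaderName = "compactor"

// dbNameFromObjectKey approximates shipper_util.GetDBNameFromObjectKey:
// the db name is the last path segment of the object key.
func dbNameFromObjectKey(key string) (string, error) {
	name := path.Base(key)
	if name == "" || name == "." || name == "/" {
		return "", fmt.Errorf("invalid object key %q", key)
	}
	return name, nil
}

// findSeedObjectIdx returns the index of the object to seed the compacted db with.
// A previously compacted file (its name starts with uploaderName) is preferred,
// since it is typically the largest file; otherwise the first object is used.
func findSeedObjectIdx(objects []storageObject) (int, error) {
	for i, object := range objects {
		dbName, err := dbNameFromObjectKey(object.Key)
		if err != nil {
			return 0, err
		}
		if strings.HasPrefix(dbName, uploaderName) {
			return i, nil
		}
	}
	return 0, nil
}

func main() {
	objects := []storageObject{
		{Key: "index_18000/1626871954"},
		{Key: "index_18000/compactor-1626871000.gz"},
		{Key: "index_18000/1626872000"},
	}
	idx, err := findSeedObjectIdx(objects)
	if err != nil {
		panic(err)
	}
	fmt.Println("seed object:", objects[idx].Key) // seed object: index_18000/compactor-1626871000.gz
}
```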
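
The NoSync behaviour that patch 1 centralizes in openBoltdbFileWithNoSync can be exercised on its own. In the sketch below, SafeOpenBoltdbFile is approximated with a plain bbolt.Open plus a timeout (an assumption; the real helper may do more). Only the NoSync flag matters here: the compacted file is uploaded and deleted right after compaction, so fsync on every commit is wasted work.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"

	"go.etcd.io/bbolt"
)

// openBoltdbNoSync opens a boltdb file and disables fsync, mirroring
// openBoltdbFileWithNoSync: compaction is idempotent and the local file is
// uploaded and removed afterwards, so durability of intermediate writes
// does not matter.
func openBoltdbNoSync(path string) (*bbolt.DB, error) {
	db, err := bbolt.Open(path, 0o666, &bbolt.Options{Timeout: 5 * time.Second})
	if err != nil {
		return nil, err
	}
	db.NoSync = true
	return db, nil
}

func main() {
	dir, err := os.MkdirTemp("", "compactor-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	db, err := openBoltdbNoSync(filepath.Join(dir, fmt.Sprint(time.Now().Unix())))
	if err != nil {
		panic(err)
	}
	defer db.Close()

	fmt.Println("NoSync enabled:", db.NoSync)
}
```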
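
Patch 2 bounds compaction parallelism with a fixed pool of workers draining a channel of table names, and surfaces only the first error. Below is a self-contained sketch of the same pattern, with a hypothetical compactTable callback standing in for Compactor.CompactTable: each worker reads table names until the channel closes or the context is cancelled, and the caller collects exactly one result per worker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// compactTables fans table names out to maxParallelism workers, mirroring the
// pattern in RunCompaction: a shared channel of table names, one goroutine per
// worker, and the first error (if any) reported back to the caller.
func compactTables(ctx context.Context, tables []string, maxParallelism int,
	compactTable func(ctx context.Context, table string) error) error {

	tablesChan := make(chan string)
	errChan := make(chan error)

	for i := 0; i < maxParallelism; i++ {
		go func() {
			var err error
			defer func() { errChan <- err }()

			for {
				select {
				case table, ok := <-tablesChan:
					if !ok {
						return
					}
					if err = compactTable(ctx, table); err != nil {
						return
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	go func() {
		defer close(tablesChan)
		for _, table := range tables {
			select {
			case tablesChan <- table:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Collect one result per worker; remember only the first error.
	var firstErr error
	for i := 0; i < maxParallelism; i++ {
		if err := <-errChan; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	tables := []string{"index_18000", "index_18001", "index_18002"}
	err := compactTables(context.Background(), tables, 2, func(_ context.Context, table string) error {
		fmt.Println("compacting", table)
		if table == "index_18002" {
			return errors.New("simulated failure")
		}
		return nil
	})
	fmt.Println("first error:", err)
}
```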
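
Patch 3 replaces direct use of the global util_log.Logger with a per-table logger built once via log.With, so every line emitted during compaction carries the table name without repeating it at each call site. A minimal go-kit example of the same idea (the table name here is made up):

```go
package main

import (
	"os"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

func main() {
	baseLogger := log.NewLogfmtLogger(os.Stdout)

	// Attach the table name once; every subsequent log line carries it,
	// as with log.With(util_log.Logger, "table-name", table.name) in table.go.
	tableLogger := log.With(baseLogger, "table-name", "index_18000")

	level.Info(tableLogger).Log("msg", "starting compaction of dbs")
	level.Info(tableLogger).Log("msg", "finished compacting the dbs")
}
```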