Compactor improvements #4018

Merged: 4 commits, Jul 22, 2021
70 changes: 58 additions & 12 deletions pkg/storage/stores/shipper/compactor/compactor.go
@@ -40,6 +40,7 @@ type Config struct {
RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
}

// RegisterFlags registers flags.
@@ -52,6 +53,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
f.DurationVar(&cfg.DeleteRequestCancelPeriod, "boltdb.shipper.compactor.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.")
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. When increasing this value, make sure the compactor has enough disk space allocated to store and compact that many tables at once.")
}
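The new flag defaults to 1, so existing deployments keep compacting one table at a time; per the struct tag above, the matching YAML key is `max_compaction_parallelism`. As a minimal sketch of how the setting flows from the flag into a validated config (the flag name, its default, and the >= 1 rule are taken from this diff; the standalone `main`, the trimmed-down `config` struct, and the chosen value of 3 are illustrative, not the compactor's actual wiring):

```go
package main

import (
	"flag"
	"fmt"
)

// config mirrors only the field added in this PR; the real Config has many more fields.
type config struct {
	MaxCompactionParallelism int
}

func (cfg *config) registerFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1,
		"Maximum number of tables to compact in parallel.")
}

func main() {
	var cfg config
	fs := flag.NewFlagSet("compactor", flag.ExitOnError)
	cfg.registerFlags(fs)

	// e.g. -boltdb.shipper.compactor.max-compaction-parallelism=3 on the command line
	_ = fs.Parse([]string{"-boltdb.shipper.compactor.max-compaction-parallelism=3"})

	// same check Validate() performs below: parallelism must be at least 1
	if cfg.MaxCompactionParallelism < 1 {
		fmt.Println("max compaction parallelism must be >= 1")
		return
	}
	fmt.Println("compacting up to", cfg.MaxCompactionParallelism, "tables in parallel")
}
```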

func (cfg *Config) IsDefaults() bool {
@@ -61,6 +63,9 @@ func (cfg *Config) IsDefaults() bool {
}

func (cfg *Config) Validate() error {
if cfg.MaxCompactionParallelism < 1 {
return errors.New("max compaction parallelism must be >= 1")
}
return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
}

@@ -246,23 +251,64 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
tables[i] = strings.TrimSuffix(string(dir), delimiter)
}

for _, tableName := range tables {
if tableName == deletion.DeleteRequestsTableName {
// we do not want to compact or apply retention on delete requests table
continue
compactTablesChan := make(chan string)
errChan := make(chan error)

for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
go func() {
var err error
defer func() {
errChan <- err
}()

for {
select {
case tableName, ok := <-compactTablesChan:
if !ok {
return
}

level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName)
err = c.CompactTable(ctx, tableName)
if err != nil {
return
}
level.Info(util_log.Logger).Log("msg", "finished compacting table", "table-name", tableName)
case <-ctx.Done():
return
}
}
}()
}

go func() {
for _, tableName := range tables {
if tableName == deletion.DeleteRequestsTableName {
// we do not want to compact or apply retention on delete requests table
continue
}

select {
case compactTablesChan <- tableName:
case <-ctx.Done():
return
}
}
if err := c.CompactTable(ctx, tableName); err != nil {

close(compactTablesChan)
}()

var firstErr error
// read all the errors
for i := 0; i < c.cfg.MaxCompactionParallelism; i++ {
err := <-errChan
if err != nil && firstErr == nil {
status = statusFailure
}
// check if context was cancelled before going for next table.
select {
case <-ctx.Done():
return nil
default:
firstErr = err
}
}

return nil
return firstErr
}
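Stripped of the compactor specifics, the concurrency introduced above is a bounded worker pool: MaxCompactionParallelism goroutines drain a channel of table names, a single producer feeds the channel (skipping the delete-requests table) and closes it, and the caller reads exactly one error per worker, keeping the first non-nil one. The sketch below shows the same pattern in isolation; `compactAll` and `compact` are illustrative names, not the compactor's API:

```go
package main

import (
	"context"
	"fmt"
)

// compactAll runs compact over tables with at most `parallelism` workers and returns the first error.
func compactAll(ctx context.Context, tables []string, parallelism int, compact func(context.Context, string) error) error {
	tablesChan := make(chan string)
	errChan := make(chan error)

	// workers: each sends exactly one value on errChan when it exits
	for i := 0; i < parallelism; i++ {
		go func() {
			var err error
			defer func() { errChan <- err }()
			for {
				select {
				case table, ok := <-tablesChan:
					if !ok {
						return // producer closed the channel, no more work
					}
					if err = compact(ctx, table); err != nil {
						return // stop this worker on its first failure
					}
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// producer: feed the channel, then close it so workers drain and exit
	go func() {
		defer close(tablesChan)
		for _, table := range tables {
			select {
			case tablesChan <- table:
			case <-ctx.Done():
				return
			}
		}
	}()

	// collect one result per worker, remembering only the first error
	var firstErr error
	for i := 0; i < parallelism; i++ {
		if err := <-errChan; err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	tables := []string{"index_18000", "index_18001", "index_18002"}
	err := compactAll(context.Background(), tables, 2, func(_ context.Context, name string) error {
		fmt.Println("compacting", name)
		return nil
	})
	fmt.Println("err:", err)
}
```

One behavior worth noting, in the sketch and in the diff alike: a worker that hits an error returns immediately instead of draining the channel, while the remaining workers keep compacting their tables, and only the first error is surfaced to the caller.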

type expirationChecker struct {
1 change: 1 addition & 0 deletions pkg/storage/stores/shipper/compactor/compactor_test.go
@@ -50,6 +50,7 @@ func TestIsDefaults(t *testing.T) {
RetentionDeleteDelay: 2 * time.Hour,
RetentionDeleteWorkCount: 150,
DeleteRequestCancelPeriod: 24 * time.Hour,
MaxCompactionParallelism: 1,
}, true},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
101 changes: 75 additions & 26 deletions pkg/storage/stores/shipper/compactor/table.go
@@ -6,12 +6,14 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
chunk_util "github.com/cortexproject/cortex/pkg/chunk/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
util_math "github.com/cortexproject/cortex/pkg/util/math"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.etcd.io/bbolt"

@@ -42,6 +44,7 @@ type table struct {
tableMarker retention.TableMarker

compactedDB *bbolt.DB
logger log.Logger

ctx context.Context
quit chan struct{}
@@ -62,6 +65,7 @@ func newTable(ctx context.Context, workingDirectory string, objectClient chunk.O
applyRetention: applyRetention,
tableMarker: tableMarker,
}
table.logger = log.With(util_log.Logger, "table-name", table.name)

return &table, nil
}
@@ -72,20 +76,20 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
return err
}

level.Info(util_log.Logger).Log("msg", "listed files", "count", len(objects))
level.Info(t.logger).Log("msg", "listed files", "count", len(objects))

defer func() {
err := t.cleanup()
if err != nil {
level.Error(util_log.Logger).Log("msg", "failed to cleanup table", "name", t.name)
level.Error(t.logger).Log("msg", "failed to cleanup table")
}
}()

applyRetention := t.applyRetention && tableHasExpiredStreams

if !applyRetention {
if len(objects) < compactMinDBs {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects)))
level.Info(t.logger).Log("msg", fmt.Sprintf("skipping compaction since we have just %d files in storage", len(objects)))
return nil
}
if err := t.compactFiles(objects); err != nil {
@@ -120,16 +124,14 @@ func (t *table) compact(tableHasExpiredStreams bool) error {
if err != nil {
return err
}
t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(downloadAt)
t.compactedDB, err = openBoltdbFileWithNoSync(downloadAt)
if err != nil {
return err
}
// no need to enforce write to disk, we'll upload and delete the file anyway.
t.compactedDB.NoSync = true
}

if t.compactedDB == nil {
level.Info(util_log.Logger).Log("msg", "skipping compaction no files found.")
level.Info(t.logger).Log("msg", "skipping compaction no files found.")
return nil
}

@@ -157,15 +159,25 @@ func (t *table) compact(tableHasExpiredStreams bool) error {

func (t *table) compactFiles(objects []chunk.StorageObject) error {
var err error
// create a new compacted db
t.compactedDB, err = shipper_util.SafeOpenBoltdbFile(filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix())))
level.Info(t.logger).Log("msg", "starting compaction of dbs")

compactedDBName := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))
seedFileIdx, err := findSeedObjectIdx(objects)
if err != nil {
return err
}

level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", objects[seedFileIdx].Key))

err = shipper_util.GetFileFromStorage(t.ctx, t.storageClient, objects[seedFileIdx].Key, compactedDBName, false)
if err != nil {
return err
}

t.compactedDB, err = openBoltdbFileWithNoSync(compactedDBName)
if err != nil {
return err
}
// no need to enforce write to disk, we'll upload and delete the file anyway.
// in case of failure we'll restart the whole process anyway.
t.compactedDB.NoSync = true
level.Info(util_log.Logger).Log("msg", "starting compaction of dbs")

errChan := make(chan error)
readObjectChan := make(chan string)
@@ -206,7 +218,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {

err = t.readFile(downloadAt)
if err != nil {
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err)
level.Error(t.logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err)
return
}
case <-t.quit:
@@ -220,7 +232,11 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {

// send all files to readObjectChan
go func() {
for _, object := range objects {
for i, object := range objects {
// skip seed file
if i == seedFileIdx {
continue
}
select {
case readObjectChan <- object.Key:
case <-t.quit:
@@ -230,7 +246,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {
}
}

level.Debug(util_log.Logger).Log("msg", "closing readObjectChan")
level.Debug(t.logger).Log("msg", "closing readObjectChan")

close(readObjectChan)
}()
@@ -257,7 +273,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {
default:
}

level.Info(util_log.Logger).Log("msg", "finished compacting the dbs")
level.Info(t.logger).Log("msg", "finished compacting the dbs")
return nil
}

Expand Down Expand Up @@ -293,20 +309,20 @@ func (t *table) writeBatch(batch []indexEntry) error {

// readFile reads a boltdb file from a path and writes the index in batched mode to compactedDB
func (t *table) readFile(path string) error {
level.Debug(util_log.Logger).Log("msg", "reading file for compaction", "path", path)
level.Debug(t.logger).Log("msg", "reading file for compaction", "path", path)

db, err := shipper_util.SafeOpenBoltdbFile(path)
db, err := openBoltdbFileWithNoSync(path)
if err != nil {
return err
}
db.NoSync = true

defer func() {
if err := db.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close db", "path", path, "err", err)
level.Error(t.logger).Log("msg", "failed to close db", "path", path, "err", err)
}

if err = os.Remove(path); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to remove file", "path", path, "err", err)
level.Error(t.logger).Log("msg", "failed to remove file", "path", path, "err", err)
}
}()

@@ -379,23 +395,23 @@ func (t *table) upload() error {

defer func() {
if err := compressedDB.Close(); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to close file", "path", compactedDBPath, "err", err)
level.Error(t.logger).Log("msg", "failed to close file", "path", compactedDBPath, "err", err)
}

if err := os.Remove(compressedDBPath); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to remove file", "path", compressedDBPath, "err", err)
level.Error(t.logger).Log("msg", "failed to remove file", "path", compressedDBPath, "err", err)
}
}()

objectKey := fmt.Sprintf("%s.gz", shipper_util.BuildObjectKey(t.name, uploaderName, fmt.Sprint(time.Now().Unix())))
level.Info(util_log.Logger).Log("msg", "uploading the compacted file", "objectKey", objectKey)
level.Info(t.logger).Log("msg", "uploading the compacted file", "objectKey", objectKey)

return t.storageClient.PutObject(t.ctx, objectKey, compressedDB)
}

// removeObjectsFromStorage deletes objects from storage.
func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error {
level.Info(util_log.Logger).Log("msg", "removing source db files from storage", "count", len(objects))
level.Info(t.logger).Log("msg", "removing source db files from storage", "count", len(objects))

for _, object := range objects {
err := t.storageClient.DeleteObject(t.ctx, object.Key)
@@ -406,3 +422,36 @@ func (t *table) removeObjectsFromStorage(objects []chunk.StorageObject) error {

return nil
}

// openBoltdbFileWithNoSync opens a boltdb file and configures it to not sync writes to disk.
// The compaction process is idempotent and we do not retain the files, so there is no need to sync them to disk.
func openBoltdbFileWithNoSync(path string) (*bbolt.DB, error) {
boltdb, err := shipper_util.SafeOpenBoltdbFile(path)
if err != nil {
return nil, err
}

// no need to enforce write to disk, we'll upload and delete the file anyway.
boltdb.NoSync = true

return boltdb, nil
}

// findSeedObjectIdx returns the index of the object to use as the seed, i.e. the file into which the index from all the other files gets written.
// It tries to find a previously compacted file (identified by uploaderName in its name), which would be the biggest file.
// In a large cluster, using a previously compacted file as the seed significantly reduces compaction time.
// If it can't find a previously compacted file, it just uses the first file from the list.
func findSeedObjectIdx(objects []chunk.StorageObject) (int, error) {
for i, object := range objects {
dbName, err := shipper_util.GetDBNameFromObjectKey(object.Key)
if err != nil {
return 0, err
}

if strings.HasPrefix(dbName, uploaderName) {
return i, nil
}
}

return 0, nil
}
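To make the seed-selection rule concrete, here is a small self-contained illustration. The `storageObject` type, the key layout, the uploaderName value, and the last-path-segment parsing are simplified stand-ins (the real code works with chunk.StorageObject and shipper_util.GetDBNameFromObjectKey); only the prefix check mirrors the function above:

```go
package main

import (
	"fmt"
	"strings"
)

// storageObject is a stand-in for chunk.StorageObject.
type storageObject struct{ Key string }

// illustrative value; the compactor uses its own uploaderName constant.
const uploaderName = "compactor"

// findSeedObjectIdx prefers a previously compacted file, otherwise falls back to index 0.
func findSeedObjectIdx(objects []storageObject) int {
	for i, object := range objects {
		// the real code extracts the db name via shipper_util.GetDBNameFromObjectKey;
		// taking the last path segment is close enough for this sketch.
		parts := strings.Split(object.Key, "/")
		dbName := parts[len(parts)-1]
		if strings.HasPrefix(dbName, uploaderName) {
			return i
		}
	}
	return 0
}

func main() {
	objects := []storageObject{
		{Key: "index_18000/ingester-0-1626000000.gz"},
		{Key: "index_18000/compactor-1626000123.gz"}, // previously compacted file
	}
	fmt.Println(findSeedObjectIdx(objects)) // prints 1
}
```

Seeding compaction from the previously compacted file means only the smaller, newly shipped files need to be read and copied into it; the bulk of the index is already in place, which is where the compaction-time win for large clusters comes from.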