Skip to content

Commit

Permalink
remove boltdb files from ingesters on startup which do not have a ind…
Browse files Browse the repository at this point in the history
…ex bucket (#3929)
  • Loading branch information
sandeepsukhani authored Jul 1, 2021
1 parent 71889dc commit 43c16ed
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error {

err = t.readFile(downloadAt)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error reading file", "err", err)
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err)
return
}
case <-t.quit:
Expand Down
22 changes: 19 additions & 3 deletions pkg/storage/stores/shipper/uploads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,21 +476,37 @@ func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) {
if fileInfo.IsDir() {
continue
}
fullPath := filepath.Join(dir, fileInfo.Name())

if strings.HasSuffix(fileInfo.Name(), tempFileSuffix) || strings.HasSuffix(fileInfo.Name(), snapshotFileSuffix) {
// If an ingester is killed abruptly in the middle of an upload operation it could leave out a temp file which holds the snapshot of db for uploading.
// Cleaning up those temp files to avoid problems.
if err := os.Remove(filepath.Join(dir, fileInfo.Name())); err != nil {
level.Error(util_log.Logger).Log("msg", "failed to remove temp file", "name", fileInfo.Name(), "err", err)
if err := os.Remove(fullPath); err != nil {
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to remove temp file %s", fullPath), "err", err)
}
continue
}

db, err := shipper_util.SafeOpenBoltdbFile(filepath.Join(dir, fileInfo.Name()))
db, err := shipper_util.SafeOpenBoltdbFile(fullPath)
if err != nil {
return nil, err
}

hasBucket := false
_ = db.View(func(tx *bbolt.Tx) error {
hasBucket = tx.Bucket(bucketName) != nil
return nil
})

if !hasBucket {
level.Info(util_log.Logger).Log("msg", fmt.Sprintf("file %s has no bucket named %s, so removing it", fullPath, bucketName))
_ = db.Close()
if err := os.Remove(fullPath); err != nil {
level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to remove file %s without bucket", fullPath), "err", err)
}
continue
}

dbs[fileInfo.Name()] = db
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/stores/shipper/uploads/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,11 @@ func Test_LoadBoltDBsFromDir(t *testing.T) {
},
}, false)

// create a boltdb file without bucket which should get removed
db, err := local.OpenBoltdbFile(filepath.Join(tablePath, "no-bucket"))
require.NoError(t, err)
require.NoError(t, db.Close())

// try loading the dbs
dbs, err := loadBoltDBsFromDir(tablePath)
require.NoError(t, err)
Expand All @@ -360,6 +365,10 @@ func Test_LoadBoltDBsFromDir(t *testing.T) {
for _, boltdb := range dbs {
require.NoError(t, boltdb.Close())
}

filesInfo, err := ioutil.ReadDir(tablePath)
require.NoError(t, err)
require.Len(t, filesInfo, 2)
}

func TestTable_ImmutableUploads(t *testing.T) {
Expand Down

0 comments on commit 43c16ed

Please sign in to comment.