From 43c16ed8a10ffc01cea17d0d178351d762760e4a Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 1 Jul 2021 18:14:52 +0530 Subject: [PATCH] remove boltdb files from ingesters on startup which do not have an index bucket (#3929) --- pkg/storage/stores/shipper/compactor/table.go | 2 +- pkg/storage/stores/shipper/uploads/table.go | 22 ++++++++++++++++--- .../stores/shipper/uploads/table_test.go | 9 ++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/pkg/storage/stores/shipper/compactor/table.go b/pkg/storage/stores/shipper/compactor/table.go index a6c79e745a3b1..0da64ca31f8f0 100644 --- a/pkg/storage/stores/shipper/compactor/table.go +++ b/pkg/storage/stores/shipper/compactor/table.go @@ -204,7 +204,7 @@ func (t *table) compactFiles(objects []chunk.StorageObject) error { err = t.readFile(downloadAt) if err != nil { - level.Error(util_log.Logger).Log("msg", "error reading file", "err", err) + level.Error(util_log.Logger).Log("msg", fmt.Sprintf("error reading file %s", objectKey), "err", err) return } case <-t.quit: diff --git a/pkg/storage/stores/shipper/uploads/table.go b/pkg/storage/stores/shipper/uploads/table.go index 46cd871538253..f414635307b61 100644 --- a/pkg/storage/stores/shipper/uploads/table.go +++ b/pkg/storage/stores/shipper/uploads/table.go @@ -476,21 +476,37 @@ func loadBoltDBsFromDir(dir string) (map[string]*bbolt.DB, error) { if fileInfo.IsDir() { continue } + fullPath := filepath.Join(dir, fileInfo.Name()) if strings.HasSuffix(fileInfo.Name(), tempFileSuffix) || strings.HasSuffix(fileInfo.Name(), snapshotFileSuffix) { // If an ingester is killed abruptly in the middle of an upload operation it could leave out a temp file which holds the snapshot of db for uploading. // Cleaning up those temp files to avoid problems. 
- if err := os.Remove(filepath.Join(dir, fileInfo.Name())); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to remove temp file", "name", fileInfo.Name(), "err", err) + if err := os.Remove(fullPath); err != nil { + level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to remove temp file %s", fullPath), "err", err) } continue } - db, err := shipper_util.SafeOpenBoltdbFile(filepath.Join(dir, fileInfo.Name())) + db, err := shipper_util.SafeOpenBoltdbFile(fullPath) if err != nil { return nil, err } + hasBucket := false + _ = db.View(func(tx *bbolt.Tx) error { + hasBucket = tx.Bucket(bucketName) != nil + return nil + }) + + if !hasBucket { + level.Info(util_log.Logger).Log("msg", fmt.Sprintf("file %s has no bucket named %s, so removing it", fullPath, bucketName)) + _ = db.Close() + if err := os.Remove(fullPath); err != nil { + level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to remove file %s without bucket", fullPath), "err", err) + } + continue + } + dbs[fileInfo.Name()] = db } diff --git a/pkg/storage/stores/shipper/uploads/table_test.go b/pkg/storage/stores/shipper/uploads/table_test.go index 8ba5d6b0f448f..081d03348ad73 100644 --- a/pkg/storage/stores/shipper/uploads/table_test.go +++ b/pkg/storage/stores/shipper/uploads/table_test.go @@ -347,6 +347,11 @@ func Test_LoadBoltDBsFromDir(t *testing.T) { }, }, false) + // create a boltdb file without bucket which should get removed + db, err := local.OpenBoltdbFile(filepath.Join(tablePath, "no-bucket")) + require.NoError(t, err) + require.NoError(t, db.Close()) + // try loading the dbs dbs, err := loadBoltDBsFromDir(tablePath) require.NoError(t, err) @@ -360,6 +365,10 @@ func Test_LoadBoltDBsFromDir(t *testing.T) { for _, boltdb := range dbs { require.NoError(t, boltdb.Close()) } + + filesInfo, err := ioutil.ReadDir(tablePath) + require.NoError(t, err) + require.Len(t, filesInfo, 2) } func TestTable_ImmutableUploads(t *testing.T) {