Skip to content

Commit

Permalink
do not list buckets without local quorum (minio#20852)
Browse files Browse the repository at this point in the history
ListBuckets() would result in listing buckets
without quorum, this PR fixes the behavior.
  • Loading branch information
harshavardhana authored Jan 19, 2025
1 parent 3d0f513 commit 779ec8f
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 33 deletions.
46 changes: 33 additions & 13 deletions cmd/erasure-healing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/minio/minio/internal/grid"
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v3/sync/errgroup"
"github.com/puzpuzpuz/xsync/v3"
)

//go:generate stringer -type=healingMetric -trimprefix=healingMetric $GOFILE
Expand Down Expand Up @@ -117,9 +118,8 @@ func (er erasureObjects) listAndHeal(ctx context.Context, bucket, prefix string,

// listAllBuckets lists all buckets from all disks. It also
// returns the occurrence of each buckets in all disks
func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets map[string]VolInfo, readQuorum int) error {
func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets *xsync.MapOf[string, VolInfo], readQuorum int) error {
g := errgroup.WithNErrs(len(storageDisks))
var mu sync.Mutex
for index := range storageDisks {
index := index
g.Go(func() error {
Expand All @@ -131,32 +131,52 @@ func listAllBuckets(ctx context.Context, storageDisks []StorageAPI, healBuckets
if err != nil {
return err
}

for _, volInfo := range volsInfo {
// StorageAPI can send volume names which are
// incompatible with buckets - these are
// skipped, like the meta-bucket.
if isReservedOrInvalidBucket(volInfo.Name, false) {
continue
}
mu.Lock()
if _, ok := healBuckets[volInfo.Name]; !ok {
healBuckets[volInfo.Name] = volInfo
}
mu.Unlock()

healBuckets.Compute(volInfo.Name, func(oldValue VolInfo, loaded bool) (newValue VolInfo, del bool) {
if loaded {
newValue = oldValue
newValue.count = oldValue.count + 1
return newValue, false
}
return VolInfo{
Name: volInfo.Name,
Created: volInfo.Created,
count: 1,
}, false
})
}

return nil
}, index)
}
return reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum)
}

var errLegacyXLMeta = errors.New("legacy XL meta")
if err := reduceReadQuorumErrs(ctx, g.Wait(), bucketMetadataOpIgnoredErrs, readQuorum); err != nil {
return err
}

healBuckets.Range(func(volName string, volInfo VolInfo) bool {
if volInfo.count < readQuorum {
healBuckets.Delete(volName)
}
return true
})

var errOutdatedXLMeta = errors.New("outdated XL meta")
return nil
}

var (
errPartCorrupt = errors.New("part corrupt")
errPartMissing = errors.New("part missing")
errLegacyXLMeta = errors.New("legacy XL meta")
errOutdatedXLMeta = errors.New("outdated XL meta")
errPartCorrupt = errors.New("part corrupt")
errPartMissing = errors.New("part missing")
)

// Only heal on disks where we are sure that healing is needed. We can expand
Expand Down
10 changes: 3 additions & 7 deletions cmd/erasure-sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/minio/minio/internal/logger"
"github.com/minio/pkg/v3/console"
"github.com/minio/pkg/v3/sync/errgroup"
"github.com/puzpuzpuz/xsync/v3"
)

// setsDsyncLockers is encapsulated type for Close()
Expand Down Expand Up @@ -701,9 +702,8 @@ func (s *erasureSets) getHashedSet(input string) (set *erasureObjects) {
}

// listDeletedBuckets lists deleted buckets from all disks.
func listDeletedBuckets(ctx context.Context, storageDisks []StorageAPI, delBuckets map[string]VolInfo, readQuorum int) error {
func listDeletedBuckets(ctx context.Context, storageDisks []StorageAPI, delBuckets *xsync.MapOf[string, VolInfo], readQuorum int) error {
g := errgroup.WithNErrs(len(storageDisks))
var mu sync.Mutex
for index := range storageDisks {
index := index
g.Go(func() error {
Expand All @@ -722,11 +722,7 @@ func listDeletedBuckets(ctx context.Context, storageDisks []StorageAPI, delBucke
vi, err := storageDisks[index].StatVol(ctx, pathJoin(minioMetaBucket, bucketMetaPrefix, deletedBucketsPrefix, volName))
if err == nil {
vi.Name = strings.TrimSuffix(volName, SlashSeparator)
mu.Lock()
if _, ok := delBuckets[volName]; !ok {
delBuckets[volName] = vi
}
mu.Unlock()
delBuckets.Store(volName, vi)
}
}
return nil
Expand Down
23 changes: 13 additions & 10 deletions cmd/peer-s3-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/minio/madmin-go/v3"
"github.com/minio/pkg/v3/sync/errgroup"
"github.com/puzpuzpuz/xsync/v3"
)

const (
Expand Down Expand Up @@ -164,15 +165,15 @@ func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []Bucket
quorum := (len(localDrives) / 2)

buckets = make([]BucketInfo, 0, 32)
healBuckets := map[string]VolInfo{}
healBuckets := xsync.NewMapOf[string, VolInfo]()

// lists all unique buckets across drives.
if err := listAllBuckets(ctx, localDrives, healBuckets, quorum); err != nil {
return nil, err
}

// include deleted buckets in listBuckets output
deletedBuckets := map[string]VolInfo{}
deletedBuckets := xsync.NewMapOf[string, VolInfo]()

if opts.Deleted {
// lists all deleted buckets across drives.
Expand All @@ -181,25 +182,27 @@ func listBucketsLocal(ctx context.Context, opts BucketOptions) (buckets []Bucket
}
}

for _, v := range healBuckets {
healBuckets.Range(func(_ string, volInfo VolInfo) bool {
bi := BucketInfo{
Name: v.Name,
Created: v.Created,
Name: volInfo.Name,
Created: volInfo.Created,
}
if vi, ok := deletedBuckets[v.Name]; ok {
if vi, ok := deletedBuckets.Load(volInfo.Name); ok {
bi.Deleted = vi.Created
}
buckets = append(buckets, bi)
}
return true
})

for _, v := range deletedBuckets {
if _, ok := healBuckets[v.Name]; !ok {
deletedBuckets.Range(func(_ string, v VolInfo) bool {
if _, ok := healBuckets.Load(v.Name); !ok {
buckets = append(buckets, BucketInfo{
Name: v.Name,
Deleted: v.Created,
})
}
}
return true
})

return buckets, nil
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/storage-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,16 @@ type VolsInfo []VolInfo
// VolInfo - represents volume stat information.
// The above means that any added/deleted fields are incompatible.
//
// The above means that any added/deleted fields are incompatible.
//
//msgp:tuple VolInfo
type VolInfo struct {
// Name of the volume.
Name string

// Date and time when the volume was created.
Created time.Time

// total VolInfo counts
count int
}

// FilesInfo represent a list of files, additionally
Expand Down
2 changes: 1 addition & 1 deletion cmd/xl-storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ func TestXLStorageListVols(t *testing.T) {
if volInfos, err = xlStorage.ListVols(context.Background()); err != nil {
t.Fatalf("expected: <nil>, got: %s", err)
} else if len(volInfos) != 1 {
t.Fatalf("expected: one entry, got: %s", volInfos)
t.Fatalf("expected: one entry, got: %v", volInfos)
}

// TestXLStorage non-empty list vols.
Expand Down

0 comments on commit 779ec8f

Please sign in to comment.