Skip to content

Commit

Permalink
remove directories from object store list operation (#3394)
Browse files Browse the repository at this point in the history
  • Loading branch information
slim-bean authored Feb 26, 2021
1 parent a3a2c1a commit cf8be4a
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/storage/stores/shipper/compactor/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (t *table) compact() error {
return
}

// The s3 client can also return the directory itself in the ListObjects.
if shipper_util.IsDirectory(objectKey) {
continue
}

var dbName string
dbName, err = shipper_util.GetDBNameFromObjectKey(objectKey)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/stores/shipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,11 @@ func (t *Table) init(ctx context.Context, spanLogger log.Logger) (err error) {

level.Debug(spanLogger).Log("total-files-downloaded", len(objects))

objects = shipper_util.RemoveDirectories(objects)

// open all the downloaded dbs
for _, object := range objects {

dbName, err := getDBNameFromObjectKey(object.Key)
if err != nil {
return err
Expand Down Expand Up @@ -405,7 +408,10 @@ func (t *Table) checkStorageForUpdates(ctx context.Context) (toDownload []chunk.
t.dbsMtx.RLock()
defer t.dbsMtx.RUnlock()

objects = shipper_util.RemoveDirectories(objects)

for _, object := range objects {

dbName, err := getDBNameFromObjectKey(object.Key)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -503,6 +509,11 @@ func (t *Table) doParallelDownload(ctx context.Context, objects []chunk.StorageO
break
}

// The s3 client can also return the directory itself in the ListObjects.
if shipper_util.IsDirectory(object.Key) {
continue
}

var dbName string
dbName, err = getDBNameFromObjectKey(object.Key)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/stores/shipper/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -176,3 +177,20 @@ func attemptCleanFlock(f *os.File) {
return f, errors.New("error for cleanup")
})
}

// RemoveDirectories will return a new slice with any StorageObjects identified as directories removed.
func RemoveDirectories(incoming []chunk.StorageObject) []chunk.StorageObject {
outgoing := make([]chunk.StorageObject, 0, len(incoming))
for _, o := range incoming {
if IsDirectory(o.Key) {
continue
}
outgoing = append(outgoing, o)
}
return outgoing
}

// IsDirectory will return true if the string ends in a forward slash
func IsDirectory(key string) bool {
return strings.HasSuffix(key, "/")
}
75 changes: 75 additions & 0 deletions pkg/storage/stores/shipper/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"path/filepath"
"testing"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/stores/shipper/testutil"
Expand Down Expand Up @@ -76,3 +78,76 @@ func Test_CompressFile(t *testing.T) {

require.Equal(t, testData, b)
}

func TestRemoveDirectories(t *testing.T) {
tests := []struct {
name string
incoming []chunk.StorageObject
expected []chunk.StorageObject
}{
{
name: "no trailing slash",
incoming: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
{Key: "obj3"},
},
expected: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
{Key: "obj3"},
},
},
{
name: "one trailing slash",
incoming: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2/"},
{Key: "obj3"},
},
expected: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj3"},
},
},
{
name: "only trailing slash",
incoming: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
{Key: "/"},
},
expected: []chunk.StorageObject{
{Key: "obj1"},
{Key: "obj2"},
},
},
{
name: "all trailing slash",
incoming: []chunk.StorageObject{
{Key: "/"},
{Key: "/"},
{Key: "/"},
},
expected: []chunk.StorageObject{},
},
{
name: "internal slash",
incoming: []chunk.StorageObject{
{Key: "test/test1"},
{Key: "te/st"},
{Key: "/sted"},
},
expected: []chunk.StorageObject{
{Key: "test/test1"},
{Key: "te/st"},
{Key: "/sted"},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expected, RemoveDirectories(test.incoming))
})
}
}

0 comments on commit cf8be4a

Please sign in to comment.