Skip to content

Commit

Permalink
[nspcc-dev#112] api: Add cache for ListObjectsV2
Browse files Browse the repository at this point in the history
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
  • Loading branch information
masterSplinter01 committed Jul 26, 2021
1 parent 70250c2 commit 4e3025a
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 7 deletions.
27 changes: 23 additions & 4 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (

type (
layer struct {
pool pool.Pool
log *zap.Logger
pool pool.Pool
log *zap.Logger
cache ObjectsListV2Cache
}

// Params stores basic API parameters.
Expand Down Expand Up @@ -97,9 +98,16 @@ type (
Get(ctx context.Context, address *object.Address) (*object.Object, error)
}

// Timer provides basic interface for timer ¯\_(ツ)_/¯.
Timer interface {
StartTimer()
StopTimer()
}

// Client provides S3 API client interface.
Client interface {
NeoFS
Timer

ListBuckets(ctx context.Context) ([]*BucketInfo, error)
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
Expand Down Expand Up @@ -140,9 +148,11 @@ const (
// NewLayer creates instance of layer. It checks credentials
// and establishes gRPC connection with node.
func NewLayer(log *zap.Logger, conns pool.Pool) Client {
cache := newListObjectsCache()
return &layer{
pool: conns,
log: log,
pool: conns,
log: log,
cache: cache,
}
}

Expand Down Expand Up @@ -465,3 +475,12 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
}
return &res, nil
}

func (n *layer) StartTimer() {
n.cache.StartTimer()
}

func (n *layer) StopTimer() {
n.log.Info("stopping timer")
n.cache.StopTimer()
}
32 changes: 29 additions & 3 deletions api/layer/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package layer
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"sort"
Expand All @@ -13,6 +14,7 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -253,6 +255,8 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
result ListObjectsInfoV2
allObjects []*ObjectInfo
bkt *BucketInfo
accessKey string
cacheKey string
)

if p.MaxKeys == 0 {
Expand All @@ -263,9 +267,19 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
return nil, err
}

if p.ContinuationToken != "" {
// find cache with continuation token
if data, ok := ctx.Value(api.GateData).(*accessbox.GateData); ok && data != nil {
accessKey = data.AccessKey
} else {
return nil, fmt.Errorf("couldn't get access key from the accessbox: %s", err)
}

cacheKey = createKey(&accessKey, bkt.CID)

if p.ContinuationToken != "" {
allObjects = n.cache.GetCache(p.ContinuationToken, cacheKey)
}

if allObjects == nil {
allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Expand All @@ -275,13 +289,25 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
if err != nil {
return nil, err
}

if p.ContinuationToken != "" {
for i, obj := range allObjects {
if obj.ID().String() == p.ContinuationToken {
allObjects = allObjects[i:]
break
}
}
}
}

if len(allObjects) > p.MaxKeys {
result.IsTruncated = true

restObjects := allObjects[p.MaxKeys:]
n.cache.PutCache(createKey(&accessKey, bkt.CID), restObjects)
result.NextContinuationToken = restObjects[0].id.String()

allObjects = allObjects[:p.MaxKeys]
// add creating of cache here
}

for _, ov := range allObjects {
Expand Down
90 changes: 90 additions & 0 deletions api/layer/object_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package layer

import (
"sync"
"time"

cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
)

// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct.
type ObjectsListV2Cache interface {
Timer
GetCache(token string, key string) []*ObjectInfo
PutCache(key string, objects []*ObjectInfo)
}

var (
ticker = time.NewTicker(time.Second * 60)
)

type (
listObjectsCache struct {
caches map[string]cache
quit chan struct{}
mtx sync.RWMutex
}
cache struct {
list []*ObjectInfo
created time.Time
}
)

func (l *listObjectsCache) GetCache(token, key string) []*ObjectInfo {
if val, ok := l.caches[key]; ok {
for i, obj := range val.list {
if obj.ID().String() == token {
return val.list[i:]
}
}
}

return nil
}

func (l *listObjectsCache) PutCache(key string, objects []*ObjectInfo) {
var (
c cache
)
l.mtx.Lock()
defer l.mtx.Unlock()
c.list = objects
c.created = time.Now().UTC()
l.caches[key] = c
}

func (l *listObjectsCache) StartTimer() {
for {
select {
case <-ticker.C:
cur := time.Now()
for key, obj := range l.caches {
if cur.Sub(obj.created) >= time.Minute {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
}
}
case <-l.quit:
ticker.Stop()
return
}
}
}

func (l *listObjectsCache) StopTimer() {
l.quit <- struct{}{}
}

func createKey(accessKey *string, cid *cid.ID) string {
return *accessKey + cid.String()
}

func newListObjectsCache() *listObjectsCache {
c := listObjectsCache{}

c.quit = make(chan struct{})
c.caches = make(map[string]cache)

return &c
}
4 changes: 4 additions & 0 deletions cmd/s3-gw/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ func (a *App) Server(ctx context.Context) {
srv.Handler = router
srv.ErrorLog = zap.NewStdLog(a.log)

go a.obj.StartTimer()

go func() {
a.log.Info("starting server",
zap.String("bind", addr))
Expand Down Expand Up @@ -202,6 +204,8 @@ func (a *App) Server(ctx context.Context) {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()

a.obj.StopTimer()

a.log.Info("stopping server",
zap.Error(srv.Shutdown(ctx)))

Expand Down

0 comments on commit 4e3025a

Please sign in to comment.