Skip to content

Commit

Permalink
[nspcc-dev#112] Add cache to ListObjects and layer
Browse files Browse the repository at this point in the history
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
  • Loading branch information
masterSplinter01 committed Jul 28, 2021
1 parent c24fe5c commit 0ceea95
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 7 deletions.
10 changes: 6 additions & 4 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (

type (
layer struct {
pool pool.Pool
log *zap.Logger
pool pool.Pool
log *zap.Logger
cache ObjectsListV2Cache
}

// Params stores basic API parameters.
Expand Down Expand Up @@ -128,8 +129,9 @@ const (
// and establishes gRPC connection with node.
func NewLayer(log *zap.Logger, conns pool.Pool) Client {
return &layer{
pool: conns,
log: log,
pool: conns,
log: log,
cache: newListObjectsCache(),
}
}

Expand Down
45 changes: 42 additions & 3 deletions api/layer/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -258,6 +259,8 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
result ListObjectsInfoV2
allObjects []*ObjectInfo
bkt *BucketInfo
cacheKey string
box *accessbox.Box
)

if p.MaxKeys == 0 {
Expand All @@ -268,9 +271,18 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
return nil, err
}

if box, err = GetBoxData(ctx); err != nil {
return nil, err
}

cacheKey = createKey(box.Gate.AccessKey, bkt.CID)

if p.ContinuationToken != "" {
// find cache with continuation token
} else {
allObjects = n.cache.Get(p.ContinuationToken, cacheKey)
allObjects = trimStartAfter(p.StartAfter, allObjects)
}

if allObjects == nil {
allObjects, err = n.listSortedAllObjects(ctx, allObjectParams{
Bucket: bkt,
Prefix: p.Prefix,
Expand All @@ -280,13 +292,20 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
if err != nil {
return nil, err
}

if p.ContinuationToken != "" {
allObjects = trimAfterObjectID(p.ContinuationToken, allObjects)
}
}

if len(allObjects) > p.MaxKeys {
result.IsTruncated = true

restObjects := allObjects[p.MaxKeys:]
n.cache.Put(cacheKey, restObjects)
result.NextContinuationToken = restObjects[0].id.String()

allObjects = allObjects[:p.MaxKeys]
// add creating of cache here
}

for _, ov := range allObjects {
Expand Down Expand Up @@ -343,3 +362,23 @@ func (n *layer) listSortedAllObjects(ctx context.Context, p allObjectParams) ([]

return objects, nil
}

func trimStartAfter(startAfter string, objects []*ObjectInfo) []*ObjectInfo {
if objects != nil && len(startAfter) != 0 && objects[0].Name <= startAfter {
for i := range objects {
if objects[i].Name > startAfter {
return objects[i:]
}
}
}
return objects
}

func trimAfterObjectID(id string, objects []*ObjectInfo) []*ObjectInfo {
for i, obj := range objects {
if obj.ID().String() == id {
return objects[i:]
}
}
return objects
}
79 changes: 79 additions & 0 deletions api/layer/object_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package layer

import (
"sync"
"time"

cid "github.com/nspcc-dev/neofs-api-go/pkg/container/id"
)

/*
This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken.
The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with
creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after
defaultCacheLifetime value.
ContinuationToken in our gateway is an objectID in NeoFS.
We don't keep ContinuationToken in this structure because we assume that users who received the token can reconnect
to other gateways and they should be able to get a list of objects.
When we receive the token from the user we just try to find the cache and then we return the list of objects which
starts from this token (i.e. objectID).
*/

// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct.
type (
ObjectsListV2Cache interface {
Get(token string, key string) []*ObjectInfo
Put(key string, objects []*ObjectInfo)
}
)

var (
defaultCacheLifetime = time.Second * 60
)

type (
listObjectsCache struct {
caches map[string]cache
mtx sync.RWMutex
}
cache struct {
list []*ObjectInfo
}
)

func newListObjectsCache() *listObjectsCache {
return &listObjectsCache{
caches: make(map[string]cache),
}
}

func (l *listObjectsCache) Get(token, key string) []*ObjectInfo {
l.mtx.RLock()
defer l.mtx.RUnlock()
if val, ok := l.caches[key]; ok {
return trimAfterObjectID(token, val.list)
}

return nil
}

func (l *listObjectsCache) Put(key string, objects []*ObjectInfo) {
var c cache

l.mtx.Lock()
defer l.mtx.Unlock()
c.list = objects
l.caches[key] = c
time.AfterFunc(defaultCacheLifetime, func() {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
})
}

func createKey(accessKey string, cid *cid.ID) string {
return accessKey + cid.String()
}

0 comments on commit 0ceea95

Please sign in to comment.