Skip to content

Commit

Permalink
[nspcc-dev#112] Fix comments from PR
Browse files Browse the repository at this point in the history
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
  • Loading branch information
masterSplinter01 committed Jul 26, 2021
1 parent c10ab0d commit 660bffe
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 42 deletions.
29 changes: 18 additions & 11 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type (
pool pool.Pool
log *zap.Logger
cache ObjectsListV2Cache

workers []Worker
}

// Params stores basic API parameters.
Expand Down Expand Up @@ -98,16 +100,16 @@ type (
Get(ctx context.Context, address *object.Address) (*object.Object, error)
}

// Timer provides basic interface for timer ¯\_(ツ)_/¯.
Timer interface {
StartTimer()
StopTimer()
// Workers provides basic interface for timer ¯\_(ツ)_/¯.
Workers interface {
StartWorkers()
StopWorkers()
}

// Client provides S3 API client interface.
Client interface {
NeoFS
Timer
Workers

ListBuckets(ctx context.Context) ([]*BucketInfo, error)
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
Expand Down Expand Up @@ -147,12 +149,13 @@ const (

// NewLayer creates instance of layer. It checks credentials
// and establishes gRPC connection with node.
func NewLayer(log *zap.Logger, conns pool.Pool) Client {
cache := newListObjectsCache()
func NewLayer(ctx context.Context, log *zap.Logger, conns pool.Pool) Client {
cache := newListObjectsCache(ctx)
return &layer{
pool: conns,
log: log,
cache: cache,
workers: []Worker{cache},
}
}

Expand Down Expand Up @@ -476,11 +479,15 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
return &res, nil
}

func (n *layer) StartTimer() {
n.cache.StartTimer()
func (n *layer) StartWorkers() {
for _, wrk := range n.workers {
wrk.Start()
}
}

func (n *layer) StopTimer() {
func (n *layer) StopWorkers() {
n.log.Info("stopping timer")
n.cache.StopTimer()
for _, wrk := range n.workers {
wrk.Stop()
}
}
4 changes: 2 additions & 2 deletions api/layer/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
return nil, fmt.Errorf("couldn't get access key from the accessbox: %s", err)
}

cacheKey = createKey(&accessKey, bkt.CID)
cacheKey = createKey(accessKey, bkt.CID)

if p.ContinuationToken != "" {
allObjects = n.cache.GetCache(p.ContinuationToken, cacheKey)
Expand Down Expand Up @@ -304,7 +304,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis
result.IsTruncated = true

restObjects := allObjects[p.MaxKeys:]
n.cache.PutCache(createKey(&accessKey, bkt.CID), restObjects)
n.cache.PutCache(createKey(accessKey, bkt.CID), restObjects)
result.NextContinuationToken = restObjects[0].id.String()

allObjects = allObjects[:p.MaxKeys]
Expand Down
76 changes: 50 additions & 26 deletions api/layer/object_cache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package layer

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -28,20 +29,26 @@ import (
*/

// ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct.
type ObjectsListV2Cache interface {
Timer
GetCache(token string, key string) []*ObjectInfo
PutCache(key string, objects []*ObjectInfo)
}
type (
ObjectsListV2Cache interface {
GetCache(token string, key string) []*ObjectInfo
PutCache(key string, objects []*ObjectInfo)
}
Worker interface {
Start()
Stop()
}
)

var (
timerDefaultTickerTime = time.Second * 60
ticker = time.NewTicker(timerDefaultTickerTime)
ticker = time.NewTicker(timerDefaultTickerTime)
)

type (
listObjectsCache struct {
caches map[string]cache
ctx context.Context
quit chan struct{}
mtx sync.RWMutex
}
Expand All @@ -53,6 +60,8 @@ type (

func (l *listObjectsCache) GetCache(token, key string) []*ObjectInfo {
if val, ok := l.caches[key]; ok {
l.mtx.RLock()
defer l.mtx.RUnlock()
for i, obj := range val.list {
if obj.ID().String() == token {
return val.list[i:]
Expand All @@ -70,42 +79,57 @@ func (l *listObjectsCache) PutCache(key string, objects []*ObjectInfo) {
l.mtx.Lock()
defer l.mtx.Unlock()
c.list = objects
c.created = time.Now().UTC()
c.created = time.Now()
l.caches[key] = c
}

func (l *listObjectsCache) StartTimer() {
for {
select {
case <-ticker.C:
cur := time.Now()
for key, obj := range l.caches {
if cur.Sub(obj.created) >= time.Minute {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
func (l *listObjectsCache) Start() {
done := l.ctx.Done()
go func() {
for {
select {
case <-ticker.C:
l.checkAndCleanCaches()
case _, ok := <-l.quit:
if !ok {
ticker.Stop()
return
}
case <-done:
ticker.Stop()
return

}
case <-l.quit:
ticker.Stop()
return
}
}
}()
}

func (l *listObjectsCache) StopTimer() {
l.quit <- struct{}{}
func (l *listObjectsCache) Stop() {
close(l.quit)
}

func createKey(accessKey *string, cid *cid.ID) string {
return *accessKey + cid.String()
func createKey(accessKey string, cid *cid.ID) string {
return accessKey + cid.String()
}

func newListObjectsCache() *listObjectsCache {
func newListObjectsCache(ctx context.Context) *listObjectsCache {
c := listObjectsCache{}

c.quit = make(chan struct{})
c.caches = make(map[string]cache)

c.ctx = ctx

return &c
}

func (l *listObjectsCache) checkAndCleanCaches() {
cur := time.Now()
for key, obj := range l.caches {
if cur.Sub(obj.created) >= timerDefaultTickerTime {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
}
}
}
6 changes: 3 additions & 3 deletions cmd/s3-gw/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
}

// prepare object layer
obj = layer.NewLayer(l, conns)
obj = layer.NewLayer(ctx, l, conns)

// prepare auth center
ctr = auth.New(conns, key)
Expand Down Expand Up @@ -175,7 +175,7 @@ func (a *App) Server(ctx context.Context) {
srv.Handler = router
srv.ErrorLog = zap.NewStdLog(a.log)

go a.obj.StartTimer()
a.obj.StartWorkers()

go func() {
a.log.Info("starting server",
Expand Down Expand Up @@ -204,7 +204,7 @@ func (a *App) Server(ctx context.Context) {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()

a.obj.StopTimer()
a.obj.StopWorkers()

a.log.Info("stopping server",
zap.Error(srv.Shutdown(ctx)))
Expand Down

0 comments on commit 660bffe

Please sign in to comment.