[nspcc-dev#112] api: Change a way to clean cache
Replaced the background cache-cleaning Worker with a per-entry time.AfterFunc timer (see the sketch after the changed-files summary below).

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
masterSplinter01 committed Jul 28, 2021
1 parent a6500eb commit 17b49f7
Showing 3 changed files with 18 additions and 94 deletions.
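The idea behind the change, per the commit description: instead of a long-running worker goroutine that wakes up every timerDefaultTickerTime and sweeps expired entries, each PutCache call now schedules its own eviction with time.AfterFunc. A minimal, self-contained sketch of that pattern (the type and names here are illustrative, not the gateway's own):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// expiringCache is an illustrative stand-in for the gateway's listObjectsCache:
// every Put schedules its own eviction instead of relying on a sweeper goroutine.
type expiringCache struct {
	mtx      sync.RWMutex
	entries  map[string][]string
	lifetime time.Duration
}

func newExpiringCache(lifetime time.Duration) *expiringCache {
	return &expiringCache{entries: make(map[string][]string), lifetime: lifetime}
}

func (c *expiringCache) Put(key string, values []string) {
	c.mtx.Lock()
	c.entries[key] = values
	c.mtx.Unlock()

	// Schedule removal of exactly this key after the cache lifetime,
	// mirroring the time.AfterFunc call introduced in object_cache.go.
	time.AfterFunc(c.lifetime, func() {
		c.mtx.Lock()
		delete(c.entries, key)
		c.mtx.Unlock()
	})
}

func (c *expiringCache) Get(key string) []string {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return c.entries[key]
}

func main() {
	cache := newExpiringCache(100 * time.Millisecond)
	cache.Put("k", []string{"a", "b"})
	fmt.Println(cache.Get("k")) // [a b]
	time.Sleep(150 * time.Millisecond)
	fmt.Println(cache.Get("k")) // [] (evicted by the AfterFunc timer)
}
```

With the ticker approach an entry could survive for almost two tick intervals (as the removed comment in object_cache.go notes); with a per-entry timer it is removed close to the configured lifetime after it was stored.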
27 changes: 2 additions & 25 deletions api/layer/layer.go
@@ -25,8 +25,6 @@ type (
pool pool.Pool
log *zap.Logger
cache ObjectsListV2Cache

workers []Worker
}

// Params stores basic API parameters.
@@ -98,16 +96,9 @@ type (
Get(ctx context.Context, address *object.Address) (*object.Object, error)
}

// Workers provides basic interface for background workers ¯\_(ツ)_/¯.
Workers interface {
StartWorkers()
StopWorkers()
}

// Client provides S3 API client interface.
Client interface {
NeoFS
Workers

ListBuckets(ctx context.Context) ([]*BucketInfo, error)
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
@@ -136,13 +127,12 @@ const (

// NewLayer creates instance of layer. It checks credentials
// and establishes gRPC connection with node.
func NewLayer(ctx context.Context, log *zap.Logger, conns pool.Pool) Client {
cache := newListObjectsCache(ctx)
func NewLayer(log *zap.Logger, conns pool.Pool) Client {
cache := newListObjectsCache()
return &layer{
pool: conns,
log: log,
cache: cache,
workers: []Worker{cache},
}
}

@@ -461,16 +451,3 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar
}
return &res, nil
}

func (n *layer) StartWorkers() {
for _, wrk := range n.workers {
wrk.Start()
}
}

func (n *layer) StopWorkers() {
n.log.Info("stopping timer")
for _, wrk := range n.workers {
wrk.Stop()
}
}
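From a caller's point of view, the layer.go change boils down to a simpler constructor and no worker lifecycle to drive. A hedged fragment based only on the signatures visible in this diff (logger and conns are assumed to exist in the caller):

```go
// Before: the caller passed a context and managed the worker lifecycle.
//   obj := layer.NewLayer(ctx, logger, conns)
//   obj.StartWorkers()
//   ...
//   obj.StopWorkers()

// After: plain construction; cache entries expire on their own timers.
obj := layer.NewLayer(logger, conns)
```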
79 changes: 15 additions & 64 deletions api/layer/object_cache.go
@@ -1,7 +1,6 @@
package layer

import (
"context"
"sync"
"time"

@@ -11,14 +10,9 @@ import (
/*
This is an implementation of a cache for ListObjectsV2 which we return to users by ContinuationToken.
The cache consists of a map with data and a timer.
The map has a key: (access_key from AccessBox) + container_id and a value: list of objects with creation time.
The timer every timerDefaultTickerTime checks the map with caches and removes the expired caches.
NB: Because the timer checks all caches at one time the caches can live longer than timerDefaultTickerTime.
E.g. if timerDefaultTickerTime is 1 minute and check happened when some cache is alive for 50 seconds, this cache
will not be cleaned, it will be cleaned next time => this cache will live for almost 2 minutes.
That's why IRL the caches will live timerDefaultTickerTime * 2.
The cache is a map which has a key: (access_key from AccessBox) + container_id and a value: list of objects with
creation time. After putting a record we start a timer (via time.AfterFunc) that removes the record after
defaultCacheLifetime value.
ContinuationToken in our gateway is an objectID in NeoFS.
@@ -34,22 +28,15 @@ type (
GetCache(token string, key string) []*ObjectInfo
PutCache(key string, objects []*ObjectInfo)
}
Worker interface {
Start()
Stop()
}
)
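For reference, an illustrative call pattern for the ObjectsListV2Cache interface above; createKey comes from this file, while accessKey, containerID, continuationToken and objects are assumed handler-side variables, not names introduced by this commit:

```go
key := createKey(accessKey, containerID)              // access_key + container_id
if cached := cache.GetCache(continuationToken, key); cached != nil { // assumed: nil on a cache miss
	objects = cached                                  // serve the cached page
} else {
	cache.PutCache(key, objects)                      // entry evicts itself after defaultCacheLifetime
}
```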

var (
timerDefaultTickerTime = time.Second * 60
ticker = time.NewTicker(timerDefaultTickerTime)
defaultCacheLifetime = time.Second * 60
)

type (
listObjectsCache struct {
caches map[string]cache
ctx context.Context
quit chan struct{}
mtx sync.RWMutex
}
cache struct {
@@ -58,6 +45,12 @@ type (
}
)

func newListObjectsCache() *listObjectsCache {
c := listObjectsCache{}
c.caches = make(map[string]cache)
return &c
}

func (l *listObjectsCache) GetCache(token, key string) []*ObjectInfo {
l.mtx.RLock()
defer l.mtx.RUnlock()
@@ -81,55 +74,13 @@ func (l *listObjectsCache) PutCache(key string, objects []*ObjectInfo) {
c.list = objects
c.created = time.Now()
l.caches[key] = c
}

func (l *listObjectsCache) Start() {
done := l.ctx.Done()
go func() {
for {
select {
case <-ticker.C:
l.checkAndCleanCaches()
case _, ok := <-l.quit:
if !ok {
ticker.Stop()
return
}
case <-done:
ticker.Stop()
return

}
}
}()
}

func (l *listObjectsCache) Stop() {
close(l.quit)
time.AfterFunc(defaultCacheLifetime, func() {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
})
}
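Because the removed Start/Stop code and the added time.AfterFunc lines are interleaved in this hunk, here is roughly how PutCache reads after the change. This is a reconstruction from the hunk above, not the literal file contents; the declaration of c and any locking at the top of the function are truncated in the diff and assumed:

```go
func (l *listObjectsCache) PutCache(key string, objects []*ObjectInfo) {
	var c cache // assumed: the visible hunk context starts below this point
	c.list = objects
	c.created = time.Now()
	l.caches[key] = c

	// Schedule removal of exactly this entry instead of relying on a sweeper goroutine.
	time.AfterFunc(defaultCacheLifetime, func() {
		l.mtx.Lock()
		delete(l.caches, key)
		l.mtx.Unlock()
	})
}
```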

func createKey(accessKey string, cid *cid.ID) string {
return accessKey + cid.String()
}

func newListObjectsCache(ctx context.Context) *listObjectsCache {
c := listObjectsCache{}

c.quit = make(chan struct{})
c.caches = make(map[string]cache)

c.ctx = ctx

return &c
}

func (l *listObjectsCache) checkAndCleanCaches() {
cur := time.Now()
for key, obj := range l.caches {
if cur.Sub(obj.created) >= timerDefaultTickerTime {
l.mtx.Lock()
delete(l.caches, key)
l.mtx.Unlock()
}
}
}
6 changes: 1 addition & 5 deletions cmd/s3-gw/app.go
@@ -109,7 +109,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
}

// prepare object layer
obj = layer.NewLayer(ctx, l, conns)
obj = layer.NewLayer(l, conns)

// prepare auth center
ctr = auth.New(conns, key)
@@ -175,8 +175,6 @@ func (a *App) Server(ctx context.Context) {
srv.Handler = router
srv.ErrorLog = zap.NewStdLog(a.log)

a.obj.StartWorkers()

go func() {
a.log.Info("starting server",
zap.String("bind", addr))
@@ -204,8 +202,6 @@ func (a *App) Server(ctx context.Context) {
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()

a.obj.StopWorkers()

a.log.Info("stopping server",
zap.Error(srv.Shutdown(ctx)))
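With StartWorkers/StopWorkers gone, the server lifecycle in app.go reduces to starting the HTTP server and shutting it down. A rough fragment stitched together from the context lines above (srv, router, addr and defaultShutdownTimeout are taken from the diff context; the rest is assumed, not part of this commit):

```go
srv.Handler = router
srv.ErrorLog = zap.NewStdLog(a.log)

go func() {
	a.log.Info("starting server", zap.String("bind", addr))
	// srv.ListenAndServe() and its error handling elided here
}()

// ... once a shutdown is requested ...
ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout)
defer cancel()
a.log.Info("stopping server", zap.Error(srv.Shutdown(ctx)))
```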
