diff --git a/api/layer/layer.go b/api/layer/layer.go index ab31d589..461f2c2a 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -25,6 +25,8 @@ type ( pool pool.Pool log *zap.Logger cache ObjectsListV2Cache + + workers []Worker } // Params stores basic API parameters. @@ -96,16 +98,16 @@ type ( Get(ctx context.Context, address *object.Address) (*object.Object, error) } - // Timer provides basic interface for timer ¯\_(ツ)_/¯. - Timer interface { - StartTimer() - StopTimer() + // Workers provides basic interface for background workers ¯\_(ツ)_/¯. + Workers interface { + StartWorkers() + StopWorkers() } // Client provides S3 API client interface. Client interface { NeoFS - Timer + Workers ListBuckets(ctx context.Context) ([]*BucketInfo, error) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) @@ -134,12 +136,13 @@ const ( // NewLayer creates instance of layer. It checks credentials // and establishes gRPC connection with node. -func NewLayer(log *zap.Logger, conns pool.Pool) Client { - cache := newListObjectsCache() +func NewLayer(ctx context.Context, log *zap.Logger, conns pool.Pool) Client { + cache := newListObjectsCache(ctx) return &layer{ pool: conns, log: log, cache: cache, + workers: []Worker{cache}, } } @@ -459,11 +462,15 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar return &res, nil } -func (n *layer) StartTimer() { - n.cache.StartTimer() +func (n *layer) StartWorkers() { + for _, wrk := range n.workers { + wrk.Start() + } } -func (n *layer) StopTimer() { +func (n *layer) StopWorkers() { n.log.Info("stopping timer") - n.cache.StopTimer() + for _, wrk := range n.workers { + wrk.Stop() + } } diff --git a/api/layer/object.go b/api/layer/object.go index 2d36b963..bbbc089e 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -278,7 +278,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis return nil, fmt.Errorf("couldn't get access key from the accessbox: %s", err) } - cacheKey = createKey(&accessKey, bkt.CID) + cacheKey = createKey(accessKey, bkt.CID) if p.ContinuationToken != "" { allObjects = n.cache.GetCache(p.ContinuationToken, cacheKey) @@ -309,7 +309,7 @@ func (n *layer) ListObjectsV2(ctx context.Context, p *ListObjectsParamsV2) (*Lis result.IsTruncated = true restObjects := allObjects[p.MaxKeys:] - n.cache.PutCache(createKey(&accessKey, bkt.CID), restObjects) + n.cache.PutCache(cacheKey, restObjects) result.NextContinuationToken = restObjects[0].id.String() allObjects = allObjects[:p.MaxKeys] diff --git a/api/layer/object_cache.go b/api/layer/object_cache.go index 5099dc17..00942e33 100644 --- a/api/layer/object_cache.go +++ b/api/layer/object_cache.go @@ -1,6 +1,7 @@ package layer import ( + "context" "sync" "time" @@ -28,20 +29,26 @@ import ( */ // ObjectsListV2Cache provides interface for cache of ListObjectsV2 in a layer struct. -type ObjectsListV2Cache interface { - Timer - GetCache(token string, key string) []*ObjectInfo - PutCache(key string, objects []*ObjectInfo) -} +type ( + ObjectsListV2Cache interface { + GetCache(token string, key string) []*ObjectInfo + PutCache(key string, objects []*ObjectInfo) + } + Worker interface { + Start() + Stop() + } +) var ( timerDefaultTickerTime = time.Second * 60 - ticker = time.NewTicker(timerDefaultTickerTime) + ticker = time.NewTicker(timerDefaultTickerTime) ) type ( listObjectsCache struct { caches map[string]cache + ctx context.Context quit chan struct{} mtx sync.RWMutex } @@ -52,6 +59,8 @@ type ( ) func (l *listObjectsCache) GetCache(token, key string) []*ObjectInfo { + l.mtx.RLock() + defer l.mtx.RUnlock() if val, ok := l.caches[key]; ok { for i, obj := range val.list { if obj.ID().String() == token { @@ -70,42 +79,57 @@ func (l *listObjectsCache) PutCache(key string, objects []*ObjectInfo) { l.mtx.Lock() defer l.mtx.Unlock() c.list = objects - c.created = time.Now().UTC() + c.created = time.Now() l.caches[key] = c } -func (l *listObjectsCache) StartTimer() { - for { - select { - case <-ticker.C: - cur := time.Now() - for key, obj := range l.caches { - if cur.Sub(obj.created) >= time.Minute { - l.mtx.Lock() - delete(l.caches, key) - l.mtx.Unlock() +func (l *listObjectsCache) Start() { + done := l.ctx.Done() + go func() { + for { + select { + case <-ticker.C: + l.checkAndCleanCaches() + case _, ok := <-l.quit: + if !ok { + ticker.Stop() + return } + case <-done: + ticker.Stop() + return + } - case <-l.quit: - ticker.Stop() - return } - } + }() } -func (l *listObjectsCache) StopTimer() { - l.quit <- struct{}{} +func (l *listObjectsCache) Stop() { + close(l.quit) } -func createKey(accessKey *string, cid *cid.ID) string { - return *accessKey + cid.String() +func createKey(accessKey string, cid *cid.ID) string { + return accessKey + cid.String() } -func newListObjectsCache() *listObjectsCache { +func newListObjectsCache(ctx context.Context) *listObjectsCache { c := listObjectsCache{} c.quit = make(chan struct{}) c.caches = make(map[string]cache) + c.ctx = ctx + return &c } + +func (l *listObjectsCache) checkAndCleanCaches() { + cur := time.Now() + for key, obj := range l.caches { + if cur.Sub(obj.created) >= timerDefaultTickerTime { + l.mtx.Lock() + delete(l.caches, key) + l.mtx.Unlock() + } + } +} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 1992f120..5bd1f72f 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -109,7 +109,7 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { } // prepare object layer - obj = layer.NewLayer(l, conns) + obj = layer.NewLayer(ctx, l, conns) // prepare auth center ctr = auth.New(conns, key) @@ -175,7 +175,7 @@ func (a *App) Server(ctx context.Context) { srv.Handler = router srv.ErrorLog = zap.NewStdLog(a.log) - go a.obj.StartTimer() + a.obj.StartWorkers() go func() { a.log.Info("starting server", @@ -204,7 +204,7 @@ func (a *App) Server(ctx context.Context) { ctx, cancel := context.WithTimeout(context.Background(), defaultShutdownTimeout) defer cancel() - a.obj.StopTimer() + a.obj.StopWorkers() a.log.Info("stopping server", zap.Error(srv.Shutdown(ctx)))