Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Add store cache #3543

Merged
merged 4 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 236 additions & 0 deletions store/cache/accessor_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
package cache

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"

eds "github.com/celestiaorg/celestia-node/share/new_eds"
)

const defaultCloseTimeout = time.Minute

var _ Cache = (*AccessorCache)(nil)

// AccessorCache implements the Cache interface using an LRU cache backend.
type AccessorCache struct {
	// The name is a prefix that will be used for cache metrics if they are enabled.
	name string
	// stripedLocks prevents simultaneous RW access to the accessor cache. Instead
	// of using only one lock or one lock per uint64, we stripe the uint64s across 256 locks.
	// 256 is chosen because 0-255 is the range of values we get looking at the last byte of the uint64.
	stripedLocks [256]*sync.RWMutex
	// Caches the accessor for a given height for accessor read affinity, i.e., further reads will
	// likely be from the same accessor. Maps (height -> accessor).
	cache *lru.Cache[uint64, *accessor]

	// metrics is nil until EnableMetrics is called.
	metrics *metrics
}

// accessor is the value stored in Cache. It implements the eds.AccessorStreamer interface. It has a
// reference counted so that it can be removed from the cache only when all references are released.
type accessor struct {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
eds.AccessorStreamer

lock sync.Mutex
done chan struct{}
refs atomic.Int32
isClosed bool
}

// NewAccessorCache constructs an AccessorCache with the given metrics prefix
// and maximum number of cached accessors.
func NewAccessorCache(name string, cacheSize int) (*AccessorCache, error) {
	cache := &AccessorCache{name: name}
	for i := range cache.stripedLocks {
		cache.stripedLocks[i] = new(sync.RWMutex)
	}

	// Instantiate the backing LRU, wiring the eviction callback so evicted
	// accessors are closed.
	backing, err := lru.NewWithEvict[uint64, *accessor](cacheSize, cache.evictFn())
	if err != nil {
		return nil, fmt.Errorf("creating accessor cache %s: %w", name, err)
	}
	cache.cache = backing
	return cache, nil
}

// evictFn returns the callback invoked by the LRU backend whenever an entry is
// evicted. The evicted accessor is closed asynchronously so eviction never
// blocks cache operations while close waits for outstanding readers.
func (bc *AccessorCache) evictFn() func(uint64, *accessor) {
	return func(_ uint64, evicted *accessor) {
		go func() {
			if err := evicted.close(); err != nil {
				bc.metrics.observeEvicted(true)
				log.Errorf("couldn't close accessor after cache eviction: %s", err)
				return
			}
			bc.metrics.observeEvicted(false)
		}()
	}
}

// Get retrieves the accessor for a given height from the Cache. If the
// accessor is not present (or already closed), it returns ErrCacheMiss.
func (bc *AccessorCache) Get(height uint64) (eds.AccessorStreamer, error) {
	mu := bc.getLock(height)
	mu.RLock()
	defer mu.RUnlock()

	cached, found := bc.cache.Get(height)
	if !found {
		bc.metrics.observeGet(false)
		return nil, ErrCacheMiss
	}

	bc.metrics.observeGet(true)
	// newRefCloser takes a reference; it fails if the accessor is already closed.
	return newRefCloser(cached)
}

// GetOrLoad attempts to get an item from the cache, and if not found, invokes
// the provided loader function to load it and stores the result. The returned
// AccessorStreamer holds a reference and must be closed by the caller.
func (bc *AccessorCache) GetOrLoad(
	ctx context.Context,
	height uint64,
	loader OpenAccessorFn,
) (eds.AccessorStreamer, error) {
	// Hold the write lock on the stripe so only one goroutine loads an accessor
	// for this height; concurrent callers block and then hit the cache.
	lk := bc.getLock(height)
	lk.Lock()
	defer lk.Unlock()

	ac, ok := bc.cache.Get(height)
	if ok {
		// return accessor, only if it is not closed yet; newRefCloser returns an
		// error for a closed accessor, in which case we fall through and load a
		// fresh one below.
		accessorWithRef, err := newRefCloser(ac)
		if err == nil {
			bc.metrics.observeGet(true)
			return accessorWithRef, nil
		}
	}

	// accessor not found in cache or closed, so load new one using loader
	f, err := loader(ctx)
	if err != nil {
		return nil, fmt.Errorf("unable to load accessor: %w", err)
	}

	ac = &accessor{AccessorStreamer: f}
	// Create a new ref on the accessor first to increment the reference count in it, so it cannot
	// get fully closed by eviction from the inner lru cache before it is used.
	rc, err := newRefCloser(ac)
	if err != nil {
		return nil, err
	}
	bc.cache.Add(height, ac)
	return rc, nil
}

// Remove removes the Accessor for a given height from the cache. It blocks
// until all outstanding readers release their references (or the close timeout
// is reached).
func (bc *AccessorCache) Remove(height uint64) error {
	lk := bc.getLock(height)
	lk.RLock()
	ac, ok := bc.cache.Get(height)
	lk.RUnlock()
	if !ok {
		// item is not in cache
		return nil
	}
	// Close synchronously first so the caller observes close errors. close is
	// idempotent, so the second close triggered via evictFn below is a no-op.
	if err := ac.close(); err != nil {
		return err
	}
	// The cache will call evictFn on removal, where accessor close will be called.
	bc.cache.Remove(height)
	return nil
}

// EnableMetrics enables metrics collection for the cache.
func (bc *AccessorCache) EnableMetrics() error {
	m, err := newMetrics(bc)
	bc.metrics = m
	return err
}

// addRef registers one more reader of the accessor. It returns ErrCacheMiss if
// the accessor has already been marked closed, so callers treat it as a cache
// miss and load a fresh accessor instead.
func (s *accessor) addRef() error {
	s.lock.Lock()
	defer s.lock.Unlock()
	if s.isClosed {
		// item is already closed and soon will be removed after all refs are released
		return ErrCacheMiss
	}
	if s.refs.Add(1) == 1 {
		// there were no refs previously and done channel was closed (or never
		// created), reopen it by recreating
		s.done = make(chan struct{})
	}
	return nil
}

// removeRef drops one reader reference. When the count reaches zero, the done
// channel is closed, signaling close() that no readers remain.
func (s *accessor) removeRef() {
	s.lock.Lock()
	defer s.lock.Unlock()
	if s.refs.Add(-1) <= 0 {
		close(s.done)
	}
}

// close closes the accessor and removes it from the cache if it is not closed yet. It will block
// until all references are released or timeout is reached. It is idempotent:
// concurrent or repeated calls after the first return nil immediately.
func (s *accessor) close() error {
	s.lock.Lock()
	if s.isClosed {
		s.lock.Unlock()
		// accessor will be closed by another goroutine
		return nil
	}
	s.isClosed = true
	// Capture done under the lock: isClosed now blocks addRef from recreating
	// the channel, so this is the channel the last reader will close.
	done := s.done
	s.lock.Unlock()

	// Wait for all readers to release their refs, but give up after the timeout
	// so a leaked reference cannot block closing forever.
	select {
	case <-done:
	case <-time.After(defaultCloseTimeout):
		return fmt.Errorf("closing accessor, some readers didn't close the accessor within timeout,"+
			" amount left: %v", s.refs.Load())
	}
	if err := s.AccessorStreamer.Close(); err != nil {
		return fmt.Errorf("closing accessor: %w", err)
	}
	return nil
}

// refCloser exists for reference counting protection on accessor. It ensures that a caller can't
// decrement it more than once: closeFn is wrapped in a sync.Once by newRefCloser.
type refCloser struct {
	*accessor
	// closeFn releases this holder's single reference on the accessor.
	closeFn func()
}

// newRefCloser wraps the accessor in a refCloser, taking one reference on it.
// It fails if the accessor has already been closed.
func newRefCloser(abs *accessor) (*refCloser, error) {
	if err := abs.addRef(); err != nil {
		return nil, err
	}

	// Guard the ref release with a Once so Close is safe to call repeatedly.
	once := new(sync.Once)
	release := func() {
		once.Do(abs.removeRef)
	}
	return &refCloser{accessor: abs, closeFn: release}, nil
}

// Close releases this holder's reference on the underlying accessor. It never
// closes the accessor itself and is safe to call multiple times, since closeFn
// releases the reference at most once.
func (c *refCloser) Close() error {
	c.closeFn()
	return nil
}

// getLock returns the stripe lock guarding the given height. The low byte of
// the height selects one of the 256 stripes.
func (bc *AccessorCache) getLock(k uint64) *sync.RWMutex {
	// byte(k) is equivalent to byte(k % 256): both keep only the last byte.
	return bc.stripedLocks[byte(k)]
}
Loading
Loading