From 086908e5352ef51e065f09537410cb0dad5b7634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 10 May 2022 18:36:45 -0400 Subject: [PATCH] stores: Cache disk usage results for a few seconds --- extern/sector-storage/stores/local.go | 49 +------ .../stores/localstorage_cached.go | 131 ++++++++++++++++++ extern/sector-storage/stores/remote.go | 6 + 3 files changed, 138 insertions(+), 48 deletions(-) create mode 100644 extern/sector-storage/stores/localstorage_cached.go diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go index e33b662d5d2..4a8c4e1b3a8 100644 --- a/extern/sector-storage/stores/local.go +++ b/extern/sector-storage/stores/local.go @@ -85,56 +85,9 @@ type path struct { reserved int64 reservations map[abi.SectorID]storiface.SectorFileType - - statLk sync.Mutex - statDone chan struct{} - - lastStat *fsutil.FsStat // nil if no stat / was error } func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) { - for { - p.statLk.Lock() - if p.statDone == nil { - p.statDone = make(chan struct{}) - p.statLk.Unlock() - - st, err := p.doStat(ls) - - p.statLk.Lock() - p.lastStat = nil - if err == nil { - p.lastStat = &st - } - close(p.statDone) - p.statDone = nil - p.statLk.Unlock() - return st, err - } - - doneCh := p.statDone - p.statLk.Unlock() - - select { - case <-doneCh: - // todo context? - } - - p.statLk.Lock() - if p.lastStat == nil { - p.statLk.Unlock() - continue - } - - st := *p.lastStat - - p.statLk.Unlock() - - return st, nil - } -} - -func (p *path) doStat(ls LocalStorage) (fsutil.FsStat, error) { start := time.Now() stat, err := ls.Stat(p.local) @@ -217,7 +170,7 @@ type URLs []string func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) { l := &Local{ - localStorage: ls, + localStorage: newCachedLocalStorage(ls), index: index, urls: urls, diff --git a/extern/sector-storage/stores/localstorage_cached.go b/extern/sector-storage/stores/localstorage_cached.go new file mode 100644 index 00000000000..9ada06cbfd7 --- /dev/null +++ b/extern/sector-storage/stores/localstorage_cached.go @@ -0,0 +1,131 @@ +package stores + +import ( + "sync" + "time" + + lru "github.com/hashicorp/golang-lru" + + "github.com/filecoin-project/lotus/extern/sector-storage/fsutil" +) + +var StatTimeout = 5 * time.Second +var MaxDiskUsageDuration = time.Second + +type cachedLocalStorage struct { + base LocalStorage + + statLk sync.Mutex + stats *lru.Cache // path -> statEntry + pathDUs *lru.Cache // path -> *diskUsageEntry +} + +func newCachedLocalStorage(ls LocalStorage) *cachedLocalStorage { + statCache, _ := lru.New(1024) + duCache, _ := lru.New(1024) + + return &cachedLocalStorage{ + base: ls, + stats: statCache, + pathDUs: duCache, + } +} + +type statEntry struct { + stat fsutil.FsStat + time time.Time +} + +type diskUsageEntry struct { + last diskUsageResult + + usagePromise <-chan diskUsageResult +} + +type diskUsageResult struct { + usage int64 + time time.Time +} + +func (c *cachedLocalStorage) GetStorage() (StorageConfig, error) { + return c.base.GetStorage() +} + +func (c *cachedLocalStorage) SetStorage(f func(*StorageConfig)) error { + return c.base.SetStorage(f) +} + +func (c *cachedLocalStorage) Stat(path string) (fsutil.FsStat, error) { + c.statLk.Lock() + defer c.statLk.Unlock() + + if v, ok := c.stats.Get(path); ok && time.Now().Sub(v.(statEntry).time) < StatTimeout { + return v.(statEntry).stat, nil + } + + // if we don't, get the stat + st, err := c.base.Stat(path) + if err == nil { + c.stats.Add(path, statEntry{ + stat: st, + time: time.Now(), + }) + } + + return st, err +} + +func (c *cachedLocalStorage) DiskUsage(path string) (int64, error) { + c.statLk.Lock() + defer c.statLk.Unlock() + + var entry *diskUsageEntry + + if v, ok := c.pathDUs.Get(path); ok { + entry = v.(*diskUsageEntry) + + // if we have recent cached entry, use that + if time.Now().Sub(entry.last.time) < StatTimeout { + return entry.last.usage, nil + } + } else { + entry = new(diskUsageEntry) + c.pathDUs.Add(path, entry) + } + + // start a new disk usage request; this can take a while so start a + // goroutine, and if it doesn't return quickly, return either the + // previous value, or zero - that's better than potentially blocking + // here for a long time. + if entry.usagePromise == nil { + resCh := make(chan diskUsageResult, 1) + go func() { + du, err := c.base.DiskUsage(path) + if err != nil { + log.Errorw("error getting disk usage", "path", path, "error", err) + } + resCh <- diskUsageResult{ + usage: du, + time: time.Now(), + } + }() + entry.usagePromise = resCh + } + + // wait for the disk usage result; if it doesn't come, fallback on + // previous usage + select { + case du := <-entry.usagePromise: + entry.usagePromise = nil + entry.last = du + case <-time.After(MaxDiskUsageDuration): + log.Warnw("getting usage is slow, falling back to previous usage", + "path", path, + "fallback", entry.last.usage, + "age", time.Now().Sub(entry.last.time)) + } + + return entry.last.usage, nil +} + +var _ LocalStorage = &cachedLocalStorage{} diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go index 62c780d09b9..0a9868c8f59 100644 --- a/extern/sector-storage/stores/remote.go +++ b/extern/sector-storage/stores/remote.go @@ -95,6 +95,8 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") } + // First make sure that no other goroutines are trying to fetch this sector; + // wait if there are any. for { r.fetchLk.Lock() @@ -122,6 +124,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin r.fetchLk.Unlock() }() + // Try to get the sector from local storage paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err) @@ -148,6 +151,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin odt = storiface.FsOverheadFinalized } + // If any path types weren't found in local storage, try fetching them + + // First reserve storage releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)