Do not return error when storing existing value. (#68)
The blockstore uses the immutable option, so it no longer requires that an error be returned when storing an existing value. This allows an existing value to be stored without the caller having to handle an error.

- Move some things outside of bucket lock
- Change GCScanFree option to GCFastScan
gammazero authored Aug 29, 2022
1 parent 0076714 commit b5a6bac
Showing 6 changed files with 62 additions and 86 deletions.
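
For context, here is the caller-visible effect of this change as a minimal sketch, assuming s is a *store.Store opened via OpenStore (see store/store.go below); the putTwice helper is hypothetical.

func putTwice(s *store.Store, key, val []byte) error {
	// First Put stores the value.
	if err := s.Put(key, val); err != nil {
		return err
	}
	// Putting the identical value under the same key used to return
	// types.ErrKeyExists; after this commit it is a no-op that returns
	// nil, so callers such as the blockstore need no special handling.
	return s.Put(key, val)
}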
65 changes: 21 additions & 44 deletions store/index/gc.go
@@ -18,7 +18,7 @@ var log = logging.Logger("storethehash/index")
 // garbageCollector is a goroutine that runs periodically to search for and
 // remove stale index files. It runs every gcInterval, if there have been any
 // index updates.
-func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnused bool) {
+func (index *Index) garbageCollector(interval, timeLimit time.Duration, fastScan bool) {
 	defer close(index.gcDone)
 
 	var gcDone chan struct{}
@@ -60,7 +60,7 @@ func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnused bool) {
 	}
 
 	log.Infow("GC started")
-	fileCount, err := index.gc(gcCtx, scanUnused)
+	fileCount, err := index.gc(gcCtx, fastScan)
 	if err != nil {
 		switch err {
 		case context.DeadlineExceeded:
@@ -88,12 +88,12 @@ func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnused bool) {
 
 // gc searches for and removes stale index files. Returns the number of unused
 // index files that were removed.
-func (index *Index) gc(ctx context.Context, scanFree bool) (int, error) {
+func (index *Index) gc(ctx context.Context, fastScan bool) (int, error) {
 	var count int
 	var err error
 
-	if scanFree {
-		count, err = index.freeFreeFiles(ctx)
+	if fastScan {
+		count, err = index.truncateUnusedFiles(ctx)
 		if err != nil {
 			return 0, err
 		}
@@ -135,14 +135,20 @@ func (index *Index) gc(ctx context.Context, scanFree bool) (int, error) {
 	return count, nil
 }
 
-func (index *Index) freeFreeFiles(ctx context.Context) (int, error) {
-	busySet := make(map[uint32]struct{})
-	maxFileSize := index.maxFileSize
+func (index *Index) truncateUnusedFiles(ctx context.Context) (int, error) {
+	header, err := readHeader(index.headerPath)
+	if err != nil {
+		return 0, fmt.Errorf("cannot read index header: %w", err)
+	}
+	index.flushLock.Lock()
+	lastFileNum := index.fileNum
+	index.flushLock.Unlock()
 
-	var i int
+	busySet := make(map[uint32]struct{}, lastFileNum-header.FirstFile)
+	maxFileSize := index.maxFileSize
 	end := 1 << index.sizeBits
 	tmpBuckets := make([]types.Position, 4096)
-	for i < end {
+	for i := 0; i < end; {
 		index.bucketLk.RLock()
 		i += copy(tmpBuckets, index.buckets[i:])
 		index.bucketLk.RUnlock()
@@ -154,45 +160,20 @@ func (index *Index) freeFreeFiles(ctx context.Context) (int, error) {
 		}
 	}
 
-	if ctx.Err() != nil {
-		return 0, ctx.Err()
-	}
-
-	header, err := readHeader(index.headerPath)
-	if err != nil {
-		return 0, fmt.Errorf("cannot read index header: %w", err)
-	}
-
 	var rmCount int
 	basePath := index.basePath
 
-	index.flushLock.Lock()
-	lastFileNum := index.fileNum
-	index.flushLock.Unlock()
-
 	for fileNum := header.FirstFile; fileNum < lastFileNum; fileNum++ {
 		if _, busy := busySet[fileNum]; busy {
 			continue
 		}
-		indexPath := indexFileName(basePath, fileNum)
-
-		if fileNum == header.FirstFile {
-			header.FirstFile++
-			err = writeHeader(index.headerPath, header)
-			if err != nil {
-				return 0, fmt.Errorf("cannot write index header: %w", err)
-			}
-
-			err = os.Remove(indexPath)
-			if err != nil {
-				log.Errorw("Error removing index file", "err", err, "file", indexPath)
-				continue
-			}
-			log.Infow("Removed unused index file", "file", indexPath)
-			rmCount++
-			continue
-		}
+		if ctx.Err() != nil {
+			return 0, ctx.Err()
+		}
+
+		indexPath := indexFileName(basePath, fileNum)
 
 		fi, err := os.Stat(indexPath)
 		if err != nil {
 			log.Errorw("Cannot stat index file", "err", err, "file", indexPath)
@@ -208,13 +189,9 @@ func (index *Index) freeFreeFiles(ctx context.Context) (int, error) {
 			continue
 		}
 		log.Infow("Emptied unused index file", "file", indexPath)
-
-		if ctx.Err() != nil {
-			break
-		}
 	}
 
-	return rmCount, ctx.Err()
+	return rmCount, nil
 }
 
 // gcIndexFile scans a single index file, checking if any of the entries are in
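The collapsed loop body above records in busySet the file number of every index file that some bucket position still points into. A rough sketch of that mapping, assuming a position's file number is its absolute offset divided by maxFileSize; fileNumForOffset is a hypothetical helper, not the repository's actual function.

// fileNumForOffset maps an absolute index offset to the number of the
// file that holds it, assuming index files are logically concatenated
// and each spans maxFileSize bytes. Files whose numbers never appear in
// busySet are unused, so the fast scan can truncate or remove them.
func fileNumForOffset(offset uint64, maxFileSize uint32) uint32 {
	return uint32(offset / uint64(maxFileSize))
}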
56 changes: 29 additions & 27 deletions store/index/index.go
@@ -151,7 +151,7 @@ type bucketPool map[BucketIndex][]byte
 //
 // Specifying 0 for indexSizeBits and maxFileSize results in using their
 // default values. A gcInterval of 0 disables garbage collection.
-func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcScanUnused bool) (*Index, error) {
+func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcFastScan bool) (*Index, error) {
 	var file *os.File
 	headerPath := filepath.Clean(path) + ".info"
 
@@ -243,7 +243,7 @@ func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcScanUnused bool) (*Index, error) {
 	} else {
 		idx.updateSig = make(chan struct{}, 1)
 		idx.gcDone = make(chan struct{})
-		go idx.garbageCollector(gcInterval, gcTimeLimit, gcScanUnused)
+		go idx.garbageCollector(gcInterval, gcTimeLimit, gcFastScan)
 	}
 
 	return idx, nil
@@ -401,17 +401,19 @@ func (idx *Index) Put(key []byte, location types.Block) error {
 	if err != nil {
 		return err
 	}
-	// The key does not need the prefix that was used to find the right
-	// bucket. For simplicity only full bytes are trimmed off.
-	indexKey := stripBucketPrefix(key, idx.sizeBits)
 
 	idx.bucketLk.Lock()
 	defer idx.bucketLk.Unlock()
+
 	records, err := idx.getRecordsFromBucket(bucket)
 	if err != nil {
 		return err
 	}
 
+	// The key does not need the prefix that was used to find the right
+	// bucket. For simplicity only full bytes are trimmed off.
+	indexKey := stripBucketPrefix(key, idx.sizeBits)
+
 	// No records stored in that bucket yet
 	var newData []byte
 	if records == nil {
@@ -554,33 +556,33 @@ func (idx *Index) Update(key []byte, location types.Block) error {
 	if err != nil {
 		return err
 	}
-	// The key does not need the prefix that was used to find its bucket. For
-	// simplicity only full bytes are trimmed off.
-	indexKey := stripBucketPrefix(key, idx.sizeBits)
 
 	idx.bucketLk.Lock()
 	defer idx.bucketLk.Unlock()
 	records, err := idx.getRecordsFromBucket(bucket)
 	if err != nil {
 		return err
 	}
 
+	// The key does not need the prefix that was used to find its bucket. For
+	// simplicity only full bytes are trimmed off.
+	indexKey := stripBucketPrefix(key, idx.sizeBits)
+
 	var newData []byte
 	// If no records are stored in that bucket yet, it means there is no key to
 	// be updated.
 	if records == nil {
 		return fmt.Errorf("no records found in index, unable to update key")
-	} else {
-		// Read the record list to find the key and position.
-		r := records.GetRecord(indexKey)
-		if r == nil {
-			return fmt.Errorf("key to update not found in index")
-		}
-		// We want to overwrite the key so no need to do anything else.
-		// Update key in position.
-		newData = records.PutKeys([]KeyPositionPair{{r.Key, location}}, r.Pos, r.NextPos())
 	}
 
+	// Read the record list to find the key and position.
+	r := records.GetRecord(indexKey)
+	if r == nil {
+		return fmt.Errorf("key to update not found in index")
+	}
+	// Update key in position.
+	newData = records.PutKeys([]KeyPositionPair{{r.Key, location}}, r.Pos, r.NextPos())
+
 	idx.outstandingWork += types.Work(len(newData) + BucketPrefixSize + sizePrefixSize)
 	idx.nextPool[bucket] = newData
 	return nil
@@ -593,18 +595,19 @@ func (idx *Index) Remove(key []byte) (bool, error) {
 	if err != nil {
 		return false, err
 	}
-	// The key does not need the prefix that was used to find its bucket. For
-	// simplicity only full bytes are trimmed off.
-	indexKey := stripBucketPrefix(key, idx.sizeBits)
 
 	idx.bucketLk.Lock()
 	defer idx.bucketLk.Unlock()
+
 	records, err := idx.getRecordsFromBucket(bucket)
 	if err != nil {
 		return false, err
 	}
 
-	var newData []byte
+	// The key does not need the prefix that was used to find its bucket. For
+	// simplicity only full bytes are trimmed off.
+	indexKey := stripBucketPrefix(key, idx.sizeBits)
+
 	// If no records are stored in that bucket yet, it means there is no key to
 	// be removed.
 	if records == nil {
@@ -620,7 +623,7 @@ func (idx *Index) Remove(key []byte) (bool, error) {
 	}
 
 	// Remove key from record.
-	newData = records.PutKeys([]KeyPositionPair{}, r.Pos, r.NextPos())
+	newData := records.PutKeys([]KeyPositionPair{}, r.Pos, r.NextPos())
 	// NOTE: We are removing the key without changing any keys. If we want
 	// to optimize for storage we need to check the keys with the same prefix
 	// and see if any of them can be shortened. This process will be similar
@@ -842,8 +845,7 @@ func (idx *Index) Flush() (types.Work, error) {
 	idx.bucketLk.Lock()
 	defer idx.bucketLk.Unlock()
 	for _, blk := range blks {
-		bucket := blk.bucket
-		if err = idx.buckets.Put(bucket, blk.blk.Offset); err != nil {
+		if err = idx.buckets.Put(blk.bucket, blk.blk.Offset); err != nil {
 			return 0, fmt.Errorf("error commiting bucket: %w", err)
 		}
 	}
10 changes: 5 additions & 5 deletions store/option.go
@@ -13,7 +13,7 @@ const (
 	defaultSyncInterval = time.Second
 	defaultGCInterval = 30 * time.Minute
 	defaultGCTimeLimit = 5 * time.Minute
-	defaultGCScanFree = true
+	defaultGCFastScan = true
 )
 
 type config struct {
@@ -23,7 +23,7 @@ type config struct {
 	burstRate types.Work
 	gcInterval time.Duration
 	gcTimeLimit time.Duration
-	gcScanFree bool
+	gcFastScan bool
 }
 
 type Option func(*config)
@@ -79,10 +79,10 @@ func GCTimeLimit(gcTimeLimit time.Duration) Option {
 	}
 }
 
-// GCFreeScan enables a fast scan of files to find any that are not reverenced
+// GCFastScan enables a fast scan of files to find any that are not referenced
 // by any index buckets.
-func GCScanFree(scanFree bool) Option {
+func GCFastScan(fastScan bool) Option {
 	return func(c *config) {
-		c.gcScanFree = scanFree
+		c.gcFastScan = fastScan
 	}
 }
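
For reference, a sketch of the renamed option in use, assuming OpenStore accepts variadic Options (as suggested by c.apply(options) in store/store.go); indexPath and primary are placeholders.

// openWithFastScan opens a store with the fast GC scan enabled. Passing
// GCFastScan(true) is only for clarity, since defaultGCFastScan is
// already true; GCFastScan(false) falls back to the full GC scan.
func openWithFastScan(ctx context.Context, indexPath string, primary primary.PrimaryStorage) (*store.Store, error) {
	return store.OpenStore(ctx, indexPath, primary,
		store.GCFastScan(true),
		store.GCTimeLimit(5*time.Minute),
	)
}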
13 changes: 5 additions & 8 deletions store/store.go
@@ -51,11 +51,11 @@ func OpenStore(ctx context.Context, path string, primary primary.PrimaryStorage,
 		burstRate: defaultBurstRate,
 		gcInterval: defaultGCInterval,
 		gcTimeLimit: defaultGCTimeLimit,
-		gcScanFree: defaultGCScanFree,
+		gcFastScan: defaultGCFastScan,
 	}
 	c.apply(options)
 
-	index, err := index.Open(ctx, path, primary, c.indexSizeBits, c.indexFileSize, c.gcInterval, c.gcTimeLimit, c.gcScanFree)
+	index, err := index.Open(ctx, path, primary, c.indexSizeBits, c.indexFileSize, c.gcInterval, c.gcTimeLimit, c.gcFastScan)
 	if err != nil {
 		return nil, err
 	}
@@ -232,12 +232,9 @@ func (s *Store) Put(key []byte, value []byte) error {
 		}
 		if bytes.Equal(value, storedVal) {
 			// Trying to put the same value in an existing key, so ok to
-			// directly return.
-			//
-			// NOTE: How many times is this going to happen. Can this step be
-			// removed? This is still needed for the blockstore and that is
-			// ErrKeyExists is returned..
-			return types.ErrKeyExists
+			// directly return. This is not needed for the blockstore, since it
+			// sets s.immutable = true.
+			return nil
 		}
 	}
 
2 changes: 1 addition & 1 deletion store/store_test.go
@@ -59,7 +59,7 @@ func TestUpdate(t *testing.T) {
 
 	t.Logf("Overwrite same key with same value")
 	err = s.Put(blks[0].Cid().Bytes(), blks[1].RawData())
-	require.Error(t, err, types.ErrKeyExists.Error())
+	require.NoError(t, err) // immutable would return error
 	value, found, err = s.Get(blks[0].Cid().Bytes())
 	require.NoError(t, err)
 	require.True(t, found)
2 changes: 1 addition & 1 deletion version.json
@@ -1,3 +1,3 @@
 {
-  "version": "v0.2.4"
+  "version": "v0.2.5"
 }
