Do not return error when storing existing value. #68

Merged · 1 commit · Aug 29, 2022
65 changes: 21 additions & 44 deletions store/index/gc.go
@@ -18,7 +18,7 @@ var log = logging.Logger("storethehash/index")
 // garbageCollector is a goroutine that runs periodically to search for and
 // remove stale index files. It runs every gcInterval, if there have been any
 // index updates.
-func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnused bool) {
+func (index *Index) garbageCollector(interval, timeLimit time.Duration, fastScan bool) {
     defer close(index.gcDone)
 
     var gcDone chan struct{}
@@ -60,7 +60,7 @@ func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnused bool) {
         }
 
         log.Infow("GC started")
-        fileCount, err := index.gc(gcCtx, scanUnused)
+        fileCount, err := index.gc(gcCtx, fastScan)
         if err != nil {
             switch err {
             case context.DeadlineExceeded:
@@ -88,12 +88,12 @@ func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnused bool) {
 
 // gc searches for and removes stale index files. Returns the number of unused
 // index files that were removed.
-func (index *Index) gc(ctx context.Context, scanFree bool) (int, error) {
+func (index *Index) gc(ctx context.Context, fastScan bool) (int, error) {
     var count int
     var err error
 
-    if scanFree {
-        count, err = index.freeFreeFiles(ctx)
+    if fastScan {
+        count, err = index.truncateUnusedFiles(ctx)
         if err != nil {
             return 0, err
         }
@@ -135,14 +135,20 @@ func (index *Index) gc(ctx context.Context, scanFree bool) (int, error) {
     return count, nil
 }
 
-func (index *Index) freeFreeFiles(ctx context.Context) (int, error) {
-    busySet := make(map[uint32]struct{})
-    maxFileSize := index.maxFileSize
+func (index *Index) truncateUnusedFiles(ctx context.Context) (int, error) {
+    header, err := readHeader(index.headerPath)
+    if err != nil {
+        return 0, fmt.Errorf("cannot read index header: %w", err)
+    }
+    index.flushLock.Lock()
+    lastFileNum := index.fileNum
+    index.flushLock.Unlock()
 
-    var i int
+    busySet := make(map[uint32]struct{}, lastFileNum-header.FirstFile)
+    maxFileSize := index.maxFileSize
     end := 1 << index.sizeBits
     tmpBuckets := make([]types.Position, 4096)
-    for i < end {
+    for i := 0; i < end; {
         index.bucketLk.RLock()
         i += copy(tmpBuckets, index.buckets[i:])
         index.bucketLk.RUnlock()
@@ -154,45 +160,20 @@ func (index *Index) freeFreeFiles(ctx context.Context) (int, error) {
         }
     }
 
-    if ctx.Err() != nil {
-        return 0, ctx.Err()
-    }
-
-    header, err := readHeader(index.headerPath)
-    if err != nil {
-        return 0, fmt.Errorf("cannot read index header: %w", err)
-    }
-
     var rmCount int
     basePath := index.basePath
 
-    index.flushLock.Lock()
-    lastFileNum := index.fileNum
-    index.flushLock.Unlock()
-
     for fileNum := header.FirstFile; fileNum < lastFileNum; fileNum++ {
         if _, busy := busySet[fileNum]; busy {
             continue
         }
-        indexPath := indexFileName(basePath, fileNum)
-
-        if fileNum == header.FirstFile {
-            header.FirstFile++
-            err = writeHeader(index.headerPath, header)
-            if err != nil {
-                return 0, fmt.Errorf("cannot write index header: %w", err)
-            }
-
-            err = os.Remove(indexPath)
-            if err != nil {
-                log.Errorw("Error removing index file", "err", err, "file", indexPath)
-                continue
-            }
-            log.Infow("Removed unused index file", "file", indexPath)
-            rmCount++
-            continue
-        }
+
+        if ctx.Err() != nil {
+            return 0, ctx.Err()
+        }
+
+        indexPath := indexFileName(basePath, fileNum)
 
         fi, err := os.Stat(indexPath)
         if err != nil {
             log.Errorw("Cannot stat index file", "err", err, "file", indexPath)
@@ -208,13 +189,9 @@ func (index *Index) freeFreeFiles(ctx context.Context) (int, error) {
             continue
         }
         log.Infow("Emptied unused index file", "file", indexPath)
-
-        if ctx.Err() != nil {
-            break
-        }
     }
 
-    return rmCount, ctx.Err()
+    return rmCount, nil
 }
 
 // gcIndexFile scans a single index file, checking if any of the entries are in
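
A note on the gc.go hunks above: `truncateUnusedFiles` no longer removes leading unused files or advances `header.FirstFile`; it only empties files that no bucket references, and it checks for cancellation between files instead of once before the loop. The truncation call itself is elided by the diff, so the helper below is a hypothetical sketch of that step, not code from the PR:

```go
package main

import "os"

// emptyIndexFile is a hypothetical sketch of the "empty rather than remove"
// step implied by the "Emptied unused index file" log line above.
func emptyIndexFile(path string) error {
	fi, err := os.Stat(path)
	if err != nil {
		return err
	}
	if fi.Size() == 0 {
		return nil // already empty, nothing to do
	}
	return os.Truncate(path, 0)
}
```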
56 changes: 29 additions & 27 deletions store/index/index.go
@@ -151,7 +151,7 @@ type bucketPool map[BucketIndex][]byte
 //
 // Specifying 0 for indexSizeBits and maxFileSize results in using their
 // default values. A gcInterval of 0 disables garbage collection.
-func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcScanUnused bool) (*Index, error) {
+func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcFastScan bool) (*Index, error) {
     var file *os.File
     headerPath := filepath.Clean(path) + ".info"
 
@@ -243,7 +243,7 @@ func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcScanUnused bool) (*Index, error) {
     } else {
         idx.updateSig = make(chan struct{}, 1)
         idx.gcDone = make(chan struct{})
-        go idx.garbageCollector(gcInterval, gcTimeLimit, gcScanUnused)
+        go idx.garbageCollector(gcInterval, gcTimeLimit, gcFastScan)
     }
 
     return idx, nil
@@ -401,17 +401,19 @@ func (idx *Index) Put(key []byte, location types.Block) error {
     if err != nil {
         return err
     }
 
-    // The key does not need the prefix that was used to find the right
-    // bucket. For simplicity only full bytes are trimmed off.
-    indexKey := stripBucketPrefix(key, idx.sizeBits)
-
     idx.bucketLk.Lock()
     defer idx.bucketLk.Unlock()
+
     records, err := idx.getRecordsFromBucket(bucket)
     if err != nil {
         return err
     }
+
+    // The key does not need the prefix that was used to find the right
+    // bucket. For simplicity only full bytes are trimmed off.
+    indexKey := stripBucketPrefix(key, idx.sizeBits)
+
     // No records stored in that bucket yet
     var newData []byte
     if records == nil {
@@ -554,33 +556,33 @@ func (idx *Index) Update(key []byte, location types.Block) error {
     if err != nil {
         return err
     }
 
-    // The key does not need the prefix that was used to find its bucket. For
-    // simplicity only full bytes are trimmed off.
-    indexKey := stripBucketPrefix(key, idx.sizeBits)
-
     idx.bucketLk.Lock()
     defer idx.bucketLk.Unlock()
+
     records, err := idx.getRecordsFromBucket(bucket)
     if err != nil {
         return err
     }
 
+    // The key does not need the prefix that was used to find its bucket. For
+    // simplicity only full bytes are trimmed off.
+    indexKey := stripBucketPrefix(key, idx.sizeBits)
+
     var newData []byte
     // If no records are stored in that bucket yet, it means there is no key to
     // be updated.
     if records == nil {
         return fmt.Errorf("no records found in index, unable to update key")
-    } else {
-        // Read the record list to find the key and position.
-        r := records.GetRecord(indexKey)
-        if r == nil {
-            return fmt.Errorf("key to update not found in index")
-        }
-        // We want to overwrite the key so no need to do anything else.
-        // Update key in position.
-        newData = records.PutKeys([]KeyPositionPair{{r.Key, location}}, r.Pos, r.NextPos())
     }
 
+    // Read the record list to find the key and position.
+    r := records.GetRecord(indexKey)
+    if r == nil {
+        return fmt.Errorf("key to update not found in index")
+    }
+    // Update key in position.
+    newData = records.PutKeys([]KeyPositionPair{{r.Key, location}}, r.Pos, r.NextPos())
+
     idx.outstandingWork += types.Work(len(newData) + BucketPrefixSize + sizePrefixSize)
     idx.nextPool[bucket] = newData
     return nil
@@ -593,18 +595,19 @@ func (idx *Index) Remove(key []byte) (bool, error) {
     if err != nil {
         return false, err
     }
 
-    // The key does not need the prefix that was used to find its bucket. For
-    // simplicity only full bytes are trimmed off.
-    indexKey := stripBucketPrefix(key, idx.sizeBits)
-
     idx.bucketLk.Lock()
     defer idx.bucketLk.Unlock()
+
     records, err := idx.getRecordsFromBucket(bucket)
     if err != nil {
         return false, err
     }
 
-    var newData []byte
+    // The key does not need the prefix that was used to find its bucket. For
+    // simplicity only full bytes are trimmed off.
+    indexKey := stripBucketPrefix(key, idx.sizeBits)
+
     // If no records are stored in that bucket yet, it means there is no key to
     // be removed.
     if records == nil {
@@ -620,7 +623,7 @@ func (idx *Index) Remove(key []byte) (bool, error) {
     }
 
     // Remove key from record.
-    newData = records.PutKeys([]KeyPositionPair{}, r.Pos, r.NextPos())
+    newData := records.PutKeys([]KeyPositionPair{}, r.Pos, r.NextPos())
     // NOTE: We are removing the key without changing any keys. If we want
     // to optimize for storage we need to check the keys with the same prefix
     // and see if any of them can be shortened. This process will be similar
@@ -842,8 +845,7 @@ func (idx *Index) Flush() (types.Work, error) {
     idx.bucketLk.Lock()
     defer idx.bucketLk.Unlock()
     for _, blk := range blks {
-        bucket := blk.bucket
-        if err = idx.buckets.Put(bucket, blk.blk.Offset); err != nil {
+        if err = idx.buckets.Put(blk.bucket, blk.blk.Offset); err != nil {
             return 0, fmt.Errorf("error commiting bucket: %w", err)
         }
     }
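
The index.go hunks are mostly a mechanical reorder: `stripBucketPrefix` now runs after the bucket lock is taken and `getRecordsFromBucket` has succeeded, so the prefix work is skipped on error paths, and `Update`/`Remove` trade an `else` branch for early returns. The comment's "only full bytes are trimmed off" idea amounts to dropping the fully-consumed leading bytes of the key; a sketch of that idea (illustrative, not the package's actual implementation):

```go
package main

// stripFullPrefixBytes illustrates the idea behind stripBucketPrefix: the
// first sizeBits bits of a key select its bucket, and for simplicity only
// the fully-consumed leading bytes are dropped from the stored key.
func stripFullPrefixBytes(key []byte, sizeBits uint8) []byte {
	return key[sizeBits/8:]
}
```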
10 changes: 5 additions & 5 deletions store/option.go
@@ -13,7 +13,7 @@ const (
     defaultSyncInterval = time.Second
     defaultGCInterval   = 30 * time.Minute
     defaultGCTimeLimit  = 5 * time.Minute
-    defaultGCScanFree   = true
+    defaultGCFastScan   = true
 )
 
 type config struct {
@@ -23,7 +23,7 @@ type config struct {
     burstRate   types.Work
     gcInterval  time.Duration
     gcTimeLimit time.Duration
-    gcScanFree  bool
+    gcFastScan  bool
 }
 
 type Option func(*config)
@@ -79,10 +79,10 @@ func GCTimeLimit(gcTimeLimit time.Duration) Option {
     }
 }
 
-// GCFreeScan enables a fast scan of files to find any that are not reverenced
+// GCFastScan enables a fast scan of files to find any that are not referenced
 // by any index buckets.
-func GCScanFree(scanFree bool) Option {
+func GCFastScan(fastScan bool) Option {
     return func(c *config) {
-        c.gcScanFree = scanFree
+        c.gcFastScan = fastScan
     }
 }
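
For callers, the option rename is the only API change here: `GCScanFree(...)` becomes `GCFastScan(...)`. A usage sketch, assuming the module path `github.com/ipld/go-storethehash/store` (both options shown appear in this file):

```go
package main

import (
	"time"

	"github.com/ipld/go-storethehash/store" // module path assumed
)

func main() {
	// Options would be passed to store.OpenStore's variadic options parameter.
	opts := []store.Option{
		store.GCFastScan(true), // was store.GCScanFree(true)
		store.GCTimeLimit(5 * time.Minute),
	}
	_ = opts
}
```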
13 changes: 5 additions & 8 deletions store/store.go
@@ -51,11 +51,11 @@ func OpenStore(ctx context.Context, path string, primary primary.PrimaryStorage,
         burstRate:   defaultBurstRate,
         gcInterval:  defaultGCInterval,
         gcTimeLimit: defaultGCTimeLimit,
-        gcScanFree:  defaultGCScanFree,
+        gcFastScan:  defaultGCFastScan,
     }
     c.apply(options)
 
-    index, err := index.Open(ctx, path, primary, c.indexSizeBits, c.indexFileSize, c.gcInterval, c.gcTimeLimit, c.gcScanFree)
+    index, err := index.Open(ctx, path, primary, c.indexSizeBits, c.indexFileSize, c.gcInterval, c.gcTimeLimit, c.gcFastScan)
     if err != nil {
         return nil, err
     }
@@ -232,12 +232,9 @@ func (s *Store) Put(key []byte, value []byte) error {
         }
         if bytes.Equal(value, storedVal) {
             // Trying to put the same value in an existing key, so ok to
-            // directly return.
-            //
-            // NOTE: How many times is this going to happen. Can this step be
-            // removed? This is still needed for the blockstore and that is
-            // ErrKeyExists is returned..
-            return types.ErrKeyExists
+            // directly return. This is not needed for the blockstore, since it
+            // sets s.immutable = true.
+            return nil
         }
     }
 
2 changes: 1 addition & 1 deletion store/store_test.go
@@ -59,7 +59,7 @@ func TestUpdate(t *testing.T) {
 
     t.Logf("Overwrite same key with same value")
     err = s.Put(blks[0].Cid().Bytes(), blks[1].RawData())
-    require.Error(t, err, types.ErrKeyExists.Error())
+    require.NoError(t, err) // immutable would return error
     value, found, err = s.Get(blks[0].Cid().Bytes())
     require.NoError(t, err)
     require.True(t, found)
2 changes: 1 addition & 1 deletion version.json
@@ -1,3 +1,3 @@
 {
-  "version": "v0.2.4"
+  "version": "v0.2.5"
 }