diff --git a/store/index/gc.go b/store/index/gc.go index c1b35f4..8a09591 100644 --- a/store/index/gc.go +++ b/store/index/gc.go @@ -18,7 +18,7 @@ var log = logging.Logger("storethehash/index") // garbageCollector is a goroutine that runs periodically to search for and // remove stale index files. It runs every gcInterval, if there have been any // index updates. -func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnused bool) { +func (index *Index) garbageCollector(interval, timeLimit time.Duration, fastScan bool) { defer close(index.gcDone) var gcDone chan struct{} @@ -60,7 +60,7 @@ func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnus } log.Infow("GC started") - fileCount, err := index.gc(gcCtx, scanUnused) + fileCount, err := index.gc(gcCtx, fastScan) if err != nil { switch err { case context.DeadlineExceeded: @@ -88,12 +88,12 @@ func (index *Index) garbageCollector(interval, timeLimit time.Duration, scanUnus // gc searches for and removes stale index files. Returns the number of unused // index files that were removed. -func (index *Index) gc(ctx context.Context, scanFree bool) (int, error) { +func (index *Index) gc(ctx context.Context, fastScan bool) (int, error) { var count int var err error - if scanFree { - count, err = index.freeFreeFiles(ctx) + if fastScan { + count, err = index.truncateUnusedFiles(ctx) if err != nil { return 0, err } @@ -135,14 +135,20 @@ func (index *Index) gc(ctx context.Context, scanFree bool) (int, error) { return count, nil } -func (index *Index) freeFreeFiles(ctx context.Context) (int, error) { - busySet := make(map[uint32]struct{}) - maxFileSize := index.maxFileSize +func (index *Index) truncateUnusedFiles(ctx context.Context) (int, error) { + header, err := readHeader(index.headerPath) + if err != nil { + return 0, fmt.Errorf("cannot read index header: %w", err) + } + index.flushLock.Lock() + lastFileNum := index.fileNum + index.flushLock.Unlock() - var i int + busySet := make(map[uint32]struct{}, lastFileNum-header.FirstFile) + maxFileSize := index.maxFileSize end := 1 << index.sizeBits tmpBuckets := make([]types.Position, 4096) - for i < end { + for i := 0; i < end; { index.bucketLk.RLock() i += copy(tmpBuckets, index.buckets[i:]) index.bucketLk.RUnlock() @@ -154,45 +160,20 @@ func (index *Index) freeFreeFiles(ctx context.Context) (int, error) { } } - if ctx.Err() != nil { - return 0, ctx.Err() - } - - header, err := readHeader(index.headerPath) - if err != nil { - return 0, fmt.Errorf("cannot read index header: %w", err) - } - var rmCount int basePath := index.basePath - index.flushLock.Lock() - lastFileNum := index.fileNum - index.flushLock.Unlock() - for fileNum := header.FirstFile; fileNum < lastFileNum; fileNum++ { if _, busy := busySet[fileNum]; busy { continue } - indexPath := indexFileName(basePath, fileNum) - if fileNum == header.FirstFile { - header.FirstFile++ - err = writeHeader(index.headerPath, header) - if err != nil { - return 0, fmt.Errorf("cannot write index header: %w", err) - } - - err = os.Remove(indexPath) - if err != nil { - log.Errorw("Error removing index file", "err", err, "file", indexPath) - continue - } - log.Infow("Removed unused index file", "file", indexPath) - rmCount++ - continue + if ctx.Err() != nil { + return 0, ctx.Err() } + indexPath := indexFileName(basePath, fileNum) + fi, err := os.Stat(indexPath) if err != nil { log.Errorw("Cannot stat index file", "err", err, "file", indexPath) @@ -208,13 +189,9 @@ func (index *Index) freeFreeFiles(ctx context.Context) (int, error) { continue } log.Infow("Emptied unused index file", "file", indexPath) - - if ctx.Err() != nil { - break - } } - return rmCount, ctx.Err() + return rmCount, nil } // gcIndexFile scans a single index file, checking if any of the entries are in diff --git a/store/index/index.go b/store/index/index.go index 465f916..fffd015 100644 --- a/store/index/index.go +++ b/store/index/index.go @@ -151,7 +151,7 @@ type bucketPool map[BucketIndex][]byte // // Specifying 0 for indexSizeBits and maxFileSize results in using their // default values. A gcInterval of 0 disables garbage collection. -func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcScanUnused bool) (*Index, error) { +func Open(ctx context.Context, path string, primary primary.PrimaryStorage, indexSizeBits uint8, maxFileSize uint32, gcInterval, gcTimeLimit time.Duration, gcFastScan bool) (*Index, error) { var file *os.File headerPath := filepath.Clean(path) + ".info" @@ -243,7 +243,7 @@ func Open(ctx context.Context, path string, primary primary.PrimaryStorage, inde } else { idx.updateSig = make(chan struct{}, 1) idx.gcDone = make(chan struct{}) - go idx.garbageCollector(gcInterval, gcTimeLimit, gcScanUnused) + go idx.garbageCollector(gcInterval, gcTimeLimit, gcFastScan) } return idx, nil @@ -401,17 +401,19 @@ func (idx *Index) Put(key []byte, location types.Block) error { if err != nil { return err } + + // The key does not need the prefix that was used to find the right + // bucket. For simplicity only full bytes are trimmed off. + indexKey := stripBucketPrefix(key, idx.sizeBits) + idx.bucketLk.Lock() defer idx.bucketLk.Unlock() + records, err := idx.getRecordsFromBucket(bucket) if err != nil { return err } - // The key does not need the prefix that was used to find the right - // bucket. For simplicity only full bytes are trimmed off. - indexKey := stripBucketPrefix(key, idx.sizeBits) - // No records stored in that bucket yet var newData []byte if records == nil { @@ -554,6 +556,11 @@ func (idx *Index) Update(key []byte, location types.Block) error { if err != nil { return err } + + // The key does not need the prefix that was used to find its bucket. For + // simplicity only full bytes are trimmed off. + indexKey := stripBucketPrefix(key, idx.sizeBits) + idx.bucketLk.Lock() defer idx.bucketLk.Unlock() records, err := idx.getRecordsFromBucket(bucket) @@ -561,26 +568,21 @@ func (idx *Index) Update(key []byte, location types.Block) error { return err } - // The key does not need the prefix that was used to find its bucket. For - // simplicity only full bytes are trimmed off. - indexKey := stripBucketPrefix(key, idx.sizeBits) - var newData []byte // If no records are stored in that bucket yet, it means there is no key to // be updated. if records == nil { return fmt.Errorf("no records found in index, unable to update key") - } else { - // Read the record list to find the key and position. - r := records.GetRecord(indexKey) - if r == nil { - return fmt.Errorf("key to update not found in index") - } - // We want to overwrite the key so no need to do anything else. - // Update key in position. - newData = records.PutKeys([]KeyPositionPair{{r.Key, location}}, r.Pos, r.NextPos()) } + // Read the record list to find the key and position. + r := records.GetRecord(indexKey) + if r == nil { + return fmt.Errorf("key to update not found in index") + } + // Update key in position. + newData = records.PutKeys([]KeyPositionPair{{r.Key, location}}, r.Pos, r.NextPos()) + idx.outstandingWork += types.Work(len(newData) + BucketPrefixSize + sizePrefixSize) idx.nextPool[bucket] = newData return nil @@ -593,18 +595,19 @@ func (idx *Index) Remove(key []byte) (bool, error) { if err != nil { return false, err } + + // The key does not need the prefix that was used to find its bucket. For + // simplicity only full bytes are trimmed off. + indexKey := stripBucketPrefix(key, idx.sizeBits) + idx.bucketLk.Lock() defer idx.bucketLk.Unlock() + records, err := idx.getRecordsFromBucket(bucket) if err != nil { return false, err } - // The key does not need the prefix that was used to find its bucket. For - // simplicity only full bytes are trimmed off. - indexKey := stripBucketPrefix(key, idx.sizeBits) - - var newData []byte // If no records are stored in that bucket yet, it means there is no key to // be removed. if records == nil { @@ -620,7 +623,7 @@ func (idx *Index) Remove(key []byte) (bool, error) { } // Remove key from record. - newData = records.PutKeys([]KeyPositionPair{}, r.Pos, r.NextPos()) + newData := records.PutKeys([]KeyPositionPair{}, r.Pos, r.NextPos()) // NOTE: We are removing the key without changing any keys. If we want // to optimize for storage we need to check the keys with the same prefix // and see if any of them can be shortened. This process will be similar @@ -842,8 +845,7 @@ func (idx *Index) Flush() (types.Work, error) { idx.bucketLk.Lock() defer idx.bucketLk.Unlock() for _, blk := range blks { - bucket := blk.bucket - if err = idx.buckets.Put(bucket, blk.blk.Offset); err != nil { + if err = idx.buckets.Put(blk.bucket, blk.blk.Offset); err != nil { return 0, fmt.Errorf("error commiting bucket: %w", err) } } diff --git a/store/option.go b/store/option.go index 913bd84..e7b46a0 100644 --- a/store/option.go +++ b/store/option.go @@ -13,7 +13,7 @@ const ( defaultSyncInterval = time.Second defaultGCInterval = 30 * time.Minute defaultGCTimeLimit = 5 * time.Minute - defaultGCScanFree = true + defaultGCFastScan = true ) type config struct { @@ -23,7 +23,7 @@ type config struct { burstRate types.Work gcInterval time.Duration gcTimeLimit time.Duration - gcScanFree bool + gcFastScan bool } type Option func(*config) @@ -79,10 +79,10 @@ func GCTimeLimit(gcTimeLimit time.Duration) Option { } } -// GCFreeScan enables a fast scan of files to find any that are not reverenced +// GCFastScan enables a fast scan of files to find any that are not referenced // by any index buckets. -func GCScanFree(scanFree bool) Option { +func GCFastScan(fastScan bool) Option { return func(c *config) { - c.gcScanFree = scanFree + c.gcFastScan = fastScan } } diff --git a/store/store.go b/store/store.go index e210b05..d93d557 100644 --- a/store/store.go +++ b/store/store.go @@ -51,11 +51,11 @@ func OpenStore(ctx context.Context, path string, primary primary.PrimaryStorage, burstRate: defaultBurstRate, gcInterval: defaultGCInterval, gcTimeLimit: defaultGCTimeLimit, - gcScanFree: defaultGCScanFree, + gcFastScan: defaultGCFastScan, } c.apply(options) - index, err := index.Open(ctx, path, primary, c.indexSizeBits, c.indexFileSize, c.gcInterval, c.gcTimeLimit, c.gcScanFree) + index, err := index.Open(ctx, path, primary, c.indexSizeBits, c.indexFileSize, c.gcInterval, c.gcTimeLimit, c.gcFastScan) if err != nil { return nil, err } @@ -232,12 +232,9 @@ func (s *Store) Put(key []byte, value []byte) error { } if bytes.Equal(value, storedVal) { // Trying to put the same value in an existing key, so ok to - // directly return. - // - // NOTE: How many times is this going to happen. Can this step be - // removed? This is still needed for the blockstore and that is - // ErrKeyExists is returned.. - return types.ErrKeyExists + // directly return. This is not needed for the blockstore, since it + // sets s.immutable = true. + return nil } } diff --git a/store/store_test.go b/store/store_test.go index 3e4107d..d9ae249 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -59,7 +59,7 @@ func TestUpdate(t *testing.T) { t.Logf("Overwrite same key with same value") err = s.Put(blks[0].Cid().Bytes(), blks[1].RawData()) - require.Error(t, err, types.ErrKeyExists.Error()) + require.NoError(t, err) // immutable would return error value, found, err = s.Get(blks[0].Cid().Bytes()) require.NoError(t, err) require.True(t, found) diff --git a/version.json b/version.json index 871f7c0..7af5dc4 100644 --- a/version.json +++ b/version.json @@ -1,3 +1,3 @@ { - "version": "v0.2.4" + "version": "v0.2.5" }