Skip to content

Commit

Permalink
refactor(store/v2): handle the error returned by batch.Close
Browse files Browse the repository at this point in the history
  • Loading branch information
lfz941 committed Sep 18, 2024
1 parent f1dd03f commit dfa36b9
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions store/v2/storage/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,20 @@ func (db *Database) Get(storeKey []byte, targetVersion uint64, key []byte) ([]by
// database in order to delete them.
//
// See: https://github.com/cockroachdb/cockroach/blob/33623e3ee420174a4fd3226d1284b03f0e3caaac/pkg/storage/mvcc.go#L3182
func (db *Database) Prune(version uint64) error {
func (db *Database) Prune(version uint64) (err error) {
itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: []byte("s/k:")})
if err != nil {
return err
}
defer itr.Close()

batch := db.storage.NewBatch()
defer batch.Close()
defer func() {
cErr := batch.Close()
if err == nil {
err = cErr
}
}()

var (
batchCounter int
Expand Down Expand Up @@ -331,9 +336,14 @@ func (db *Database) ReverseIterator(storeKey []byte, version uint64, start, end
return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, db.earliestVersion, true), nil
}

func (db *Database) PruneStoreKeys(storeKeys []string, version uint64) error {
func (db *Database) PruneStoreKeys(storeKeys []string, version uint64) (err error) {
batch := db.storage.NewBatch()
defer batch.Close()
defer func() {
cErr := batch.Close()
if err == nil {
err = cErr
}
}()

for _, storeKey := range storeKeys {
if err := batch.Set([]byte(fmt.Sprintf("%s%s", encoding.BuildPrefixWithVersion(removedStoreKeyPrefix, version), storeKey)), []byte{}, nil); err != nil {
Expand Down Expand Up @@ -428,9 +438,14 @@ func getMVCCSlice(db *pebble.DB, storeKey, key []byte, version uint64) ([]byte,
return slices.Clone(value), err
}

func (db *Database) deleteRemovedStoreKeys(version uint64) error {
func (db *Database) deleteRemovedStoreKeys(version uint64) (err error) {
batch := db.storage.NewBatch()
defer batch.Close()
defer func() {
cErr := batch.Close()
if err == nil {
err = cErr
}
}()

end := encoding.BuildPrefixWithVersion(removedStoreKeyPrefix, version+1)
storeKeyIter, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: []byte(removedStoreKeyPrefix), UpperBound: end})
Expand Down

0 comments on commit dfa36b9

Please sign in to comment.