Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/v2): Add Pruning Tests & Fix SQLite & PebbleDB Pruning #18459

Merged
merged 27 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ff31990
updates
alexanderbez Nov 13, 2023
b93cbcf
updates
alexanderbez Nov 14, 2023
20a687d
updates
alexanderbez Nov 14, 2023
a4e2e6e
updates
alexanderbez Nov 14, 2023
18983d5
updates
alexanderbez Nov 14, 2023
1120a92
updates
alexanderbez Nov 14, 2023
18c5e20
Merge branch 'main' into bez/fix-sqlite-ss-prune
alexanderbez Nov 14, 2023
1836016
updates
alexanderbez Nov 14, 2023
e8296c0
Merge branch 'bez/fix-sqlite-ss-prune' of github.com:cosmos/cosmos-sd…
alexanderbez Nov 14, 2023
168d0c4
Merge branch 'main' into bez/fix-sqlite-ss-prune
alexanderbez Nov 15, 2023
7d7ac92
updates
alexanderbez Nov 15, 2023
224998f
Merge branch 'bez/fix-sqlite-ss-prune' of github.com:cosmos/cosmos-sd…
alexanderbez Nov 15, 2023
cd14557
updates
alexanderbez Nov 15, 2023
b85d911
updates
alexanderbez Nov 16, 2023
8fc7e93
updates
alexanderbez Nov 16, 2023
49c6c6c
updates
alexanderbez Nov 16, 2023
3a11bb9
updates
alexanderbez Nov 16, 2023
08113b4
updates
alexanderbez Nov 16, 2023
035fafd
updates
alexanderbez Nov 16, 2023
ba3cab5
Merge branch 'main' into bez/fix-sqlite-ss-prune
alexanderbez Nov 16, 2023
930dbd4
Merge branch 'main' into bez/fix-sqlite-ss-prune
alexanderbez Nov 16, 2023
b65a4c1
updates
alexanderbez Nov 16, 2023
cb6aa04
Merge branch 'bez/fix-sqlite-ss-prune' of github.com:cosmos/cosmos-sd…
alexanderbez Nov 16, 2023
582b11a
Merge branch 'main' into bez/fix-sqlite-ss-prune
alexanderbez Nov 17, 2023
c44fd82
updates
alexanderbez Nov 17, 2023
94bc2bb
updates
alexanderbez Nov 17, 2023
d903a1e
updates
alexanderbez Nov 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions store/errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package store

import (
"fmt"

"cosmossdk.io/errors"
)

Expand Down Expand Up @@ -32,7 +34,16 @@ var (
ErrClosed = errors.Register(StoreCodespace, 8, "closed")
ErrRecordNotFound = errors.Register(StoreCodespace, 9, "record not found")
ErrUnknownStoreKey = errors.Register(StoreCodespace, 10, "unknown store key")
ErrInvalidVersion = errors.Register(StoreCodespace, 11, "invalid version")
ErrKeyEmpty = errors.Register(StoreCodespace, 12, "key empty")
ErrStartAfterEnd = errors.Register(StoreCodespace, 13, "start key after end key")
ErrKeyEmpty = errors.Register(StoreCodespace, 11, "key empty")
ErrStartAfterEnd = errors.Register(StoreCodespace, 12, "start key after end key")
)

// ErrVersionPruned defines an error returned when a version queried is pruned
// or does not exist.
type ErrVersionPruned struct {
EarliestVersion uint64
}

func (e ErrVersionPruned) Error() string {
return fmt.Sprintf("requested version is pruned; earliest available version is: %d", e.EarliestVersion)
}
175 changes: 160 additions & 15 deletions store/storage/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ import (

const (
VersionSize = 8
// PruneCommitBatchSize defines the size, in number of key/value pairs, to prune
// in a single batch.
PruneCommitBatchSize = 50

StorePrefixTpl = "s/k:%s/" // s/k:<storeKey>
latestVersionKey = "s/_latest" // NB: latestVersionKey key must be lexically smaller than StorePrefixTpl
StorePrefixTpl = "s/k:%s/" // s/k:<storeKey>
latestVersionKey = "s/_latest" // NB: latestVersionKey key must be lexically smaller than StorePrefixTpl
pruneHeightKey = "s/_prune_height" // NB: pruneHeightKey key must be lexically smaller than StorePrefixTpl
tombstoneVal = "TOMBSTONE"
)

Expand All @@ -26,6 +30,10 @@ var _ store.VersionedDatabase = (*Database)(nil)
type Database struct {
storage *pebble.DB

// earliestVersion defines the earliest version set in the database, which is
// only updated when the database is pruned.
earliestVersion uint64

// Sync is whether to sync writes through the OS buffer cache and down onto
// the actual disk, if applicable. Setting Sync is required for durability of
// individual write operations but can result in slower writes.
Expand All @@ -49,19 +57,35 @@ func New(dataDir string) (*Database, error) {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

pruneHeight, err := getPruneHeight(db)
if err != nil {
return nil, fmt.Errorf("failed to get prune height: %w", err)
}

return &Database{
storage: db,
sync: true,
storage: db,
earliestVersion: pruneHeight + 1,
sync: true,
}, nil
}

func NewWithDB(storage *pebble.DB, sync bool) *Database {
pruneHeight, err := getPruneHeight(storage)
if err != nil {
panic(fmt.Errorf("failed to get prune height: %w", err))
}

return &Database{
storage: storage,
sync: sync,
storage: storage,
earliestVersion: pruneHeight + 1,
sync: sync,
}
}

func (db *Database) SetSync(sync bool) {
db.sync = sync
}

func (db *Database) Close() error {
err := db.storage.Close()
db.storage = nil
Expand All @@ -71,6 +95,7 @@ func (db *Database) Close() error {
func (db *Database) SetLatestVersion(version uint64) error {
var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], version)

return db.storage.Set([]byte(latestVersionKey), ts[:], &pebble.WriteOptions{Sync: db.sync})
}

Expand All @@ -92,6 +117,15 @@ func (db *Database) GetLatestVersion() (uint64, error) {
return binary.LittleEndian.Uint64(bz), closer.Close()
}

func (db *Database) setPruneHeight(pruneVersion uint64) error {
db.earliestVersion = pruneVersion + 1

var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], pruneVersion)

return db.storage.Set([]byte(pruneHeightKey), ts[:], &pebble.WriteOptions{Sync: db.sync})
}

func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, error) {
val, err := db.Get(storeKey, version, key)
if err != nil {
Expand All @@ -102,6 +136,10 @@ func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, erro
}

func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]byte, error) {
if targetVersion < db.earliestVersion {
return nil, store.ErrVersionPruned{EarliestVersion: db.earliestVersion}
}

prefixedVal, err := getMVCCSlice(db.storage, storeKey, key, targetVersion)
if err != nil {
if errors.Is(err, store.ErrRecordNotFound) {
Expand All @@ -126,9 +164,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by
if err != nil {
return nil, fmt.Errorf("failed to decode value tombstone: %w", err)
}
if tombstone > targetVersion {
return nil, fmt.Errorf("value tombstone too large: %d", tombstone)
}

// A tombstone of zero or a target version that is less than the tombstone
// version means the key is not deleted at the target version.
Expand Down Expand Up @@ -161,13 +196,84 @@ func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error {
return b.Write()
}

// Prune for the PebbleDB SS backend is currently not supported. It seems the only
// reliable way to prune is to iterate over the desired domain and either manually
// tombstone or delete. Either way, the operation would be timely.
// Prune removes all versions of all keys that are <= the given version.
//
// Note, the implementation of this method is inefficient and can be potentially
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
// time consuming given the size of the database and when the last pruning occurred
// (if any). This is because the implementation iterates over all keys in the
// database in order to delete them.
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
//
// See: https://github.com/cockroachdb/cockroach/blob/33623e3ee420174a4fd3226d1284b03f0e3caaac/pkg/storage/mvcc.go#L3182
func (db *Database) Prune(version uint64) error {
panic("not implemented!")
itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: []byte("s/k:")})
if err != nil {
return err
}
defer itr.Close()

batch := db.storage.NewBatch()
defer batch.Close()

var (
batchCounter int
prevKey, prevKeyPrefixed, prevPrefixedVal []byte
prevKeyVersion uint64
)

for itr.First(); itr.Valid(); {
prefixedKey := slices.Clone(itr.Key())

keyBz, verBz, ok := SplitMVCCKey(prefixedKey)
if !ok {
return fmt.Errorf("invalid PebbleDB MVCC key: %s", prefixedKey)
}

keyVersion, err := decodeUint64Ascending(verBz)
if err != nil {
return fmt.Errorf("failed to decode key version: %w", err)
}

// seek to next key if we are at a version which is higher than prune height
if keyVersion > version {
itr.NextPrefix()
continue
}

// Delete a key if another entry for that key exists a larger version than
// the original but <= to the prune height. We also delete a key if it has
// been tombstoned and its version is <= to the prune height.
if prevKeyVersion <= version && (bytes.Equal(prevKey, keyBz) || valTombstoned(prevPrefixedVal)) {
if err := batch.Delete(prevKeyPrefixed, nil); err != nil {
return err
}

batchCounter++
if batchCounter >= PruneCommitBatchSize {
if err := batch.Commit(&pebble.WriteOptions{Sync: db.sync}); err != nil {
return err
}

batchCounter = 0
batch.Reset()
}
}

prevKey = keyBz
prevKeyVersion = keyVersion
prevKeyPrefixed = prefixedKey
prevPrefixedVal = slices.Clone(itr.Value())

itr.Next()
}

// commit any leftover delete ops in batch
if batchCounter > 0 {
if err := batch.Commit(&pebble.WriteOptions{Sync: db.sync}); err != nil {
return err
}
}

return db.setPruneHeight(version)
}

func (db *Database) Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) {
Expand All @@ -191,7 +297,7 @@ func (db *Database) Iterator(storeKey string, version uint64, start, end []byte)
return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
}

return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, false), nil
return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, db.earliestVersion, false), nil
}

func (db *Database) ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) {
Expand All @@ -215,7 +321,7 @@ func (db *Database) ReverseIterator(storeKey string, version uint64, start, end
return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
}

return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, true), nil
return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, db.earliestVersion, true), nil
}

func storePrefix(storeKey string) []byte {
Expand All @@ -226,6 +332,45 @@ func prependStoreKey(storeKey string, key []byte) []byte {
return append(storePrefix(storeKey), key...)
}

func getPruneHeight(storage *pebble.DB) (uint64, error) {
bz, closer, err := storage.Get([]byte(pruneHeightKey))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
// in cases where pruning was never triggered
return 0, nil
}

return 0, err
}

if len(bz) == 0 {
return 0, closer.Close()
}

return binary.LittleEndian.Uint64(bz), closer.Close()
}

func valTombstoned(value []byte) bool {
if value == nil {
return false
}

_, tombBz, ok := SplitMVCCKey(value)
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC value.
panic(fmt.Sprintf("invalid PebbleDB MVCC value: %s", value))
}

// If the tombstone suffix is empty, we consider this a zero value and thus it
// is not tombstoned.
if len(tombBz) == 0 {
return false
}

return true
}
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

func getMVCCSlice(db *pebble.DB, storeKey string, key []byte, version uint64) ([]byte, error) {
// end domain is exclusive, so we need to increment the version by 1
if version < math.MaxUint64 {
Expand Down
11 changes: 7 additions & 4 deletions store/storage/pebbledb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ import (
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
return New(dir)
db, err := New(dir)
if err == nil && db != nil {
// we set sync=false just to speed up CI tests
db.SetSync(false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in most environments this would be true, is there a reason to setting this to false for testing?

Copy link
Contributor Author

@alexanderbez alexanderbez Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just to make the unit tests faster. Not a strong reason, but it helps speed up CI testing time. (11.994s vs 1.214s)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

amazing!! could you add a short comment saying this. Would help for future readers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha

}

return db, err
},
EmptyBatchSize: 12,
SkipTests: []string{
"TestStorageTestSuite/TestDatabase_Prune",
},
}

suite.Run(t, s)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change to set the database sync behavior to false for speeding up CI tests is reasonable, but it should be documented in the code as per the discussion in the comments. A comment should be added to explain why db.SetSync(false) is being called, to provide context for future maintainers.

  if err == nil && db != nil {
+   // Set sync to false to speed up CI tests (11.994s vs 1.214s)
    db.SetSync(false)
  }

Commitable suggestion

[!IMPORTANT]
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
return New(dir)
db, err := New(dir)
if err == nil && db != nil {
// we set sync=false just to speed up CI tests
db.SetSync(false)
}
return db, err
},
EmptyBatchSize: 12,
SkipTests: []string{
"TestStorageTestSuite/TestDatabase_Prune",
},
}
suite.Run(t, s)
func TestStorageTestSuite(t *testing.T) {
s := &storage.StorageTestSuite{
NewDB: func(dir string) (store.VersionedDatabase, error) {
db, err := New(dir)
if err == nil && db != nil {
// Set sync to false to speed up CI tests (11.994s vs 1.214s)
db.SetSync(false)
}
return db, err
},
EmptyBatchSize: 12,
}
suite.Run(t, s)

Expand Down
14 changes: 13 additions & 1 deletion store/storage/pebbledb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,19 @@ type iterator struct {
reverse bool
}

func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version uint64, reverse bool) *iterator {
func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version, earliestVersion uint64, reverse bool) *iterator {
if version < earliestVersion {
return &iterator{
source: src,
prefix: prefix,
start: mvccStart,
end: mvccEnd,
version: version,
valid: false,
reverse: reverse,
}
}

// move the underlying PebbleDB iterator to the first key
var valid bool
if reverse {
Expand Down
18 changes: 7 additions & 11 deletions store/storage/rocksdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ type Database struct {
storage *grocksdb.DB
cfHandle *grocksdb.ColumnFamilyHandle

// tsLow reflects the full_history_ts_low CF value. Since pruning is done in
// a lazy manner, we use this value to prevent reads for versions that will
// be purged in the next compaction.
// tsLow reflects the full_history_ts_low CF value, which is earliest version
// supported
tsLow uint64
}

Expand Down Expand Up @@ -74,6 +73,7 @@ func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Da
if len(tsLowBz) > 0 {
tsLow = binary.LittleEndian.Uint64(tsLowBz)
}

return &Database{
storage: storage,
cfHandle: cfHandle,
Expand All @@ -91,6 +91,10 @@ func (db *Database) Close() error {
}

func (db *Database) getSlice(storeKey string, version uint64, key []byte) (*grocksdb.Slice, error) {
if version < db.tsLow {
return nil, store.ErrVersionPruned{EarliestVersion: db.tsLow}
}

return db.storage.GetCF(
newTSReadOptions(version),
db.cfHandle,
Expand Down Expand Up @@ -120,10 +124,6 @@ func (db *Database) GetLatestVersion() (uint64, error) {
}

func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, error) {
if version < db.tsLow {
return false, nil
}

slice, err := db.getSlice(storeKey, version, key)
if err != nil {
return false, err
Expand All @@ -133,10 +133,6 @@ func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, erro
}

func (db *Database) Get(storeKey string, version uint64, key []byte) ([]byte, error) {
if version < db.tsLow {
return nil, nil
}

slice, err := db.getSlice(storeKey, version, key)
if err != nil {
return nil, fmt.Errorf("failed to get RocksDB slice: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion store/storage/sqlite/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (b *Batch) Delete(storeKey string, key []byte) error {
}

func (b *Batch) Write() error {
_, err := b.tx.Exec(latestVersionStmt, reservedStoreKey, keyLatestHeight, b.version, 0, b.version)
_, err := b.tx.Exec(reservedUpsertStmt, reservedStoreKey, keyLatestHeight, b.version, 0, b.version)
if err != nil {
return fmt.Errorf("failed to exec SQL statement: %w", err)
}
Expand Down
Loading
Loading