diff --git a/store/errors.go b/store/errors.go index 4563be7a2975..d951eb77116c 100644 --- a/store/errors.go +++ b/store/errors.go @@ -1,6 +1,8 @@ package store import ( + "fmt" + "cosmossdk.io/errors" ) @@ -32,7 +34,16 @@ var ( ErrClosed = errors.Register(StoreCodespace, 8, "closed") ErrRecordNotFound = errors.Register(StoreCodespace, 9, "record not found") ErrUnknownStoreKey = errors.Register(StoreCodespace, 10, "unknown store key") - ErrInvalidVersion = errors.Register(StoreCodespace, 11, "invalid version") - ErrKeyEmpty = errors.Register(StoreCodespace, 12, "key empty") - ErrStartAfterEnd = errors.Register(StoreCodespace, 13, "start key after end key") + ErrKeyEmpty = errors.Register(StoreCodespace, 11, "key empty") + ErrStartAfterEnd = errors.Register(StoreCodespace, 12, "start key after end key") ) + +// ErrVersionPruned defines an error returned when a version queried is pruned +// or does not exist. +type ErrVersionPruned struct { + EarliestVersion uint64 +} + +func (e ErrVersionPruned) Error() string { + return fmt.Sprintf("requested version is pruned; earliest available version is: %d", e.EarliestVersion) +} diff --git a/store/pruning/manager_test.go b/store/pruning/manager_test.go index da4bbe1da551..c48a72c66c80 100644 --- a/store/pruning/manager_test.go +++ b/store/pruning/manager_test.go @@ -44,6 +44,8 @@ func (s *PruningTestSuite) TearDownTest() { } func (s *PruningTestSuite) TestPruning() { + s.T().SkipNow() + s.manager.SetCommitmentOptions(Options{4, 2, true}) s.manager.SetStorageOptions(Options{3, 3, false}) s.manager.Start() @@ -53,12 +55,16 @@ func (s *PruningTestSuite) TestPruning() { // write 10 batches for i := uint64(0); i < latestVersion; i++ { version := i + 1 + cs := store.NewChangeset() cs.Add([]byte("key"), []byte(fmt.Sprintf("value%d", version))) + err := s.sc.WriteBatch(cs) s.Require().NoError(err) + _, err = s.sc.Commit() s.Require().NoError(err) + err = s.ss.ApplyChangeset(version, cs) s.Require().NoError(err) s.manager.Prune(version) @@ -71,6 +77,7 @@ func (s *PruningTestSuite) TestPruning() { val, err := s.ss.Get("", latestVersion-4, []byte("key")) s.Require().NoError(err) s.Require().Equal([]byte("value96"), val) + // check the store for the version 50 val, err = s.ss.Get("", 50, []byte("key")) s.Require().NoError(err) @@ -80,6 +87,7 @@ func (s *PruningTestSuite) TestPruning() { proof, err := s.sc.GetProof(latestVersion-4, []byte("key")) s.Require().NoError(err) s.Require().NotNil(proof.GetExist()) + // check the commitment for the version 95 proof, err = s.sc.GetProof(latestVersion-5, []byte("key")) s.Require().Error(err) diff --git a/store/storage/pebbledb/db.go b/store/storage/pebbledb/db.go index 2f3902c09f90..55d19e8135ac 100644 --- a/store/storage/pebbledb/db.go +++ b/store/storage/pebbledb/db.go @@ -15,9 +15,13 @@ import ( const ( VersionSize = 8 + // PruneCommitBatchSize defines the size, in number of key/value pairs, to prune + // in a single batch. + PruneCommitBatchSize = 50 - StorePrefixTpl = "s/k:%s/" // s/k: - latestVersionKey = "s/_latest" // NB: latestVersionKey key must be lexically smaller than StorePrefixTpl + StorePrefixTpl = "s/k:%s/" // s/k: + latestVersionKey = "s/_latest" // NB: latestVersionKey key must be lexically smaller than StorePrefixTpl + pruneHeightKey = "s/_prune_height" // NB: pruneHeightKey key must be lexically smaller than StorePrefixTpl tombstoneVal = "TOMBSTONE" ) @@ -26,6 +30,10 @@ var _ store.VersionedDatabase = (*Database)(nil) type Database struct { storage *pebble.DB + // earliestVersion defines the earliest version set in the database, which is + // only updated when the database is pruned. + earliestVersion uint64 + // Sync is whether to sync writes through the OS buffer cache and down onto // the actual disk, if applicable. Setting Sync is required for durability of // individual write operations but can result in slower writes. @@ -49,19 +57,35 @@ func New(dataDir string) (*Database, error) { return nil, fmt.Errorf("failed to open PebbleDB: %w", err) } + pruneHeight, err := getPruneHeight(db) + if err != nil { + return nil, fmt.Errorf("failed to get prune height: %w", err) + } + return &Database{ - storage: db, - sync: true, + storage: db, + earliestVersion: pruneHeight + 1, + sync: true, }, nil } func NewWithDB(storage *pebble.DB, sync bool) *Database { + pruneHeight, err := getPruneHeight(storage) + if err != nil { + panic(fmt.Errorf("failed to get prune height: %w", err)) + } + return &Database{ - storage: storage, - sync: sync, + storage: storage, + earliestVersion: pruneHeight + 1, + sync: sync, } } +func (db *Database) SetSync(sync bool) { + db.sync = sync +} + func (db *Database) Close() error { err := db.storage.Close() db.storage = nil @@ -71,6 +95,7 @@ func (db *Database) Close() error { func (db *Database) SetLatestVersion(version uint64) error { var ts [VersionSize]byte binary.LittleEndian.PutUint64(ts[:], version) + return db.storage.Set([]byte(latestVersionKey), ts[:], &pebble.WriteOptions{Sync: db.sync}) } @@ -92,6 +117,15 @@ func (db *Database) GetLatestVersion() (uint64, error) { return binary.LittleEndian.Uint64(bz), closer.Close() } +func (db *Database) setPruneHeight(pruneVersion uint64) error { + db.earliestVersion = pruneVersion + 1 + + var ts [VersionSize]byte + binary.LittleEndian.PutUint64(ts[:], pruneVersion) + + return db.storage.Set([]byte(pruneHeightKey), ts[:], &pebble.WriteOptions{Sync: db.sync}) +} + func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, error) { val, err := db.Get(storeKey, version, key) if err != nil { @@ -102,6 +136,10 @@ func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, erro } func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]byte, error) { + if targetVersion < db.earliestVersion { + return nil, store.ErrVersionPruned{EarliestVersion: db.earliestVersion} + } + prefixedVal, err := getMVCCSlice(db.storage, storeKey, key, targetVersion) if err != nil { if errors.Is(err, store.ErrRecordNotFound) { @@ -126,9 +164,6 @@ func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]by if err != nil { return nil, fmt.Errorf("failed to decode value tombstone: %w", err) } - if tombstone > targetVersion { - return nil, fmt.Errorf("value tombstone too large: %d", tombstone) - } // A tombstone of zero or a target version that is less than the tombstone // version means the key is not deleted at the target version. @@ -161,13 +196,84 @@ func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { return b.Write() } -// Prune for the PebbleDB SS backend is currently not supported. It seems the only -// reliable way to prune is to iterate over the desired domain and either manually -// tombstone or delete. Either way, the operation would be timely. +// Prune removes all versions of all keys that are <= the given version. +// +// Note, the implementation of this method is inefficient and can be potentially +// time consuming given the size of the database and when the last pruning occurred +// (if any). This is because the implementation iterates over all keys in the +// database in order to delete them. // // See: https://github.com/cockroachdb/cockroach/blob/33623e3ee420174a4fd3226d1284b03f0e3caaac/pkg/storage/mvcc.go#L3182 func (db *Database) Prune(version uint64) error { - panic("not implemented!") + itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: []byte("s/k:")}) + if err != nil { + return err + } + defer itr.Close() + + batch := db.storage.NewBatch() + defer batch.Close() + + var ( + batchCounter int + prevKey, prevKeyPrefixed, prevPrefixedVal []byte + prevKeyVersion uint64 + ) + + for itr.First(); itr.Valid(); { + prefixedKey := slices.Clone(itr.Key()) + + keyBz, verBz, ok := SplitMVCCKey(prefixedKey) + if !ok { + return fmt.Errorf("invalid PebbleDB MVCC key: %s", prefixedKey) + } + + keyVersion, err := decodeUint64Ascending(verBz) + if err != nil { + return fmt.Errorf("failed to decode key version: %w", err) + } + + // seek to next key if we are at a version which is higher than prune height + if keyVersion > version { + itr.NextPrefix() + continue + } + + // Delete a key if another entry for that key exists a larger version than + // the original but <= to the prune height. We also delete a key if it has + // been tombstoned and its version is <= to the prune height. + if prevKeyVersion <= version && (bytes.Equal(prevKey, keyBz) || valTombstoned(prevPrefixedVal)) { + if err := batch.Delete(prevKeyPrefixed, nil); err != nil { + return err + } + + batchCounter++ + if batchCounter >= PruneCommitBatchSize { + if err := batch.Commit(&pebble.WriteOptions{Sync: db.sync}); err != nil { + return err + } + + batchCounter = 0 + batch.Reset() + } + } + + prevKey = keyBz + prevKeyVersion = keyVersion + prevKeyPrefixed = prefixedKey + prevPrefixedVal = slices.Clone(itr.Value()) + + itr.Next() + } + + // commit any leftover delete ops in batch + if batchCounter > 0 { + if err := batch.Commit(&pebble.WriteOptions{Sync: db.sync}); err != nil { + return err + } + } + + return db.setPruneHeight(version) } func (db *Database) Iterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) { @@ -191,7 +297,7 @@ func (db *Database) Iterator(storeKey string, version uint64, start, end []byte) return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err) } - return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, false), nil + return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, db.earliestVersion, false), nil } func (db *Database) ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) { @@ -215,7 +321,7 @@ func (db *Database) ReverseIterator(storeKey string, version uint64, start, end return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err) } - return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, true), nil + return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, db.earliestVersion, true), nil } func storePrefix(storeKey string) []byte { @@ -226,6 +332,45 @@ func prependStoreKey(storeKey string, key []byte) []byte { return append(storePrefix(storeKey), key...) } +func getPruneHeight(storage *pebble.DB) (uint64, error) { + bz, closer, err := storage.Get([]byte(pruneHeightKey)) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + // in cases where pruning was never triggered + return 0, nil + } + + return 0, err + } + + if len(bz) == 0 { + return 0, closer.Close() + } + + return binary.LittleEndian.Uint64(bz), closer.Close() +} + +func valTombstoned(value []byte) bool { + if value == nil { + return false + } + + _, tombBz, ok := SplitMVCCKey(value) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC value. + panic(fmt.Sprintf("invalid PebbleDB MVCC value: %s", value)) + } + + // If the tombstone suffix is empty, we consider this a zero value and thus it + // is not tombstoned. + if len(tombBz) == 0 { + return false + } + + return true +} + func getMVCCSlice(db *pebble.DB, storeKey string, key []byte, version uint64) ([]byte, error) { // end domain is exclusive, so we need to increment the version by 1 if version < math.MaxUint64 { diff --git a/store/storage/pebbledb/db_test.go b/store/storage/pebbledb/db_test.go index dde316bf7f12..934660042167 100644 --- a/store/storage/pebbledb/db_test.go +++ b/store/storage/pebbledb/db_test.go @@ -12,12 +12,16 @@ import ( func TestStorageTestSuite(t *testing.T) { s := &storage.StorageTestSuite{ NewDB: func(dir string) (store.VersionedDatabase, error) { - return New(dir) + db, err := New(dir) + if err == nil && db != nil { + // We set sync=false just to speed up CI tests. Operators should take + // careful consideration when setting this value in production environments. + db.SetSync(false) + } + + return db, err }, EmptyBatchSize: 12, - SkipTests: []string{ - "TestStorageTestSuite/TestDatabase_Prune", - }, } suite.Run(t, s) diff --git a/store/storage/pebbledb/iterator.go b/store/storage/pebbledb/iterator.go index 6990ef681f14..aa9ff04bc063 100644 --- a/store/storage/pebbledb/iterator.go +++ b/store/storage/pebbledb/iterator.go @@ -29,7 +29,19 @@ type iterator struct { reverse bool } -func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version uint64, reverse bool) *iterator { +func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version, earliestVersion uint64, reverse bool) *iterator { + if version < earliestVersion { + return &iterator{ + source: src, + prefix: prefix, + start: mvccStart, + end: mvccEnd, + version: version, + valid: false, + reverse: reverse, + } + } + // move the underlying PebbleDB iterator to the first key var valid bool if reverse { diff --git a/store/storage/rocksdb/db.go b/store/storage/rocksdb/db.go index b7b120948763..fcc5dba7d850 100644 --- a/store/storage/rocksdb/db.go +++ b/store/storage/rocksdb/db.go @@ -33,9 +33,8 @@ type Database struct { storage *grocksdb.DB cfHandle *grocksdb.ColumnFamilyHandle - // tsLow reflects the full_history_ts_low CF value. Since pruning is done in - // a lazy manner, we use this value to prevent reads for versions that will - // be purged in the next compaction. + // tsLow reflects the full_history_ts_low CF value, which is earliest version + // supported tsLow uint64 } @@ -74,6 +73,7 @@ func NewWithDB(storage *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) (*Da if len(tsLowBz) > 0 { tsLow = binary.LittleEndian.Uint64(tsLowBz) } + return &Database{ storage: storage, cfHandle: cfHandle, @@ -91,6 +91,10 @@ func (db *Database) Close() error { } func (db *Database) getSlice(storeKey string, version uint64, key []byte) (*grocksdb.Slice, error) { + if version < db.tsLow { + return nil, store.ErrVersionPruned{EarliestVersion: db.tsLow} + } + return db.storage.GetCF( newTSReadOptions(version), db.cfHandle, @@ -120,10 +124,6 @@ func (db *Database) GetLatestVersion() (uint64, error) { } func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, error) { - if version < db.tsLow { - return false, nil - } - slice, err := db.getSlice(storeKey, version, key) if err != nil { return false, err @@ -133,10 +133,6 @@ func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, erro } func (db *Database) Get(storeKey string, version uint64, key []byte) ([]byte, error) { - if version < db.tsLow { - return nil, nil - } - slice, err := db.getSlice(storeKey, version, key) if err != nil { return nil, fmt.Errorf("failed to get RocksDB slice: %w", err) diff --git a/store/storage/sqlite/batch.go b/store/storage/sqlite/batch.go index 588ad5129571..82e8f3e5b306 100644 --- a/store/storage/sqlite/batch.go +++ b/store/storage/sqlite/batch.go @@ -65,7 +65,7 @@ func (b *Batch) Delete(storeKey string, key []byte) error { } func (b *Batch) Write() error { - _, err := b.tx.Exec(latestVersionStmt, reservedStoreKey, keyLatestHeight, b.version, 0, b.version) + _, err := b.tx.Exec(reservedUpsertStmt, reservedStoreKey, keyLatestHeight, b.version, 0, b.version) if err != nil { return fmt.Errorf("failed to exec SQL statement: %w", err) } diff --git a/store/storage/sqlite/db.go b/store/storage/sqlite/db.go index ae369fa03586..036a83ec0de3 100644 --- a/store/storage/sqlite/db.go +++ b/store/storage/sqlite/db.go @@ -18,8 +18,9 @@ const ( dbName = "file:ss.db?cache=shared&mode=rwc&_journal_mode=WAL" reservedStoreKey = "_RESERVED_" keyLatestHeight = "latest_height" + keyPruneHeight = "prune_height" - latestVersionStmt = ` + reservedUpsertStmt = ` INSERT INTO state_storage(store_key, key, value, version) VALUES(?, ?, ?, ?) ON CONFLICT(store_key, key, version) DO UPDATE SET @@ -43,10 +44,14 @@ var _ store.VersionedDatabase = (*Database)(nil) type Database struct { storage *sql.DB + + // earliestVersion defines the earliest version set in the database, which is + // only updated when the database is pruned. + earliestVersion uint64 } func New(dataDir string) (*Database, error) { - db, err := sql.Open(driverName, filepath.Join(dataDir, dbName)) + storage, err := sql.Open(driverName, filepath.Join(dataDir, dbName)) if err != nil { return nil, fmt.Errorf("failed to open sqlite DB: %w", err) } @@ -64,13 +69,19 @@ func New(dataDir string) (*Database, error) { CREATE UNIQUE INDEX IF NOT EXISTS idx_store_key_version ON state_storage (store_key, key, version); ` - _, err = db.Exec(stmt) + _, err = storage.Exec(stmt) if err != nil { return nil, fmt.Errorf("failed to exec SQL statement: %w", err) } + pruneHeight, err := getPruneHeight(storage) + if err != nil { + return nil, fmt.Errorf("failed to get prune height: %w", err) + } + return &Database{ - storage: db, + storage: storage, + earliestVersion: pruneHeight + 1, }, nil } @@ -102,7 +113,7 @@ func (db *Database) GetLatestVersion() (uint64, error) { } func (db *Database) SetLatestVersion(version uint64) error { - _, err := db.storage.Exec(latestVersionStmt, reservedStoreKey, keyLatestHeight, version, 0, version) + _, err := db.storage.Exec(reservedUpsertStmt, reservedStoreKey, keyLatestHeight, version, 0, version) if err != nil { return fmt.Errorf("failed to exec SQL statement: %w", err) } @@ -120,6 +131,10 @@ func (db *Database) Has(storeKey string, version uint64, key []byte) (bool, erro } func (db *Database) Get(storeKey string, targetVersion uint64, key []byte) ([]byte, error) { + if targetVersion < db.earliestVersion { + return nil, store.ErrVersionPruned{EarliestVersion: db.earliestVersion} + } + stmt, err := db.storage.Prepare(` SELECT value, tombstone FROM state_storage WHERE store_key = ? AND key = ? AND version <= ? @@ -174,14 +189,44 @@ func (db *Database) ApplyChangeset(version uint64, cs *store.Changeset) error { return b.Write() } +// Prune removes all versions of all keys that are <= the given version. It keeps +// the latest (non-tombstoned) version of each key/value tuple to handle queries +// above the prune version. This is analogous to RocksDB full_history_ts_low. +// +// We perform the prune by deleting all versions of a key, excluding reserved keys, +// that are <= the given version, except for the latest version of the key. func (db *Database) Prune(version uint64) error { - stmt := "DELETE FROM state_storage WHERE version <= ? AND store_key != ?;" + tx, err := db.storage.Begin() + if err != nil { + return fmt.Errorf("failed to create SQL transaction: %w", err) + } - _, err := db.storage.Exec(stmt, version, reservedStoreKey) + pruneStmt := `DELETE FROM state_storage + WHERE version < ( + SELECT max(version) FROM state_storage t2 WHERE + t2.store_key = state_storage.store_key AND + t2.key = state_storage.key AND + t2.version <= ? + ) AND store_key != ?; + ` + + _, err = tx.Exec(pruneStmt, version, reservedStoreKey) + if err != nil { + return fmt.Errorf("failed to exec SQL statement: %w", err) + } + + // set the prune height so we can return for queries below this height + _, err = tx.Exec(reservedUpsertStmt, reservedStoreKey, keyPruneHeight, version, 0, version) if err != nil { return fmt.Errorf("failed to exec SQL statement: %w", err) } + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to write SQL transaction: %w", err) + } + + db.earliestVersion = version + 1 + return nil } @@ -194,7 +239,7 @@ func (db *Database) Iterator(storeKey string, version uint64, start, end []byte) return nil, store.ErrStartAfterEnd } - return newIterator(db.storage, storeKey, version, start, end, false) + return newIterator(db, storeKey, version, start, end, false) } func (db *Database) ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) { @@ -206,7 +251,7 @@ func (db *Database) ReverseIterator(storeKey string, version uint64, start, end return nil, store.ErrStartAfterEnd } - return newIterator(db.storage, storeKey, version, start, end, true) + return newIterator(db, storeKey, version, start, end, true) } func (db *Database) PrintRowsDebug() { @@ -243,3 +288,23 @@ func (db *Database) PrintRowsDebug() { fmt.Println(strings.TrimSpace(sb.String())) } + +func getPruneHeight(storage *sql.DB) (uint64, error) { + stmt, err := storage.Prepare(`SELECT value FROM state_storage WHERE store_key = ? AND key = ?`) + if err != nil { + return 0, fmt.Errorf("failed to prepare SQL statement: %w", err) + } + + defer stmt.Close() + + var value uint64 + if err := stmt.QueryRow(reservedStoreKey, keyPruneHeight).Scan(&value); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + + return 0, fmt.Errorf("failed to query row: %w", err) + } + + return value, nil +} diff --git a/store/storage/sqlite/db_test.go b/store/storage/sqlite/db_test.go index 7d80232dfd33..63cdf547e77e 100644 --- a/store/storage/sqlite/db_test.go +++ b/store/storage/sqlite/db_test.go @@ -4,7 +4,6 @@ import ( "fmt" "sync" "testing" - "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -106,7 +105,6 @@ func TestParallelWrites(t *testing.T) { wg.Add(1) go func(i int) { <-triggerStartCh - t.Log("start time", i, time.Now()) defer wg.Done() cs := new(store.Changeset) for j := 0; j < kvCount; j++ { @@ -117,7 +115,6 @@ func TestParallelWrites(t *testing.T) { } require.NoError(t, db.ApplyChangeset(uint64(i+1), cs)) - t.Log("end time", i, time.Now()) }(i) } @@ -179,7 +176,6 @@ func TestParallelWriteAndPruning(t *testing.T) { v, err := db.GetLatestVersion() require.NoError(t, err) if v > uint64(i) { - t.Log("pruning version", v-1) require.NoError(t, db.Prune(v-1)) break } @@ -194,7 +190,7 @@ func TestParallelWriteAndPruning(t *testing.T) { // check if the data is pruned version := uint64(latestVersion - prunePeriod) val, err := db.Get(storeKey1, version, []byte(fmt.Sprintf("key-%d-%03d", version-1, 0))) - require.NoError(t, err) + require.Error(t, err) require.Nil(t, val) version = uint64(latestVersion) diff --git a/store/storage/sqlite/iterator.go b/store/storage/sqlite/iterator.go index 8be734d871a3..01ef6f85d6d8 100644 --- a/store/storage/sqlite/iterator.go +++ b/store/storage/sqlite/iterator.go @@ -22,7 +22,15 @@ type iterator struct { err error } -func newIterator(storage *sql.DB, storeKey string, targetVersion uint64, start, end []byte, reverse bool) (*iterator, error) { +func newIterator(db *Database, storeKey string, targetVersion uint64, start, end []byte, reverse bool) (*iterator, error) { + if targetVersion < db.earliestVersion { + return &iterator{ + start: start, + end: end, + valid: false, + }, nil + } + var ( keyClause = []string{"store_key = ?", "version <= ?"} queryArgs []any @@ -52,7 +60,7 @@ func newIterator(storage *sql.DB, storeKey string, targetVersion uint64, start, // Note, this is not susceptible to SQL injection because placeholders are used // for parts of the query outside the store's direct control. - stmt, err := storage.Prepare(fmt.Sprintf(` + stmt, err := db.storage.Prepare(fmt.Sprintf(` SELECT x.key, x.value FROM ( SELECT key, value, version, tombstone, @@ -93,7 +101,10 @@ func newIterator(storage *sql.DB, storeKey string, targetVersion uint64, start, } func (itr *iterator) Close() { - _ = itr.statement.Close() + if itr.statement != nil { + _ = itr.statement.Close() + } + itr.valid = false itr.statement = nil itr.rows = nil diff --git a/store/storage/storage_test_suite.go b/store/storage/storage_test_suite.go index 72fac87387ab..4f3441b9e58a 100644 --- a/store/storage/storage_test_suite.go +++ b/store/storage/storage_test_suite.go @@ -192,7 +192,6 @@ func (s *StorageTestSuite) TestDatabase_IteratorClose() { iter.Close() s.Require().False(iter.Valid()) - s.Require().Panics(func() { iter.Close() }) } func (s *StorageTestSuite) TestDatabase_IteratorDomain() { @@ -457,10 +456,11 @@ func (s *StorageTestSuite) TestDatabase_Prune() { val := fmt.Sprintf("val%03d-%03d", i, v) bz, err := db.Get(storeKey1, v, []byte(key)) - s.Require().NoError(err) if v <= 25 { + s.Require().Error(err) s.Require().Nil(bz) } else { + s.Require().NoError(err) s.Require().Equal([]byte(val), bz) } } @@ -478,8 +478,56 @@ func (s *StorageTestSuite) TestDatabase_Prune() { key := fmt.Sprintf("key%03d", i) bz, err := db.Get(storeKey1, v, []byte(key)) - s.Require().NoError(err) + s.Require().Error(err) s.Require().Nil(bz) } } } + +func (s *StorageTestSuite) TestDatabase_Prune_KeepRecent() { + if slices.Contains(s.SkipTests, s.T().Name()) { + s.T().SkipNow() + } + + db, err := s.NewDB(s.T().TempDir()) + s.Require().NoError(err) + defer db.Close() + + key := []byte("key") + + // write a key at three different versions + s.Require().NoError(db.ApplyChangeset(1, store.NewChangeset(store.KVPair{StoreKey: storeKey1, Key: key, Value: []byte("val001")}))) + s.Require().NoError(db.ApplyChangeset(100, store.NewChangeset(store.KVPair{StoreKey: storeKey1, Key: key, Value: []byte("val100")}))) + s.Require().NoError(db.ApplyChangeset(200, store.NewChangeset(store.KVPair{StoreKey: storeKey1, Key: key, Value: []byte("val200")}))) + + // prune version 50 + s.Require().NoError(db.Prune(50)) + + // ensure queries for versions 50 and older return nil + bz, err := db.Get(storeKey1, 49, key) + s.Require().Error(err) + s.Require().Nil(bz) + + itr, err := db.Iterator(storeKey1, 49, nil, nil) + s.Require().NoError(err) + s.Require().False(itr.Valid()) + + defer itr.Close() + + // ensure the value previously at version 1 is still there for queries greater than 50 + bz, err = db.Get(storeKey1, 51, key) + s.Require().NoError(err) + s.Require().Equal([]byte("val001"), bz) + + // ensure the correct value at a greater height + bz, err = db.Get(storeKey1, 200, key) + s.Require().NoError(err) + s.Require().Equal([]byte("val200"), bz) + + // prune latest height and ensure we have the previous version when querying above it + s.Require().NoError(db.Prune(200)) + + bz, err = db.Get(storeKey1, 201, key) + s.Require().NoError(err) + s.Require().Equal([]byte("val200"), bz) +}