diff --git a/pruning/export_test.go b/pruning/export_test.go
index 6a270b46eb82..51b1c10462a9 100644
--- a/pruning/export_test.go
+++ b/pruning/export_test.go
@@ -7,6 +7,8 @@ var (
 	PruneSnapshotHeightsKey = pruneSnapshotHeightsKey

 	// functions
-	Int64SliceToBytes = int64SliceToBytes
-	ListToBytes       = listToBytes
+	Int64SliceToBytes          = int64SliceToBytes
+	ListToBytes                = listToBytes
+	LoadPruningHeights         = loadPruningHeights
+	LoadPruningSnapshotHeights = loadPruningSnapshotHeights
 )
diff --git a/pruning/manager.go b/pruning/manager.go
index 4ce3c6d96a16..a36ab2c296cb 100644
--- a/pruning/manager.go
+++ b/pruning/manager.go
@@ -13,13 +13,21 @@ import (
 )

 type Manager struct {
-	logger               log.Logger
-	db                   dbm.DB
-	opts                 types.PruningOptions
-	snapshotInterval     uint64
-	pruneHeights         []int64
+	db               dbm.DB
+	logger           log.Logger
+	opts             types.PruningOptions
+	snapshotInterval uint64
+	pruneHeights     []int64
+	// Although pruneHeights happen in the same goroutine as the normal execution,
+	// we sync access to them to avoid future soundness issues if the concurrency pattern changes.
+	pruneHeightsMx sync.Mutex
+	// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
+	// The heights are added to this list to be pruned when a snapshot is complete.
 	pruneSnapshotHeights *list.List
-	mx                   sync.Mutex
+	// Snapshots are taken in a separate goroutine from the regular execution
+	// and can be delivered asynchronously via HandleHeightSnapshot.
+	// Therefore, we sync access to pruneSnapshotHeights with this mutex.
+	pruneSnapshotHeightsMx sync.Mutex
 }

 const (
@@ -32,16 +40,15 @@ var (
 	pruneSnapshotHeightsKey = []byte("s/pruneSnheights")
 )

-func NewManager(logger log.Logger, db dbm.DB) *Manager {
+func NewManager(db dbm.DB, logger log.Logger) *Manager {
 	return &Manager{
-		logger:       logger,
-		db:           db,
-		opts:         types.NewPruningOptions(types.PruningNothing),
-		pruneHeights: []int64{},
+		db:     db,
+		logger: logger,
+		opts:   types.NewPruningOptions(types.PruningNothing),
 		// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
 		// The heights are added to this list to be pruned when a snapshot is complete.
+		pruneHeights:         []int64{},
 		pruneSnapshotHeights: list.New(),
-		mx:                   sync.Mutex{},
 	}
 }

@@ -55,15 +62,25 @@ func (m *Manager) GetOptions() types.PruningOptions {
 	return m.opts
 }

-// GetPruningHeights returns all heights to be pruned during the next call to Prune().
-func (m *Manager) GetPruningHeights() []int64 {
-	return m.pruneHeights
-}
+// GetFlushAndResetPruningHeights returns all heights to be pruned during the next call to Prune().
+// It also flushes and resets the pruning heights.
+func (m *Manager) GetFlushAndResetPruningHeights() ([]int64, error) {
+	if m.opts.GetPruningStrategy() == types.PruningNothing {
+		return []int64{}, nil
+	}
+	m.pruneHeightsMx.Lock()
+	defer m.pruneHeightsMx.Unlock()
+
+	pruningHeights := m.pruneHeights
+
+	// flush the updates to disk so that they are not lost if a crash happens.
+	if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(pruningHeights)); err != nil {
+		return nil, err
+	}
+
+	m.pruneHeights = make([]int64, 0, m.opts.Interval)

-// ResetPruningHeights resets the heights to be pruned.
-func (m *Manager) ResetPruningHeights() {
-	// reuse previously allocated memory
-	m.pruneHeights = m.pruneHeights[:0]
+	return pruningHeights, nil
 }

 // HandleHeight determines if pruneHeight height needs to be kept for pruning at the right interval prescribed by
@@ -76,9 +93,14 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
 	}

 	defer func() {
-		// handle persisted snapshot heights
-		m.mx.Lock()
-		defer m.mx.Unlock()
+		m.pruneHeightsMx.Lock()
+		defer m.pruneHeightsMx.Unlock()
+
+		m.pruneSnapshotHeightsMx.Lock()
+		defer m.pruneSnapshotHeightsMx.Unlock()
+
+		// move persisted snapshot heights to pruneHeights, which
+		// represent the heights to be pruned at the next pruning interval.
 		var next *list.Element
 		for e := m.pruneSnapshotHeights.Front(); e != nil; e = next {
 			snHeight := e.Value.(int64)
@@ -92,6 +114,11 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
 				next = e.Next()
 			}
 		}
+
+		// flush the updates to disk so that they are not lost if a crash happens.
+		if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
+			panic(err)
+		}
 	}()

 	if int64(m.opts.KeepRecent) < previousHeight {
@@ -102,21 +129,38 @@
 		// - snapshotInterval % (height - KeepRecent) != 0 as that means the height is not
 		// a 'snapshot' height.
 		if m.snapshotInterval == 0 || pruneHeight%int64(m.snapshotInterval) != 0 {
+			m.pruneHeightsMx.Lock()
+			defer m.pruneHeightsMx.Unlock()
+
 			m.pruneHeights = append(m.pruneHeights, pruneHeight)
+
+			// flush the updates to disk so that they are not lost if a crash happens.
+			if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
+				panic(err)
+			}
 			return pruneHeight
 		}
 	}

 	return 0
 }

+// HandleHeightSnapshot persists the snapshot height to be pruned at the next appropriate
+// height defined by the pruning strategy. It flushes the update to disk and panics if the flush fails.
+// The input height must be greater than 0, and the pruning strategy must be anything other than pruning nothing.
+// If either condition is not met, this function does nothing.
 func (m *Manager) HandleHeightSnapshot(height int64) {
-	if m.opts.GetPruningStrategy() == types.PruningNothing {
+	if m.opts.GetPruningStrategy() == types.PruningNothing || height <= 0 {
 		return
 	}

-	m.mx.Lock()
-	defer m.mx.Unlock()
-	m.logger.Debug("HandleHeightSnapshot", "height", height) // TODO: change log level to Debug
+	m.pruneSnapshotHeightsMx.Lock()
+	defer m.pruneSnapshotHeightsMx.Unlock()
+	m.logger.Debug("HandleHeightSnapshot", "height", height)
 	m.pruneSnapshotHeights.PushBack(height)
+
+	// flush the updates to disk so that they are not lost if a crash happens.
+	if err := m.db.SetSync(pruneSnapshotHeightsKey, listToBytes(m.pruneSnapshotHeights)); err != nil {
+		panic(err)
+	}
 }

 // SetSnapshotInterval sets the interval at which the snapshots are taken.
@@ -129,39 +173,43 @@ func (m *Manager) ShouldPruneAtHeight(height int64) bool {
 	return m.opts.Interval > 0 && m.opts.GetPruningStrategy() != types.PruningNothing && height%int64(m.opts.Interval) == 0
 }

-// FlushPruningHeights flushes the pruning heights to the database for crash recovery.
-func (m *Manager) FlushPruningHeights() {
-	if m.opts.GetPruningStrategy() == types.PruningNothing {
-		return
-	}
-	batch := m.db.NewBatch()
-	defer batch.Close()
-	m.flushPruningHeights(batch)
-	m.flushPruningSnapshotHeights(batch)
-
-	if err := batch.WriteSync(); err != nil {
-		panic(fmt.Errorf("error on batch write %w", err))
-	}
-}
-
 // LoadPruningHeights loads the pruning heights from the database as a crash recovery.
 func (m *Manager) LoadPruningHeights() error {
 	if m.opts.GetPruningStrategy() == types.PruningNothing {
 		return nil
 	}

-	if err := m.loadPruningHeights(); err != nil {
+	loadedPruneHeights, err := loadPruningHeights(m.db)
+	if err != nil {
 		return err
 	}

-	return m.loadPruningSnapshotHeights()
+
+	if len(loadedPruneHeights) > 0 {
+		m.pruneHeightsMx.Lock()
+		defer m.pruneHeightsMx.Unlock()
+		m.pruneHeights = loadedPruneHeights
+	}
+
+	loadedPruneSnapshotHeights, err := loadPruningSnapshotHeights(m.db)
+	if err != nil {
+		return err
+	}
+
+	if loadedPruneSnapshotHeights.Len() > 0 {
+		m.pruneSnapshotHeightsMx.Lock()
+		defer m.pruneSnapshotHeightsMx.Unlock()
+		m.pruneSnapshotHeights = loadedPruneSnapshotHeights
+	}
+
+	return nil
 }

-func (m *Manager) loadPruningHeights() error {
-	bz, err := m.db.Get(pruneHeightsKey)
+func loadPruningHeights(db dbm.DB) ([]int64, error) {
+	bz, err := db.Get(pruneHeightsKey)
 	if err != nil {
-		return fmt.Errorf("failed to get pruned heights: %w", err)
+		return nil, fmt.Errorf("failed to get pruned heights: %w", err)
 	}
 	if len(bz) == 0 {
-		return nil
+		return []int64{}, nil
 	}

 	prunedHeights := make([]int64, len(bz)/8)
@@ -169,7 +217,7 @@ func (m *Manager) loadPruningHeights() error {
 	for offset < len(bz) {
 		h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
 		if h < 0 {
-			return fmt.Errorf(errNegativeHeightsFmt, h)
+			return []int64{}, fmt.Errorf(errNegativeHeightsFmt, h)
 		}

 		prunedHeights[i] = h
@@ -177,28 +225,24 @@ func (m *Manager) loadPruningHeights() error {
 		offset += 8
 	}

-	if len(prunedHeights) > 0 {
-		m.pruneHeights = prunedHeights
-	}
-
-	return nil
+	return prunedHeights, nil
 }

-func (m *Manager) loadPruningSnapshotHeights() error {
-	bz, err := m.db.Get(pruneSnapshotHeightsKey)
+func loadPruningSnapshotHeights(db dbm.DB) (*list.List, error) {
+	bz, err := db.Get(pruneSnapshotHeightsKey)
+	pruneSnapshotHeights := list.New()
 	if err != nil {
-		return fmt.Errorf("failed to get post-snapshot pruned heights: %w", err)
+		return nil, fmt.Errorf("failed to get post-snapshot pruned heights: %w", err)
 	}
 	if len(bz) == 0 {
-		return nil
+		return pruneSnapshotHeights, nil
 	}

-	pruneSnapshotHeights := list.New()
 	i, offset := 0, 0
 	for offset < len(bz) {
 		h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
 		if h < 0 {
-			return fmt.Errorf(errNegativeHeightsFmt, h)
+			return pruneSnapshotHeights, fmt.Errorf(errNegativeHeightsFmt, h)
 		}

 		pruneSnapshotHeights.PushBack(h)
@@ -206,25 +250,7 @@ func (m *Manager) loadPruningSnapshotHeights() error {
 		offset += 8
 	}

-	m.mx.Lock()
-	defer m.mx.Unlock()
-	m.pruneSnapshotHeights = pruneSnapshotHeights
-
-	return nil
-}
-
-func (m *Manager) flushPruningHeights(batch dbm.Batch) {
-	if err := batch.Set([]byte(pruneHeightsKey), int64SliceToBytes(m.pruneHeights)); err != nil {
-		panic(err)
-	}
-}
-
-func (m *Manager) flushPruningSnapshotHeights(batch dbm.Batch) {
-	m.mx.Lock()
-	defer m.mx.Unlock()
-	if err := batch.Set([]byte(pruneSnapshotHeightsKey), listToBytes(m.pruneSnapshotHeights)); err != nil {
-		panic(err)
-	}
+	return pruneSnapshotHeights, nil
 }

 func int64SliceToBytes(slice []int64) []byte {
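
For reference, the persistence format that loadPruningHeights and loadPruningSnapshotHeights decode above is simply the heights concatenated as 8-byte big-endian integers. Below is a minimal, self-contained sketch of that round trip; encodeHeights is a stand-in for the package's int64SliceToBytes (whose body is outside this diff), so treat it as an assumption inferred from the decode loop rather than the actual implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeHeights mirrors what int64SliceToBytes is assumed to do: each height is
// written as an 8-byte big-endian unsigned integer, concatenated in order.
func encodeHeights(heights []int64) []byte {
	bz := make([]byte, 0, len(heights)*8)
	for _, h := range heights {
		buf := make([]byte, 8)
		binary.BigEndian.PutUint64(buf, uint64(h))
		bz = append(bz, buf...)
	}
	return bz
}

// decodeHeights follows the same loop as loadPruningHeights in the diff above,
// rejecting negative heights the way the errNegativeHeightsFmt check does.
func decodeHeights(bz []byte) ([]int64, error) {
	heights := make([]int64, 0, len(bz)/8)
	for offset := 0; offset < len(bz); offset += 8 {
		h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
		if h < 0 {
			return nil, fmt.Errorf("height %d cannot be negative", h)
		}
		heights = append(heights, h)
	}
	return heights, nil
}

func main() {
	bz := encodeHeights([]int64{1, 5, 22})
	heights, err := decodeHeights(bz)
	fmt.Println(heights, err) // [1 5 22] <nil>
}
```
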
diff --git a/pruning/manager_test.go b/pruning/manager_test.go
index 398c51f5cddf..e85dc6e8fa2a 100644
--- a/pruning/manager_test.go
+++ b/pruning/manager_test.go
@@ -4,9 +4,7 @@ import (
 	"container/list"
 	"fmt"
-	"sync"
 	"testing"
-	"time"

 	"github.com/cosmos/cosmos-sdk/pruning"
 	"github.com/cosmos/cosmos-sdk/pruning/types"
@@ -17,10 +15,12 @@ import (
 )

 func TestNewManager(t *testing.T) {
-	manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB())
+	manager := pruning.NewManager(db.NewMemDB(), log.NewNopLogger())

 	require.NotNil(t, manager)
-	require.NotNil(t, manager.GetPruningHeights())
+	heights, err := manager.GetFlushAndResetPruningHeights()
+	require.NoError(t, err)
+	require.NotNil(t, heights)
 	require.Equal(t, types.PruningNothing, manager.GetOptions().GetPruningStrategy())
 }
@@ -75,7 +75,7 @@ func TestStrategies(t *testing.T) {
 		},
 	}

-	manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB())
+	manager := pruning.NewManager(db.NewMemDB(), log.NewNopLogger())

 	require.NotNil(t, manager)
@@ -112,7 +112,8 @@ func TestStrategies(t *testing.T) {
 			handleHeightActual := manager.HandleHeight(curHeight)
 			shouldPruneAtHeightActual := manager.ShouldPruneAtHeight(curHeight)

-			curPruningHeihts := manager.GetPruningHeights()
+			curPruningHeihts, err := manager.GetFlushAndResetPruningHeights()
+			require.Nil(t, err)

 			curHeightStr := fmt.Sprintf("height: %d", curHeight)
@@ -121,7 +122,9 @@ func TestStrategies(t *testing.T) {
 				require.Equal(t, int64(0), handleHeightActual, curHeightStr)
 				require.False(t, shouldPruneAtHeightActual, curHeightStr)
-				require.Equal(t, 0, len(manager.GetPruningHeights()))
+				heights, err := manager.GetFlushAndResetPruningHeights()
+				require.NoError(t, err)
+				require.Equal(t, 0, len(heights))
 			default:
 				if curHeight > int64(curKeepRecent) && (tc.snapshotInterval != 0 && (curHeight-int64(curKeepRecent))%int64(tc.snapshotInterval) != 0 || tc.snapshotInterval == 0) {
 					expectedHeight := curHeight - int64(curKeepRecent)
@@ -131,12 +134,15 @@ func TestStrategies(t *testing.T) {
 				} else {
 					require.Equal(t, int64(0), handleHeightActual, curHeightStr)
-					require.Equal(t, 0, len(manager.GetPruningHeights()))
+					heights, err := manager.GetFlushAndResetPruningHeights()
+					require.NoError(t, err)
+					require.Equal(t, 0, len(heights))
 				}
 				require.Equal(t, curHeight%int64(curInterval) == 0, shouldPruneAtHeightActual, curHeightStr)
 			}

-			manager.ResetPruningHeights()
-			require.Equal(t, 0, len(manager.GetPruningHeights()))
+			heights, err := manager.GetFlushAndResetPruningHeights()
+			require.NoError(t, err)
+			require.Equal(t, 0, len(heights))
 		}
 	})
 }
@@ -185,18 +191,154 @@ func TestHandleHeight_Inputs(t *testing.T) {
 	for name, tc := range testcases {
 		t.Run(name, func(t *testing.T) {
-			manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB())
+			manager := pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
 			require.NotNil(t, manager)
 			manager.SetOptions(types.NewPruningOptions(tc.strategy))

 			handleHeightActual := manager.HandleHeight(tc.height)
 			require.Equal(t, tc.expectedResult, handleHeightActual)

-			require.Equal(t, tc.expectedHeights, manager.GetPruningHeights())
+			actualHeights, err := manager.GetFlushAndResetPruningHeights()
+			require.NoError(t, err)
+			require.Equal(t, len(tc.expectedHeights), len(actualHeights))
+			require.Equal(t, tc.expectedHeights, actualHeights)
+		})
+	}
+}
+
+func TestHandleHeight_FlushLoadFromDisk(t *testing.T) {
+	testcases := map[string]struct {
+		previousHeight                   int64
+		keepRecent                       uint64
+		snapshotInterval                 uint64
+		movedSnapshotHeights             []int64
+		expectedHandleHeightResult       int64
+		expectedLoadPruningHeightsResult error
+		expectedLoadedHeights            []int64
+	}{
+		"simple flush occurs": {
+			previousHeight:                   11,
+			keepRecent:                       10,
+			snapshotInterval:                 0,
+			movedSnapshotHeights:             []int64{},
+			expectedHandleHeightResult:       11 - 10,
+			expectedLoadPruningHeightsResult: nil,
+			expectedLoadedHeights:            []int64{11 - 10},
+		},
+		"previous height <= keep recent - no update and no flush": {
+			previousHeight:                   9,
+			keepRecent:                       10,
+			snapshotInterval:                 0,
+			movedSnapshotHeights:             []int64{},
+			expectedHandleHeightResult:       0,
+			expectedLoadPruningHeightsResult: nil,
+			expectedLoadedHeights:            []int64{},
+		},
+		"previous height aligns with snapshot interval - no update and no flush": {
+			previousHeight:                   12,
+			keepRecent:                       10,
+			snapshotInterval:                 2,
+			movedSnapshotHeights:             []int64{},
+			expectedHandleHeightResult:       0,
+			expectedLoadPruningHeightsResult: nil,
+			expectedLoadedHeights:            []int64{},
+		},
+		"previous height does not align with snapshot interval - flush": {
+			previousHeight:                   12,
+			keepRecent:                       10,
+			snapshotInterval:                 3,
+			movedSnapshotHeights:             []int64{},
+			expectedHandleHeightResult:       2,
+			expectedLoadPruningHeightsResult: nil,
+			expectedLoadedHeights:            []int64{2},
+		},
+		"moved snapshot heights - flushed": {
+			previousHeight:                   32,
+			keepRecent:                       10,
+			snapshotInterval:                 5,
+			movedSnapshotHeights:             []int64{15, 20, 25},
+			expectedHandleHeightResult:       22,
+			expectedLoadPruningHeightsResult: nil,
+			expectedLoadedHeights:            []int64{15, 20, 22},
+		},
+		"previous height aligns with snapshot interval - no update but flush snapshot heights": {
+			previousHeight:                   30,
+			keepRecent:                       10,
+			snapshotInterval:                 5,
+			movedSnapshotHeights:             []int64{15, 20, 25},
+			expectedHandleHeightResult:       0,
+			expectedLoadPruningHeightsResult: nil,
+			expectedLoadedHeights:            []int64{15},
+		},
+	}
+
+	for name, tc := range testcases {
+		t.Run(name, func(t *testing.T) {
+			// Setup
+			db := db.NewMemDB()
+			manager := pruning.NewManager(db, log.NewNopLogger())
+			require.NotNil(t, manager)
+
+			manager.SetSnapshotInterval(tc.snapshotInterval)
+			manager.SetOptions(types.NewCustomPruningOptions(uint64(tc.keepRecent), uint64(10)))
+
+			for _, snapshotHeight := range tc.movedSnapshotHeights {
+				manager.HandleHeightSnapshot(snapshotHeight)
+			}
+
+			// Test HandleHeight and flush
+			handleHeightActual := manager.HandleHeight(tc.previousHeight)
+			require.Equal(t, tc.expectedHandleHeightResult, handleHeightActual)
+
+			loadedPruneHeights, err := pruning.LoadPruningHeights(db)
+			require.NoError(t, err)
+			require.Equal(t, len(tc.expectedLoadedHeights), len(loadedPruneHeights))
+
+			// Test load back
+			err = manager.LoadPruningHeights()
+			require.NoError(t, err)
+
+			heights, err := manager.GetFlushAndResetPruningHeights()
+			require.NoError(t, err)
+			require.Equal(t, len(tc.expectedLoadedHeights), len(heights))
+			require.ElementsMatch(t, tc.expectedLoadedHeights, heights)
 		})
 	}
 }

+func TestHandleHeightSnapshot_FlushLoadFromDisk(t *testing.T) {
+	loadedHeightsMirror := []int64{}
+
+	// Setup
+	db := db.NewMemDB()
+	manager := pruning.NewManager(db, log.NewNopLogger())
+	require.NotNil(t, manager)
+
+	manager.SetOptions(types.NewPruningOptions(types.PruningEverything))
+
+	for snapshotHeight := int64(-1); snapshotHeight < 100; snapshotHeight++ {
+		// Test flush
+		manager.HandleHeightSnapshot(snapshotHeight)
+
+		// Post test
+		if snapshotHeight > 0 {
+			loadedHeightsMirror = append(loadedHeightsMirror, snapshotHeight)
+		}
+
+		loadedSnapshotHeights, err := pruning.LoadPruningSnapshotHeights(db)
+		require.NoError(t, err)
+		require.Equal(t, len(loadedHeightsMirror), loadedSnapshotHeights.Len())
+
+		// Test load back
+		err = manager.LoadPruningHeights()
+		require.NoError(t, err)
+
+		loadedSnapshotHeights, err = pruning.LoadPruningSnapshotHeights(db)
+		require.NoError(t, err)
+		require.Equal(t, len(loadedHeightsMirror), loadedSnapshotHeights.Len())
+	}
+}
+
 func TestFlushLoad(t *testing.T) {
 	const (
 		totalHeights = 1000
@@ -207,7 +349,7 @@ func TestFlushLoad(t *testing.T) {
 	var (
 		db                   = db.NewMemDB()
-		manager              = pruning.NewManager(log.NewNopLogger(), db)
+		manager              = pruning.NewManager(db, log.NewNopLogger())
 		curStrategy          = types.NewCustomPruningOptions(pruningKeepRecent, pruningInterval)
 		heightsToPruneMirror = make([]int64, 0)
 	)
@@ -231,28 +373,35 @@ func TestFlushLoad(t *testing.T) {
 			require.Equal(t, int64(0), handleHeightActual, curHeightStr)
 		}

-		if manager.ShouldPruneAtHeight(curHeight) {
-			manager.ResetPruningHeights()
-			heightsToPruneMirror = make([]int64, 0)
-		}
-
-		// N.B.: There is no reason behind the choice of 3.
-		if curHeight%3 == 0 {
-			require.Equal(t, heightsToPruneMirror, manager.GetPruningHeights(), curHeightStr)
-			manager.FlushPruningHeights()
+		if manager.ShouldPruneAtHeight(curHeight) && curHeight > int64(pruningKeepRecent) {
+			actualHeights, err := manager.GetFlushAndResetPruningHeights()
+			require.NoError(t, err)
+			require.Equal(t, len(heightsToPruneMirror), len(actualHeights))
+			require.Equal(t, heightsToPruneMirror, actualHeights)

-			manager.ResetPruningHeights()
-			require.Equal(t, make([]int64, 0), manager.GetPruningHeights(), curHeightStr)
+			err = manager.LoadPruningHeights()
+			require.NoError(t, err)

-			err := manager.LoadPruningHeights()
+			actualHeights, err = manager.GetFlushAndResetPruningHeights()
 			require.NoError(t, err)
-			require.Equal(t, heightsToPruneMirror, manager.GetPruningHeights(), curHeightStr)
+			require.Equal(t, len(heightsToPruneMirror), len(actualHeights))
+			require.Equal(t, heightsToPruneMirror, actualHeights)
+
+			heightsToPruneMirror = make([]int64, 0)
 		}
 	}
 }

 func TestLoadPruningHeights(t *testing.T) {
-	var err error
+	var (
+		manager = pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
+		err     error
+	)
+	require.NotNil(t, manager)
+
+	// must not be PruningNothing
+	manager.SetOptions(types.NewPruningOptions(types.PruningDefault))
+
 	testcases := map[string]struct {
 		flushedPruningHeights            []int64
 		getFlushedPruningSnapshotHeights func() *list.List
@@ -308,7 +457,7 @@ func TestLoadPruningHeights(t *testing.T) {
 			require.NoError(t, err)
 		}

-		manager := pruning.NewManager(log.NewNopLogger(), db)
+		manager := pruning.NewManager(db, log.NewNopLogger())
 		require.NotNil(t, manager)

 		// must not be PruningNothing
@@ -321,86 +470,10 @@ func TestLoadPruningHeights(t *testing.T) {
 }

 func TestLoadPruningHeights_PruneNothing(t *testing.T) {
-	var manager = pruning.NewManager(log.NewNopLogger(), db.NewMemDB())
+	var manager = pruning.NewManager(db.NewMemDB(), log.NewNopLogger())
 	require.NotNil(t, manager)

 	manager.SetOptions(types.NewPruningOptions(types.PruningNothing))

 	require.Nil(t, manager.LoadPruningHeights())
 }
-
-func TestWithSnapshot(t *testing.T) {
-	manager := pruning.NewManager(log.NewNopLogger(), db.NewMemDB())
-	require.NotNil(t, manager)
-
-	curStrategy := types.NewCustomPruningOptions(10, 10)
-
-	snapshotInterval := uint64(15)
-	manager.SetSnapshotInterval(snapshotInterval)
-
-	manager.SetOptions(curStrategy)
-	require.Equal(t, curStrategy, manager.GetOptions())
-
-	keepRecent := curStrategy.KeepRecent
-
-	heightsToPruneMirror := make([]int64, 0)
-
-	mx := sync.Mutex{}
-	snapshotHeightsToPruneMirror := list.New()
-
-	wg := sync.WaitGroup{}
-	defer wg.Wait()
-
-	for curHeight := int64(1); curHeight < 100000; curHeight++ {
-		mx.Lock()
-		handleHeightActual := manager.HandleHeight(curHeight)
-
-		curHeightStr := fmt.Sprintf("height: %d", curHeight)
-
-		if curHeight > int64(keepRecent) && (curHeight-int64(keepRecent))%int64(snapshotInterval) != 0 {
-			expectedHandleHeight := curHeight - int64(keepRecent)
-			require.Equal(t, expectedHandleHeight, handleHeightActual, curHeightStr)
-			heightsToPruneMirror = append(heightsToPruneMirror, expectedHandleHeight)
-		} else {
-			require.Equal(t, int64(0), handleHeightActual, curHeightStr)
-		}
-
-		actualHeightsToPrune := manager.GetPruningHeights()
-
-		var next *list.Element
-		for e := snapshotHeightsToPruneMirror.Front(); e != nil; e = next {
-			snapshotHeight := e.Value.(int64)
-			if snapshotHeight < curHeight-int64(keepRecent) {
-				heightsToPruneMirror = append(heightsToPruneMirror, snapshotHeight)
-
-				// We must get next before removing to be able to continue iterating.
-				next = e.Next()
-				snapshotHeightsToPruneMirror.Remove(e)
-			} else {
-				next = e.Next()
-			}
-		}
-
-		require.Equal(t, heightsToPruneMirror, actualHeightsToPrune, curHeightStr)
-		mx.Unlock()
-
-		if manager.ShouldPruneAtHeight(curHeight) {
-			manager.ResetPruningHeights()
-			heightsToPruneMirror = make([]int64, 0)
-		}
-
-		// Mimic taking snapshots in the background
-		if curHeight%int64(snapshotInterval) == 0 {
-			wg.Add(1)
-			go func(curHeightCp int64) {
-				time.Sleep(time.Nanosecond * 500)
-
-				mx.Lock()
-				manager.HandleHeightSnapshot(curHeightCp)
-				snapshotHeightsToPruneMirror.PushBack(curHeightCp)
-				mx.Unlock()
-				wg.Done()
-			}(curHeight)
-		}
-	}
-}
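
Before moving on to the rootmulti changes, here is a rough sketch of the manager lifecycle that the tests above exercise: heights are flushed to disk inside HandleHeight and HandleHeightSnapshot, reloaded with LoadPruningHeights after a restart, and drained with GetFlushAndResetPruningHeights when it is time to prune. Only the cosmos-sdk pruning import paths are confirmed by the test diff; the tm-db and Tendermint logger paths are assumptions, and rootmulti.Store performs this wiring internally (see the next file), so this is illustrative only.

```go
package main

import (
	"fmt"

	"github.com/cosmos/cosmos-sdk/pruning"
	"github.com/cosmos/cosmos-sdk/pruning/types"
	"github.com/tendermint/tendermint/libs/log" // assumed import path
	dbm "github.com/tendermint/tm-db"           // assumed import path
)

func main() {
	db := dbm.NewMemDB()

	manager := pruning.NewManager(db, log.NewNopLogger())
	manager.SetOptions(types.NewCustomPruningOptions(10, 10)) // keep-recent=10, interval=10
	manager.SetSnapshotInterval(5)

	// Record previous heights as blocks are committed; each call flushes to db.
	for height := int64(1); height <= 30; height++ {
		manager.HandleHeight(height - 1)
	}

	// A snapshot finished in the background; its height is queued and flushed.
	manager.HandleHeightSnapshot(25)

	// After a crash/restart, the persisted heights can be reloaded.
	restarted := pruning.NewManager(db, log.NewNopLogger())
	restarted.SetOptions(types.NewCustomPruningOptions(10, 10))
	if err := restarted.LoadPruningHeights(); err != nil {
		panic(err)
	}

	// At a pruning height, drain the accumulated heights and prune the stores.
	heights, err := restarted.GetFlushAndResetPruningHeights()
	if err != nil {
		panic(err)
	}
	fmt.Println("heights to prune:", heights)
}
```
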
diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go
index c90be7ed6737..98ce593c5854 100644
--- a/store/rootmulti/store.go
+++ b/store/rootmulti/store.go
@@ -60,7 +60,7 @@ type Store struct {
 	db             dbm.DB
 	logger         log.Logger
 	lastCommitInfo *types.CommitInfo
-	mx             *sync.RWMutex // mutex to sync access to lastCommitInfo
+	mx             sync.RWMutex // mutex to sync access to lastCommitInfo
 	pruningManager *pruning.Manager
 	iavlCacheSize  int
 	storesParams   map[types.StoreKey]storeParams
@@ -95,8 +95,7 @@ func NewStore(db dbm.DB, logger log.Logger) *Store {
 		stores:         make(map[types.StoreKey]types.CommitKVStore),
 		keysByName:     make(map[string]types.StoreKey),
 		listeners:      make(map[types.StoreKey][]types.WriteListener),
-		pruningManager: pruning.NewManager(logger, db),
-		mx:             &sync.RWMutex{},
+		pruningManager: pruning.NewManager(db, logger),
 	}
 }

@@ -530,22 +529,28 @@ func (rs *Store) GetKVStore(key types.StoreKey) types.KVStore {
 }

 func (rs *Store) handlePruning(version int64) error {
-	defer rs.pruningManager.FlushPruningHeights()
-
 	rs.pruningManager.HandleHeight(version - 1) // we should never prune the current version.
 	if !rs.pruningManager.ShouldPruneAtHeight(version) {
 		return nil
 	}

-	rs.logger.Info("prune start", "height", version)
+	defer rs.logger.Info("prune end", "height", version)
+	return rs.pruneStores()
+}

-	pruningHeights := rs.pruningManager.GetPruningHeights()
-	rs.logger.Debug(fmt.Sprintf("pruning the following heights: %v\n", pruningHeights))
+func (rs *Store) pruneStores() error {
+	pruningHeights, err := rs.pruningManager.GetFlushAndResetPruningHeights()
+	if err != nil {
+		return err
+	}

 	if len(pruningHeights) == 0 {
+		rs.logger.Debug("pruning skipped; no heights to prune")
 		return nil
 	}

+	rs.logger.Debug("pruning heights", "heights", pruningHeights)
+
 	for key, store := range rs.stores {
 		// If the store is wrapped with an inter-block cache, we must first unwrap
 		// it to get the underlying IAVL store.
@@ -564,8 +569,6 @@ func (rs *Store) handlePruning(version int64) error {
 			return err
 		}
 	}
-	rs.pruningManager.ResetPruningHeights()
-	rs.logger.Info("prune end", "height", version)

 	return nil
 }
@@ -897,7 +900,6 @@ func (rs *Store) Restore(
 	}

 	rs.flushLastCommitInfo(rs.buildCommitInfo(int64(height)))
-	rs.pruningManager.FlushPruningHeights()

 	return rs.LoadLatestVersion()
 }
diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go
index ced7f3cd619a..a39a14c8e6e0 100644
--- a/store/rootmulti/store_test.go
+++ b/store/rootmulti/store_test.go
@@ -536,6 +536,60 @@ func TestMultiStore_Pruning(t *testing.T) {
 	}
 }

+func TestMultiStore_Pruning_SameHeightsTwice(t *testing.T) {
+	const (
+		numVersions int64  = 10
+		keepRecent  uint64 = 2
+		interval    uint64 = 10
+	)
+
+	expectedHeights := []int64{}
+	for i := int64(1); i < numVersions-int64(keepRecent); i++ {
+		expectedHeights = append(expectedHeights, i)
+	}
+
+	db := dbm.NewMemDB()
+
+	ms := newMultiStoreWithMounts(db, pruningtypes.NewCustomPruningOptions(keepRecent, interval))
+	require.NoError(t, ms.LoadLatestVersion())
+
+	var lastCommitInfo types.CommitID
+	for i := int64(0); i < numVersions; i++ {
+		lastCommitInfo = ms.Commit()
+	}
+
+	require.Equal(t, numVersions, lastCommitInfo.Version)
+
+	for v := int64(1); v < numVersions-int64(keepRecent); v++ {
+		err := ms.LoadVersion(v)
+		require.Error(t, err, "expected error when loading pruned height: %d", v)
+	}
+
+	for v := int64(numVersions - int64(keepRecent)); v < numVersions; v++ {
+		err := ms.LoadVersion(v)
+		require.NoError(t, err, "expected no error when loading height: %d", v)
+	}
+
+	// Get latest
+	err := ms.LoadVersion(numVersions - 1)
+	require.NoError(t, err)
+
+	// Ensure already pruned heights were loaded
+	heights, err := ms.pruningManager.GetFlushAndResetPruningHeights()
+	require.NoError(t, err)
+	require.Equal(t, expectedHeights, heights)
+
+	require.NoError(t, ms.pruningManager.LoadPruningHeights())
+
+	// Test pruning the same heights again
+	lastCommitInfo = ms.Commit()
+	require.Equal(t, numVersions, lastCommitInfo.Version)
+
+	// Ensure that we can commit one more height with no panic
+	lastCommitInfo = ms.Commit()
+	require.Equal(t, numVersions+1, lastCommitInfo.Version)
+}
+
 func TestMultiStore_PruningRestart(t *testing.T) {
 	db := dbm.NewMemDB()
 	ms := newMultiStoreWithMounts(db, pruningtypes.NewCustomPruningOptions(2, 11))
@@ -553,18 +607,28 @@ func TestMultiStore_PruningRestart(t *testing.T) {
 	// ensure we've persisted the current batch of heights to prune to the store's DB
 	err := ms.pruningManager.LoadPruningHeights()
 	require.NoError(t, err)
-	require.Equal(t, pruneHeights, ms.pruningManager.GetPruningHeights())
+
+	actualHeightsToPrune, err := ms.pruningManager.GetFlushAndResetPruningHeights()
+	require.NoError(t, err)
+	require.Equal(t, len(pruneHeights), len(actualHeightsToPrune))
+	require.Equal(t, pruneHeights, actualHeightsToPrune)

 	// "restart"
 	ms = newMultiStoreWithMounts(db, pruningtypes.NewCustomPruningOptions(2, 11))
 	ms.SetSnapshotInterval(3)
 	err = ms.LoadLatestVersion()
 	require.NoError(t, err)
-	require.Equal(t, pruneHeights, ms.pruningManager.GetPruningHeights())
+
+	actualHeightsToPrune, err = ms.pruningManager.GetFlushAndResetPruningHeights()
+	require.NoError(t, err)
+	require.Equal(t, pruneHeights, actualHeightsToPrune)

 	// commit one more block and ensure the heights have been pruned
 	ms.Commit()
-	require.Empty(t, ms.pruningManager.GetPruningHeights())
+
+	actualHeightsToPrune, err = ms.pruningManager.GetFlushAndResetPruningHeights()
+	require.NoError(t, err)
+	require.Empty(t, actualHeightsToPrune)

 	for _, v := range pruneHeights {
 		_, err := ms.CacheMultiStoreWithVersion(v)