refactor: synchronize pruning manager (#187)
* progress

* refactor pruning manager to have no data races; flush heights immediately when updated in memory; unit tests

* typo in TestMultiStore_PruningRestart

* fmt

* improve comments

* avoid mutex init, move comments to struct declaration, return nil with error, fix logs
p0mvn committed Apr 20, 2022
1 parent d253b2d commit cf64bc6
Showing 5 changed files with 367 additions and 200 deletions.
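Before the file-by-file diff, here is a minimal sketch of how a caller drives the refactored manager end to end. Only NewManager, LoadPruningHeights, and GetFlushAndResetPruningHeights appear in the diff below; the import paths, the SetOptions call, and the PruningDefault option are assumptions for illustration.

package main

import (
	"fmt"

	"github.com/tendermint/tendermint/libs/log"
	dbm "github.com/tendermint/tm-db"

	"github.com/cosmos/cosmos-sdk/pruning"
	"github.com/cosmos/cosmos-sdk/pruning/types"
)

func main() {
	db := dbm.NewMemDB()
	m := pruning.NewManager(db, log.NewNopLogger()) // note: db now comes before logger

	m.SetOptions(types.NewPruningOptions(types.PruningDefault)) // assumed setter and option

	// Recover any heights that were flushed with SetSync before a crash.
	if err := m.LoadPruningHeights(); err != nil {
		panic(err)
	}

	// Replaces the old GetPruningHeights + ResetPruningHeights pair.
	heights, err := m.GetFlushAndResetPruningHeights()
	if err != nil {
		panic(err)
	}
	fmt.Println("heights to prune:", heights)
}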
6 changes: 4 additions & 2 deletions pruning/export_test.go
@@ -7,6 +7,8 @@ var (
	PruneSnapshotHeightsKey = pruneSnapshotHeightsKey

	// functions
-	Int64SliceToBytes = int64SliceToBytes
-	ListToBytes       = listToBytes
+	Int64SliceToBytes          = int64SliceToBytes
+	ListToBytes                = listToBytes
+	LoadPruningHeights         = loadPruningHeights
+	LoadPruningSnapshotHeights = loadPruningSnapshotHeights
)
184 changes: 105 additions & 79 deletions pruning/manager.go
@@ -13,13 +13,21 @@ import (
)

type Manager struct {
-	logger           log.Logger
-	db               dbm.DB
-	opts             types.PruningOptions
-	snapshotInterval uint64
-	pruneHeights     []int64
+	db               dbm.DB
+	logger           log.Logger
+	opts             types.PruningOptions
+	snapshotInterval uint64
+	pruneHeights     []int64
+	// Although updates to pruneHeights happen in the same goroutine as normal execution,
+	// we synchronize access to them to avoid soundness issues in the future if the concurrency pattern changes.
+	pruneHeightsMx sync.Mutex
+	// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
+	// The heights are added to this list to be pruned when a snapshot is complete.
	pruneSnapshotHeights *list.List
-	mx sync.Mutex
+	// Snapshots are taken in a separate goroutine from the regular execution
+	// and can be delivered asynchronously via HandleHeightSnapshot.
+	// Therefore, we synchronize access to pruneSnapshotHeights with this mutex.
+	pruneSnapshotHeightsMx sync.Mutex
}
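A toy sketch (not part of this commit) of the two-goroutine pattern these mutexes serialize; takeSnapshot is a hypothetical placeholder for the state sync work.

// commitAndSnapshot shows which method runs on which goroutine.
func commitAndSnapshot(m *pruning.Manager, previousHeight, snapshotHeight int64, takeSnapshot func(int64)) {
	// Snapshot goroutine: reports its height asynchronously when done.
	go func() {
		takeSnapshot(snapshotHeight)
		m.HandleHeightSnapshot(snapshotHeight) // takes pruneSnapshotHeightsMx
	}()

	// Regular execution goroutine: the deferred block in HandleHeight takes
	// both mutexes to drain completed snapshot heights into pruneHeights.
	_ = m.HandleHeight(previousHeight)
}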

const (
@@ -32,16 +40,15 @@ var (
pruneSnapshotHeightsKey = []byte("s/pruneSnheights")
)

-func NewManager(logger log.Logger, db dbm.DB) *Manager {
+func NewManager(db dbm.DB, logger log.Logger) *Manager {
	return &Manager{
-		logger:       logger,
-		db:           db,
-		opts:         types.NewPruningOptions(types.PruningNothing),
-		pruneHeights: []int64{},
+		db:                   db,
+		logger:               logger,
+		opts:                 types.NewPruningOptions(types.PruningNothing),
-		// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
-		// The heights are added to this list to be pruned when a snapshot is complete.
+		pruneHeights:         []int64{},
		pruneSnapshotHeights: list.New(),
-		mx:                   sync.Mutex{},
}
}

@@ -55,15 +62,25 @@ func (m *Manager) GetOptions() types.PruningOptions {
return m.opts
}

-// GetPruningHeights returns all heights to be pruned during the next call to Prune().
-func (m *Manager) GetPruningHeights() []int64 {
-	return m.pruneHeights
-}
+// GetFlushAndResetPruningHeights returns all heights to be pruned during the next call to Prune().
+// It also flushes and resets the pruning heights.
+func (m *Manager) GetFlushAndResetPruningHeights() ([]int64, error) {
+	if m.opts.GetPruningStrategy() == types.PruningNothing {
+		return []int64{}, nil
+	}
+	m.pruneHeightsMx.Lock()
+	defer m.pruneHeightsMx.Unlock()
+
+	pruningHeights := m.pruneHeights
+
+	// flush the updates to disk so that they are not lost if a crash happens.
+	if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(pruningHeights)); err != nil {
+		return nil, err
+	}
+
+	m.pruneHeights = make([]int64, 0, m.opts.Interval)
+
-// ResetPruningHeights resets the heights to be pruned.
-func (m *Manager) ResetPruningHeights() {
-	// reuse previously allocated memory
-	m.pruneHeights = m.pruneHeights[:0]
+	return pruningHeights, nil
}
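A sketch of the caller side of the new contract, with a hypothetical deleteVersion store hook: because the outstanding heights are persisted with SetSync before the in-memory slice is reset, a crash after this call can at worst re-prune already-deleted heights; it cannot lose any.

func pruneStore(m *pruning.Manager, deleteVersion func(int64) error) error {
	heights, err := m.GetFlushAndResetPruningHeights()
	if err != nil {
		return err
	}
	for _, h := range heights {
		if err := deleteVersion(h); err != nil { // assumed store hook
			return err
		}
	}
	return nil
}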

// HandleHeight determines if pruneHeight height needs to be kept for pruning at the right interval prescribed by
@@ -76,9 +93,14 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
}

defer func() {
-		// handle persisted snapshot heights
-		m.mx.Lock()
-		defer m.mx.Unlock()
+		m.pruneHeightsMx.Lock()
+		defer m.pruneHeightsMx.Unlock()
+
+		m.pruneSnapshotHeightsMx.Lock()
+		defer m.pruneSnapshotHeightsMx.Unlock()
+
+		// move persisted snapshot heights to pruneHeights, which
+		// represent the heights to be pruned at the next pruning interval.
var next *list.Element
for e := m.pruneSnapshotHeights.Front(); e != nil; e = next {
snHeight := e.Value.(int64)
@@ -92,6 +114,11 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
next = e.Next()
}
}

+		// flush the updates to disk so that they are not lost if a crash happens.
+		if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
+			panic(err)
+		}
}()

if int64(m.opts.KeepRecent) < previousHeight {
@@ -102,21 +129,38 @@ func (m *Manager) HandleHeight(previousHeight int64) int64 {
// - snapshotInterval % (height - KeepRecent) != 0 as that means the height is not
// a 'snapshot' height.
if m.snapshotInterval == 0 || pruneHeight%int64(m.snapshotInterval) != 0 {
+			m.pruneHeightsMx.Lock()
+			defer m.pruneHeightsMx.Unlock()

m.pruneHeights = append(m.pruneHeights, pruneHeight)

+			// flush the updates to disk so that they are not lost if a crash happens.
+			if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
+				panic(err)
+			}
return pruneHeight
}
}
return 0
}

+// HandleHeightSnapshot persists the snapshot height to be pruned at the next appropriate
+// height defined by the pruning strategy. It flushes the update to disk and panics if the flush fails.
+// The input height must be greater than 0, and the pruning strategy must be anything other than PruningNothing.
+// If either condition is not met, this function does nothing.
func (m *Manager) HandleHeightSnapshot(height int64) {
-	if m.opts.GetPruningStrategy() == types.PruningNothing {
+	if m.opts.GetPruningStrategy() == types.PruningNothing || height <= 0 {
return
}
-	m.mx.Lock()
-	defer m.mx.Unlock()
-	m.logger.Debug("HandleHeightSnapshot", "height", height) // TODO: change log level to Debug
+	m.pruneSnapshotHeightsMx.Lock()
+	defer m.pruneSnapshotHeightsMx.Unlock()
+	m.logger.Debug("HandleHeightSnapshot", "height", height)
m.pruneSnapshotHeights.PushBack(height)

+	// flush the updates to disk so that they are not lost if a crash happens.
+	if err := m.db.SetSync(pruneSnapshotHeightsKey, listToBytes(m.pruneSnapshotHeights)); err != nil {
+		panic(err)
+	}
}
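The guard conditions from the doc comment, as toy calls (assuming m is a *Manager whose strategy is not PruningNothing):

m.HandleHeightSnapshot(0)   // no-op: height must be > 0
m.HandleHeightSnapshot(100) // pushed onto pruneSnapshotHeights, then flushed with SetSync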

// SetSnapshotInterval sets the interval at which the snapshots are taken.
@@ -129,102 +173,84 @@ func (m *Manager) ShouldPruneAtHeight(height int64) bool {
return m.opts.Interval > 0 && m.opts.GetPruningStrategy() != types.PruningNothing && height%int64(m.opts.Interval) == 0
}

-// FlushPruningHeights flushes the pruning heights to the database for crash recovery.
-func (m *Manager) FlushPruningHeights() {
-	if m.opts.GetPruningStrategy() == types.PruningNothing {
-		return
-	}
-	batch := m.db.NewBatch()
-	defer batch.Close()
-	m.flushPruningHeights(batch)
-	m.flushPruningSnapshotHeights(batch)
-
-	if err := batch.WriteSync(); err != nil {
-		panic(fmt.Errorf("error on batch write %w", err))
-	}
-}

// LoadPruningHeights loads the pruning heights from the database as a crash recovery.
func (m *Manager) LoadPruningHeights() error {
if m.opts.GetPruningStrategy() == types.PruningNothing {
return nil
}
-	if err := m.loadPruningHeights(); err != nil {
+	loadedPruneHeights, err := loadPruningHeights(m.db)
+	if err != nil {
		return err
	}
-	return m.loadPruningSnapshotHeights()
+
+	if len(loadedPruneHeights) > 0 {
+		m.pruneHeightsMx.Lock()
+		defer m.pruneHeightsMx.Unlock()
+		m.pruneHeights = loadedPruneHeights
+	}
+
+	loadedPruneSnapshotHeights, err := loadPruningSnapshotHeights(m.db)
+	if err != nil {
+		return err
+	}
+
+	if loadedPruneSnapshotHeights.Len() > 0 {
+		m.pruneSnapshotHeightsMx.Lock()
+		defer m.pruneSnapshotHeightsMx.Unlock()
+		m.pruneSnapshotHeights = loadedPruneSnapshotHeights
+	}
+
+	return nil
}
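A restart sketch under assumed wiring (SetOptions is not shown in this diff): options must be set before loading, since LoadPruningHeights returns early under PruningNothing.

m := pruning.NewManager(db, logger)
m.SetOptions(opts) // assumed setter; must run before LoadPruningHeights
if err := m.LoadPruningHeights(); err != nil {
	return err
}
// Heights that were SetSync'd before the crash are now back in memory and
// will be returned by the next GetFlushAndResetPruningHeights call.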

-func (m *Manager) loadPruningHeights() error {
-	bz, err := m.db.Get(pruneHeightsKey)
+func loadPruningHeights(db dbm.DB) ([]int64, error) {
+	bz, err := db.Get(pruneHeightsKey)
if err != nil {
return fmt.Errorf("failed to get pruned heights: %w", err)
return nil, fmt.Errorf("failed to get pruned heights: %w", err)
}
if len(bz) == 0 {
-		return nil
+		return []int64{}, nil
}

prunedHeights := make([]int64, len(bz)/8)
i, offset := 0, 0
for offset < len(bz) {
h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
if h < 0 {
-			return fmt.Errorf(errNegativeHeightsFmt, h)
+			return []int64{}, fmt.Errorf(errNegativeHeightsFmt, h)
}

prunedHeights[i] = h
i++
offset += 8
}

-	if len(prunedHeights) > 0 {
-		m.pruneHeights = prunedHeights
-	}
-
-	return nil
+	return prunedHeights, nil
}

-func (m *Manager) loadPruningSnapshotHeights() error {
-	bz, err := m.db.Get(pruneSnapshotHeightsKey)
+func loadPruningSnapshotHeights(db dbm.DB) (*list.List, error) {
+	bz, err := db.Get(pruneSnapshotHeightsKey)
+	pruneSnapshotHeights := list.New()
if err != nil {
return fmt.Errorf("failed to get post-snapshot pruned heights: %w", err)
return nil, fmt.Errorf("failed to get post-snapshot pruned heights: %w", err)
}
if len(bz) == 0 {
-		return nil
+		return pruneSnapshotHeights, nil
}

-	pruneSnapshotHeights := list.New()
i, offset := 0, 0
for offset < len(bz) {
h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
if h < 0 {
-			return fmt.Errorf(errNegativeHeightsFmt, h)
+			return pruneSnapshotHeights, fmt.Errorf(errNegativeHeightsFmt, h)
}

pruneSnapshotHeights.PushBack(h)
i++
offset += 8
}

-	m.mx.Lock()
-	defer m.mx.Unlock()
-	m.pruneSnapshotHeights = pruneSnapshotHeights
-
-	return nil
-}
-
-func (m *Manager) flushPruningHeights(batch dbm.Batch) {
-	if err := batch.Set([]byte(pruneHeightsKey), int64SliceToBytes(m.pruneHeights)); err != nil {
-		panic(err)
-	}
-}
-
-func (m *Manager) flushPruningSnapshotHeights(batch dbm.Batch) {
-	m.mx.Lock()
-	defer m.mx.Unlock()
-	if err := batch.Set([]byte(pruneSnapshotHeightsKey), listToBytes(m.pruneSnapshotHeights)); err != nil {
-		panic(err)
-	}
+	return pruneSnapshotHeights, nil
}

func int64SliceToBytes(slice []int64) []byte {
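The encoder bodies are truncated above, but the decoders read fixed 8-byte big-endian chunks, so a plausible sketch of the two encoders (assuming encoding/binary and container/list are imported) is:

func int64SliceToBytes(slice []int64) []byte {
	bz := make([]byte, len(slice)*8)
	for i, h := range slice {
		binary.BigEndian.PutUint64(bz[i*8:], uint64(h))
	}
	return bz
}

func listToBytes(heights *list.List) []byte {
	bz := make([]byte, heights.Len()*8)
	offset := 0
	for e := heights.Front(); e != nil; e = e.Next() {
		binary.BigEndian.PutUint64(bz[offset:], uint64(e.Value.(int64)))
		offset += 8
	}
	return bz
}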
