Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep epoch contents in permanent storage in epochstorage plugin #2328

Merged
merged 3 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/database/prefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ const (

// PrefixNotarization defines the storage prefix for the epochs package.
PrefixNotarization

// PrefixEpochsStorage defines the storage prefix for the epoch storage plugin.
PrefixEpochsStorage
)
180 changes: 70 additions & 110 deletions plugins/epochstorage/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ var (
Plugin *node.Plugin
deps = new(dependencies)

epochContentsMutex sync.RWMutex
epochContents = make(map[epoch.Index]*epochContentStorages, 0)
baseStore kvstore.KVStore

committableEpochsMutex sync.RWMutex
committableEpochs = make([]*epoch.ECRecord, 0)
epochVotersWeightMutex sync.RWMutex
Expand All @@ -42,20 +42,12 @@ var (

maxEpochContentsToKeep = 100
numEpochContentsToRemove = 20
minEpochIndex = epoch.Index(0)

epochOrderMutex sync.RWMutex
epochOrderMap = make(map[epoch.Index]types.Empty, 0)
epochOrder = make([]epoch.Index, 0)
)

type epochContentStorages struct {
spentOutputs kvstore.KVStore
createdOutputs kvstore.KVStore
blockIDs kvstore.KVStore
transactionIDs kvstore.KVStore
}

type latestVote struct {
ei epoch.Index
ecr epoch.ECR
Expand All @@ -66,6 +58,7 @@ type dependencies struct {

Tangle *tangle.Tangle
NotarizationMgr *notarization.Manager
Storage kvstore.KVStore
}

func init() {
Expand All @@ -81,18 +74,16 @@ func init() {
}

func configure(plugin *node.Plugin) {
// get the last committed epoch EI as minEpochIndex
snapshotEC, _ := deps.NotarizationMgr.GetLatestEC()
minEpochIndex = snapshotEC.EI()
baseStore = deps.Storage

deps.NotarizationMgr.Events.TangleTreeInserted.Attach(event.NewClosure(func(event *notarization.TangleTreeUpdatedEvent) {
epochOrderMutex.Lock()
if _, ok := epochOrderMap[event.EI]; !ok {
epochOrderMap[event.EI] = types.Void
epochOrder = append(epochOrder, event.EI)
checkEpochContentLimit()
}
epochOrderMutex.Unlock()
checkEpochContentLimit()

err := insertblockToEpoch(event.EI, event.BlockID)
if err != nil {
Expand Down Expand Up @@ -150,9 +141,7 @@ func run(*node.Plugin) {
}

func checkEpochContentLimit() {
epochOrderMutex.Lock()
if len(epochOrder) <= maxEpochContentsToKeep {
epochOrderMutex.Unlock()
return
}

Expand All @@ -169,23 +158,13 @@ func checkEpochContentLimit() {
for _, i := range epochToRemove {
delete(epochOrderMap, i)
}
epochOrderMutex.Unlock()

epochContentsMutex.Lock()
for _, i := range epochToRemove {
delete(epochContents, i)
}
epochContentsMutex.Unlock()

epochVotersWeightMutex.Lock()
for _, i := range epochToRemove {
delete(epochVotersWeight, i)
}
epochVotersWeightMutex.Unlock()

// update minEpochIndex
minEpochIndex = epochOrder[0]

committableEpochsMutex.Lock()
if len(committableEpochs) < maxEpochContentsToKeep {
committableEpochsMutex.Unlock()
Expand All @@ -211,33 +190,24 @@ func GetPendingConflictCount() map[epoch.Index]uint64 {
return deps.NotarizationMgr.PendingConflictsCountAll()
}

func GetEpochblocks(ei epoch.Index) ([]tangle.BlockID, error) {
stores, err := getEpochContentStorage(ei)
if err != nil {
return []tangle.BlockID{}, err
}
func GetEpochblocks(ei epoch.Index) (blockIDs []tangle.BlockID) {
prefix := append([]byte{database.PrefixEpochsStorage, prefixBlockIDs}, ei.Bytes()...)

var blkIDs []tangle.BlockID
stores.blockIDs.IterateKeys(kvstore.EmptyPrefix, func(key kvstore.Key) bool {
var blkID tangle.BlockID
if _, err := blkID.Decode(key); err != nil {
baseStore.IterateKeys(prefix, func(key kvstore.Key) bool {
var blockID tangle.BlockID
if _, err := blockID.Decode(key); err != nil {
panic("BlockID could not be parsed!")
}
blkIDs = append(blkIDs, blkID)
blockIDs = append(blockIDs, blockID)
return true
})

return blkIDs, nil
return
}

func GetEpochTransactions(ei epoch.Index) ([]utxo.TransactionID, error) {
stores, err := getEpochContentStorage(ei)
if err != nil {
return []utxo.TransactionID{}, err
}
func GetEpochTransactions(ei epoch.Index) (txIDs []utxo.TransactionID) {
prefix := append([]byte{database.PrefixEpochsStorage, prefixTransactionIDs}, ei.Bytes()...)

var txIDs []utxo.TransactionID
stores.transactionIDs.IterateKeys(kvstore.EmptyPrefix, func(key kvstore.Key) bool {
baseStore.IterateKeys(prefix, func(key kvstore.Key) bool {
var txID utxo.TransactionID
if _, err := txID.Decode(key); err != nil {
panic("TransactionID could not be parsed!")
Expand All @@ -246,36 +216,33 @@ func GetEpochTransactions(ei epoch.Index) ([]utxo.TransactionID, error) {
return true
})

return txIDs, nil
return
}

func GetEpochUTXOs(ei epoch.Index) (spent, created []utxo.OutputID, err error) {
stores, err := getEpochContentStorage(ei)
if err != nil {
return []utxo.OutputID{}, []utxo.OutputID{}, err
}
func GetEpochUTXOs(ei epoch.Index) (spent, created []utxo.OutputID) {
createdPrefix := append([]byte{database.PrefixEpochsStorage, prefixCreatedOutput}, ei.Bytes()...)
spentPrefix := append([]byte{database.PrefixEpochsStorage, prefixSpentOutput}, ei.Bytes()...)

stores.createdOutputs.IterateKeys(kvstore.EmptyPrefix, func(key kvstore.Key) bool {
baseStore.IterateKeys(spentPrefix, func(key kvstore.Key) bool {
var outputID utxo.OutputID
if err = outputID.FromBytes(key); err != nil {
if err := outputID.FromBytes(key); err != nil {
panic(err)
}
created = append(created, outputID)
spent = append(spent, outputID)

return true
})

stores.spentOutputs.IterateKeys(kvstore.EmptyPrefix, func(key kvstore.Key) bool {
baseStore.IterateKeys(createdPrefix, func(key kvstore.Key) bool {
var outputID utxo.OutputID
if err = outputID.FromBytes(key); err != nil {
if err := outputID.FromBytes(key); err != nil {
panic(err)
}
spent = append(spent, outputID)
created = append(created, outputID)

return true
})

return spent, created, nil
return
}

func GetEpochVotersWeight(ei epoch.Index) (weights map[epoch.ECR]map[identity.ID]float64) {
Expand All @@ -296,99 +263,73 @@ func GetEpochVotersWeight(ei epoch.Index) (weights map[epoch.ECR]map[identity.ID
return weights
}

func getEpochContentStorage(ei epoch.Index) (*epochContentStorages, error) {
if ei < minEpochIndex {
return nil, errors.New("Epoch storage no longer exists")
}

epochContentsMutex.RLock()
stores, ok := epochContents[ei]
epochContentsMutex.RUnlock()

if !ok {
stores = newEpochContentStorage()
epochContentsMutex.Lock()
epochContents[ei] = stores
epochContentsMutex.Unlock()
}

return stores, nil
}

func newEpochContentStorage() *epochContentStorages {
db, _ := database.NewMemDB()

// keep data in temporary storage
return &epochContentStorages{
spentOutputs: db.NewStore(),
createdOutputs: db.NewStore(),
transactionIDs: db.NewStore(),
blockIDs: db.NewStore(),
}
}

func insertblockToEpoch(ei epoch.Index, blkID tangle.BlockID) error {
epochContentStorage, err := getEpochContentStorage(ei)
blockStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixBlockIDs}, ei.Bytes()...))
if err != nil {
return err
panic(err)
}

if err := epochContentStorage.blockIDs.Set(blkID.Bytes(), blkID.Bytes()); err != nil {
if err := blockStore.Set(blkID.Bytes(), blkID.Bytes()); err != nil {
return errors.New("Fail to insert block to epoch store")
}
return nil
}

func removeblockFromEpoch(ei epoch.Index, blkID tangle.BlockID) error {
epochContentStorage, err := getEpochContentStorage(ei)
blockStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixBlockIDs}, ei.Bytes()...))
if err != nil {
return err
panic(err)
}

if err := epochContentStorage.blockIDs.Delete(blkID.Bytes()); err != nil {
if err := blockStore.Delete(blkID.Bytes()); err != nil {
return errors.New("Fail to remove block from epoch store")
}
return nil
}

func insertTransactionToEpoch(ei epoch.Index, txID utxo.TransactionID) error {
epochContentStorage, err := getEpochContentStorage(ei)
txStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixTransactionIDs}, ei.Bytes()...))
if err != nil {
return err
panic(err)
}

if err := epochContentStorage.transactionIDs.Set(txID.Bytes(), txID.Bytes()); err != nil {
if err := txStore.Set(txID.Bytes(), txID.Bytes()); err != nil {
return errors.New("Fail to insert Transaction to epoch store")
}
return nil
}

func removeTransactionFromEpoch(ei epoch.Index, txID utxo.TransactionID) error {
epochContentStorage, err := getEpochContentStorage(ei)
txStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixTransactionIDs}, ei.Bytes()...))
if err != nil {
return err
panic(err)
}

if err := epochContentStorage.transactionIDs.Delete(txID.Bytes()); err != nil {
if err := txStore.Delete(txID.Bytes()); err != nil {
return errors.New("Fail to remove Transaction from epoch store")
}
return nil
}

func insertOutputsToEpoch(ei epoch.Index, spent, created []*ledger.OutputWithMetadata) error {
epochContentStorage, err := getEpochContentStorage(ei)
createdStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixCreatedOutput}, ei.Bytes()...))
if err != nil {
return err
panic(err)
}

spentStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixSpentOutput}, ei.Bytes()...))
if err != nil {
panic(err)
}

for _, s := range spent {
if err := epochContentStorage.spentOutputs.Set(s.ID().Bytes(), s.ID().Bytes()); err != nil {
if err := spentStore.Set(s.ID().Bytes(), s.ID().Bytes()); err != nil {
return errors.New("Fail to insert spent output to epoch store")
}
}

for _, c := range created {
if err := epochContentStorage.createdOutputs.Set(c.ID().Bytes(), c.ID().Bytes()); err != nil {
if err := createdStore.Set(c.ID().Bytes(), c.ID().Bytes()); err != nil {
return errors.New("Fail to insert created output to epoch store")
}
}
Expand All @@ -397,19 +338,24 @@ func insertOutputsToEpoch(ei epoch.Index, spent, created []*ledger.OutputWithMet
}

func removeOutputsFromEpoch(ei epoch.Index, spent, created []*ledger.OutputWithMetadata) error {
epochContentStorage, err := getEpochContentStorage(ei)
createdStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixCreatedOutput}, ei.Bytes()...))
if err != nil {
panic(err)
}

spentStore, err := baseStore.WithRealm(append([]byte{database.PrefixEpochsStorage, prefixSpentOutput}, ei.Bytes()...))
if err != nil {
return err
panic(err)
}

for _, s := range spent {
if err := epochContentStorage.spentOutputs.Delete(s.ID().Bytes()); err != nil {
if err := spentStore.Delete(s.ID().Bytes()); err != nil {
return errors.New("Fail to remove spent output from epoch store")
}
}

for _, c := range created {
if err := epochContentStorage.createdOutputs.Delete(c.ID().Bytes()); err != nil {
if err := createdStore.Delete(c.ID().Bytes()); err != nil {
return errors.New("Fail to remove created output from epoch store")
}
}
Expand Down Expand Up @@ -441,3 +387,17 @@ func saveEpochVotersWeight(block *tangle.Block) {
epochVotersLatestVote[voter] = &latestVote{ei: epochIndex, ecr: ecr, issuedTime: block.M.IssuingTime}
epochVotersWeight[epochIndex][ecr][voter] = activeWeights[voter]
}

// region db prefixes //////////////////////////////////////////////////////////////////////////////////////////////////

const (
prefixSpentOutput byte = iota

prefixCreatedOutput

prefixBlockIDs

prefixTransactionIDs
)

// region Options //////////////////////////////////////////////////////////////////////////////////////////////////////
Loading