Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mana vector fixes #2396

Merged
merged 25 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
75d88ce
Use mana delay param only from mana plugin
daria305 Aug 22, 2022
ac3d5c1
Update unit tests
daria305 Aug 22, 2022
40b1323
Prune activity on epoch confirmed
daria305 Aug 22, 2022
355017a
Add missing expected calls
daria305 Aug 24, 2022
aed4483
Merge branch 'develop' into feat/mana-vector-update-fixes
daria305 Aug 24, 2022
0e856c3
Update mana vector update event after merging
daria305 Aug 24, 2022
34941a3
Tiny renames and comment
karimodm Aug 25, 2022
5cedc23
Update DiffUTXO test
daria305 Aug 25, 2022
69a3b21
Merge remote-tracking branch 'origin/feat/mana-vector-update-fixes' i…
daria305 Aug 25, 2022
6840417
Make SetEpochs private
daria305 Aug 25, 2022
7f4eadd
Change NodesActivityLog to shrinking map. Serializing not works
daria305 Aug 25, 2022
ab6022c
Fix stackoverflow
daria305 Aug 29, 2022
ae4c11a
Fix serialization
daria305 Aug 29, 2022
8397d22
Unexport field
daria305 Aug 29, 2022
08aa2da
Fix comment and forEach
daria305 Aug 29, 2022
8528b52
Fix channel close race condition
karimodm Aug 29, 2022
c3d429b
Return lowest processed epoch
karimodm Aug 29, 2022
363182b
deadlock fix
karimodm Aug 29, 2022
84d6156
Warpsync more channel fixes
karimodm Aug 30, 2022
62a49a0
Fix snapshot mutable constructor
daria305 Aug 30, 2022
b9d87ff
Use mutable model for Activity log to hide setEpochs
daria305 Aug 30, 2022
d48b9f2
Fix: epochstorage deadlock
karimodm Aug 30, 2022
68246d7
Warpsync: properly check for overlapping ranges
karimodm Aug 30, 2022
e577168
Make Inbound peer resource allocation synchronous
karimodm Aug 30, 2022
e92df0d
Merge remote-tracking branch 'origin/fix/warpsync-bugs' into feat/man…
daria305 Aug 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions packages/core/epoch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/cockroachdb/errors"
"github.com/iotaledger/hive.go/core/generics/set"
"github.com/iotaledger/hive.go/core/generics/shrinkingmap"
"github.com/iotaledger/hive.go/core/identity"
"strings"
"time"
Expand Down Expand Up @@ -227,7 +228,9 @@ func ComputeECR(tangleRoot, stateMutationRoot, stateRoot, manaRoot MerkleRoot) E

// region NodesActivityLog //////////////////////////////////////////////////////////////////////////////////////////////////

type NodesActivityLog map[Index]*ActivityLog
type NodesActivityLog struct {
shrinkingmap.ShrinkingMap[Index, *ActivityLog]
}

func (al *NodesActivityLog) FromBytes(data []byte) (err error) {
_, err = serix.DefaultAPI.Decode(context.Background(), data, al, serix.WithValidation())
Expand All @@ -246,38 +249,42 @@ func (al *NodesActivityLog) Bytes() []byte {
return objBytes
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
func NewNodesActivityLog() *NodesActivityLog {
return &NodesActivityLog{*shrinkingmap.New[Index, *ActivityLog]()}
}

// endregi`azon ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// region ActivityLog //////////////////////////////////////////////////////////////////////////////////////////////////

// ActivityLog is a time-based log of node activity. It stores information when a node is active and provides
// functionality to query for certain timeframes.
type ActivityLog struct {
SetEpochs *set.AdvancedSet[identity.ID] `serix:"0,lengthPrefixType=uint32"`
setEpochs *set.AdvancedSet[identity.ID] `serix:"0,lengthPrefixType=uint32"`
}

// NewActivityLog is the constructor for ActivityLog.
func NewActivityLog() *ActivityLog {
a := &ActivityLog{
SetEpochs: set.NewAdvancedSet[identity.ID](),
setEpochs: set.NewAdvancedSet[identity.ID](),
}

return a
}

// Add adds a node to the activity log.
func (a *ActivityLog) Add(nodeID identity.ID) (added bool) {
return a.SetEpochs.Add(nodeID)
return a.setEpochs.Add(nodeID)
}

// Remove removes a node from the activity log.
func (a *ActivityLog) Remove(nodeID identity.ID) (removed bool) {
return a.SetEpochs.Delete(nodeID)
return a.setEpochs.Delete(nodeID)
}

// Active returns true if the provided node was active.
func (a *ActivityLog) Active(nodeID identity.ID) (active bool) {
if a.SetEpochs.Has(nodeID) {
if a.setEpochs.Has(nodeID) {
return true
}

Expand All @@ -287,8 +294,8 @@ func (a *ActivityLog) Active(nodeID identity.ID) (active bool) {
// String returns a human-readable version of ActivityLog.
func (a *ActivityLog) String() string {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("ActivityLog(len=%d, elements=", a.SetEpochs.Size()))
a.SetEpochs.ForEach(func(nodeID identity.ID) (err error) {
builder.WriteString(fmt.Sprintf("ActivityLog(len=%d, elements=", a.setEpochs.Size()))
a.setEpochs.ForEach(func(nodeID identity.ID) (err error) {
builder.WriteString(fmt.Sprintf("%s, ", nodeID.String()))
return
})
Expand All @@ -299,10 +306,26 @@ func (a *ActivityLog) String() string {
// Clone clones the ActivityLog.
func (a *ActivityLog) Clone() *ActivityLog {
clone := NewActivityLog()
clone.SetEpochs = a.SetEpochs.Clone()
clone.setEpochs = a.setEpochs.Clone()
return clone
}

// ForEach iterates through the activity set and calls the callback for every element.
func (a *ActivityLog) ForEach(callback func(nodeID identity.ID) (err error)) (err error) {
a.setEpochs.ForEach(func(nodeID identity.ID) (err error) {
if err = callback(nodeID); err != nil {
return err
}
return nil
})

return err
}

func (a *ActivityLog) Size() int {
return a.Size()
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// SnapshotEpochActivity is the data structure to store node activity for the snapshot.
Expand Down
51 changes: 29 additions & 22 deletions packages/core/tangleold/cmanaweightprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"fmt"
"github.com/iotaledger/goshimmer/packages/core/epoch"
"github.com/iotaledger/hive.go/core/generics/shrinkingmap"
"github.com/iotaledger/hive.go/core/serix"
"github.com/iotaledger/hive.go/core/types"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/iotaledger/hive.go/core/identity"
"github.com/iotaledger/hive.go/core/kvstore"
"github.com/iotaledger/hive.go/core/serix"
)

func init() {
Expand All @@ -38,7 +38,7 @@ type ActivityUpdatesCount map[identity.ID]uint64
type CManaWeightProvider struct {
store kvstore.KVStore
mutex sync.RWMutex
activityLog epoch.NodesActivityLog
activityLog *epoch.NodesActivityLog
updatedActivityCount *shrinkingmap.ShrinkingMap[epoch.Index, ActivityUpdatesCount]
manaRetrieverFunc ManaRetrieverFunc
timeRetrieverFunc TimeRetrieverFunc
Expand All @@ -48,7 +48,7 @@ type CManaWeightProvider struct {
// NewCManaWeightProvider is the constructor for CManaWeightProvider.
func NewCManaWeightProvider(manaRetrieverFunc ManaRetrieverFunc, timeRetrieverFunc TimeRetrieverFunc, confirmedEpochRetrieverFunc ConfirmedEpochRetrieverFunc, store ...kvstore.KVStore) (cManaWeightProvider *CManaWeightProvider) {
cManaWeightProvider = &CManaWeightProvider{
activityLog: make(epoch.NodesActivityLog),
activityLog: epoch.NewNodesActivityLog(),
updatedActivityCount: shrinkingmap.New[epoch.Index, ActivityUpdatesCount](shrinkingmap.WithShrinkingThresholdCount(100)),
manaRetrieverFunc: manaRetrieverFunc,
timeRetrieverFunc: timeRetrieverFunc,
Expand Down Expand Up @@ -83,10 +83,10 @@ func (c *CManaWeightProvider) Update(ei epoch.Index, nodeID identity.ID) {
c.mutex.Lock()
defer c.mutex.Unlock()

a, exists := c.activityLog[ei]
a, exists := c.activityLog.Get(ei)
if !exists {
a = epoch.NewActivityLog()
c.activityLog[ei] = a
c.activityLog.Set(ei, a)
}

a.Add(nodeID)
Expand All @@ -108,7 +108,7 @@ func (c *CManaWeightProvider) Remove(ei epoch.Index, nodeID identity.ID, updated
}
// if that was the last activity for this node in the ei epoch, then remove it from activity list
if epochUpdatesCount[nodeID] == 0 {
if a, exists := c.activityLog[ei]; exists {
if a, exists := c.activityLog.Get(ei); exists {
a.Remove(nodeID)
return true
}
Expand Down Expand Up @@ -136,11 +136,11 @@ func (c *CManaWeightProvider) WeightsOfRelevantVoters() (weights map[identity.ID
// nodes mana is counted only once for total weight calculation
totalWeightOnce := make(map[identity.ID]types.Empty)
for ei := lowerBoundEpoch; ei <= upperBoundEpoch; ei++ {
al, exists := c.activityLog[ei]
al, exists := c.activityLog.Get(ei)
if !exists {
continue
}
al.SetEpochs.ForEach(func(nodeID identity.ID) error {
al.ForEach(func(nodeID identity.ID) error {
nodeMana := mana[nodeID]
// Do this check after determining whether a node was active because otherwise we would never clean up
// the ActivityLog of nodes lower than the threshold.
Expand Down Expand Up @@ -170,8 +170,8 @@ func (c *CManaWeightProvider) SnapshotEpochActivity(epochDiffIndex epoch.Index)
c.mutex.Lock()
defer c.mutex.Unlock()

for ei, al := range c.activityLog {
al.SetEpochs.ForEach(func(nodeID identity.ID) error {
c.activityLog.ForEach(func(ei epoch.Index, activity *epoch.ActivityLog) bool {
activity.ForEach(func(nodeID identity.ID) error {
// we save only activity log up to epochDiffIndex as it is the last snapshotted epoch
if ei > epochDiffIndex {
return nil
Expand All @@ -186,7 +186,9 @@ func (c *CManaWeightProvider) SnapshotEpochActivity(epochDiffIndex epoch.Index)
}
return nil
})
}
return true
})

return
}

Expand All @@ -204,11 +206,14 @@ func (c *CManaWeightProvider) LoadActiveNodes(loadedActiveNodes epoch.SnapshotEp
defer c.mutex.Unlock()

for ei, epochActivity := range loadedActiveNodes {
if _, ok := c.activityLog[ei]; !ok {
c.activityLog[ei] = epoch.NewActivityLog()
var activityLog *epoch.ActivityLog
var ok bool
if activityLog, ok = c.activityLog.Get(ei); !ok {
activityLog = epoch.NewActivityLog()
c.activityLog.Set(ei, activityLog)
}
for nodeID, activityCount := range epochActivity.NodesLog() {
c.activityLog[ei].Add(nodeID)
activityLog.Add(nodeID)
c.updateActivityCount(ei, nodeID, activityCount)
}
}
Expand All @@ -224,15 +229,16 @@ type TimeRetrieverFunc func() time.Time
type ConfirmedEpochRetrieverFunc func() epoch.Index

// activeNodes returns the map of the active nodes.
func (c *CManaWeightProvider) activeNodes() (activeNodes epoch.NodesActivityLog) {
activeNodes = make(epoch.NodesActivityLog)
func (c *CManaWeightProvider) activeNodes() (activeNodes *epoch.NodesActivityLog) {
activeNodes = epoch.NewNodesActivityLog()

c.mutex.Lock()
defer c.mutex.Unlock()

for nodeID, al := range c.activityLog {
activeNodes[nodeID] = al.Clone()
}
c.activityLog.ForEach(func(ei epoch.Index, activity *epoch.ActivityLog) bool {
activeNodes.Set(ei, activity.Clone())
return true
})

return activeNodes
}
Expand All @@ -249,11 +255,12 @@ func (c *CManaWeightProvider) activityBoundaries() (lowerBoundEpoch, upperBoundE

// clean removes all activity logs for epochs lower than provided bound.
func (c *CManaWeightProvider) clean(cutoffEpoch epoch.Index) {
for ei := range c.activityLog {
c.activityLog.ForEachKey(func(ei epoch.Index) bool {
if ei < cutoffEpoch {
delete(c.activityLog, ei)
c.activityLog.Delete(ei)
}
}
return true
})
// clean also the updates counting map
c.updatedActivityCount.ForEach(func(ei epoch.Index, count ActivityUpdatesCount) bool {
if ei < cutoffEpoch {
Expand Down
32 changes: 25 additions & 7 deletions packages/core/tangleold/cmanaweightprovider_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tangleold

import (
"math/rand"
"testing"
"time"

Expand All @@ -19,22 +20,39 @@ func TestActiveNodesMarshalling(t *testing.T) {
"node3": identity.GenerateIdentity().ID(),
}

activeNodes := make(epoch.NodesActivityLog)
activeNodes := epoch.NewNodesActivityLog()

for i := 0; i < 100; i++ {
ei := epoch.Index(i)
al := epoch.NewActivityLog()
activeNodes.Set(ei, al)
weight := 1.0
for _, nodeID := range nodes {
if rand.Float64() < 0.1*weight {
al.Add(nodeID)
}
weight += 1
}
}

for _, nodeID := range nodes {
for i := 0; i < crypto.Randomness.Intn(100); i++ {
activeNodes[epoch.Index(i)] = epoch.NewActivityLog()
activeNodes[epoch.Index(i)].Add(nodeID)
al := epoch.NewActivityLog()
al.Add(nodeID)
activeNodes.Set(epoch.Index(i), al)
}
}
activeNodesBytes := activeNodes.Bytes()
activeNodes2 := make(epoch.NodesActivityLog)
activeNodes2 := epoch.NewNodesActivityLog()
err := activeNodes2.FromBytes(activeNodesBytes)
require.NoError(t, err)

for nodeID, a := range activeNodes {
assert.EqualValues(t, a.SetEpochs.Size(), activeNodes2[nodeID].SetEpochs.Size())
}
activeNodes.ForEach(func(ei epoch.Index, activity *epoch.ActivityLog) bool {
activity2, exists := activeNodes2.Get(ei)
require.True(t, exists)
assert.EqualValues(t, activity.Size(), activity2.Size())
return true
})
}

func TestCManaWeightProvider(t *testing.T) {
Expand Down