Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mana vector fixes #2396

Merged
merged 25 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
75d88ce
Use mana delay param only from mana plugin
daria305 Aug 22, 2022
ac3d5c1
Update unit tests
daria305 Aug 22, 2022
40b1323
Prune activity on epoch confirmed
daria305 Aug 22, 2022
355017a
Add missing expected calls
daria305 Aug 24, 2022
aed4483
Merge branch 'develop' into feat/mana-vector-update-fixes
daria305 Aug 24, 2022
0e856c3
Update mana vector update event after merging
daria305 Aug 24, 2022
34941a3
Tiny renames and comment
karimodm Aug 25, 2022
5cedc23
Update DiffUTXO test
daria305 Aug 25, 2022
69a3b21
Merge remote-tracking branch 'origin/feat/mana-vector-update-fixes' i…
daria305 Aug 25, 2022
6840417
Make SetEpochs private
daria305 Aug 25, 2022
7f4eadd
Change NodesActivityLog to shrinking map. Serializing not works
daria305 Aug 25, 2022
ab6022c
Fix stackoverflow
daria305 Aug 29, 2022
ae4c11a
Fix serialization
daria305 Aug 29, 2022
8397d22
Unexport field
daria305 Aug 29, 2022
08aa2da
Fix comment and forEach
daria305 Aug 29, 2022
8528b52
Fix channel close race condition
karimodm Aug 29, 2022
c3d429b
Return lowest processed epoch
karimodm Aug 29, 2022
363182b
deadlock fix
karimodm Aug 29, 2022
84d6156
Warpsync more channel fixes
karimodm Aug 30, 2022
62a49a0
Fix snapshot mutable constructor
daria305 Aug 30, 2022
b9d87ff
Use mutable model for Activity log to hide setEpochs
daria305 Aug 30, 2022
d48b9f2
Fix: epochstorage deadlock
karimodm Aug 30, 2022
68246d7
Warpsync: properly check for overlapping ranges
karimodm Aug 30, 2022
e577168
Make Inbound peer resource allocation synchronous
karimodm Aug 30, 2022
e92df0d
Merge remote-tracking branch 'origin/fix/warpsync-bugs' into feat/man…
daria305 Aug 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 82 additions & 17 deletions packages/core/epoch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/cockroachdb/errors"
"github.com/iotaledger/hive.go/core/generics/set"
"github.com/iotaledger/hive.go/core/generics/shrinkingmap"
"github.com/iotaledger/hive.go/core/identity"
"strings"
"time"
Expand All @@ -23,6 +24,13 @@ var (
Duration int64 = 10
)

func init() {
err := serix.DefaultAPI.RegisterTypeSettings(nodesActivitySerializableMap{}, serix.TypeSettings{}.WithLengthPrefixType(serix.LengthPrefixTypeAsUint32))
if err != nil {
panic(fmt.Errorf("error registering NodesActivityLog type settings: %w", err))
}
}

// Index is the ID of an epoch.
type Index int64

Expand Down Expand Up @@ -227,9 +235,9 @@ func ComputeECR(tangleRoot, stateMutationRoot, stateRoot, manaRoot MerkleRoot) E

// region NodesActivityLog //////////////////////////////////////////////////////////////////////////////////////////////////

type NodesActivityLog map[Index]*ActivityLog
type nodesActivitySerializableMap map[Index]*ActivityLog

func (al *NodesActivityLog) FromBytes(data []byte) (err error) {
func (al *nodesActivitySerializableMap) FromBytes(data []byte) (err error) {
_, err = serix.DefaultAPI.Decode(context.Background(), data, al, serix.WithValidation())
if err != nil {
err = errors.Errorf("failed to parse activeNodes: %w", err)
Expand All @@ -238,47 +246,94 @@ func (al *NodesActivityLog) FromBytes(data []byte) (err error) {
return
}

func (al *NodesActivityLog) Bytes() []byte {
func (al *nodesActivitySerializableMap) Bytes() []byte {
objBytes, err := serix.DefaultAPI.Encode(context.Background(), *al, serix.WithValidation())
if err != nil {
panic(err)
}
return objBytes
}

func (al *nodesActivitySerializableMap) nodesActivityLog() *NodesActivityLog {
activity := NewNodesActivityLog()
for ei, a := range *al {
activity.Set(ei, a)
}
return activity
}

type NodesActivityLog struct {
shrinkingmap.ShrinkingMap[Index, *ActivityLog] `serix:"0,lengthPrefixType=uint32"`
}

func (al *NodesActivityLog) FromBytes(data []byte) (err error) {
m := make(nodesActivitySerializableMap)
err = m.FromBytes(data)
if err != nil {
return err
}
al.loadActivityLogsMap(m)
return
}

func (al *NodesActivityLog) Bytes() []byte {
m := al.activityLogsMap()
return m.Bytes()
}

func NewNodesActivityLog() *NodesActivityLog {
return &NodesActivityLog{*shrinkingmap.New[Index, *ActivityLog]()}
}

func (al *NodesActivityLog) activityLogsMap() *nodesActivitySerializableMap {
activityMap := make(nodesActivitySerializableMap)
al.ForEach(func(ei Index, activity *ActivityLog) bool {
activityMap[ei] = activity
return true
})
return &activityMap
}

func (al *NodesActivityLog) loadActivityLogsMap(m nodesActivitySerializableMap) {
for ei, a := range m {
al.Set(ei, a)
}
return
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// region ActivityLog //////////////////////////////////////////////////////////////////////////////////////////////////

// ActivityLog is a time-based log of node activity. It stores information when a node is active and provides
// functionality to query for certain timeframes.
type ActivityLog struct {
SetEpochs *set.AdvancedSet[identity.ID] `serix:"0,lengthPrefixType=uint32"`
model.Mutable[ActivityLog, *ActivityLog, activityLogModel] `serix:"0"`
}

// nodeActivityModel stores node identities and corresponding accepted block counters indicating how many blocks node issued in a given epoch.
type activityLogModel struct {
ActivityLog *set.AdvancedSet[identity.ID] `serix:"0,lengthPrefixType=uint32"`
}

// NewActivityLog is the constructor for ActivityLog.
func NewActivityLog() *ActivityLog {

a := &ActivityLog{
SetEpochs: set.NewAdvancedSet[identity.ID](),
}

return a
return model.NewMutable[ActivityLog](&activityLogModel{ActivityLog: set.NewAdvancedSet[identity.ID]()})
}

// Add adds a node to the activity log.
func (a *ActivityLog) Add(nodeID identity.ID) (added bool) {
return a.SetEpochs.Add(nodeID)
return a.InnerModel().ActivityLog.Add(nodeID)
}

// Remove removes a node from the activity log.
func (a *ActivityLog) Remove(nodeID identity.ID) (removed bool) {
return a.SetEpochs.Delete(nodeID)
return a.InnerModel().ActivityLog.Delete(nodeID)
}

// Active returns true if the provided node was active.
func (a *ActivityLog) Active(nodeID identity.ID) (active bool) {
if a.SetEpochs.Has(nodeID) {
if a.InnerModel().ActivityLog.Has(nodeID) {
return true
}

Expand All @@ -288,8 +343,8 @@ func (a *ActivityLog) Active(nodeID identity.ID) (active bool) {
// String returns a human-readable version of ActivityLog.
func (a *ActivityLog) String() string {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("ActivityLog(len=%d, elements=", a.SetEpochs.Size()))
a.SetEpochs.ForEach(func(nodeID identity.ID) (err error) {
builder.WriteString(fmt.Sprintf("ActivityLog(len=%d, elements=", a.Size()))
a.InnerModel().ActivityLog.ForEach(func(nodeID identity.ID) (err error) {
builder.WriteString(fmt.Sprintf("%s, ", nodeID.String()))
return
})
Expand All @@ -300,10 +355,20 @@ func (a *ActivityLog) String() string {
// Clone clones the ActivityLog.
func (a *ActivityLog) Clone() *ActivityLog {
clone := NewActivityLog()
clone.SetEpochs = a.SetEpochs.Clone()
clone.InnerModel().ActivityLog = a.InnerModel().ActivityLog.Clone()
return clone
}

// ForEach iterates through the activity set and calls the callback for every element.
func (a *ActivityLog) ForEach(callback func(nodeID identity.ID) (err error)) (err error) {
return a.InnerModel().ActivityLog.ForEach(callback)
}

// Size returns the size of the activity log.
func (a *ActivityLog) Size() int {
daria305 marked this conversation as resolved.
Show resolved Hide resolved
return a.InnerModel().ActivityLog.Size()
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// SnapshotEpochActivity is the data structure to store node activity for the snapshot.
Expand All @@ -321,7 +386,7 @@ type SnapshotNodeActivity struct {

// NewSnapshotNodeActivity creates a new SnapshotNodeActivity instance.
func NewSnapshotNodeActivity() *SnapshotNodeActivity {
return model.NewImmutable[SnapshotNodeActivity](&nodeActivityModel{NodesLog: make(map[identity.ID]uint64)})
return model.NewMutable[SnapshotNodeActivity](&nodeActivityModel{NodesLog: make(map[identity.ID]uint64)})
}

// nodeActivityModel stores node identities and corresponding accepted block counters indicating how many blocks node issued in a given epoch.
Expand Down
19 changes: 7 additions & 12 deletions packages/core/notarization/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ type Events struct {
// SyncRange is an event that gets triggered when an entire range of epochs needs to be requested, validated and solidified
SyncRange *event.Event[*SyncRangeEvent]
// ActivityTreeInserted is an event that gets triggered when nodeID is added to the activity tree.
ActivityTreeInserted *event.Event[*ActivityTreeUpdatedEvent]
// ActivityTreeRemoved is an event that gets triggered when nodeID is removed from activity tree.
ActivityTreeRemoved *event.Event[*ActivityTreeUpdatedEvent]

ActivityTreeInserted *event.Event[*ActivityTreeUpdatedEvent]
// ActivityTreeRemoved is an event that gets triggered when nodeID is removed from activity tree.
ActivityTreeRemoved *event.Event[*ActivityTreeUpdatedEvent]
}

// TangleTreeUpdatedEvent is a container that acts as a dictionary for the TangleTree inserted/removed event related parameters.
Expand Down Expand Up @@ -100,10 +99,6 @@ type CompetingCommitmentDetectedEvent struct {
type ManaVectorUpdateEvent struct {
// EI is the index of committable epoch.
EI epoch.Index
// EpochDiffCreated is the list of outputs created in the epoch.
EpochDiffCreated []*ledger.OutputWithMetadata
// EpochDiffSpent is the list of outputs spent in the epoch.
EpochDiffSpent []*ledger.OutputWithMetadata
}

// SyncRangeEvent is a container that acts as a dictionary for the SyncRange event related parameters.
Expand All @@ -116,10 +111,10 @@ type SyncRangeEvent struct {

// ActivityTreeUpdatedEvent is a container that acts as a dictionary for the ActivityTree inserted/removed event related parameters.
type ActivityTreeUpdatedEvent struct {
// EI is the index of the epoch.
EI epoch.Index
// NodeID is the issuer nodeID.
NodeID identity.ID
// EI is the index of the epoch.
EI epoch.Index
// NodeID is the issuer nodeID.
NodeID identity.ID
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
29 changes: 11 additions & 18 deletions packages/core/notarization/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,14 @@ func (m *Manager) PendingConflictsCountAll() (pendingConflicts map[epoch.Index]u
return pendingConflicts
}

// GetEpochDiff returns the epoch diff of an epoch.
func (m *Manager) GetEpochDiff(ei epoch.Index) (spent []*ledger.OutputWithMetadata, created []*ledger.OutputWithMetadata) {
m.epochCommitmentFactoryMutex.Lock()
defer m.epochCommitmentFactoryMutex.Unlock()
spent, created = m.epochCommitmentFactory.loadDiffUTXOs(ei)
return
}

// Bootstrapped returns the current value of pendingConflictsCount per epoch.
func (m *Manager) Bootstrapped() bool {
m.bootstrapMutex.RLock()
Expand Down Expand Up @@ -668,15 +676,8 @@ func (m *Manager) resolveOutputs(tx utxo.Transaction) (spentOutputsWithMetadata,
}

func (m *Manager) manaVectorUpdate(ei epoch.Index) (event *ManaVectorUpdateEvent) {
epochForManaVector := ei - epoch.Index(m.options.ManaEpochDelay)
if epochForManaVector < 1 {
return
}
spent, created := m.epochCommitmentFactory.loadDiffUTXOs(epochForManaVector)
return &ManaVectorUpdateEvent{
EI: ei,
EpochDiffCreated: created,
EpochDiffSpent: spent,
EI: ei,
}
}

Expand Down Expand Up @@ -737,8 +738,8 @@ func (m *Manager) updateEpochsBootstrapped(ei epoch.Index) {
}

// SnapshotEpochActivity snapshots accepted block counts from activity tree and updates provided SnapshotEpochActivity.
func (m *Manager) SnapshotEpochActivity() (epochActivity epoch.SnapshotEpochActivity, err error) {
epochActivity = m.tangle.WeightProvider.SnapshotEpochActivity()
func (m *Manager) SnapshotEpochActivity(epochDiffIndex epoch.Index) (epochActivity epoch.SnapshotEpochActivity, err error) {
epochActivity = m.tangle.WeightProvider.SnapshotEpochActivity(epochDiffIndex)
return
}

Expand All @@ -753,7 +754,6 @@ type ManagerOption func(options *ManagerOptions)
type ManagerOptions struct {
MinCommittableEpochAge time.Duration
BootstrapWindow time.Duration
ManaEpochDelay uint
Log *logger.Logger
}

Expand All @@ -771,13 +771,6 @@ func BootstrapWindow(d time.Duration) ManagerOption {
}
}

// ManaDelay specifies the epoch offset for mana vector from the last committable epoch.
func ManaDelay(d uint) ManagerOption {
return func(options *ManagerOptions) {
options.ManaEpochDelay = d
}
}

// Log provides the logger.
func Log(log *logger.Logger) ManagerOption {
return func(options *ManagerOptions) {
Expand Down
Loading