Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mana vector fixes #2396

Merged
merged 25 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
75d88ce
Use mana delay param only from mana plugin
daria305 Aug 22, 2022
ac3d5c1
Update unit tests
daria305 Aug 22, 2022
40b1323
Prune activity on epoch confirmed
daria305 Aug 22, 2022
355017a
Add missing expected calls
daria305 Aug 24, 2022
aed4483
Merge branch 'develop' into feat/mana-vector-update-fixes
daria305 Aug 24, 2022
0e856c3
Update mana vector update event after merging
daria305 Aug 24, 2022
34941a3
Tiny renames and comment
karimodm Aug 25, 2022
5cedc23
Update DiffUTXO test
daria305 Aug 25, 2022
69a3b21
Merge remote-tracking branch 'origin/feat/mana-vector-update-fixes' i…
daria305 Aug 25, 2022
6840417
Make SetEpochs private
daria305 Aug 25, 2022
7f4eadd
Change NodesActivityLog to shrinking map. Serializing not works
daria305 Aug 25, 2022
ab6022c
Fix stackoverflow
daria305 Aug 29, 2022
ae4c11a
Fix serialization
daria305 Aug 29, 2022
8397d22
Unexport field
daria305 Aug 29, 2022
08aa2da
Fix comment and forEach
daria305 Aug 29, 2022
8528b52
Fix channel close race condition
karimodm Aug 29, 2022
c3d429b
Return lowest processed epoch
karimodm Aug 29, 2022
363182b
deadlock fix
karimodm Aug 29, 2022
84d6156
Warpsync more channel fixes
karimodm Aug 30, 2022
62a49a0
Fix snapshot mutable constructor
daria305 Aug 30, 2022
b9d87ff
Use mutable model for Activity log to hide setEpochs
daria305 Aug 30, 2022
d48b9f2
Fix: epochstorage deadlock
karimodm Aug 30, 2022
68246d7
Warpsync: properly check for overlapping ranges
karimodm Aug 30, 2022
e577168
Make Inbound peer resource allocation synchronous
karimodm Aug 30, 2022
e92df0d
Merge remote-tracking branch 'origin/fix/warpsync-bugs' into feat/man…
daria305 Aug 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 67 additions & 4 deletions packages/core/epoch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/cockroachdb/errors"
"github.com/iotaledger/hive.go/core/generics/set"
"github.com/iotaledger/hive.go/core/generics/shrinkingmap"
"github.com/iotaledger/hive.go/core/identity"
"strings"
"time"
Expand Down Expand Up @@ -227,9 +228,9 @@ func ComputeECR(tangleRoot, stateMutationRoot, stateRoot, manaRoot MerkleRoot) E

// region NodesActivityLog //////////////////////////////////////////////////////////////////////////////////////////////////

type NodesActivityLog map[Index]*ActivityLog
type NodesActivitySerializableMap map[Index]*ActivityLog

func (al *NodesActivityLog) FromBytes(data []byte) (err error) {
func (al *NodesActivitySerializableMap) FromBytes(data []byte) (err error) {
_, err = serix.DefaultAPI.Decode(context.Background(), data, al, serix.WithValidation())
if err != nil {
err = errors.Errorf("failed to parse activeNodes: %w", err)
Expand All @@ -238,14 +239,61 @@ func (al *NodesActivityLog) FromBytes(data []byte) (err error) {
return
}

func (al *NodesActivityLog) Bytes() []byte {
func (al *NodesActivitySerializableMap) Bytes() []byte {
objBytes, err := serix.DefaultAPI.Encode(context.Background(), *al, serix.WithValidation())
if err != nil {
panic(err)
}
return objBytes
}

func (al *NodesActivitySerializableMap) nodesActivityLog() *NodesActivityLog {
activity := NewNodesActivityLog()
for ei, a := range *al {
activity.Set(ei, a)
}
return activity
}

type NodesActivityLog struct {
shrinkingmap.ShrinkingMap[Index, *ActivityLog] `serix:"0,lengthPrefixType=uint32"`
}

func (al *NodesActivityLog) FromBytes(data []byte) (err error) {
m := make(NodesActivitySerializableMap)
err = m.FromBytes(data)
if err != nil {
return err
}
al.loadActivityLogsMap(m)
return
}

func (al *NodesActivityLog) Bytes() []byte {
m := al.activityLogsMap()
return m.Bytes()
}

func NewNodesActivityLog() *NodesActivityLog {
return &NodesActivityLog{*shrinkingmap.New[Index, *ActivityLog]()}
}

func (al *NodesActivityLog) activityLogsMap() *NodesActivitySerializableMap {
activityMap := make(NodesActivitySerializableMap)
al.ForEach(func(ei Index, activity *ActivityLog) bool {
activityMap[ei] = activity
return true
})
return &activityMap
}

func (al *NodesActivityLog) loadActivityLogsMap(m NodesActivitySerializableMap) {
for ei, a := range m {
al.Set(ei, a)
}
return
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// region ActivityLog //////////////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -258,7 +306,6 @@ type ActivityLog struct {

// NewActivityLog is the constructor for ActivityLog.
func NewActivityLog() *ActivityLog {

a := &ActivityLog{
SetEpochs: set.NewAdvancedSet[identity.ID](),
}
Expand Down Expand Up @@ -304,6 +351,22 @@ func (a *ActivityLog) Clone() *ActivityLog {
return clone
}

// ForEach iterates through the activity set and calls the callback for every element.
func (a *ActivityLog) ForEach(callback func(nodeID identity.ID) (err error)) (err error) {
a.SetEpochs.ForEach(func(nodeID identity.ID) (err error) {
daria305 marked this conversation as resolved.
Show resolved Hide resolved
if err = callback(nodeID); err != nil {
return err
}
return nil
})

return err
}

func (a *ActivityLog) Size() int {
daria305 marked this conversation as resolved.
Show resolved Hide resolved
return a.SetEpochs.Size()
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////

// SnapshotEpochActivity is the data structure to store node activity for the snapshot.
Expand Down
19 changes: 7 additions & 12 deletions packages/core/notarization/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ type Events struct {
// SyncRange is an event that gets triggered when an entire range of epochs needs to be requested, validated and solidified
SyncRange *event.Event[*SyncRangeEvent]
// ActivityTreeInserted is an event that gets triggered when nodeID is added to the activity tree.
ActivityTreeInserted *event.Event[*ActivityTreeUpdatedEvent]
// ActivityTreeRemoved is an event that gets triggered when nodeID is removed from activity tree.
ActivityTreeRemoved *event.Event[*ActivityTreeUpdatedEvent]

ActivityTreeInserted *event.Event[*ActivityTreeUpdatedEvent]
// ActivityTreeRemoved is an event that gets triggered when nodeID is removed from activity tree.
ActivityTreeRemoved *event.Event[*ActivityTreeUpdatedEvent]
}

// TangleTreeUpdatedEvent is a container that acts as a dictionary for the TangleTree inserted/removed event related parameters.
Expand Down Expand Up @@ -100,10 +99,6 @@ type CompetingCommitmentDetectedEvent struct {
type ManaVectorUpdateEvent struct {
// EI is the index of committable epoch.
EI epoch.Index
// EpochDiffCreated is the list of outputs created in the epoch.
EpochDiffCreated []*ledger.OutputWithMetadata
// EpochDiffSpent is the list of outputs spent in the epoch.
EpochDiffSpent []*ledger.OutputWithMetadata
}

// SyncRangeEvent is a container that acts as a dictionary for the SyncRange event related parameters.
Expand All @@ -116,10 +111,10 @@ type SyncRangeEvent struct {

// ActivityTreeUpdatedEvent is a container that acts as a dictionary for the ActivityTree inserted/removed event related parameters.
type ActivityTreeUpdatedEvent struct {
// EI is the index of the epoch.
EI epoch.Index
// NodeID is the issuer nodeID.
NodeID identity.ID
// EI is the index of the epoch.
EI epoch.Index
// NodeID is the issuer nodeID.
NodeID identity.ID
}

// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
29 changes: 11 additions & 18 deletions packages/core/notarization/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,14 @@ func (m *Manager) PendingConflictsCountAll() (pendingConflicts map[epoch.Index]u
return pendingConflicts
}

// GetEpochDiff returns the epoch diff of an epoch.
func (m *Manager) GetEpochDiff(ei epoch.Index) (spent []*ledger.OutputWithMetadata, created []*ledger.OutputWithMetadata) {
m.epochCommitmentFactoryMutex.Lock()
defer m.epochCommitmentFactoryMutex.Unlock()
spent, created = m.epochCommitmentFactory.loadDiffUTXOs(ei)
return
}

// Bootstrapped returns the current value of pendingConflictsCount per epoch.
func (m *Manager) Bootstrapped() bool {
m.bootstrapMutex.RLock()
Expand Down Expand Up @@ -668,15 +676,8 @@ func (m *Manager) resolveOutputs(tx utxo.Transaction) (spentOutputsWithMetadata,
}

func (m *Manager) manaVectorUpdate(ei epoch.Index) (event *ManaVectorUpdateEvent) {
epochForManaVector := ei - epoch.Index(m.options.ManaEpochDelay)
if epochForManaVector < 1 {
return
}
spent, created := m.epochCommitmentFactory.loadDiffUTXOs(epochForManaVector)
return &ManaVectorUpdateEvent{
EI: ei,
EpochDiffCreated: created,
EpochDiffSpent: spent,
EI: ei,
}
}

Expand Down Expand Up @@ -737,8 +738,8 @@ func (m *Manager) updateEpochsBootstrapped(ei epoch.Index) {
}

// SnapshotEpochActivity snapshots accepted block counts from activity tree and updates provided SnapshotEpochActivity.
func (m *Manager) SnapshotEpochActivity() (epochActivity epoch.SnapshotEpochActivity, err error) {
epochActivity = m.tangle.WeightProvider.SnapshotEpochActivity()
func (m *Manager) SnapshotEpochActivity(epochDiffIndex epoch.Index) (epochActivity epoch.SnapshotEpochActivity, err error) {
epochActivity = m.tangle.WeightProvider.SnapshotEpochActivity(epochDiffIndex)
return
}

Expand All @@ -753,7 +754,6 @@ type ManagerOption func(options *ManagerOptions)
type ManagerOptions struct {
MinCommittableEpochAge time.Duration
BootstrapWindow time.Duration
ManaEpochDelay uint
Log *logger.Logger
}

Expand All @@ -771,13 +771,6 @@ func BootstrapWindow(d time.Duration) ManagerOption {
}
}

// ManaDelay specifies the epoch offset for mana vector from the last committable epoch.
func ManaDelay(d uint) ManagerOption {
return func(options *ManagerOptions) {
options.ManaEpochDelay = d
}
}

// Log provides the logger.
func Log(log *logger.Logger) ManagerOption {
return func(options *ManagerOptions) {
Expand Down
37 changes: 27 additions & 10 deletions packages/core/notarization/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ func TestManager_GetLatestEC(t *testing.T) {
nodes["A"].ID(): 100,
}
}
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now)
confirmedRetrieverFunc := func() epoch.Index { return 0 }

weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now, confirmedRetrieverFunc)

genesisTime := time.Now().Add(-25 * time.Minute)
epochDuration := 5 * time.Minute
Expand Down Expand Up @@ -152,7 +154,8 @@ func TestManager_UpdateTangleTree(t *testing.T) {
nodes["D"].ID(): 25,
}
}
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now)
confirmedRetrieverFunc := func() epoch.Index { return 0 }
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now, confirmedRetrieverFunc)

epochInterval := 1 * time.Second

Expand Down Expand Up @@ -238,6 +241,8 @@ func TestManager_UpdateTangleTree(t *testing.T) {
assert.Equal(t, epoch.MerkleRoot{}, ecRecord.PrevEC())
event.Loop.WaitUntilAllTasksProcessed()
eventHandlerMock.Expect("EpochCommittable", epoch.Index(1))
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(1))

testFramework.CreateBlock("Block4", tangleold.WithIssuingTime(issuingTime), tangleold.WithStrongParents("Block3", "Block2"), tangleold.WithIssuer(nodes["D"].PublicKey()), tangleold.WithECRecord(ecRecord))
testFramework.IssueBlocks("Block4").WaitUntilAllTasksProcessed()

Expand Down Expand Up @@ -289,7 +294,9 @@ func TestManager_UpdateStateMutationTree(t *testing.T) {
nodes["E"].ID(): 10,
}
}
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now)
confirmedRetrieverFunc := func() epoch.Index { return 0 }

weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now, confirmedRetrieverFunc)

epochInterval := 1 * time.Second

Expand Down Expand Up @@ -354,6 +361,7 @@ func TestManager_UpdateStateMutationTree(t *testing.T) {
require.NoError(t, err)

eventHandlerMock.Expect("EpochCommittable", epoch.Index(1))
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(1))
testFramework.CreateBlock("Block4", tangleold.WithIssuingTime(issuingTime), tangleold.WithStrongParents("Block3"), tangleold.WithIssuer(nodes["D"].PublicKey()), tangleold.WithECRecord(ecRecord))
testFramework.IssueBlocks("Block4").WaitUntilAllTasksProcessed()

Expand All @@ -372,6 +380,8 @@ func TestManager_UpdateStateMutationTree(t *testing.T) {
EC1 = ecRecord.ComputeEC()

eventHandlerMock.Expect("EpochCommittable", epoch.Index(2))
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(2))

testFramework.CreateBlock("Block5", tangleold.WithIssuingTime(issuingTime), tangleold.WithStrongParents("Block4"), tangleold.WithIssuer(nodes["A"].PublicKey()), tangleold.WithInputs("A"), tangleold.WithOutput("C", 500), tangleold.WithECRecord(ecRecord))
testFramework.IssueBlocks("Block5").WaitUntilAllTasksProcessed()

Expand All @@ -389,7 +399,8 @@ func TestManager_UpdateStateMutationTree(t *testing.T) {
EC2 = ecRecord.ComputeEC()

eventHandlerMock.Expect("EpochCommittable", epoch.Index(3))
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(3), []*ledger.OutputWithMetadata{}, []*ledger.OutputWithMetadata{})
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(3))

testFramework.CreateBlock("Block6", tangleold.WithIssuingTime(issuingTime), tangleold.WithStrongParents("Block5"), tangleold.WithIssuer(nodes["E"].PublicKey()), tangleold.WithInputs("B"), tangleold.WithOutput("D", 500), tangleold.WithECRecord(ecRecord))
testFramework.IssueBlocks("Block6").WaitUntilAllTasksProcessed()

Expand All @@ -409,7 +420,7 @@ func TestManager_UpdateStateMutationTree(t *testing.T) {
require.NoError(t, err)

eventHandlerMock.Expect("EpochCommittable", epoch.Index(4))
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(4), []*ledger.OutputWithMetadata{}, []*ledger.OutputWithMetadata{})
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(4))
testFramework.CreateBlock("Block7", tangleold.WithIssuingTime(issuingTime), tangleold.WithStrongParents("Block6"), tangleold.WithIssuer(nodes["C"].PublicKey()), tangleold.WithInputs("C"), tangleold.WithOutput("E", 500), tangleold.WithECRecord(ecRecord))
testFramework.IssueBlocks("Block7").WaitUntilAllTasksProcessed()

Expand Down Expand Up @@ -463,8 +474,9 @@ func TestManager_UpdateStateMutationTreeWithConflict(t *testing.T) {

// Make Current Epoch be epoch 5
genesisTime := time.Now().Add(-epochInterval * 5)
confirmedRetrieverFunc := func() epoch.Index { return 0 }

weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now)
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now, confirmedRetrieverFunc)
testFramework, eventHandlerMock, notarizationMgr := setupFramework(t, genesisTime, epochInterval, epochInterval*2, tangleold.ApprovalWeights(weightProvider), tangleold.WithConflictDAGOptions(conflictdag.WithMergeToMaster(false)))

issuingTime := genesisTime
Expand Down Expand Up @@ -586,6 +598,7 @@ func TestManager_UpdateStateMutationTreeWithConflict(t *testing.T) {
require.NoError(t, err)

eventHandlerMock.Expect("EpochCommittable", epoch.Index(1))
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(1))
testFramework.CreateBlock("Block8", tangleold.WithIssuingTime(issuingTime), tangleold.WithStrongParents("Block7"), tangleold.WithIssuer(nodes["D"].PublicKey()), tangleold.WithECRecord(ecRecord))
testFramework.IssueBlocks("Block8").WaitUntilAllTasksProcessed()

Expand Down Expand Up @@ -628,8 +641,9 @@ func TestManager_TransactionInclusionUpdate(t *testing.T) {

// Make Current Epoch be epoch 5
genesisTime := time.Now().Add(-epochInterval * 5)
confirmedRetrieverFunc := func() epoch.Index { return 0 }

weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now)
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now, confirmedRetrieverFunc)
testFramework, eventHandlerMock, notarizationMgr := setupFramework(t, genesisTime, epochInterval, epochInterval*2, tangleold.ApprovalWeights(weightProvider), tangleold.WithConflictDAGOptions(conflictdag.WithMergeToMaster(false)))

issuingTime := genesisTime
Expand Down Expand Up @@ -793,13 +807,14 @@ func TestManager_DiffUTXOs(t *testing.T) {
nodes["E"].ID(): 10,
}
}
confirmedRetrieverFunc := func() epoch.Index { return 0 }

epochInterval := 1 * time.Second

// Make Current Epoch be epoch 5
genesisTime := time.Now().Add(-epochInterval * 5)

weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now)
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, time.Now, confirmedRetrieverFunc)
testFramework, eventHandlerMock, notarizationMgr := setupFramework(t, genesisTime, epochInterval, epochInterval*2, tangleold.ApprovalWeights(weightProvider), tangleold.WithConflictDAGOptions(conflictdag.WithMergeToMaster(false)))

issuingTime := genesisTime
Expand Down Expand Up @@ -895,6 +910,7 @@ func TestManager_DiffUTXOs(t *testing.T) {
require.Equal(t, epoch.Index(0), ecRecord.EI())

eventHandlerMock.Expect("EpochCommittable", epoch.Index(1))
eventHandlerMock.Expect("ManaVectorUpdate", epoch.Index(1))
testFramework.CreateBlock("Block6", tangleold.WithIssuingTime(issuingTime), tangleold.WithStrongParents("Block5"), tangleold.WithIssuer(nodes["E"].PublicKey()), tangleold.WithInputs("G5"), tangleold.WithOutput("H6", 500), tangleold.WithECRecord(ecRecord))
testFramework.IssueBlocks("Block6").WaitUntilAllTasksProcessed()

Expand Down Expand Up @@ -994,8 +1010,9 @@ func TestManager_ActivityTree(t *testing.T) {
// ei := epoch.IndexFromTime(time.Now())
// weightProvider.Update(ei, node.ID())
//}
confirmedRetrieverFunc := func() epoch.Index { return 0 }

weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, timeRetrieverFunc)
weightProvider = tangleold.NewCManaWeightProvider(manaRetrieverMock, timeRetrieverFunc, confirmedRetrieverFunc)
testFramework, _, _ := setupFramework(t, genesisTime, epochInterval, epochInterval*2, tangleold.ApprovalWeights(weightProvider), tangleold.WithConflictDAGOptions(conflictdag.WithMergeToMaster(false)))

// expected activity records
Expand Down Expand Up @@ -1082,7 +1099,7 @@ func setupFramework(t *testing.T, genesisTime time.Time, epochInterval time.Dura

// set up notarization manager
ecFactory := NewEpochCommitmentFactory(testTangle.Options.Store, testTangle, 0)
m = NewManager(ecFactory, testTangle, MinCommittableEpochAge(minCommittable), BootstrapWindow(minCommittable*2), ManaDelay(2), Log(logger.NewExampleLogger("test")))
m = NewManager(ecFactory, testTangle, MinCommittableEpochAge(minCommittable), BootstrapWindow(minCommittable*2), Log(logger.NewExampleLogger("test")))

commitmentFunc := func() (ecRecord *epoch.ECRecord, latestConfirmedEpoch epoch.Index, err error) {
ecRecord, err = m.GetLatestEC()
Expand Down
2 changes: 1 addition & 1 deletion packages/core/notarization/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,6 @@ func (e *EventMock) EpochCommittable(event *EpochCommittableEvent) {

// ManaVectorUpdate is the mocked ManaVectorUpdate event.
func (e *EventMock) ManaVectorUpdate(event *ManaVectorUpdateEvent) {
e.Called(event.EI, event.EpochDiffCreated, event.EpochDiffSpent)
e.Called(event.EI)
atomic.AddUint64(&e.calledEvents, 1)
}
2 changes: 1 addition & 1 deletion packages/core/snapshot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *Manager) CreateSnapshot(snapshotFileName string) (header *ledger.Snapsh
sepsProd := NewSolidEntryPointsProducer(fullEpochIndex, ecRecord.EI(), m)
outputWithMetadataProd := NewLedgerUTXOStatesProducer(m.notarizationMgr)
epochDiffsProd := NewEpochDiffsProducer(fullEpochIndex, ecRecord.EI(), m.notarizationMgr)
activityProducer := NewActivityLogProducer(m.notarizationMgr)
activityProducer := NewActivityLogProducer(m.notarizationMgr, ecRecord.EI())

header, err = CreateSnapshot(snapshotFileName, headerProd, sepsProd, outputWithMetadataProd, epochDiffsProd, activityProducer)

Expand Down
4 changes: 2 additions & 2 deletions packages/core/snapshot/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ func writeSolidEntryPoints(writeSeeker io.WriteSeeker, seps *SolidEntryPoints) e
}

// NewActivityLogProducer returns an ActivityLogProducerFunc that provides activity log from weightProvider and notarization manager.
func NewActivityLogProducer(notarizationMgr *notarization.Manager) ActivityLogProducerFunc {
activityLog, err := notarizationMgr.SnapshotEpochActivity()
func NewActivityLogProducer(notarizationMgr *notarization.Manager, epochDiffIndex epoch.Index) ActivityLogProducerFunc {
activityLog, err := notarizationMgr.SnapshotEpochActivity(epochDiffIndex)
if err != nil {
panic(err)
}
Expand Down
Loading