diff --git a/CHANGELOG.md b/CHANGELOG.md index 620968ab27..7efefac6c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +# v0.9.6 - 2022-09-01 + +> This release introduces major bugfixes to epoch notarization and networking. + +- Fix: several bugs in Notarization and Networking (#2412) +- Wrap faucet errors and add faucet logging (#2411) +- Fix concurrent map writes in integration network shutdown (#2410) + # v0.9.5 - 2022-08-31 > This release introduces a warpsync plugin for fast epochs retrieval, a simplified faucet, local snapshot improvements, and network and general bug fixes. diff --git a/deploy/ansible/roles/goshimmer-node/templates/docker-compose-analysis-server.yml.j2 b/deploy/ansible/roles/goshimmer-node/templates/docker-compose-analysis-server.yml.j2 index 21982d143c..7fbd70731d 100644 --- a/deploy/ansible/roles/goshimmer-node/templates/docker-compose-analysis-server.yml.j2 +++ b/deploy/ansible/roles/goshimmer-node/templates/docker-compose-analysis-server.yml.j2 @@ -29,6 +29,6 @@ services: --metrics.local=false --metrics.global=true --node.enablePlugins=analysisServer,analysisDashboard,prometheus - --node.disablePlugins=activity,analysisClient,chat,consensus,dashboard,faucet,gossip,firewall,issuer,mana,manualpeering,blockLayer,metrics,networkdelay,portcheck,pow,syncBeaconFollower,webAPIBroadcastDataEndpoint,WebAPIDataEndpoint,WebAPIHealthzEndpoint,WebAPIFaucetRequestEndpoint,webAPIFindTransactionHashesEndpoint,webAPIGetNeighborsEndpoint,webAPIGetTransactionObjectsByHashEndpoint,webAPIGetTransactionTrytesByHashEndpoint,WebAPIInfoEndpoint,WebAPILedgerstateEndpoint,WebAPIBlockEndpoint,WebAPIToolsBlockEndpoint,WebAPIWeightProviderEndpoint,remotelog,remotelogmetrics,DAGsVisualizer,WebAPIRateSetterEndpoint,WebAPISchedulerEndpoint,ManaInitializer,Notarization,EpochStorage,WebAPIEpochEndpoint,BootstrapManager + --node.disablePlugins=activity,analysisClient,chat,consensus,dashboard,faucet,gossip,firewall,issuer,mana,manualpeering,blockLayer,metrics,networkdelay,portcheck,pow,syncBeaconFollower,webAPIBroadcastDataEndpoint,WebAPIDataEndpoint,WebAPIHealthzEndpoint,WebAPIFaucetRequestEndpoint,webAPIFindTransactionHashesEndpoint,webAPIGetNeighborsEndpoint,webAPIGetTransactionObjectsByHashEndpoint,webAPIGetTransactionTrytesByHashEndpoint,WebAPIInfoEndpoint,WebAPILedgerstateEndpoint,WebAPIBlockEndpoint,WebAPIToolsBlockEndpoint,WebAPIWeightProviderEndpoint,remotelog,remotelogmetrics,DAGsVisualizer,WebAPIRateSetterEndpoint,WebAPISchedulerEndpoint,ManaInitializer,Notarization,EpochStorage,WebAPIEpochEndpoint,BootstrapManager,Warpsync,Snapshot --logger.level={{ logLevel }} --logger.outputPaths=stdout diff --git a/go.mod b/go.mod index 6a8a02ead8..b00d0a84af 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( go.uber.org/atomic v1.9.0 go.uber.org/dig v1.15.0 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 google.golang.org/protobuf v1.28.1 gopkg.in/src-d/go-git.v4 v4.13.1 ) @@ -184,7 +185,6 @@ require ( go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.22.0 // indirect golang.org/x/net v0.0.0-20220809012201-f428fae20770 // indirect - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/packages/core/epoch/types.go b/packages/core/epoch/types.go index 48ad759fba..f7cd2d5d5a 100644 --- a/packages/core/epoch/types.go +++ b/packages/core/epoch/types.go @@ -19,7 +19,7 @@ import ( var ( // GenesisTime is the time (Unix in seconds) of the genesis. - GenesisTime int64 = 1661859573 + GenesisTime int64 = 1662035280 // Duration is the default epoch duration in seconds. Duration int64 = 10 ) diff --git a/packages/core/mana/events.go b/packages/core/mana/events.go index 8f7a54b243..0604333453 100644 --- a/packages/core/mana/events.go +++ b/packages/core/mana/events.go @@ -9,6 +9,8 @@ import ( "github.com/iotaledger/hive.go/core/stringify" "github.com/mr-tron/base58" + "github.com/iotaledger/goshimmer/packages/core/epoch" + "github.com/iotaledger/goshimmer/packages/core/ledger" "github.com/iotaledger/goshimmer/packages/core/ledger/utxo" ) @@ -58,6 +60,16 @@ type Event interface { String() string } +// ManaVectorUpdateEvent is a container that acts as a dictionary for the EpochCommittable event related parameters. +type ManaVectorUpdateEvent struct { + // EI is the index of committable epoch. + EI epoch.Index + // Spent are outputs that is spent in a transaction. + Spent []*ledger.OutputWithMetadata + // Created are the outputs created in a transaction. + Created []*ledger.OutputWithMetadata +} + // PledgedEvent is the struct that is passed along with triggering a Pledged event. type PledgedEvent struct { NodeID identity.ID diff --git a/packages/core/notarization/commitments.go b/packages/core/notarization/commitments.go index 020335b9b9..ae3a7b8f02 100644 --- a/packages/core/notarization/commitments.go +++ b/packages/core/notarization/commitments.go @@ -14,6 +14,7 @@ import ( "github.com/iotaledger/hive.go/core/generics/lo" "github.com/iotaledger/hive.go/core/generics/objectstorage" + "github.com/iotaledger/hive.go/core/generics/shrinkingmap" "github.com/iotaledger/hive.go/core/kvstore" "golang.org/x/crypto/blake2b" @@ -49,7 +50,7 @@ type CommitmentTrees struct { // EpochCommitmentFactory manages epoch commitmentTrees. type EpochCommitmentFactory struct { - commitmentTrees map[epoch.Index]*CommitmentTrees + commitmentTrees *shrinkingmap.ShrinkingMap[epoch.Index, *CommitmentTrees] storage *EpochCommitmentStorage tangle *tangleold.Tangle @@ -74,7 +75,7 @@ func NewEpochCommitmentFactory(store kvstore.KVStore, tangle *tangleold.Tangle, manaRootTreeValueStore := objectstorage.NewStoreWithRealm(epochCommitmentStorage.baseStore, database.PrefixNotarization, prefixManaTreeValues) return &EpochCommitmentFactory{ - commitmentTrees: make(map[epoch.Index]*CommitmentTrees), + commitmentTrees: shrinkingmap.New[epoch.Index, *CommitmentTrees](), storage: epochCommitmentStorage, tangle: tangle, snapshotDepth: snapshotDepth, @@ -344,7 +345,10 @@ func (f *EpochCommitmentFactory) newEpochRoots(ei epoch.Index) (commitmentRoots } // We advance the LedgerState to the next epoch. - f.commitLedgerState(ei - epoch.Index(f.snapshotDepth)) + epochToCommit := ei - epoch.Index(f.snapshotDepth) + if epochToCommit > 0 { + f.commitLedgerState(epochToCommit) + } commitmentRoots = &epoch.CommitmentRoots{ StateRoot: epoch.NewMerkleRoot(stateRoot), @@ -354,7 +358,7 @@ func (f *EpochCommitmentFactory) newEpochRoots(ei epoch.Index) (commitmentRoots } // We are never going to use this epoch's commitment trees again. - delete(f.commitmentTrees, ei) + f.commitmentTrees.Delete(ei) return commitmentRoots, nil } @@ -383,10 +387,10 @@ func (f *EpochCommitmentFactory) getCommitmentTrees(ei epoch.Index) (commitmentT if ei <= lastCommittedEpoch { return nil, errors.Errorf("cannot get commitment trees for epoch %d, because it is already committed", ei) } - commitmentTrees, ok := f.commitmentTrees[ei] + commitmentTrees, ok := f.commitmentTrees.Get(ei) if !ok { commitmentTrees = f.newCommitmentTrees(ei) - f.commitmentTrees[ei] = commitmentTrees + f.commitmentTrees.Set(ei, commitmentTrees) } return } diff --git a/packages/core/notarization/events.go b/packages/core/notarization/events.go index 90514370d1..57b83f8fa6 100644 --- a/packages/core/notarization/events.go +++ b/packages/core/notarization/events.go @@ -4,6 +4,7 @@ import ( "github.com/iotaledger/goshimmer/packages/core/epoch" "github.com/iotaledger/goshimmer/packages/core/ledger" "github.com/iotaledger/goshimmer/packages/core/ledger/utxo" + "github.com/iotaledger/goshimmer/packages/core/mana" "github.com/iotaledger/goshimmer/packages/core/tangleold" "github.com/iotaledger/hive.go/core/generics/event" "github.com/iotaledger/hive.go/core/identity" @@ -20,7 +21,7 @@ type Events struct { // CompetingCommitmentDetected is an event that gets triggered whenever a competing epoch commitment is detected. CompetingCommitmentDetected *event.Event[*CompetingCommitmentDetectedEvent] // ManaVectorUpdate is an event that gets triggered whenever the consensus mana vector needs to be updated. - ManaVectorUpdate *event.Event[*ManaVectorUpdateEvent] + ManaVectorUpdate *event.Event[*mana.ManaVectorUpdateEvent] // TangleTreeInserted is an event that gets triggered when a Block is inserted into the Tangle smt. TangleTreeInserted *event.Event[*TangleTreeUpdatedEvent] // TangleTreeRemoved is an event that gets triggered when a Block is removed from Tangle smt. @@ -69,10 +70,10 @@ type StateMutationTreeUpdatedEvent struct { type UTXOUpdatedEvent struct { // EI is the index of updated UTXO. EI epoch.Index - // Created are the outputs created in a transaction. - Created []*ledger.OutputWithMetadata // Spent are outputs that is spent in a transaction. Spent []*ledger.OutputWithMetadata + // Created are the outputs created in a transaction. + Created []*ledger.OutputWithMetadata } // EpochCommittableEvent is a container that acts as a dictionary for the EpochCommittable event related parameters. @@ -95,12 +96,6 @@ type CompetingCommitmentDetectedEvent struct { Block *tangleold.Block } -// ManaVectorUpdateEvent is a container that acts as a dictionary for the EpochCommittable event related parameters. -type ManaVectorUpdateEvent struct { - // EI is the index of committable epoch. - EI epoch.Index -} - // SyncRangeEvent is a container that acts as a dictionary for the SyncRange event related parameters. type SyncRangeEvent struct { StartEI epoch.Index diff --git a/packages/core/notarization/manager.go b/packages/core/notarization/manager.go index 9d9c5fa64d..e88a668da3 100644 --- a/packages/core/notarization/manager.go +++ b/packages/core/notarization/manager.go @@ -10,9 +10,11 @@ import ( "github.com/iotaledger/hive.go/core/generics/event" "github.com/iotaledger/hive.go/core/generics/lo" + "github.com/iotaledger/hive.go/core/generics/shrinkingmap" "github.com/iotaledger/hive.go/core/logger" "github.com/iotaledger/goshimmer/packages/core/conflictdag" + "github.com/iotaledger/goshimmer/packages/core/mana" "github.com/iotaledger/goshimmer/packages/node/clock" "github.com/iotaledger/goshimmer/packages/core/epoch" @@ -35,7 +37,7 @@ type Manager struct { epochCommitmentFactoryMutex sync.RWMutex bootstrapMutex sync.RWMutex options *ManagerOptions - pendingConflictsCounters map[epoch.Index]uint64 + pendingConflictsCounters *shrinkingmap.ShrinkingMap[epoch.Index, uint64] log *logger.Logger Events *Events bootstrapped bool @@ -55,7 +57,7 @@ func NewManager(epochCommitmentFactory *EpochCommitmentFactory, t *tangleold.Tan new = &Manager{ tangle: t, epochCommitmentFactory: epochCommitmentFactory, - pendingConflictsCounters: make(map[epoch.Index]uint64), + pendingConflictsCounters: shrinkingmap.New[epoch.Index, uint64](), log: options.Log, options: options, Events: &Events{ @@ -66,7 +68,7 @@ func NewManager(epochCommitmentFactory *EpochCommitmentFactory, t *tangleold.Tan UTXOTreeInserted: event.New[*UTXOUpdatedEvent](), UTXOTreeRemoved: event.New[*UTXOUpdatedEvent](), EpochCommittable: event.New[*EpochCommittableEvent](), - ManaVectorUpdate: event.New[*ManaVectorUpdateEvent](), + ManaVectorUpdate: event.New[*mana.ManaVectorUpdateEvent](), Bootstrapped: event.New[*BootstrappedEvent](), SyncRange: event.New[*SyncRangeEvent](), ActivityTreeInserted: event.New[*ActivityTreeUpdatedEvent](), @@ -172,7 +174,7 @@ func (m *Manager) LoadOutputsWithMetadata(outputsWithMetadatas []*ledger.OutputW } } -// LoadEpochDiffs updates the state tree from a given snapshot. +// LoadEpochDiff loads an epoch diff. func (m *Manager) LoadEpochDiff(epochDiff *ledger.EpochDiff) { m.epochCommitmentFactoryMutex.Lock() defer m.epochCommitmentFactoryMutex.Unlock() @@ -328,8 +330,6 @@ func (m *Manager) OnBlockStored(block *tangleold.Block) { latestCommittableEI := lo.PanicOnErr(m.epochCommitmentFactory.storage.latestCommittableEpochIndex()) epochDeltaSeconds := time.Duration(int64(blockEI-latestCommittableEI)*epoch.Duration) * time.Second - m.log.Debugf("block committing to epoch %d stored, latest committable epoch is %d", blockEI, latestCommittableEI) - // If we are too far behind, we will warpsync if epochDeltaSeconds > m.options.BootstrapWindow { m.Events.SyncRange.Trigger(&SyncRangeEvent{ @@ -451,7 +451,7 @@ func (m *Manager) OnConflictAccepted(conflictID utxo.TransactionID) { } // OnConflictConfirmed is the handler for conflict confirmed event. -func (m *Manager) onConflictAccepted(conflictID utxo.TransactionID) ([]*EpochCommittableEvent, []*ManaVectorUpdateEvent) { +func (m *Manager) onConflictAccepted(conflictID utxo.TransactionID) ([]*EpochCommittableEvent, []*mana.ManaVectorUpdateEvent) { m.epochCommitmentFactoryMutex.Lock() defer m.epochCommitmentFactoryMutex.Unlock() @@ -485,7 +485,7 @@ func (m *Manager) OnConflictRejected(conflictID utxo.TransactionID) { } // OnConflictRejected is the handler for conflict created event. -func (m *Manager) onConflictRejected(conflictID utxo.TransactionID) ([]*EpochCommittableEvent, []*ManaVectorUpdateEvent) { +func (m *Manager) onConflictRejected(conflictID utxo.TransactionID) ([]*EpochCommittableEvent, []*mana.ManaVectorUpdateEvent) { m.epochCommitmentFactoryMutex.Lock() defer m.epochCommitmentFactoryMutex.Unlock() @@ -505,7 +505,7 @@ func (m *Manager) OnAcceptanceTimeUpdated(newTime time.Time) { } // OnAcceptanceTimeUpdated is the handler for time updated event and returns events to be triggered. -func (m *Manager) onAcceptanceTimeUpdated(newTime time.Time) ([]*EpochCommittableEvent, []*ManaVectorUpdateEvent) { +func (m *Manager) onAcceptanceTimeUpdated(newTime time.Time) ([]*EpochCommittableEvent, []*mana.ManaVectorUpdateEvent) { m.epochCommitmentFactoryMutex.Lock() defer m.epochCommitmentFactoryMutex.Unlock() @@ -532,21 +532,14 @@ func (m *Manager) PendingConflictsCountAll() (pendingConflicts map[epoch.Index]u m.epochCommitmentFactoryMutex.RLock() defer m.epochCommitmentFactoryMutex.RUnlock() - pendingConflicts = make(map[epoch.Index]uint64, len(m.pendingConflictsCounters)) - for k, v := range m.pendingConflictsCounters { + pendingConflicts = make(map[epoch.Index]uint64, m.pendingConflictsCounters.Size()) + m.pendingConflictsCounters.ForEach(func(k epoch.Index, v uint64) bool { pendingConflicts[k] = v - } + return true + }) return pendingConflicts } -// GetEpochDiff returns the epoch diff of an epoch. -func (m *Manager) GetEpochDiff(ei epoch.Index) (spent []*ledger.OutputWithMetadata, created []*ledger.OutputWithMetadata) { - m.epochCommitmentFactoryMutex.Lock() - defer m.epochCommitmentFactoryMutex.Unlock() - spent, created = m.epochCommitmentFactory.loadDiffUTXOs(ei) - return -} - // Bootstrapped returns the current value of pendingConflictsCount per epoch. func (m *Manager) Bootstrapped() bool { m.bootstrapMutex.RLock() @@ -563,16 +556,20 @@ func (m *Manager) Shutdown() { m.epochCommitmentFactory.storage.shutdown() } -func (m *Manager) decreasePendingConflictCounter(ei epoch.Index) ([]*EpochCommittableEvent, []*ManaVectorUpdateEvent) { - m.pendingConflictsCounters[ei]-- - if m.pendingConflictsCounters[ei] == 0 { +func (m *Manager) decreasePendingConflictCounter(ei epoch.Index) ([]*EpochCommittableEvent, []*mana.ManaVectorUpdateEvent) { + count, _ := m.pendingConflictsCounters.Get(ei) + count-- + m.pendingConflictsCounters.Set(ei, count) + if count == 0 { return m.moveLatestCommittableEpoch(ei) } return nil, nil } func (m *Manager) increasePendingConflictCounter(ei epoch.Index) { - m.pendingConflictsCounters[ei]++ + count, _ := m.pendingConflictsCounters.Get(ei) + count++ + m.pendingConflictsCounters.Set(ei, count) } func (m *Manager) includeTransactionInEpoch(txID utxo.TransactionID, ei epoch.Index, spent, created []*ledger.OutputWithMetadata) (err error) { @@ -611,7 +608,7 @@ func (m *Manager) allPastConflictsAreResolved(ei epoch.Index) (conflictsResolved } // epoch is not committable if there are any not resolved conflicts in this and past epochs for index := lastEI; index <= ei; index++ { - if m.pendingConflictsCounters[index] != 0 { + if count, _ := m.pendingConflictsCounters.Get(index); count != 0 { return false } } @@ -675,13 +672,23 @@ func (m *Manager) resolveOutputs(tx utxo.Transaction) (spentOutputsWithMetadata, return } -func (m *Manager) manaVectorUpdate(ei epoch.Index) (event *ManaVectorUpdateEvent) { - return &ManaVectorUpdateEvent{ - EI: ei, +func (m *Manager) manaVectorUpdate(ei epoch.Index) (event *mana.ManaVectorUpdateEvent) { + manaEpoch := ei - epoch.Index(m.options.ManaEpochDelay) + spent := []*ledger.OutputWithMetadata{} + created := []*ledger.OutputWithMetadata{} + + if manaEpoch > 0 { + spent, created = m.epochCommitmentFactory.loadDiffUTXOs(manaEpoch) + } + + return &mana.ManaVectorUpdateEvent{ + EI: ei, + Spent: spent, + Created: created, } } -func (m *Manager) moveLatestCommittableEpoch(currentEpoch epoch.Index) ([]*EpochCommittableEvent, []*ManaVectorUpdateEvent) { +func (m *Manager) moveLatestCommittableEpoch(currentEpoch epoch.Index) ([]*EpochCommittableEvent, []*mana.ManaVectorUpdateEvent) { latestCommittable, err := m.epochCommitmentFactory.storage.latestCommittableEpochIndex() if err != nil { m.log.Errorf("could not obtain last committed epoch index: %v", err) @@ -689,7 +696,7 @@ func (m *Manager) moveLatestCommittableEpoch(currentEpoch epoch.Index) ([]*Epoch } epochCommittableEvents := make([]*EpochCommittableEvent, 0) - manaVectorUpdateEvents := make([]*ManaVectorUpdateEvent, 0) + manaVectorUpdateEvents := make([]*mana.ManaVectorUpdateEvent, 0) for ei := latestCommittable + 1; ei <= currentEpoch; ei++ { if !m.isCommittable(ei) { break @@ -715,11 +722,14 @@ func (m *Manager) moveLatestCommittableEpoch(currentEpoch epoch.Index) ([]*Epoch if manaVectorUpdateEvent := m.manaVectorUpdate(ei); manaVectorUpdateEvent != nil { manaVectorUpdateEvents = append(manaVectorUpdateEvents, manaVectorUpdateEvent) } + + // We do not need to track pending conflicts for a committed epoch anymore. + m.pendingConflictsCounters.Delete(ei) } return epochCommittableEvents, manaVectorUpdateEvents } -func (m *Manager) triggerEpochEvents(epochCommittableEvents []*EpochCommittableEvent, manaVectorUpdateEvents []*ManaVectorUpdateEvent) { +func (m *Manager) triggerEpochEvents(epochCommittableEvents []*EpochCommittableEvent, manaVectorUpdateEvents []*mana.ManaVectorUpdateEvent) { for _, epochCommittableEvent := range epochCommittableEvents { m.Events.EpochCommittable.Trigger(epochCommittableEvent) } @@ -755,6 +765,15 @@ type ManagerOptions struct { MinCommittableEpochAge time.Duration BootstrapWindow time.Duration Log *logger.Logger + ManaEpochDelay uint +} + +// ManaEpochDelay specifies how many epochs the consensus mana booking is delayed with respect to the latest committable +// epoch. +func ManaEpochDelay(manaEpochDelay uint) ManagerOption { + return func(options *ManagerOptions) { + options.ManaEpochDelay = manaEpochDelay + } } // MinCommittableEpochAge specifies how old an epoch has to be for it to be committable. diff --git a/packages/core/notarization/proofs.go b/packages/core/notarization/proofs.go index 2f99403f87..caf121df1f 100644 --- a/packages/core/notarization/proofs.go +++ b/packages/core/notarization/proofs.go @@ -55,12 +55,16 @@ func (f *EpochCommitmentFactory) verifyRoot(proof CommitmentProof, key []byte, v // ProofStateRoot returns the merkle proof for the outputID against the state root. func (f *EpochCommitmentFactory) ProofStateRoot(ei epoch.Index, outID utxo.OutputID) (*CommitmentProof, error) { key := outID.Bytes() - root := f.commitmentTrees[ei].tangleTree.Root() - proof, err := f.stateRootTree.ProveForRoot(key, root) + root, exists := f.commitmentTrees.Get(ei) + if !exists { + return nil, errors.Errorf("could not obtain commitment trees for epoch %d", ei) + } + tangleRoot := root.tangleTree.Root() + proof, err := f.stateRootTree.ProveForRoot(key, tangleRoot) if err != nil { return nil, errors.Wrap(err, "could not generate the state root proof") } - return &CommitmentProof{ei, proof, root}, nil + return &CommitmentProof{ei, proof, tangleRoot}, nil } // ProofStateMutationRoot returns the merkle proof for the transactionID against the state mutation root. diff --git a/packages/core/notarization/storage.go b/packages/core/notarization/storage.go index 9fb5082cc0..de91610be4 100644 --- a/packages/core/notarization/storage.go +++ b/packages/core/notarization/storage.go @@ -6,6 +6,7 @@ import ( "github.com/cockroachdb/errors" "github.com/iotaledger/hive.go/core/generics/objectstorage" + "github.com/iotaledger/hive.go/core/generics/shrinkingmap" "github.com/iotaledger/hive.go/core/kvstore" "github.com/iotaledger/hive.go/core/kvstore/mapdb" @@ -26,7 +27,8 @@ type EpochCommitmentStorage struct { ecRecordStorage *objectstorage.ObjectStorage[*epoch.ECRecord] // Delta storages - epochDiffStorages map[epoch.Index]*epochDiffStorage + epochDiffStoragesMutex sync.Mutex + epochDiffStorages *shrinkingmap.ShrinkingMap[epoch.Index, *epochDiffStorage] // epochCommitmentStorageOptions is a dictionary for configuration parameters of the Storage. epochCommitmentStorageOptions *options @@ -62,7 +64,7 @@ func newEpochCommitmentStorage(options ...Option) (new *EpochCommitmentStorage) objectstorage.StoreOnCreation(true), ) - new.epochDiffStorages = make(map[epoch.Index]*epochDiffStorage) + new.epochDiffStorages = shrinkingmap.New[epoch.Index, *epochDiffStorage]() return new } @@ -83,10 +85,13 @@ func (s *EpochCommitmentStorage) shutdown() { s.shutdownOnce.Do(func() { s.ledgerstateStorage.Shutdown() s.ecRecordStorage.Shutdown() - for _, epochDiffStorage := range s.epochDiffStorages { + s.epochDiffStoragesMutex.Lock() + defer s.epochDiffStoragesMutex.Unlock() + s.epochDiffStorages.ForEach(func(_ epoch.Index, epochDiffStorage *epochDiffStorage) bool { epochDiffStorage.spent.Shutdown() epochDiffStorage.created.Shutdown() - } + return true + }) }) } @@ -137,7 +142,11 @@ func (s *EpochCommitmentStorage) setIndexFlag(flag string, ei epoch.Index) (err func (s *EpochCommitmentStorage) dropEpochDiffStorage(ei epoch.Index) { // TODO: properly drop (delete epoch bucketed) storage diffStorage := s.getEpochDiffStorage(ei) - delete(s.epochDiffStorages, ei) + + s.epochDiffStoragesMutex.Lock() + defer s.epochDiffStoragesMutex.Unlock() + + s.epochDiffStorages.Delete(ei) go func() { diffStorage.spent.Shutdown() diffStorage.created.Shutdown() @@ -145,7 +154,10 @@ func (s *EpochCommitmentStorage) dropEpochDiffStorage(ei epoch.Index) { } func (s *EpochCommitmentStorage) getEpochDiffStorage(ei epoch.Index) (diffStorage *epochDiffStorage) { - if epochDiffStorage, exists := s.epochDiffStorages[ei]; exists { + s.epochDiffStoragesMutex.Lock() + defer s.epochDiffStoragesMutex.Unlock() + + if epochDiffStorage, exists := s.epochDiffStorages.Get(ei); exists { return epochDiffStorage } @@ -174,7 +186,7 @@ func (s *EpochCommitmentStorage) getEpochDiffStorage(ei epoch.Index) (diffStorag ), } - s.epochDiffStorages[ei] = diffStorage + s.epochDiffStorages.Set(ei, diffStorage) return } diff --git a/packages/core/notarization/testutils.go b/packages/core/notarization/testutils.go index 27eb731a30..4596d829ca 100644 --- a/packages/core/notarization/testutils.go +++ b/packages/core/notarization/testutils.go @@ -12,6 +12,7 @@ import ( "github.com/iotaledger/goshimmer/packages/core/consensus/acceptance" "github.com/iotaledger/goshimmer/packages/core/ledger/utxo" + "github.com/iotaledger/goshimmer/packages/core/mana" ) const ( @@ -106,7 +107,7 @@ func (e *EventMock) EpochCommittable(event *EpochCommittableEvent) { } // ManaVectorUpdate is the mocked ManaVectorUpdate event. -func (e *EventMock) ManaVectorUpdate(event *ManaVectorUpdateEvent) { +func (e *EventMock) ManaVectorUpdate(event *mana.ManaVectorUpdateEvent) { e.Called(event.EI) atomic.AddUint64(&e.calledEvents, 1) } diff --git a/packages/node/p2p/neighbor.go b/packages/node/p2p/neighbor.go index ad757ea7c1..2ecfe2c022 100644 --- a/packages/node/p2p/neighbor.go +++ b/packages/node/p2p/neighbor.go @@ -106,9 +106,6 @@ func (n *Neighbor) readLoop() { packet := stream.packetFactory() err := stream.ReadPacket(packet) if err != nil { - if isTimeoutError(err) { - continue - } n.Log.Infow("Stream read packet error", "err", err) if disconnectErr := n.disconnect(); disconnectErr != nil { n.Log.Warnw("Failed to disconnect", "err", disconnectErr) diff --git a/packages/node/p2p/stream.go b/packages/node/p2p/stream.go index a383696e8f..b3e0cbcb39 100644 --- a/packages/node/p2p/stream.go +++ b/packages/node/p2p/stream.go @@ -112,8 +112,11 @@ func (m *Manager) acceptPeer(ctx context.Context, p *peer.Peer, opts []ConnectPe defer m.removeAcceptMatcher(am, protocolID) m.log.Debugw("waiting for incoming stream", "id", am.Peer.ID(), "proto", protocolID) + am.StreamChMutex.RLock() + streamCh := am.StreamCh[protocolID] + am.StreamChMutex.RUnlock() select { - case ps := <-am.StreamCh[protocolID]: + case ps := <-streamCh: if ps.Protocol() != protocolID { return nil, fmt.Errorf("accepted stream has wrong protocol: %s != %s", ps.Protocol(), protocolID) } @@ -205,10 +208,13 @@ func (m *Manager) handleStream(stream network.Stream) { return } am := m.matchNewStream(stream) - if am != nil { + am.StreamChMutex.RLock() + streamCh := am.StreamCh[protocolID] + am.StreamChMutex.RUnlock() + m.log.Debugw("incoming stream matched", "id", am.Peer.ID(), "proto", protocolID) - am.StreamCh[protocolID] <- ps + streamCh <- ps } else { // close the connection if not matched m.log.Debugw("unexpected connection", "addr", stream.Conn().RemoteMultiaddr(), @@ -220,9 +226,10 @@ func (m *Manager) handleStream(stream network.Stream) { // AcceptMatcher holds data to match an existing connection with a peer. type AcceptMatcher struct { - Peer *peer.Peer // connecting peer - Libp2pID libp2ppeer.ID - StreamCh map[protocol.ID]chan *PacketsStream + Peer *peer.Peer // connecting peer + Libp2pID libp2ppeer.ID + StreamChMutex sync.RWMutex + StreamCh map[protocol.ID]chan *PacketsStream } func (m *Manager) newAcceptMatcher(p *peer.Peer, protocolID protocol.ID) (*AcceptMatcher, error) { @@ -236,6 +243,8 @@ func (m *Manager) newAcceptMatcher(p *peer.Peer, protocolID protocol.ID) (*Accep acceptMatcher, acceptExists := m.acceptMap[libp2pID] if acceptExists { + acceptMatcher.StreamChMutex.Lock() + defer acceptMatcher.StreamChMutex.Unlock() if _, streamChanExists := acceptMatcher.StreamCh[protocolID]; streamChanExists { return nil, nil } @@ -260,10 +269,14 @@ func (m *Manager) removeAcceptMatcher(am *AcceptMatcher, protocolID protocol.ID) m.acceptMutex.Lock() defer m.acceptMutex.Unlock() - close(m.acceptMap[am.Libp2pID].StreamCh[protocolID]) - delete(m.acceptMap[am.Libp2pID].StreamCh, protocolID) + existingAm := m.acceptMap[am.Libp2pID] + existingAm.StreamChMutex.Lock() + defer existingAm.StreamChMutex.Unlock() - if len(m.acceptMap[am.Libp2pID].StreamCh) == 0 { + close(existingAm.StreamCh[protocolID]) + delete(existingAm.StreamCh, protocolID) + + if len(existingAm.StreamCh) == 0 { delete(m.acceptMap, am.Libp2pID) } } @@ -310,9 +323,6 @@ func NewPacketsStream(stream network.Stream, packetFactory func() proto.Message) func (ps *PacketsStream) WritePacket(message proto.Message) error { ps.writerLock.Lock() defer ps.writerLock.Unlock() - if err := ps.SetWriteDeadline(time.Now().Add(ioTimeout)); err != nil && !isDeadlineUnsupportedError(err) { - return errors.WithStack(err) - } err := ps.writer.WriteBlk(message) if err != nil { return errors.WithStack(err) @@ -325,9 +335,6 @@ func (ps *PacketsStream) WritePacket(message proto.Message) error { func (ps *PacketsStream) ReadPacket(message proto.Message) error { ps.readerLock.Lock() defer ps.readerLock.Unlock() - if err := ps.SetReadDeadline(time.Now().Add(ioTimeout)); err != nil && !isDeadlineUnsupportedError(err) { - return errors.WithStack(err) - } if err := ps.reader.ReadBlk(message); err != nil { return errors.WithStack(err) } diff --git a/packages/node/warpsync/syncing_dataflow.go b/packages/node/warpsync/syncing_dataflow.go index 5ce2482ba3..6fd8430e1f 100644 --- a/packages/node/warpsync/syncing_dataflow.go +++ b/packages/node/warpsync/syncing_dataflow.go @@ -122,7 +122,11 @@ func (m *Manager) epochVerifyCommand(params *syncingFlowParams, next dataflow.Ne func (m *Manager) epochProcessBlocksCommand(params *syncingFlowParams, next dataflow.Next[*syncingFlowParams]) (err error) { for _, blk := range params.epochBlocks { - m.blockProcessorFunc(blk, m.p2pManager.GetNeighborsByID([]identity.ID{params.peerID})[0].Peer) + neighbors := m.p2pManager.GetNeighborsByID([]identity.ID{params.peerID}) + if len(neighbors) != 1 { + return errors.Errorf("neighbor %s not peered anymore after receiving warpsynced block") + } + m.blockProcessorFunc(blk, neighbors[0].Peer) } return next(params) diff --git a/plugins/autopeering/discovery/parameters.go b/plugins/autopeering/discovery/parameters.go index d0cc4832c0..c1799f3faf 100644 --- a/plugins/autopeering/discovery/parameters.go +++ b/plugins/autopeering/discovery/parameters.go @@ -5,7 +5,7 @@ import "github.com/iotaledger/goshimmer/plugins/config" // ParametersDefinitionDiscovery contains the definition of configuration parameters used by the autopeering peer discovery. type ParametersDefinitionDiscovery struct { // NetworkVersion defines the config flag of the network version. - NetworkVersion uint32 `default:"63" usage:"autopeering network version"` + NetworkVersion uint32 `default:"64" usage:"autopeering network version"` // EntryNodes defines the config flag of the entry nodes. EntryNodes []string `default:"2PV5487xMw5rasGBXXWeqSi4hLz7r19YBt8Y1TGAsQbj@analysisentry-01.devnet.shimmer.iota.cafe:15626,5EDH4uY78EA6wrBkHHAVBWBMDt7EcksRq6pjzipoW15B@entry-0.devnet.tanglebay.com:14646,CAB87iQZR6BjBrCgEBupQJ4gpEBgvGKKv3uuGVRBKb4n@entry-1.devnet.tanglebay.com:14646" usage:"list of trusted entry nodes for auto peering"` diff --git a/plugins/banner/plugin.go b/plugins/banner/plugin.go index 5022125058..98da733f11 100644 --- a/plugins/banner/plugin.go +++ b/plugins/banner/plugin.go @@ -15,7 +15,7 @@ var ( Plugin = node.NewPlugin(PluginName, nil, node.Enabled, configure, run) // AppVersion version number - AppVersion = "v0.9.5" + AppVersion = "v0.9.6" // SimplifiedAppVersion is the version number without commit hash SimplifiedAppVersion = simplifiedVersion(AppVersion) ) diff --git a/plugins/blocklayer/mana_plugin.go b/plugins/blocklayer/mana_plugin.go index 30e3276f50..a2304c3d3f 100644 --- a/plugins/blocklayer/mana_plugin.go +++ b/plugins/blocklayer/mana_plugin.go @@ -6,7 +6,6 @@ import ( "sort" "time" - "github.com/iotaledger/goshimmer/packages/core/notarization" db_pkg "github.com/iotaledger/goshimmer/packages/node/database" "github.com/iotaledger/goshimmer/packages/node/p2p" "github.com/iotaledger/goshimmer/packages/node/shutdown" @@ -43,7 +42,7 @@ var ( storages map[mana.Type]*objectstorage.ObjectStorage[*mana.PersistableBaseMana] allowedPledgeNodes map[mana.Type]AllowedPledge onTransactionAcceptedClosure *event.Closure[*ledger.TransactionAcceptedEvent] - onManaVectorToUpdateClosure *event.Closure[*notarization.ManaVectorUpdateEvent] + onManaVectorToUpdateClosure *event.Closure[*mana.ManaVectorUpdateEvent] ) func init() { @@ -60,13 +59,8 @@ func configureManaPlugin(*node.Plugin) { manaLogger = logger.NewLogger(PluginName) onTransactionAcceptedClosure = event.NewClosure(func(event *ledger.TransactionAcceptedEvent) { onTransactionAccepted(event.TransactionID) }) - onManaVectorToUpdateClosure = event.NewClosure(func(event *notarization.ManaVectorUpdateEvent) { - manaVectorEI := event.EI - epoch.Index(ManaParameters.EpochDelay) - if manaVectorEI < 1 { - return - } - spent, created := deps.NotarizationMgr.GetEpochDiff(manaVectorEI) - baseManaVectors[mana.ConsensusMana].BookEpoch(created, spent) + onManaVectorToUpdateClosure = event.NewClosure(func(event *mana.ManaVectorUpdateEvent) { + baseManaVectors[mana.ConsensusMana].BookEpoch(event.Created, event.Spent) }) allowedPledgeNodes = make(map[mana.Type]AllowedPledge) diff --git a/plugins/blocklayer/notarization_plugin.go b/plugins/blocklayer/notarization_plugin.go index 680cbd2d48..0d286647ee 100644 --- a/plugins/blocklayer/notarization_plugin.go +++ b/plugins/blocklayer/notarization_plugin.go @@ -86,6 +86,7 @@ func newNotarizationManager(deps notarizationManagerDependencies) *notarization. deps.Tangle, notarization.MinCommittableEpochAge(NotarizationParameters.MinEpochCommittableAge), notarization.BootstrapWindow(NotarizationParameters.BootstrapWindow), + notarization.ManaEpochDelay(ManaParameters.EpochDelay), notarization.Log(Plugin.Logger())) } diff --git a/plugins/database/versioning.go b/plugins/database/versioning.go index a753565aa7..f44a90e8b3 100644 --- a/plugins/database/versioning.go +++ b/plugins/database/versioning.go @@ -11,7 +11,7 @@ import ( const ( // DBVersion defines the version of the database schema this version of GoShimmer supports. // Every time there's a breaking change regarding the stored data, this version flag should be adjusted. - DBVersion = 63 + DBVersion = 64 ) var ( diff --git a/plugins/faucet/connector.go b/plugins/faucet/connector.go index 74573123f3..db0c328440 100644 --- a/plugins/faucet/connector.go +++ b/plugins/faucet/connector.go @@ -1,6 +1,8 @@ package faucet import ( + "fmt" + "github.com/iotaledger/hive.go/core/types/confirmation" "github.com/pkg/errors" @@ -31,8 +33,10 @@ func (f *FaucetConnector) UnspentOutputs(addresses ...address.Address) (unspentO unspentOutputs = make(map[address.Address]map[utxo.OutputID]*wallet.Output) for _, addr := range addresses { + fmt.Println("> Getting unspent outputs for ", addr.Base58()) f.indexer.CachedAddressOutputMappings(addr.Address()).Consume(func(mapping *indexer.AddressOutputMapping) { f.tangle.Ledger.Storage.CachedOutput(mapping.OutputID()).Consume(func(output utxo.Output) { + fmt.Println("> > Found output ", output.String()) if typedOutput, ok := output.(devnetvm.Output); ok { f.tangle.Ledger.Storage.CachedOutputMetadata(typedOutput.ID()).Consume(func(outputMetadata *ledger.OutputMetadata) { if !outputMetadata.IsSpent() { @@ -57,7 +61,7 @@ func (f *FaucetConnector) UnspentOutputs(addresses ...address.Address) (unspentO }) }) } - + fmt.Printf("%+v\n", unspentOutputs) return } diff --git a/plugins/faucet/faucet.go b/plugins/faucet/faucet.go index a3c0825e36..f10b4116cb 100644 --- a/plugins/faucet/faucet.go +++ b/plugins/faucet/faucet.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/cockroachdb/errors" "github.com/iotaledger/hive.go/core/bitmask" "github.com/iotaledger/hive.go/core/identity" @@ -66,7 +67,7 @@ func (f *Faucet) handleFaucetRequest(p *faucet.Payload, ctx context.Context) (*d sendoptions.Context(ctx), ) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to send first transaction from %s to %s", f.Seed().Address(0).Base58(), f.Seed().Address(1).Base58()) } // send funds to requester @@ -78,5 +79,5 @@ func (f *Faucet) handleFaucetRequest(p *faucet.Payload, ctx context.Context) (*d sendoptions.WaitForConfirmation(true), sendoptions.Context(ctx), ) - return tx, err + return tx, errors.Wrapf(err, "failed to send second transaction from %s to %s", f.Seed().Address(1).Base58(), p.Address().Base58()) } diff --git a/tools/integration-tests/tester/framework/network.go b/tools/integration-tests/tester/framework/network.go index fb2ed16d16..ac2263782a 100644 --- a/tools/integration-tests/tester/framework/network.go +++ b/tools/integration-tests/tester/framework/network.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "sync" "time" "github.com/cockroachdb/errors" @@ -219,10 +220,13 @@ func (n *Network) Shutdown(ctx context.Context) error { // stop all peers in parallel var eg errgroup.Group + var exitStatusMutex sync.Mutex for _, peer := range n.peers { peer := peer // capture range variable eg.Go(func() error { status, err := peer.Shutdown(ctx) + exitStatusMutex.Lock() + defer exitStatusMutex.Unlock() exitStatus[peer.Name()] = status return err })