diff --git a/.golangci.yml b/.golangci.yml index 33298a0..328d2b3 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -18,6 +18,11 @@ linters-settings: checks: ["all"] stylecheck: initialisms: ["ACL", "API", "ASCII", "CPU", "CSS", "DNS", "EOF", "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", "JSON", "QPS", "RAM", "RPC", "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", "TTL", "UDP", "UI", "GID", "UID", "UUID", "URI", "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS", "SIP", "RTP", "AMQP", "DB", "TS"] + nlreturn: + # Size of the block (including return statement that is still "OK") + # so no return split required. + # Default: 1 + block-size: 2 linters: # Disable all linters. diff --git a/core/app/app.go b/core/app/app.go index 8235760..74c119a 100644 --- a/core/app/app.go +++ b/core/app/app.go @@ -14,7 +14,7 @@ var ( Name = "inx-spammer" // Version of the app. - Version = "1.0.0-beta.9" + Version = "1.0.0-beta.10" ) func App() *app.App { diff --git a/pkg/spammer/spammer.go b/pkg/spammer/spammer.go index 7f3d34e..f8e16bd 100644 --- a/pkg/spammer/spammer.go +++ b/pkg/spammer/spammer.go @@ -3,6 +3,7 @@ package spammer import ( "context" "fmt" + "math" "runtime" "sort" "strings" @@ -17,7 +18,7 @@ import ( "github.com/iotaledger/hive.go/core/datastructure/timeheap" "github.com/iotaledger/hive.go/core/events" "github.com/iotaledger/hive.go/core/logger" - "github.com/iotaledger/hive.go/core/math" + hivemath "github.com/iotaledger/hive.go/core/math" "github.com/iotaledger/hive.go/core/syncutils" "github.com/iotaledger/inx-app/pow" "github.com/iotaledger/inx-spammer/pkg/common" @@ -185,8 +186,8 @@ type Spammer struct { processID atomic.Uint32 spammerWaitGroup sync.WaitGroup - ledgerMilestoneIndex atomic.Uint32 - currentLedgerMilestoneIndex uint32 + ledgerMilestoneIndex *atomic.Uint32 + currentLedgerMilestoneIndex *atomic.Uint32 Events *Events @@ -288,11 +289,13 @@ func New( SpamPerformed: events.NewEvent(SpamStatsCaller), AvgSpamMetricsUpdated: events.NewEvent(AvgSpamMetricsCaller), }, - spammerAvgHeap: timeheap.NewTimeHeap(), - accountSender: accountSender, - accountReceiver: accountReceiver, - pendingTransactionsMap: make(map[iotago.BlockID]*pendingTransaction), - outputState: stateBasicOutputCreate, + spammerAvgHeap: timeheap.NewTimeHeap(), + accountSender: accountSender, + accountReceiver: accountReceiver, + ledgerMilestoneIndex: atomic.NewUint32(0), + currentLedgerMilestoneIndex: atomic.NewUint32(math.MaxUint32), + pendingTransactionsMap: make(map[iotago.BlockID]*pendingTransaction), + outputState: stateBasicOutputCreate, }, nil } @@ -351,14 +354,14 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { return s.BuildTaggedDataBlockAndSend(ctx) } - s.Lock() - defer s.Unlock() - - if s.currentLedgerMilestoneIndex != s.ledgerMilestoneIndex.Load() { + if s.currentLedgerMilestoneIndex.Load() != s.ledgerMilestoneIndex.Load() { // stop spamming if the ledger milestone has changed return nil } + s.Lock() + defer s.Unlock() + logDebugStateErrorFunc := func(state outputState, err error) { s.LogDebugf("state: %d, %s failed: %s", state, outputStateNamesMap[s.outputState], err) } @@ -376,6 +379,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamSendBasicOutput { if err := s.basicOutputSend(ctx, s.accountSender, s.accountSender, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -385,6 +389,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamSendBasicOutput { if err := s.basicOutputSend(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -394,6 +399,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias { if err := s.aliasOutputCreate(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -403,6 +409,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias { if err := s.aliasOutputStateTransition(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -412,6 +419,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias && s.valueSpamCreateFoundry { if err := s.foundryOutputCreate(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -421,6 +429,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken { if err := s.foundryOutputMintNativeTokens(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -430,6 +439,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken { if err := s.basicOutputSendNativeTokens(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -439,6 +449,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias { if err := s.aliasOutputGovernanceTransition(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -448,6 +459,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken && s.valueSpamMeltNativeToken { if err := s.foundryOutputMeltNativeTokens(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -457,6 +469,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamDestroyFoundry && (!s.valueSpamMintNativeToken || s.valueSpamMeltNativeToken) { if err := s.foundryOutputDestroy(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -466,6 +479,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateAlias && s.valueSpamDestroyAlias && ((!s.valueSpamCreateFoundry || s.valueSpamDestroyFoundry) && (!s.valueSpamMintNativeToken || s.valueSpamMeltNativeToken)) { if err := s.aliasOutputDestroy(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -475,6 +489,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateNFT { if err := s.nftOutputCreate(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -484,6 +499,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateNFT { if err := s.nftOutputSend(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -493,6 +509,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCreateNFT && s.valueSpamDestroyNFT { if err := s.nftOutputDestroy(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -502,6 +519,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error { if s.valueSpamCollectBasicOutput { if err := s.basicOutputSend(ctx, s.accountReceiver, s.accountSender, outputStateNamesMap[s.outputState]); err != nil { logDebugStateErrorFunc(s.outputState, err) + return nil } executed = true } @@ -802,7 +820,7 @@ func (s *Spammer) MeasureSpammerMetrics() { } sentSpamBlocks := s.spammerMetrics.SentSpamBlocks.Load() - newBlocks := math.Uint32Diff(sentSpamBlocks, s.lastSentSpamBlocks) + newBlocks := hivemath.Uint32Diff(sentSpamBlocks, s.lastSentSpamBlocks) s.lastSentSpamBlocks = sentSpamBlocks s.spammerAvgHeap.Add(uint64(newBlocks)) @@ -924,38 +942,50 @@ func (s *Spammer) ApplyNewLedgerUpdate(ctx context.Context, msIndex iotago.Miles checkPendingBlockMetadata(pendingTx) } - if conflicting { - // there was a conflict in the chain + // it may happen that after applying all ledger changes, there are no known outputs left. + // this mostly happens after a conflict, because updating the local state after a conflict + // may return outputs that are confirmed in the next milestone, + // but there are no new outputs created by the spammer. + // in this case we query the indexer again to get the latest state. + stateEmpty := s.accountSender.Empty() && s.accountReceiver.Empty() && s.isValueSpamEnabled + + if conflicting || stateEmpty { + // there was a conflict in the chain, or the accounts were empty anyway s.resetSpammerState() // wait until the indexer got updated if err := s.waitForIndexerUpdate(ctx, msIndex); err != nil { return err } - } else { - // we only allow the spammer to create new transactions if there was no conflict in the last milestone. - s.currentLedgerMilestoneIndex = s.ledgerMilestoneIndex.Load() - } - // it may happen that after applying all ledger changes, there are no known outputs left. - // this mostly happens after a conflict, because updating the local state after a conflict - // may return outputs that are confirmed in the next milestone, - // but there are no new outputs created by the spammer. - // in this case we query the indexer again to get the latest state. - if s.accountSender.Empty() && s.accountReceiver.Empty() && s.isValueSpamEnabled { + if conflicting { + s.LogDebug("conflict detected, fetching current spammer ledger state from indexer...") + } else { + s.LogDebug("accounts empty, fetching current spammer ledger state from indexer...") + } + if err := s.getCurrentSpammerLedgerState(ctx); err != nil { // there was an error getting the current ledger state s.resetSpammerState() - s.LogWarnf("failed to get current spammer ledger state: %s", err.Error()) + + return err } } + // we only allow the spammer to create new transactions if there was no conflict in the last milestone + // or if the spammer ledger state was successfully fetched. + s.currentLedgerMilestoneIndex.Store(s.ledgerMilestoneIndex.Load()) + return nil } // resetSpammerState resets the spammer state in case of a conflict. func (s *Spammer) resetSpammerState() { + // set the current ledger milestone index to MaxUint32 to stop all ongoing spammers with the old ledger state + // until the correct state was fetched by the indexer at the next milestone. + s.currentLedgerMilestoneIndex.Store(math.MaxUint32) + // forget all known outputs s.accountSender.ResetOutputs() s.accountReceiver.ResetOutputs() @@ -980,7 +1010,7 @@ func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.Miles for ctxWaitForUpdate.Err() == nil { // we create a dummy call to check for the ledger index in the result - result, err := s.indexer.Outputs(ctx, &nodeclient.BasicOutputsQuery{ + result, err := s.indexer.Outputs(ctxWaitForUpdate, &nodeclient.BasicOutputsQuery{ IndexerCursorParas: nodeclient.IndexerCursorParas{ PageSize: 1, }, @@ -995,6 +1025,10 @@ func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.Miles } } + if result.Error != nil { + return fmt.Errorf("waitForIndexerUpdate failed, error: %w", result.Error) + } + // short sleep time to reduce load on the indexer time.Sleep(20 * time.Millisecond) }