Skip to content

Commit

Permalink
Fix spammer state after error (#56)
Browse files Browse the repository at this point in the history
* Add linter settings

* Escape loop at error to not overwrite next outputState

* Set currentLedgerMilestoneIndex also if state was reset

* Use MaxUint32 to mark currentLedgerMilestoneIndex as invalid

* Fix usage of wrong context

* Release 1.0.0-beta.10
  • Loading branch information
muXxer authored Sep 21, 2022
1 parent d9359af commit 2ec7f8b
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ linters-settings:
checks: ["all"]
stylecheck:
initialisms: ["ACL", "API", "ASCII", "CPU", "CSS", "DNS", "EOF", "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", "JSON", "QPS", "RAM", "RPC", "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", "TTL", "UDP", "UI", "GID", "UID", "UUID", "URI", "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS", "SIP", "RTP", "AMQP", "DB", "TS"]
nlreturn:
# Size of the block (including return statement that is still "OK")
# so no return split required.
# Default: 1
block-size: 2

linters:
# Disable all linters.
Expand Down
2 changes: 1 addition & 1 deletion core/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
Name = "inx-spammer"

// Version of the app.
Version = "1.0.0-beta.9"
Version = "1.0.0-beta.10"
)

func App() *app.App {
Expand Down
88 changes: 61 additions & 27 deletions pkg/spammer/spammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package spammer
import (
"context"
"fmt"
"math"
"runtime"
"sort"
"strings"
Expand All @@ -17,7 +18,7 @@ import (
"github.com/iotaledger/hive.go/core/datastructure/timeheap"
"github.com/iotaledger/hive.go/core/events"
"github.com/iotaledger/hive.go/core/logger"
"github.com/iotaledger/hive.go/core/math"
hivemath "github.com/iotaledger/hive.go/core/math"
"github.com/iotaledger/hive.go/core/syncutils"
"github.com/iotaledger/inx-app/pow"
"github.com/iotaledger/inx-spammer/pkg/common"
Expand Down Expand Up @@ -185,8 +186,8 @@ type Spammer struct {
processID atomic.Uint32
spammerWaitGroup sync.WaitGroup

ledgerMilestoneIndex atomic.Uint32
currentLedgerMilestoneIndex uint32
ledgerMilestoneIndex *atomic.Uint32
currentLedgerMilestoneIndex *atomic.Uint32

Events *Events

Expand Down Expand Up @@ -288,11 +289,13 @@ func New(
SpamPerformed: events.NewEvent(SpamStatsCaller),
AvgSpamMetricsUpdated: events.NewEvent(AvgSpamMetricsCaller),
},
spammerAvgHeap: timeheap.NewTimeHeap(),
accountSender: accountSender,
accountReceiver: accountReceiver,
pendingTransactionsMap: make(map[iotago.BlockID]*pendingTransaction),
outputState: stateBasicOutputCreate,
spammerAvgHeap: timeheap.NewTimeHeap(),
accountSender: accountSender,
accountReceiver: accountReceiver,
ledgerMilestoneIndex: atomic.NewUint32(0),
currentLedgerMilestoneIndex: atomic.NewUint32(math.MaxUint32),
pendingTransactionsMap: make(map[iotago.BlockID]*pendingTransaction),
outputState: stateBasicOutputCreate,
}, nil
}

Expand Down Expand Up @@ -351,14 +354,14 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
return s.BuildTaggedDataBlockAndSend(ctx)
}

s.Lock()
defer s.Unlock()

if s.currentLedgerMilestoneIndex != s.ledgerMilestoneIndex.Load() {
if s.currentLedgerMilestoneIndex.Load() != s.ledgerMilestoneIndex.Load() {
// stop spamming if the ledger milestone has changed
return nil
}

s.Lock()
defer s.Unlock()

logDebugStateErrorFunc := func(state outputState, err error) {
s.LogDebugf("state: %d, %s failed: %s", state, outputStateNamesMap[s.outputState], err)
}
Expand All @@ -376,6 +379,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamSendBasicOutput {
if err := s.basicOutputSend(ctx, s.accountSender, s.accountSender, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -385,6 +389,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamSendBasicOutput {
if err := s.basicOutputSend(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -394,6 +399,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias {
if err := s.aliasOutputCreate(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -403,6 +409,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias {
if err := s.aliasOutputStateTransition(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -412,6 +419,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias && s.valueSpamCreateFoundry {
if err := s.foundryOutputCreate(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -421,6 +429,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken {
if err := s.foundryOutputMintNativeTokens(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -430,6 +439,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken {
if err := s.basicOutputSendNativeTokens(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -439,6 +449,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias {
if err := s.aliasOutputGovernanceTransition(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -448,6 +459,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamMintNativeToken && s.valueSpamMeltNativeToken {
if err := s.foundryOutputMeltNativeTokens(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -457,6 +469,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias && s.valueSpamCreateFoundry && s.valueSpamDestroyFoundry && (!s.valueSpamMintNativeToken || s.valueSpamMeltNativeToken) {
if err := s.foundryOutputDestroy(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -466,6 +479,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateAlias && s.valueSpamDestroyAlias && ((!s.valueSpamCreateFoundry || s.valueSpamDestroyFoundry) && (!s.valueSpamMintNativeToken || s.valueSpamMeltNativeToken)) {
if err := s.aliasOutputDestroy(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -475,6 +489,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateNFT {
if err := s.nftOutputCreate(ctx, s.accountSender, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -484,6 +499,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateNFT {
if err := s.nftOutputSend(ctx, s.accountSender, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -493,6 +509,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCreateNFT && s.valueSpamDestroyNFT {
if err := s.nftOutputDestroy(ctx, s.accountReceiver, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand All @@ -502,6 +519,7 @@ func (s *Spammer) doSpam(ctx context.Context, currentProcessID uint32) error {
if s.valueSpamCollectBasicOutput {
if err := s.basicOutputSend(ctx, s.accountReceiver, s.accountSender, outputStateNamesMap[s.outputState]); err != nil {
logDebugStateErrorFunc(s.outputState, err)
return nil
}
executed = true
}
Expand Down Expand Up @@ -802,7 +820,7 @@ func (s *Spammer) MeasureSpammerMetrics() {
}

sentSpamBlocks := s.spammerMetrics.SentSpamBlocks.Load()
newBlocks := math.Uint32Diff(sentSpamBlocks, s.lastSentSpamBlocks)
newBlocks := hivemath.Uint32Diff(sentSpamBlocks, s.lastSentSpamBlocks)
s.lastSentSpamBlocks = sentSpamBlocks

s.spammerAvgHeap.Add(uint64(newBlocks))
Expand Down Expand Up @@ -924,38 +942,50 @@ func (s *Spammer) ApplyNewLedgerUpdate(ctx context.Context, msIndex iotago.Miles
checkPendingBlockMetadata(pendingTx)
}

if conflicting {
// there was a conflict in the chain
// it may happen that after applying all ledger changes, there are no known outputs left.
// this mostly happens after a conflict, because updating the local state after a conflict
// may return outputs that are confirmed in the next milestone,
// but there are no new outputs created by the spammer.
// in this case we query the indexer again to get the latest state.
stateEmpty := s.accountSender.Empty() && s.accountReceiver.Empty() && s.isValueSpamEnabled

if conflicting || stateEmpty {
// there was a conflict in the chain, or the accounts were empty anyway
s.resetSpammerState()

// wait until the indexer got updated
if err := s.waitForIndexerUpdate(ctx, msIndex); err != nil {
return err
}
} else {
// we only allow the spammer to create new transactions if there was no conflict in the last milestone.
s.currentLedgerMilestoneIndex = s.ledgerMilestoneIndex.Load()
}

// it may happen that after applying all ledger changes, there are no known outputs left.
// this mostly happens after a conflict, because updating the local state after a conflict
// may return outputs that are confirmed in the next milestone,
// but there are no new outputs created by the spammer.
// in this case we query the indexer again to get the latest state.
if s.accountSender.Empty() && s.accountReceiver.Empty() && s.isValueSpamEnabled {
if conflicting {
s.LogDebug("conflict detected, fetching current spammer ledger state from indexer...")
} else {
s.LogDebug("accounts empty, fetching current spammer ledger state from indexer...")
}

if err := s.getCurrentSpammerLedgerState(ctx); err != nil {
// there was an error getting the current ledger state
s.resetSpammerState()

s.LogWarnf("failed to get current spammer ledger state: %s", err.Error())

return err
}
}

// we only allow the spammer to create new transactions if there was no conflict in the last milestone
// or if the spammer ledger state was successfully fetched.
s.currentLedgerMilestoneIndex.Store(s.ledgerMilestoneIndex.Load())

return nil
}

// resetSpammerState resets the spammer state in case of a conflict.
func (s *Spammer) resetSpammerState() {
// set the current ledger milestone index to MaxUint32 to stop all ongoing spammers with the old ledger state
// until the correct state was fetched by the indexer at the next milestone.
s.currentLedgerMilestoneIndex.Store(math.MaxUint32)

// forget all known outputs
s.accountSender.ResetOutputs()
s.accountReceiver.ResetOutputs()
Expand All @@ -980,7 +1010,7 @@ func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.Miles
for ctxWaitForUpdate.Err() == nil {

// we create a dummy call to check for the ledger index in the result
result, err := s.indexer.Outputs(ctx, &nodeclient.BasicOutputsQuery{
result, err := s.indexer.Outputs(ctxWaitForUpdate, &nodeclient.BasicOutputsQuery{
IndexerCursorParas: nodeclient.IndexerCursorParas{
PageSize: 1,
},
Expand All @@ -995,6 +1025,10 @@ func (s *Spammer) waitForIndexerUpdate(ctx context.Context, msIndex iotago.Miles
}
}

if result.Error != nil {
return fmt.Errorf("waitForIndexerUpdate failed, error: %w", result.Error)
}

// short sleep time to reduce load on the indexer
time.Sleep(20 * time.Millisecond)
}
Expand Down

0 comments on commit 2ec7f8b

Please sign in to comment.